
Spark Machine Learning - Chapter 3

1. Install the tool (IPython, via Anaconda)
https://www.continuum.io/downloads
Choose the version you need.
 
2. Installation
(1) Grant execute permission:
chmod u+x ./Anaconda2-4.2.0-Linux-x86_64.sh
 
(2) Run the installer and press ENTER:
[root@hadoop161 tool]# ./Anaconda2-4.2.0-Linux-x86_64.sh
 
Welcome to Anaconda2 4.2.0 (by Continuum Analytics, Inc.)
 
In order to continue the installation process, please review the license
agreement.
Please, press ENTER to continue
>>>
(Anaconda license text, third-party software notices, and cryptography notice omitted)
Do you approve the license terms? [yes|no]
>>>
Please answer 'yes' or 'no':
 
>>> yes
 
Anaconda2 will now be installed into this location:
/root/anaconda2
 
  - Press ENTER to confirm the location
  - Press CTRL-C to abort the installation
  - Or specify a different location below
 
[/root/anaconda2] >>>
PREFIX=/root/anaconda2
installing: python-2.7.12-1 ...
installing: _license-1.1-py27_1 ...
installing: ... (the remaining Anaconda packages omitted) ...
installing: conda-4.2.9-py27_0 ...
installing: conda-build-2.0.2-py27_0 ...
Python 2.7.12 :: Continuum Analytics, Inc.
creating default environment...
installation finished.
Do you wish the installer to prepend the Anaconda2 install location
to PATH in your /root/.bashrc ? [yes|no]
[no] >>> yes
 
Prepending PATH=/root/anaconda2/bin to PATH in /root/.bashrc
A backup will be made to: /root/.bashrc-anaconda2.bak
 
 
For this change to become active, you have to open a new terminal.
 
Thank you for installing Anaconda2!
 
Share your notebooks and packages on Anaconda Cloud!
Sign up for free: https://anaconda.org
 
[root@hadoop161 tool]#
 
 
3. Launch the Spark shell with IPython Notebook

Edit the ~/.bashrc file and add the following:

export PYSPARK_DRIVER_PYTHON=ipython
export PYSPARK_DRIVER_PYTHON_OPTS="notebook"

Then run source ~/.bashrc; after that, starting pyspark launches IPython Notebook.
4. Run the code
Reference: http://www.cnblogs.com/NaughtyBaby/p/5469469.html
 
(1) Start Hadoop HDFS.
(2) Start Spark: change to the Spark installation directory
cd /hadoop/spark-2.0.1-bin-hadoop2.7
 
and launch
bin/pyspark
You should see the notebook interface come up.
(3) Open http://localhost:8889/tree#
(screenshots of the notebook UI)
Select New → Python [default] to open a notebook, then enter commands in the cells.
OK, now let's start running the code!
5. Run the code from the book
(1) Get the first line of the user data
From a local file:
PATH = "file:///spark/sparkdata"
user_data = sc.textFile("%s/ml-100k/u.user" % PATH)
#user_data = sc.textFile("file:///spark/sparkdata/ml-100k/u.user")
user_data.first()
 
u'1|24|M|technician|85711'
 
From a file on HDFS:
user_data = sc.textFile("/spark/sparkdata/ml-100k/u.user")
user_data.first()
 
(2) Count users, genders, occupations, and ZIP codes
user_fields = user_data.map(lambda line: line.split("|"))
num_users = user_fields.map(lambda fields: fields[0]).count()
num_genders = user_fields.map(lambda fields: fields[2]).distinct().count()
num_occupations = user_fields.map(lambda fields: fields[3]).distinct().count()
num_zipcodes = user_fields.map(lambda fields: fields[4]).distinct().count()
print "Users: %d, genders: %d, occupations: %d, ZIP codes: %d" % (num_users, num_genders, num_occupations, num_zipcodes)
 
Users: 943, genders: 2, occupations: 21, ZIP codes: 795
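The same split-and-count logic can be sketched without Spark, using plain Python set comprehensions on a tiny hypothetical sample of u.user lines:

```python
# Spark-free sketch of the field counting above, on a tiny hypothetical sample
# of u.user lines (format: id|age|gender|occupation|zip).
sample = [
    "1|24|M|technician|85711",
    "2|53|F|other|94043",
    "3|23|M|writer|32067",
]
fields = [line.split("|") for line in sample]
num_users = len(fields)                        # count()
num_genders = len({f[2] for f in fields})      # distinct().count() on gender
num_occupations = len({f[3] for f in fields})  # distinct().count() on occupation
num_zipcodes = len({f[4] for f in fields})     # distinct().count() on ZIP code
print("Users: %d, genders: %d, occupations: %d, ZIP codes: %d"
      % (num_users, num_genders, num_occupations, num_zipcodes))
# → Users: 3, genders: 2, occupations: 3, ZIP codes: 3
```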
 
(3) Analyze user ages with a histogram
%pylab inline
 
ages = user_fields.map(lambda x: int(x[1])).collect()
hist(ages, bins=20, color='lightblue', normed=True)
fig = matplotlib.pyplot.gcf()
fig.set_size_inches(16, 10)
(histogram of user ages)
(4) Distribution of user occupations
count_by_occupation = user_fields.map(lambda fields: (fields[3], 1)).reduceByKey(lambda x, y: x + y).collect()
x_axis1 = np.array([c[0] for c in count_by_occupation])
y_axis1 = np.array([c[1] for c in count_by_occupation])
x_axis = x_axis1[np.argsort(y_axis1)]
y_axis = y_axis1[np.argsort(y_axis1)]
 
pos = np.arange(len(x_axis))
width = 1.0
 
ax = plt.axes()
ax.set_xticks(pos + (width / 2))
ax.set_xticklabels(x_axis)
 
plt.bar(pos, y_axis, width, color='lightblue')
plt.xticks(rotation=30)
fig = matplotlib.pyplot.gcf()
fig.set_size_inches(16, 10)
 
(bar chart of user occupation counts)
(5) countByValue counts how many times each distinct value occurs in an RDD
count_by_occupation2 = user_fields.map(lambda fields: fields[3]).countByValue()
print "Map-reduce approach:"
print dict(count_by_occupation)
print ""
print "countByValue approach:"
print dict(count_by_occupation2)
Output:
Map-reduce approach:
{u'administrator': 79, u'retired': 14, u'lawyer': 12, u'healthcare': 16, u'marketing': 26, u'executive': 32, u'scientist': 31, u'student': 196, u'technician': 27, u'librarian': 51, u'programmer': 66, u'salesman': 12, u'homemaker': 7, u'engineer': 67, u'none': 9, u'doctor': 7, u'writer': 45, u'entertainment': 18, u'other': 105, u'educator': 95, u'artist': 28}
 
countByValue approach:
{u'administrator': 79, u'writer': 45, u'retired': 14, u'lawyer': 12, u'doctor': 7, u'marketing': 26, u'executive': 32, u'none': 9, u'entertainment': 18, u'healthcare': 16, u'scientist': 31, u'student': 196, u'educator': 95, u'technician': 27, u'librarian': 51, u'programmer': 66, u'artist': 28, u'salesman': 12, u'other': 105, u'homemaker': 7, u'engineer': 67}
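Locally, countByValue behaves like Python's collections.Counter applied to the values; a minimal sketch with a made-up occupation list:

```python
from collections import Counter

# hypothetical occupation values, standing in for user_fields.map(...).collect()
occupations = ["technician", "other", "writer", "technician", "other", "other"]
count_by_occupation_local = Counter(occupations)  # same idea as RDD.countByValue()
print(dict(count_by_occupation_local))
# → {'technician': 2, 'other': 3, 'writer': 1}
```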
 
(6) Get one line of the movie data
movie_data = sc.textFile("%s/ml-100k/u.item" % PATH)
 
print movie_data.first()
 
num_movies = movie_data.count()
 
print "Movies: %d" % num_movies
 
1|Toy Story (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Toy%20Story%20(1995)|0|0|0|1|1|1|0|0|0|0|0|0|0|0|0|0|0|0|0
Movies: 1682
 
(7) Distribution of movie ages
Handle malformed year values:
def convert_year(x):
    try:
        return int(x[-4:])
    except:
        return 1900 # there is a 'bad' data point with a blank year, which we set to 1900 and will filter out later
 
movie_fields = movie_data.map(lambda lines: lines.split("|"))
years = movie_fields.map(lambda fields: fields[2]).map(lambda x: convert_year(x))
 
# we filter out any 'bad' data points here
years_filtered = years.filter(lambda x: x != 1900)
# plot the movie ages histogram
movie_ages = years_filtered.map(lambda yr: 1998-yr).countByValue()
values = movie_ages.values()
bins = movie_ages.keys()
hist(values, bins=bins, color='lightblue', normed=True)
fig = matplotlib.pyplot.gcf()
fig.set_size_inches(16,10)
 
(histogram of movie ages)
(8) Analyze the ratings data
rating_data_raw = sc.textFile("%s/ml-100k/u.data" % PATH)
print rating_data_raw.first()
num_ratings = rating_data_raw.count()
print "Ratings: %d" % num_ratings
There are 100,000 ratings in total:
196    242    3    881250949
Ratings: 100000
 
(9) Basic statistics: min rating, max rating, mean and median
rating_data = rating_data_raw.map(lambda line: line.split("\t"))
ratings = rating_data.map(lambda fields: int(fields[2]))
max_rating = ratings.reduce(lambda x, y: max(x, y))
min_rating = ratings.reduce(lambda x, y: min(x, y))
mean_rating = ratings.reduce(lambda x, y: x + y) / float(num_ratings)
median_rating = np.median(ratings.collect())
ratings_per_user = num_ratings / num_users
ratings_per_movie = num_ratings / num_movies
print "Min rating: %d" % min_rating
print "Max rating: %d" % max_rating
print "Average rating: %2.2f" % mean_rating
print "Median rating: %d" % median_rating
print "Average # of ratings per user: %2.2f" % ratings_per_user
print "Average # of ratings per movie: %2.2f" % ratings_per_movie
 
Results:
Min rating: 1
Max rating: 5
Average rating: 3.53
Median rating: 4
Average # of ratings per user: 106.00
Average # of ratings per movie: 59.00
 
 
Spark's stats function provides similar information:
# we can also use the stats function to get some similar information to the above
ratings.stats()
 
(count: 100000, mean: 3.52986, stdev: 1.12566797076, max: 5.0, min: 1.0)
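The figures stats() reports can be reproduced locally with NumPy. Note that RDD.stats() reports the population standard deviation, which matches NumPy's default (ddof=0). A sketch on a small made-up rating sample:

```python
import numpy as np

ratings = np.array([3, 5, 4, 1, 4, 2, 5, 3])  # hypothetical rating sample
# count / mean / stdev / max / min mirror the fields of RDD.stats()
print("count: %d, mean: %s, stdev: %s, max: %s, min: %s"
      % (ratings.size, ratings.mean(), ratings.std(),
         ratings.max(), ratings.min()))
```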
 
(10) Histogram of the rating distribution
# create plot of counts by rating value
count_by_rating = ratings.countByValue()
x_axis = np.array(count_by_rating.keys())
y_axis = np.array([float(c) for c in count_by_rating.values()])
# we normalize the y-axis here to percentages
y_axis_normed = y_axis / y_axis.sum()
 
pos = np.arange(len(x_axis))
width = 1.0
 
ax = plt.axes()
ax.set_xticks(pos + (width / 2))
ax.set_xticklabels(x_axis)
 
plt.bar(pos, y_axis_normed, width, color='lightblue')
plt.xticks(rotation=30)
fig = matplotlib.pyplot.gcf()
fig.set_size_inches(16, 10)
 
Output:
(bar chart of the rating distribution)
(11) Count the number of ratings per user
# to compute the distribution of ratings per user, we first group the ratings by user id
user_ratings_grouped = rating_data.map(lambda fields: (int(fields[0]), int(fields[2]))).groupByKey()
# then, for each key (user id), we find the size of the set of ratings, which gives us the # ratings for that user 
user_ratings_byuser = user_ratings_grouped.map(lambda (k, v): (k, len(v)))
user_ratings_byuser.take(5)
 
[(2, 62), (4, 24), (6, 211), (8, 59), (10, 184)]
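The groupByKey-then-size pattern above corresponds to grouping pairs in a dict; a local sketch with hypothetical (user id, rating) pairs:

```python
from collections import defaultdict

# hypothetical (user_id, rating) pairs, standing in for the rating_data RDD
pairs = [(1, 5), (1, 3), (2, 4), (1, 4), (2, 2)]
grouped = defaultdict(list)
for uid, rating in pairs:
    grouped[uid].append(rating)  # groupByKey()
# for each user id, the size of the group is that user's rating count
ratings_per_user = {uid: len(rs) for uid, rs in grouped.items()}
print(sorted(ratings_per_user.items()))
# → [(1, 3), (2, 2)]
```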
 
(12) Histogram of ratings per user
# and finally plot the histogram
user_ratings_byuser_local = user_ratings_byuser.map(lambda (k, v): v).collect()
hist(user_ratings_byuser_local, bins=200, color='lightblue', normed=True)
fig = matplotlib.pyplot.gcf()
fig.set_size_inches(16,10)
(histogram of ratings per user)
 
3.3 Processing and transforming data
1. Common approaches:
(1) Filter out or remove records with malformed or missing values: this is often necessary, but it discards the good information those records contain.
(2) Fill in the malformed or missing values.
(3) Use techniques that are robust to outliers, such as robust regression.
(4) Transform potential outliers.
 
# years (computed above) has the bad data points set to 1900
years_pre_processed_array = np.array(years.collect())
mean_year = np.mean(years_pre_processed_array[years_pre_processed_array!=1900])
median_year = np.median(years_pre_processed_array[years_pre_processed_array!=1900])
idx_bad_data = np.where(years_pre_processed_array==1900)[0]
years_pre_processed_array[idx_bad_data] = median_year
print "Mean year of release: %d" % mean_year
print "Median year of release: %d" % median_year
print "Index of '1900' after assigning median: %s" % np.where(years_pre_processed_array == 1900)[0]
 
Mean year of release: 1989
Median year of release: 1995
Index of '1900' after assigning median: []
 
3.4 Extracting features from the data
1. Features are the variables used to train a model. Almost all machine learning models work with numerical features represented as vectors, so the raw data must be converted into numerical form.
 
2. Types of features:
(1) Numerical features: real numbers or integers.
(2) Categorical features: values drawn from a fixed set, e.g. a user's gender or occupation.
(3) Text features: derived from textual content in the data, such as movie titles, descriptions, or reviews.
(4) Other features: e.g. images, video, audio, or latitude/longitude coordinates.
 
all_occupations = user_fields.map(lambda fields: fields[3]).distinct().collect()
all_occupations.sort()
# create a new dictionary to hold the occupations, and assign the "1-of-k" indexes
idx = 0
all_occupations_dict = {}
for o in all_occupations:
    all_occupations_dict[o] = idx
    idx +=1
# try a few examples to see what "1-of-k" encoding is assigned
print "Encoding of 'doctor': %d" % all_occupations_dict['doctor']
print "Encoding of 'programmer': %d" % all_occupations_dict['programmer']
 
Output:
Encoding of 'doctor': 2
Encoding of 'programmer': 14
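The index-building loop above can be written more idiomatically with enumerate. A small sketch on a hypothetical occupation list (the indexes depend on sort order, so they differ from the real dataset's):

```python
# build the same "1-of-k" term -> index mapping with enumerate, on made-up data
all_occupations = sorted(["doctor", "programmer", "artist", "writer"])
occupation_index = {o: i for i, o in enumerate(all_occupations)}
print(occupation_index)
# → {'artist': 0, 'doctor': 1, 'programmer': 2, 'writer': 3}
```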
 
 
# create a vector representation for "programmer" and encode it into a binary vector
K = len(all_occupations_dict)
binary_x = np.zeros(K)
k_programmer = all_occupations_dict['programmer']
binary_x[k_programmer] = 1
print "Binary feature vector: %s" % binary_x
print "Length of binary vector: %d" % K
 
Result:
Binary feature vector: [ 0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  0.  1.  0.  0.  0.
  0.  0.  0.]
Length of binary vector: 21
 
Converting timestamps into categorical features:
# a function to extract the timestamps (in seconds) from the dataset
def extract_datetime(ts):
    import datetime
    return datetime.datetime.fromtimestamp(ts)
 
timestamps = rating_data.map(lambda fields: int(fields[3]))
hour_of_day = timestamps.map(lambda ts: extract_datetime(ts).hour)
hour_of_day.take(5)
 
[7, 11, 23, 21, 21]
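A caveat: datetime.fromtimestamp uses the local timezone, so the hours above depend on where the code runs. For a reproducible sketch, the conversion can be done in UTC; the timestamp below is the one from the sample u.data line shown earlier:

```python
import datetime

ts = 881250949  # timestamp from the sample rating line above
# converting in UTC makes the extracted hour machine-independent
dt = datetime.datetime.fromtimestamp(ts, tz=datetime.timezone.utc)
print(dt.hour)
# → 15 (the local-time conversion used above may give a different hour)
```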
 
Modified here: the night range in the original source code was wrong (it overlapped the morning bucket and included the nonexistent hour 24).
# a function for assigning "time-of-day" bucket given an hour of the day
def assign_tod(hr):
    times_of_day = {
                'morning' : range(7, 12),
                'lunch' : range(12, 14),
                'afternoon' : range(14, 18),
                'evening' : range(18, 23),
                'night' : [23, 0, 1, 2, 3, 4, 5, 6]
                }
    for k, v in times_of_day.iteritems():
        if hr in v:
            return k
 
# now apply the "time of day" function to the "hour of day" RDD
time_of_day = hour_of_day.map(lambda hr: assign_tod(hr))
time_of_day.take(5)
 
['morning', 'morning', 'night', 'evening', 'evening']
 
Text feature processing: the bag-of-words approach
(1) Tokenization
(2) Stop-word removal
(3) Stemming
(4) Vectorization
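The four steps can be sketched end-to-end in plain Python (stemming skipped), with a hypothetical stop-word set and a few sample titles:

```python
import re

# minimal bag-of-words sketch: tokenize -> remove stop words -> vectorize
docs = ["Toy Story", "the Four Rooms", "Get Shorty"]
stop_words = {"the", "a", "of"}  # hypothetical stop-word list
tokens = [[t for t in re.split(r"\s+", d.lower()) if t not in stop_words]
          for d in docs]
vocab = sorted({t for doc in tokens for t in doc})  # term <-> index dictionary
vectors = [[1 if term in doc else 0 for term in vocab] for doc in tokens]
print(vocab)
# → ['four', 'get', 'rooms', 'shorty', 'story', 'toy']
print(vectors[0])
# → [0, 0, 0, 0, 1, 1]  ("Toy Story" contains 'story' and 'toy')
```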
 
Test extract_title on the first 5 titles:
# we define a function to extract just the title from the raw movie title, removing the year of release
def extract_title(raw):
    import re
    grps = re.search("\((\w+)\)", raw)    # this regular expression finds the non-word (numbers) between parentheses
    if grps:
        return raw[:grps.start()].strip() # we strip the trailing whitespace from the title
    else:
        return raw
 
# first lets extract the raw movie titles from the movie fields
raw_titles = movie_fields.map(lambda fields: fields[1])
# next, we strip away the "year of release" to leave us with just the title text
# let's test our title extraction function on the first 5 titles
for raw_title in raw_titles.take(5):
    print extract_title(raw_title)
 
Toy Story
GoldenEye
Four Rooms
Get Shorty
Copycat
 
Whitespace tokenization splits each title into terms:
# ok that looks good! let's apply it to all the titles
movie_titles = raw_titles.map(lambda m: extract_title(m))
# next we tokenize the titles into terms. We'll use simple whitespace tokenization
title_terms = movie_titles.map(lambda t: t.split(" "))
print title_terms.take(5)
 
[[u'Toy', u'Story'], [u'GoldenEye'], [u'Four', u'Rooms'], [u'Get', u'Shorty'], [u'Copycat']]
 
Build the term dictionary and test the index of a few terms:
# next we would like to collect all the possible terms, in order to build out dictionary of term <-> index mappings
all_terms = title_terms.flatMap(lambda x: x).distinct().collect()
# create a new dictionary to hold the terms, and assign the "1-of-k" indexes
idx = 0
all_terms_dict = {}
for term in all_terms:
    all_terms_dict[term] = idx
    idx +=1
num_terms = len(all_terms_dict)
print "Total number of terms: %d" % num_terms
print "Index of term 'Dead': %d" % all_terms_dict['Dead']
print "Index of term 'Rooms': %d" % all_terms_dict['Rooms']
 
Output:
Total number of terms: 2645
Index of term 'Dead': 147
Index of term 'Rooms': 1963
 
Spark's zipWithIndex gives the same result:
# we could also use Spark's 'zipWithIndex' RDD function to create the term dictionary
all_terms_dict2 = title_terms.flatMap(lambda x: x).distinct().zipWithIndex().collectAsMap()
print "Index of term 'Dead': %d" % all_terms_dict2['Dead']
print "Index of term 'Rooms': %d" % all_terms_dict2['Rooms']
 
Output:
Index of term 'Dead': 147
Index of term 'Rooms': 1963
 
# this function takes a list of terms and encodes it as a scipy sparse vector using an approach 
# similar to the 1-of-k encoding
def create_vector(terms, term_dict):
    from scipy import sparse as sp
    x = sp.csc_matrix((1, num_terms))
    for t in terms:
        if t in term_dict:
            idx = term_dict[t]
            x[0, idx] = 1
    return x
all_terms_bcast = sc.broadcast(all_terms_dict)
term_vectors = title_terms.map(lambda terms: create_vector(terms, all_terms_bcast.value))
term_vectors.take(5)
 
 
[<1x2645 sparse matrix of type '<type 'numpy.float64'>'
     with 2 stored elements in Compressed Sparse Column format>,
 <1x2645 sparse matrix of type '<type 'numpy.float64'>'
     with 1 stored elements in Compressed Sparse Column format>,
 <1x2645 sparse matrix of type '<type 'numpy.float64'>'
     with 2 stored elements in Compressed Sparse Column format>,
 <1x2645 sparse matrix of type '<type 'numpy.float64'>'
     with 2 stored elements in Compressed Sparse Column format>,
 <1x2645 sparse matrix of type '<type 'numpy.float64'>'
     with 1 stored elements in Compressed Sparse Column format>]
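A side note on create_vector: writing elements into a csc_matrix one at a time triggers SciPy's SparseEfficiencyWarning. lil_matrix is designed for incremental construction and can be converted to CSC afterwards; a sketch with a tiny hypothetical term dictionary:

```python
from scipy import sparse as sp

num_terms = 5
terms = ["toy", "story", "unknown"]
term_dict = {"toy": 0, "story": 3}  # hypothetical term -> index mapping
x = sp.lil_matrix((1, num_terms))   # lil_matrix supports cheap element writes
for t in terms:
    if t in term_dict:
        x[0, term_dict[t]] = 1
x = x.tocsc()                       # convert to CSC once construction is done
print(x.nnz)
# → 2
```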
 
Normalizing features: this transforms a single feature across the dataset, for example by subtracting the mean or applying a standard normal transformation.
np.random.seed(42)
x = np.random.randn(10)
norm_x_2 = np.linalg.norm(x)
normalized_x = x / norm_x_2
print "x:\n%s" % x
print "2-Norm of x: %2.4f" % norm_x_2
print "Normalized x:\n%s" % normalized_x
print "2-Norm of normalized_x: %2.4f" % np.linalg.norm(normalized_x)
 
Output:
x:
[ 0.49671415 -0.1382643   0.64768854  1.52302986 -0.23415337 -0.23413696
  1.57921282  0.76743473 -0.46947439  0.54256004]
2-Norm of x: 2.5908
Normalized x:
[ 0.19172213 -0.05336737  0.24999534  0.58786029 -0.09037871 -0.09037237
  0.60954584  0.29621508 -0.1812081   0.20941776]
2-Norm of normalized_x: 1.0000
Normalizing feature vectors: this transforms all features of a given row so that the transformed feature vector has unit length.
from pyspark.mllib.feature import Normalizer
normalizer = Normalizer()
vector = sc.parallelize([x])
normalized_x_mllib = normalizer.transform(vector).first().toArray()
 
print "x:\n%s" % x
print "2-Norm of x: %2.4f" % norm_x_2
print "Normalized x MLlib:\n%s" % normalized_x_mllib
print "2-Norm of normalized_x_mllib: %2.4f" % np.linalg.norm(normalized_x_mllib)
 
Output:
x:
[ 0.49671415 -0.1382643   0.64768854  1.52302986 -0.23415337 -0.23413696
  1.57921282  0.76743473 -0.46947439  0.54256004]
2-Norm of x: 2.5908
Normalized x MLlib:
[ 0.19172213 -0.05336737  0.24999534  0.58786029 -0.09037871 -0.09037237
  0.60954584  0.29621508 -0.1812081   0.20941776]
2-Norm of normalized_x_mllib: 1.0000
