首页 > 代码库 > Python3 模拟MapReduce处理分析大数据文件——《Python宝典》

Python3 模拟MapReduce处理分析大数据文件——《Python宝典》

最近买了一本《Python宝典》在看,此书所讲Python知识的广度明显,但是深度略显不足,所以比较适合入门及提高级的读者来看。其中对于Python大数据处理一章的内容比较有兴趣,看明白了以后,我根据书上提供的案例对源代码进行了修改,也实现了模拟MapReduce的过程。

目标:从Apache的用户访问日志access.log中统计出页面资源的访问量。我们假设这个文件体积十分巨大。

access中的信息结构:66.249.68.43 - - [04/Aug/2011:01:06:48 +0800] "GET /某页面地址 HTTP/1.1" 200 4100

含义:66.249.68.43 来源ip, [04/Aug/2011:01:06:48 +0800] 日期时间,"GET /某页面地址 HTTP/1.1" 来源方式,200 返回状态,4100 返回字节数

步骤:1、将大文件切割成多个小文件;2、计算每个小文件中的页面访问次数(map过程),每一个小文件对应一个计算结果文件;3、将每个小文件中的计算结果文件进行合并累加,统计出最终的页面资源访问量,结果保存到reduceResult.txt中。

文件结构:


其中,access.log是原始的日志文件,smallFiles中保存分割后的一组小文件,mapFiles中存入每个小文件对应的处理结果文件,reduceResult.txt保存最终的处理结果。

以下是源码:

'''
Created on 2014-12-19

@author: guoxiyue
@file:0fileSplit.py
@function:文件分割
'''
import os,os.path,time;
sourceFile=open('files/access.log','r',encoding='utf8'); #打开原始大文件
targetDir='files/smallFiles'; # 设置小文件的保存目录
smallFileSize=30; #设置每个小文件中的记录条数
tempList=[]; #临时列表,用于暂存小文件中的记录内容
fileNum=1; # 小文件序号
readLine=sourceFile.readline(); #先读一行
while(readLine): #循环
    lineNum=1
    while(lineNum<=smallFileSize): #控制每个小文件的记录条数不超过设定值
        tempList.append(readLine); #将当前行的读取结果保存到临时列表中
        readLine=sourceFile.readline(); #再读下一行
        lineNum+=1;# 行号自增
        if not readLine:break;#如果读到空,则说明大文件读完,退出内层循环
    tempFile=open('files/smallFiles/access_'+str(fileNum)+'.txt','w',encoding='utf8');#将读取到的30条记录保存到文件中
    tempFile.writelines(tempList);
    tempFile.close();
    tempList=[];#清空临时列表
    print('files/smallFiles/access_'+str(fileNum)+'.txt  创建于 '+str(time.asctime()));
    fileNum+=1; #文件序号自增
sourceFile.close();      

'''
Created on 2014-12-19

@author: guoxiyue
@file:1map.py
@function:map过程,分别计算每一个小文件中的页面资源访问量
'''
import os,os.path,re,time;

sourceFileList=os.listdir('files/smallFiles/'); #获取所有小文件文件名列表
targetDir="files/mapFiles/"; #设置处理结果保存目录
for eachFile in sourceFileList: #遍历小文件
    currentFile=open('files/smallFiles/'+eachFile,'r',encoding='utf8'); #打开小文件
    currentLine=currentFile.readline(); #先读一行
    tempDict={}; #临时字典
    while(currentLine): 
        p_re=re.compile("(GET|POST)\s(.*?)\sHTTP", re.IGNORECASE); #用正则表达式来提取访问资源
        match=p_re.findall(currentLine); #从当前行中提取出访问资源
        if(match):
            url=match[0][1]; #提出资源页面
            if url in tempDict: #获取当前资源的访问次数,并添加到字典中
                tempDict[url]+=1; 
            else:
                tempDict[url]=1;
        currentLine=currentFile.readline(); #再读下一行
    currentFile.close();
    
    #以下是将当前小文件的统计结果排序并保存
    tList=[];
    for key,value in sorted(tempDict.items(),key=lambda data:data[0],reverse=True):
        tList.append(key+' '+str(value));
        tList.append('\n')
    tempFile=open(targetDir+eachFile,'a',encoding='utf8');
    tempFile.writelines(tList);
    tempFile.close()
    print(targetDir+eachFile+'.txt  创建于 '+str(time.asctime()));
    

'''
Created on 2014-12-19

@author: guoxiyue
@file:2reduce.py
@function:reduce过程,汇总最终的页面资源访问量
'''
import os,os.path,re,time;
sourceFileList=os.listdir('files/mapFiles/'); #获取小文件的map结果文件名列表
targetFile='files/reduceResult.txt'; # 设置最终结果保存文件
tempDict={}; #临时字典 
p_re=re.compile('(.*?)(\d{1,}$)', re.IGNORECASE); #利用正则表达式抽取资源访问次数
for eachFile in sourceFileList:#遍历map文件
    currentFile=open('files/mapFiles/'+eachFile,'r',encoding='utf8'); #打开当前文件
    currentLine=currentFile.readline(); #读一行
    while(currentLine):
        subData=p_re.findall(currentLine) #提取出当前行中资源的访问次数
        if(subData[0][0] in tempDict): #将结果累加
            tempDict[subData[0][0]]+=int(subData[0][1]);
        else:
            tempDict[subData[0][0]]=int(subData[0][1]);
        currentLine=currentFile.readline();#再读一行
    currentFile.close();

 #以下是将所有map文件的统计结果排序并保存
tList=[];
for key,value in sorted(tempDict.items(),key=lambda data:data[1],reverse=True):
    tList.append(key+' '+str(value));
    tList.append('\n')
tempFile=open(targetFile,'a',encoding='utf8');
tempFile.writelines(tList);
tempFile.close()
print(targetFile+'   创建于 '+str(time.asctime()));

    

Python3 模拟MapReduce处理分析大数据文件——《Python宝典》