首页 > 代码库 > python实现指定目录下JAVA文件单词计数的多进程版本
python实现指定目录下JAVA文件单词计数的多进程版本
需要说明的是,串行版本已经足够快了:在我的酷睿双核 Debian 7.6 机器上运行只需 0.2s,简直难以超越。多进程版本难以避免大量的进程创建、数据同步与传输开销,性能反而不如串行版本,只能作为学习示例。以后再优化吧。
#-------------------------------------------------------------------------------
# Name:        wordstat_multiprocessing.py
# Purpose:     statistic words in java files of given directory by multiprocessing
#
# Author:      qin.shuq
#
# Created:     09/10/2014
# Copyright:   (c) qin.shuq 2014
# Licence:     <your licence>
#-------------------------------------------------------------------------------
import re
import os
import sys
import time
import logging

try:
    from Queue import Empty  # Python 2 module name
except ImportError:
    from queue import Empty  # Python 3 module name

from multiprocessing import Process, Manager, Pool, Pipe, cpu_count

LOG_LEVELS = {
    'DEBUG': logging.DEBUG,
    'INFO': logging.INFO,
    'WARN': logging.WARNING,
    'ERROR': logging.ERROR,
    'CRITICAL': logging.CRITICAL,
}

# Directory scanned when no directory is given on the command line.
DEFAULT_DIRPATH = "/home/lovesqcc/workspace/java/javastudy/src/"

ncpu = cpu_count()


def initlog(filename):
    """Return a logger that writes INFO-and-above records to *filename*.

    Each file gets its own named logger.  Configuring the root logger for
    every call (as was done before) made all callers share one logger, so
    every record was duplicated into every log file.
    """
    logger = logging.getLogger(filename)
    # Do not stack a second handler if the same file is initialised twice.
    if not logger.handlers:
        hdlr = logging.FileHandler(filename)
        formatter = logging.Formatter("%(asctime)s %(levelname)s %(message)s")
        hdlr.setFormatter(formatter)
        logger.addHandler(hdlr)
    logger.setLevel(LOG_LEVELS['INFO'])
    return logger


errlog = initlog("error.log")
infolog = initlog("info.log")


class FileObtainer(object):
    """Collect the paths of all files below a directory, optionally filtered."""

    def __init__(self, dirpath, fileFilterFunc=None):
        """Remember the root directory and the optional path predicate."""
        self.dirpath = dirpath
        self.fileFilterFunc = fileFilterFunc

    def findAllFilesInDir(self):
        """Return a list of file paths found by walking ``self.dirpath``.

        When ``fileFilterFunc`` is set, only paths for which it returns a
        true value are kept.
        """
        files = []
        for path, dirs, filenames in os.walk(self.dirpath):
            for filename in filenames:
                files.append(os.path.join(path, filename))
        if self.fileFilterFunc is None:
            return files
        # A list (not a lazy ``filter`` object) so callers can take len().
        return [f for f in files if self.fileFilterFunc(f)]


class MultiQueue(object):
    """Round-robin writer / sequential reader over several manager queues."""

    def __init__(self, qnum, timeout):
        """Create *qnum* manager-backed queues; *timeout* bounds each get."""
        manager = Manager()
        self.timeout = timeout
        self.qnum = qnum
        self.queues = []
        self.pindex = 0
        for i in range(self.qnum):
            qLines = manager.Queue()
            self.queues.append(qLines)

    def put(self, obj):
        """Put *obj* on the next queue in round-robin order."""
        self.queues[self.pindex].put(obj)
        self.pindex = (self.pindex + 1) % self.qnum

    def get(self):
        """Return the first object available from any queue, else None.

        Each queue is waited on for at most ``self.timeout`` seconds; a
        timeout is printed and logged, then the next queue is tried.
        """
        for i in range(self.qnum):
            try:
                obj = self.queues[i].get(True, self.timeout)
                return obj
            except Empty as emp:
                print('Not Get.')
                errlog.error('In WordReading:' + str(emp))
        return None


def readFile(filename):
    """Return the lines of *filename*, or [] if it cannot be read.

    An IOError is logged to the error log instead of being raised.
    """
    try:
        # ``with`` closes the handle even when readlines() fails.
        with open(filename, 'r') as f:
            lines = f.readlines()
    except IOError:
        errorInfo = 'file %s Not found \n' % filename
        errlog.error(errorInfo)
        return []
    infolog.info('[successful read file %s]\n' % filename)
    return lines


def batchReadFiles(fileList, ioPool, mq):
    """Read every file in *fileList* through *ioPool* and enqueue the lines.

    All lines of all files are gathered into one list which is put on *mq*
    as a single item.
    """
    futureResult = []
    for filename in fileList:
        futureResult.append(ioPool.apply_async(readFile, args=(filename,)))

    allLines = []
    for res in futureResult:
        allLines.extend(res.get())
    mq.put(allLines)


class WordReading(object):
    """Producer stage: reads the files with a process pool and feeds *mq*."""

    def __init__(self, allFiles, mq):
        """Store the file list and queue, and start the I/O worker pool."""
        self.allFiles = allFiles
        self.mq = mq
        self.ioPool = Pool(ncpu * 3)
        infolog.info('WordReading Initialized')

    def run(self):
        """Read all files and put their lines on the queue."""
        batchReadFiles(self.allFiles, self.ioPool, self.mq)

    def close(self):
        """Shut the I/O pool down and wait for its workers to exit."""
        self.ioPool.close()
        self.ioPool.join()


def processLines(lines):
    """Return a dict mapping each word in *lines* to its occurrence count."""
    result = {}
    linesContent = ''.join(lines)
    for word in WordAnalyzing.wordRegex.findall(linesContent):
        result[word] = result.get(word, 0) + 1
    return result


def mergeToSrcMap(srcMap, destMap):
    """Add the counts of *destMap* into *srcMap* in place and return *srcMap*."""
    for key, value in destMap.items():
        srcMap[key] = srcMap.get(key, 0) + value
    return srcMap


class WordAnalyzing(object):
    """Consumer stage: counts the occurrences of each word taken from *mq*.

    The accumulated ``{word: count}`` mapping is available from
    ``obtainResult()`` after ``run()`` has finished.
    """

    wordRegex = re.compile(r"[\w]+")

    def __init__(self, mq, conn):
        """Store the queue and pipe end, and start the CPU worker pool."""
        self.mq = mq
        self.cpuPool = Pool(ncpu)
        self.conn = conn
        self.resultMap = {}
        infolog.info('WordAnalyzing Initialized')

    def run(self):
        """Drain the queue, count words in the pool, and merge the results.

        Sends 'OK' over ``self.conn`` and closes it once merging is done.
        """
        starttime = time.time()
        futureResult = []
        while True:
            lines = self.mq.get()
            # None means every queue timed out: nothing more to analyse.
            if lines is None:
                break
            futureResult.append(
                self.cpuPool.apply_async(processLines, args=(lines,)))

        for res in futureResult:
            mergeToSrcMap(self.resultMap, res.get())
        endtime = time.time()
        print('WordAnalyzing analyze cost:  %s ms'
              % ((endtime - starttime) * 1000))

        self.conn.send('OK')
        self.conn.close()

    def obtainResult(self):
        """Return the accumulated ``{word: count}`` mapping."""
        return self.resultMap

    def close(self):
        """Shut the CPU pool down and wait for its workers to exit."""
        self.cpuPool.close()
        self.cpuPool.join()


class PostProcessing(object):
    """Sort and print a ``{word: count}`` mapping."""

    def __init__(self, resultMap):
        """Store the mapping to post-process."""
        self.resultMap = resultMap

    def sortByValue(self):
        """Return ``(word, count)`` pairs sorted by count, highest first."""
        return sorted(self.resultMap.items(), key=lambda e: e[1], reverse=True)

    def obtainTopN(self, topN):
        """Print the *topN* most frequent words (fewer if not enough exist)."""
        sortedResult = self.sortByValue()
        # max() keeps a negative topN printing nothing, as range() did.
        for word, count in sortedResult[:max(topN, 0)]:
            print('%s  counts:  %s' % (word, count))


def main(argv=None):
    """Count words in the .java files of a directory and print the top 30.

    The directory is taken from ``argv[1]`` when given, otherwise
    ``DEFAULT_DIRPATH`` is used.  Exits with status 1 if it does not exist.
    """
    argv = sys.argv if argv is None else argv
    dirpath = argv[1] if len(argv) > 1 else DEFAULT_DIRPATH

    if not os.path.exists(dirpath):
        print('dir %s not found.' % dirpath)
        sys.exit(1)

    fileObtainer = FileObtainer(dirpath, lambda f: f.endswith('.java'))
    allFiles = fileObtainer.findAllFilesInDir()

    mqTimeout = 0.01
    mqNum = 1

    mq = MultiQueue(mqNum, timeout=mqTimeout)
    p_conn, c_conn = Pipe()
    wr = WordReading(allFiles, mq)
    wa = WordAnalyzing(mq, c_conn)

    try:
        wr.run()
        wa.run()

        msg = p_conn.recv()
        if msg == 'OK':
            pass
        # taking less time, parallel not needed.
    finally:
        p_conn.close()
        wr.close()
        wa.close()

    postproc = PostProcessing(wa.obtainResult())
    postproc.obtainTopN(30)

    print('exit the program.')


if __name__ == "__main__":
    main()
python实现指定目录下JAVA文件单词计数的多进程版本
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉: 投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。