首页 > 代码库 > Node.js结合使用MongDb的Map.reduce功能进行大量数据简化处理办法
Node.js结合使用MongDb的Map.reduce功能进行大量数据简化处理办法
一年前,准备使用mongDb自带的map,reduce功能模拟hadoop,换个思路做一个简易的大数据分拆再结合存储的办法;
这个功能可以用于数据日志或者游戏数据之类,进行周期性归纳和按照自己需求重组数据;
以下代码实现了将每日数据collecttion:gameLog日期的数据统计出不同的collection.具体不详述,只讲思路和遇到的问题;
主要代码如下:
var MongoClient = require(‘mongodb‘).MongoClient;
MongoClient.connect(‘mongodb://127.0.0.1:27017/analytics‘, function(err, db) {//db analytics
if(err) throw err;
var mapFn = function(){
emit({userId:this.userId,serverId:this.serverId,appId:this.appId},this);
}
var reduceFn = function(key, values) {
var r={lt:0,lc:0,lip:0,os:0};
for(var i=0;i<values.length;i++){
if(values[i].type==‘login‘){
if(r.lt==0) r.lt=values[i].time;
r.lc+=1;
r.lip=lip;
}else if(values[i].type==‘logout‘){
r.os+=parseInt(values[i].onlineSec);
}
}
return r;
};
var options = {out:‘gameLogResult‘};
var date = new Date();
date.setDate(date.getDate()-1);
var month=date.getMonth()+1;
var logDay=date.getFullYear()+"-"+month+"-"+date.getDate();
var collection = db.collection(‘gameLog‘+logDay);
collection.mapReduce(mapFn, reduceFn, options, function (err, collection) {
var gameLogResult=db.collection(‘gameLogResult‘);
var cursor = gameLogResult.find({});
cursor.each(function(err, doc) {
if(doc.value.type){
if(doc.value.type==‘login‘){
doc.value=http://www.mamicode.com/{lt:doc.value.time,lc:1,os:0};
}else if(doc.value.type==‘logout‘)(
doc.value=http://www.mamicode.com/{lt:0,lc:0,os:doc.value.onlineSec};
)
}
if(doc && !doc.value.type){
db.collection(‘player‘).findOne({aid:doc._id.appId,uid:doc._id.userId},function(err,result){
doc.value.dt=logDay;
if(result){
var serverIndex=-1;
for(var i=0;i<result.servers.length;i++){
if(result.servers[i].sid==doc._id.serverId){
serverIndex=i;
break;
}
}
if(serverIndex>=0){
var ddIndex=-1;
for(var i=0;i<result.servers[serverIndex].dd.length;i++){
if(result.servers[serverIndex].dd[i].dt==doc.value.dt){
ddIndex=i;
break;
}
}
if(ddIndex>=0){
result.servers[serverIndex].dd[ddIndex]=doc.value
}else{
result.servers[serverIndex].dd.unshift(doc.value);
}
result.servers[serverIndex].lt=doc.value.lt>0?doc.value.lt:date.getTime();
}else{
var server={sid:doc._id.serverId};
server.ct=doc.value.lt>0?doc.value.lt:date.getTime();
server.lt=doc.value.lt>0?doc.value.lt:date.getTime();
server.dd=[];
server.dd.push(doc.value);
result.servers[serverIndex].push(server);
}
result.lt=doc.value.lt>0?doc.value.lt:date.getTime();
db.collection(‘player‘).update({aid:doc._id.appId,uid:doc._id.userId},{$set:{servers:result.servers},$set:{lt:result.lt}},function(err,data){});
}else{
var player={aid:doc._id.appId,uid:doc._id.userId};
player.ct=doc.value.lt>0?doc.value.lt:date.getTime();
player.lt=doc.value.lt>0?doc.value.lt:date.getTime();
player.servers=[];
var server={sid:doc._id.serverId};
server.ct=doc.value.lt>0?doc.value.lt:date.getTime();
server.lt=doc.value.lt>0?doc.value.lt:date.getTime();
server.dd=[];
server.dd.push(doc.value);
player.servers.push(server);
db.collection(‘player‘).insert(player,{w:1},function(err,objects){});
}
});
}
//if(!doc) process.exit(1);
});
})
})
重点说明下使用map.reduce的时候为何不能合并数据,这个问题困扰了我们两个人2天,最后在stackOverflow上找到有人做了解释;
ps.思维混乱,稍后再写~