首页 > 代码库 > Node.js结合使用MongDb的Map.reduce功能进行大量数据简化处理办法

Node.js结合使用MongDb的Map.reduce功能进行大量数据简化处理办法

一年前,准备使用mongDb自带的map,reduce功能模拟hadoop,换个思路做一个简易的大数据分拆再结合存储的办法;

这个功能可以用于数据日志或者游戏数据之类,进行周期性归纳和按照自己需求重组数据;

 

以下代码实现了将每日数据collecttion:gameLog日期的数据统计出不同的collection.具体不详述,只讲思路和遇到的问题;

主要代码如下:

var MongoClient = require(‘mongodb‘).MongoClient;
MongoClient.connect(‘mongodb://127.0.0.1:27017/analytics‘, function(err, db) {//db analytics
if(err) throw err;


var mapFn = function(){
  emit({userId:this.userId,serverId:this.serverId,appId:this.appId},this);
}


var reduceFn = function(key, values) {
  var r={lt:0,lc:0,lip:0,os:0};
  for(var i=0;i<values.length;i++){
    if(values[i].type==‘login‘){
      if(r.lt==0) r.lt=values[i].time;
      r.lc+=1;
      r.lip=lip;
    }else if(values[i].type==‘logout‘){
      r.os+=parseInt(values[i].onlineSec);
    }
  }
  return r;
};

var options = {out:‘gameLogResult‘};
var date = new Date();
date.setDate(date.getDate()-1);
var month=date.getMonth()+1;
var logDay=date.getFullYear()+"-"+month+"-"+date.getDate();

var collection = db.collection(‘gameLog‘+logDay);
collection.mapReduce(mapFn, reduceFn, options, function (err, collection) {
  var gameLogResult=db.collection(‘gameLogResult‘);
  var cursor = gameLogResult.find({});
  cursor.each(function(err, doc) {
  if(doc.value.type){
    if(doc.value.type==‘login‘){
      doc.value=http://www.mamicode.com/{lt:doc.value.time,lc:1,os:0};
    }else if(doc.value.type==‘logout‘)(
      doc.value=http://www.mamicode.com/{lt:0,lc:0,os:doc.value.onlineSec};
    )
  }


  if(doc && !doc.value.type){
    db.collection(‘player‘).findOne({aid:doc._id.appId,uid:doc._id.userId},function(err,result){
    doc.value.dt=logDay;

    if(result){
      var serverIndex=-1;
      for(var i=0;i<result.servers.length;i++){
      if(result.servers[i].sid==doc._id.serverId){
        serverIndex=i;
        break;
     }
  }


  if(serverIndex>=0){
  var ddIndex=-1;
  for(var i=0;i<result.servers[serverIndex].dd.length;i++){
    if(result.servers[serverIndex].dd[i].dt==doc.value.dt){
      ddIndex=i;
      break;
    }
  }


  if(ddIndex>=0){
    result.servers[serverIndex].dd[ddIndex]=doc.value
  }else{
    result.servers[serverIndex].dd.unshift(doc.value);
  }
  result.servers[serverIndex].lt=doc.value.lt>0?doc.value.lt:date.getTime();
  }else{
  var server={sid:doc._id.serverId};
  server.ct=doc.value.lt>0?doc.value.lt:date.getTime();
  server.lt=doc.value.lt>0?doc.value.lt:date.getTime();
  server.dd=[];
  server.dd.push(doc.value);
  result.servers[serverIndex].push(server);
  }
  result.lt=doc.value.lt>0?doc.value.lt:date.getTime();
  db.collection(‘player‘).update({aid:doc._id.appId,uid:doc._id.userId},{$set:{servers:result.servers},$set:{lt:result.lt}},function(err,data){});
}else{
var player={aid:doc._id.appId,uid:doc._id.userId};
player.ct=doc.value.lt>0?doc.value.lt:date.getTime();
player.lt=doc.value.lt>0?doc.value.lt:date.getTime();
player.servers=[];
var server={sid:doc._id.serverId};
server.ct=doc.value.lt>0?doc.value.lt:date.getTime();
server.lt=doc.value.lt>0?doc.value.lt:date.getTime();
server.dd=[];
server.dd.push(doc.value);
player.servers.push(server);
db.collection(‘player‘).insert(player,{w:1},function(err,objects){});

}
});

}
//if(!doc) process.exit(1);
});
})

})

 

 

 

重点说明下使用map.reduce的时候为何不能合并数据,这个问题困扰了我们两个人2天,最后在stackOverflow上找到有人做了解释;

 

ps.思维混乱,稍后再写~