首页 > 代码库 > node 内存消息队列

node 内存消息队列

var net = require(net)var clients = []    ,msgs = {}function unWrapMsg(data){    data = data.toString().trim()    var _d = data.split(: , 2)    _d[1] = _d[1] || ‘‘    var p1 = _d[0].trim()        ,p2 = _d[1].trim()        ,p3 = data.slice(_d[0].length + _d[1].length + 2)    return [p1 , p2 , p3]}function wrapMsg(p1 , p2 ,p3){    return [p1.toString() , p2.toString() , p3.toString()].join(:) + "\n"}exports.startServer = function(config){    var cmd = {        push : function(drawer , msg){            if (! msgs[drawer]) msgs[drawer] = []            msgs[drawer].push(msg)        },        pull : function(drawer ){            if (! msgs[drawer]) msgs[drawer] = []            return msgs[drawer].shift()        },        clean : function(drawer){            if (drawer) msgs[drawer] = []            else msgs = {}        }    }    net.createServer(function (socket) {        socket.name = socket.remoteAddress + ":" + socket.remotePort        clients.push(socket)        //socket.write("Welcome " + socket.name + "\n")        socket.on(data, function (data) {            data = unWrapMsg(data)            //console.log(‘from client ‘ ,data)            var act = data[0]                ,drawer = data[1]                ,msg = data[2]            if (!cmd[act]) return            var ret = cmd[act](drawer ,msg)            //console.log(act ,msg , ret)            socket.write(wrapMsg(on + act , JSON.stringify(ret || ‘‘) ,msg) )            //broadcast(socket.name + "> " + data, socket)        })        socket.on(end, function () {            clients.splice(clients.indexOf(socket), 1)            //broadcast(socket.name + " left the chat.\n")        })        function broadcast(message, sender) {            clients.forEach(function (client) {                // Don‘t want to send it to sender                if (client === sender) return                client.write(message)            })            process.stdout.write(message)        }    }).listen(config.port)    return 
cmd}exports.startClient = function(config){    var _cs = []        ,cmd = {            onpull : function(ret ,cbkid){                    var cbk = _cs[cbkid]                    cbk && cbk(ret)                    delete _cs[cbkid]                    }            }    var client = net.connect(config , function() {        console.log(client connected)        //client.write(‘world!\r\n‘)    })client.on(data, function(data) {        data = unWrapMsg(data)        //console.log(‘from server‘ , data)        var act = data[0]        if (!cmd[act]) return        cmd[act](data[1] , data[2])    })    client.on(end, function() {        console.log(client disconnected)    })    return {        push : function(drawer , msg){            client.write(wrapMsg(push , drawer , msg))                }        ,pull : function(drawer ,cbk){            var cbkid = _cs.push(cbk) - 1            client.write(wrapMsg(pull , drawer,cbkid))                }        ,clean : function(drawer ){            client.write(wrapMsg(clean , drawer ))                }    }}

基于内存的生产/消费 MQ

server : 

// Server example: start the queue on port 5000 and, once per second,
// enqueue a message into drawer 't' directly through the returned
// command table (no network round-trip).
var nmq = require('./nmq.js');

var s = nmq.startServer({port: 5000});
var i = 0;
setInterval(function () {
    s.push('t', ['server_', i++]);
}, 1000);


client:

// Client example: connect to the queue on port 5000 and, once per second,
// pull the oldest message from drawer 't' and log what the server returned.
var nmq = require('./nmq.js');

var s = nmq.startClient({port: 5000});
var i = 0;
setInterval(function () {
    // Uncomment to also produce messages from this client:
    // if (+new Date % 2) s.push('t', 'client_' + i)
    s.pull('t', function (x) {
        console.log('I got it', i++, x);
    });
}, 1000);

 

node 内存消息队列