首页 > 代码库 > engine.io分析2--socket.io的基石


转载请注明: TheViper http://www.cnblogs.com/TheViper


// Minimal usage example: start an engine.io server on port 8000 that only
// allows the 'polling' transport, and send one string to every new socket.
var engine = require('./node_modules/engine.io/lib/engine.io.js');
var server = engine.listen(8000, {
  transports: ['polling']
});

server.on('connection', function (socket) {
  socket.send('utf 8 string');
});




/**
 * Module entry point.
 *
 * When the first argument is an `http.Server`, the call is forwarded to
 * `attach`; otherwise the arguments are forwarded to `exports.Server`.
 *
 * @return the value returned by `attach` or by `exports.Server`
 */
exports = module.exports = function () {
  // backwards compatible use as `.attach`
  // if first argument is an http server
  if (arguments.length && arguments[0] instanceof http.Server) {
    return attach.apply(this, arguments);
  }

  // if first argument is not an http server, then just make a regular eio server
  return exports.Server.apply(null, arguments);
};



  // `~indexOf(...)` is 0 (falsy) only when indexOf returns -1, so this branch
  // runs only when 'websocket' is listed in this.transports.
  if (~this.transports.indexOf('websocket')) {
    this.ws = new WebSocketServer({ noServer: true, clientTracking: false });
  }


/**
 * Creates an http server listening on `port` and attaches an engine.io
 * server to it.
 *
 * The http server itself answers every request with 501 'Not Implemented'.
 *
 * @param {Number} port - port passed to `server.listen`
 * @param {Object|Function} options - options forwarded to `exports.attach`;
 *   when a function is given it is used as `fn` and options default to `{}`
 * @param {Function} fn - callback passed to `server.listen`
 * @return the engine returned by `exports.attach`, with `httpServer` set
 */
function listen(port, options, fn) {
  if ('function' == typeof options) {
    fn = options;
    options = {};
  }

  var server = http.createServer(function (req, res) {
    res.writeHead(501);
    res.end('Not Implemented');
  });

  server.listen(port, fn);

  // create engine server
  var engine = exports.attach(server, options);
  engine.httpServer = server;

  return engine;
}


// Usage example: attach engine.io to an existing http server listening on port 3000.
var engine = require('engine.io');
var http = require('http').createServer().listen(3000);
var server = engine.attach(http);



/**
 * Builds a new `exports.Server` from `options`, attaches it to the given
 * server and hands the new instance back.
 *
 * @param server - passed through to the instance's `attach` method
 * @param {Object} options - passed to both the constructor and `attach`
 * @return the newly created `exports.Server` instance
 */
function attach(server, options) {
  var instance = new exports.Server(options);
  instance.attach(server, options);
  return instance;
}


  // cache and clean up listeners
  var listeners = server.listeners('request').slice(0);
  server.removeAllListeners('request');
  server.on('close', self.close.bind(self));

  // add request handler: requests accepted by check() go to the engine,
  // everything else is replayed to the listeners cached above
  server.on('request', function (req, res) {
    if (check(req)) {
      debug('intercepting request for path "%s"', path);
      self.handleRequest(req, res);
    } else {
      for (var i = 0, l = listeners.length; i < l; i++) {
        listeners[i].call(server, req, res);
      }
    }
  });


  /**
   * Tells whether the request URL begins with `path`.
   *
   * @param req - request object; only `req.url` is read
   * @return {Boolean} true when the leading `path.length` characters of
   *   `req.url` equal `path`
   */
  function check (req) {
    var prefix = req.url.slice(0, path.length);
    return path == prefix;
  }


接着是self.handleRequest(req, res);

/**
 * Handles an http request addressed to the engine.
 *
 * The request is prepared and verified. If verification fails, an error
 * message is sent. Otherwise a request carrying a `sid` is handed to that
 * client's transport, and a request without one starts a handshake.
 *
 * @param req - incoming request; `res` is stored on it as `req.res`
 * @param res - response object
 */
Server.prototype.handleRequest = function (req, res) {
  debug('handling "%s" http request "%s"', req.method, req.url);
  this.prepare(req);
  req.res = res;

  var self = this;
  this.verify(req, false, function (err, success) {
    if (!success) {
      sendErrorMessage(req, res, err);
      return;
    }

    if (req._query.sid) {
      debug('setting new request for existing client');
      self.clients[req._query.sid].transport.onRequest(req);
    } else {
      self.handshake(req._query.transport, req);
    }
  });
};


/**
 * Verifies a request and reports the outcome through `fn(err, success)`.
 *
 * Checks, in order: the transport named in the query is one of
 * `this.transports`; if a `sid` is given, it belongs to a known client and
 * (unless `upgrade` is set) that client's transport name matches; without a
 * `sid` the request must be a GET and, when `this.allowRequest` is set, the
 * decision is delegated to it.
 *
 * @param req - request whose `_query` and `method` are inspected
 * @param {Boolean} upgrade - when truthy, skips the transport-name match
 * @param {Function} fn - called as `fn(errorCode, success)`
 */
Server.prototype.verify = function (req, upgrade, fn) {
  // transport check
  var transport = req._query.transport;
  if (!~this.transports.indexOf(transport)) {
    debug('unknown transport "%s"', transport);
    return fn(Server.errors.UNKNOWN_TRANSPORT, false);
  }

  // sid check
  var sid = req._query.sid;
  if (sid) {
    if (!this.clients.hasOwnProperty(sid)) {
      return fn(Server.errors.UNKNOWN_SID, false);
    }
    if (!upgrade && this.clients[sid].transport.name !== transport) {
      debug('bad request: unexpected transport without upgrade');
      return fn(Server.errors.BAD_REQUEST, false);
    }
  } else {
    // handshake is GET only
    if ('GET' != req.method) return fn(Server.errors.BAD_HANDSHAKE_METHOD, false);
    if (!this.allowRequest) return fn(null, true);
    return this.allowRequest(req, fn);
  }

  fn(null, true);
};



 if ('GET' != req.method),这里要判断一下,因为engine.io会根据请求方法的不同来确定后面的执行流程:默认用GET方法维持长连接,当然发出握手请求的也是GET方法,这个后面会说到。



  // Excerpt from handleRequest: once verification succeeds, a request with a
  // sid goes to the existing client's transport, otherwise a handshake starts.
  this.verify(req, false, function (err, success) {
    if (!success) {
      sendErrorMessage(req, res, err);
      return;
    }

    if (req._query.sid) {
      debug('setting new request for existing client');
      self.clients[req._query.sid].transport.onRequest(req);
    } else {
      self.handshake(req._query.transport, req);
    }
  });


/**
 * Performs the handshake for a new client.
 *
 * Generates an id, instantiates the requested transport, wraps it in a
 * Socket, registers the socket in `this.clients` and emits 'connection'.
 * If constructing or configuring the transport throws, a BAD_REQUEST error
 * message is sent and nothing is registered.
 *
 * @param {String} transport - transport name, used as a key into `transports`
 * @param req - the handshake request
 */
Server.prototype.handshake = function (transport, req) {
  var id = base64id.generateId();

  debug('handshaking client "%s"', id);

  // keep the name: `transport` is reassigned to the transport instance below
  var transportName = transport;
  try {
    var transport = new transports[transport](req);
    if ('polling' == transportName) {
      transport.maxHttpBufferSize = this.maxHttpBufferSize;
    }

    if (req._query && req._query.b64) {
      transport.supportsBinary = false;
    } else {
      transport.supportsBinary = true;
    }
  }
  catch (e) {
    sendErrorMessage(req, req.res, Server.errors.BAD_REQUEST);
    return;
  }

  var socket = new Socket(id, this, transport, req);
  var self = this;

  // unless cookies are disabled, send the id back in a Set-Cookie header
  if (false !== this.cookie) {
    transport.on('headers', function (headers) {
      headers['Set-Cookie'] = self.cookie + '=' + id;
    });
  }

  transport.onRequest(req);

  this.clients[id] = socket;
  this.clientsCount++;

  // unregister the client once its socket closes
  socket.once('close', function () {
    delete self.clients[id];
    self.clientsCount--;
  });

  this.emit('connection', socket);
};

 var transport = new transports[transport](req);这里的transports是transports = require('./transports')。加载的是transports文件夹下面的index.js,不是transports.js。

var XHR = require(‘./polling-xhr‘);var JSONP = require(‘./polling-jsonp‘);module.exports = exports = {  polling: polling,  websocket: require(‘./websocket‘)};exports.polling.upgradesTo = [‘websocket‘];function polling (req) {  if (‘string‘ == typeof req._query.j) {    return new JSONP(req);  } else {    return new XHR(req);  }}


/**
 * XHR polling transport constructor; delegates initialisation to Polling.
 *
 * @param req - request passed through to the Polling constructor
 */
function XHR (req) {
  Polling.call(this, req);
}

// Inherit from Polling. Object.setPrototypeOf replaces the deprecated
// `__proto__` accessor and has the same effect.
Object.setPrototypeOf(XHR.prototype, Polling.prototype);


/**
 * Polling transport constructor; delegates initialisation to Transport.
 *
 * @param req - request passed through to the Transport constructor
 */
function Polling (req) {
  Transport.call(this, req);
}

// Inherit from Transport. Object.setPrototypeOf replaces the deprecated
// `__proto__` accessor and has the same effect.
Object.setPrototypeOf(Polling.prototype, Transport.prototype);


 回到主线,var socket = new Socket(id, this, transport, req);这里开始实例化socket了。socket里面做了什么,后面会说到。然后在响应头(header)上设置cookie,值是上面产生的随机id,方便开发者做权限控制。

接着是transport.onRequest(req); 进入polling-xhr.js

/**
 * Handles a request on the XHR transport.
 *
 * OPTIONS requests are answered directly with status 200 and an
 * `Access-Control-Allow-Headers: Content-Type` header; every other method is
 * delegated to `Polling.prototype.onRequest`.
 *
 * @param req - incoming request; its response is read from `req.res`
 */
XHR.prototype.onRequest = function (req) {
  if ('OPTIONS' == req.method) {
    var res = req.res;
    var headers = this.headers(req);
    headers['Access-Control-Allow-Headers'] = 'Content-Type';
    res.writeHead(200, headers);
    res.end();
  } else {
    Polling.prototype.onRequest.call(this, req);
  }
};



/**
 * Dispatches a polling request by http method: GET goes to `onPollRequest`,
 * POST goes to `onDataRequest`, anything else is answered with status 500.
 *
 * @param req - incoming request; its response is read from `req.res`
 */
Polling.prototype.onRequest = function (req) {
  var res = req.res;

  if ('GET' == req.method) {
    this.onPollRequest(req, res);
  } else if ('POST' == req.method) {
    this.onDataRequest(req, res);
  } else {
    res.writeHead(500);
    res.end();
  }
};



/**
 * Handles a GET (poll) request.
 *
 * If a poll request is already pending, the transport reports an error and
 * the new request is answered with status 500. Otherwise the request and
 * response are stored, the transport becomes writable and 'drain' is emitted
 * so that buffered packets can be written.
 *
 * @param req - the poll request; a `cleanup` function is installed on it
 * @param res - the response that a later write will complete
 */
Polling.prototype.onPollRequest = function (req, res) {
  if (this.req) {
    debug('request overlap');
    // assert: this.res, '.req and .res should be (un)set together'
    this.onError('overlap from client');
    res.writeHead(500);
    // end the response, otherwise the overlapping request is left hanging
    res.end();
    return;
  }

  debug('setting request');

  this.req = req;
  this.res = res;

  var self = this;

  function onClose () {
    self.onError('poll connection closed prematurely');
  }

  // detaches the close listener and forgets the stored request/response
  function cleanup () {
    req.removeListener('close', onClose);
    self.req = self.res = null;
  }

  req.cleanup = cleanup;
  req.on('close', onClose);

  this.writable = true;
  this.emit('drain');

  // if we're still writable but had a pending close, trigger an empty send
  if (this.writable && this.shouldClose) {
    debug('triggering empty send to append close packet');
    this.send([{ type: 'noop' }]);
  }
};



/**
 * Socket constructor.
 *
 * Initialises the socket state, binds it to the transport through
 * `setTransport` and then calls `onOpen`.
 *
 * @param {String} id - socket id
 * @param server - the owning server
 * @param transport - transport passed to `setTransport`
 * @param req - request; `req.connection.remoteAddress` is read from it
 */
function Socket (id, server, transport, req) {
  this.id = id;
  this.server = server;
  this.upgraded = false;
  this.readyState = 'opening';
  this.writeBuffer = [];
  this.packetsFn = [];
  this.sentCallbackFn = [];
  this.request = req;

  // Cache IP since it might not be in the req later
  this.remoteAddress = req.connection.remoteAddress;

  this.checkIntervalTimer = null;
  this.upgradeTimeoutTimer = null;
  this.pingTimeoutTimer = null;

  this.setTransport(transport);
  this.onOpen();
}


/**
 * Stores the transport and subscribes the socket to its events:
 * 'error' and 'close' once, 'packet' and 'drain' on every occurrence.
 *
 * @param transport - the transport to bind to
 */
Socket.prototype.setTransport = function (transport) {
  this.transport = transport;
  this.transport.once('error', this.onError.bind(this));
  this.transport.on('packet', this.onPacket.bind(this));
  this.transport.on('drain', this.flush.bind(this));
  this.transport.once('close', this.onClose.bind(this, 'transport close'));
  //this function will manage packet events (also message callbacks)
  this.setupSendCallback();
};


/**
 * Marks the socket as open, sends the `open` packet, emits 'open' and arms
 * the ping timeout.
 *
 * The `open` packet carries a JSON string with the socket id, the available
 * upgrades and the server's pingInterval / pingTimeout.
 */
Socket.prototype.onOpen = function () {
  this.readyState = 'open';

  // sends an `open` packet
  this.transport.sid = this.id;
  this.sendPacket('open', JSON.stringify({
    sid: this.id,
    upgrades: this.getAvailableUpgrades(),
    pingInterval: this.server.pingInterval,
    pingTimeout: this.server.pingTimeout
  }));

  this.emit('open');
  this.setPingTimeout();
};


/**
 * (Re)arms the ping timeout: clears any previous timer and closes the socket
 * with reason 'ping timeout' after pingInterval + pingTimeout milliseconds.
 */
Socket.prototype.setPingTimeout = function () {
  var self = this;
  clearTimeout(self.pingTimeoutTimer);
  self.pingTimeoutTimer = setTimeout(function () {
    self.onClose('ping timeout');
  }, self.server.pingInterval + self.server.pingTimeout);
};


/**
 * Queues a packet for sending and triggers a flush.
 *
 * Nothing happens while the socket is in the 'closing' state.
 *
 * @param {String} type - packet type
 * @param data - optional payload; only attached when truthy
 * @param {Function} callback - stored in `packetsFn` alongside the packet
 */
Socket.prototype.sendPacket = function (type, data, callback) {
  if ('closing' != this.readyState) {
    debug('sending packet "%s" (%s)', type, data);

    var packet = { type: type };
    if (data) packet.data = data;

    // exports packetCreate event
    this.emit('packetCreate', packet);

    this.writeBuffer.push(packet);

    //add send callback to object
    this.packetsFn.push(callback);

    this.flush();
  }
};


/**
 * Writes the buffered packets to the transport.
 *
 * Runs only when the socket is not closed, the transport is writable and the
 * buffer is non-empty. Emits 'flush' before and 'drain' after the send, on
 * both the socket and the server.
 */
Socket.prototype.flush = function () {
  if ('closed' != this.readyState && this.transport.writable
    && this.writeBuffer.length) {
    debug('flushing buffer to transport');
    this.emit('flush', this.writeBuffer);
    this.server.emit('flush', this, this.writeBuffer);

    // swap the buffer out before sending
    var wbuf = this.writeBuffer;
    this.writeBuffer = [];

    // without framing the callbacks are queued as one group (a single array
    // entry); with framing each callback is queued individually
    if (!this.transport.supportsFraming) {
      this.sentCallbackFn.push(this.packetsFn);
    } else {
      this.sentCallbackFn.push.apply(this.sentCallbackFn, this.packetsFn);
    }
    this.packetsFn = [];

    this.transport.send(wbuf);
    this.emit('drain');
    this.server.emit('drain', this);
  }
};


/**
 * Encodes the packets into one payload and writes it.
 *
 * When `shouldClose` is set, a `close` packet is appended, `shouldClose` is
 * invoked and then reset to null.
 *
 * @param {Array} packets - packets to send; mutated when a close is pending
 */
Polling.prototype.send = function (packets) {
  if (this.shouldClose) {
    debug('appending close packet to payload');
    packets.push({ type: 'close' });
    this.shouldClose();
    this.shouldClose = null;
  }

  var self = this;
  parser.encodePayload(packets, this.supportsBinary, function (data) {
    self.write(data);
  });
};


/**
 * Writes an encoded payload through `doWrite`, runs the pending request's
 * `cleanup` function and marks the transport as no longer writable.
 *
 * @param data - encoded payload handed to `doWrite`
 */
Polling.prototype.write = function (data) {
  debug('writing "%s"', data);
  this.doWrite(data);
  this.req.cleanup();
  this.writable = false;
};


/**
 * Writes the payload as the body of the pending response with status 200.
 *
 * String payloads are sent as 'text/plain; charset=UTF-8' with the byte
 * length as Content-Length; anything else is sent as
 * 'application/octet-stream' with `data.length` as Content-Length.
 *
 * @param {String|Buffer} data - payload to send (non-strings are presumably
 *   Buffers — confirm against the parser's output)
 */
XHR.prototype.doWrite = function (data) {
  // explicit UTF-8 is required for pages not served under utf
  var isString = typeof data == 'string';
  var contentType = isString
    ? 'text/plain; charset=UTF-8'
    : 'application/octet-stream';
  var contentLength = '' + (isString ? Buffer.byteLength(data) : data.length);

  var headers = {
    'Content-Type': contentType,
    'Content-Length': contentLength
  };

  // prevent XSS warnings on IE
  // https://github.com/LearnBoost/socket.io/pull/1333
  var ua = this.req.headers['user-agent'];
  if (ua && (~ua.indexOf(';MSIE') || ~ua.indexOf('Trident/'))) {
    headers['X-XSS-Protection'] = '0';
  }

  this.res.writeHead(200, this.headers(this.req, headers));
  this.res.end(data);
};
