首页 > 代码库 > 写自己的socket框架(二)

写自己的socket框架(二)

1、开始正常监听以后,就要开始接受数据了,整体流程图如下:

 

2、上一节看到我们在程序初始化的时候,初始化了很多个SocketConnection,用于管理客户端的链接,那应用层如何来操作,又什么时候来接受数据?于是我们便有了SocketSession,用于给应用层来管理整个会话过程,代码如下:

public class SocketSession : IDisposable    {        public string SessionId { get; private set; }        private System.Net.Sockets.Socket _connectSocket;        private IProtocol _protocol;        private SocketConnection _connect;        public SocketConnection Connection { get { return _connect; } }        private MemoryStream _memStream;        private delegate void ReceiveDataHandler(SocketAsyncEventArgs e);        private ReceiveDataHandler ReceiveHandler;        private delegate void ReceiveReadPackageHandler(byte[] b, int offset, SocketAsyncEventArgs e);        private ReceiveReadPackageHandler ReadPackageHandler;        public System.Net.Sockets.Socket ConnectSocket        {            get            {                return _connectSocket;            }            private set { }        }        public SocketSession(string sessionId)        {            this.SessionId = sessionId;        }        public SocketSession(System.Net.Sockets.Socket client, SocketConnection connect)            : this(Guid.NewGuid().ToString())        {            this._connectSocket = client;            this._connect = connect;            this._protocol = connect.Pool.AppServer.AppProtocol;            _memStream = new MemoryStream();            ReceiveHandler = ReceiveData;            ReadPackageHandler = this.ReadPackage;        }        internal void ReceiveData(SocketAsyncEventArgs e)        {            if (e.SocketError != SocketError.Success)            {                this.Close();                return;            }            if (e.BytesTransferred <= 0)            {                this.Close();                return;            }            try            {                if (this.Connection.Flag == SocketFlag.Busy)                {                    byte[] buffer = new byte[e.BytesTransferred];                    Array.Copy(e.Buffer, 0, buffer, 0, e.BytesTransferred);                    ReadPackage(buffer, 0, e);                    buffer = null;                }            }            catch (Exception ex)            {                this.Close();                return;            }        }        internal void ReceiveAsync(SocketAsyncEventArgs e)        {            if (e == null)            {                return;            }            bool isCompleted = true;            try            {                isCompleted = this._connectSocket.ReceiveAsync(e);            }            catch (Exception ex)            {                LogHelper.Debug(this.SessionId + ex.ToString());                this.Close();            }            if (!isCompleted)            {                this.ReceiveHandler.BeginInvoke(e, ReceiveHandlerCallBack, ReceiveHandler);            }        }        void ReceiveHandlerCallBack(IAsyncResult result)        {            try            {                (result.AsyncState as ReceiveDataHandler).EndInvoke(result);            }            catch (Exception e)            {                LogHelper.Debug(e.Message);            }        }        internal void OnDataRecevied(SessionEventArgs arg)        {            if (DataRecevied != null)            {                this._memStream.SetLength(0);                DataRecevied.Invoke(this, arg);            }        }        internal void Close()        {            try            {                this._connectSocket.Close();            }            catch (Exception ex)            {                LogHelper.Debug("关闭socket异常" + ex.ToString());            }            if (this.Closed != null)            {                this.Closed();            }        }        internal Action Closed;        internal Action<SocketSession, SessionEventArgs> DataRecevied;        public void Dispose()        {            if (_memStream != null)            {                _memStream.Close();                _memStream.Dispose();                _memStream = null;            }        }        public void Send(byte[] data)        {            try            {                if (this.Connection.Flag == SocketFlag.Busy)                {                    this._connectSocket.Send(data);                }            }            catch (Exception ex)            {                this.Close();            }        }        private void ReadPackage(byte[] data, int offset, SocketAsyncEventArgs e)        {            if (data =http://www.mamicode.com/= null || data.Length == 0)            {                return;            }            if (offset >= data.Length)            {                return;            }            if (offset == 0)            {                if (_memStream.Length > 0)                {                    _memStream.Write(data, 0, data.Length);                    data = _memStream.ToArray();                }            }            //粘包处理            OnReceivedCallBack(data, offset, e);            data = null;        }        private void OnReceivedCallBack(byte[] buffer, int offset, SocketAsyncEventArgs e)        {            byte[] data = http://www.mamicode.com/this._protocol.OnDataReceivedCallBack(buffer, ref offset);            if (offset == -1)            {                this.Close();                return;            }            if (data =http://www.mamicode.com/= null || data.Length == 0)            {                this._memStream.Write(buffer, offset, buffer.Length - offset);                this.ReceiveAsync(e);                return;            }            SessionEventArgs session_args = new SessionEventArgs();            session_args.Data = data;            this.OnDataRecevied(session_args);            if (offset < buffer.Length)            {                this.ReadPackageHandler.BeginInvoke(buffer, offset, e, ReadPackageCallBack, ReadPackageHandler);            }            else            {                this.ReceiveAsync(e);            }            data = null;        }        void ReadPackageCallBack(IAsyncResult result)        {            try            {                (result.AsyncState as ReceiveReadPackageHandler).EndInvoke(result);            }            catch (Exception ex)            {                LogHelper.Debug(ex.Message);            }        }    }
View Code

 

细心的童鞋可以发现,在ReceiveAsync方法里面,接收数据的地方,当同步接收完成的时候,我们调用了一个异步委托ReceiveHandler.BeginInvoke。

在解析出一个独立的包,并且缓冲区的数据里面还有多余的包的时候,我们也调用了一个异步的委托ReadPackageHandler.BeginInvoke。

如果缓冲区比较大,比如我现在是8K,而单个包很小,客户端又发送比较频繁的时候。会导致在解析包的时候,形成一个短暂的递归。递归就会不停的压堆,资源得不到释放。

运行一段时间后,有可能导致OutOfMemoryException,如果一直是同步接收数据,在Receive的地方,也有可能形成一个递归。于是便采用了异步调用的方式。

 

3、因为socket属于无边界的,代码层面的每一次Send,并不是真正意义上的直接发送给服务器,而只是写到了缓冲区,由系统来决定什么时候发。如果客户 端发送非常频繁的情况下,就可能导致服务器从缓冲区取出来的包,是由多个包一起组成的。从缓冲区取出来的包,并不能保证是一个独立的应用层的包,需要按既定的协议来解析包。

我们先假定一个简单的协议,一个包的前4个字节,表明这个包内容的长度。代码如下:

public class DefaultProtocol : IProtocol    {        public byte[] OnDataReceivedCallBack(byte[] data, ref int offset)        {            int length = BitConverter.ToInt32(data, offset);            int package_head = 4;            int package_length = length + package_head;            byte[] buffer = null;            if (length > 0)            {                if (offset + package_length <= data.Length)                {                    buffer = new byte[length];                    Array.Copy(data, offset + package_head, buffer, 0, length);                    offset += package_length;                }            }            else            {                offset = -1;            }            return buffer;        }    }
View Code

如果协议无法正常解析,则offset=-1,并关闭掉该链接。如果在解析完一个包以后,还有剩余的包, 于是在抛给应用层以后,便继续解析。如果单个包比较大,缓冲区一次放不下的时候,我们将数据暂时写入到内存流里面,然后将下一次接收到的数据,一并拿出来解析。

4、接收数据已经准备完毕以后,就需要将SocketConnection和SocketSession关联起来,代码如下:

public class AppServer : IAppServer    {        public delegate void DataRecevieHandler(SocketSession o, SessionEventArgs e);        public delegate void NewConnectionHandler(SocketSession o, EventArgs e);        public delegate void one rrorHandler(Exception e);        public event DataRecevieHandler DataRecevied;        public event NewConnectionHandler NewConnected;        public event one rrorHandler one rror;        private ISocketListener _listener;        private SocketConnectionPool _connectPool;        public AppServer(ServerConfig serverConfig)        {            this.AppConfig = serverConfig;            if (this.AppProtocol == null)            {                this.AppProtocol = new DefaultProtocol();            }            _connectPool = new SocketConnectionPool(this);            _connectPool.Connected = OnConnected;            _listener = new SocketListener(this.AppConfig);            _listener.NewClientAccepted += new NewClientAcceptHandler(listener_NewClientAccepted);            _listener.Error += new ErrorHandler(_listener_Error);        }        void OnDataRecevied(SocketSession session, SessionEventArgs e)        {            if (this.DataRecevied != null)            {                DataRecevied.BeginInvoke(session, e, DataReceviedCallBack, DataRecevied);            }        }        public bool Start()        {            _connectPool.Init();            return _listener.Start();        }        public void Stop()        {            _listener.Stop();        }        void _listener_Error(ISocketListener listener, Exception e)        {            if (this.OnError != null)            {                this.OnError.Invoke(e);            }        }        void listener_NewClientAccepted(ISocketListener listener, System.Net.Sockets.Socket client, object state)        {            _connectPool.Push(client);        }        public void OnConnected(System.Net.Sockets.Socket client, SocketConnection connect)        {            var session = new SocketSession(client, connect);            session.DataRecevied = OnDataRecevied;            connect.Initialise(session);            if (NewConnected != null)            {                NewConnected.BeginInvoke(session, EventArgs.Empty, NewConnectedCallBack, NewConnected);            }            if (connect.RecevieEventArgs != null)            {                session.ReceiveAsync(connect.RecevieEventArgs);            }        }        void DataReceviedCallBack(IAsyncResult result)        {            try            {                (result.AsyncState as DataRecevieHandler).EndInvoke(result);            }            catch (Exception e)            {                LogHelper.Debug(e.Message);            }        }        void NewConnectedCallBack(IAsyncResult result)        {            try            {                (result.AsyncState as NewConnectionHandler).EndInvoke(result);            }            catch (Exception e)            {                LogHelper.Debug(e.Message);            }        }        public ServerConfig AppConfig        {            get;            set;        }        public IProtocol AppProtocol        {            get;            set;        }    }
View Code


到这里,整个接收包的流程就结束了,但是发送的地方,我们发现是同步在发送,如果有特别需要的可以考虑写成异步方式,但我个人更倾向于,这一块留给应用层处理,在应用层写一个发送队列,然后有独立的线程来管理这个发送队列。