首页 > 代码库 > 写自己的Socket框架(一)

写自己的Socket框架(一)

本系列仅介绍可用于生产环境的C#异步Socket框架,如果您在其他地方看到类似的代码,不要惊讶,那可能就是我在参考开源代码时,直接“剽窃”过来的。

 

1、在脑海里思考一下整个socket的链接的处理流程,于是便有了下图。

2、首先就开始监听,代码如下:

public override bool Start()        {            this._socket = new System.Net.Sockets.Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);            //设置KeeyAlive,如果客户端不主动发消息时,Tcp本身会发一个心跳包,来通知服务器,这是一个保持通讯的链接。            //避免等到下一次通讯时,才知道链接已经断开。            this._socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, true);            this._socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.DontLinger, true);            try            {                this._socket.Bind(base.SocketConfig.Point);                this._socket.Listen(base.SocketConfig.Backlog);                this._socket_args = new SocketAsyncEventArgs();                this._socket_args.Completed += new EventHandler<SocketAsyncEventArgs>(AcceptSocketCompleted);                //在链接过来的时候,如果IO没有挂起,则AcceptAsync为False,表明同步完成。                if (!this._socket.AcceptAsync(this._socket_args))                {                    AcceptSocketCompleted(this._socket, this._socket_args);                }                return true;            }            catch (Exception ex)            {                return false;            }        }void AcceptSocketCompleted(object sender, SocketAsyncEventArgs e)        {            System.Net.Sockets.Socket socket = null;            if (e.SocketError != SocketError.Success)            {                    return;            }            else            {                socket = e.AcceptSocket;            }            e.AcceptSocket = null;            bool willRaiseEvent = false;            try            {                //继续监听该端口,在处理逻辑时,不影响其他链接的数据传送。                willRaiseEvent = this._socket.AcceptAsync(e);            }            catch (Exception ex)            {                willRaiseEvent = true;            }            if (socket != null)                OnNewClientAccepted(socket, null);            if (!willRaiseEvent)                AcceptSocketCompleted(null, e);        }
View Code

3、这个时候链接过来了,就要开始入队列了,如果没有这方面的需求,这一步可以忽略,代码如下:

public class SocketProxy    {        public System.Net.Sockets.Socket Client;        public DateTime Timeout = DateTime.Now;    }public class SocketConnectionQueue : IDisposable    {        private Queue<SocketProxy> _queue;        private readonly object _syncObject = new object();        private bool _isStop = false;        private Thread _thread;        public Action<SocketProxy> Connected;        public SocketConnectionQueue()        {            if (_queue == null)            {                _queue = new Queue<SocketProxy>();            }            if (_thread == null)            {                _thread = new Thread(Thread_Work)                {                    IsBackground = true,                    Priority = ThreadPriority.Highest                };                _thread.Start();            }        }        public void Push(SocketProxy connect)        {            lock (_syncObject)            {                if (_queue != null)                {                    _queue.Enqueue(connect);                }            }        }        public void Thread_Work()        {            while (!_isStop)            {                SocketProxy[] socketConnect = null;                lock (_syncObject)                {                    if (_queue.Count > 0)                    {                        socketConnect = new SocketProxy[_queue.Count];                        _queue.CopyTo(socketConnect, 0);                        _queue.Clear();                    }                }                if (socketConnect != null && socketConnect.Length > 0)                {                    foreach (var client in socketConnect)                    {                        if (Connected != null)                        {                            Connected.Invoke(client);                        }                    }                }                Thread.Sleep(10);            }        }        public void Dispose()        {            _isStop = true;            if (_thread != null)            {                _thread.Join();            }        }    }
View Code

4、入完队列,就要开始从链接池子里面分配资源了,你也可以不做链接池,在每次请求过来的时候去实例化一个链接,然后将这个链接入池,我的做法是在程序初始化的时候就分配好一定的资源,代码如下:

public class SocketConnectionPool : IDisposable    {        private ServerConfig _serverConfig;        public IAppServer AppServer;        private ConcurrentStack<SocketConnection> _connectPool;        private long connect_id = 0;        private byte[] _buffer;        private readonly object _syncObject = new object();        private SocketConnectionQueue _queue;        public Action<System.Net.Sockets.Socket, SocketConnection> Connected;        public long GenerateId()        {            if (connect_id == long.MaxValue)            {                connect_id = 0;            }            connect_id++;            return connect_id;        }        public SocketConnectionPool(IAppServer server)        {            this.AppServer = server;            this._serverConfig = server.AppConfig;        }        public void Init()        {            var connects = new List<SocketConnection>(this._serverConfig.MaxConnectionNumber);            _buffer = new byte[this._serverConfig.BufferSize];            SocketAsyncEventArgs arg;            for (var i = 0; i < this._serverConfig.MaxConnectionNumber; i++)            {                arg = new SocketAsyncEventArgs();                arg.SetBuffer(_buffer, 0, _buffer.Length);                connects.Add(new SocketConnection(arg, this));            }            _connectPool = new ConcurrentStack<SocketConnection>(connects);            if (_queue == null)            {                _queue = new SocketConnectionQueue();            }                        _queue.Connected = OnConnected;        }        public void Push(System.Net.Sockets.Socket socket)        {            SocketProxy proxy = new SocketProxy()            {                Client = socket            };            _queue.Push(proxy);        }        public void OnConnected(SocketProxy proxy)        {            //如果发现队列里面的链接,在Timeout时间内,都没有分配到资源,则关掉链接并丢弃。            int timeout = (int)(DateTime.Now - proxy.Timeout).TotalSeconds;            if (timeout >= this._serverConfig.Timeout)            {                proxy.Client.Close();                return;            }            else            {                //没有分配到资源重新入列。                SocketConnection connect = this.GetConnectionFromPool();                if (connect == null)                {                    _queue.Push(proxy);                }                else                {                    if (this.Connected != null)                    {                        this.Connected(proxy.Client, connect);                    }                }            }        }        /// <summary>        /// 从链接池去取链接(LIFO)        /// </summary>        /// <returns></returns>        public SocketConnection GetConnectionFromPool()        {            //_queue.Push();            SocketConnection connect;            if (!_connectPool.TryPop(out connect))            {                return null;            }            lock (_syncObject)            {                long connect_id = this.GenerateId();                connect.ConnectId = connect_id;            }            return connect;        }        /// <summary>        /// 释放链接,并放回链接池        /// </summary>        /// <param name="connect"></param>        public void ReleaseConnection(SocketConnection connect)        {            _connectPool.Push(connect);            LogHelper.Debug(connect.ConnectId + "放回ConnectPool");        }        public void Dispose()        {            _queue.Dispose();        }    }
View Code

在Init()里面初始化了很多个SocketConnection,这个就是我们用来管理具体的单个链接的class,代码如下:

public class SocketConnection    {        public SocketFlag Flag { get; private set; }                public SocketConnectionPool Pool { get { return _pool; } private set { } }        private SocketConnectionPool _pool;        public SocketAsyncEventArgs RecevieEventArgs { get; set; }        public long ConnectId { get; set; }        public SocketConnection()        {            this.Flag = SocketFlag.Error;        }        public SocketConnection(SocketAsyncEventArgs args, SocketConnectionPool pool)        {            RecevieEventArgs = args;            RecevieEventArgs.Completed += new EventHandler<SocketAsyncEventArgs>(SocketEventArgs_Completed);                        this.Flag = SocketFlag.Busy;            this._pool = pool;        }        void SocketEventArgs_Completed(object sender, SocketAsyncEventArgs e)        {            var socketSession = e.UserToken as SocketSession;            if (socketSession == null)            {                this.Flag = SocketFlag.Error;                this.Close();                return;            }            switch (e.LastOperation)            {                 case SocketAsyncOperation.Receive:                    socketSession.ReceiveData(e);                    break;                default:                    break;            }        }        public void Initialise(SocketSession session)        {            this.RecevieEventArgs.UserToken = session;            this.Flag = SocketFlag.Busy;            session.Closed += () =>            {                this.Close();            };        }        public void Reset()        {            //ConnectId = 0;            this.RecevieEventArgs.UserToken = null;            this.Flag = SocketFlag.Idle;        }        private void Close()        {            this.Reset();            LogHelper.Debug(ConnectId + " reset");            this._pool.ReleaseConnection(this);        }    }
View Code