首页 > 代码库 > C# socket 实现消息中心向消息平台 转发消息 (修改)

C# socket 实现消息中心向消息平台 转发消息 (修改)

using System;using System.Collections.Generic;using System.Configuration;using System.Linq;using System.Net;using System.Net.Sockets;using System.Text;using System.Threading;using System.Threading.Tasks;using Jinher.AMP.SNS.Chat.Client;using Jinher.AMP.SNS.Chat.Deploy.CustomDTO;using Jinher.AMP.SNS.Chat.Packet;using Jinher.AMP.SNS.Chat.Utility;namespace Jinher.AMP.SNS.Chat.SocketManager{    /// <summary>    /// 消息服务基类    /// </summary>    public abstract class MessageCenter    {        /// <summary>        /// 表示是否连接上        /// </summary>        public bool IsConnected { get; protected set; }        /// <summary>        /// 表示是否进行第一次握手协议        /// </summary>        public bool IsHandStake { get; protected set; }        /// <summary>        /// 表示是否注册APP        /// </summary>        public bool IsRegisterApp { get; protected set; }        private string _ip = string.Empty;        private int _port = 0;        private object lockObject = new object();        List<byte[]> byteList = new List<byte[]>();        //通知        private SocketAsyncEventArgsPool pool = null;        private BufferManager m_bufferManager = null;        /// <summary>        /// 线程休眠时间(毫秒)        /// </summary>        private int threadSleepTime = 5000;        //连接对象        private Socket _socket = null;        private int maxNumber = 10;        protected Action<List<MessageTemplate>> MyAction = null;        protected Action<WebMessageDTO> receiveAction = null;        public MessageCenter()        {            //初始化,获取Host和Port            _ip = System.Configuration.ConfigurationManager.AppSettings["serverip"];            _port = int.Parse(System.Configuration.ConfigurationManager.AppSettings["port"]);            maxNumber = int.Parse(System.Configuration.ConfigurationManager.AppSettings["maxNumber"] ?? "10");            string timer = System.Configuration.ConfigurationManager.AppSettings["threadSleepTime"] ?? "5000";            threadSleepTime = int.Parse(timer);            //初始化连接对象            InitSocket();        }        /// <summary>        /// 带委托的构造函数        /// </summary>        /// <param name="action"></param>        public MessageCenter(Action<List<MessageTemplate>> action)            : this()        {            MyAction = action;        }        /// <summary>        /// 启动消息中心        /// </summary>        public void Run()        {            try            {                LogHelper.WriteLog("启动服务");                //第一次连接服务器                Connect();                //开始接受                AccpetTo();                //开始处理接受的数据                ProcessAccpet();                //保持连接                KeepConnect();                //推送消息队列                //SendAsync();                Task.Factory.StartNew(new Action(() =>                {                    client = ChatClient.Instance(ConfigurationManager.AppSettings["notificationUri"]);                    client.OnMsgReceiveed = null;                    client.OnMsgReceiveed += client_OnMsgReceiveed;                }));            }            catch (SocketException exception)            {                //socket出错                LogHelper.WriteLog("socket创建连接出错,尝试重新启动", exception);                Thread.Sleep(5000);                this.Run();            }            catch (Exception ex)            {                //未知错误                LogHelper.WriteLog("启动时发生未知错误!", ex);                throw;            }            finally            {                //重置连接                //ResetConnect();            }        }        #region 连接服务器        /// <summary>        /// 连接服务器        /// </summary>        protected void Connect()        {            try            {                if (_socket == null)                {                    InitSocket();                }                LogHelper.WriteLog("开始建立连接");                _socket.Connect(IPAddress.Parse(_ip), _port);                LogHelper.WriteLog("连接已经建立");            }            catch (Exception ex)            {                LogHelper.WriteLog("出错了", ex);                LogHelper.WriteLog("建立连接失败,等待重新建立连接");                Thread.Sleep(threadSleepTime);                Connect();                return;            }            //先进行第一次握手协议            HandShakeCmdOp();            //注册app            RegisterApp();            IsConnected = true;        }        /// <summary>        /// 先进行第一次握手协议        /// </summary>        private void HandShakeCmdOp()        {            if (!IsHandStake)            {                LogHelper.WriteLog("开始第一次握手");                _socket.Send(pmsMessage.HandShakePacket());                IsHandStake = true;                LogHelper.WriteLog("成功握手");            }        }        /// <summary>        /// 注册app        /// </summary>        private void RegisterApp()        {            //            一级命令:XNS_ROUTER            //二级命令:REGISTER_SOCIAL_APP            //并且规定AppId:99999999            LogHelper.WriteLog("开始注册APP");            _socket.Send(pmsMessage.RegisterPacket());            IsRegisterApp = true;            LogHelper.WriteLog("注册成功");        }        /// <summary>        /// 初始化连接对象        /// </summary>        private void InitSocket()        {            _socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);        }        #endregion        #region 发送消息        /// <summary>        /// 发送消息(异步)        /// </summary>        /// <param name="buffer"></param>        /// <param name="count">表示失败重新发送次数</param>        /// <returns></returns>        protected void SendAsync(byte[] buffer, int count = 0)        {            //先将该消息尝试发送,如果发送失败,则连接后继续发送            //byteList.Add(buffer);            //启动任务,发送消息            Task.Factory.StartNew(new Action(() =>            {                try                {                    if (_socket != null && IsConnected && _socket.Connected && !_isResetConnecting)                    {                        lock (lockObject)                        {                            if (_socket != null && IsConnected && _socket.Connected && !_isResetConnecting)                            {                                SendByInit(buffer);                            }                            else                            {                                ResetConnect();                                SendAsync(buffer);                            }                        }                    }                    else                    {                        ResetConnect();                        SendAsync(buffer);                    }                }                catch (Exception)                {                    //重发一次                    if (count == 0)                    {                        //表示失败重新发送                        SendAsync(buffer, count + 1);                    }                    else                    {                        ResetConnect();                        SendAsync(buffer);                    }                }                finally                {                }            }));        }        /// <summary>        /// 发送数据()        /// </summary>        /// <param name="buffer"></param>        private void SendByInit(byte[] buffer)        {            _socket.BeginSend(buffer, 0, buffer.Length, SocketFlags.None, new AsyncCallback(SendComplated), _socket);        }        /// <summary>        /// 发送消息回调函数        /// </summary>        /// <param name="ar"></param>        private void SendComplated(IAsyncResult async)        {            try            {                Socket skt = async.AsyncState as Socket;                if (skt.Connected)                {                    skt.EndSend(async);                }                LogHelper.WriteLog("发送成功");            }            catch (SocketException ex)            {                //日志文件            }        }        #endregion        #region 接受消息(方式一)        /// <summary>        /// 接受消息方式一        /// </summary>        public void AccpetOne()        {            LogHelper.WriteLog("开始建立接受消息(方式一)");            pool = new SocketAsyncEventArgsPool(10);            // 预先分配一个对象池            SocketAsyncEventArgs readWriteEventArg;            for (int i = 0; i < maxNumber; i++)            {                m_bufferManager = new BufferManager(1024 * 1024);                m_bufferManager.InitBuffer();                //初始化 SocketAsyncEventArgs                readWriteEventArg = new SocketAsyncEventArgs();                readWriteEventArg.Completed += new EventHandler<SocketAsyncEventArgs>(Receive_Completed);                readWriteEventArg.UserToken = new AsyncUserToken() { Manager = m_bufferManager };                // 从缓冲池分配一个字节缓冲区SocketAsyncEventArg对象                m_bufferManager.SetBuffer(readWriteEventArg);                // add SocketAsyncEventArg to the pool                pool.Push(readWriteEventArg);            }            //取出  监视            Receive();        }        private void Receive()        {            if (IsConnected && IsRegisterApp && IsHandStake)            {                SocketAsyncEventArgs readEventArgs = null;                readEventArgs = pool.Pop();                if (readEventArgs != null)                {                    ((AsyncUserToken)readEventArgs.UserToken).Socket = _socket;                    _socket.ReceiveAsync(readEventArgs);                }            }        }        private void Receive_Completed(object sender, SocketAsyncEventArgs e)        {            Receive();            ReceiveAsync(e);        }        private void ReceiveAsync(SocketAsyncEventArgs e)        {            AsyncUserToken token = (AsyncUserToken)e.UserToken;            if (e.BytesTransferred > 0 && e.SocketError == SocketError.Success)            {                //echo the data received back to the client                e.SetBuffer(0, e.BytesTransferred);                //string txt = Encoding.Default.GetString(e.Buffer, 0, e.BytesTransferred);                //Console.WriteLine(tmp);                //通知                #region 收到消息平台发送来的消息                //client.Send();                if (e.BytesTransferred > 0)                {                    byte[] temp = new byte[e.BytesTransferred];                    Array.Copy(e.Buffer, 0, temp, 0, e.BytesTransferred);                    ProcessAccpet(temp, e.BytesTransferred);                }                #endregion                BufferManager bufferManager = ((AsyncUserToken)(e.UserToken)).Manager;                if (bufferManager == null)                {                    bufferManager = new BufferManager(1024 * 1024);                    e.UserToken = new AsyncUserToken() { Manager = bufferManager };                }                bufferManager.FreeBuffer(e);                bufferManager.SetBuffer(e);                //回收                pool.Push(e);            }        }        #endregion        #region 接受消息(方式二)        List<byte> testList = new List<byte>();        private void AccpetTo()        {            LogHelper.WriteLog("开始建立接受消息(方式二)");            Task.Factory.StartNew(new Action(() =>            {                try                {                    byte[] bytes = new byte[1024 * 1024];                    while (true)                    {                        int r = _socket.Receive(bytes);                        //通知                        #region 收到消息平台发送来的消息                        if (r > 0)                        {                            byte[] temp = new byte[r];                            Array.Copy(bytes, 0, temp, 0, r);                            lock ("我要给集合增加数据")                            {                                testList.AddRange(temp);                            }                            // ProcessAccpet(temp, r);                        }                        #endregion                        Array.Clear(bytes, 0, r);                    }                }                catch (SocketException ex)                {                    LogHelper.WriteLog("socket出错", ex);                    ResetConnect();                }                catch (Exception ex)                {                    LogHelper.WriteLog("出错", ex);                }            }));        }        #endregion        #region 收到的消息进行处理        /// <summary>        /// 消息处理中转        /// </summary>        /// <param name="p"></param>        /// <param name="r"></param>        private async void ProcessAccpet(byte[] p, int r)        {            //if (r > 10)            //{            //    LogHelper.WriteLog("收到正常消息");            //    var result = pmsMessage.ReversePacketMessage(p, r);            //    //发送给客户端            //    ClientSend(result);            //    //发送给测试端            //    if (MyAction != null)            //    {            //        MyAction(result.ToList());            //    }            //}            //else            //{            //    LogHelper.WriteLog(string.Format("收到心跳包"));            //}        }        //标示处理任务是否正在运行        private bool IsProcessAccpetRuning = false;        private void ProcessAccpet()        {            if (IsProcessAccpetRuning)            {//表示任务正在执行                return;            }            else            {                IsProcessAccpetRuning = true;            }            Task.Factory.StartNew(new Action(() =>            {                //IsProcessAccpetRuning = false;                //Thread.Sleep(100);                //IsProcessAccpetRuning = true;                while (IsProcessAccpetRuning)                {                    if (testList.Count > 0)                    {                        List<byte> tempList = new List<byte>();                        lock ("我要给集合增加数据")                        {                            tempList = testList.ToList();                            testList.Clear();                        }                        //启动一个任务来发送数据                        Task.Factory.StartNew(new Action(() =>                        {                            var result = pmsMessage.ReversePacketMessage(tempList, tempList.Count);                            //发送给客户端                            ClientSend(result);                            //发送给测试端                            if (MyAction != null && result != null && result.Count > 0)                            {                                MyAction(result.ToList());                            }                        }));                    }                    else                    {                        Thread.Sleep(50);                    }                }                LogHelper.WriteLog("新的处理请求,停止处理接受");            }));        }        #endregion        #region 保持连接        /// <summary>        /// 重置连接        /// </summary>        public void ResetConnect()        {            if (!_isResetConnecting)            {                lock (lockObject)                {                    if (!_isResetConnecting)                    {                        _isResetConnecting = true;                        LogHelper.WriteLog("=========重新连接==============");                        lock (lockObject)                        {                            this.Dispose();                            InitSocket();                        }                        Thread.Sleep(10000);                        Run();                        _isResetConnecting = false;                    }                }            }        }        private Timer timer = null;        /// <summary>        /// 保持连接        /// </summary>        private void KeepConnect()        {            timer = new Timer(new TimerCallback((o) =>            {                if (IsConnected)                {                    //发送心跳包                    SendAsync(pmsMessage.GetHeardbeat());                    LogHelper.WriteLog("已经发送心跳包");                }            }), null, new TimeSpan(0, 0, 0, 5), new TimeSpan(0, 0, 0, 10));        }        #endregion        #region 通知相关        private ChatClient client = null;// ChatClient.Instance(ConfigurationManager.AppSettings["notificationUri"]);        PacketConvert pmsMessage = new PacketConvert();        private bool _isResetConnecting = false;        /// <summary>        /// 请求发送消息        /// </summary>        /// <param name="msg"></param>        protected void client_OnMsgReceiveed(WebMessageDTO msg)        {            //请求发送消息            LogHelper.WriteLog(string.Format("开始请求发送消息:消息id=>{0} **********消息内容=>:{1}", msg.MsgId, msg.Msg != null ? msg.Msg.Message : ""));            byte[] bytes = pmsMessage.PacketMessage(msg);            SendAsync(bytes);            //发送给测试程序            if (receiveAction != null)            {                receiveAction(msg);            }        }        private void ClientSend(IList<MessageTemplate> message)        {            if (message != null && client != null)                client.SendAsync(message);        }        #endregion        #region 资源清理        /// <summary>        /// 清理资源        /// </summary>        public void Dispose()        {            IsRegisterApp = false;            IsHandStake = false;            IsConnected = false;            if (_socket.Connected == true)                _socket.Shutdown(SocketShutdown.Both);            _socket.Dispose();            _socket = null;            pool = null;            m_bufferManager = null;        }        #endregion    }}
View Code

 

C# socket 实现消息中心向消息平台 转发消息 (修改)