首页 > 代码库 > FCL 系列 - 4. FCL.Net.dll
FCL 系列 - 4. FCL.Net.dll
使用.NET 的 Socket 对象,Select 模型。
自定义封包: 包长度+消息体
客户端-服务端架构,有心跳包机制。
客户端源代码:
using System;using System.Collections.Generic;using System.Text;using System.Net;using System.Net.Sockets;using System.Threading;using System.Windows.Forms;using Wagwei.FCL.Net.Implementation;namespace Wagwei.FCL.Net.Socket{ /// <summary> Socket 客户端类<remarks> /// <para></para> /// </remarks> /// </summary> [DeveloperInfo("Wagwei")] public class ClientOp { /* ###### 客户端基础信息 ###### */ public string m_sServerIP { get; set; } public int m_nPort { get; set; } public string m_sUserName { get; set; } public string m_sUserPwd { get; set; } /* ###### Socket客户端 ###### */ public System.Net.Sockets.Socket m_socketClient; /* ###### 接受消息 ###### */ Thread m_threadRecv;//线程 ManualResetEvent m_mreReceive = new ManualResetEvent(false); ManualResetEvent m_mreReceiveDone1 = new ManualResetEvent(false); ManualResetEvent m_mreReceiveDone2 = new ManualResetEvent(false); public int m_nReceivePollingInterval = 100;//接受消息的轮询间隔 /* ###### 自动连接 ###### */ Thread m_threadAutoConnect;//线程 ManualResetEvent m_mreAutoConnect = new ManualResetEvent(false); public int m_nAutoConnectPollingInterval = 5000;//断线重连的轮询间隔 int m_nKeepAliveSeconds = 20;//30;//心跳允许时间 DateTime m_datatimeKeepAlive = DateTime.Now; //心跳时间 /* ###### 消息事件 ###### */ public delegate void DelegateHaveText(Text text); public event DelegateHaveText m_eventHaveText; Queue<Text> _queueHaveText; object _objLockQueueHaveText; Thread m_threadHaveText; ManualResetEvent m_mreHaveText = new ManualResetEvent(false); /* ###### 提示信息事件 ###### */ public delegate void DelegateInfo(string info); public event DelegateInfo m_eventInfo; static ClientOp() { //Wagwei.FCL.Core.Implementation.Internal.WagLicense.Check(); } public ClientOp() { _queueHaveText = new Queue<Text>(); _objLockQueueHaveText = new object(); } public ClientOp(string serverIP, int port, string userName, string userPwd) : this() { m_sServerIP = serverIP; m_nPort = port; m_sUserName = userName; m_sUserPwd = userPwd; } /// <summary> /// 身份验证 /// </summary> /// <param name="args">ServerIp + UserName + UserPwd + Port</param> /// <returns></returns> public void Logon(params string[] args) { if (args.Length > 3) { m_sServerIP = args[0]; m_sUserName = args[1]; m_sUserPwd = args[2]; m_nPort = int.Parse(args[3]); } IPAddress iPAdrr = IPAddress.Parse(m_sServerIP); IPEndPoint iPEndP = new IPEndPoint(iPAdrr, m_nPort); try { m_socketClient = new System.Net.Sockets.Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); m_socketClient.Connect(iPEndP); //在Winsock2中定义了许多Socket IO控制类型, 其中有一项: KeepAliveValues,控制TCP keep-alive数据包的发送以及发送间隔 //默认值为2个小时, 当间隔时间超过这个设定后, socket就会连续发送5次连接信号, 若客户端无回应, 则此 client socket会断开 //我们可以如下调整这个间隔时间: //m_SocketClient.IOControl(IOControlCode.KeepAliveValues, BitConverter.GetBytes(7200), null);//24 * 60 * 60 this.m_socketClient.ReceiveBufferSize = SocketCommon.m_ReceiveBufferSize; this.m_socketClient.SendBufferSize = SocketCommon.m_SendBufferSize; Text text = new Text(); text.m_sFrom = m_sUserName; text.m_asTo = new string[] { "server" }; text.m_enumCmdType = CMD_SERVER.LOGON; text.m_sUserPwd = m_sUserPwd; text.m_datetimeSend = DateTime.Now; m_socketClient.Send(BinarySerializer.Serialize(text, SocketCommon.m_PacketLenSize, SocketCommon.m_PacketLenSize)); } catch (SocketException ex) { throw new Exception(ex.Message); } } /// <summary> /// 自动连接 /// </summary> public void EnabledAutoConnect(bool flg) { if (flg) { if (this.m_threadAutoConnect == null) { this.m_threadAutoConnect = new Thread(new ThreadStart(delegate { while (true) { this.m_mreAutoConnect.WaitOne(); //获取一个值, 该值指示Socket是在上次Send还是Receive操作时连接到远程主机 //if (this.m_socketClient.Connected) //{ // Thread.Sleep(this.m_nReconnectPollingInterval); //} double nDelay = DateTime.Now.Subtract(this.m_datatimeKeepAlive).TotalSeconds; if ((nDelay < this.m_nKeepAliveSeconds) && (this.m_socketClient != null)) { if (this.m_socketClient.Connected) { //发送心跳 Text text = new Text(); text.m_sFrom = this.m_sUserName; text.m_asTo = new string[] { "server" }; text.m_enumCmdType = CMD_SERVER.KEEP_ALIVE; text.m_datetimeSend = DateTime.Now; if (this.SendText(text) != 1) { this.m_socketClient = null; } // } else { this.m_socketClient = null; } // Thread.Sleep(this.m_nAutoConnectPollingInterval); } else { //string sInfo = string.Format("{0} > {1}", nDelay, this.m_nKeepAliveSeconds); //MessageBox.Show("kkk " + sInfo); //System.Diagnostics.Debug.WriteLine(sInfo); // this.m_mreReceiveDone1.Set(); this.m_mreReceiveDone2.Set(); this.m_datatimeKeepAlive = DateTime.Now; try { this.Logon(); this.SetEventInfo("成功连接到服务器端 0x10001000"); } catch (Exception ex) { string sErr = "AutoConnect: " + ex.Message; this.SetEventInfo(sErr); SocketCommon.AppendLog("receive.log", sErr); } } } })); this.m_threadAutoConnect.Start(); this.m_mreAutoConnect.Set(); } } else { this.m_mreAutoConnect.Reset(); if (this.m_threadAutoConnect != null) { this.m_threadAutoConnect.Abort(); this.m_threadAutoConnect = null; } } } /// <summary> /// 启动或停止接受消息 /// </summary> public void EnabledReceive(bool flag) { if (flag) { if (this.m_threadRecv != null) { if (!this.m_threadRecv.IsAlive) { goto END; } else { this.m_mreReceive.Set(); return; } } END: this.m_mreReceive.Reset(); if (this.m_threadRecv != null) { m_threadRecv.Abort(); } m_threadRecv = null; m_threadRecv = new Thread(ReceiveText); m_threadRecv.Start(); this.m_mreReceive.Set(); return; } else { this.m_mreReceive.Reset(); } } /// <summary> /// 启动或停止接受消息通知 /// </summary> /// <param name="flag"></param> public void EnabledHaveText(bool flag) { if (flag) { if (this.m_threadHaveText != null) { if (!this.m_threadHaveText.IsAlive) { goto END; } else { this.m_mreHaveText.Set(); return; } } END: this.m_mreHaveText.Reset(); if (this.m_threadHaveText != null) { this.m_threadHaveText.Abort(); } m_threadHaveText = null; m_threadHaveText = new Thread(HaveText); m_threadHaveText.Start(); this.m_mreHaveText.Set(); return; } else { this.m_mreHaveText.Reset(); } } void HaveText() { while (true) { this.m_mreHaveText.WaitOne(); Text? text = null; lock (_objLockQueueHaveText) { if (_queueHaveText.Count > 0) { text = _queueHaveText.Dequeue(); } } if (text == null) { Thread.Sleep(5); } else { if (this.m_eventHaveText != null) { this.m_eventHaveText((Text)text); } } } } /// <summary> /// 异步发送消息, 发送速度不可太快 /// </summary> public void SendTextAsync(Text text) { if (!this.m_socketClient.Connected) { throw new Exception("SendTextAsync: 与服务器已断开连接"); } byte[] byteData =http://www.mamicode.com/ BinarySerializer.Serialize(text, SocketCommon.m_PacketLenSize, SocketCommon.m_PacketLenSize); try { this.m_socketClient.BeginSend(byteData, 0, byteData.Length, 0, new AsyncCallback(SendCallback), this.m_socketClient); } catch (Exception ex) { string sErr = "SendTextAsync: " + ex.Message; this.SetEventInfo(sErr); SocketCommon.AppendLog("exceptions.log", sErr); } } /// <summary> /// 同步发送消息, 发送速度不可太快 /// <remarks> /// 返回值: -1 未连接, 0 暂时无法发送, 1 发送成功, 2 发送失败 /// </remarks> /// </summary> public int SendText(Text text) { if (this.m_socketClient != null) { if (this.m_socketClient.Connected) { if (this.m_socketClient.Poll(1000, SelectMode.SelectWrite)) { try { m_socketClient.Send(BinarySerializer.Serialize( text, SocketCommon.m_PacketLenSize, SocketCommon.m_PacketLenSize)); //返回值, 已发送到 Socket 的字节数。 } catch (Exception ex) { string sErr = "SendText: " + ex.Message; this.SetEventInfo(sErr); SocketCommon.AppendLog("exceptions.log", sErr); return 2; } return 1; } } return 0; } return -1; } void SendCallback(IAsyncResult ar) { try { System.Net.Sockets.Socket client = (System.Net.Sockets.Socket)ar.AsyncState; client.EndSend(ar); } catch (Exception ex) { throw new Exception(ex.Message); } } void ReceiveText() { while (true) { this.m_mreReceive.WaitOne(); if (this.m_socketClient != null) { try { if (m_socketClient.Connected) { if (m_socketClient.Poll(1000, SelectMode.SelectRead)) { //注: Receive时返回的字节数,不一定等于要求读取的字节数。 //系统只是在数据包到达时,尽可能的读取要求的字节数 RecvState state = new RecvState(); state.m_socketWork = m_socketClient; m_mreReceiveDone1.Reset(); m_mreReceiveDone2.Reset(); m_socketClient.BeginReceive(state.m_arraySize, 0, SocketCommon.m_PacketLenSize, SocketFlags.None, new AsyncCallback(ReceiveCallBackSize), state); m_mreReceiveDone1.WaitOne(); state.m_nLenAllData = BitConverter.ToInt32(state.m_arraySize, 0); m_socketClient.BeginReceive(state.m_arrayData, 0, state.m_nLenAllData, SocketFlags.None, new AsyncCallback(ReceiveCallbackData), state); m_mreReceiveDone2.WaitOne(); /* @@@@@@ 有新消息 @@@@@@ */ byte[] arrayRealData = http://www.mamicode.com/new byte[state.m_nLenAllData]; for (int i = 0; i < state.m_nLenAllData; i++) { arrayRealData[i] = state.m_arrayData[i]; } Text text = (Text)BinarySerializer.Deserialize(arrayRealData); if (text.m_enumCmdType.Equals(CMD_SERVER.TEXT)) { //-------------------------------------------------------- /*被NEW替换*/ /* //OLD if (this.m_eventHaveText != null) { this.m_eventHaveText(text);//事件的阻塞会导致下次心跳时间的迟来 } */ //NEW if (this.m_eventHaveText != null) { lock (_objLockQueueHaveText) { _queueHaveText.Enqueue(text); } } //-------------------------------------------------------- } else if (text.m_enumCmdType.Equals(CMD_SERVER.KEEP_ALIVE)) { this.m_datatimeKeepAlive = DateTime.Now; //text.m_datetimeSend;如果服务器时间与本机时间不一致则有问题 } /* @@@@@@@@@@@@@@@@@@@@@@ */ } } else { //不可设定异常 //bug //throw new Exception("连接已断开"); // System.Threading.Thread.Sleep(this.m_nReceivePollingInterval); continue; } } catch (Exception ex) { string sErr = "ReceiveText: " + ex.Message; this.SetEventInfo(sErr); SocketCommon.AppendLog("receive.log", sErr); //break;//bug 不可退出循环 continue; } } else { System.Threading.Thread.Sleep(this.m_nReceivePollingInterval); continue; } } } private void ReceiveCallBackSize(IAsyncResult ar) { RecvState state = (RecvState)ar.AsyncState; System.Net.Sockets.Socket client = state.m_socketWork; int bytesRead = client.EndReceive(ar); if (bytesRead > 0) { state.m_nLenSize += bytesRead; if (state.m_nLenSize == SocketCommon.m_PacketLenSize) { m_mreReceiveDone1.Set(); state.m_nLenAllData = BitConverter.ToInt32(state.m_arraySize, 0); } else { m_socketClient.BeginReceive(state.m_arraySize, state.m_nLenSize, SocketCommon.m_PacketLenSize - state.m_nLenSize, SocketFlags.None, new AsyncCallback(ReceiveCallbackData), state); } } else if (bytesRead == 0) { //m_mreReceiveDone1.Set(); string sErr = "ReceiveCallBackSize: the peer has performed an orderly shutdown"; //throw new Exception(sErr); this.SetEventInfo(sErr); SocketCommon.AppendLog("exceptions.log", sErr); } else { //m_mreReceiveDone1.Set(); string sErr = "ReceiveCallBackSize: an error occurred"; //throw new Exception(sErr); this.SetEventInfo(sErr); SocketCommon.AppendLog("exceptions.log", sErr); } } private void ReceiveCallbackData(IAsyncResult ar) { try { RecvState state = (RecvState)ar.AsyncState; System.Net.Sockets.Socket client = state.m_socketWork; int bytesRead = client.EndReceive(ar); if (bytesRead > 0) { state.m_nlenData += bytesRead; if (state.m_nlenData != state.m_nLenAllData) { client.BeginReceive(state.m_arrayData, state.m_nlenData, state.m_nLenAllData - state.m_nlenData, SocketFlags.None, new AsyncCallback(ReceiveCallbackData), state); } else { m_mreReceiveDone2.Set(); } } else if (bytesRead == 0) { //m_mreReceiveDone2.Set(); string sErr = "ReceiveCallbackData: the peer has performed an orderly shutdown"; //throw new Exception(sErr); this.SetEventInfo(sErr); SocketCommon.AppendLog("exceptions.log", sErr); } else { //m_mreReceiveDone2.Set(); string sErr = "ReceiveCallbackData: an error occurred"; //throw new Exception(sErr); this.SetEventInfo(sErr); SocketCommon.AppendLog("exceptions.log", sErr); } } catch (Exception e) { //m_MRE_ReceiveDone2.Set(); string sErr = "ReceiveCallbackData: " + e.Message; //throw new Exception(sErr); this.SetEventInfo(sErr); SocketCommon.AppendLog("exceptions.log", sErr); } } /// <summary> /// 错误通知 /// </summary> private void SetEventInfo(string info) { if (this.m_eventInfo != null) { this.m_eventInfo(info); } } /// <summary> /// 关闭 /// </summary> public void Close() { this.EnabledAutoConnect(false); this.m_mreReceive.Reset(); if (this.m_threadRecv != null) { this.m_threadRecv.Abort(); } this.m_mreHaveText.Reset(); if (this.m_threadHaveText != null) { this.m_threadHaveText.Abort(); } if (this.m_socketClient != null) { this.m_socketClient.Close(); } } }}
服务端源代码:
using System;using System.Collections.Generic;using System.Text;using System.Net.Sockets;using System.Net;using System.Threading;using System.Windows.Forms;using Wagwei.FCL.Net.Implementation;namespace Wagwei.FCL.Net.Socket{ //特别说明: //(1) 当客户端连接过多, 消息转发量过大时候, 会导致传输的数据包严重延时, 这也导致了心跳异常, 此时禁用心跳则不会持续报错 //(2) //高并发指标: 1. 并发连接数 2. 每秒可创建多少连接 3. 其他 /// <summary>Socket服务端类</summary> [DeveloperInfo("Wagwei")] public class ServerOp { public int m_nListeningInterval = 1200;//监听时间间隔, 建议大于500ms public Dictionary<string, System.Net.Sockets.Socket> m_dicSocket; Thread m_threadListener; //监听线程 Thread m_threadReceiver; //接受(直接转发)线程 Thread m_threadSend; //间接转发线程 ManualResetEvent m_mreListener = new ManualResetEvent(false); ManualResetEvent m_mreReceiver = new ManualResetEvent(false); ManualResetEvent m_mreSend = new ManualResetEvent(false); System.Net.Sockets.Socket m_socketListener; //监听Socket List<Text> m_listText;//未处理的信息 //List<User> m_listUser;//在线所有客户端 public delegate void DelegateClientAccount (string clientName,string type); public event DelegateClientAccount m_eventClientAccount; // ****** 信息(to server) ****** public delegate void DelegateServerHaveText(Text text); public event DelegateServerHaveText m_eventServerHaveText; Queue<Text> m_queueHaveText; object m_objLockQueueHaveText; Thread m_threadHaveText; ManualResetEvent m_mreHaveText = new ManualResetEvent(false); // ****** 提示 ****** public delegate void DelegateInfo(string info); public event DelegateInfo m_eventInfo; // ****** 验证 ****** public delegate bool DelegateVerify(); public event DelegateVerify m_eventVerify; static ServerOp() { //Wagwei.FCL.Core.Implementation.Internal.WagLicense.Check(); } public ServerOp() { m_queueHaveText = new Queue<Text>(); m_objLockQueueHaveText = new object(); } /// <summary>初始化</summary> /// <param name="ip">IP地址</param> /// <param name="port">端口号</param> public void Init(string ip, int port) { IPEndPoint iPEndP = new IPEndPoint(IPAddress.Parse(ip), port);//port为0即为随机端口 this.m_socketListener = new System.Net.Sockets.Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); this.m_socketListener.Bind(iPEndP); this.m_socketListener.Listen(120);//挂起连接数 this.m_socketListener.ReceiveBufferSize = SocketCommon.m_ReceiveBufferSize; this.m_socketListener.SendBufferSize = SocketCommon.m_SendBufferSize; this.m_dicSocket = new Dictionary<string, System.Net.Sockets.Socket>(0); this.m_listText = new List<Text>(); //this.m_listUser = new List<User>(); //监听 this.m_threadListener = new Thread(Listener); this.m_threadListener.Name = "thread_fcl_net_socket_listener"; this.m_threadListener.Start(); //接受消息 this.m_threadReceiver = new Thread(ReceiveText); this.m_threadReceiver.Name = "thread_fcl_net_socket_receiver"; this.m_threadReceiver.Start(); //转发消息 this.m_threadSend = new Thread(SwitchSendText); this.m_threadSend.Name = "thread_fcl_net_socket_send"; this.m_threadSend.Start(); } public void EnabledListener(bool enable) { if (enable) { this.m_mreListener.Set(); } else { this.m_mreListener.Reset(); } } public void EnabledReceiver(bool enabled) { if (enabled) { this.m_mreReceiver.Set(); } else { this.m_mreReceiver.Reset(); } } public void EnabledSend(bool enabled) { if (enabled) { this.m_mreSend.Set(); } else { this.m_mreSend.Reset(); } } #region to server /// <summary>启动或停止接受消息通知</summary> /// <param name="flag"></param> public void EnabledHaveText(bool flag) { if (flag) { if (this.m_threadHaveText != null) { if (!this.m_threadHaveText.IsAlive) { goto END; } else { this.m_mreHaveText.Set(); return; } } END: this.m_mreHaveText.Reset(); if (this.m_threadHaveText != null) { this.m_threadHaveText.Abort(); } m_threadHaveText = null; m_threadHaveText = new Thread(HaveText); m_threadHaveText.Start(); this.m_mreHaveText.Set(); return; } else { this.m_mreHaveText.Reset(); } } void HaveText() { while (true) { this.m_mreHaveText.WaitOne(); Text? text = null; lock (m_objLockQueueHaveText) { if (m_queueHaveText.Count > 0) { text = m_queueHaveText.Dequeue(); } } if (text == null) { Thread.Sleep(5); } else { if (this.m_eventServerHaveText != null) { this.m_eventServerHaveText((Text)text); } } } } #endregion /// <summary>监听连接</summary> void Listener() { //System.Net.Sockets.Socket currentSocket = null; while (true) { this.m_mreListener.WaitOne(); try { if (this.m_socketListener.Poll(1000, SelectMode.SelectRead)) { System.Net.Sockets.Socket client = this.m_socketListener.Accept();//接受一个请求 //currentSocket = client; client.SendBufferSize = SocketCommon.m_SendBufferSize; client.ReceiveBufferSize = SocketCommon.m_ReceiveBufferSize; if (!client.Poll(1000, SelectMode.SelectRead)) { client.Disconnect(false); client.Close(); goto END; } byte[] byteData = http://www.mamicode.com/new byte[SocketCommon.m_PacketSize]; byte[] byteSize = new byte[SocketCommon.m_PacketLenSize]; int lenSize = client.Receive(byteSize, SocketCommon.m_PacketLenSize, SocketFlags.None); int lenData = http://www.mamicode.com/BitConverter.ToInt32(byteSize, 0); client.Receive(byteData, lenData, SocketFlags.None); byte[] arrayRealData = http://www.mamicode.com/new byte[lenData]; for (int i = 0; i < lenData; i++) { arrayRealData[i] = byteData[i]; } Text text = (Text)BinarySerializer.Deserialize(arrayRealData); //身份验证 if (text.m_enumCmdType.Equals(CMD_SERVER.LOGON) && text.m_sFrom != "server" && (m_eventVerify == null || m_eventVerify())) { } else { client.Disconnect(false); client.Close(); goto END; } lock (this.m_dicSocket) { //重复登录 if (this.m_dicSocket.ContainsKey(text.m_sFrom)) { if (this.m_dicSocket[text.m_sFrom].Poll(1000, SelectMode.SelectWrite)) { Text textToClient = new Text(); textToClient.m_sFrom = "server"; textToClient.m_asTo = new string[] { text.m_sFrom }; textToClient.m_enumCmdType = CMD_LOGON_RESULT.FORCED_OFFLINE; textToClient.m_objContent = "被迫下线"; this.m_dicSocket[text.m_sFrom].Send(BinarySerializer.Serialize(textToClient, SocketCommon.m_PacketLenSize, SocketCommon.m_PacketLenSize)); } this.m_dicSocket[text.m_sFrom].Disconnect(false); this.m_dicSocket[text.m_sFrom].Close(); this.m_dicSocket.Remove(text.m_sFrom); //User userFound = this.m_listUser.Find(delegate(User user) { return user.m_sUserName == text.m_sFrom; }); //if (string.IsNullOrEmpty(userFound.m_sUserName))//struct为值类型, 不能与null比较 //{ // this.m_listUser.Remove(userFound); //} } //非重复登录 this.m_dicSocket.Add(text.m_sFrom, client); if (m_eventClientAccount != null) { m_eventClientAccount(text.m_sFrom, "logon"); } //立即发送心跳给客户端 //... 或者客户端主动发送心跳 } //用户登记 //this.m_listUser.Add(new User(text.m_sFrom, DateTime.Now)); //服务器发送一条成功连接信息给客户端 //if (client.Poll(1000, SelectMode.SelectWrite)) //{ // Text textToClient = new Text(); // textToClient._from = "server"; // textToClient._to = new string[] { text._from }; // textToClient._cmd_type = CMD_LOGON_RESULT.SUCCESS; // textToClient._content = "成功连接到服务器"; // client.Send(BinarySerializer.Serialize(textToClient, Common.m_PacketLenSize, Common.m_PacketLenSize)); //} continue; } else { System.Threading.Thread.Sleep(m_nListeningInterval); continue; } } catch (Exception ex) { string s = ex.ToString(); this.SetEventOnRunError(ex.Message); SocketCommon.AppendLog("listener.log", ex.ToString()); goto END; } END: System.Threading.Thread.Sleep(m_nListeningInterval); } } /// <summary>接受消息</summary> void ReceiveText() { while (true) { this.m_mreReceiver.WaitOne(); System.Net.Sockets.Socket socketCurrent = null; try { List<System.Net.Sockets.Socket> tmp = new List<System.Net.Sockets.Socket>(); //IEnumerator enumerator = this.dicSocket.Values.GetEnumerator(); //while (enumerator.MoveNext()){tmp.Add((Socket)enumerator.Current);} lock (this.m_dicSocket) { foreach (System.Net.Sockets.Socket s in this.m_dicSocket.Values) { tmp.Add(s); } } if (tmp.Count > 0) { System.Net.Sockets.Socket.Select(tmp, null, null, 1000);//select foreach (System.Net.Sockets.Socket socketOne in tmp) { socketCurrent = socketOne; byte[] bytesData = http://www.mamicode.com/new byte[SocketCommon.m_PacketSize]; byte[] bytesSize = new byte[SocketCommon.m_PacketLenSize]; int DataSize; int recvSizeRtn = 0; recvSizeRtn += socketOne.Receive(bytesSize, SocketCommon.m_PacketLenSize, SocketFlags.None); if (recvSizeRtn > 0) { while (recvSizeRtn != SocketCommon.m_PacketLenSize) { recvSizeRtn += socketOne.Receive(bytesSize, recvSizeRtn, SocketCommon.m_PacketLenSize - recvSizeRtn, SocketFlags.None); } DataSize = BitConverter.ToInt32(bytesSize, 0); } else if (recvSizeRtn == 0) { throw new Exception("the peer has performed an orderly shutdown"); } else { throw new Exception("an error occurred"); } int recvDataRtn = 0; recvDataRtn += socketOne.Receive(bytesData, DataSize, SocketFlags.None); if (recvDataRtn > 0) { while (recvDataRtn != DataSize) { recvDataRtn += socketOne.Receive(bytesData, recvDataRtn, DataSize - recvDataRtn, SocketFlags.None); } byte[] aRealData = http://www.mamicode.com/new byte[DataSize]; for (int i = 0; i < DataSize; i++) { aRealData[i] = bytesData[i]; } Text text = (Text)BinarySerializer.Deserialize(aRealData); //当即转发, 否则保存到listText中 List<System.Net.Sockets.Socket> listSocket = new List<System.Net.Sockets.Socket>(); int count = text.m_asTo.Length; lock (this.m_dicSocket) { for (int i = 0; i < count; i++) { if (this.m_dicSocket.ContainsKey(text.m_asTo[i])) { listSocket.Add(this.m_dicSocket[text.m_asTo[i]]); } else { //如果接收方为服务器本身则另做处理 if (text.m_asTo[i] == "server") { if (this.m_eventServerHaveText != null) { //this.m_eventServerHaveText(text); lock (m_objLockQueueHaveText) { m_queueHaveText.Enqueue(text); } }//end if if (text.m_enumCmdType.Equals(CMD_SERVER.TEXT)) { }//end if else if (text.m_enumCmdType.Equals(CMD_SERVER.KEEP_ALIVE)) { //if (text.m_sContentRemark == "wag") //{ // MessageBox.Show("wag"); //} Text textKeepAlive = new Text(); textKeepAlive.m_sFrom = "server"; textKeepAlive.m_asTo = new string[] { text.m_sFrom }; textKeepAlive.m_enumCmdType = CMD_SERVER.KEEP_ALIVE; textKeepAlive.m_datetimeSend = DateTime.Now; socketOne.Send(BinarySerializer.Serialize(textKeepAlive, SocketCommon.m_PacketLenSize, SocketCommon.m_PacketLenSize)); }//end else if }//end if else { //flg 2014-02-23 19:31 //lock (this.m_listText) //{ // Text _text = new Text(); // _text.m_sFrom = text.m_sFrom; // _text.m_sFromIp = text.m_sFromIp; // _text.m_asTo = new string[] { text.m_asTo[i] }; // _text.m_datetimeSend = text.m_datetimeSend; // _text.m_typeContent = text.m_typeContent; // _text.m_objContent = text.m_objContent; // _text.m_sUserPwd = text.m_sUserPwd; // _text.m_enumCmdType = text.m_enumCmdType; // this.m_listText.Add(_text); //} }//end else }//end else }//end for }//end lock for (int i = 0; i < listSocket.Count; i++) { Text _text = new Text(); if (listSocket[i].Poll(1000, SelectMode.SelectWrite)) { _text.m_sFrom = text.m_sFrom; _text.m_sFromIp = text.m_sFromIp; _text.m_asTo = new string[] { text.m_asTo[i] }; _text.m_datetimeSend = text.m_datetimeSend; _text.m_typeContent = text.m_typeContent; _text.m_objContent = text.m_objContent; _text.m_sContentRemark = text.m_sContentRemark; _text.m_sUserPwd = text.m_sUserPwd; _text.m_enumCmdType = text.m_enumCmdType; listSocket[i].Send(BinarySerializer.Serialize(_text, SocketCommon.m_PacketLenSize, SocketCommon.m_PacketLenSize)); continue; }//end if }//end for } else if (recvDataRtn == 0) { throw new Exception("the peer has performed an orderly shutdown"); } else { throw new Exception("an error occurred"); } }//end foreach } else { System.Threading.Thread.Sleep(5); continue; } } catch (Exception ex) { string s = ex.ToString(); this.SetEventOnRunError("ReceiveText: " + ex.ToString()); this.RemoveAbnormal(socketCurrent); SocketCommon.AppendLog("receive.log", ex.ToString()); continue; } } } public void AppendOneText(Text text) { lock (this.m_listText) { this.m_listText.Add(text); } } /// <summary>转发消息</summary> void SwitchSendText() { while (true) { this.m_mreSend.WaitOne(); System.Net.Sockets.Socket currentSocket = null; try { if (this.m_listText.Count > 0) { lock (this.m_listText) { List<Text> listRemove = new List<Text>(); foreach (Text text in this.m_listText) { int count = text.m_asTo.Length; System.Net.Sockets.Socket socket = null; for (int i = 0; i < count; i++) { this.m_dicSocket.TryGetValue(text.m_asTo[i], out socket); if (socket != null) { currentSocket = socket; if (socket.Poll(2000, SelectMode.SelectWrite)) { Text _text = new Text(); _text.m_sFrom = text.m_sFrom; _text.m_sFromIp = text.m_sFromIp; _text.m_asTo = new string[] { text.m_asTo[i] }; _text.m_datetimeSend = text.m_datetimeSend; _text.m_typeContent = text.m_typeContent; _text.m_objContent = text.m_objContent; _text.m_sContentRemark = text.m_sContentRemark; _text.m_sUserPwd = text.m_sUserPwd; _text.m_enumCmdType = text.m_enumCmdType; socket.Send(BinarySerializer.Serialize(_text, SocketCommon.m_PacketLenSize, SocketCommon.m_PacketLenSize)); } } } listRemove.Add(text); } foreach (Text t in listRemove) { m_listText.Remove(t); } listRemove.Clear(); } } else { System.Threading.Thread.Sleep(5); continue; } } catch (Exception ex) { string s = ex.ToString(); this.SetEventOnRunError(ex.Message); this.RemoveAbnormal(currentSocket); SocketCommon.AppendLog("send.log", s); continue; } } } /// <summary> /// 移除不正常的socket对象, 并且将对应的用户移除 /// </summary> private void RemoveAbnormal(System.Net.Sockets.Socket currentSocket) { //lock (this.m_dicSocket) //{ // foreach (KeyValuePair<string, System.Net.Sockets.Socket> item in this.m_dicSocket) // { // if (item.Value =http://www.mamicode.com/= currentSocket)>// { // this.m_dicSocket.Remove(item.Key); // User userFind = this.m_listUser.Find(delegate(User user) { return user.m_sUserName == item.Key; }); // if (userFind != null) // { // this.m_listUser.Remove(userFind); // } // break; // } // } //} lock (this.m_dicSocket) { List<string> listKey = new List<string>(); foreach (KeyValuePair<string, System.Net.Sockets.Socket> item in this.m_dicSocket) { if (item.Value =http://www.mamicode.com/= currentSocket) { listKey.Add(item.Key); } } foreach (string key in listKey) { this.m_dicSocket.Remove(key); //User userFind = this.m_listUser.Find(delegate(User user) { return user.m_sUserName == key; }); //if (userFind != null) //{ // this.m_listUser.Remove(userFind); //} if (m_eventClientAccount != null) { m_eventClientAccount(key,"logoff"); } } } } /// <summary> /// 错误通知 /// </summary> private void SetEventOnRunError(string error) { if (this.m_eventInfo != null) { this.m_eventInfo(error); } } /// <summary> /// 关闭 /// </summary> public void Close() { this.m_mreListener.Reset(); if (this.m_threadListener != null) { this.m_threadListener.Abort(); } this.m_mreReceiver.Reset(); if (this.m_threadReceiver != null) { this.m_threadReceiver.Abort(); } this.m_mreSend.Reset(); if (this.m_threadSend != null) { this.m_threadSend.Abort(); } this.m_mreHaveText.Reset(); if (this.m_threadHaveText != null) { this.m_threadHaveText.Abort(); } if (this.m_socketListener != null) { this.m_socketListener.Close(); } this.m_listText.Clear(); //this.m_listUser.Clear(); this.m_dicSocket.Clear(); } }}
FCL 系列 - 4. FCL.Net.dll
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉: 投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。