UdpClientProvider.cs 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366
  1. using System;
  2. using System.Net.Sockets;
  3. using System.Net;
  4. using System.Threading;
  5. using System.Text;
  6. namespace IFramework.Net.Udp
  7. {
  8. internal class UdpClientProvider : UdpSocket, IDisposable, IUdpClientProvider
  9. {
  10. #region 定义变量
  11. private bool _isDisposed = false;
  12. private int bufferSizeByConnection = 4096;
  13. private int maxNumberOfConnections = 64;
  14. private Encoding encoding = Encoding.UTF8;
  15. private LockParam lParam = new LockParam();
  16. private ManualResetEvent mReset = new ManualResetEvent(false);
  17. private SocketTokenManager<SocketAsyncEventArgs> sendTokenManager = null;
  18. private SocketBufferManager sendBufferManager = null;
  19. #endregion
  20. #region 属性
  21. public int SendBufferPoolNumber { get { return sendTokenManager.Count; } }
  22. /// <summary>
  23. /// 接收回调处理
  24. /// </summary>
  25. public OnReceivedHandler ReceivedCallbackHandler { get; set; }
  26. /// <summary>
  27. /// 发送回调处理
  28. /// </summary>
  29. public OnSentHandler SentCallbackHandler { get; set; }
  30. /// <summary>
  31. /// 接收缓冲区回调
  32. /// </summary>
  33. public OnReceivedSegmentHandler ReceivedOffsetHandler { get; set; }
  34. #endregion
  35. #region public method
  36. public void Dispose()
  37. {
  38. Dispose(true);
  39. GC.SuppressFinalize(this);
  40. }
  41. protected virtual void Dispose(bool isDisposing)
  42. {
  43. if (_isDisposed) return;
  44. if (isDisposing)
  45. {
  46. DisposeSocketPool();
  47. SafeClose();
  48. _isDisposed = true;
  49. }
  50. }
  51. private void DisposeSocketPool()
  52. {
  53. sendTokenManager?.ClearToCloseArgs();
  54. if (sendBufferManager != null)
  55. {
  56. sendBufferManager.Clear();
  57. }
  58. }
  59. /// <summary>
  60. /// 构造方法
  61. /// </summary>
  62. public UdpClientProvider(int bufferSizeByConnection, int maxNumberOfConnections)
  63. :base(bufferSizeByConnection)
  64. {
  65. this.maxNumberOfConnections = maxNumberOfConnections;
  66. this.bufferSizeByConnection = bufferSizeByConnection;
  67. }
  68. public void Disconnect()
  69. {
  70. Close();
  71. isConnected = false;
  72. }
  73. /// <summary>
  74. /// 尝试连接
  75. /// </summary>
  76. /// <param name="port"></param>
  77. /// <param name="ip"></param>
  78. /// <returns></returns>
  79. public bool Connect(int port, string ip)
  80. {
  81. Close();
  82. //关闭和释放Socket
  83. //CloseAll();
  84. CreateUdpSocket(port, IPAddress.Parse(ip));
  85. Initialize();
  86. return true;
  87. }
  88. //自己添加的方法
  89. private void CloseAll()
  90. {
  91. DisposeSocketPool();
  92. SafeClose();
  93. }
  94. public bool Send(SegmentOffset sendSegment, bool waiting = true)
  95. {
  96. try
  97. {
  98. bool isWillEvent = true;
  99. ArraySegment<byte>[] segItems = sendBufferManager.BufferToSegments(sendSegment.buffer, sendSegment.offset, sendSegment.size);
  100. foreach (var seg in segItems)
  101. {
  102. SocketAsyncEventArgs tArgs = sendTokenManager.GetEmptyWait((retry) =>
  103. {
  104. return true;
  105. }, waiting);
  106. if (tArgs == null)
  107. throw new Exception("发送缓冲池已用完,等待回收...");
  108. tArgs.RemoteEndPoint = ipEndPoint;
  109. if (!sendBufferManager.WriteBuffer(tArgs, seg.Array, seg.Offset, seg.Count))
  110. {
  111. sendTokenManager.Set(tArgs);
  112. throw new Exception(string.Format("发送缓冲区溢出...buffer block max size:{0}", sendBufferManager.BlockSize));
  113. }
  114. isWillEvent &= socket.SendToAsync(tArgs);
  115. if (!isWillEvent)
  116. {
  117. ProcessSent(tArgs);
  118. }
  119. }
  120. return isWillEvent;
  121. }
  122. catch (Exception ex)
  123. {
  124. Close();
  125. throw ex;
  126. }
  127. }
  128. /// <summary>
  129. /// 同步发送
  130. /// </summary>
  131. /// <param name="buffer"></param>
  132. /// <param name="recAct"></param>
  133. /// <param name="recBufferSize"></param>
  134. /// <returns></returns>
  135. public int SendSync(SegmentOffset sendSegment, SegmentOffset receiveSegment)
  136. {
  137. int sent = socket.SendTo(sendSegment.buffer, sendSegment.offset, sendSegment.size, 0, ipEndPoint);
  138. if (receiveSegment == null
  139. || receiveSegment.buffer == null
  140. || receiveSegment.size == 0) return sent;
  141. int cnt = socket.ReceiveFrom(receiveSegment.buffer,
  142. receiveSegment.size,
  143. SocketFlags.None,
  144. ref ipEndPoint);
  145. return sent;
  146. }
  147. /// <summary>
  148. /// 同步接收
  149. /// </summary>
  150. /// <param name="recAct"></param>
  151. /// <param name="recBufferSize"></param>
  152. public void ReceiveSync(SegmentOffset receiveSegment, Action<SegmentOffset> receiveAction)
  153. {
  154. int cnt = 0;
  155. do
  156. {
  157. cnt = socket.ReceiveFrom(receiveSegment.buffer,
  158. receiveSegment.size,
  159. SocketFlags.None,
  160. ref ipEndPoint);
  161. if (cnt <= 0) break;
  162. receiveAction(receiveSegment);
  163. } while (true);
  164. }
  165. /// <summary>
  166. /// 开始接收数据
  167. /// </summary>
  168. /// <param name="remoteEP"></param>
  169. public void StartReceive()
  170. {
  171. using (LockWait lwait = new LockWait(ref lParam))
  172. {
  173. SocketAsyncEventArgs sArgs = new SocketAsyncEventArgs();
  174. sArgs.Completed += IO_Completed;
  175. sArgs.UserToken = socket;
  176. sArgs.RemoteEndPoint = ipEndPoint;
  177. sArgs.AcceptSocket = socket;
  178. sArgs.SetBuffer(receiveBuffer, 0, bufferSizeByConnection);
  179. bool isAsync = socket.ReceiveFromAsync(sArgs);
  180. if (!isAsync)
  181. {
  182. ProcessReceive(sArgs);
  183. }
  184. }
  185. }
  186. #endregion
  187. #region private method
  188. /// <summary>
  189. /// 初始化对象
  190. /// </summary>
  191. /// <param name="recBufferSize"></param>
  192. /// <param name="port"></param>
  193. private void Initialize()
  194. {
  195. sendTokenManager = new SocketTokenManager<SocketAsyncEventArgs>(maxNumberOfConnections);
  196. sendBufferManager = new SocketBufferManager(maxNumberOfConnections, bufferSizeByConnection);
  197. //初始化发送接收对象池
  198. for (int i = 0; i < maxNumberOfConnections; ++i)
  199. {
  200. SocketAsyncEventArgs sendArgs = new SocketAsyncEventArgs();
  201. sendArgs.Completed += IO_Completed;
  202. sendArgs.UserToken = socket;
  203. sendBufferManager.SetBuffer(sendArgs);
  204. sendTokenManager.Set(sendArgs);
  205. }
  206. }
  207. private void Close()
  208. {
  209. if (socket != null)
  210. {
  211. try
  212. {
  213. if(socket.Connected)
  214. {
  215. //socket.Disconnect(true);
  216. socket.Shutdown(SocketShutdown.Send);
  217. }
  218. }
  219. catch(Exception)
  220. {
  221. }
  222. try
  223. {
  224. socket.Close();
  225. socket.Dispose();
  226. }
  227. catch(Exception)
  228. {
  229. }
  230. isConnected = false;
  231. }
  232. }
  233. private void ProcessReceive(SocketAsyncEventArgs e)
  234. {
  235. SocketToken sToken = new SocketToken()
  236. {
  237. TokenSocket = e.AcceptSocket as Socket,
  238. TokenIpEndPoint = (IPEndPoint)e.RemoteEndPoint
  239. };
  240. try
  241. {
  242. if (e.SocketError != SocketError.Success || e.BytesTransferred == 0)
  243. return;
  244. //缓冲区偏移量返回
  245. if (ReceivedOffsetHandler != null)
  246. ReceivedOffsetHandler(new SegmentToken(sToken, e.Buffer, e.Offset, e.BytesTransferred));
  247. //截取后返回
  248. if (ReceivedCallbackHandler != null)
  249. {
  250. ReceivedCallbackHandler(sToken, encoding.GetString(e.Buffer, e.Offset, e.BytesTransferred));
  251. }
  252. }
  253. catch (Exception ex)
  254. {
  255. throw ex;
  256. }
  257. finally
  258. {
  259. if (e.SocketError == SocketError.Success)
  260. {
  261. //继续下一个接收
  262. if (!socket.ReceiveFromAsync(e))
  263. {
  264. ProcessReceive(e);
  265. }
  266. }
  267. }
  268. }
  269. private void ProcessSent(SocketAsyncEventArgs e)
  270. {
  271. try
  272. {
  273. bool isSuccess = e.SocketError == SocketError.Success;
  274. if (isConnected == false && isSuccess)
  275. {
  276. StartReceive();
  277. isConnected = true;
  278. }
  279. if (SentCallbackHandler != null)
  280. {
  281. SocketToken sToken = new SocketToken()
  282. {
  283. TokenSocket = e.UserToken as Socket,
  284. TokenIpEndPoint = (IPEndPoint)e.RemoteEndPoint
  285. };
  286. SentCallbackHandler(new SegmentToken( sToken, e.Buffer, e.Offset, e.BytesTransferred));
  287. }
  288. }
  289. catch (Exception ex)
  290. {
  291. throw ex;
  292. }
  293. finally
  294. {
  295. sendTokenManager.Set(e);
  296. }
  297. }
  298. void IO_Completed(object sender, SocketAsyncEventArgs e)
  299. {
  300. switch (e.LastOperation)
  301. {
  302. case SocketAsyncOperation.Receive:
  303. case SocketAsyncOperation.ReceiveFrom:
  304. case SocketAsyncOperation.ReceiveMessageFrom:
  305. ProcessReceive(e);
  306. break;
  307. case SocketAsyncOperation.SendTo:
  308. ProcessSent(e);
  309. break;
  310. }
  311. }
  312. #endregion
  313. }
  314. }