TcpServerProvider.cs 17 KB


  1. using System;
  2. using System.Net;
  3. using System.Net.Sockets;
  4. using System.Threading;
  5. using System.Text;
  6. namespace IFramework.Net.Tcp
  7. {
  8. internal class TcpServerProvider : TcpSocket, IDisposable, ITcpServerProvider
  9. {
  #region variable
  // True while the listener is not running; cleared by Start, set again by Stop.
  private bool isStoped = true;
  // Set once Dispose(bool) has released the managed resources.
  private bool _isDisposed;
  // Number of live connections; adjusted with Interlocked in the accept/disconnect paths.
  private int numberOfConnections;
  // Connection limit supplied to the constructor (raised to at least 2 there).
  private int maxNumberOfConnections = 32;
  // Extra slots added on top of the connection limit when sizing the semaphore and pools.
  private int offsetNumber = 2;
  // Encoding used to decode received bytes into the string handed to ReceivedCallback.
  private Encoding encoding = Encoding.UTF8;
  // Lock state passed by reference to LockWait in Start and Stop.
  private LockParam lParam = new LockParam();
  // Slot counter: waited on before each accept, released when a connection goes away.
  private Semaphore maxNumberAcceptedClients;
  // Pool of event args used for outgoing sends.
  private SocketTokenManager<SocketAsyncEventArgs> sendTokenManager;
  // Pool of event args bound to accepted connections (used for receiving).
  private SocketTokenManager<SocketAsyncEventArgs> acceptTokenManager;
  // Buffer source for the receive-side event args.
  private SocketBufferManager recvBufferManager;
  // Buffer source for the send-side event args.
  private SocketBufferManager sendBufferManager;
  #endregion
  #region properties
  /// <summary>
  /// Callback invoked when a connection has been accepted.
  /// </summary>
  public OnAcceptedHandler AcceptedCallback { get; set; }

  /// <summary>
  /// Callback invoked with received data decoded to a string.
  /// </summary>
  public OnReceivedHandler ReceivedCallback { get; set; }

  /// <summary>
  /// Callback invoked with the receive buffer itself, together with the
  /// actual offset and byte count of the received data inside it.
  /// </summary>
  public OnReceivedSegmentHandler ReceivedOffsetCallback { get; set; }

  /// <summary>
  /// Callback invoked when a send has completed.
  /// </summary>
  public OnSentHandler SentCallback { get; set; }

  /// <summary>
  /// Callback invoked when a connection has been closed.
  /// </summary>
  public OnDisconnectedHandler DisconnectedCallback { get; set; }

  /// <summary>
  /// Current number of connections.
  /// </summary>
  public int NumberOfConnections => numberOfConnections;
  #endregion
  53. #region constructor
  /// <summary>
  /// Releases the listening socket, the pooled event args, the buffers and
  /// the connection semaphore owned by this server.
  /// </summary>
  public void Dispose()
  {
      Dispose(true);
      GC.SuppressFinalize(this);
  }

  /// <summary>
  /// Dispose-pattern worker. Safe to call more than once.
  /// </summary>
  /// <param name="isDisposing">
  /// True when called from <see cref="Dispose()"/>; only then are the
  /// managed resources released.
  /// </param>
  protected virtual void Dispose(bool isDisposing)
  {
      if (_isDisposed)
      {
          return;
      }

      // Flag first so a repeated or re-entrant call is a no-op even if one
      // of the cleanup steps below throws.
      _isDisposed = true;

      if (!isDisposing)
      {
          return;
      }

      // Mark the server stopped so an accept completion arriving after this
      // point does not post another accept (which waits on the semaphore
      // disposed below).
      isStoped = true;

      DisposeSocketPool();
      SafeClose();
      recvBufferManager?.Clear();
      sendBufferManager?.Clear();
      maxNumberAcceptedClients?.Dispose();
  }

  /// <summary>
  /// Clears and closes the event args held by the send and accept pools.
  /// </summary>
  private void DisposeSocketPool()
  {
      sendTokenManager?.ClearToCloseArgs();
      acceptTokenManager?.ClearToCloseArgs();
  }
  /// <summary>
  /// Creates the server and sizes its semaphore, token pools and buffer
  /// managers for the requested connection limit plus the reserved offset.
  /// </summary>
  /// <param name="maxConnections">Maximum number of connections; values below 2 are raised to 2.</param>
  /// <param name="chunkBufferSize">Size of each receive/send buffer chunk; also passed to the base class.</param>
  public TcpServerProvider(int maxConnections = 32, int chunkBufferSize = 4096)
      : base(chunkBufferSize)
  {
      maxNumberOfConnections = Math.Max(maxConnections, 2);

      // Every pool and the semaphore share the same capacity.
      int capacity = maxNumberOfConnections + offsetNumber;

      maxNumberAcceptedClients = new Semaphore(capacity, capacity);
      recvBufferManager = new SocketBufferManager(capacity, chunkBufferSize);
      acceptTokenManager = new SocketTokenManager<SocketAsyncEventArgs>(capacity);
      sendTokenManager = new SocketTokenManager<SocketAsyncEventArgs>(capacity);
      sendBufferManager = new SocketBufferManager(capacity, chunkBufferSize);
  }
  93. #endregion
  94. #region public method
  /// <summary>
  /// Stops any running listener, rebuilds the accept and send pools, then
  /// binds, listens and posts the first accept.
  /// </summary>
  /// <param name="port">Port to listen on.</param>
  /// <param name="ip">Local address to bind to; defaults to all interfaces.</param>
  /// <returns>True once the listener is running.</returns>
  /// <exception cref="Exception">
  /// The failure of the third attempt is rethrown unchanged, with its
  /// original stack trace.
  /// </exception>
  public bool Start(int port, string ip = "0.0.0.0")
  {
      const int maxAttempts = 3;

      Stop();
      InitializeAcceptPool();
      InitializeSendPool();

      for (int attempt = 1; ; ++attempt)
      {
          try
          {
              SafeClose();
              using (LockWait lwait = new LockWait(ref lParam))
              {
                  CreateTcpSocket(port, ip);
                  socket.Bind(ipEndPoint);
                  socket.Listen(128);
                  isStoped = false;
              }
              StartAccept(null);
              return true;
          }
          catch (Exception)
          {
              SafeClose();
              if (attempt >= maxAttempts)
              {
                  // Bare rethrow keeps the original stack trace.
                  throw;
              }

              // Give the port a moment to become available before retrying.
              Thread.Sleep(1000);
          }
      }
  }
  /// <summary>
  /// Stops the listener: closes the pooled event args, hands the semaphore
  /// slots of the counted connections back, closes the listening socket and
  /// marks the server as stopped. Best-effort; never throws.
  /// </summary>
  public void Stop()
  {
      try
      {
          using (LockWait lwait = new LockWait(ref lParam))
          {
              try
              {
                  DisposePoolToken();

                  // Read and reset the counter in one step so a concurrent
                  // increment/decrement cannot be lost in between.
                  int held = Interlocked.Exchange(ref numberOfConnections, 0);
                  if (held > 0 && maxNumberAcceptedClients != null)
                  {
                      maxNumberAcceptedClients.Release(held);
                  }
              }
              finally
              {
                  // Must run even if the release above throws (for example a
                  // SemaphoreFullException after a disconnect already gave
                  // its slot back); otherwise the listener stays open.
                  isStoped = true;
                  SafeClose();
              }
          }
      }
      catch (Exception ex)
      {
          // Stopping is deliberately best-effort; surface the cause in debug builds only.
  #if DEBUG
          Console.WriteLine(ex.Message);
  #endif
      }
  }
  /// <summary>
  /// Closes the connection that belongs to the given token.
  /// </summary>
  /// <param name="sToken">Token of the connection to close.</param>
  public void Close(SocketToken sToken) => ProcessAsyncDisconnect(sToken);
  /// <summary>
  /// Splits the payload into buffer-sized segments and posts one
  /// asynchronous send per segment using event args from the send pool.
  /// </summary>
  /// <param name="segToken">Connection token plus the data (buffer, offset, size) to send.</param>
  /// <param name="waiting">Passed to the send pool lookup; presumably whether to wait for a free entry.</param>
  /// <returns>
  /// True when every segment's SendAsync call returned true; false when the
  /// socket is not connected, no pool entry could be obtained for a
  /// disconnected socket, or at least one segment's SendAsync returned false.
  /// </returns>
  /// <exception cref="Exception">
  /// A segment does not fit the send buffer block, the pool lookup fails, or
  /// the send itself fails. The connection is closed before the exception
  /// is rethrown.
  /// </exception>
  public bool Send(SegmentToken segToken, bool waiting = true)
  {
      try
      {
          if (!segToken.sToken.TokenSocket.Connected) return false;

          bool isWillEvent = true;
          ArraySegment<byte>[] segItems = sendBufferManager.BufferToSegments(
              segToken.Data.buffer,
              segToken.Data.offset,
              segToken.Data.size);

          foreach (var seg in segItems)
          {
              if (!segToken.sToken.TokenSocket.Connected) return false;

              var tArgs = GetSocketAsyncFromSendPool(waiting, segToken.sToken.TokenSocket);
              if (tArgs == null) return false;

              tArgs.UserToken = segToken.sToken;
              if (!sendBufferManager.WriteBuffer(tArgs, seg.Array, seg.Offset, seg.Count))
              {
                  sendTokenManager.Set(tArgs);
                  throw new Exception(string.Format("发送缓冲区溢出...buffer block max size:{0}", sendBufferManager.BlockSize));
              }

              bool pending;
              try
              {
                  pending = segToken.sToken.SendAsync(tArgs);
              }
              catch
              {
                  // No completion will fire for this entry, so hand it back here.
                  sendTokenManager.Set(tArgs);
                  throw;
              }

              // Decide per segment. Testing the accumulated flag would also
              // complete sends that are still pending once one earlier
              // segment returned false, recycling their args twice.
              if (!pending)
              {
                  ProcessSentCallback(tArgs);
              }
              isWillEvent &= pending;

              // Back off briefly while less than a quarter of the pool is free.
              if (sendTokenManager.Count < (sendTokenManager.Capacity >> 2))
                  Thread.Sleep(5);
          }
          return isWillEvent;
      }
      catch (Exception)
      {
          Close(segToken.sToken);
          // Bare rethrow keeps the original stack trace.
          throw;
      }
  }
  /// <summary>
  /// Sends the token's data through the token's own Send method.
  /// </summary>
  /// <param name="segToken">Connection token plus the data to send.</param>
  /// <returns>The value returned by the token's Send call.</returns>
  public int SendSync(SegmentToken segToken) => segToken.sToken.Send(segToken.Data);
  195. #endregion
  196. #region private method
  /// <summary>
  /// Clears and closes the event args held by both pools, send pool first.
  /// </summary>
  private void DisposePoolToken()
  {
      this.sendTokenManager.ClearToCloseArgs();
      this.acceptTokenManager.ClearToCloseArgs();
  }
  /// <summary>
  /// Empties the accept pool and refills it with one event-args object per
  /// slot (connection limit plus offset). Each entry gets a numbered token
  /// that points back at it, and a receive buffer.
  /// </summary>
  private void InitializeAcceptPool()
  {
      acceptTokenManager.Clear();

      int poolSize = maxNumberOfConnections + offsetNumber;
      for (int tokenId = 1; tokenId <= poolSize; tokenId++)
      {
          var poolArgs = new SocketAsyncEventArgs();
          poolArgs.DisconnectReuseSocket = true;
          poolArgs.SocketError = SocketError.SocketError;
          poolArgs.Completed += IO_Completed;

          // Token ids start at 1.
          var token = new SocketToken(tokenId);
          token.TokenAgrs = poolArgs;
          poolArgs.UserToken = token;

          recvBufferManager.SetBuffer(poolArgs);
          acceptTokenManager.Set(poolArgs);
      }
  }
  /// <summary>
  /// Empties the send pool and refills it with one event-args object per
  /// slot (connection limit plus offset), each with a numbered token and a
  /// send buffer.
  /// </summary>
  private void InitializeSendPool()
  {
      sendTokenManager.Clear();

      int poolSize = maxNumberOfConnections + offsetNumber;
      for (int tokenId = 1; tokenId <= poolSize; tokenId++)
      {
          var poolArgs = new SocketAsyncEventArgs();
          poolArgs.DisconnectReuseSocket = true;
          poolArgs.SocketError = SocketError.NotInitialized;
          poolArgs.Completed += IO_Completed;
          poolArgs.UserToken = new SocketToken(tokenId);

          sendBufferManager.SetBuffer(poolArgs);
          sendTokenManager.Set(poolArgs);
      }
  }
  /// <summary>
  /// Posts the next asynchronous accept on the listening socket after
  /// taking one slot from the connection semaphore.
  /// </summary>
  /// <param name="e">
  /// Accept event args to reuse, or null to create a fresh one wired to
  /// <c>Accept_Completed</c>.
  /// </param>
  private void StartAccept(SocketAsyncEventArgs e)
  {
      if (isStoped || socket == null)
      {
          isStoped = true;
          return;
      }

      if (e == null)
      {
          e = new SocketAsyncEventArgs()
          {
              DisconnectReuseSocket = true,
              UserToken = new SocketToken(),
          };
          e.Completed += Accept_Completed;
      }
      else
      {
          // A reused args object must not carry the previous client socket.
          e.AcceptSocket = null;
      }

      // Blocks until a connection slot is free.
      maxNumberAcceptedClients.WaitOne();

      bool posted = false;
      bool pending = false;
      try
      {
          // The wait above can be long; the server may have been stopped and
          // the listener closed in the meantime, so check again.
          Socket listener = socket;
          if (!isStoped && listener != null)
          {
              pending = listener.AcceptAsync(e);
              posted = true;
          }
      }
      catch (ObjectDisposedException)
      {
          // Listener was closed concurrently; treated as "not posted" below.
      }
      finally
      {
          if (!posted)
          {
              // No accept is in flight, so nothing will ever use (or give
              // back) the slot taken above.
              try
              {
                  maxNumberAcceptedClients.Release();
              }
              catch (SemaphoreFullException)
              {
                  // Slot was already handed back elsewhere.
              }
              catch (ObjectDisposedException)
              {
                  // Server disposed while waiting.
              }
          }
      }

      if (!posted)
      {
          DisposeSocketArgs(e);
          return;
      }

      // False means the accept finished synchronously and no Completed event fires.
      if (!pending)
      {
          ProcessAcceptCallback(e);
      }
  }
  /// <summary>
  /// Handles a completed accept: binds the new client socket to a pooled
  /// token, notifies <see cref="AcceptedCallback"/>, starts receiving and
  /// then posts the next accept.
  /// </summary>
  /// <param name="e">The accept event args that completed.</param>
  private void ProcessAcceptCallback(SocketAsyncEventArgs e)
  {
      bool accepted = !isStoped && e.SocketError == SocketError.Success;

      // Take an object from the pool.
      SocketAsyncEventArgs tArgs = null;
      if (accepted)
      {
          tArgs = acceptTokenManager.GetEmptyWait((retry) =>
          {
              return true;
          }, false);
      }

      if (tArgs == null)
      {
          // The connection is rejected: close the client socket (if any) and
          // give back the semaphore slot StartAccept took for it, since no
          // disconnect will ever release it.
          if (e.AcceptSocket != null)
          {
              e.AcceptSocket.Close();
          }
          try
          {
              maxNumberAcceptedClients.Release();
          }
          catch (SemaphoreFullException)
          {
              // Slot was already handed back elsewhere.
          }
          catch (ObjectDisposedException)
          {
              // Server disposed in the meantime.
          }

          if (accepted && !isStoped)
          {
              // Only the pool was momentarily empty; keep the listener alive.
              StartAccept(e);
          }
          else
          {
              // NOTE(review): as before, a failed accept ends the accept loop
              // even when the server was not stopped - confirm that is intended.
              DisposeSocketArgs(e);
          }
          return;
      }

      Interlocked.Increment(ref numberOfConnections);
      SocketToken sToken = ((SocketToken)tArgs.UserToken);
      Socket client = e.AcceptSocket;
      bool ready;
      try
      {
          sToken.TokenSocket = client;
          sToken.TokenSocket.ReceiveTimeout = receiveTimeout;
          sToken.TokenSocket.SendTimeout = sendTimeout;
          sToken.TokenIpEndPoint = (IPEndPoint)client.RemoteEndPoint;
          sToken.TokenAgrs = tArgs;
          tArgs.UserToken = sToken;
          ready = client.Connected;
      }
      catch (SocketException)
      {
          // Client vanished between accept and setup.
          ready = false;
      }
      catch (ObjectDisposedException)
      {
          ready = false;
      }

      if (!ready || maxNumberOfConnections < numberOfConnections)
      {
          // Not connected any more, or over the configured limit: run the
          // normal disconnect path so the token, the slot and the counter
          // are all given back.
          ProcessDisconnectCallback(tArgs);
      }
      else
      {
          // Notify first, then start receiving, so no data callback can
          // arrive for a connection the application has not seen yet.
          AcceptedCallback?.Invoke(sToken);

          bool pending;
          try
          {
              pending = client.ReceiveAsync(tArgs);
          }
          catch (ObjectDisposedException)
          {
              // The accepted-callback (or another thread) closed the socket.
              ProcessDisconnectCallback(tArgs);
              pending = true;
          }
          if (!pending)
          {
              ProcessReceiveCallback(tArgs);
          }
      }

      if (isStoped) return;

      // Get ready for the next accept.
      StartAccept(e);
  }
  /// <summary>
  /// Handles a completed receive: forwards the data to the receive
  /// callbacks and posts the next receive, or runs the disconnect path when
  /// the peer closed the connection or an error occurred.
  /// </summary>
  /// <param name="e">The receive event args that completed.</param>
  private void ProcessReceiveCallback(SocketAsyncEventArgs e)
  {
      // Loop rather than recurse: a run of receives that all complete
      // synchronously would otherwise grow the stack without bound.
      while (true)
      {
          if (e.SocketError != SocketError.Success
              || e.BytesTransferred == 0)
          {
              ProcessDisconnectCallback(e);
              return;
          }

          SocketToken sToken = e.UserToken as SocketToken;
          if (sToken == null)
          {
              // Nothing to deliver to and nothing to clean up without a token.
              return;
          }

          ReceivedOffsetCallback?.Invoke(new SegmentToken(sToken, e.Buffer, e.Offset, e.BytesTransferred));

          // Hand the received data on as text.
          // NOTE(review): each chunk is decoded on its own, so a multi-byte
          // character split across two receives would not decode correctly.
          ReceivedCallback?.Invoke(sToken, encoding.GetString(e.Buffer, e.Offset, e.BytesTransferred));

          Socket client = sToken.TokenSocket;
          if (client == null || !client.Connected)
          {
              ProcessDisconnectCallback(e);
              return;
          }

          // Post the next receive request.
          bool pending;
          try
          {
              pending = client.ReceiveAsync(e);
          }
          catch (ObjectDisposedException)
          {
              // Socket was closed (e.g. through Close) between the check and the call.
              ProcessDisconnectCallback(e);
              return;
          }

          if (pending)
          {
              // Completion will arrive through the Completed event.
              return;
          }
      }
  }
  /// <summary>
  /// Handles a completed send: reports a successful send through
  /// <see cref="SentCallback"/> and always returns the event args to the
  /// send pool.
  /// </summary>
  /// <param name="e">The send event args that completed.</param>
  private void ProcessSentCallback(SocketAsyncEventArgs e)
  {
      try
      {
          if (e.SocketError == SocketError.Success && SentCallback != null)
          {
              SocketToken sToken = e.UserToken as SocketToken;
              SentCallback(new SegmentToken(sToken, e.Buffer, e.Offset, e.BytesTransferred));
          }
      }
      finally
      {
          // Recycle even when the callback throws; the exception itself
          // propagates untouched, with its original stack trace.
          sendTokenManager.Set(e);
      }
  }
  /// <summary>
  /// Tears down a connection: closes its token, updates the connection
  /// count, returns the event args to the accept pool, frees the semaphore
  /// slot and finally notifies <see cref="DisconnectedCallback"/>.
  /// </summary>
  /// <param name="e">Event args whose UserToken identifies the connection; ignored when it carries no token.</param>
  private void ProcessDisconnectCallback(SocketAsyncEventArgs e)
  {
      SocketToken sToken = e.UserToken as SocketToken;
      if (sToken == null)
      {
          return;
      }

      try
      {
          sToken.Close();
      }
      finally
      {
          // Bookkeeping must happen even if closing the token fails,
          // otherwise the slot and the pooled args are lost for good.
          Interlocked.Decrement(ref numberOfConnections);

          if (sToken.TokenId != 0)
          {
              // Put the disconnected object back into the reuse queue before
              // freeing the slot, so an accept woken by the release below
              // finds an entry in the pool.
              acceptTokenManager.Set(e);
          }

          try
          {
              // Give the semaphore slot back.
              maxNumberAcceptedClients.Release();
          }
          catch (SemaphoreFullException)
          {
              // Stop() already released the slots of all counted connections.
          }
          catch (ObjectDisposedException)
          {
              // Server was disposed while this disconnect was in flight.
          }
      }

      DisconnectedCallback?.Invoke(sToken);
  }
  /// <summary>
  /// Closes the token attached to the event args, if there is one, and
  /// disposes the event args.
  /// </summary>
  /// <param name="e">Event args to dispose.</param>
  private void DisposeSocketArgs(SocketAsyncEventArgs e)
  {
      (e.UserToken as SocketToken)?.Close();
      e.Dispose();
  }
  /// <summary>
  /// Obtains event args from the send pool for a send on the given socket.
  /// </summary>
  /// <param name="waiting">Passed to the pool lookup; presumably whether to wait for a free entry.</param>
  /// <param name="socket">Socket the send is meant for; the lookup is abandoned once it is no longer connected.</param>
  /// <returns>Pooled event args, or null when the socket is not connected.</returns>
  /// <exception cref="Exception">The socket is connected but the pool returned no entry.</exception>
  private SocketAsyncEventArgs GetSocketAsyncFromSendPool(bool waiting, Socket socket)
  {
      var tArgs = sendTokenManager.GetEmptyWait((retry) => socket.Connected, waiting);

      if (!socket.Connected)
      {
          // The caller will not use (or recycle) the entry, so put it back
          // here instead of losing it from the pool.
          if (tArgs != null)
          {
              sendTokenManager.Set(tArgs);
          }
          return null;
      }

      if (tArgs == null)
          throw new Exception("发送缓冲池已用完,等待回收超时...");

      return tArgs;
  }
  /// <summary>
  /// Closes a client connection: shuts down the sending side when the
  /// socket is still connected, then closes the socket. Never throws.
  /// </summary>
  /// <param name="sToken">Token of the connection; ignored when it has no socket or event args.</param>
  private void ProcessAsyncDisconnect(SocketToken sToken)
  {
      try
      {
          if (sToken == null
              || sToken.TokenSocket == null
              || sToken.TokenAgrs == null) return;

          // Work on one reference so a concurrent change of the token's
          // socket cannot slip in between the steps below.
          Socket client = sToken.TokenSocket;
          if (client == null) return;

          try
          {
              if (client.Connected)
                  client.Shutdown(SocketShutdown.Send);
          }
          finally
          {
              // Close even when Shutdown fails, so the handle is not leaked.
              client.Close();
          }
      }
      catch (Exception ex)
      {
          // Covers ObjectDisposedException (already closed) and socket errors alike.
  #if DEBUG
          // TargetSite can be null, hence the null-conditional access.
          Console.WriteLine(ex.TargetSite?.Name + ex.Message);
  #endif
      }
  }
  /// <summary>
  /// Completed handler shared by the pooled event args; routes each
  /// finished operation to its handler. Other operations are ignored.
  /// </summary>
  /// <param name="sender">Event source (unused).</param>
  /// <param name="e">Event args of the operation that completed.</param>
  void IO_Completed(object sender, SocketAsyncEventArgs e)
  {
      SocketAsyncOperation finished = e.LastOperation;

      if (finished == SocketAsyncOperation.Receive)
      {
          ProcessReceiveCallback(e);
      }
      else if (finished == SocketAsyncOperation.Send)
      {
          ProcessSentCallback(e);
      }
      else if (finished == SocketAsyncOperation.Disconnect)
      {
          ProcessDisconnectCallback(e);
      }
      else if (finished == SocketAsyncOperation.Accept)
      {
          ProcessAcceptCallback(e);
      }
  }
  /// <summary>
  /// Completed handler of the accept event args created in StartAccept.
  /// </summary>
  /// <param name="send">Event source (unused).</param>
  /// <param name="e">Accept event args that completed.</param>
  void Accept_Completed(object send, SocketAsyncEventArgs e) => ProcessAcceptCallback(e);
  476. #endregion
  477. }
  478. }