SocketIO.cs 9.6 KB


  1. using System;
  2. using System.Collections.Generic;
  3. using System.Net.WebSockets;
  4. using System.Text;
  5. using System.Threading;
  6. using System.Threading.Tasks;
  7. using EZXR.Glass.Network.SocketIOClient.Arguments;
  8. using EZXR.Glass.Network.SocketIOClient.Parsers;
  9. namespace EZXR.Glass.Network.SocketIOClient
  10. {
  11. public class SocketIO
  12. {
  13. /// <summary>
  14. ///
  15. /// </summary>
  16. /// <param name="uri"></param>
  17. public SocketIO(Uri uri)
  18. {
  19. if (uri.Scheme == "https" || uri.Scheme == "http" || uri.Scheme == "wss" || uri.Scheme == "ws")
  20. {
  21. _uri = uri;
  22. }
  23. else
  24. {
  25. OnError?.Invoke("Unsupported protocol");
  26. throw new ArgumentException("Unsupported protocol");
  27. }
  28. EventHandlers = new Dictionary<string, EventHandler>();
  29. Callbacks = new Dictionary<int, EventHandler>();
  30. _urlConverter = new UrlConverter();
  31. if (_uri.AbsolutePath != "/")
  32. {
  33. _namespace = _uri.AbsolutePath + ',';
  34. }
  35. _packetId = -1;
  36. ConnectTimeout = TimeSpan.FromSeconds(30);
  37. }
  38. public SocketIO(string uri) : this(new Uri(uri)) { }
  39. private const int ReceiveChunkSize = 1024;
  40. private const int SendChunkSize = 1024;
  41. readonly Uri _uri;
  42. private ClientWebSocket _socket;
  43. readonly UrlConverter _urlConverter;
  44. readonly string _namespace;
  45. private CancellationTokenSource _tokenSource;
  46. private int _packetId;
  47. public Dictionary<int, EventHandler> Callbacks { get; }
  48. public int EIO { get; set; } = 3;
  49. public TimeSpan ConnectTimeout { get; set; }
  50. public Dictionary<string, string> Parameters { get; set; }
  51. public event Action OnConnected;
  52. public event Action<ServerCloseReason> OnClosed;
  53. public event Action<string, ResponseArgs> UnhandledEvent;
  54. public event Action<string, ResponseArgs> OnReceivedEvent;
  55. public event Action<string> OnError;
  56. public Dictionary<string, EventHandler> EventHandlers { get; }
  57. public SocketIOState State { get; private set; }
  58. public Task ConnectAsync()
  59. {
  60. _tokenSource = new CancellationTokenSource();
  61. Uri wsUri = _urlConverter.HttpToWs(_uri, EIO.ToString(), Parameters);
  62. if (_socket != null)
  63. {
  64. _socket.Dispose();
  65. }
  66. _socket = new ClientWebSocket();
  67. bool executed = _socket.ConnectAsync(wsUri, CancellationToken.None).Wait(ConnectTimeout);
  68. if (!executed)
  69. {
  70. OnError?.Invoke("Connect Time Out");
  71. throw new TimeoutException();
  72. }
  73. Listen();
  74. return Task.CompletedTask;
  75. }
  76. public Task CloseAsync()
  77. {
  78. if (_socket == null)
  79. {
  80. throw new InvalidOperationException("Close failed, must connect first.");
  81. }
  82. else
  83. {
  84. _tokenSource.Cancel();
  85. _tokenSource.Dispose();
  86. _socket.Abort();
  87. _socket.Dispose();
  88. _socket = null;
  89. OnClosed?.Invoke(ServerCloseReason.ClosedByClient);
  90. return Task.CompletedTask;
  91. }
  92. }
  93. private void Listen()
  94. {
  95. // Listen State
  96. Task.Factory.StartNew(async () =>
  97. {
  98. while (true)
  99. {
  100. await Task.Delay(500);
  101. if (_socket.State == WebSocketState.Aborted || _socket.State == WebSocketState.Closed)
  102. {
  103. if (State != SocketIOState.Closed)
  104. {
  105. State = SocketIOState.Closed;
  106. _tokenSource.Cancel();
  107. OnClosed?.Invoke(ServerCloseReason.Aborted);
  108. }
  109. }
  110. }
  111. }, _tokenSource.Token);
  112. // Listen Message
  113. Task.Factory.StartNew(async () =>
  114. {
  115. var buffer = new byte[ReceiveChunkSize];
  116. while (true)
  117. {
  118. if (_socket.State == WebSocketState.Open)
  119. {
  120. WebSocketReceiveResult result = await _socket.ReceiveAsync(new ArraySegment<byte>(buffer), _tokenSource.Token);
  121. if (result.MessageType == WebSocketMessageType.Text)
  122. {
  123. var builder = new StringBuilder();
  124. string str = Encoding.UTF8.GetString(buffer, 0, result.Count);
  125. builder.Append(str);
  126. while (!result.EndOfMessage)
  127. {
  128. result = await _socket.ReceiveAsync(new ArraySegment<byte>(buffer), _tokenSource.Token);
  129. str = Encoding.UTF8.GetString(buffer, 0, result.Count);
  130. builder.Append(str);
  131. }
  132. var parser = new ResponseTextParser(_namespace, this)
  133. {
  134. Text = builder.ToString()
  135. };
  136. await parser.ParseAsync();
  137. }
  138. }
  139. }
  140. }, _tokenSource.Token);
  141. }
  142. private async Task SendMessageAsync(string text)
  143. {
  144. if (_socket.State == WebSocketState.Open)
  145. {
  146. var messageBuffer = Encoding.UTF8.GetBytes(text);
  147. var messagesCount = (int)Math.Ceiling((double)messageBuffer.Length / SendChunkSize);
  148. for (var i = 0; i < messagesCount; i++)
  149. {
  150. int offset = SendChunkSize * i;
  151. int count = SendChunkSize;
  152. bool isEndOfMessage = (i + 1) == messagesCount;
  153. if ((count * (i + 1)) > messageBuffer.Length)
  154. {
  155. count = messageBuffer.Length - offset;
  156. }
  157. await _socket.SendAsync(new ArraySegment<byte>(messageBuffer, offset, count), WebSocketMessageType.Text, isEndOfMessage, _tokenSource.Token);
  158. }
  159. }
  160. }
  161. public Task InvokeConnectedAsync()
  162. {
  163. State = SocketIOState.Connected;
  164. OnConnected?.Invoke();
  165. return Task.CompletedTask;
  166. }
  167. public async Task InvokeClosedAsync()
  168. {
  169. if (State != SocketIOState.Closed)
  170. {
  171. State = SocketIOState.Closed;
  172. await _socket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, _tokenSource.Token);
  173. _tokenSource.Cancel();
  174. OnClosed?.Invoke(ServerCloseReason.ClosedByServer);
  175. }
  176. }
  177. public async Task InvokeOpenedAsync(OpenedArgs args)
  178. {
  179. await Task.Factory.StartNew(async () =>
  180. {
  181. if (_namespace != null)
  182. {
  183. await SendMessageAsync("40" + _namespace);
  184. }
  185. State = SocketIOState.Connected;
  186. while (true)
  187. {
  188. if (State == SocketIOState.Connected)
  189. {
  190. await Task.Delay(args.PingInterval);
  191. await SendMessageAsync(((int)EngineIOProtocol.Ping).ToString());
  192. }
  193. else
  194. {
  195. break;
  196. }
  197. }
  198. });
  199. }
  200. public Task InvokeUnhandledEvent(string eventName, ResponseArgs args)
  201. {
  202. UnhandledEvent?.Invoke(eventName, args);
  203. return Task.CompletedTask;
  204. }
  205. public Task InvokeReceivedEvent(string eventName, ResponseArgs args)
  206. {
  207. OnReceivedEvent?.Invoke(eventName, args);
  208. return Task.CompletedTask;
  209. }
  210. public void On(string eventName, EventHandler handler)
  211. {
  212. EventHandlers.Add(eventName, handler);
  213. }
  214. private async Task EmitAsync(string eventName, int packetId, object obj)
  215. {
  216. string text = EZXR.Glass.Core.JsonUtil.ToJson<object>(obj);
  217. var builder = new StringBuilder();
  218. builder
  219. .Append("42")
  220. .Append(_namespace)
  221. .Append(packetId)
  222. .Append('[')
  223. .Append('"')
  224. .Append(eventName)
  225. .Append('"')
  226. .Append(',')
  227. .Append(text)
  228. .Append(']');
  229. string message = builder.ToString();
  230. if (State == SocketIOState.Connected)
  231. {
  232. await SendMessageAsync(message);
  233. }
  234. else
  235. {
  236. OnError?.Invoke("Socket connection not ready, emit failure.");
  237. throw new InvalidOperationException("Socket connection not ready, emit failure.");
  238. }
  239. }
  240. public async Task EmitAsync(string eventName, object obj)
  241. {
  242. _packetId++;
  243. await EmitAsync(eventName, _packetId, obj);
  244. }
  245. public async Task EmitAsync(string eventName, object obj, EventHandler callback)
  246. {
  247. _packetId++;
  248. Callbacks.Add(_packetId, callback);
  249. await EmitAsync(eventName, _packetId, obj);
  250. }
  251. }
  252. }