WebSocketResponse.cs 26 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705
  1. #if !BESTHTTP_DISABLE_WEBSOCKET && (!UNITY_WEBGL || UNITY_EDITOR)
  2. using System;
  3. using System.IO;
  4. using System.Threading;
  5. using System.Collections.Generic;
  6. using System.Text;
  7. using BestHTTP.Extensions;
  8. using BestHTTP.WebSocket.Frames;
  9. namespace BestHTTP.WebSocket
  10. {
  11. public sealed class WebSocketResponse : HTTPResponse, IHeartbeat, IProtocol
  12. {
  /// <summary>
  /// Capacity of the RTT buffer where the latencies are kept.
  /// It is read when an instance creates its round-trip-time buffer.
  /// </summary>
  public static int RTTBufferCapacity = 5;

  #region Public Interface

  /// <summary>
  /// A reference to the original WebSocket instance. Used for accessing extensions.
  /// </summary>
  public WebSocket WebSocket { get; internal set; }

  /// <summary>
  /// Called when a Text message received
  /// </summary>
  public Action<WebSocketResponse, string> OnText;

  /// <summary>
  /// Called when a Binary message received
  /// </summary>
  public Action<WebSocketResponse, byte[]> OnBinary;

  /// <summary>
  /// Called when an incomplete frame received. No attempt will be made to reassemble these fragments.
  /// </summary>
  public Action<WebSocketResponse, WebSocketFrameReader> OnIncompleteFrame;

  /// <summary>
  /// Called when the connection closed.
  /// </summary>
  public Action<WebSocketResponse, UInt16, string> OnClosed;

  /// <summary>
  /// Indicates whether the connection to the server is closed or not.
  /// </summary>
  public bool IsClosed { get { return closed; } }

  /// <summary>
  /// How often a ping has to be sent to the server. Set by StartPinging.
  /// </summary>
  public TimeSpan PingFrequnecy { get; private set; }

  /// <summary>
  /// Maximum size of a fragment's payload data. Its default value is 32767.
  /// </summary>
  public UInt16 MaxFragmentSize { get; private set; }

  /// <summary>
  /// Length of unsent, buffered up data in bytes.
  /// </summary>
  public int BufferedAmount { get { return this._bufferedAmount; } }
  private int _bufferedAmount;

  /// <summary>
  /// Calculated latency from the Round-Trip Times we store in the rtts field.
  /// </summary>
  public int Latency { get; private set; }

  #endregion

  #region Private Fields

  // Non-final frames collected by the receive thread until the closing continuation frame arrives.
  private readonly List<WebSocketFrameReader> IncompleteFrames = new List<WebSocketFrameReader>();

  // Frames waiting to be dispatched by HandleEvents. Guarded by FrameLock.
  private readonly List<WebSocketFrameReader> CompletedFrames = new List<WebSocketFrameReader>();

  // Scratch list HandleEvents copies CompletedFrames into, so callbacks run outside of FrameLock.
  private readonly List<WebSocketFrameReader> frameCache = new List<WebSocketFrameReader>();

  // The Close frame received from the server, if any.
  private WebSocketFrameReader CloseFrame;

  // The lock objects are never reassigned; readonly guarantees every thread locks on the same instance.
  private readonly object FrameLock = new object();
  private readonly object SendLock = new object();

  // Outgoing frames queued for the send thread. Guarded by SendLock.
  private readonly List<WebSocketFrame> unsentFrames = new List<WebSocketFrame>();

  // Signaled whenever a frame is queued or the connection gets closed, to wake up the send thread.
  private readonly AutoResetEvent newFrameSignal = new AutoResetEvent(false);

  private volatile bool sendThreadCreated = false;

  /// <summary>
  /// True if we sent out a Close message to the server
  /// </summary>
  private volatile bool closeSent;

  /// <summary>
  /// True if this WebSocket connection is closed
  /// </summary>
  private volatile bool closed;

  /// <summary>
  /// When we sent out the last ping.
  /// </summary>
  private DateTime lastPing = DateTime.MinValue;

  /// <summary>
  /// When we received the last frame of any kind from the server. It is also reset when pinging starts
  /// and when the application returns to the foreground.
  /// </summary>
  private DateTime lastMessage = DateTime.MinValue;

  /// <summary>
  /// A circular buffer to store the last N rtt times calculated by the pong messages.
  /// </summary>
  private readonly CircularBuffer<int> rtts = new CircularBuffer<int>(WebSocketResponse.RTTBufferCapacity);

  #endregion
  /// <summary>
  /// Forwards all arguments to the base response, then marks the response as manually closed,
  /// resets the closed flag and sets the default maximum fragment size (32767 bytes).
  /// </summary>
  internal WebSocketResponse(HTTPRequest request, Stream stream, bool isStreamed, bool isFromCache)
      : base(request, stream, isStreamed, isFromCache)
  {
      base.IsClosedManually = true;

      this.closed = false;
      this.MaxFragmentSize = ushort.MaxValue / 2;
  }
  /// <summary>
  /// Queues the receive loop on a thread pool thread. Does nothing when the connection is not upgraded.
  /// </summary>
  internal void StartReceive()
  {
      if (!IsUpgraded)
          return;

  #if NETFX_CORE
      #pragma warning disable 4014
      Windows.System.Threading.ThreadPool.RunAsync(ReceiveThreadFunc);
      #pragma warning restore 4014
  #else
      ThreadPool.QueueUserWorkItem(ReceiveThreadFunc);
  #endif
  }
  /// <summary>
  /// Looks up the connection that serves the original request and, when one is found,
  /// aborts it with the Closed state.
  /// </summary>
  internal void CloseStream()
  {
      var connection = HTTPManager.GetConnectionWith(this.baseRequest);
      if (connection == null)
          return;

      connection.Abort(HTTPConnectionStates.Closed);
  }
  117. #region Public interface for interacting with the server
  /// <summary>
  /// It will send the given message to the server in one frame.
  /// </summary>
  /// <param name="message">The text to send; it is encoded as UTF-8.</param>
  /// <exception cref="ArgumentNullException">Thrown when <paramref name="message"/> is null.</exception>
  public void Send(string message)
  {
      // The single-string constructor of ArgumentNullException expects the parameter NAME,
      // so the name and the human readable message are passed separately.
      if (message == null)
          throw new ArgumentNullException("message", "message must not be null!");

      byte[] data = System.Text.Encoding.UTF8.GetBytes(message);
      Send(new WebSocketFrame(this.WebSocket, WebSocketFrameTypes.Text, data));
  }
  /// <summary>
  /// It will send the given data to the server. When the frame's payload is larger than
  /// <see cref="MaxFragmentSize"/>, the frame is fragmented and all fragments are queued together.
  /// </summary>
  /// <param name="data">The binary payload to send.</param>
  /// <exception cref="ArgumentNullException">Thrown when <paramref name="data"/> is null.</exception>
  public void Send(byte[] data)
  {
      // The single-string constructor of ArgumentNullException expects the parameter NAME,
      // so the name and the human readable message are passed separately.
      if (data == null)
          throw new ArgumentNullException("data", "data must not be null!");

      WebSocketFrame frame = new WebSocketFrame(this.WebSocket, WebSocketFrameTypes.Binary, data);

      if (frame.Data != null && frame.Data.Length > this.MaxFragmentSize)
      {
          WebSocketFrame[] additionalFrames = frame.Fragment(this.MaxFragmentSize);

          // Send(WebSocketFrame) takes the same lock to enqueue; holding it here keeps the fragments
          // of this message next to each other in the queue.
          lock (SendLock)
          {
              Send(frame);

              if (additionalFrames != null)
              {
                  for (int i = 0; i < additionalFrames.Length; ++i)
                      Send(additionalFrames[i]);
              }
          }
      }
      else
      {
          Send(frame);
      }
  }
  /// <summary>
  /// Will send count bytes from a byte array, starting from offset. When the frame's payload is larger
  /// than <see cref="MaxFragmentSize"/>, the frame is fragmented and all fragments are queued together.
  /// </summary>
  /// <param name="data">The array that contains the bytes to send.</param>
  /// <param name="offset">Index of the first byte to send.</param>
  /// <param name="count">Number of bytes to send.</param>
  /// <exception cref="ArgumentNullException">Thrown when <paramref name="data"/> is null.</exception>
  /// <exception cref="ArgumentOutOfRangeException">
  /// Thrown when the range described by <paramref name="offset"/> and <paramref name="count"/>
  /// does not fit into <paramref name="data"/>.
  /// </exception>
  public void Send(byte[] data, ulong offset, ulong count)
  {
      if (data == null)
          throw new ArgumentNullException("data", "data must not be null!");

      // "offset + count" can wrap around in unsigned 64-bit arithmetic and slip past a naive check,
      // so the range is validated without ever adding the two values.
      ulong length = (ulong)data.Length;
      if (offset > length)
          throw new ArgumentOutOfRangeException("offset", "offset > data.Length");
      if (count > length - offset)
          throw new ArgumentOutOfRangeException("count", "offset + count > data.Length");

      WebSocketFrame frame = new WebSocketFrame(this.WebSocket, WebSocketFrameTypes.Binary, data, offset, count, true, true);

      if (frame.Data != null && frame.Data.Length > this.MaxFragmentSize)
      {
          WebSocketFrame[] additionalFrames = frame.Fragment(this.MaxFragmentSize);

          // Send(WebSocketFrame) takes the same lock to enqueue; holding it here keeps the fragments
          // of this message next to each other in the queue.
          lock (SendLock)
          {
              Send(frame);

              if (additionalFrames != null)
              {
                  for (int i = 0; i < additionalFrames.Length; ++i)
                      Send(additionalFrames[i]);
              }
          }
      }
      else
      {
          Send(frame);
      }
  }
  /// <summary>
  /// It will send the given frame to the server by appending it to the queue of unsent frames.
  /// The call is silently ignored when the connection is closed or a Close frame has already been sent.
  /// </summary>
  /// <param name="frame">The frame to queue for sending.</param>
  /// <exception cref="ArgumentNullException">Thrown when <paramref name="frame"/> is null.</exception>
  public void Send(WebSocketFrame frame)
  {
      // The single-string constructor of ArgumentNullException expects the parameter NAME,
      // so the name and the human readable message are passed separately.
      if (frame == null)
          throw new ArgumentNullException("frame", "frame is null!");

      if (closed || closeSent)
          return;

      lock (SendLock)
      {
          this.unsentFrames.Add(frame);

          // The sender is started lazily, on the first frame that needs it.
          if (!sendThreadCreated)
          {
              HTTPManager.Logger.Information("WebSocketResponse", "Send - Creating thread");

  #if NETFX_CORE
              #pragma warning disable 4014
              Windows.System.Threading.ThreadPool.RunAsync(SendThreadFunc);
              #pragma warning restore 4014
  #else
              ThreadPool.QueueUserWorkItem(SendThreadFunc);
  #endif
              sendThreadCreated = true;
          }
      }

      Interlocked.Add(ref this._bufferedAmount, frame.Data != null ? frame.Data.Length : 0);

      // Wake up the send thread.
      newFrameSignal.Set();
  }
  /// <summary>
  /// It will send the given frame to the server by inserting the frame into the queue as the first element.
  /// The call is silently ignored when the connection is closed or a Close frame has already been sent.
  /// </summary>
  /// <param name="frame">The frame to put at the front of the send queue.</param>
  /// <exception cref="ArgumentNullException">Thrown when <paramref name="frame"/> is null.</exception>
  public void Insert(WebSocketFrame frame)
  {
      // The single-string constructor of ArgumentNullException expects the parameter NAME,
      // so the name and the human readable message are passed separately.
      if (frame == null)
          throw new ArgumentNullException("frame", "frame is null!");

      if (closed || closeSent)
          return;

      lock (SendLock)
      {
          this.unsentFrames.Insert(0, frame);

          // The sender is started lazily, on the first frame that needs it.
          if (!sendThreadCreated)
          {
              HTTPManager.Logger.Information("WebSocketResponse", "Insert - Creating thread");

  #if NETFX_CORE
              #pragma warning disable 4014
              Windows.System.Threading.ThreadPool.RunAsync(SendThreadFunc);
              #pragma warning restore 4014
  #else
              ThreadPool.QueueUserWorkItem(SendThreadFunc);
  #endif
              sendThreadCreated = true;
          }
      }

      Interlocked.Add(ref this._bufferedAmount, frame.Data != null ? frame.Data.Length : 0);

      // Wake up the send thread.
      newFrameSignal.Set();
  }
  /// <summary>
  /// Writes the given frame to the stream and flushes it on the calling thread, bypassing the queue
  /// of unsent frames. The call is silently ignored when the connection is closed or a Close frame
  /// has already been sent.
  /// </summary>
  /// <param name="frame">The frame to write.</param>
  /// <exception cref="ArgumentNullException">Thrown when <paramref name="frame"/> is null.</exception>
  public void SendNow(WebSocketFrame frame)
  {
      // The single-string constructor of ArgumentNullException expects the parameter NAME,
      // so the name and the human readable message are passed separately.
      if (frame == null)
          throw new ArgumentNullException("frame", "frame is null!");

      if (closed || closeSent)
          return;

      // NOTE(review): this write is not coordinated with the send thread's writes - confirm that
      // callers never use SendNow while queued frames are being written.
      byte[] rawData = frame.Get();
      Stream.Write(rawData, 0, rawData.Length);
      Stream.Flush();
  }
  /// <summary>
  /// Starts closing the connection with status code 1000 and the "Bye!" message.
  /// </summary>
  public void Close()
  {
      const UInt16 normalClosure = 1000;

      Close(normalClosure, "Bye!");
  }
  /// <summary>
  /// Starts closing the connection: drops every frame that is still waiting to be sent, resets the
  /// buffered byte counter and queues a Close frame carrying the given status code and message.
  /// Does nothing when the connection is already closed.
  /// </summary>
  public void Close(UInt16 code, string msg)
  {
      if (!closed)
      {
          // Whatever is still queued will not be sent.
          lock (SendLock)
          {
              this.unsentFrames.Clear();
          }

          Interlocked.Exchange(ref this._bufferedAmount, 0);

          WebSocketFrame closeFrame = new WebSocketFrame(this.WebSocket,
                                                         WebSocketFrameTypes.ConnectionClose,
                                                         WebSocket.EncodeCloseData(code, msg));
          Send(closeFrame);
      }
  }
  /// <summary>
  /// Stores the ping frequency, sends the first ping right away and subscribes this instance to the
  /// heartbeat and to the application foreground-state notifications.
  /// </summary>
  /// <param name="frequency">Time between two pings in milliseconds; must be at least 100.</param>
  /// <exception cref="ArgumentException">Thrown when <paramref name="frequency"/> is less than 100.</exception>
  public void StartPinging(int frequency)
  {
      const int minimumFrequency = 100;

      if (frequency < minimumFrequency)
      {
          throw new ArgumentException("frequency must be at least 100 milliseconds!");
      }

      this.PingFrequnecy = TimeSpan.FromMilliseconds(frequency);
      this.lastMessage = DateTime.UtcNow;

      SendPing();

      HTTPManager.Heartbeats.Subscribe(this);
      HTTPUpdateDelegator.OnApplicationForegroundStateChanged += OnApplicationForegroundStateChanged;
  }
  271. #endregion
  272. #region Private Threading Functions
  /// <summary>
  /// Body of the send thread. It waits for the new-frame signal, takes over every queued frame under
  /// SendLock and writes them to the stream in queue order, until the connection is closed or a
  /// Close frame has been written.
  /// </summary>
  /// <param name="param">Unused; required by the thread pool callback signature.</param>
  private void SendThreadFunc(object param)
  {
      List<WebSocketFrame> localFrames = new List<WebSocketFrame>();

      try
      {
          while (!closed && !closeSent)
          {
              newFrameSignal.WaitOne();

              try
              {
                  // Take over the queued frames so the lock is not held while writing to the stream.
                  lock (SendLock)
                  {
                      localFrames.AddRange(this.unsentFrames);
                      this.unsentFrames.Clear();
                  }

                  for (int i = 0; i < localFrames.Count; ++i)
                  {
                      WebSocketFrame frame = localFrames[i];

                      // Nothing may be written after our Close frame; later frames are only accounted for.
                      if (!closeSent)
                      {
                          byte[] rawData = frame.Get();
                          Stream.Write(rawData, 0, rawData.Length);

                          if (frame.Type == WebSocketFrameTypes.ConnectionClose)
                              closeSent = true;
                      }

                      // Send/Insert count a frame without payload as zero bytes, so a null payload is
                      // expected here too and must not turn into a NullReferenceException.
                      if (frame.Data != null)
                          Interlocked.Add(ref this._bufferedAmount, -frame.Data.Length);
                  }

                  localFrames.Clear();

                  Stream.Flush();
              }
              catch (Exception ex)
              {
                  if (HTTPUpdateDelegator.IsCreated)
                  {
                      this.baseRequest.Exception = ex;
                      this.baseRequest.State = HTTPRequestStates.Error;
                  }
                  else
                      this.baseRequest.State = HTTPRequestStates.Aborted;

                  closed = true;
              }
          }
      }
      finally
      {
          // Allow Send/Insert to start a new sender if frames get queued later.
          sendThreadCreated = false;

          HTTPManager.Logger.Information("WebSocketResponse", "SendThread - Closed!");
      }
  }
  /// <summary>
  /// Body of the receive thread. It reads frames from the stream until the connection is closed:
  /// data frames are queued for HandleEvents, Ping frames are answered with a Pong, Pong frames
  /// update the latency and a Close frame is answered and closes the connection.
  /// </summary>
  /// <param name="param">Unused; required by the thread pool callback signature.</param>
  private void ReceiveThreadFunc(object param)
  {
      try
      {
          while (!closed)
          {
              try
              {
                  WebSocketFrameReader frame = new WebSocketFrameReader();
                  frame.Read(Stream);

                  lastMessage = DateTime.UtcNow;

                  // A server MUST NOT mask any frames that it sends to the client. A client MUST close a connection if it detects a masked frame.
                  // In this case, it MAY use the status code 1002 (protocol error)
                  // (These rules might be relaxed in a future specification.)
                  if (frame.HasMask)
                  {
                      Close(1002, "Protocol Error: masked frame received from server!");
                      continue;
                  }

                  if (!frame.IsFinal)
                  {
                      // Without a fragment handler the fragments are collected for reassembling,
                      // otherwise they are passed on as they arrive.
                      if (OnIncompleteFrame == null)
                          IncompleteFrames.Add(frame);
                      else
                          lock (FrameLock) CompletedFrames.Add(frame);

                      continue;
                  }

                  switch (frame.Type)
                  {
                      // For a complete documentation and rules on fragmentation see http://tools.ietf.org/html/rfc6455#section-5.4
                      // A fragmented Frame's last fragment's opcode is 0 (Continuation) and the FIN bit is set to 1.
                      case WebSocketFrameTypes.Continuation:
                          // Do an assemble pass only if OnIncompleteFrame is not set. Otherwise put it in the CompletedFrames, we will handle it in the HandleEvent phase.
                          if (OnIncompleteFrame == null)
                          {
                              frame.Assemble(IncompleteFrames);

                              // Remove all incomplete frames
                              IncompleteFrames.Clear();

                              // Control frames themselves MUST NOT be fragmented. So, its a normal text or binary frame. Go, handle it as usual.
                              goto case WebSocketFrameTypes.Binary;
                          }
                          else
                              lock (FrameLock) CompletedFrames.Add(frame);
                          break;

                      case WebSocketFrameTypes.Text:
                      case WebSocketFrameTypes.Binary:
                          frame.DecodeWithExtensions(WebSocket);
                          lock (FrameLock) CompletedFrames.Add(frame);
                          break;

                      // Upon receipt of a Ping frame, an endpoint MUST send a Pong frame in response, unless it already received a Close frame.
                      case WebSocketFrameTypes.Ping:
                          if (!closeSent && !closed)
                              Send(new WebSocketFrame(this.WebSocket, WebSocketFrameTypes.Pong, frame.Data));
                          break;

                      case WebSocketFrameTypes.Pong:
                          // https://tools.ietf.org/html/rfc6455#section-5.5
                          // A Pong frame MAY be sent unsolicited. This serves as a
                          // unidirectional heartbeat. A response to an unsolicited Pong frame is
                          // not expected.
                          // Our pings carry the send time as 8 bytes of ticks; a pong without such a payload
                          // cannot be an answer to them, so it is skipped without going through an exception.
                          if (frame.Data == null || frame.Data.Length < sizeof(long))
                              break;

                          try
                          {
                              // Get the ticks from the frame's payload
                              long ticksSent = BitConverter.ToInt64(frame.Data, 0);

                              // the difference between the current time and the time when the ping message is sent
                              TimeSpan diff = TimeSpan.FromTicks(lastMessage.Ticks - ticksSent);

                              // add it to the buffer
                              this.rtts.Add((int)diff.TotalMilliseconds);

                              // and calculate the new latency
                              this.Latency = CalculateLatency();
                          }
                          catch
                          {
                              // Best effort only: a payload that cannot be turned into a latency value
                              // must not break the connection.
                          }
                          break;

                      // If an endpoint receives a Close frame and did not previously send a Close frame, the endpoint MUST send a Close frame in response.
                      case WebSocketFrameTypes.ConnectionClose:
                          CloseFrame = frame;
                          if (!closeSent)
                              Send(new WebSocketFrame(this.WebSocket, WebSocketFrameTypes.ConnectionClose, null));
                          closed = true;
                          break;
                  }
              }
  #if !NETFX_CORE
              catch (ThreadAbortException)
              {
                  IncompleteFrames.Clear();
                  this.baseRequest.State = HTTPRequestStates.Aborted;

                  closed = true;

                  // Wake up the send thread so it can notice the closed state.
                  newFrameSignal.Set();
              }
  #endif
              catch (Exception e)
              {
                  if (HTTPUpdateDelegator.IsCreated)
                  {
                      this.baseRequest.Exception = e;
                      this.baseRequest.State = HTTPRequestStates.Error;
                  }
                  else
                      this.baseRequest.State = HTTPRequestStates.Aborted;

                  closed = true;

                  // Wake up the send thread so it can notice the closed state.
                  newFrameSignal.Set();
              }
          }
      }
      finally
      {
          HTTPManager.Heartbeats.Unsubscribe(this);
          HTTPUpdateDelegator.OnApplicationForegroundStateChanged -= OnApplicationForegroundStateChanged;

          HTTPManager.Logger.Information("WebSocketResponse", "ReceiveThread - Closed!");
      }
  }
  441. #endregion
  442. #region Sending Out Events
  /// <summary>
  /// Internal function to send out received messages. It dispatches every frame queued by the
  /// receive thread to the matching callback and, when the connection is closed while the request
  /// is still in the Processing state, calls OnClosed with the status code and message taken from
  /// the received Close frame.
  /// </summary>
  void IProtocol.HandleEvents()
  {
      // Copy the queued frames so the callbacks run without holding the lock.
      frameCache.Clear();
      lock (FrameLock)
      {
          frameCache.AddRange(CompletedFrames);
          CompletedFrames.Clear();
      }

      for (int i = 0; i < frameCache.Count; ++i)
      {
          WebSocketFrameReader frame = frameCache[i];

          // Bugs in the clients shouldn't interrupt the code, so we need to try-catch and ignore any exception occurring here
          try
          {
              switch (frame.Type)
              {
                  case WebSocketFrameTypes.Continuation:
                      if (OnIncompleteFrame != null)
                          OnIncompleteFrame(this, frame);
                      break;

                  case WebSocketFrameTypes.Text:
                      // Any not Final frame is handled as a fragment
                      if (!frame.IsFinal)
                          goto case WebSocketFrameTypes.Continuation;

                      if (OnText != null)
                          OnText(this, frame.DataAsText);
                      break;

                  case WebSocketFrameTypes.Binary:
                      // Any not Final frame is handled as a fragment
                      if (!frame.IsFinal)
                          goto case WebSocketFrameTypes.Continuation;

                      if (OnBinary != null)
                          OnBinary(this, frame.Data);
                      break;
              }
          }
          catch (Exception ex)
          {
              HTTPManager.Logger.Exception("WebSocketResponse", "HandleEvents", ex);
          }
      }

      frameCache.Clear();

      // 2015.05.09
      // State checking added because if there is an error the OnClose called first, and then the OnError.
      // Now, when there is an error only the OnError event will be called!
      if (IsClosed && OnClosed != null && baseRequest.State == HTTPRequestStates.Processing)
      {
          try
          {
              UInt16 statusCode = 0;
              string msg = string.Empty;

              // If we received any data, we will get the status code and the message from it
              WebSocketFrameReader closeFrame = CloseFrame;
              byte[] closeData = closeFrame != null ? closeFrame.Data : null;

              if (closeData != null && closeData.Length >= 2)
              {
                  // The status code is a 16 bit unsigned integer in network (big-endian) byte order.
                  // It is decoded without touching the payload: reversing the bytes in place would
                  // make a second pass over the same frame read a byte-swapped status code.
                  statusCode = (UInt16)((closeData[0] << 8) | closeData[1]);

                  if (closeData.Length > 2)
                      msg = Encoding.UTF8.GetString(closeData, 2, closeData.Length - 2);
              }

              OnClosed(this, statusCode, msg);
          }
          catch (Exception ex)
          {
              HTTPManager.Logger.Exception("WebSocketResponse", "HandleEvents - OnClosed", ex);
          }
      }
  }
  514. #endregion
  515. #region IHeartbeat Implementation
  /// <summary>
  /// Heartbeat callback. Sends a new ping when at least PingFrequnecy has elapsed since the last one,
  /// and closes the connection with an error when nothing has been received for longer than the
  /// ping frequency plus the WebSocket's CloseAfterNoMesssage time.
  /// </summary>
  void IHeartbeat.OnHeartbeatUpdate(TimeSpan dif)
  {
      DateTime now = DateTime.UtcNow;

      bool pingIsDue = now - lastPing >= PingFrequnecy;
      if (pingIsDue)
      {
          SendPing();
      }

      // The server gets one full ping period before the silence starts to count.
      DateTime silenceStart = lastMessage + this.PingFrequnecy;
      if (now - silenceStart > this.WebSocket.CloseAfterNoMesssage)
      {
          string warning = string.Format("No message received in the given time! Closing WebSocket. LastMessage: {0}, PingFrequency: {1}, Close After: {2}, Now: {3}",
                                         this.lastMessage, this.PingFrequnecy, this.WebSocket.CloseAfterNoMesssage, now);
          HTTPManager.Logger.Warning("WebSocketResponse", warning);

          CloseWithError("No message received in the given time!");
      }
  }
  529. #endregion
  /// <summary>
  /// Restarts the no-message timer when the callback reports that the application is not paused.
  /// </summary>
  private void OnApplicationForegroundStateChanged(bool isPaused)
  {
      if (isPaused)
          return;

      this.lastMessage = DateTime.UtcNow;
  }
  /// <summary>
  /// Records the ping time and puts a Ping frame, carrying the current UTC ticks as payload, to the
  /// front of the send queue. Any failure closes the connection with an error.
  /// </summary>
  private void SendPing()
  {
      this.lastPing = DateTime.UtcNow;

      try
      {
          // The payload is read back from the answering Pong to measure the round-trip time.
          byte[] payload = BitConverter.GetBytes(DateTime.UtcNow.Ticks);

          Insert(new WebSocketFrame(this.WebSocket, WebSocketFrameTypes.Ping, payload));
      }
      catch
      {
          HTTPManager.Logger.Information("WebSocketResponse", "Error while sending PING message! Closing WebSocket.");

          CloseWithError("Error while sending PING message!");
      }
  }
  /// <summary>
  /// Puts the request into the Error state with the given message, marks the connection as closed,
  /// removes the heartbeat and foreground-state subscriptions, wakes up the send thread and aborts
  /// the underlying connection.
  /// </summary>
  private void CloseWithError(string message)
  {
      Exception error = new Exception(message);
      this.baseRequest.Exception = error;
      this.baseRequest.State = HTTPRequestStates.Error;

      closed = true;

      HTTPManager.Heartbeats.Unsubscribe(this);
      HTTPUpdateDelegator.OnApplicationForegroundStateChanged -= OnApplicationForegroundStateChanged;

      // Let the send thread notice the closed state.
      this.newFrameSignal.Set();

      this.CloseStream();
  }
  /// <summary>
  /// Returns the integer average of the stored round-trip times, or 0 when none is stored yet.
  /// </summary>
  private int CalculateLatency()
  {
      if (this.rtts.Count != 0)
      {
          int total = 0;
          int index = 0;

          while (index < this.rtts.Count)
          {
              total += this.rtts[index];
              index++;
          }

          return total / this.rtts.Count;
      }

      return 0;
  }
  570. }
  571. }
  572. #endif