EventSourceResponse.cs 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380
  1. #if !BESTHTTP_DISABLE_SERVERSENT_EVENTS && (!UNITY_WEBGL || UNITY_EDITOR)
  2. using System;
  3. using System.IO;
  4. using System.Threading;
  5. using System.Text;
  6. using System.Collections.Generic;
  7. namespace BestHTTP.ServerSentEvents
  8. {
  /// <summary>
  /// A low-level class to receive and parse an EventSource(http://www.w3.org/TR/eventsource/) stream.
  /// Higher level protocol representation is implemented in the EventSource class.
  /// </summary>
  /// <remarks>
  /// The stream is read on a background worker (see <see cref="StartReceive"/>), completed messages are
  /// queued under a lock and handed to the subscribers when <c>IProtocol.HandleEvents</c> is called.
  /// </remarks>
  public sealed class EventSourceResponse : HTTPResponse, IProtocol
  {
      /// <summary>U+003A COLON, the separator between a field name and its value.</summary>
      private const byte Colon = 0x3A;

      /// <summary>U+0020 SPACE, a single one is stripped from the start of a field value.</summary>
      private const byte Space = 0x20;

      /// <summary>
      /// Backing field of <see cref="IsClosed"/>. Volatile because it is written by the receiving
      /// worker and read by whoever calls <c>IProtocol.HandleEvents</c>.
      /// </summary>
      private volatile bool closed;

      /// <summary>
      /// True once the receiving worker has finished (normally or because of an error).
      /// </summary>
      public bool IsClosed
      {
          get { return closed; }
          private set { closed = value; }
      }

      #region Public Events

      /// <summary>
      /// Invoked from <c>IProtocol.HandleEvents</c> for every completely received message.
      /// </summary>
      public Action<EventSourceResponse, BestHTTP.ServerSentEvents.Message> OnMessage;

      /// <summary>
      /// Invoked once from <c>IProtocol.HandleEvents</c> after <see cref="IsClosed"/> became true.
      /// </summary>
      public Action<EventSourceResponse> OnClosed;

      #endregion

      #region Privates

      /// <summary>
      /// Thread sync object guarding <see cref="CompletedMessages"/>.
      /// </summary>
      private readonly object FrameLock = new object();

      /// <summary>
      /// Buffer collecting the bytes of the line that is currently being received.
      /// </summary>
      private byte[] LineBuffer = new byte[1024];

      /// <summary>
      /// Number of valid bytes in <see cref="LineBuffer"/>.
      /// </summary>
      private int LineBufferPos = 0;

      /// <summary>
      /// True when the previously fed buffer ended with a CR. If the next buffer starts with an LF,
      /// that LF belongs to the same CRLF line ending and must not be treated as a new (blank) line.
      /// </summary>
      private bool LastByteWasCR = false;

      /// <summary>
      /// The currently receiving and parsing message.
      /// </summary>
      private BestHTTP.ServerSentEvents.Message CurrentMessage;

      /// <summary>
      /// Completed messages that are waiting to be dispatched.
      /// </summary>
      private readonly List<BestHTTP.ServerSentEvents.Message> CompletedMessages = new List<BestHTTP.ServerSentEvents.Message>();

      #endregion

      /// <summary>
      /// Creates the response and marks it as manually closed.
      /// </summary>
      public EventSourceResponse(HTTPRequest request, Stream stream, bool isStreamed, bool isFromCache)
          : base(request, stream, isStreamed, isFromCache)
      {
          base.IsClosedManually = true;
      }

      /// <summary>
      /// Reads the status line and headers through the base implementation (without the payload) and
      /// decides whether the response is an event stream.
      /// </summary>
      /// <returns>The value returned by the base implementation.</returns>
      public override bool Receive(int forceReadRawContentLength = -1, bool readPayloadData = true)
      {
          bool received = base.Receive(forceReadRawContentLength, false);

          string contentType = this.GetFirstHeaderValue("content-type");

          // Ordinal comparison: the media type is a protocol token, not culture-dependent text.
          base.IsUpgraded = received &&
                            this.StatusCode == 200 &&
                            !string.IsNullOrEmpty(contentType) &&
                            contentType.StartsWith("text/event-stream", StringComparison.OrdinalIgnoreCase);

          // If we didn't upgrade to the protocol we have to read all the sent payload, because
          // the next requests may read this data as HTTP headers and will fail.
          if (!IsUpgraded)
              ReadPayload(forceReadRawContentLength);

          return received;
      }

      /// <summary>
      /// Queues the receive loop on a thread pool thread. Does nothing when the response isn't upgraded.
      /// </summary>
      internal void StartReceive()
      {
          if (!IsUpgraded)
              return;

#if NETFX_CORE
#pragma warning disable 4014
          Windows.System.Threading.ThreadPool.RunAsync(ReceiveThreadFunc);
#pragma warning restore 4014
#else
          ThreadPool.QueueUserWorkItem(ReceiveThreadFunc);
#endif
      }

      #region Private Threading Functions

      /// <summary>
      /// Body of the receiving worker: reads the stream until it ends, records failures on the
      /// originating request and always flags the response as closed at the end.
      /// </summary>
      private void ReceiveThreadFunc(object param)
      {
          try
          {
              if (HasHeaderWithValue("transfer-encoding", "chunked"))
                  ReadChunked(Stream);
              else
                  ReadRaw(Stream, -1);
          }
#if !NETFX_CORE
          catch (ThreadAbortException)
          {
              this.baseRequest.State = HTTPRequestStates.Aborted;
          }
#endif
          catch (Exception e)
          {
              if (HTTPUpdateDelegator.IsCreated)
              {
                  this.baseRequest.Exception = e;
                  this.baseRequest.State = HTTPRequestStates.Error;
              }
              else
                  this.baseRequest.State = HTTPRequestStates.Aborted;
          }
          finally
          {
              // Set only after the last message has been queued; HandleEvents relies on this order.
              IsClosed = true;
          }
      }

      #endregion

      #region Read Implementations

      /// <summary>
      /// Reads a chunked transfer-encoded body and feeds every chunk to the parser.
      /// http://www.w3.org/Protocols/rfc2616/rfc2616-sec3.html#sec3.6.1
      /// </summary>
      /// <exception cref="IOException">The stream ended in the middle of a chunk.</exception>
      private new void ReadChunked(Stream stream)
      {
          int chunkLength = ReadChunkLength(stream);
          byte[] buffer = new byte[chunkLength];

          while (chunkLength != 0)
          {
              // To avoid more GC garbage we use only one buffer, and resize only if the next chunk doesn't fit.
              if (buffer.Length < chunkLength)
                  Array.Resize<byte>(ref buffer, chunkLength);

              // Fill up the buffer with the whole chunk
              int readBytes = 0;
              do
              {
                  int bytes = stream.Read(buffer, readBytes, chunkLength - readBytes);
                  if (bytes == 0)
                      throw new IOException("The remote server closed the connection unexpectedly!");

                  readBytes += bytes;
              } while (readBytes < chunkLength);

              FeedData(buffer, readBytes);

              // Every chunk data has a trailing CRLF
              ReadTo(stream, LF);

              // Read the next chunk's length
              chunkLength = ReadChunkLength(stream);
          }

          // Read the trailing headers or the CRLF
          ReadHeaders(stream);
      }

      /// <summary>
      /// Reads the stream until it reports no more data and feeds everything to the parser.
      /// <paramref name="contentLength"/> is not used by this implementation.
      /// </summary>
      private new void ReadRaw(Stream stream, long contentLength)
      {
          byte[] buffer = new byte[1024];
          int bytes;

          while ((bytes = stream.Read(buffer, 0, buffer.Length)) > 0)
              FeedData(buffer, bytes);
      }

      #endregion

      #region Data Parsing

      /// <summary>
      /// Splits the received bytes into lines and parses every completed line. Bytes of an unfinished
      /// line are kept until a later call delivers its line ending.
      /// </summary>
      /// <param name="buffer">The buffer holding the received bytes.</param>
      /// <param name="count">Number of valid bytes in <paramref name="buffer"/>, or -1 to use the whole buffer.</param>
      public void FeedData(byte[] buffer, int count)
      {
          if (count == -1)
              count = buffer.Length;

          if (count == 0)
              return;

          int pos = 0;

          // The previous buffer ended with a CR that was already handled as a line ending. An LF right
          // at the start of this buffer is the second half of that CRLF pair, not a blank line.
          if (LastByteWasCR)
          {
              LastByteWasCR = false;
              if (buffer[0] == HTTPResponse.LF)
                  pos = 1;
          }

          while (pos < count)
          {
              int newlineIdx = -1;
              int skipCount = 1; // to skip CR and/or LF

              for (int i = pos; i < count && newlineIdx == -1; ++i)
              {
                  // Lines must be separated by either a CRLF character pair, a single LF character, or a single CR character.
                  if (buffer[i] == HTTPResponse.CR)
                  {
                      if (i + 1 < count)
                      {
                          if (buffer[i + 1] == HTTPResponse.LF)
                              skipCount = 2;
                      }
                      else
                          LastByteWasCR = true; // can't see the next byte yet, decide on the next call

                      newlineIdx = i;
                  }
                  else if (buffer[i] == HTTPResponse.LF)
                      newlineIdx = i;
              }

              int copyEnd = newlineIdx == -1 ? count : newlineIdx;
              int length = copyEnd - pos;
              int required = LineBufferPos + length;

              // Grow at least by doubling, so a long line received in many pieces isn't re-copied on every piece.
              if (LineBuffer.Length < required)
                  Array.Resize<byte>(ref LineBuffer, Math.Max(required, LineBuffer.Length * 2));

              Array.Copy(buffer, pos, LineBuffer, LineBufferPos, length);
              LineBufferPos += length;

              // No line ending in the rest of the buffer: wait for more data.
              if (newlineIdx == -1)
                  return;

              ParseLine(LineBuffer, LineBufferPos);
              LineBufferPos = 0;

              pos = newlineIdx + skipCount;
          }
      }

      /// <summary>
      /// Interprets one line of the event stream: a blank line completes the current message, a line
      /// starting with a colon is a comment, anything else is a "field[:[ ]value]" pair.
      /// </summary>
      /// <param name="buffer">The buffer holding the line, without its line ending.</param>
      /// <param name="count">Length of the line in bytes.</param>
      private void ParseLine(byte[] buffer, int count)
      {
          // If the line is empty (a blank line) => Dispatch the event
          if (count == 0)
          {
              if (CurrentMessage != null)
              {
                  lock (FrameLock)
                      CompletedMessages.Add(CurrentMessage);
                  CurrentMessage = null;
              }

              return;
          }

          // If the line starts with a U+003A COLON character (:) => Ignore the line.
          if (buffer[0] == Colon)
              return;

          int colonIdx = Array.IndexOf<byte>(buffer, Colon, 0, count);

          string field;
          string value;

          if (colonIdx != -1)
          {
              // Collect the characters on the line before the first U+003A COLON character (:), and let field be that string.
              field = Encoding.UTF8.GetString(buffer, 0, colonIdx);

              // Collect the characters on the line after the first U+003A COLON character (:), and let value be that string.
              // If value starts with a U+0020 SPACE character, remove it from value.
              int valueStart = colonIdx + 1;
              if (valueStart < count && buffer[valueStart] == Space)
                  valueStart++;

              // A field with nothing after the colon is still a field, its value is the empty string.
              value = valueStart < count
                          ? Encoding.UTF8.GetString(buffer, valueStart, count - valueStart)
                          : string.Empty;
          }
          else
          {
              // Otherwise, the string is not empty but does not contain a U+003A COLON character (:) =>
              // Process the field using the whole line as the field name, and the empty string as the field value.
              field = Encoding.UTF8.GetString(buffer, 0, count);
              value = string.Empty;
          }

          if (CurrentMessage == null)
              CurrentMessage = new BestHTTP.ServerSentEvents.Message();

          switch (field)
          {
              // If the field name is "id" => Set the last event ID buffer to the field value.
              case "id":
                  CurrentMessage.Id = value;
                  break;

              // If the field name is "event" => Set the event type buffer to field value.
              case "event":
                  CurrentMessage.Event = value;
                  break;

              // If the field name is "data" => Append the field value to the data buffer.
              case "data":
                  // Data lines are joined with a single U+000A LINE FEED, independently of the platform.
                  // The separator is added before every line except the first, so there is no trailing LF to remove later.
                  // We do only a null check, because the empty string can be a valid payload.
                  if (CurrentMessage.Data != null)
                      CurrentMessage.Data += "\n";
                  CurrentMessage.Data += value;
                  break;

              // If the field name is "retry" => If the field value consists of only ASCII digits, then interpret the field value
              // as an integer in base ten, and set the event stream's reconnection time to that integer. Otherwise, ignore the field.
              case "retry":
                  int result;
                  // NumberStyles.None rejects signs, white space and separators.
                  if (int.TryParse(value,
                                   System.Globalization.NumberStyles.None,
                                   System.Globalization.CultureInfo.InvariantCulture,
                                   out result))
                      CurrentMessage.Retry = TimeSpan.FromMilliseconds(result);
                  break;

              // Otherwise: The field is ignored.
              default:
                  break;
          }
      }

      #endregion

      /// <summary>
      /// Delivers the queued messages through <see cref="OnMessage"/> and, once the receiving worker
      /// has finished, calls <see cref="OnClosed"/> a single time.
      /// </summary>
      void IProtocol.HandleEvents()
      {
          // Sample the flag BEFORE draining the queue. The worker sets it only after its last message
          // has been queued, so when it's true here, the drain below is guaranteed to see every message.
          bool isClosed = IsClosed;

          // Take a snapshot under the lock and call the subscribers outside of it, so a slow
          // callback can't block the receiving worker.
          BestHTTP.ServerSentEvents.Message[] pending = null;
          lock (FrameLock)
          {
              if (CompletedMessages.Count > 0)
              {
                  pending = CompletedMessages.ToArray();
                  CompletedMessages.Clear();
              }
          }

          if (pending != null)
          {
              for (int i = 0; i < pending.Length; ++i)
              {
                  // Re-read the delegate for every message, a callback may unsubscribe.
                  Action<EventSourceResponse, BestHTTP.ServerSentEvents.Message> onMessage = OnMessage;
                  if (onMessage == null)
                      break;

                  try
                  {
                      onMessage(this, pending[i]);
                  }
                  catch (Exception ex)
                  {
                      HTTPManager.Logger.Exception("EventSourceMessage", "HandleEvents - OnMessage", ex);
                  }
              }
          }

          // We are closed
          if (isClosed && OnClosed != null)
          {
              try
              {
                  OnClosed(this);
              }
              catch (Exception ex)
              {
                  HTTPManager.Logger.Exception("EventSourceMessage", "HandleEvents - OnClosed", ex);
              }
              finally
              {
                  // Make sure the close notification is sent only once.
                  OnClosed = null;
              }
          }
      }
  }
  305. }
  306. #endif