123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380 |
- #if !BESTHTTP_DISABLE_SERVERSENT_EVENTS && (!UNITY_WEBGL || UNITY_EDITOR)
- using System;
- using System.IO;
- using System.Threading;
- using System.Text;
- using System.Collections.Generic;
- namespace BestHTTP.ServerSentEvents
- {
- /// <summary>
- /// A low-level class to receive and parse an EventSource(http://www.w3.org/TR/eventsource/) stream.
- /// Higher level protocol representation is implemented in the EventSource class.
- /// </summary>
- public sealed class EventSourceResponse : HTTPResponse, IProtocol
- {
- public bool IsClosed { get; private set; }
- #region Public Events
- public Action<EventSourceResponse, BestHTTP.ServerSentEvents.Message> OnMessage;
- public Action<EventSourceResponse> OnClosed;
- #endregion
- #region Privates
- /// <summary>
- /// Thread sync object
- /// </summary>
- private object FrameLock = new object();
- /// <summary>
- /// Buffer for the read data.
- /// </summary>
- private byte[] LineBuffer = new byte[1024];
- /// <summary>
- /// Buffer position.
- /// </summary>
- private int LineBufferPos = 0;
- /// <summary>
- /// The currently receiving and parsing message
- /// </summary>
- private BestHTTP.ServerSentEvents.Message CurrentMessage;
- /// <summary>
- /// Completed messages that waiting to be dispatched
- /// </summary>
- private List<BestHTTP.ServerSentEvents.Message> CompletedMessages = new List<BestHTTP.ServerSentEvents.Message>();
- #endregion
- public EventSourceResponse(HTTPRequest request, Stream stream, bool isStreamed, bool isFromCache)
- :base(request, stream, isStreamed, isFromCache)
- {
- base.IsClosedManually = true;
- }
- public override bool Receive(int forceReadRawContentLength = -1, bool readPayloadData = true)
- {
- bool received = base.Receive(forceReadRawContentLength, false);
- string contentType = this.GetFirstHeaderValue("content-type");
- base.IsUpgraded = received &&
- this.StatusCode == 200 &&
- !string.IsNullOrEmpty(contentType) &&
- contentType.ToLower().StartsWith("text/event-stream");
- // If we didn't upgraded to the protocol we have to read all the sent payload because
- // next requests may read these datas as HTTP headers and will fail
- if (!IsUpgraded)
- ReadPayload(forceReadRawContentLength);
- return received;
- }
- internal void StartReceive()
- {
- if (IsUpgraded)
- {
- #if NETFX_CORE
- #pragma warning disable 4014
- Windows.System.Threading.ThreadPool.RunAsync(ReceiveThreadFunc);
- #pragma warning restore 4014
- #else
- ThreadPool.QueueUserWorkItem(ReceiveThreadFunc);
- //new Thread(ReceiveThreadFunc)
- // .Start();
- #endif
- }
- }
- #region Private Threading Functions
- private void ReceiveThreadFunc(object param)
- {
- try
- {
- if (HasHeaderWithValue("transfer-encoding", "chunked"))
- ReadChunked(Stream);
- else
- ReadRaw(Stream, -1);
- }
- #if !NETFX_CORE
- catch (ThreadAbortException)
- {
- this.baseRequest.State = HTTPRequestStates.Aborted;
- }
- #endif
- catch (Exception e)
- {
- if (HTTPUpdateDelegator.IsCreated)
- {
- this.baseRequest.Exception = e;
- this.baseRequest.State = HTTPRequestStates.Error;
- }
- else
- this.baseRequest.State = HTTPRequestStates.Aborted;
- }
- finally
- {
- IsClosed = true;
- }
- }
- #endregion
- #region Read Implementations
- // http://www.w3.org/Protocols/rfc2616/rfc2616-sec3.html#sec3.6.1
- private new void ReadChunked(Stream stream)
- {
- int chunkLength = ReadChunkLength(stream);
- byte[] buffer = new byte[chunkLength];
- while (chunkLength != 0)
- {
- // To avoid more GC garbage we use only one buffer, and resize only if the next chunk doesn't fit.
- if (buffer.Length < chunkLength)
- Array.Resize<byte>(ref buffer, chunkLength);
- int readBytes = 0;
- // Fill up the buffer
- do
- {
- int bytes = stream.Read(buffer, readBytes, chunkLength - readBytes);
- if (bytes == 0)
- throw new Exception("The remote server closed the connection unexpectedly!");
- readBytes += bytes;
- } while (readBytes < chunkLength);
- FeedData(buffer, readBytes);
- // Every chunk data has a trailing CRLF
- ReadTo(stream, LF);
- // read the next chunk's length
- chunkLength = ReadChunkLength(stream);
- }
- // Read the trailing headers or the CRLF
- ReadHeaders(stream);
- }
- private new void ReadRaw(Stream stream, long contentLength)
- {
- byte[] buffer = new byte[1024];
- int bytes;
- do
- {
- bytes = stream.Read(buffer, 0, buffer.Length);
- FeedData(buffer, bytes);
- } while(bytes > 0);
- }
- #endregion
- #region Data Parsing
- public void FeedData(byte[] buffer, int count)
- {
- if (count == -1)
- count = buffer.Length;
- if (count == 0)
- return;
- int newlineIdx;
- int pos = 0;
- do {
- newlineIdx = -1;
- int skipCount = 1; // to skip CR and/or LF
- for (int i = pos; i < count && newlineIdx == -1; ++i)
- {
- // Lines must be separated by either a U+000D CARRIAGE RETURN U+000A LINE FEED (CRLF) character pair, a single U+000A LINE FEED (LF) character, or a single U+000D CARRIAGE RETURN (CR) character.
- if (buffer[i] == HTTPResponse.CR)
- {
- if (i + 1 < count && buffer[i + 1] == HTTPResponse.LF)
- skipCount = 2;
- newlineIdx = i;
- }
- else if (buffer[i] == HTTPResponse.LF)
- newlineIdx = i;
- }
- int copyIndex = newlineIdx == -1 ? count : newlineIdx;
- if (LineBuffer.Length < LineBufferPos + (copyIndex - pos))
- Array.Resize<byte>(ref LineBuffer, LineBufferPos + (copyIndex - pos));
- Array.Copy(buffer, pos, LineBuffer, LineBufferPos, copyIndex - pos);
- LineBufferPos += copyIndex - pos;
- if (newlineIdx == -1)
- return;
- ParseLine(LineBuffer, LineBufferPos);
- LineBufferPos = 0;
- //pos += newlineIdx + skipCount;
- pos = newlineIdx + skipCount;
- }while(newlineIdx != -1 && pos < count);
- }
- void ParseLine(byte[] buffer, int count)
- {
- // If the line is empty (a blank line) => Dispatch the event
- if (count == 0)
- {
- if (CurrentMessage != null)
- {
- lock (FrameLock)
- CompletedMessages.Add(CurrentMessage);
- CurrentMessage = null;
- }
- return;
- }
- // If the line starts with a U+003A COLON character (:) => Ignore the line.
- if (buffer[0] == 0x3A)
- return;
- //If the line contains a U+003A COLON character (:)
- int colonIdx = -1;
- for (int i = 0; i < count && colonIdx == -1; ++i)
- if (buffer[i] == 0x3A)
- colonIdx = i;
- string field;
- string value;
- if (colonIdx != -1)
- {
- // Collect the characters on the line before the first U+003A COLON character (:), and let field be that string.
- field = Encoding.UTF8.GetString(buffer, 0, colonIdx);
- //Collect the characters on the line after the first U+003A COLON character (:), and let value be that string. If value starts with a U+0020 SPACE character, remove it from value.
- if (colonIdx + 1 < count && buffer[colonIdx + 1] == 0x20)
- colonIdx++;
- colonIdx++;
- // discarded because it is not followed by a blank line
- if (colonIdx >= count)
- return;
- value = Encoding.UTF8.GetString(buffer, colonIdx, count - colonIdx);
- }
- else
- {
- // Otherwise, the string is not empty but does not contain a U+003A COLON character (:) =>
- // Process the field using the whole line as the field name, and the empty string as the field value.
- field = Encoding.UTF8.GetString(buffer, 0, count);
- value = string.Empty;
- }
- if (CurrentMessage == null)
- CurrentMessage = new BestHTTP.ServerSentEvents.Message();
- switch(field)
- {
- // If the field name is "id" => SaveLocal the last event ID buffer to the field value.
- case "id":
- CurrentMessage.Id = value;
- break;
- // If the field name is "event" => SaveLocal the event type buffer to field value.
- case "event":
- CurrentMessage.Event = value;
- break;
- // If the field name is "data" => Append the field value to the data buffer, then append a single U+000A LINE FEED (LF) character to the data buffer.
- case "data":
- // Append a new line if we already have some data. This way we can skip step 3.) in the EventSource's OnMessageReceived.
- // We do only null check, because empty string can be valid payload
- if (CurrentMessage.Data != null)
- CurrentMessage.Data += Environment.NewLine;
- CurrentMessage.Data += value;
- break;
- // If the field name is "retry" => If the field value consists of only ASCII digits, then interpret the field value as an integer in base ten,
- // and set the event stream's reconnection time to that integer. Otherwise, ignore the field.
- case "retry":
- int result;
- if (int.TryParse(value, out result))
- CurrentMessage.Retry = TimeSpan.FromMilliseconds(result);
- break;
- // Otherwise: The field is ignored.
- default:
- break;
- }
- }
- #endregion
- void IProtocol.HandleEvents()
- {
- lock(FrameLock)
- {
- // Send out messages.
- if (CompletedMessages.Count > 0)
- {
- if (OnMessage != null)
- for (int i = 0; i < CompletedMessages.Count; ++i)
- {
- try
- {
- OnMessage(this, CompletedMessages[i]);
- }
- catch(Exception ex)
- {
- HTTPManager.Logger.Exception("EventSourceMessage", "HandleEvents - OnMessage", ex);
- }
- }
- CompletedMessages.Clear();
- }
- }
- // We are closed
- if (IsClosed)
- {
- CompletedMessages.Clear();
- if (OnClosed != null)
- {
- try
- {
- OnClosed(this);
- }
- catch (Exception ex)
- {
- HTTPManager.Logger.Exception("EventSourceMessage", "HandleEvents - OnClosed", ex);
- }
- finally
- {
- OnClosed = null;
- }
- }
- }
- }
- }
- }
- #endif
|