123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380 |
- #if !BESTHTTP_DISABLE_SERVERSENT_EVENTS && (!UNITY_WEBGL || UNITY_EDITOR)
- using System;
- using System.IO;
- using System.Threading;
- using System.Text;
- using System.Collections.Generic;
- namespace BestHTTP.ServerSentEvents
- {
-
-
-
-
- public sealed class EventSourceResponse : HTTPResponse, IProtocol
- {
/// <summary>
/// True once the receiving thread function has finished, either normally or through an exception
/// (it is set in that function's finally block).
/// </summary>
public bool IsClosed { get; private set; }

#region Public Events

/// <summary>
/// Invoked from IProtocol.HandleEvents once for every completed message.
/// </summary>
public Action<EventSourceResponse, BestHTTP.ServerSentEvents.Message> OnMessage;

/// <summary>
/// Invoked from IProtocol.HandleEvents after IsClosed became true. The field is set to null after
/// the call, so the handler runs at most once.
/// </summary>
public Action<EventSourceResponse> OnClosed;

#endregion

#region Privates

/// <summary>
/// Guards CompletedMessages, which is filled while parsing and drained in HandleEvents.
/// Readonly so the lock object can never be swapped while another thread is holding it.
/// </summary>
private readonly object FrameLock = new object();

/// <summary>
/// Accumulates the bytes of the line currently being received. Grown on demand by FeedData.
/// </summary>
private byte[] LineBuffer = new byte[1024];

/// <summary>
/// Number of valid bytes currently stored in LineBuffer.
/// </summary>
private int LineBufferPos = 0;

/// <summary>
/// The message being assembled from the received fields, or null when no field arrived since the last dispatch.
/// </summary>
private BestHTTP.ServerSentEvents.Message CurrentMessage;

/// <summary>
/// Completed messages waiting to be delivered through OnMessage. Access only while holding FrameLock.
/// </summary>
private readonly List<BestHTTP.ServerSentEvents.Message> CompletedMessages = new List<BestHTTP.ServerSentEvents.Message>();

#endregion
/// <summary>
/// Creates the response by forwarding every argument to the HTTPResponse constructor and then
/// turning on IsClosedManually.
/// NOTE(review): presumably this stops the base class from closing the connection on its own,
/// because the event stream stays open after the headers are read - confirm against HTTPResponse.
/// </summary>
public EventSourceResponse(HTTPRequest request, Stream stream, bool isStreamed, bool isFromCache)
    : base(request, stream, isStreamed, isFromCache)
{
    IsClosedManually = true;
}
/// <summary>
/// Receives the response through the base implementation without reading the payload, then decides
/// whether the response is an event stream.
/// The response counts as upgraded when the base call succeeded, the status code is 200 and the
/// content-type header starts with "text/event-stream" (compared case-insensitively).
/// When it is not upgraded, the payload is read right away through ReadPayload.
/// </summary>
/// <param name="forceReadRawContentLength">Forwarded unchanged to the base Receive and to ReadPayload.</param>
/// <param name="readPayloadData">Not consulted: the base Receive is always called with false.</param>
/// <returns>The value returned by the base Receive call.</returns>
public override bool Receive(int forceReadRawContentLength = -1, bool readPayloadData = true)
{
    bool received = base.Receive(forceReadRawContentLength, false);

    string contentType = this.GetFirstHeaderValue("content-type");

    // Media types are plain ASCII tokens: compare ordinally so the result doesn't depend on the
    // current culture, and without allocating a lower-cased copy of the header value.
    base.IsUpgraded = received &&
                      this.StatusCode == 200 &&
                      !string.IsNullOrEmpty(contentType) &&
                      contentType.StartsWith("text/event-stream", StringComparison.OrdinalIgnoreCase);

    // Not an event stream: read the body like an ordinary response.
    if (!IsUpgraded)
        ReadPayload(forceReadRawContentLength);

    return received;
}
/// <summary>
/// Queues ReceiveThreadFunc on the thread pool to start reading the event stream.
/// Does nothing when the response has not been upgraded.
/// </summary>
internal void StartReceive()
{
    if (!IsUpgraded)
        return;

#if NETFX_CORE
    // 4014: the returned async action is deliberately not awaited.
    #pragma warning disable 4014
    Windows.System.Threading.ThreadPool.RunAsync(ReceiveThreadFunc);
    #pragma warning restore 4014
#else
    ThreadPool.QueueUserWorkItem(new WaitCallback(ReceiveThreadFunc));
#endif
}
- #region Private Threading Functions
/// <summary>
/// Thread-pool entry point queued by StartReceive. Reads the stream until it ends or fails, using
/// the chunked reader when the "transfer-encoding" header has the value "chunked" and the raw
/// reader otherwise. IsClosed is set to true on every exit path.
/// </summary>
/// <param name="param">Work-item state; not used.</param>
private void ReceiveThreadFunc(object param)
{
    try
    {
        bool isChunked = HasHeaderWithValue("transfer-encoding", "chunked");

        if (!isChunked)
            ReadRaw(Stream, -1);
        else
            ReadChunked(Stream);
    }
#if !NETFX_CORE
    catch (ThreadAbortException)
    {
        baseRequest.State = HTTPRequestStates.Aborted;
    }
#endif
    catch (Exception e)
    {
        if (!HTTPUpdateDelegator.IsCreated)
        {
            // No update delegator exists: report the request as aborted rather than failed.
            baseRequest.State = HTTPRequestStates.Aborted;
        }
        else
        {
            baseRequest.Exception = e;
            baseRequest.State = HTTPRequestStates.Error;
        }
    }
    finally
    {
        IsClosed = true;
    }
}
- #endregion
- #region Read Implementations
-
/// <summary>
/// Reads a chunked body: every chunk is read completely and handed to FeedData, until a chunk
/// with length zero is announced. ReadHeaders is called once after the last chunk.
/// </summary>
/// <param name="stream">The stream to read the chunks from.</param>
/// <exception cref="Exception">The stream returned zero bytes in the middle of a chunk.</exception>
private new void ReadChunked(Stream stream)
{
    int chunkSize = ReadChunkLength(stream);
    byte[] chunk = new byte[chunkSize];

    while (chunkSize != 0)
    {
        // The array is reused between chunks and only grows when a larger chunk arrives.
        if (chunkSize > chunk.Length)
            Array.Resize<byte>(ref chunk, chunkSize);

        // A single Read may return fewer bytes than requested, so keep reading until the chunk is complete.
        int filled = 0;
        do
        {
            int got = stream.Read(chunk, filled, chunkSize - filled);
            if (got == 0)
                throw new Exception("The remote server closed the connection unexpectedly!");

            filled += got;
        }
        while (filled < chunkSize);

        FeedData(chunk, filled);

        // Skip everything up to and including the LF that follows the chunk data.
        ReadTo(stream, LF);

        chunkSize = ReadChunkLength(stream);
    }

    // NOTE(review): presumably consumes the trailer headers after the last chunk - confirm against HTTPResponse.
    ReadHeaders(stream);
}
/// <summary>
/// Reads an unframed body in blocks of up to 1024 bytes and passes each block to FeedData until
/// the stream reports no more data.
/// </summary>
/// <param name="stream">The stream to read from.</param>
/// <param name="contentLength">Not used by this implementation.</param>
private new void ReadRaw(Stream stream, long contentLength)
{
    byte[] block = new byte[1024];

    for (;;)
    {
        int got = stream.Read(block, 0, block.Length);

        // Also called with the final, empty read; FeedData returns immediately for a count of zero.
        FeedData(block, got);

        if (got <= 0)
            break;
    }
}
- #endregion
- #region Data Parsing
/// <summary>
/// True when the previous FeedData call ended on a CR. If the next call starts with an LF, that LF
/// is the second half of the same CRLF line break and must not be treated as another (empty) line.
/// </summary>
private bool SkipLeadingLF;

/// <summary>
/// Splits the received bytes into lines and passes every completed line to ParseLine.
/// CR, LF and CRLF are all accepted as line breaks. Bytes after the last line break are kept in
/// LineBuffer and completed by a later call.
/// </summary>
/// <param name="buffer">The received bytes.</param>
/// <param name="count">Number of valid bytes in <paramref name="buffer"/>; -1 means the whole array, 0 is a no-op.</param>
public void FeedData(byte[] buffer, int count)
{
    if (count == -1)
        count = buffer.Length;

    if (count == 0)
        return;

    int pos = 0;

    // A CRLF pair can be split between two reads. The CR already ended the line in the previous
    // call, so swallow the matching LF here. Without this the LF would be seen as an empty line,
    // and an empty line dispatches the message that is still being assembled.
    if (SkipLeadingLF)
    {
        SkipLeadingLF = false;

        if (buffer[0] == HTTPResponse.LF)
            pos = 1;
    }

    while (pos < count)
    {
        int newlineIdx = -1;
        int skipCount = 1; // how many line-break bytes to step over: 1 for CR or LF, 2 for CRLF

        for (int i = pos; i < count && newlineIdx == -1; ++i)
        {
            if (buffer[i] == HTTPResponse.CR)
            {
                if (i + 1 >= count)
                    SkipLeadingLF = true; // can't see the next byte yet; decide in the next call
                else if (buffer[i + 1] == HTTPResponse.LF)
                    skipCount = 2;

                newlineIdx = i;
            }
            else if (buffer[i] == HTTPResponse.LF)
                newlineIdx = i;
        }

        // Append everything up to the line break (or up to the end of the data) to the pending line.
        int copyIndex = newlineIdx == -1 ? count : newlineIdx;
        int length = copyIndex - pos;

        if (LineBuffer.Length < LineBufferPos + length)
            Array.Resize<byte>(ref LineBuffer, LineBufferPos + length);

        Array.Copy(buffer, pos, LineBuffer, LineBufferPos, length);
        LineBufferPos += length;

        // No line break found: the line is still incomplete, wait for more data.
        if (newlineIdx == -1)
            return;

        ParseLine(LineBuffer, LineBufferPos);
        LineBufferPos = 0;

        pos = newlineIdx + skipCount;
    }
}
/// <summary>
/// Interprets one line of the event stream.
/// An empty line completes the message being assembled and queues it for HandleEvents.
/// A line starting with a colon is a comment and is ignored.
/// Any other line is split at the first colon into a field name and a value (one space directly
/// after the colon is dropped); a line without a colon is a field name with an empty value.
/// The fields "id", "event", "data" and "retry" are applied to the current message, all others are ignored.
/// </summary>
/// <param name="buffer">Buffer holding the line, without its line break.</param>
/// <param name="count">Number of bytes of the line in <paramref name="buffer"/>.</param>
void ParseLine(byte[] buffer, int count)
{
    // Empty line: dispatch the message.
    if (count == 0)
    {
        if (CurrentMessage != null)
        {
            lock (FrameLock)
                CompletedMessages.Add(CurrentMessage);

            CurrentMessage = null;
        }

        return;
    }

    // Comment line (starts with ':').
    if (buffer[0] == 0x3A)
        return;

    int colonIdx = -1;
    for (int i = 0; i < count && colonIdx == -1; ++i)
        if (buffer[i] == 0x3A)
            colonIdx = i;

    string field;
    string value;

    if (colonIdx != -1)
    {
        field = Encoding.UTF8.GetString(buffer, 0, colonIdx);

        // The value starts after the colon; a single leading space is not part of it.
        int valueStart = colonIdx + 1;
        if (valueStart < count && buffer[valueStart] == 0x20)
            valueStart++;

        // A field with nothing after the colon ("data:") has an empty value. It must still be
        // processed, exactly like the colon-less form ("data") below; dropping it would lose
        // empty lines inside multi-line data.
        value = valueStart < count
            ? Encoding.UTF8.GetString(buffer, valueStart, count - valueStart)
            : string.Empty;
    }
    else
    {
        // No colon: the whole line is the field name and the value is empty.
        field = Encoding.UTF8.GetString(buffer, 0, count);
        value = string.Empty;
    }

    if (CurrentMessage == null)
        CurrentMessage = new BestHTTP.ServerSentEvents.Message();

    switch (field)
    {
        case "id":
            CurrentMessage.Id = value;
            break;

        case "event":
            CurrentMessage.Event = value;
            break;

        case "data":
            // Every data line after the first is joined to the previous ones with a line break.
            // NOTE(review): the event stream format joins with a single LF, while
            // Environment.NewLine is platform dependent; left unchanged because callers may rely on it.
            if (CurrentMessage.Data != null)
                CurrentMessage.Data += Environment.NewLine;

            CurrentMessage.Data += value;
            break;

        case "retry":
            // Only a value made of ASCII digits is a valid reconnection time. NumberStyles.None
            // with the invariant culture rejects signs, whitespace and culture specific forms.
            int result;
            if (int.TryParse(value,
                             System.Globalization.NumberStyles.None,
                             System.Globalization.CultureInfo.InvariantCulture,
                             out result))
                CurrentMessage.Retry = TimeSpan.FromMilliseconds(result);
            break;

        default:
            // Unknown fields are ignored.
            break;
    }
}
- #endregion
/// <summary>
/// Delivers every queued message through OnMessage and, once the stream is closed, invokes
/// OnClosed a single time. Exceptions thrown by the handlers are logged and do not stop the dispatch.
/// </summary>
void IProtocol.HandleEvents()
{
    // Sample the closed state BEFORE draining the queue. The receiving thread sets IsClosed only
    // after it queued its last message, so when we see "closed" here the drain below is guaranteed
    // to contain every message. Checking it after the drain could drop messages that were queued
    // in between.
    bool closed = IsClosed;

    lock (FrameLock)
    {
        if (CompletedMessages.Count > 0)
        {
            for (int i = 0; i < CompletedMessages.Count; ++i)
            {
                // Re-read the handler for every message: a handler may unsubscribe while we dispatch.
                Action<EventSourceResponse, BestHTTP.ServerSentEvents.Message> onMessage = OnMessage;
                if (onMessage == null)
                    break;

                try
                {
                    onMessage(this, CompletedMessages[i]);
                }
                catch (Exception ex)
                {
                    HTTPManager.Logger.Exception("EventSourceMessage", "HandleEvents - OnMessage", ex);
                }
            }

            // Messages are dropped whether or not a handler was attached.
            CompletedMessages.Clear();
        }
    }

    if (closed)
    {
        Action<EventSourceResponse> onClosed = OnClosed;
        if (onClosed != null)
        {
            try
            {
                onClosed(this);
            }
            catch (Exception ex)
            {
                HTTPManager.Logger.Exception("EventSourceMessage", "HandleEvents - OnClosed", ex);
            }
            finally
            {
                // Make sure the closed notification is sent only once.
                OnClosed = null;
            }
        }
    }
}
- }
- }
- #endif
|