#if !BESTHTTP_DISABLE_SERVERSENT_EVENTS && (!UNITY_WEBGL || UNITY_EDITOR)
using System;
using System.IO;
using System.Threading;
using System.Text;
using System.Collections.Generic;
namespace BestHTTP.ServerSentEvents
{
///
/// A low-level class to receive and parse an EventSource(http://www.w3.org/TR/eventsource/) stream.
/// Higher level protocol representation is implemented in the EventSource class.
///
public sealed class EventSourceResponse : HTTPResponse, IProtocol
{
public bool IsClosed { get; private set; }
#region Public Events
public Action OnMessage;
public Action OnClosed;
#endregion
#region Privates
///
/// Thread sync object
///
private object FrameLock = new object();
///
/// Buffer for the read data.
///
private byte[] LineBuffer = new byte[1024];
///
/// Buffer position.
///
private int LineBufferPos = 0;
///
/// The currently receiving and parsing message
///
private BestHTTP.ServerSentEvents.Message CurrentMessage;
///
/// Completed messages that waiting to be dispatched
///
private List CompletedMessages = new List();
#endregion
public EventSourceResponse(HTTPRequest request, Stream stream, bool isStreamed, bool isFromCache)
:base(request, stream, isStreamed, isFromCache)
{
base.IsClosedManually = true;
}
public override bool Receive(int forceReadRawContentLength = -1, bool readPayloadData = true)
{
bool received = base.Receive(forceReadRawContentLength, false);
string contentType = this.GetFirstHeaderValue("content-type");
base.IsUpgraded = received &&
this.StatusCode == 200 &&
!string.IsNullOrEmpty(contentType) &&
contentType.ToLower().StartsWith("text/event-stream");
// If we didn't upgraded to the protocol we have to read all the sent payload because
// next requests may read these datas as HTTP headers and will fail
if (!IsUpgraded)
ReadPayload(forceReadRawContentLength);
return received;
}
internal void StartReceive()
{
if (IsUpgraded)
{
#if NETFX_CORE
#pragma warning disable 4014
Windows.System.Threading.ThreadPool.RunAsync(ReceiveThreadFunc);
#pragma warning restore 4014
#else
ThreadPool.QueueUserWorkItem(ReceiveThreadFunc);
//new Thread(ReceiveThreadFunc)
// .Start();
#endif
}
}
#region Private Threading Functions
private void ReceiveThreadFunc(object param)
{
try
{
if (HasHeaderWithValue("transfer-encoding", "chunked"))
ReadChunked(Stream);
else
ReadRaw(Stream, -1);
}
#if !NETFX_CORE
catch (ThreadAbortException)
{
this.baseRequest.State = HTTPRequestStates.Aborted;
}
#endif
catch (Exception e)
{
if (HTTPUpdateDelegator.IsCreated)
{
this.baseRequest.Exception = e;
this.baseRequest.State = HTTPRequestStates.Error;
}
else
this.baseRequest.State = HTTPRequestStates.Aborted;
}
finally
{
IsClosed = true;
}
}
#endregion
#region Read Implementations
// http://www.w3.org/Protocols/rfc2616/rfc2616-sec3.html#sec3.6.1
private new void ReadChunked(Stream stream)
{
int chunkLength = ReadChunkLength(stream);
byte[] buffer = new byte[chunkLength];
while (chunkLength != 0)
{
// To avoid more GC garbage we use only one buffer, and resize only if the next chunk doesn't fit.
if (buffer.Length < chunkLength)
Array.Resize(ref buffer, chunkLength);
int readBytes = 0;
// Fill up the buffer
do
{
int bytes = stream.Read(buffer, readBytes, chunkLength - readBytes);
if (bytes == 0)
throw new Exception("The remote server closed the connection unexpectedly!");
readBytes += bytes;
} while (readBytes < chunkLength);
FeedData(buffer, readBytes);
// Every chunk data has a trailing CRLF
ReadTo(stream, LF);
// read the next chunk's length
chunkLength = ReadChunkLength(stream);
}
// Read the trailing headers or the CRLF
ReadHeaders(stream);
}
private new void ReadRaw(Stream stream, long contentLength)
{
byte[] buffer = new byte[1024];
int bytes;
do
{
bytes = stream.Read(buffer, 0, buffer.Length);
FeedData(buffer, bytes);
} while(bytes > 0);
}
#endregion
#region Data Parsing
public void FeedData(byte[] buffer, int count)
{
if (count == -1)
count = buffer.Length;
if (count == 0)
return;
int newlineIdx;
int pos = 0;
do {
newlineIdx = -1;
int skipCount = 1; // to skip CR and/or LF
for (int i = pos; i < count && newlineIdx == -1; ++i)
{
// Lines must be separated by either a U+000D CARRIAGE RETURN U+000A LINE FEED (CRLF) character pair, a single U+000A LINE FEED (LF) character, or a single U+000D CARRIAGE RETURN (CR) character.
if (buffer[i] == HTTPResponse.CR)
{
if (i + 1 < count && buffer[i + 1] == HTTPResponse.LF)
skipCount = 2;
newlineIdx = i;
}
else if (buffer[i] == HTTPResponse.LF)
newlineIdx = i;
}
int copyIndex = newlineIdx == -1 ? count : newlineIdx;
if (LineBuffer.Length < LineBufferPos + (copyIndex - pos))
Array.Resize(ref LineBuffer, LineBufferPos + (copyIndex - pos));
Array.Copy(buffer, pos, LineBuffer, LineBufferPos, copyIndex - pos);
LineBufferPos += copyIndex - pos;
if (newlineIdx == -1)
return;
ParseLine(LineBuffer, LineBufferPos);
LineBufferPos = 0;
//pos += newlineIdx + skipCount;
pos = newlineIdx + skipCount;
}while(newlineIdx != -1 && pos < count);
}
void ParseLine(byte[] buffer, int count)
{
// If the line is empty (a blank line) => Dispatch the event
if (count == 0)
{
if (CurrentMessage != null)
{
lock (FrameLock)
CompletedMessages.Add(CurrentMessage);
CurrentMessage = null;
}
return;
}
// If the line starts with a U+003A COLON character (:) => Ignore the line.
if (buffer[0] == 0x3A)
return;
//If the line contains a U+003A COLON character (:)
int colonIdx = -1;
for (int i = 0; i < count && colonIdx == -1; ++i)
if (buffer[i] == 0x3A)
colonIdx = i;
string field;
string value;
if (colonIdx != -1)
{
// Collect the characters on the line before the first U+003A COLON character (:), and let field be that string.
field = Encoding.UTF8.GetString(buffer, 0, colonIdx);
//Collect the characters on the line after the first U+003A COLON character (:), and let value be that string. If value starts with a U+0020 SPACE character, remove it from value.
if (colonIdx + 1 < count && buffer[colonIdx + 1] == 0x20)
colonIdx++;
colonIdx++;
// discarded because it is not followed by a blank line
if (colonIdx >= count)
return;
value = Encoding.UTF8.GetString(buffer, colonIdx, count - colonIdx);
}
else
{
// Otherwise, the string is not empty but does not contain a U+003A COLON character (:) =>
// Process the field using the whole line as the field name, and the empty string as the field value.
field = Encoding.UTF8.GetString(buffer, 0, count);
value = string.Empty;
}
if (CurrentMessage == null)
CurrentMessage = new BestHTTP.ServerSentEvents.Message();
switch(field)
{
// If the field name is "id" => SaveLocal the last event ID buffer to the field value.
case "id":
CurrentMessage.Id = value;
break;
// If the field name is "event" => SaveLocal the event type buffer to field value.
case "event":
CurrentMessage.Event = value;
break;
// If the field name is "data" => Append the field value to the data buffer, then append a single U+000A LINE FEED (LF) character to the data buffer.
case "data":
// Append a new line if we already have some data. This way we can skip step 3.) in the EventSource's OnMessageReceived.
// We do only null check, because empty string can be valid payload
if (CurrentMessage.Data != null)
CurrentMessage.Data += Environment.NewLine;
CurrentMessage.Data += value;
break;
// If the field name is "retry" => If the field value consists of only ASCII digits, then interpret the field value as an integer in base ten,
// and set the event stream's reconnection time to that integer. Otherwise, ignore the field.
case "retry":
int result;
if (int.TryParse(value, out result))
CurrentMessage.Retry = TimeSpan.FromMilliseconds(result);
break;
// Otherwise: The field is ignored.
default:
break;
}
}
#endregion
void IProtocol.HandleEvents()
{
lock(FrameLock)
{
// Send out messages.
if (CompletedMessages.Count > 0)
{
if (OnMessage != null)
for (int i = 0; i < CompletedMessages.Count; ++i)
{
try
{
OnMessage(this, CompletedMessages[i]);
}
catch(Exception ex)
{
HTTPManager.Logger.Exception("EventSourceMessage", "HandleEvents - OnMessage", ex);
}
}
CompletedMessages.Clear();
}
}
// We are closed
if (IsClosed)
{
CompletedMessages.Clear();
if (OnClosed != null)
{
try
{
OnClosed(this);
}
catch (Exception ex)
{
HTTPManager.Logger.Exception("EventSourceMessage", "HandleEvents - OnClosed", ex);
}
finally
{
OnClosed = null;
}
}
}
}
}
}
#endif