#if !BESTHTTP_DISABLE_SIGNALR_CORE && !BESTHTTP_DISABLE_WEBSOCKET
using BestHTTP.Futures;
using BestHTTP.SignalRCore.Messages;
using System;
using System.Collections.Generic;
namespace BestHTTP.SignalRCore
{
/// <summary>
/// Settings read by HubConnection while it establishes its connection.
/// </summary>
public sealed class HubOptions
{
/// <summary>
/// When this is set to true, the plugin will skip the negotiation request if the PreferedTransport is
/// WebSocket.
/// </summary>
public bool SkipNegotiation { get; set; }
/// <summary>
/// The preferred transport to choose when more than one is available.
/// </summary>
public TransportTypes PreferedTransport { get; set; }
}
public sealed class HubConnection
{
/// <summary>
/// A shared zero-length object array.
/// NOTE(review): presumably used where an invocation has no arguments; no use is visible in this part of the file.
/// </summary>
public static readonly object[] EmptyArgs = new object[0];
/// <summary>
/// Uri of the Hub endpoint.
/// </summary>
public Uri Uri { get; private set; }
/// <summary>
/// Current state of this connection.
/// </summary>
public ConnectionStates State { get; private set; }
/// <summary>
/// Current, active ITransport instance. Null until a transport has been created.
/// </summary>
public ITransport Transport { get; private set; }
/// <summary>
/// The IProtocol implementation that will parse, encode and decode messages.
/// </summary>
public IProtocol Protocol { get; private set; }
/// <summary>
/// This event is called when successfully connected to the hub.
/// </summary>
public event Action OnConnected;
/// <summary>
/// This event is called when an unexpected error happens and the connection is closed.
/// </summary>
public event Action OnError;
/// <summary>
/// This event is called when the connection is gracefully terminated.
/// </summary>
public event Action OnClosed;
/// <summary>
/// This event is called for every server-sent message. When it returns false, no further processing of the
/// message is done by the plugin.
/// </summary>
public event Func OnMessage;
/// <summary>
/// An IAuthenticationProvider implementation that will be used to authenticate the connection.
/// May be left null, in which case no authentication step is performed.
/// </summary>
public IAuthenticationProvider AuthenticationProvider { get; set; }
/// <summary>
/// Negotiation response sent by the server. Null until a negotiation request has completed successfully.
/// </summary>
public NegotiationResult NegotiationResult { get; private set; }
/// <summary>
/// The options this connection was created with (whether to skip negotiation, which transport to prefer).
/// </summary>
public HubOptions Options { get; private set; }
/// <summary>
/// This is incremented to give a unique id to every message the plugin sends.
/// </summary>
private long lastInvocationId = 0;
/// <summary>
/// Stores the callback for every sent message that expects a return value from the server. Every sent message
/// has a unique invocationId that will be sent back by the server.
/// </summary>
private Dictionary> invocations = new Dictionary>();
/// <summary>
/// This is where we store the methodname => callback mapping. Method names are compared case-insensitively
/// (ordinal).
/// </summary>
private Dictionary subscriptions = new Dictionary(StringComparer.OrdinalIgnoreCase);
/// <summary>
/// Creates a connection for the hub at <paramref name="hubUri"/> using default <see cref="HubOptions"/>.
/// </summary>
/// <param name="hubUri">Uri of the Hub endpoint.</param>
/// <param name="protocol">Protocol used to encode and decode messages. Must not be null.</param>
/// <exception cref="ArgumentNullException"><paramref name="protocol"/> is null.</exception>
public HubConnection(Uri hubUri, IProtocol protocol)
    : this(hubUri, protocol, new HubOptions())
{
}

/// <summary>
/// Creates a connection for the hub at <paramref name="hubUri"/>. The connection starts in the Initial state;
/// nothing is sent until StartConnect is called.
/// </summary>
/// <param name="hubUri">Uri of the Hub endpoint.</param>
/// <param name="protocol">Protocol used to encode and decode messages. Must not be null.</param>
/// <param name="options">Connection options. When null, a default <see cref="HubOptions"/> instance is used.</param>
/// <exception cref="ArgumentNullException"><paramref name="protocol"/> is null.</exception>
public HubConnection(Uri hubUri, IProtocol protocol, HubOptions options)
{
    // Fail with a descriptive exception instead of a NullReferenceException on the Connection assignment below.
    if (protocol == null)
        throw new ArgumentNullException("protocol");

    this.Uri = hubUri;
    this.State = ConnectionStates.Initial;

    // Options is dereferenced unconditionally while connecting, so never store null here.
    this.Options = options ?? new HubOptions();

    this.Protocol = protocol;
    // Give the protocol a reference back to the connection it serves.
    this.Protocol.Connection = this;
}
/// <summary>
/// Begins the connection process. Has no effect unless the connection is still in the Initial state.
/// If an authentication provider is set and reports that pre-authentication is required, authentication is
/// started first; otherwise negotiation starts right away.
/// </summary>
public void StartConnect()
{
    if (this.State != ConnectionStates.Initial)
        return;

    HTTPManager.Logger.Verbose("HubConnection", "StartConnect");

    bool needsPreAuth = this.AuthenticationProvider != null && this.AuthenticationProvider.IsPreAuthRequired;
    if (!needsPreAuth)
    {
        StartNegotiation();
        return;
    }

    HTTPManager.Logger.Information("HubConnection", "StartConnect - Authenticating");
    SetState(ConnectionStates.Authenticating);

    // Subscribe to both outcomes before starting, so a provider that completes immediately is still observed.
    this.AuthenticationProvider.OnAuthenticationSucceded += OnAuthenticationSucceded;
    this.AuthenticationProvider.OnAuthenticationFailed += OnAuthenticationFailed;

    // Start the authentication process
    this.AuthenticationProvider.StartAuthentication();
}
/// <summary>
/// Handler for the authentication provider's success event: detaches both authentication handlers and
/// moves on to negotiation.
/// </summary>
/// <param name="provider">The provider that raised the event (not used; the AuthenticationProvider property is used instead).</param>
private void OnAuthenticationSucceded(IAuthenticationProvider provider)
{
    HTTPManager.Logger.Verbose("HubConnection", "OnAuthenticationSucceded");

    // The handlers were attached in StartConnect for this single authentication attempt.
    var authProvider = this.AuthenticationProvider;
    authProvider.OnAuthenticationSucceded -= OnAuthenticationSucceded;
    authProvider.OnAuthenticationFailed -= OnAuthenticationFailed;

    StartNegotiation();
}
/// <summary>
/// Handler for the authentication provider's failure event: logs the reason, detaches both authentication
/// handlers and closes the connection with that reason.
/// </summary>
/// <param name="provider">The provider that raised the event (not used; the AuthenticationProvider property is used instead).</param>
/// <param name="reason">Failure description supplied by the provider.</param>
private void OnAuthenticationFailed(IAuthenticationProvider provider, string reason)
{
    string logEntry = "OnAuthenticationFailed: " + reason;
    HTTPManager.Logger.Error("HubConnection", logEntry);

    // The handlers were attached in StartConnect for this single authentication attempt.
    var authProvider = this.AuthenticationProvider;
    authProvider.OnAuthenticationSucceded -= OnAuthenticationSucceded;
    authProvider.OnAuthenticationFailed -= OnAuthenticationFailed;

    SetState(ConnectionStates.Closed, reason);
}
/// <summary>
/// Starts the negotiation phase, or bypasses it when the options ask for that.
/// If a close was requested in the meantime the connection is closed instead. Otherwise a POST request is
/// sent to the hub's "/negotiate" endpoint; its result is handled by OnNegotiationRequestFinished.
/// </summary>
private void StartNegotiation()
{
    HTTPManager.Logger.Verbose("HubConnection", "StartNegotiation");

    // A close requested earlier (e.g. while authenticating) must win over every connect path,
    // including the skip-negotiation shortcut below.
    if (this.State == ConnectionStates.CloseInitiated)
    {
        SetState(ConnectionStates.Closed);
        return;
    }

    // Negotiation can only be skipped for the WebSocket transport, as documented on HubOptions.SkipNegotiation.
    if (this.Options.SkipNegotiation && this.Options.PreferedTransport == TransportTypes.WebSocket)
    {
        HTTPManager.Logger.Verbose("HubConnection", "Skipping negotiation");
        ConnectImpl();
        return;
    }

    SetState(ConnectionStates.Negotiating);

    // https://github.com/aspnet/SignalR/blob/dev/specs/TransportProtocols.md#post-endpoint-basenegotiate-request
    // Send out a negotiation request. While we could skip it and connect right with the websocket transport
    // it might return with additional information that could be useful.
    UriBuilder builder = new UriBuilder(this.Uri);
    builder.Path += "/negotiate";

    var request = new HTTPRequest(builder.Uri, HTTPMethods.Post, OnNegotiationRequestFinished);

    // Let the authentication provider (if any) decorate the request before it is sent.
    if (this.AuthenticationProvider != null)
        this.AuthenticationProvider.PrepareRequest(request);

    request.Send();
}
/// <summary>
/// Creates the transport selected by Options.PreferedTransport and starts connecting with it.
/// If the preferred transport is unknown, or a negotiation result is present and does not list it, the
/// connection is moved to the Closed state with an explanatory reason and no transport is started.
/// </summary>
private void ConnectImpl()
{
    HTTPManager.Logger.Verbose("HubConnection", "ConnectImpl");

    switch (this.Options.PreferedTransport)
    {
        case TransportTypes.WebSocket:
            // NegotiationResult can be null here (e.g. when negotiation was skipped); in that case there is
            // no server-provided transport list to check against.
            if (this.NegotiationResult != null && !IsTransportSupported("WebSockets"))
            {
                SetState(ConnectionStates.Closed, "The 'WebSockets' transport isn't supported by the server!");
                return;
            }

            this.Transport = new Transports.WebSocketTransport(this);
            this.Transport.OnStateChanged += Transport_OnStateChanged;
            break;

        default:
            SetState(ConnectionStates.Closed, "Unsupportted transport: " + this.Options.PreferedTransport);
            // No transport was created in this branch, so there is nothing to start. Falling through to
            // StartConnect() would dereference a null Transport.
            return;
    }

    this.Transport.StartConnect();
}
/// <summary>
/// Checks whether the negotiation result lists a transport with the given name.
/// Must only be called when NegotiationResult is not null.
/// </summary>
/// <param name="transportName">Transport name to look for; compared with ==.</param>
/// <returns>True when the server sent no transport list at all, or the list contains the name; false otherwise.</returns>
private bool IsTransportSupported(string transportName)
{
    var advertised = this.NegotiationResult.SupportedTransports;

    // https://github.com/aspnet/SignalR/blob/dev/specs/TransportProtocols.md#post-endpoint-basenegotiate-request
    // If the negotiation response contains only the url and accessToken, no 'availableTransports' list is sent
    if (advertised == null)
        return true;

    bool found = false;
    for (int idx = 0; !found && idx < advertised.Count; ++idx)
        found = advertised[idx].Name == transportName;

    return found;
}
/// <summary>
/// Completion callback of the negotiation request sent by StartNegotiation.
/// On a successful response the body is parsed into NegotiationResult and, if parsing reported no error,
/// the transport connection is started. Every failure path closes the connection with a reason string.
/// If a close was requested while the request was in flight, the connection is simply closed.
/// </summary>
/// <param name="req">The finished negotiation request; its State decides which branch is taken.</param>
/// <param name="resp">The response; only read when the request state is Finished.</param>
private void OnNegotiationRequestFinished(HTTPRequest req, HTTPResponse resp)
{
    if (this.State == ConnectionStates.CloseInitiated)
    {
        SetState(ConnectionStates.Closed);
        return;
    }

    string errorReason = null;

    switch (req.State)
    {
        // The request finished without any problem.
        case HTTPRequestStates.Finished:
            if (resp.IsSuccess)
            {
                HTTPManager.Logger.Information("HubConnection", "Negotiation Request Finished Successfully! Response: " + resp.DataAsText);

                // Parse negotiation
                this.NegotiationResult = NegotiationResult.Parse(resp.DataAsText, out errorReason);

                // TODO: check validity of the negotiation result:
                // If url and accessToken is present, the other two must be null.
                // https://github.com/aspnet/SignalR/blob/dev/specs/TransportProtocols.md#post-endpoint-basenegotiate-request

                if (string.IsNullOrEmpty(errorReason))
                    ConnectImpl();
            }
            else // Internal server error?
                errorReason = string.Format("Negotiation Request Finished Successfully, but the server sent an error. Status Code: {0}-{1} Message: {2}",
                    resp.StatusCode,
                    resp.Message,
                    resp.DataAsText);
            break;

        // The request finished with an unexpected error. The request's Exception property may contain more info about the error.
        case HTTPRequestStates.Error:
            errorReason = "Negotiation Request Finished with Error! " + (req.Exception != null ? (req.Exception.Message + "\n" + req.Exception.StackTrace) : "No Exception");
            break;

        // The request aborted, initiated by the user.
        case HTTPRequestStates.Aborted:
            errorReason = "Negotiation Request Aborted!";
            break;

        // Connecting to the server is timed out.
        case HTTPRequestStates.ConnectionTimedOut:
            errorReason = "Negotiation Request - Connection Timed Out!";
            break;

        // The request didn't finished in the given time.
        case HTTPRequestStates.TimedOut:
            errorReason = "Negotiation Request - Processing the request Timed Out!";
            break;
    }

    // Use the same null-or-empty test as the success path above. With a plain null check, an empty reason
    // coming back from Parse would first start the transport and then immediately close the connection.
    if (!string.IsNullOrEmpty(errorReason))
        SetState(ConnectionStates.Closed, errorReason);
}
/// <summary>
/// Requests closing the connection: the state becomes CloseInitiated and, when a transport exists, the
/// transport is asked to close as well.
/// </summary>
public void StartClose()
{
    HTTPManager.Logger.Verbose("HubConnection", "StartClose");
    SetState(ConnectionStates.CloseInitiated);

    // Without a transport there is nothing further to shut down here.
    var activeTransport = this.Transport;
    if (activeTransport == null)
        return;

    activeTransport.StartClose();
}
/// <summary>
/// Starts a streaming invocation of the given server method and returns a future whose value is a
/// StreamItemContainer created with the invocation's id. Each received StreamItem message is converted
/// to TResult by the protocol and added to that container; a Completion message finishes the future
/// (successfully when it carries no error, failed with the error text otherwise).
/// Throws (from InvokeImp) when the connection is not in the Connected state.
/// </summary>
/// <param name="target">Name of the server method to invoke.</param>
/// <param name="args">Arguments sent with the invocation.</param>
/// <returns>The future tracking the stream.</returns>
public IFuture> Stream(string target, params object[] args)
{
var future = new Future>();
// InvokeImp sends a StreamInvocation message and registers the callback below under the returned id.
long id = InvokeImp(target,
args,
callback: (message) =>
{
switch (message.type)
{
// StreamItem message contains only one item.
case MessageTypes.StreamItem:
{
// NOTE(review): presumably the container passed to BeginProcess at the end of this method; Future is not visible here.
var container = future.value;
// Items arriving after CancelStream marked the container as canceled are dropped.
if (container.IsCanceled)
break;
container.AddItem((TResult)this.Protocol.ConvertTo(typeof(TResult), message.item));
// (re)assign the container to raise OnItem event
future.AssignItem(container);
break;
}
case MessageTypes.Completion:
{
// An empty or missing error means the stream completed successfully.
bool isSuccess = string.IsNullOrEmpty(message.error);
if (isSuccess)
{
var container = future.value;
// While completion message must not contain any result, this should be future-proof
//if (!container.IsCanceled && message.Result != null)
//{
// TResult[] results = (TResult[])this.Protocol.ConvertTo(typeof(TResult[]), message.Result);
//
// container.AddItems(results);
//}
future.Assign(container);
}
else
future.Fail(new Exception(message.error));
break;
}
}
},
isStreamingInvocation: true);
// The container can only be created now, because it needs the invocation id returned by InvokeImp.
future.BeginProcess(new StreamItemContainer(id));
return future;
}
/// <summary>
/// Cancels a running stream: marks the container as canceled and sends a CancelInvocation message carrying
/// the container's id to the server.
/// </summary>
/// <param name="container">Container of the stream to cancel.</param>
public void CancelStream(StreamItemContainer container)
{
    Message cancelRequest = new Message();
    cancelRequest.type = MessageTypes.CancelInvocation;
    cancelRequest.invocationId = container.id.ToString();

    // Flag the container before sending, so the stream callback ignores items that are still in flight.
    container.IsCanceled = true;

    SendMessage(cancelRequest);
}
/// <summary>
/// Invokes the given server method and returns a future for its result. When the server's answer carries
/// no error, the result is converted to TResult by the protocol and assigned to the future; otherwise the
/// future fails with an exception holding the error text.
/// Throws (from InvokeImp) when the connection is not in the Connected state.
/// </summary>
/// <param name="target">Name of the server method to invoke.</param>
/// <param name="args">Arguments sent with the invocation.</param>
/// <returns>The future that receives the result or the failure.</returns>
public IFuture Invoke(string target, params object[] args)
{
    Future future = new Future();

    InvokeImp(target,
        args,
        (message) =>
        {
            // A non-empty error marks a failed invocation.
            if (!string.IsNullOrEmpty(message.error))
            {
                future.Fail(new Exception(message.error));
                return;
            }

            future.Assign((TResult)this.Protocol.ConvertTo(typeof(TResult), message.result));
        });

    return future;
}
/// <summary>
/// Invokes the given server method when only success or failure matters. The returned future is assigned
/// true when the server's answer carries no error; otherwise it fails with an exception holding the error text.
/// Throws (from InvokeImp) when the connection is not in the Connected state.
/// </summary>
/// <param name="target">Name of the server method to invoke.</param>
/// <param name="args">Arguments sent with the invocation.</param>
/// <returns>The future that receives the outcome.</returns>
public IFuture Send(string target, params object[] args)
{
    Future future = new Future();

    InvokeImp(target,
        args,
        (message) =>
        {
            // A non-empty error marks a failed invocation.
            if (!string.IsNullOrEmpty(message.error))
            {
                future.Fail(new Exception(message.error));
                return;
            }

            future.Assign(true);
        });

    return future;
}
/// <summary>
/// Builds an invocation message with a fresh invocation id, sends it, and (when a callback is given)
/// registers the callback under that id so the server's answer can be routed back to it.
/// </summary>
/// <param name="target">Name of the server method to invoke.</param>
/// <param name="args">Arguments sent with the invocation.</param>
/// <param name="callback">Receives the server's answer(s) for this invocation; when null the message is marked nonblocking.</param>
/// <param name="isStreamingInvocation">True to send a StreamInvocation message instead of a plain Invocation.</param>
/// <returns>The invocation id assigned to the message.</returns>
/// <exception cref="Exception">The connection is not in the Connected state.</exception>
private long InvokeImp(string target, object[] args, Action callback, bool isStreamingInvocation = false)
{
    if (this.State != ConnectionStates.Connected)
        throw new Exception("Not connected yet!");

    long invocationId = System.Threading.Interlocked.Increment(ref this.lastInvocationId);
    var message = new Message
    {
        type = isStreamingInvocation ? MessageTypes.StreamInvocation : MessageTypes.Invocation,
        invocationId = invocationId.ToString(),
        target = target,
        arguments = args,
        nonblocking = callback == null,
    };

    if (callback == null)
    {
        // Nothing to route back, just send.
        SendMessage(message);
        return invocationId;
    }

    // Register the callback BEFORE the message leaves, so an answer that is dispatched as soon as the
    // message is handed to the transport always finds its callback.
    this.invocations.Add(invocationId, callback);

    try
    {
        SendMessage(message);
    }
    catch
    {
        // Nothing was sent, so no answer will ever arrive for this id. Do not leave a dead entry behind.
        this.invocations.Remove(invocationId);
        throw;
    }

    return invocationId;
}
/// <summary>
/// Encodes the message with the current protocol and hands the resulting bytes to the current transport.
/// </summary>
/// <param name="message">The message to encode and send.</param>
private void SendMessage(Message message)
{
    this.Transport.Send(this.Protocol.EncodeMessage(message));
}
/// <summary>
/// Subscribes a parameterless callback to the given method name. Forwards to the general On overload
/// with no parameter types; any arguments passed to the wrapper are ignored.
/// </summary>
/// <param name="methodName">Name of the method to subscribe to.</param>
/// <param name="callback">Called without arguments.</param>
public void On(string methodName, Action callback)
{
    On(methodName, null, (args) => { callback(); });
}
/// <summary>
/// Subscribes a one-parameter callback to the given method name. Forwards to the general On overload
/// with the parameter type T1; the wrapper casts the first argument to T1 before calling the callback.
/// </summary>
/// <param name="methodName">Name of the method to subscribe to.</param>
/// <param name="callback">Called with the first argument cast to T1.</param>
public void On(string methodName, Action callback)
{
    Type[] expectedTypes = { typeof(T1) };

    On(methodName, expectedTypes, (args) => { callback((T1)args[0]); });
}
/// <summary>
/// Subscribes a two-parameter callback to the given method name. Forwards to the general On overload
/// with the parameter types T1 and T2; the wrapper casts the first two arguments before calling the callback.
/// </summary>
/// <param name="methodName">Name of the method to subscribe to.</param>
/// <param name="callback">Called with the arguments cast to T1 and T2, in that order.</param>
public void On(string methodName, Action callback)
{
    Type[] expectedTypes = { typeof(T1), typeof(T2) };

    On(methodName, expectedTypes, (args) => { callback((T1)args[0], (T2)args[1]); });
}
/// <summary>
/// Subscribes a three-parameter callback to the given method name. Forwards to the general On overload
/// with the parameter types T1, T2 and T3; the wrapper casts the first three arguments before calling the callback.
/// </summary>
/// <param name="methodName">Name of the method to subscribe to.</param>
/// <param name="callback">Called with the arguments cast to T1, T2 and T3, in that order.</param>
public void On(string methodName, Action callback)
{
    Type[] expectedTypes = { typeof(T1), typeof(T2), typeof(T3) };

    On(methodName, expectedTypes, (args) => { callback((T1)args[0], (T2)args[1], (T3)args[2]); });
}
/// <summary>
/// Subscribes a four-parameter callback to the given method name. Forwards to the general On overload
/// with the parameter types T1 to T4; the wrapper casts the first four arguments before calling the callback.
/// </summary>
/// <param name="methodName">Name of the method to subscribe to.</param>
/// <param name="callback">Called with the arguments cast to T1, T2, T3 and T4, in that order.</param>
public void On(string methodName, Action callback)
{
    Type[] expectedTypes = { typeof(T1), typeof(T2), typeof(T3), typeof(T4) };

    On(methodName, expectedTypes, (args) => { callback((T1)args[0], (T2)args[1], (T3)args[2], (T4)args[3]); });
}
public void On(string methodName, Type[] paramTypes, Action