#if !BESTHTTP_DISABLE_SIGNALR_CORE && !BESTHTTP_DISABLE_WEBSOCKET
using BestHTTP.Futures;
using BestHTTP.SignalRCore.Messages;
using System;
using System.Collections.Generic;

namespace BestHTTP.SignalRCore
{
    /// <summary>
    /// Options that control how a <see cref="HubConnection"/> negotiates and which transport it uses.
    /// </summary>
    public sealed class HubOptions
    {
        /// <summary>
        /// When this is set to true, the plugin will skip the negotiation request and connect
        /// directly with the <see cref="PreferedTransport"/>.
        /// </summary>
        public bool SkipNegotiation { get; set; }

        /// <summary>
        /// The preferred transport to choose when more than one is available.
        /// </summary>
        public TransportTypes PreferedTransport { get; set; }
    }

    /// <summary>
    /// Client side representation of a SignalR Core hub connection. Drives the
    /// authentication -> negotiation -> transport connect sequence, sends invocations to the server
    /// and dispatches server-sent messages to the registered callbacks.
    /// </summary>
    public sealed class HubConnection
    {
        public static readonly object[] EmptyArgs = new object[0];

        /// <summary>
        /// Uri of the Hub endpoint.
        /// </summary>
        public Uri Uri { get; private set; }

        /// <summary>
        /// Current state of this connection.
        /// </summary>
        public ConnectionStates State { get; private set; }

        /// <summary>
        /// Current, active ITransport instance. Null until a transport has been created by the connect logic.
        /// </summary>
        public ITransport Transport { get; private set; }

        /// <summary>
        /// The IProtocol implementation that will parse, encode and decode messages.
        /// </summary>
        public IProtocol Protocol { get; private set; }

        /// <summary>
        /// This event is called when successfully connected to the hub.
        /// </summary>
        public event Action<HubConnection> OnConnected;

        /// <summary>
        /// This event is called when an unexpected error happens and the connection is closed.
        /// The second argument is the error reason.
        /// </summary>
        public event Action<HubConnection, string> OnError;

        /// <summary>
        /// This event is called when the connection is gracefully terminated.
        /// </summary>
        public event Action<HubConnection> OnClosed;

        /// <summary>
        /// This event is called for every server-sent message. When it returns false, no further processing
        /// of that message is done by the plugin.
        /// </summary>
        public event Func<HubConnection, Message, bool> OnMessage;

        /// <summary>
        /// An IAuthenticationProvider implementation that will be used to authenticate the connection.
        /// </summary>
        public IAuthenticationProvider AuthenticationProvider { get; set; }

        /// <summary>
        /// Negotiation response sent by the server. Null when no negotiation was done (yet).
        /// </summary>
        public NegotiationResult NegotiationResult { get; private set; }

        /// <summary>
        /// Options this connection was created with.
        /// </summary>
        public HubOptions Options { get; private set; }

        /// <summary>
        /// This will be incremented to add a unique id to every message the plugin will send.
        /// </summary>
        private long lastInvocationId = 0;

        /// <summary>
        /// Stores the callback for every sent message that expects a return value from the server. Every sent
        /// message has a unique invocationId that will be sent back from the server.
        /// </summary>
        private Dictionary<long, Action<Message>> invocations = new Dictionary<long, Action<Message>>();

        /// <summary>
        /// This is where we store the methodname => callback mapping. Method names are compared case-insensitively.
        /// </summary>
        private Dictionary<string, Subscription> subscriptions = new Dictionary<string, Subscription>(StringComparer.OrdinalIgnoreCase);

        /// <summary>
        /// Creates a connection to the given hub with default <see cref="HubOptions"/>.
        /// </summary>
        public HubConnection(Uri hubUri, IProtocol protocol)
            : this(hubUri, protocol, new HubOptions())
        {
        }

        /// <summary>
        /// Creates a connection to the given hub.
        /// </summary>
        /// <param name="hubUri">Uri of the hub endpoint.</param>
        /// <param name="protocol">Protocol used to encode and decode messages; its Connection is bound to this instance.</param>
        /// <param name="options">Connection options. When null, default options are used.</param>
        /// <exception cref="ArgumentNullException">When hubUri or protocol is null.</exception>
        public HubConnection(Uri hubUri, IProtocol protocol, HubOptions options)
        {
            if (hubUri == null)
                throw new ArgumentNullException("hubUri");
            if (protocol == null)
                throw new ArgumentNullException("protocol");

            this.Uri = hubUri;
            this.State = ConnectionStates.Initial;
            // Options is dereferenced unconditionally later, so never leave it null.
            this.Options = options ?? new HubOptions();
            this.Protocol = protocol;
            this.Protocol.Connection = this;
        }

        /// <summary>
        /// Starts the connection process. Does nothing unless the connection is in the Initial state.
        /// When the authentication provider requires pre-authentication, that is done first.
        /// </summary>
        public void StartConnect()
        {
            if (this.State != ConnectionStates.Initial)
                return;

            HTTPManager.Logger.Verbose("HubConnection", "StartConnect");

            if (this.AuthenticationProvider != null && this.AuthenticationProvider.IsPreAuthRequired)
            {
                HTTPManager.Logger.Information("HubConnection", "StartConnect - Authenticating");

                SetState(ConnectionStates.Authenticating);

                this.AuthenticationProvider.OnAuthenticationSucceded += OnAuthenticationSucceded;
                this.AuthenticationProvider.OnAuthenticationFailed += OnAuthenticationFailed;

                // Start the authentication process
                this.AuthenticationProvider.StartAuthentication();
            }
            else
                StartNegotiation();
        }

        /// <summary>
        /// Authentication finished successfully: unsubscribe from the provider and go on with the negotiation.
        /// </summary>
        private void OnAuthenticationSucceded(IAuthenticationProvider provider)
        {
            HTTPManager.Logger.Verbose("HubConnection", "OnAuthenticationSucceded");

            this.AuthenticationProvider.OnAuthenticationSucceded -= OnAuthenticationSucceded;
            this.AuthenticationProvider.OnAuthenticationFailed -= OnAuthenticationFailed;

            StartNegotiation();
        }

        /// <summary>
        /// Authentication failed: unsubscribe from the provider and close the connection with the given reason.
        /// </summary>
        private void OnAuthenticationFailed(IAuthenticationProvider provider, string reason)
        {
            HTTPManager.Logger.Error("HubConnection", "OnAuthenticationFailed: " + reason);

            this.AuthenticationProvider.OnAuthenticationSucceded -= OnAuthenticationSucceded;
            this.AuthenticationProvider.OnAuthenticationFailed -= OnAuthenticationFailed;

            SetState(ConnectionStates.Closed, reason);
        }

        /// <summary>
        /// Sends the negotiation request, or connects directly when <see cref="HubOptions.SkipNegotiation"/> is set.
        /// </summary>
        private void StartNegotiation()
        {
            HTTPManager.Logger.Verbose("HubConnection", "StartNegotiation");

            // A close requested while authenticating must win over everything else, including the
            // SkipNegotiation shortcut; otherwise the connection would be opened after StartClose().
            if (this.State == ConnectionStates.CloseInitiated)
            {
                SetState(ConnectionStates.Closed);
                return;
            }

            if (this.Options.SkipNegotiation)
            {
                HTTPManager.Logger.Verbose("HubConnection", "Skipping negotiation");
                ConnectImpl();

                return;
            }

            SetState(ConnectionStates.Negotiating);

            // https://github.com/aspnet/SignalR/blob/dev/specs/TransportProtocols.md#post-endpoint-basenegotiate-request
            // Send out a negotiation request. While we could skip it and connect right with the websocket transport
            // it might return with additional information that could be useful.

            UriBuilder builder = new UriBuilder(this.Uri);
            builder.Path += "/negotiate";

            var request = new HTTPRequest(builder.Uri, HTTPMethods.Post, OnNegotiationRequestFinished);
            if (this.AuthenticationProvider != null)
                this.AuthenticationProvider.PrepareRequest(request);

            request.Send();
        }

        /// <summary>
        /// Creates the transport selected by <see cref="HubOptions.PreferedTransport"/> and starts connecting with it.
        /// Closes the connection with an error when the transport is unknown or not offered by the server.
        /// </summary>
        private void ConnectImpl()
        {
            HTTPManager.Logger.Verbose("HubConnection", "ConnectImpl");

            switch (this.Options.PreferedTransport)
            {
                case TransportTypes.WebSocket:
                    // NegotiationResult is null when the negotiation was skipped; nothing to verify then.
                    if (this.NegotiationResult != null && !IsTransportSupported("WebSockets"))
                    {
                        SetState(ConnectionStates.Closed, "The 'WebSockets' transport isn't supported by the server!");
                        return;
                    }

                    this.Transport = new Transports.WebSocketTransport(this);
                    this.Transport.OnStateChanged += Transport_OnStateChanged;
                    break;

                default:
                    SetState(ConnectionStates.Closed, "Unsupportted transport: " + this.Options.PreferedTransport);
                    // No transport was created, so there is nothing to start.
                    return;
            }

            this.Transport.StartConnect();
        }

        /// <summary>
        /// Returns true when the negotiation result lists a transport with the given name, or when it
        /// contains no transport list at all. Expects <see cref="NegotiationResult"/> to be non-null.
        /// </summary>
        private bool IsTransportSupported(string transportName)
        {
            // https://github.com/aspnet/SignalR/blob/dev/specs/TransportProtocols.md#post-endpoint-basenegotiate-request
            // If the negotiation response contains only the url and accessToken, no 'availableTransports' list is sent
            if (this.NegotiationResult.SupportedTransports == null)
                return true;

            for (int i = 0; i < this.NegotiationResult.SupportedTransports.Count; ++i)
                if (this.NegotiationResult.SupportedTransports[i].Name == transportName)
                    return true;

            return false;
        }

        /// <summary>
        /// Callback of the negotiation request. On success parses the response and connects; on any failure
        /// closes the connection with a descriptive error reason.
        /// </summary>
        private void OnNegotiationRequestFinished(HTTPRequest req, HTTPResponse resp)
        {
            // The user asked for a close while the request was in flight.
            if (this.State == ConnectionStates.CloseInitiated)
            {
                SetState(ConnectionStates.Closed);
                return;
            }

            string errorReason = null;

            switch (req.State)
            {
                // The request finished without any problem.
                case HTTPRequestStates.Finished:
                    if (resp.IsSuccess)
                    {
                        HTTPManager.Logger.Information("HubConnection", "Negotiation Request Finished Successfully! Response: " + resp.DataAsText);

                        // Parse negotiation
                        this.NegotiationResult = NegotiationResult.Parse(resp.DataAsText, out errorReason);

                        // TODO: check validity of the negotiation result:
                        //  If url and accessToken is present, the other two must be null.
                        //  https://github.com/aspnet/SignalR/blob/dev/specs/TransportProtocols.md#post-endpoint-basenegotiate-request

                        if (string.IsNullOrEmpty(errorReason))
                            ConnectImpl();
                    }
                    else // Internal server error?
                        errorReason = string.Format("Negotiation Request Finished Successfully, but the server sent an error. Status Code: {0}-{1} Message: {2}",
                                                        resp.StatusCode,
                                                        resp.Message,
                                                        resp.DataAsText);
                    break;

                // The request finished with an unexpected error. The request's Exception property may contain more info about the error.
                case HTTPRequestStates.Error:
                    errorReason = "Negotiation Request Finished with Error! " + (req.Exception != null ? (req.Exception.Message + "\n" + req.Exception.StackTrace) : "No Exception");
                    break;

                // The request aborted, initiated by the user.
                case HTTPRequestStates.Aborted:
                    errorReason = "Negotiation Request Aborted!";
                    break;

                // Connecting to the server is timed out.
                case HTTPRequestStates.ConnectionTimedOut:
                    errorReason = "Negotiation Request - Connection Timed Out!";
                    break;

                // The request didn't finish in the given time.
                case HTTPRequestStates.TimedOut:
                    errorReason = "Negotiation Request - Processing the request Timed Out!";
                    break;
            }

            if (errorReason != null)
                SetState(ConnectionStates.Closed, errorReason);
        }

        /// <summary>
        /// Initiates closing of the connection. When a transport exists, it is asked to close too.
        /// </summary>
        public void StartClose()
        {
            HTTPManager.Logger.Verbose("HubConnection", "StartClose");

            SetState(ConnectionStates.CloseInitiated);

            if (this.Transport != null)
                this.Transport.StartClose();
        }

        /// <summary>
        /// Starts a streaming invocation of the given hub method.
        /// </summary>
        /// <param name="target">Name of the hub method.</param>
        /// <param name="args">Arguments of the hub method.</param>
        /// <returns>A future whose value is the container collecting the streamed items.</returns>
        /// <exception cref="InvalidOperationException">When the connection isn't in the Connected state.</exception>
        public IFuture<StreamItemContainer<TResult>> Stream<TResult>(string target, params object[] args)
        {
            var future = new Future<StreamItemContainer<TResult>>();

            long id = InvokeImp(target,
                args,
                callback: (message) =>
                {
                    switch (message.type)
                    {
                        // StreamItem message contains only one item.
                        case MessageTypes.StreamItem:
                            {
                                var container = future.value;

                                if (container.IsCanceled)
                                    break;

                                container.AddItem((TResult)this.Protocol.ConvertTo(typeof(TResult), message.item));

                                // (re)assign the container to raise OnItem event
                                future.AssignItem(container);
                                break;
                            }

                        case MessageTypes.Completion:
                            {
                                bool isSuccess = string.IsNullOrEmpty(message.error);
                                if (isSuccess)
                                {
                                    // Any result carried by the completion message is ignored here;
                                    // the container already holds every streamed item.
                                    future.Assign(future.value);
                                }
                                else
                                    future.Fail(new Exception(message.error));
                                break;
                            }
                    }
                },
                isStreamingInvocation: true);

            future.BeginProcess(new StreamItemContainer<TResult>(id));

            return future;
        }

        /// <summary>
        /// Marks the container as canceled and sends a CancelInvocation message for its stream.
        /// </summary>
        /// <exception cref="ArgumentNullException">When container is null.</exception>
        public void CancelStream<T>(StreamItemContainer<T> container)
        {
            if (container == null)
                throw new ArgumentNullException("container");

            Message message = new Message
            {
                type = MessageTypes.CancelInvocation,
                invocationId = container.id.ToString()
            };

            container.IsCanceled = true;

            SendMessage(message);
        }

        /// <summary>
        /// Invokes the given hub method and converts the server's result to TResult.
        /// </summary>
        /// <returns>A future that is assigned the converted result, or failed with the server-sent error.</returns>
        /// <exception cref="InvalidOperationException">When the connection isn't in the Connected state.</exception>
        public IFuture<TResult> Invoke<TResult>(string target, params object[] args)
        {
            Future<TResult> future = new Future<TResult>();

            InvokeImp(target,
                args,
                (message) =>
                {
                    bool isSuccess = string.IsNullOrEmpty(message.error);
                    if (isSuccess)
                        future.Assign((TResult)this.Protocol.ConvertTo(typeof(TResult), message.result));
                    else
                        future.Fail(new Exception(message.error));
                });

            return future;
        }

        /// <summary>
        /// Invokes the given hub method, ignoring any result value.
        /// </summary>
        /// <returns>A future that is assigned true on completion, or failed with the server-sent error.</returns>
        /// <exception cref="InvalidOperationException">When the connection isn't in the Connected state.</exception>
        public IFuture<bool> Send(string target, params object[] args)
        {
            Future<bool> future = new Future<bool>();

            InvokeImp(target,
                args,
                (message) =>
                {
                    bool isSuccess = string.IsNullOrEmpty(message.error);
                    if (isSuccess)
                        future.Assign(true);
                    else
                        future.Fail(new Exception(message.error));
                });

            return future;
        }

        /// <summary>
        /// Builds and sends an (stream)invocation message and registers the callback under the new invocation id.
        /// </summary>
        /// <returns>The invocation id assigned to the sent message.</returns>
        /// <exception cref="InvalidOperationException">When the connection isn't in the Connected state.</exception>
        private long InvokeImp(string target, object[] args, Action<Message> callback, bool isStreamingInvocation = false)
        {
            if (this.State != ConnectionStates.Connected)
                throw new InvalidOperationException("Not connected yet!");

            long invocationId = System.Threading.Interlocked.Increment(ref this.lastInvocationId);
            var message = new Message
            {
                type = isStreamingInvocation ? MessageTypes.StreamInvocation : MessageTypes.Invocation,
                invocationId = invocationId.ToString(),
                target = target,
                arguments = args,
                nonblocking = callback == null,
            };

            SendMessage(message);

            if (callback != null)
                this.invocations.Add(invocationId, callback);

            return invocationId;
        }

        /// <summary>
        /// Encodes the message with the current protocol and sends it through the current transport.
        /// </summary>
        private void SendMessage(Message message)
        {
            byte[] encoded = this.Protocol.EncodeMessage(message);
            this.Transport.Send(encoded);
        }

        /// <summary>
        /// Subscribes a parameterless callback to the given server-invoked method.
        /// </summary>
        public void On(string methodName, Action callback)
        {
            On(methodName, null, (args) => callback());
        }

        /// <summary>
        /// Subscribes a one-parameter callback to the given server-invoked method.
        /// </summary>
        public void On<T1>(string methodName, Action<T1> callback)
        {
            On(methodName, new Type[] { typeof(T1) }, (args) => callback((T1)args[0]));
        }

        /// <summary>
        /// Subscribes a two-parameter callback to the given server-invoked method.
        /// </summary>
        public void On<T1, T2>(string methodName, Action<T1, T2> callback)
        {
            On(methodName,
                new Type[] { typeof(T1), typeof(T2) },
                (args) => callback((T1)args[0], (T2)args[1]));
        }

        /// <summary>
        /// Subscribes a three-parameter callback to the given server-invoked method.
        /// </summary>
        public void On<T1, T2, T3>(string methodName, Action<T1, T2, T3> callback)
        {
            On(methodName,
                new Type[] { typeof(T1), typeof(T2), typeof(T3) },
                (args) => callback((T1)args[0], (T2)args[1], (T3)args[2]));
        }

        /// <summary>
        /// Subscribes a four-parameter callback to the given server-invoked method.
        /// </summary>
        public void On<T1, T2, T3, T4>(string methodName, Action<T1, T2, T3, T4> callback)
        {
            On(methodName,
                new Type[] { typeof(T1), typeof(T2), typeof(T3), typeof(T4) },
                (args) => callback((T1)args[0], (T2)args[1], (T3)args[2], (T4)args[3]));
        }

        /// <summary>
        /// Subscribes a callback to the given server-invoked method. The received arguments are converted
        /// by the protocol using paramTypes before the callback is called.
        /// </summary>
        public void On(string methodName, Type[] paramTypes, Action<object[]> callback)
        {
            Subscription subscription = null;
            if (!this.subscriptions.TryGetValue(methodName, out subscription))
                this.subscriptions.Add(methodName, subscription = new Subscription());

            subscription.Add(paramTypes, callback);
        }

        /// <summary>
        /// Dispatches a batch of server-sent messages: invocations go to the subscribed callbacks, stream items
        /// and completions to the callback registered for their invocation id, and a Close message closes the connection.
        /// </summary>
        internal void OnMessages(List<Message> messages)
        {
            for (int messageIdx = 0; messageIdx < messages.Count; ++messageIdx)
            {
                var message = messages[messageIdx];

                try
                {
                    // A false return skips only this message; the rest of the batch is still processed.
                    if (this.OnMessage != null && !this.OnMessage(this, message))
                        continue;
                }
                catch (Exception ex)
                {
                    HTTPManager.Logger.Exception("HubConnection", "Exception in OnMessage user code!", ex);
                }

                switch (message.type)
                {
                    case MessageTypes.Invocation:
                        {
                            Subscription subscription = null;
                            // A null key would make TryGetValue throw and abort the whole batch.
                            if (message.target != null && this.subscriptions.TryGetValue(message.target, out subscription))
                            {
                                for (int i = 0; i < subscription.callbacks.Count; ++i)
                                {
                                    var callbackDesc = subscription.callbacks[i];

                                    object[] realArgs = null;
                                    try
                                    {
                                        realArgs = this.Protocol.GetRealArguments(callbackDesc.ParamTypes, message.arguments);
                                    }
                                    catch (Exception ex)
                                    {
                                        HTTPManager.Logger.Exception("HubConnection", "OnMessages - Invocation - GetRealArguments", ex);
                                    }

                                    try
                                    {
                                        callbackDesc.Callback.Invoke(realArgs);
                                    }
                                    catch (Exception ex)
                                    {
                                        HTTPManager.Logger.Exception("HubConnection", "OnMessages - Invocation - Invoke", ex);
                                    }
                                }
                            }

                            break;
                        }

                    case MessageTypes.StreamItem:
                        {
                            long invocationId;
                            if (long.TryParse(message.invocationId, out invocationId))
                            {
                                Action<Message> callback;
                                if (this.invocations.TryGetValue(invocationId, out callback) && callback != null)
                                {
                                    try
                                    {
                                        callback(message);
                                    }
                                    catch (Exception ex)
                                    {
                                        HTTPManager.Logger.Exception("HubConnection", "OnMessages - StreamItem - Callback", ex);
                                    }
                                }
                            }
                            break;
                        }

                    case MessageTypes.Completion:
                        {
                            long invocationId;
                            if (long.TryParse(message.invocationId, out invocationId))
                            {
                                Action<Message> callback;
                                if (this.invocations.TryGetValue(invocationId, out callback) && callback != null)
                                {
                                    try
                                    {
                                        callback(message);
                                    }
                                    catch (Exception ex)
                                    {
                                        HTTPManager.Logger.Exception("HubConnection", "OnMessages - Completion - Callback", ex);
                                    }
                                }

                                // The invocation is finished, no more messages are expected for this id.
                                this.invocations.Remove(invocationId);
                            }
                            break;
                        }

                    case MessageTypes.Close:
                        SetState(ConnectionStates.Closed, message.error);
                        break;
                }
            }
        }

        /// <summary>
        /// Maps the transport's state changes to connection state changes.
        /// </summary>
        private void Transport_OnStateChanged(TransportStates oldState, TransportStates newState)
        {
            HTTPManager.Logger.Verbose("HubConnection", string.Format("Transport_OnStateChanged - oldState: {0} newState: {1}", oldState.ToString(), newState.ToString()));

            switch (newState)
            {
                case TransportStates.Connected:
                    SetState(ConnectionStates.Connected);
                    break;

                case TransportStates.Failed:
                    SetState(ConnectionStates.Closed, this.Transport.ErrorReason);
                    break;

                case TransportStates.Closed:
                    SetState(ConnectionStates.Closed);
                    break;
            }
        }

        /// <summary>
        /// Changes the connection's state and raises the matching event: OnConnected for Connected, and for
        /// Closed either OnClosed (no error reason) or OnError (with error reason). Setting the current state
        /// again is a no-op. Exceptions thrown by user event handlers are logged and swallowed.
        /// </summary>
        private void SetState(ConnectionStates state, string errorReason = null)
        {
            // Parenthesized: '+' binds tighter than '??', so without the parentheses the fallback would
            // apply to the whole (never null) concatenated string instead of errorReason.
            HTTPManager.Logger.Information("HubConnection", "SetState - from State: " + this.State.ToString() + " to State: " + state.ToString() + " errorReason: " + (errorReason ?? string.Empty));

            if (this.State == state)
                return;

            this.State = state;

            switch (state)
            {
                case ConnectionStates.Initial:
                case ConnectionStates.Authenticating:
                case ConnectionStates.Negotiating:
                case ConnectionStates.CloseInitiated:
                    break;

                case ConnectionStates.Connected:
                    try
                    {
                        if (this.OnConnected != null)
                            this.OnConnected(this);
                    }
                    catch (Exception ex)
                    {
                        HTTPManager.Logger.Exception("HubConnection", "Exception in OnConnected user code!", ex);
                    }
                    break;

                case ConnectionStates.Closed:
                    if (string.IsNullOrEmpty(errorReason))
                    {
                        if (this.OnClosed != null)
                        {
                            try
                            {
                                this.OnClosed(this);
                            }
                            catch (Exception ex)
                            {
                                HTTPManager.Logger.Exception("HubConnection", "Exception in OnClosed user code!", ex);
                            }
                        }
                    }
                    else
                    {
                        if (this.OnError != null)
                        {
                            try
                            {
                                this.OnError(this, errorReason);
                            }
                            catch (Exception ex)
                            {
                                HTTPManager.Logger.Exception("HubConnection", "Exception in OnError user code!", ex);
                            }
                        }
                    }
                    break;
            }
        }
    }
}
#endif