123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653 |
- #if !BESTHTTP_DISABLE_SIGNALR_CORE && !BESTHTTP_DISABLE_WEBSOCKET
- using BestHTTP.Futures;
- using BestHTTP.SignalRCore.Messages;
- using System;
- using System.Collections.Generic;
- namespace BestHTTP.SignalRCore
- {
- public sealed class HubOptions
- {
- /// <summary>
- /// When this is set to true, the plugin will skip the negotiation request if the PreferedTransport is
- /// WebSocket.
- /// </summary>
- public bool SkipNegotiation { get; set; }
- /// <summary>
- /// The preferred transport to choose when more than one available.
- /// </summary>
- public TransportTypes PreferedTransport { get; set; }
- }
- public sealed class HubConnection
- {
- public static readonly object[] EmptyArgs = new object[0];
- /// <summary>
- /// Uri of the Hub endpoint
- /// </summary>
- public Uri Uri { get; private set; }
- /// <summary>
- /// Current state of this connection.
- /// </summary>
- public ConnectionStates State { get; private set; }
- /// <summary>
- /// Current, active ITransport instance.
- /// </summary>
- public ITransport Transport { get; private set; }
- /// <summary>
- /// The IProtocol implementation that will parse, encode and decode messages.
- /// </summary>
- public IProtocol Protocol { get; private set; }
- /// <summary>
- /// This event is called when successfully connected to the hub.
- /// </summary>
- public event Action<HubConnection> OnConnected;
- /// <summary>
- /// This event is called when an unexpected error happen and the connection is closed.
- /// </summary>
- public event Action<HubConnection, string> OnError;
- /// <summary>
- /// This event is called when the connection is gracefully terminated.
- /// </summary>
- public event Action<HubConnection> OnClosed;
- /// <summary>
- /// This event is called for every server-sent message. When returns false, no further processing of the message is done
- /// by the plugin.
- /// </summary>
- public event Func<HubConnection, Message, bool> OnMessage;
- /// <summary>
- /// An IAuthenticationProvider implementation that will be used to authenticate the connection.
- /// </summary>
- public IAuthenticationProvider AuthenticationProvider { get; set; }
- /// <summary>
- /// Negotiation response sent by the server.
- /// </summary>
- public NegotiationResult NegotiationResult { get; private set; }
- /// <summary>
- ///
- /// </summary>
- public HubOptions Options { get; private set; }
- /// <summary>
- /// This will be increment to add a unique id to every message the plugin will send.
- /// </summary>
- private long lastInvocationId = 0;
- /// <summary>
- /// Store the callback for all sent message that expect a return value from the server. All sent message has
- /// a unique invocationId that will be sent back from the server.
- /// </summary>
- private Dictionary<long, Action<Message>> invocations = new Dictionary<long, Action<Message>>();
- /// <summary>
- /// This is where we store the methodname => callback mapping.
- /// </summary>
- private Dictionary<string, Subscription> subscriptions = new Dictionary<string, Subscription>(StringComparer.OrdinalIgnoreCase);
- public HubConnection(Uri hubUri, IProtocol protocol)
- : this(hubUri, protocol, new HubOptions())
- {
- }
- public HubConnection(Uri hubUri, IProtocol protocol, HubOptions options)
- {
- this.Uri = hubUri;
- this.State = ConnectionStates.Initial;
- this.Options = options;
- this.Protocol = protocol;
- this.Protocol.Connection = this;
- }
- public void StartConnect()
- {
- if (this.State != ConnectionStates.Initial)
- return;
- HTTPManager.Logger.Verbose("HubConnection", "StartConnect");
- if (this.AuthenticationProvider != null && this.AuthenticationProvider.IsPreAuthRequired)
- {
- HTTPManager.Logger.Information("HubConnection", "StartConnect - Authenticating");
- SetState(ConnectionStates.Authenticating);
- this.AuthenticationProvider.OnAuthenticationSucceded += OnAuthenticationSucceded;
- this.AuthenticationProvider.OnAuthenticationFailed += OnAuthenticationFailed;
- // Start the authentication process
- this.AuthenticationProvider.StartAuthentication();
- }
- else
- StartNegotiation();
- }
- private void OnAuthenticationSucceded(IAuthenticationProvider provider)
- {
- HTTPManager.Logger.Verbose("HubConnection", "OnAuthenticationSucceded");
- this.AuthenticationProvider.OnAuthenticationSucceded -= OnAuthenticationSucceded;
- this.AuthenticationProvider.OnAuthenticationFailed -= OnAuthenticationFailed;
- StartNegotiation();
- }
- private void OnAuthenticationFailed(IAuthenticationProvider provider, string reason)
- {
- HTTPManager.Logger.Error("HubConnection", "OnAuthenticationFailed: " + reason);
- this.AuthenticationProvider.OnAuthenticationSucceded -= OnAuthenticationSucceded;
- this.AuthenticationProvider.OnAuthenticationFailed -= OnAuthenticationFailed;
- SetState(ConnectionStates.Closed, reason);
- }
- private void StartNegotiation()
- {
- HTTPManager.Logger.Verbose("HubConnection", "StartNegotiation");
- if (this.Options.SkipNegotiation)
- {
- HTTPManager.Logger.Verbose("HubConnection", "Skipping negotiation");
- ConnectImpl();
- return;
- }
- if (this.State == ConnectionStates.CloseInitiated)
- {
- SetState(ConnectionStates.Closed);
- return;
- }
- SetState(ConnectionStates.Negotiating);
- // https://github.com/aspnet/SignalR/blob/dev/specs/TransportProtocols.md#post-endpoint-basenegotiate-request
- // Send out a negotiation request. While we could skip it and connect right with the websocket transport
- // it might return with additional information that could be useful.
- UriBuilder builder = new UriBuilder(this.Uri);
- builder.Path += "/negotiate";
- var request = new HTTPRequest(builder.Uri, HTTPMethods.Post, OnNegotiationRequestFinished);
- if (this.AuthenticationProvider != null)
- this.AuthenticationProvider.PrepareRequest(request);
- request.Send();
- }
-
- private void ConnectImpl()
- {
- HTTPManager.Logger.Verbose("HubConnection", "ConnectImpl");
- switch (this.Options.PreferedTransport)
- {
- case TransportTypes.WebSocket:
- if (this.NegotiationResult != null && !IsTransportSupported("WebSockets"))
- {
- SetState(ConnectionStates.Closed, "The 'WebSockets' transport isn't supported by the server!");
- return;
- }
- this.Transport = new Transports.WebSocketTransport(this);
- this.Transport.OnStateChanged += Transport_OnStateChanged;
- break;
- default:
- SetState(ConnectionStates.Closed, "Unsupportted transport: " + this.Options.PreferedTransport);
- break;
- }
- this.Transport.StartConnect();
- }
- private bool IsTransportSupported(string transportName)
- {
- // https://github.com/aspnet/SignalR/blob/dev/specs/TransportProtocols.md#post-endpoint-basenegotiate-request
- // If the negotiation response contains only the url and accessToken, no 'availableTransports' list is sent
- if (this.NegotiationResult.SupportedTransports == null)
- return true;
- for (int i = 0; i < this.NegotiationResult.SupportedTransports.Count; ++i)
- if (this.NegotiationResult.SupportedTransports[i].Name == transportName)
- return true;
- return false;
- }
- private void OnNegotiationRequestFinished(HTTPRequest req, HTTPResponse resp)
- {
- if (this.State == ConnectionStates.CloseInitiated)
- {
- SetState(ConnectionStates.Closed);
- return;
- }
- string errorReason = null;
- switch (req.State)
- {
- // The request finished without any problem.
- case HTTPRequestStates.Finished:
- if (resp.IsSuccess)
- {
- HTTPManager.Logger.Information("HubConnection", "Negotiation Request Finished Successfully! Response: " + resp.DataAsText);
- // Parse negotiation
- this.NegotiationResult = NegotiationResult.Parse(resp.DataAsText, out errorReason);
- // TODO: check validity of the negotiation result:
- // If url and accessToken is present, the other two must be null.
- // https://github.com/aspnet/SignalR/blob/dev/specs/TransportProtocols.md#post-endpoint-basenegotiate-request
- if (string.IsNullOrEmpty(errorReason))
- ConnectImpl();
- }
- else // Internal server error?
- errorReason = string.Format("Negotiation Request Finished Successfully, but the server sent an error. Status Code: {0}-{1} Message: {2}",
- resp.StatusCode,
- resp.Message,
- resp.DataAsText);
- break;
- // The request finished with an unexpected error. The request's Exception property may contain more info about the error.
- case HTTPRequestStates.Error:
- errorReason = "Negotiation Request Finished with Error! " + (req.Exception != null ? (req.Exception.Message + "\n" + req.Exception.StackTrace) : "No Exception");
- break;
- // The request aborted, initiated by the user.
- case HTTPRequestStates.Aborted:
- errorReason = "Negotiation Request Aborted!";
- break;
- // Connecting to the server is timed out.
- case HTTPRequestStates.ConnectionTimedOut:
- errorReason = "Negotiation Request - Connection Timed Out!";
- break;
- // The request didn't finished in the given time.
- case HTTPRequestStates.TimedOut:
- errorReason = "Negotiation Request - Processing the request Timed Out!";
- break;
- }
- if (errorReason != null)
- SetState(ConnectionStates.Closed, errorReason);
- }
- public void StartClose()
- {
- HTTPManager.Logger.Verbose("HubConnection", "StartClose");
- SetState(ConnectionStates.CloseInitiated);
- if (this.Transport != null)
- this.Transport.StartClose();
- }
- public IFuture<StreamItemContainer<TResult>> Stream<TResult>(string target, params object[] args)
- {
- var future = new Future<StreamItemContainer<TResult>>();
-
- long id = InvokeImp(target,
- args,
- callback: (message) =>
- {
- switch (message.type)
- {
- // StreamItem message contains only one item.
- case MessageTypes.StreamItem:
- {
- var container = future.value;
- if (container.IsCanceled)
- break;
- container.AddItem((TResult)this.Protocol.ConvertTo(typeof(TResult), message.item));
- // (re)assign the container to raise OnItem event
- future.AssignItem(container);
- break;
- }
- case MessageTypes.Completion:
- {
- bool isSuccess = string.IsNullOrEmpty(message.error);
- if (isSuccess)
- {
- var container = future.value;
- // While completion message must not contain any result, this should be future-proof
- //if (!container.IsCanceled && message.Result != null)
- //{
- // TResult[] results = (TResult[])this.Protocol.ConvertTo(typeof(TResult[]), message.Result);
- //
- // container.AddItems(results);
- //}
- future.Assign(container);
- }
- else
- future.Fail(new Exception(message.error));
- break;
- }
- }
- },
- isStreamingInvocation: true);
- future.BeginProcess(new StreamItemContainer<TResult>(id));
- return future;
- }
- public void CancelStream<T>(StreamItemContainer<T> container)
- {
- Message message = new Message {
- type = MessageTypes.CancelInvocation,
- invocationId = container.id.ToString()
- };
- container.IsCanceled = true;
- SendMessage(message);
- }
- public IFuture<TResult> Invoke<TResult>(string target, params object[] args)
- {
- Future<TResult> future = new Future<TResult>();
- InvokeImp(target,
- args,
- (message) =>
- {
- bool isSuccess = string.IsNullOrEmpty(message.error);
- if (isSuccess)
- future.Assign((TResult)this.Protocol.ConvertTo(typeof(TResult), message.result));
- else
- future.Fail(new Exception(message.error));
- });
- return future;
- }
- public IFuture<bool> Send(string target, params object[] args)
- {
- Future<bool> future = new Future<bool>();
- InvokeImp(target,
- args,
- (message) =>
- {
- bool isSuccess = string.IsNullOrEmpty(message.error);
- if (isSuccess)
- future.Assign(true);
- else
- future.Fail(new Exception(message.error));
- });
- return future;
- }
- private long InvokeImp(string target, object[] args, Action<Message> callback, bool isStreamingInvocation = false)
- {
- if (this.State != ConnectionStates.Connected)
- throw new Exception("Not connected yet!");
- long invocationId = System.Threading.Interlocked.Increment(ref this.lastInvocationId);
- var message = new Message
- {
- type = isStreamingInvocation ? MessageTypes.StreamInvocation : MessageTypes.Invocation,
- invocationId = invocationId.ToString(),
- target = target,
- arguments = args,
- nonblocking = callback == null,
- };
- SendMessage(message);
- if (callback != null)
- this.invocations.Add(invocationId, callback);
- return invocationId;
- }
- private void SendMessage(Message message)
- {
- byte[] encoded = this.Protocol.EncodeMessage(message);
- this.Transport.Send(encoded);
- }
- public void On(string methodName, Action callback)
- {
- On(methodName, null, (args) => callback());
- }
- public void On<T1>(string methodName, Action<T1> callback)
- {
- On(methodName, new Type[] { typeof(T1) }, (args) => callback((T1)args[0]));
- }
- public void On<T1, T2>(string methodName, Action<T1, T2> callback)
- {
- On(methodName,
- new Type[] { typeof(T1), typeof(T2) },
- (args) => callback((T1)args[0], (T2)args[1]));
- }
- public void On<T1, T2, T3>(string methodName, Action<T1, T2, T3> callback)
- {
- On(methodName,
- new Type[] { typeof(T1), typeof(T2), typeof(T3) },
- (args) => callback((T1)args[0], (T2)args[1], (T3)args[2]));
- }
- public void On<T1, T2, T3, T4>(string methodName, Action<T1, T2, T3, T4> callback)
- {
- On(methodName,
- new Type[] { typeof(T1), typeof(T2), typeof(T3), typeof(T4) },
- (args) => callback((T1)args[0], (T2)args[1], (T3)args[2], (T4)args[3]));
- }
- public void On(string methodName, Type[] paramTypes, Action<object[]> callback)
- {
- Subscription subscription = null;
- if (!this.subscriptions.TryGetValue(methodName, out subscription))
- this.subscriptions.Add(methodName, subscription = new Subscription());
- subscription.Add(paramTypes, callback);
- }
- internal void OnMessages(List<Message> messages)
- {
- for (int messageIdx = 0; messageIdx < messages.Count; ++messageIdx)
- {
- var message = messages[messageIdx];
- try
- {
- if (this.OnMessage != null && !this.OnMessage(this, message))
- return;
- }
- catch (Exception ex)
- {
- HTTPManager.Logger.Exception("HubConnection", "Exception in OnMessage user code!", ex);
- }
- switch (message.type)
- {
- case MessageTypes.Invocation:
- {
- Subscription subscribtion = null;
- if (this.subscriptions.TryGetValue(message.target, out subscribtion))
- {
- for (int i = 0; i < subscribtion.callbacks.Count; ++i)
- {
- var callbackDesc = subscribtion.callbacks[i];
- object[] realArgs = null;
- try
- {
- realArgs = this.Protocol.GetRealArguments(callbackDesc.ParamTypes, message.arguments);
- }
- catch (Exception ex)
- {
- HTTPManager.Logger.Exception("HubConnection", "OnMessages - Invocation - GetRealArguments", ex);
- }
- try
- {
- callbackDesc.Callback.Invoke(realArgs);
- }
- catch (Exception ex)
- {
- HTTPManager.Logger.Exception("HubConnection", "OnMessages - Invocation - Invoke", ex);
- }
- }
- }
- break;
- }
- case MessageTypes.StreamItem:
- {
- long invocationId;
- if (long.TryParse(message.invocationId, out invocationId))
- {
- Action<Message> callback;
- if (this.invocations.TryGetValue(invocationId, out callback) && callback != null)
- {
- try
- {
- callback(message);
- }
- catch (Exception ex)
- {
- HTTPManager.Logger.Exception("HubConnection", "OnMessages - StreamItem - callback", ex);
- }
- }
- }
- break;
- }
- case MessageTypes.Completion:
- {
- long invocationId;
- if (long.TryParse(message.invocationId, out invocationId))
- {
- Action<Message> callback;
- if (this.invocations.TryGetValue(invocationId, out callback) && callback != null)
- {
- try
- {
- callback(message);
- }
- catch (Exception ex)
- {
- HTTPManager.Logger.Exception("HubConnection", "OnMessages - Completion - callback", ex);
- }
- }
- this.invocations.Remove(invocationId);
- }
- break;
- }
- case MessageTypes.Close:
- SetState(ConnectionStates.Closed, message.error);
- break;
- }
- }
- }
- private void Transport_OnStateChanged(TransportStates oldState, TransportStates newState)
- {
- HTTPManager.Logger.Verbose("HubConnection", string.Format("Transport_OnStateChanged - oldState: {0} newState: {1}", oldState.ToString(), newState.ToString()));
- switch (newState)
- {
- case TransportStates.Connected:
- SetState(ConnectionStates.Connected);
- break;
- case TransportStates.Failed:
- SetState(ConnectionStates.Closed, this.Transport.ErrorReason);
- break;
- case TransportStates.Closed:
- SetState(ConnectionStates.Closed);
- break;
- }
- }
- private void SetState(ConnectionStates state, string errorReason = null)
- {
- HTTPManager.Logger.Information("HubConnection", "SetState - from State: " + this.State.ToString() + " to State: " + state.ToString() + " errorReason: " + errorReason ?? string.Empty);
- if (this.State == state)
- return;
- this.State = state;
- switch (state)
- {
- case ConnectionStates.Initial:
- case ConnectionStates.Authenticating:
- case ConnectionStates.Negotiating:
- case ConnectionStates.CloseInitiated:
- break;
- case ConnectionStates.Connected:
- try
- {
- if (this.OnConnected != null)
- this.OnConnected(this);
- }
- catch(Exception ex)
- {
- HTTPManager.Logger.Exception("HubConnection", "Exception in OnConnected user code!", ex);
- }
- break;
- case ConnectionStates.Closed:
- if (string.IsNullOrEmpty(errorReason))
- {
- if (this.OnClosed != null)
- {
- try
- {
- this.OnClosed(this);
- }
- catch(Exception ex)
- {
- HTTPManager.Logger.Exception("HubConnection", "Exception in OnClosed user code!", ex);
- }
- }
- }
- else
- {
- if (this.OnError != null)
- {
- try
- {
- this.OnError(this, errorReason);
- }
- catch(Exception ex)
- {
- HTTPManager.Logger.Exception("HubConnection", "Exception in OnError user code!", ex);
- }
- }
- }
- break;
- }
- }
- }
- }
- #endif
|