HubConnection.cs 26 KB


  1. #if !BESTHTTP_DISABLE_SIGNALR_CORE && !BESTHTTP_DISABLE_WEBSOCKET
  2. using BestHTTP.Futures;
  3. using BestHTTP.SignalRCore.Messages;
  4. using System;
  5. using System.Collections.Generic;
  6. namespace BestHTTP.SignalRCore
  7. {
  /// <summary>
  /// Settings that influence how a <see cref="HubConnection"/> establishes its connection.
  /// </summary>
  public sealed class HubOptions
  {
      /// <summary>
      /// The transport the connection should pick when more than one is available.
      /// </summary>
      public TransportTypes PreferedTransport { get; set; }

      /// <summary>
      /// Set to true to skip the negotiation request and connect directly.
      /// Intended to be used when <see cref="PreferedTransport"/> is WebSocket.
      /// </summary>
      public bool SkipNegotiation { get; set; }
  }
  20. public sealed class HubConnection
  21. {
  /// <summary>
  /// Reusable zero-length object array.
  /// </summary>
  public static readonly object[] EmptyArgs = new object[0];

  /// <summary>
  /// Uri of the Hub endpoint.
  /// </summary>
  public Uri Uri { get; private set; }

  /// <summary>
  /// Current state of this connection.
  /// </summary>
  public ConnectionStates State { get; private set; }

  /// <summary>
  /// Current, active ITransport instance. Null until a transport has been created.
  /// </summary>
  public ITransport Transport { get; private set; }

  /// <summary>
  /// The IProtocol implementation that will parse, encode and decode messages.
  /// </summary>
  public IProtocol Protocol { get; private set; }

  /// <summary>
  /// Called when the connection enters the Connected state.
  /// </summary>
  public event Action<HubConnection> OnConnected;

  /// <summary>
  /// Called when the connection is closed with an error reason. The second argument is that reason.
  /// </summary>
  public event Action<HubConnection, string> OnError;

  /// <summary>
  /// Called when the connection is closed without an error reason.
  /// </summary>
  public event Action<HubConnection> OnClosed;

  /// <summary>
  /// Called for every server-sent message before the plugin processes it.
  /// Returning false stops the plugin from processing the message any further.
  /// </summary>
  public event Func<HubConnection, Message, bool> OnMessage;

  /// <summary>
  /// An IAuthenticationProvider implementation that will be used to authenticate the connection.
  /// </summary>
  public IAuthenticationProvider AuthenticationProvider { get; set; }

  /// <summary>
  /// Negotiation response sent by the server. Null until a negotiation response has been parsed.
  /// </summary>
  public NegotiationResult NegotiationResult { get; private set; }

  /// <summary>
  /// The options this connection was created with.
  /// </summary>
  public HubOptions Options { get; private set; }

  /// <summary>
  /// Incremented for every invocation to give each sent message a unique id.
  /// Not readonly: it is passed by ref to Interlocked.Increment.
  /// </summary>
  private long lastInvocationId = 0;

  /// <summary>
  /// Callbacks of all sent messages that expect an answer from the server, keyed by the
  /// invocation id that the server sends back.
  /// </summary>
  private readonly Dictionary<long, Action<Message>> invocations = new Dictionary<long, Action<Message>>();

  /// <summary>
  /// Method name => subscription mapping. Method names are compared case-insensitively.
  /// </summary>
  private readonly Dictionary<string, Subscription> subscriptions = new Dictionary<string, Subscription>(StringComparer.OrdinalIgnoreCase);
  /// <summary>
  /// Creates a connection for the given hub endpoint and protocol, using a default-constructed
  /// <see cref="HubOptions"/> instance.
  /// </summary>
  /// <param name="hubUri">Uri of the Hub endpoint.</param>
  /// <param name="protocol">Protocol used to encode and decode messages.</param>
  public HubConnection(Uri hubUri, IProtocol protocol)
      : this(hubUri, protocol, options: new HubOptions())
  {
  }
  /// <summary>
  /// Creates a connection for the given hub endpoint, protocol and options. The connection starts
  /// in the Initial state and registers itself on the protocol.
  /// </summary>
  /// <param name="hubUri">Uri of the Hub endpoint.</param>
  /// <param name="protocol">Protocol used to encode and decode messages.</param>
  /// <param name="options">Connection options. When null, default options are used.</param>
  /// <exception cref="ArgumentNullException">
  /// <paramref name="hubUri"/> or <paramref name="protocol"/> is null.
  /// </exception>
  public HubConnection(Uri hubUri, IProtocol protocol, HubOptions options)
  {
      // Fail here with a descriptive exception instead of a NullReferenceException later on.
      if (hubUri == null)
          throw new ArgumentNullException("hubUri");
      if (protocol == null)
          throw new ArgumentNullException("protocol");

      this.Uri = hubUri;
      this.State = ConnectionStates.Initial;

      // Options is dereferenced unconditionally while connecting, so it must never be null.
      this.Options = options ?? new HubOptions();

      this.Protocol = protocol;
      this.Protocol.Connection = this;
  }
  /// <summary>
  /// Starts the connection process. Does nothing unless the connection is in the Initial state.
  /// When an authentication provider requires pre-authentication, that runs first; otherwise
  /// negotiation starts right away.
  /// </summary>
  public void StartConnect()
  {
      bool isFresh = this.State == ConnectionStates.Initial;
      if (!isFresh)
          return;

      HTTPManager.Logger.Verbose("HubConnection", "StartConnect");

      IAuthenticationProvider authenticator = this.AuthenticationProvider;
      bool needsPreAuth = authenticator != null && authenticator.IsPreAuthRequired;

      if (!needsPreAuth)
      {
          StartNegotiation();
          return;
      }

      HTTPManager.Logger.Information("HubConnection", "StartConnect - Authenticating");
      SetState(ConnectionStates.Authenticating);

      // Subscribe before starting so the outcome of the authentication can't be missed.
      authenticator.OnAuthenticationSucceded += OnAuthenticationSucceded;
      authenticator.OnAuthenticationFailed += OnAuthenticationFailed;

      authenticator.StartAuthentication();
  }
  /// <summary>
  /// Authentication succeeded: detaches both authentication handlers and continues with the negotiation.
  /// </summary>
  private void OnAuthenticationSucceded(IAuthenticationProvider provider)
  {
      HTTPManager.Logger.Verbose("HubConnection", "OnAuthenticationSucceded");

      IAuthenticationProvider authenticator = this.AuthenticationProvider;
      authenticator.OnAuthenticationSucceded -= OnAuthenticationSucceded;
      authenticator.OnAuthenticationFailed -= OnAuthenticationFailed;

      StartNegotiation();
  }
  /// <summary>
  /// Authentication failed: detaches both authentication handlers and closes the connection
  /// with the failure reason.
  /// </summary>
  private void OnAuthenticationFailed(IAuthenticationProvider provider, string reason)
  {
      HTTPManager.Logger.Error("HubConnection", "OnAuthenticationFailed: " + reason);

      IAuthenticationProvider authenticator = this.AuthenticationProvider;
      authenticator.OnAuthenticationSucceded -= OnAuthenticationSucceded;
      authenticator.OnAuthenticationFailed -= OnAuthenticationFailed;

      SetState(ConnectionStates.Closed, reason);
  }
  /// <summary>
  /// Sends the negotiation request to the server, or connects directly when negotiation is
  /// skipped. If a close was requested in the meantime, the connection is closed instead.
  /// </summary>
  private void StartNegotiation()
  {
      HTTPManager.Logger.Verbose("HubConnection", "StartNegotiation");

      // A close requested earlier (e.g. while authenticating) must be honored before anything
      // else, including the skip-negotiation shortcut.
      if (this.State == ConnectionStates.CloseInitiated)
      {
          SetState(ConnectionStates.Closed);
          return;
      }

      // Negotiation is only skipped for the WebSocket transport, as documented on
      // HubOptions.SkipNegotiation.
      if (this.Options.SkipNegotiation && this.Options.PreferedTransport == TransportTypes.WebSocket)
      {
          HTTPManager.Logger.Verbose("HubConnection", "Skipping negotiation");
          ConnectImpl();
          return;
      }

      SetState(ConnectionStates.Negotiating);

      // https://github.com/aspnet/SignalR/blob/dev/specs/TransportProtocols.md#post-endpoint-basenegotiate-request
      // Send out a negotiation request. While we could skip it and connect right with the websocket transport
      // it might return with additional information that could be useful.
      UriBuilder builder = new UriBuilder(this.Uri);

      // Avoid producing "//negotiate" when the hub path already ends with a slash.
      builder.Path = builder.Path.TrimEnd('/') + "/negotiate";

      var request = new HTTPRequest(builder.Uri, HTTPMethods.Post, OnNegotiationRequestFinished);

      if (this.AuthenticationProvider != null)
          this.AuthenticationProvider.PrepareRequest(request);

      request.Send();
  }
  /// <summary>
  /// Creates the transport selected by Options.PreferedTransport and starts connecting with it.
  /// Closes the connection with an error when the transport is unknown or, according to the
  /// negotiation result, not supported by the server.
  /// </summary>
  private void ConnectImpl()
  {
      HTTPManager.Logger.Verbose("HubConnection", "ConnectImpl");

      switch (this.Options.PreferedTransport)
      {
          case TransportTypes.WebSocket:
              // Without a negotiation result (negotiation skipped) there is nothing to check against.
              if (this.NegotiationResult != null && !IsTransportSupported("WebSockets"))
              {
                  SetState(ConnectionStates.Closed, "The 'WebSockets' transport isn't supported by the server!");
                  return;
              }

              this.Transport = new Transports.WebSocketTransport(this);
              this.Transport.OnStateChanged += Transport_OnStateChanged;
              break;

          default:
              SetState(ConnectionStates.Closed, "Unsupportted transport: " + this.Options.PreferedTransport);
              // No transport has been created: return instead of falling through to StartConnect
              // on a null Transport.
              return;
      }

      this.Transport.StartConnect();
  }
  /// <summary>
  /// Returns true when the negotiation result lists a transport with the given name, or when it
  /// contains no transport list at all. Expects NegotiationResult to be non-null.
  /// </summary>
  private bool IsTransportSupported(string transportName)
  {
      // https://github.com/aspnet/SignalR/blob/dev/specs/TransportProtocols.md#post-endpoint-basenegotiate-request
      // If the negotiation response contains only the url and accessToken, no 'availableTransports' list is sent
      var advertised = this.NegotiationResult.SupportedTransports;
      if (advertised == null)
          return true;

      int idx = 0;
      while (idx < advertised.Count)
      {
          if (advertised[idx].Name == transportName)
              return true;

          idx++;
      }

      return false;
  }
  /// <summary>
  /// Callback of the negotiation request. On a successful response it parses the negotiation
  /// result and starts connecting; on any failure it closes the connection with a reason.
  /// </summary>
  private void OnNegotiationRequestFinished(HTTPRequest req, HTTPResponse resp)
  {
      // Closing was requested while the request was in flight.
      if (this.State == ConnectionStates.CloseInitiated)
      {
          SetState(ConnectionStates.Closed);
          return;
      }

      string failure = null;

      switch (req.State)
      {
          // The request finished without any problem.
          case HTTPRequestStates.Finished:
              if (!resp.IsSuccess)
              {
                  // Internal server error?
                  failure = string.Format("Negotiation Request Finished Successfully, but the server sent an error. Status Code: {0}-{1} Message: {2}",
                                          resp.StatusCode,
                                          resp.Message,
                                          resp.DataAsText);
                  break;
              }

              HTTPManager.Logger.Information("HubConnection", "Negotiation Request Finished Successfully! Response: " + resp.DataAsText);

              // TODO: check validity of the negotiation result:
              //  If url and accessToken is present, the other two must be null.
              //  https://github.com/aspnet/SignalR/blob/dev/specs/TransportProtocols.md#post-endpoint-basenegotiate-request
              this.NegotiationResult = NegotiationResult.Parse(resp.DataAsText, out failure);

              if (string.IsNullOrEmpty(failure))
                  ConnectImpl();
              break;

          // The request finished with an unexpected error. The request's Exception property may contain more info about the error.
          case HTTPRequestStates.Error:
          {
              Exception cause = req.Exception;
              string details;
              if (cause != null)
                  details = cause.Message + "\n" + cause.StackTrace;
              else
                  details = "No Exception";

              failure = "Negotiation Request Finished with Error! " + details;
              break;
          }

          // The request aborted, initiated by the user.
          case HTTPRequestStates.Aborted:
              failure = "Negotiation Request Aborted!";
              break;

          // Connecting to the server is timed out.
          case HTTPRequestStates.ConnectionTimedOut:
              failure = "Negotiation Request - Connection Timed Out!";
              break;

          // The request didn't finish in the given time.
          case HTTPRequestStates.TimedOut:
              failure = "Negotiation Request - Processing the request Timed Out!";
              break;
      }

      if (failure != null)
          SetState(ConnectionStates.Closed, failure);
  }
  /// <summary>
  /// Starts closing the connection. Does nothing when the connection is already closed or
  /// closing. A connection that was never started is closed immediately; otherwise the state
  /// becomes CloseInitiated and the active transport, if any, is asked to close.
  /// </summary>
  public void StartClose()
  {
      HTTPManager.Logger.Verbose("HubConnection", "StartClose");

      switch (this.State)
      {
          // Don't move an already closed or closing connection back to CloseInitiated.
          case ConnectionStates.Closed:
          case ConnectionStates.CloseInitiated:
              return;

          case ConnectionStates.Initial:
              // Nothing is in flight that could finish the close later, so close right away.
              // (With skipped negotiation a transport can already exist in the Initial state.)
              if (this.Transport == null)
              {
                  SetState(ConnectionStates.Closed);
                  return;
              }
              break;
      }

      SetState(ConnectionStates.CloseInitiated);

      if (this.Transport != null)
          this.Transport.StartClose();
  }
  /// <summary>
  /// Starts a streaming invocation of the given hub method. Every received stream item is added
  /// to the future's container; the future finishes when the completion message arrives.
  /// </summary>
  /// <param name="target">Name of the hub method to invoke.</param>
  /// <param name="args">Arguments of the hub method.</param>
  /// <returns>A future whose value is the container collecting the streamed items.</returns>
  public IFuture<StreamItemContainer<TResult>> Stream<TResult>(string target, params object[] args)
  {
      var streamFuture = new Future<StreamItemContainer<TResult>>();

      Action<Message> onStreamMessage = (msg) =>
      {
          if (msg.type == MessageTypes.StreamItem)
          {
              // StreamItem message contains only one item.
              var items = streamFuture.value;
              if (!items.IsCanceled)
              {
                  items.AddItem((TResult)this.Protocol.ConvertTo(typeof(TResult), msg.item));

                  // (re)assign the container to raise OnItem event
                  streamFuture.AssignItem(items);
              }
          }
          else if (msg.type == MessageTypes.Completion)
          {
              if (string.IsNullOrEmpty(msg.error))
              {
                  // The completion message of a stream must not contain any result,
                  // so the container is completed as it is.
                  streamFuture.Assign(streamFuture.value);
              }
              else
                  streamFuture.Fail(new Exception(msg.error));
          }
      };

      long invocationId = InvokeImp(target, args, onStreamMessage, true);

      streamFuture.BeginProcess(new StreamItemContainer<TResult>(invocationId));

      return streamFuture;
  }
  /// <summary>
  /// Marks the container as canceled and sends a CancelInvocation message for its invocation id.
  /// </summary>
  /// <param name="container">Container of the stream to cancel.</param>
  public void CancelStream<T>(StreamItemContainer<T> container)
  {
      Message cancelRequest = new Message();
      cancelRequest.type = MessageTypes.CancelInvocation;
      cancelRequest.invocationId = container.id.ToString();

      // Flag the container first, so items arriving from now on are dropped.
      container.IsCanceled = true;

      SendMessage(cancelRequest);
  }
  /// <summary>
  /// Invokes the given hub method and returns a future for its result.
  /// </summary>
  /// <param name="target">Name of the hub method to invoke.</param>
  /// <param name="args">Arguments of the hub method.</param>
  /// <returns>
  /// A future that receives the converted result, or fails with the error sent by the server
  /// or with the exception thrown while converting the result.
  /// </returns>
  public IFuture<TResult> Invoke<TResult>(string target, params object[] args)
  {
      Future<TResult> future = new Future<TResult>();

      InvokeImp(target,
          args,
          (message) =>
          {
              if (!string.IsNullOrEmpty(message.error))
              {
                  future.Fail(new Exception(message.error));
                  return;
              }

              TResult result;
              try
              {
                  result = (TResult)this.Protocol.ConvertTo(typeof(TResult), message.result);
              }
              catch (Exception ex)
              {
                  // Without this the exception would only be logged by the message dispatcher
                  // and the future would never complete.
                  future.Fail(ex);
                  return;
              }

              future.Assign(result);
          });

      return future;
  }
  /// <summary>
  /// Invokes the given hub method without using its result.
  /// </summary>
  /// <param name="target">Name of the hub method to invoke.</param>
  /// <param name="args">Arguments of the hub method.</param>
  /// <returns>
  /// A future that is assigned true when the server reports no error, and fails with the
  /// server's error otherwise.
  /// </returns>
  public IFuture<bool> Send(string target, params object[] args)
  {
      Future<bool> outcome = new Future<bool>();

      Action<Message> onAnswer = delegate(Message reply)
      {
          bool failed = !string.IsNullOrEmpty(reply.error);
          if (failed)
              outcome.Fail(new Exception(reply.error));
          else
              outcome.Assign(true);
      };

      InvokeImp(target, args, onAnswer);

      return outcome;
  }
  /// <summary>
  /// Builds and sends an (optionally streaming) invocation message and registers the callback
  /// that will receive the server's answer(s).
  /// </summary>
  /// <param name="target">Name of the hub method to invoke.</param>
  /// <param name="args">Arguments of the hub method.</param>
  /// <param name="callback">Receives the answer(s) for this invocation. When null, the message is sent as nonblocking.</param>
  /// <param name="isStreamingInvocation">True to send a StreamInvocation instead of an Invocation message.</param>
  /// <returns>The id assigned to the invocation.</returns>
  /// <exception cref="Exception">The connection is not in the Connected state.</exception>
  private long InvokeImp(string target, object[] args, Action<Message> callback, bool isStreamingInvocation = false)
  {
      if (this.State != ConnectionStates.Connected)
          throw new Exception("Not connected yet!");

      long invocationId = System.Threading.Interlocked.Increment(ref this.lastInvocationId);

      var message = new Message
      {
          type = isStreamingInvocation ? MessageTypes.StreamInvocation : MessageTypes.Invocation,
          invocationId = invocationId.ToString(),
          target = target,
          arguments = args,
          nonblocking = callback == null,
      };

      // Register the callback BEFORE sending: the answer must find it even if it arrives
      // before SendMessage returns.
      if (callback != null)
          this.invocations.Add(invocationId, callback);

      try
      {
          SendMessage(message);
      }
      catch
      {
          // Nothing has been sent, so no answer will ever arrive for this id.
          if (callback != null)
              this.invocations.Remove(invocationId);

          throw;
      }

      return invocationId;
  }
  /// <summary>
  /// Encodes the message with the current protocol and sends it through the current transport.
  /// </summary>
  private void SendMessage(Message message)
  {
      this.Transport.Send(this.Protocol.EncodeMessage(message));
  }
  /// <summary>
  /// Subscribes a parameterless callback to the given client method name.
  /// </summary>
  public void On(string methodName, Action callback)
  {
      Action<object[]> adapter = delegate(object[] ignored) { callback(); };

      On(methodName, null, adapter);
  }
  /// <summary>
  /// Subscribes a one-parameter callback to the given client method name.
  /// </summary>
  public void On<T1>(string methodName, Action<T1> callback)
  {
      Type[] signature = { typeof(T1) };

      On(methodName, signature, (object[] values) => callback((T1)values[0]));
  }
  /// <summary>
  /// Subscribes a two-parameter callback to the given client method name.
  /// </summary>
  public void On<T1, T2>(string methodName, Action<T1, T2> callback)
  {
      Type[] signature = { typeof(T1), typeof(T2) };

      On(methodName, signature, (object[] values) => callback((T1)values[0], (T2)values[1]));
  }
  /// <summary>
  /// Subscribes a three-parameter callback to the given client method name.
  /// </summary>
  public void On<T1, T2, T3>(string methodName, Action<T1, T2, T3> callback)
  {
      Type[] signature = { typeof(T1), typeof(T2), typeof(T3) };

      On(methodName, signature, (object[] values) => callback((T1)values[0], (T2)values[1], (T3)values[2]));
  }
  /// <summary>
  /// Subscribes a four-parameter callback to the given client method name.
  /// </summary>
  public void On<T1, T2, T3, T4>(string methodName, Action<T1, T2, T3, T4> callback)
  {
      Type[] signature = { typeof(T1), typeof(T2), typeof(T3), typeof(T4) };

      On(methodName,
          signature,
          (object[] values) => callback((T1)values[0], (T2)values[1], (T3)values[2], (T4)values[3]));
  }
  /// <summary>
  /// Adds the callback, together with its parameter types, to the subscription of the given
  /// method name. The subscription is created on first use.
  /// </summary>
  /// <param name="methodName">Client method name to subscribe to.</param>
  /// <param name="paramTypes">Parameter types of the callback; null is passed for parameterless callbacks.</param>
  /// <param name="callback">Callback receiving the arguments as an object array.</param>
  public void On(string methodName, Type[] paramTypes, Action<object[]> callback)
  {
      Subscription entry;

      bool known = this.subscriptions.TryGetValue(methodName, out entry);
      if (!known)
      {
          entry = new Subscription();
          this.subscriptions.Add(methodName, entry);
      }

      entry.Add(paramTypes, callback);
  }
  /// <summary>
  /// Processes a batch of messages received from the server. Every message is offered to the
  /// OnMessage event first; when a handler returns false, only that message is skipped.
  /// Invocation messages are dispatched to the subscribed callbacks, StreamItem and Completion
  /// messages to the callback registered for their invocation id, and a Close message closes
  /// the connection.
  /// </summary>
  /// <param name="messages">The messages to process, in order.</param>
  internal void OnMessages(List<Message> messages)
  {
      for (int messageIdx = 0; messageIdx < messages.Count; ++messageIdx)
      {
          var message = messages[messageIdx];

          try
          {
              // Skip only THIS message; the rest of the batch must still be processed.
              if (this.OnMessage != null && !this.OnMessage(this, message))
                  continue;
          }
          catch (Exception ex)
          {
              HTTPManager.Logger.Exception("HubConnection", "Exception in OnMessage user code!", ex);
          }

          switch (message.type)
          {
              case MessageTypes.Invocation:
                  DispatchInvocation(message);
                  break;

              case MessageTypes.StreamItem:
                  // More items may follow, so the callback stays registered.
                  DispatchToPendingInvocation(message, "OnMessages - StreamItem - Callback", false);
                  break;

              case MessageTypes.Completion:
                  // Completion is the last message of an invocation: drop its callback afterwards.
                  DispatchToPendingInvocation(message, "OnMessages - Completion - Callback", true);
                  break;

              case MessageTypes.Close:
                  SetState(ConnectionStates.Closed, message.error);
                  break;
          }
      }
  }

  /// <summary>
  /// Calls every callback subscribed to the invocation's target method with the converted arguments.
  /// </summary>
  private void DispatchInvocation(Message message)
  {
      Subscription subscription = null;

      // A message without a target can't be matched (and a null key would make the lookup throw).
      if (message.target == null || !this.subscriptions.TryGetValue(message.target, out subscription))
          return;

      for (int i = 0; i < subscription.callbacks.Count; ++i)
      {
          var callbackDesc = subscription.callbacks[i];

          object[] realArgs = null;
          try
          {
              realArgs = this.Protocol.GetRealArguments(callbackDesc.ParamTypes, message.arguments);
          }
          catch (Exception ex)
          {
              HTTPManager.Logger.Exception("HubConnection", "OnMessages - Invocation - GetRealArguments", ex);

              // The arguments couldn't be converted: don't call user code with unusable arguments.
              continue;
          }

          try
          {
              callbackDesc.Callback.Invoke(realArgs);
          }
          catch (Exception ex)
          {
              HTTPManager.Logger.Exception("HubConnection", "OnMessages - Invocation - Invoke", ex);
          }
      }
  }

  /// <summary>
  /// Passes the message to the callback registered for its invocation id, optionally removing
  /// the registration afterwards. Messages with an unparsable invocation id are ignored.
  /// </summary>
  private void DispatchToPendingInvocation(Message message, string logContext, bool removeAfterwards)
  {
      long invocationId;
      if (!long.TryParse(message.invocationId, out invocationId))
          return;

      Action<Message> callback;
      if (this.invocations.TryGetValue(invocationId, out callback) && callback != null)
      {
          try
          {
              callback(message);
          }
          catch (Exception ex)
          {
              HTTPManager.Logger.Exception("HubConnection", logContext, ex);
          }
      }

      if (removeAfterwards)
          this.invocations.Remove(invocationId);
  }
  /// <summary>
  /// Maps state changes of the transport to connection states: Connected => Connected,
  /// Failed => Closed with the transport's error reason, Closed => Closed. Other transport
  /// states are only logged.
  /// </summary>
  private void Transport_OnStateChanged(TransportStates oldState, TransportStates newState)
  {
      string trace = string.Format("Transport_OnStateChanged - oldState: {0} newState: {1}", oldState.ToString(), newState.ToString());
      HTTPManager.Logger.Verbose("HubConnection", trace);

      if (newState == TransportStates.Connected)
          SetState(ConnectionStates.Connected);
      else if (newState == TransportStates.Failed)
          SetState(ConnectionStates.Closed, this.Transport.ErrorReason);
      else if (newState == TransportStates.Closed)
          SetState(ConnectionStates.Closed);
  }
  /// <summary>
  /// Moves the connection to the given state and raises the matching event: OnConnected when
  /// entering Connected; when entering Closed, OnClosed if there is no error reason and OnError
  /// otherwise. Does nothing (besides logging) when the connection is already in that state.
  /// Exceptions thrown by event handlers are logged and not propagated.
  /// </summary>
  /// <param name="state">The state to enter.</param>
  /// <param name="errorReason">Why the connection is being closed; null or empty for a graceful close.</param>
  private void SetState(ConnectionStates state, string errorReason = null)
  {
      // The parentheses are required: '+' binds tighter than '??', so without them the
      // null-coalescing would apply to the whole concatenated string and never trigger.
      HTTPManager.Logger.Information("HubConnection", "SetState - from State: " + this.State.ToString() + " to State: " + state.ToString() + " errorReason: " + (errorReason ?? string.Empty));

      if (this.State == state)
          return;

      this.State = state;

      switch (state)
      {
          // No event is raised for these states.
          case ConnectionStates.Initial:
          case ConnectionStates.Authenticating:
          case ConnectionStates.Negotiating:
          case ConnectionStates.CloseInitiated:
              break;

          case ConnectionStates.Connected:
              try
              {
                  if (this.OnConnected != null)
                      this.OnConnected(this);
              }
              catch (Exception ex)
              {
                  HTTPManager.Logger.Exception("HubConnection", "Exception in OnConnected user code!", ex);
              }
              break;

          case ConnectionStates.Closed:
              if (string.IsNullOrEmpty(errorReason))
              {
                  // Graceful close.
                  if (this.OnClosed != null)
                  {
                      try
                      {
                          this.OnClosed(this);
                      }
                      catch (Exception ex)
                      {
                          HTTPManager.Logger.Exception("HubConnection", "Exception in OnClosed user code!", ex);
                      }
                  }
              }
              else
              {
                  // Closed because of an error.
                  if (this.OnError != null)
                  {
                      try
                      {
                          this.OnError(this, errorReason);
                      }
                      catch (Exception ex)
                      {
                          HTTPManager.Logger.Exception("HubConnection", "Exception in OnError user code!", ex);
                      }
                  }
              }
              break;
      }
  }
  540. }
  541. }
  542. #endif