SocketManager.cs 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684
  1. #if !BESTHTTP_DISABLE_SOCKETIO
  2. using System;
  3. using System.Collections.Generic;
  4. using BestHTTP.SocketIO.Transports;
  5. using BestHTTP.Extensions;
  6. using BestHTTP.SocketIO.JsonEncoders;
  7. using BestHTTP.SocketIO.Events;
  8. namespace BestHTTP.SocketIO
  9. {
  10. public sealed class SocketManager : IHeartbeat, IManager
  11. {
  12. /// <summary>
  13. /// Possible states of a SocketManager instance.
  14. /// </summary>
  15. public enum States
  16. {
  17. /// <summary>
  18. /// Initial state of the SocketManager
  19. /// </summary>
  20. Initial,
  21. /// <summary>
  22. /// The SocketManager is closed, initiated by the user or by the server
  23. /// </summary>
  24. Closed,
  25. /// <summary>
  26. /// The SocketManager is currently opening.
  27. /// </summary>
  28. Opening,
  29. /// <summary>
  30. /// The SocketManager is open, events can be sent to the server.
  31. /// </summary>
  32. Open,
  33. /// <summary>
  34. /// Paused for transport upgrade
  35. /// </summary>
  36. Paused,
  37. /// <summary>
  38. /// An error occurred, the SocketManager now trying to connect again to the server.
  39. /// </summary>
  40. Reconnecting
  41. }
  42. /// <summary>
  43. /// The default Json encode/decoder that will be used to encode/decode the event arguments.
  44. /// </summary>
  45. public static IJsonEncoder DefaultEncoder = new DefaultJSonEncoder();
  46. /// <summary>
  47. /// Supported Socket.IO protocol version
  48. /// </summary>
  49. public const int MinProtocolVersion = 4;
  50. #region Public Properties
  51. /// <summary>
  52. /// The current state of this Socket.IO manager.
  53. /// </summary>
  54. public States State { get { return state; } private set { PreviousState = state; state = value; } }
  55. private States state;
  56. /// <summary>
  57. /// The SocketOptions instance that this manager will use.
  58. /// </summary>
  59. public SocketOptions Options { get; private set; }
  60. /// <summary>
  61. /// The Uri to the Socket.IO endpoint.
  62. /// </summary>
  63. public Uri Uri { get; private set; }
  64. /// <summary>
  65. /// The server sent and parsed Handshake data.
  66. /// </summary>
  67. public HandshakeData Handshake { get; private set; }
  68. /// <summary>
  69. /// The currently used main transport instance.
  70. /// </summary>
  71. public ITransport Transport { get; private set; }
  72. /// <summary>
  73. /// The Request counter for request-based transports.
  74. /// </summary>
  75. public ulong RequestCounter { get; internal set; }
  76. /// <summary>
  77. /// The root("/") Socket.
  78. /// </summary>
  79. public Socket Socket { get { return GetSocket(); } }
  80. /// <summary>
  81. /// Indexer to access socket associated to the given namespace.
  82. /// </summary>
  83. public Socket this[string nsp] { get { return GetSocket(nsp); } }
  84. /// <summary>
  85. /// How many reconnect attempts made.
  86. /// </summary>
  87. public int ReconnectAttempts { get; private set; }
  88. /// <summary>
  89. /// The JSon encoder that will be used to encode the sent data to json and decode the received json to an object list.
  90. /// </summary>
  91. public IJsonEncoder Encoder { get; set; }
  92. #endregion
  93. #region Internal Properties
  94. /// <summary>
  95. /// Timestamp support to the request based transports.
  96. /// </summary>
  97. internal UInt32 Timestamp { get { return (UInt32)(DateTime.UtcNow.Subtract(new DateTime(1970, 1, 1))).TotalMilliseconds; } }
  98. /// <summary>
  99. /// Auto-incrementing property to return Ack ids.
  100. /// </summary>
  101. internal int NextAckId { get { return System.Threading.Interlocked.Increment(ref nextAckId); } }
  102. private int nextAckId;
  103. /// <summary>
  104. /// Internal property to store the previous state of the manager.
  105. /// </summary>
  106. internal States PreviousState { get; private set; }
  107. /// <summary>
  108. /// Transport currently upgrading.
  109. /// </summary>
  110. internal ITransport UpgradingTransport { get; set; }
  111. #endregion
  112. #region Privates
  113. /// <summary>
  114. /// Namespace name -> Socket mapping
  115. /// </summary>
  116. private Dictionary<string, Socket> Namespaces = new Dictionary<string, Socket>();
  117. /// <summary>
  118. /// List of the sockets to able to iterate over them easily.
  119. /// </summary>
  120. private List<Socket> Sockets = new List<Socket>();
  121. /// <summary>
  122. /// List of unsent packets. Only instantiated when we have to use it.
  123. /// </summary>
  124. private List<Packet> OfflinePackets;
  125. /// <summary>
  126. /// When we sent out the last heartbeat(Ping) message.
  127. /// </summary>
  128. private DateTime LastHeartbeat = DateTime.MinValue;
  129. /// <summary>
  130. /// When we have to try to do a reconnect attempt
  131. /// </summary>
  132. private DateTime ReconnectAt;
  133. /// <summary>
  134. /// When we started to connect to the server.
  135. /// </summary>
  136. private DateTime ConnectionStarted;
  137. /// <summary>
  138. /// Private flag to avoid multiple Close call
  139. /// </summary>
  140. private bool closing;
  141. /// <summary>
  142. /// Whether the connection is waiting for a ping response.
  143. /// </summary>
  144. private bool IsWaitingPong;
  145. #endregion
  146. #region Constructors
  147. /// <summary>
  148. /// Constructor to create a SocketManager instance that will connect to the given uri.
  149. /// </summary>
  150. public SocketManager(Uri uri)
  151. :this(uri, new SocketOptions())
  152. {
  153. }
  154. /// <summary>
  155. /// Constructor to create a SocketManager instance.
  156. /// </summary>
  157. public SocketManager(Uri uri, SocketOptions options)
  158. {
  159. Uri = uri;
  160. Options = options;
  161. State = States.Initial;
  162. PreviousState = States.Initial;
  163. Encoder = SocketManager.DefaultEncoder;
  164. }
  165. #endregion
  166. /// <summary>
  167. /// Returns with the "/" namespace, the same as the Socket property.
  168. /// </summary>
  169. public Socket GetSocket()
  170. {
  171. return GetSocket("/");
  172. }
  173. /// <summary>
  174. /// Returns with the specified namespace
  175. /// </summary>
  176. public Socket GetSocket(string nsp)
  177. {
  178. if (string.IsNullOrEmpty(nsp))
  179. throw new ArgumentNullException("Namespace parameter is null or empty!");
  180. /*if (nsp[0] != '/')
  181. nsp = "/" + nsp;*/
  182. Socket socket = null;
  183. if (!Namespaces.TryGetValue(nsp, out socket))
  184. {
  185. // No socket found, create one
  186. socket = new Socket(nsp, this);
  187. Namespaces.Add(nsp, socket);
  188. Sockets.Add(socket);
  189. (socket as ISocket).Open();
  190. }
  191. return socket;
  192. }
  193. /// <summary>
  194. /// Internal function to remove a Socket instance from this manager.
  195. /// </summary>
  196. /// <param name="socket"></param>
  197. void IManager.Remove(Socket socket)
  198. {
  199. Namespaces.Remove(socket.Namespace);
  200. Sockets.Remove(socket);
  201. if (Sockets.Count == 0)
  202. Close();
  203. }
  204. #region Connection to the server, and upgrading
  205. /// <summary>
  206. /// This function will begin to open the Socket.IO connection by sending out the handshake request.
  207. /// If the Options' AutoConnect is true, it will be called automatically.
  208. /// </summary>
  209. public void Open()
  210. {
  211. if (State != States.Initial &&
  212. State != States.Closed &&
  213. State != States.Reconnecting)
  214. return;
  215. HTTPManager.Logger.Information("SocketManager", "Opening");
  216. ReconnectAt = DateTime.MinValue;
  217. switch (Options.ConnectWith)
  218. {
  219. case TransportTypes.Polling: Transport = new PollingTransport(this); break;
  220. #if !BESTHTTP_DISABLE_WEBSOCKET
  221. case TransportTypes.WebSocket: Transport = new WebSocketTransport(this); break;
  222. #endif
  223. }
  224. Transport.Open();
  225. (this as IManager).EmitEvent("connecting");
  226. State = States.Opening;
  227. ConnectionStarted = DateTime.UtcNow;
  228. HTTPManager.Heartbeats.Subscribe(this);
  229. // The root namespace will be opened by default
  230. GetSocket("/");
  231. }
  232. /// <summary>
  233. /// Closes this Socket.IO connection.
  234. /// </summary>
  235. public void Close()
  236. {
  237. (this as IManager).Close(true);
  238. }
  239. /// <summary>
  240. /// Closes this Socket.IO connection.
  241. /// </summary>
  242. void IManager.Close(bool removeSockets)
  243. {
  244. if (State == States.Closed || closing)
  245. return;
  246. closing = true;
  247. HTTPManager.Logger.Information("SocketManager", "Closing");
  248. HTTPManager.Heartbeats.Unsubscribe(this);
  249. // Disconnect the sockets. The Disconnect function will call the Remove function to remove it from the Sockets list.
  250. if (removeSockets)
  251. while (Sockets.Count > 0)
  252. (Sockets[Sockets.Count - 1] as ISocket).Disconnect(removeSockets);
  253. else
  254. for (int i = 0; i < Sockets.Count; ++i)
  255. (Sockets[i] as ISocket).Disconnect(removeSockets);
  256. // Set to Closed after Socket's Disconnect. This way we can send the disconnect events to the server.
  257. State = States.Closed;
  258. LastHeartbeat = DateTime.MinValue;
  259. if (OfflinePackets != null)
  260. OfflinePackets.Clear();
  261. // Remove the references from the dictionary too.
  262. if (removeSockets)
  263. Namespaces.Clear();
  264. Handshake = null;
  265. if (Transport != null)
  266. Transport.Close();
  267. Transport = null;
  268. closing = false;
  269. }
  270. /// <summary>
  271. /// Called from a ITransport implementation when an error occurs and we may have to try to reconnect.
  272. /// </summary>
  273. void IManager.TryToReconnect()
  274. {
  275. if (State == States.Reconnecting ||
  276. State == States.Closed)
  277. return;
  278. if (!Options.Reconnection || HTTPManager.IsQuitting)
  279. {
  280. Close();
  281. return;
  282. }
  283. if (++ReconnectAttempts >= Options.ReconnectionAttempts)
  284. {
  285. (this as IManager).EmitEvent("reconnect_failed");
  286. Close();
  287. return;
  288. }
  289. Random rand = new Random();
  290. int delay = (int)Options.ReconnectionDelay.TotalMilliseconds * ReconnectAttempts;
  291. ReconnectAt = DateTime.UtcNow +
  292. TimeSpan.FromMilliseconds(Math.Min(rand.Next(/*rand min:*/(int)(delay - (delay * Options.RandomizationFactor)),
  293. /*rand max:*/(int)(delay + (delay * Options.RandomizationFactor))),
  294. (int)Options.ReconnectionDelayMax.TotalMilliseconds));
  295. (this as IManager).Close(false);
  296. State = States.Reconnecting;
  297. for (int i = 0; i < Sockets.Count; ++i)
  298. (Sockets[i] as ISocket).Open();
  299. // In the Close() function we unregistered
  300. HTTPManager.Heartbeats.Subscribe(this);
  301. HTTPManager.Logger.Information("SocketManager", "Reconnecting");
  302. }
  303. /// <summary>
  304. /// Called by transports when they are connected to the server.
  305. /// </summary>
  306. bool IManager.OnTransportConnected(ITransport trans)
  307. {
  308. if (State != States.Opening)
  309. return false;
  310. if (PreviousState == States.Reconnecting)
  311. (this as IManager).EmitEvent("reconnect");
  312. State = States.Open;
  313. ReconnectAttempts = 0;
  314. // Send out packets that we collected while there were no available transport.
  315. SendOfflinePackets();
  316. HTTPManager.Logger.Information("SocketManager", "Open");
  317. #if !BESTHTTP_DISABLE_WEBSOCKET
  318. // Can we upgrade to WebSocket transport?
  319. if (Transport.Type != TransportTypes.WebSocket &&
  320. Handshake.Upgrades.Contains("websocket"))
  321. {
  322. UpgradingTransport = new WebSocketTransport(this);
  323. UpgradingTransport.Open();
  324. }
  325. #endif
  326. return true;
  327. }
  328. void IManager.OnTransportError(ITransport trans, string err)
  329. {
  330. (this as IManager).EmitError(SocketIOErrors.Internal, err);
  331. trans.Close();
  332. (this as IManager).TryToReconnect();
  333. }
  334. void IManager.OnTransportProbed(ITransport trans)
  335. {
  336. HTTPManager.Logger.Information("SocketManager", "\"probe\" packet received");
  337. // If we have to reconnect, we will go straight with the transport we were able to upgrade
  338. Options.ConnectWith = trans.Type;
  339. // Pause ourself to wait for any send and receive turn to finish.
  340. State = States.Paused;
  341. }
  342. #endregion
  343. #region Packet Handling
  344. /// <summary>
  345. /// Select the best transport to send out packets.
  346. /// </summary>
  347. private ITransport SelectTransport()
  348. {
  349. if (State != States.Open || Transport == null)
  350. return null;
  351. return Transport.IsRequestInProgress ? null : Transport;
  352. }
  353. /// <summary>
  354. /// Will select the best transport and sends out all packets that are in the OfflinePackets list.
  355. /// </summary>
  356. private void SendOfflinePackets()
  357. {
  358. ITransport trans = SelectTransport();
  359. // Send out packets that we not sent while no transport was available.
  360. // This function is called before the event handlers get the 'connected' event, so
  361. // theoretically the packet orders are remains.
  362. if (OfflinePackets != null && OfflinePackets.Count > 0 && trans != null)
  363. {
  364. trans.Send(OfflinePackets);
  365. OfflinePackets.Clear();
  366. }
  367. }
  368. /// <summary>
  369. /// Internal function that called from the Socket class. It will send out the packet instantly, or if no transport is available it will store
  370. /// the packet in the OfflinePackets list.
  371. /// </summary>
  372. void IManager.SendPacket(Packet packet)
  373. {
  374. ITransport trans = SelectTransport();
  375. if (trans != null)
  376. {
  377. try
  378. {
  379. trans.Send(packet);
  380. }
  381. catch(Exception ex)
  382. {
  383. (this as IManager).EmitError(SocketIOErrors.Internal, ex.Message + " " + ex.StackTrace);
  384. }
  385. }
  386. else
  387. {
  388. if (OfflinePackets == null)
  389. OfflinePackets = new List<Packet>();
  390. // The same packet can be sent through multiple Sockets.
  391. OfflinePackets.Add(packet.Clone());
  392. }
  393. }
  394. /// <summary>
  395. /// Called from the currently operating Transport. Will pass forward to the Socket that has to call the callbacks.
  396. /// </summary>
  397. void IManager.OnPacket(Packet packet)
  398. {
  399. if (State == States.Closed)
  400. return;
  401. switch(packet.TransportEvent)
  402. {
  403. case TransportEventTypes.Open:
  404. if (Handshake == null)
  405. {
  406. Handshake = new HandshakeData();
  407. if (!Handshake.Parse(packet.Payload))
  408. HTTPManager.Logger.Warning("SocketManager", "Expected handshake data, but wasn't able to pars. Payload: " + packet.Payload);
  409. (this as IManager).OnTransportConnected(Transport);
  410. return;
  411. }
  412. break;
  413. case TransportEventTypes.Ping:
  414. (this as IManager).SendPacket(new Packet(TransportEventTypes.Pong, SocketIOEventTypes.Unknown, "/", string.Empty));
  415. break;
  416. case TransportEventTypes.Pong:
  417. IsWaitingPong = false;
  418. break;
  419. }
  420. Socket socket = null;
  421. if (Namespaces.TryGetValue(packet.Namespace, out socket))
  422. (socket as ISocket).OnPacket(packet);
  423. else
  424. HTTPManager.Logger.Warning("SocketManager", "Namespace \"" + packet.Namespace + "\" not found!");
  425. }
  426. #endregion
  427. /// <summary>
  428. /// Sends an event to all available namespaces.
  429. /// </summary>
  430. public void EmitAll(string eventName, params object[] args)
  431. {
  432. for (int i = 0; i < Sockets.Count; ++i)
  433. Sockets[i].Emit(eventName, args);
  434. }
  435. /// <summary>
  436. /// Emits an internal packet-less event to the root namespace without creating it if it isn't exists yet.
  437. /// </summary>
  438. void IManager.EmitEvent(string eventName, params object[] args)
  439. {
  440. Socket socket = null;
  441. if (Namespaces.TryGetValue("/", out socket))
  442. (socket as ISocket).EmitEvent(eventName, args);
  443. }
  444. /// <summary>
  445. /// Emits an internal packet-less event to the root namespace without creating it if it isn't exists yet.
  446. /// </summary>
  447. void IManager.EmitEvent(SocketIOEventTypes type, params object[] args)
  448. {
  449. (this as IManager).EmitEvent(EventNames.GetNameFor(type), args);
  450. }
  451. void IManager.EmitError(SocketIOErrors errCode, string msg)
  452. {
  453. (this as IManager).EmitEvent(SocketIOEventTypes.Error, new Error(errCode, msg));
  454. }
  455. void IManager.EmitAll(string eventName, params object[] args)
  456. {
  457. for (int i = 0; i < Sockets.Count; ++i)
  458. (Sockets[i] as ISocket).EmitEvent(eventName, args);
  459. }
  460. #region IHeartbeat Implementation
  461. /// <summary>
  462. /// Called from the HTTPManager's OnUpdate function every frame. It's main function is to send out heartbeat messages.
  463. /// </summary>
  464. void IHeartbeat.OnHeartbeatUpdate(TimeSpan dif)
  465. {
  466. switch (State)
  467. {
  468. case States.Paused:
  469. // To ensure no messages are lost, the upgrade packet will only be sent once all the buffers of the existing transport are flushed and the transport is considered paused.
  470. if (!Transport.IsRequestInProgress &&
  471. !Transport.IsPollingInProgress)
  472. {
  473. State = States.Open;
  474. // Close the current transport
  475. Transport.Close();
  476. // and switch to the newly upgraded one
  477. Transport = UpgradingTransport;
  478. UpgradingTransport = null;
  479. // We will send an Upgrade("5") packet.
  480. Transport.Send(new Packet(TransportEventTypes.Upgrade, SocketIOEventTypes.Unknown, "/", string.Empty));
  481. goto case States.Open;
  482. }
  483. break;
  484. case States.Opening:
  485. if (DateTime.UtcNow - ConnectionStarted >= Options.Timeout)
  486. {
  487. (this as IManager).EmitError(SocketIOErrors.Internal, "Connection timed out!");
  488. (this as IManager).EmitEvent("connect_error");
  489. (this as IManager).EmitEvent("connect_timeout");
  490. (this as IManager).TryToReconnect();
  491. }
  492. break;
  493. case States.Reconnecting:
  494. if (ReconnectAt != DateTime.MinValue && DateTime.UtcNow >= ReconnectAt)
  495. {
  496. (this as IManager).EmitEvent("reconnect_attempt");
  497. (this as IManager).EmitEvent("reconnecting");
  498. Open();
  499. }
  500. break;
  501. case States.Open:
  502. ITransport trans = null;
  503. // Select transport to use
  504. if (Transport != null && Transport.State == TransportStates.Open)
  505. trans = Transport;
  506. // not yet open?
  507. if (trans == null || trans.State != TransportStates.Open)
  508. return;
  509. // Start to poll the server for events
  510. trans.Poll();
  511. // Start to send out unsent packets
  512. SendOfflinePackets();
  513. // First time we reached this point. Set the LastHeartbeat to the current time, 'cause we are just opened.
  514. if (LastHeartbeat == DateTime.MinValue)
  515. {
  516. LastHeartbeat = DateTime.UtcNow;
  517. return;
  518. }
  519. // It's time to send out a ping event to the server
  520. if (!IsWaitingPong && DateTime.UtcNow - LastHeartbeat > Handshake.PingInterval)
  521. {
  522. (this as IManager).SendPacket(new Packet(TransportEventTypes.Ping, SocketIOEventTypes.Unknown, "/", string.Empty));
  523. LastHeartbeat = DateTime.UtcNow;
  524. IsWaitingPong = true;
  525. }
  526. // No pong event received in the given time, we are disconnected.
  527. if (IsWaitingPong && DateTime.UtcNow - LastHeartbeat > Handshake.PingTimeout)
  528. {
  529. IsWaitingPong = false;
  530. (this as IManager).TryToReconnect();
  531. }
  532. break; // case States.Open:
  533. }
  534. }
  535. #endregion
  536. }
  537. }
  538. #endif