123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684 |
- using System;
- using System.Collections.Generic;
- using BestHTTP.SocketIO.Transports;
- using BestHTTP.Extensions;
- using BestHTTP.SocketIO.JsonEncoders;
- using BestHTTP.SocketIO.Events;
- namespace BestHTTP.SocketIO
- {
- public sealed class SocketManager : IHeartbeat, IManager
- {
/// <summary>
/// Lifecycle states a SocketManager instance can be in.
/// </summary>
public enum States
{
    /// <summary>State of a manager right after construction.</summary>
    Initial,

    /// <summary>The manager is closed; the close was initiated by the user or by the server.</summary>
    Closed,

    /// <summary>The manager is in the middle of opening the connection.</summary>
    Opening,

    /// <summary>The manager is open, events can be sent to the server.</summary>
    Open,

    /// <summary>Temporarily paused while the transport is being upgraded.</summary>
    Paused,

    /// <summary>An error occurred and the manager is now trying to connect to the server again.</summary>
    Reconnecting
}
/// <summary>
/// Json encoder/decoder that every newly constructed manager receives as its Encoder;
/// it is used to encode/decode the event arguments.
/// </summary>
public static IJsonEncoder DefaultEncoder = new DefaultJSonEncoder();

/// <summary>
/// Supported Socket.IO protocol version.
/// </summary>
public const int MinProtocolVersion = 4;
#region Public Properties

/// <summary>
/// Current state of this Socket.IO manager.
/// Assigning a new value first stores the old one in PreviousState.
/// </summary>
public States State
{
    get
    {
        return state;
    }
    private set
    {
        PreviousState = state;
        state = value;
    }
}
private States state;

/// <summary>
/// The SocketOptions instance used by this manager.
/// </summary>
public SocketOptions Options { get; private set; }

/// <summary>
/// Uri of the Socket.IO endpoint.
/// </summary>
public Uri Uri { get; private set; }

/// <summary>
/// Handshake data sent by the server and parsed by the manager. Reset to null when the manager closes.
/// </summary>
public HandshakeData Handshake { get; private set; }

/// <summary>
/// The main transport instance that is currently in use.
/// </summary>
public ITransport Transport { get; private set; }

/// <summary>
/// Request counter for the request-based transports.
/// </summary>
public ulong RequestCounter { get; internal set; }

/// <summary>
/// The root ("/") Socket.
/// </summary>
public Socket Socket
{
    get { return GetSocket(); }
}

/// <summary>
/// Indexer returning the socket that belongs to the given namespace.
/// </summary>
public Socket this[string nsp]
{
    get { return GetSocket(nsp); }
}

/// <summary>
/// Number of reconnect attempts made so far.
/// </summary>
public int ReconnectAttempts { get; private set; }

/// <summary>
/// The Json encoder that encodes the sent data to json and decodes the received json to an object list.
/// </summary>
public IJsonEncoder Encoder { get; set; }

#endregion
- #region Internal Properties
/// <summary>
/// Timestamp support for the request based transports: the milliseconds elapsed since
/// 1970-01-01, truncated to their low 32 bits.
/// </summary>
/// <remarks>
/// The elapsed millisecond count no longer fits into 32 bits, and converting an out-of-range
/// double directly to UInt32 produces an unspecified value in an unchecked context.
/// Converting through Int64 first and truncating explicitly keeps the result well defined.
/// </remarks>
internal UInt32 Timestamp
{
    get
    {
        double elapsedMs = DateTime.UtcNow.Subtract(new DateTime(1970, 1, 1)).TotalMilliseconds;

        // double -> Int64 is in range here; Int64 -> UInt32 keeps the low 32 bits deterministically.
        return unchecked((UInt32)(Int64)elapsedMs);
    }
}
/// <summary>
/// Returns a new Ack id on every read by atomically incrementing the backing counter.
/// </summary>
internal int NextAckId
{
    get
    {
        return System.Threading.Interlocked.Increment(ref nextAckId);
    }
}
private int nextAckId;

/// <summary>
/// The state the manager was in before the most recent State assignment.
/// </summary>
internal States PreviousState { get; private set; }

/// <summary>
/// The transport that is currently being upgraded to, if any.
/// </summary>
internal ITransport UpgradingTransport { get; set; }
- #endregion
#region Privates

/// <summary>
/// Maps a namespace name to its Socket.
/// </summary>
private Dictionary<string, Socket> Namespaces = new Dictionary<string, Socket>();

/// <summary>
/// The same sockets as a list, so they can be iterated by index.
/// </summary>
private List<Socket> Sockets = new List<Socket>();

/// <summary>
/// Packets that could not be sent yet. Stays null until the first packet has to be stored.
/// </summary>
private List<Packet> OfflinePackets;

/// <summary>
/// Time of the last heartbeat (Ping) message; DateTime.MinValue means none has been recorded yet.
/// </summary>
private DateTime LastHeartbeat = DateTime.MinValue;

/// <summary>
/// The point in time when the next reconnect attempt has to be made.
/// </summary>
private DateTime ReconnectAt;

/// <summary>
/// The point in time when connecting to the server started.
/// </summary>
private DateTime ConnectionStarted;

/// <summary>
/// Set while a close is in progress, to avoid multiple Close calls.
/// </summary>
private bool closing;

/// <summary>
/// True while a ping has been sent and its pong response has not arrived yet.
/// </summary>
private bool IsWaitingPong;

#endregion
#region Constructors

/// <summary>
/// Creates a SocketManager for the given uri with a freshly constructed SocketOptions instance.
/// </summary>
/// <param name="uri">Uri of the Socket.IO endpoint.</param>
public SocketManager(Uri uri)
    : this(uri, new SocketOptions())
{
}

/// <summary>
/// Creates a SocketManager for the given uri with the supplied options.
/// </summary>
/// <param name="uri">Uri of the Socket.IO endpoint.</param>
/// <param name="options">Options this manager will use.</param>
public SocketManager(Uri uri, SocketOptions options)
{
    this.Uri = uri;
    this.Options = options;
    this.Encoder = SocketManager.DefaultEncoder;

    // The State setter overwrites PreviousState, so PreviousState is assigned afterwards.
    this.State = States.Initial;
    this.PreviousState = States.Initial;
}

#endregion
/// <summary>
/// Returns the socket of the root ("/") namespace; equivalent to reading the Socket property.
/// </summary>
public Socket GetSocket()
{
    Socket rootSocket = GetSocket("/");
    return rootSocket;
}
/// <summary>
/// Returns the socket of the specified namespace. When no socket is registered for it yet,
/// a new one is created, registered and opened.
/// </summary>
/// <param name="nsp">Name of the namespace. It is used as given, no leading '/' is added.</param>
/// <returns>The already registered or the newly created Socket.</returns>
/// <exception cref="ArgumentNullException">Thrown when <paramref name="nsp"/> is null or empty.</exception>
public Socket GetSocket(string nsp)
{
    // The single-string ArgumentNullException constructor takes the parameter name, not the
    // message, so both are passed explicitly.
    if (string.IsNullOrEmpty(nsp))
        throw new ArgumentNullException("nsp", "Namespace parameter is null or empty!");

    Socket socket = null;
    if (Namespaces.TryGetValue(nsp, out socket))
        return socket;

    // No socket found, create one
    socket = new Socket(nsp, this);
    Namespaces.Add(nsp, socket);
    Sockets.Add(socket);

    (socket as ISocket).Open();

    return socket;
}
/// <summary>
/// Internal function that unregisters a Socket instance from this manager.
/// The manager closes itself when its last socket is removed.
/// </summary>
/// <param name="socket">The socket to remove.</param>
void IManager.Remove(Socket socket)
{
    Namespaces.Remove(socket.Namespace);
    Sockets.Remove(socket);

    bool noSocketsLeft = Sockets.Count == 0;
    if (noSocketsLeft)
    {
        Close();
    }
}
- #region Connection to the server, and upgrading
/// <summary>
/// Begins opening the Socket.IO connection: creates the transport selected by
/// Options.ConnectWith, opens it, switches the manager to the Opening state and subscribes
/// it to the heartbeat updates. Does nothing unless the manager is in the Initial, Closed or
/// Reconnecting state.
/// If the Options' AutoConnect is true, it will be called automatically.
/// </summary>
/// <exception cref="NotSupportedException">
/// Thrown when Options.ConnectWith selects a transport that cannot be created here.
/// </exception>
public void Open()
{
    if (State != States.Initial &&
        State != States.Closed &&
        State != States.Reconnecting)
        return;

    HTTPManager.Logger.Information("SocketManager", "Opening");

    ReconnectAt = DateTime.MinValue;

    switch (Options.ConnectWith)
    {
        case TransportTypes.Polling: Transport = new PollingTransport(this); break;

        // NOTE(review): this copy of the file contained only the closing #endif; the opening
        // directive was missing. The symbol below is presumed to be the library's WebSocket
        // feature toggle - confirm against the project's defines.
#if !BESTHTTP_DISABLE_WEBSOCKET
        case TransportTypes.WebSocket: Transport = new WebSocketTransport(this); break;
#endif

        // Without this case an unhandled value left Transport unassigned and the
        // Transport.Open() call below failed with a NullReferenceException.
        default:
            throw new NotSupportedException("Not supported transport type: " + Options.ConnectWith);
    }

    Transport.Open();

    (this as IManager).EmitEvent("connecting");

    State = States.Opening;
    ConnectionStarted = DateTime.UtcNow;

    HTTPManager.Heartbeats.Subscribe(this);

    // The root namespace will be opened by default
    GetSocket("/");
}
/// <summary>
/// Closes this Socket.IO connection and removes all of its sockets.
/// </summary>
public void Close()
{
    IManager self = this;
    self.Close(true);
}
/// <summary>
/// Closes this Socket.IO connection: unsubscribes from the heartbeat updates, disconnects the
/// sockets, switches to the Closed state, drops the stored offline packets and the handshake
/// data, and closes the transport. Does nothing when the manager is already closed or a close
/// is already in progress.
/// </summary>
/// <param name="removeSockets">
/// When true the sockets are also removed from the manager; when false they stay registered
/// (used while reconnecting).
/// </param>
void IManager.Close(bool removeSockets)
{
    if (State == States.Closed || closing)
        return;

    closing = true;
    try
    {
        HTTPManager.Logger.Information("SocketManager", "Closing");

        HTTPManager.Heartbeats.Unsubscribe(this);

        // Disconnect the sockets. The Disconnect function will call the Remove function to remove it from the Sockets list.
        if (removeSockets)
            while (Sockets.Count > 0)
                (Sockets[Sockets.Count - 1] as ISocket).Disconnect(removeSockets);
        else
            for (int i = 0; i < Sockets.Count; ++i)
                (Sockets[i] as ISocket).Disconnect(removeSockets);

        // Set to Closed after Socket's Disconnect. This way we can send the disconnect events to the server.
        State = States.Closed;

        LastHeartbeat = DateTime.MinValue;

        if (OfflinePackets != null)
            OfflinePackets.Clear();

        // Remove the references from the dictionary too.
        if (removeSockets)
            Namespaces.Clear();

        Handshake = null;

        if (Transport != null)
            Transport.Close();
        Transport = null;
    }
    finally
    {
        // Always release the guard: if a socket or the transport throws while closing,
        // a stuck flag would turn every later Close call into a no-op.
        closing = false;
    }
}
/// <summary>
/// Called from an ITransport implementation when an error occurs and a reconnect may be needed.
/// Closes the manager instead when reconnection is disabled, the application is quitting or the
/// allowed attempts are used up; otherwise schedules the next attempt with a randomized delay
/// and switches to the Reconnecting state.
/// </summary>
void IManager.TryToReconnect()
{
    IManager self = this;

    switch (State)
    {
        case States.Reconnecting:
        case States.Closed:
            return;
    }

    if (HTTPManager.IsQuitting || !Options.Reconnection)
    {
        // Evaluation order differs from a plain flag check only in which property is read first.
        Close();
        return;
    }

    ReconnectAttempts = ReconnectAttempts + 1;
    if (ReconnectAttempts >= Options.ReconnectionAttempts)
    {
        self.EmitEvent("reconnect_failed");
        Close();
        return;
    }

    Random rng = new Random();

    // The base delay grows linearly with the number of attempts.
    int baseDelay = (int)Options.ReconnectionDelay.TotalMilliseconds * ReconnectAttempts;

    DateTime now = DateTime.UtcNow;
    int lowerBound = (int)(baseDelay - (baseDelay * Options.RandomizationFactor));
    int upperBound = (int)(baseDelay + (baseDelay * Options.RandomizationFactor));
    int jittered = rng.Next(lowerBound, upperBound);

    // The randomized delay is capped by ReconnectionDelayMax.
    int cappedDelay = Math.Min(jittered, (int)Options.ReconnectionDelayMax.TotalMilliseconds);
    ReconnectAt = now + TimeSpan.FromMilliseconds(cappedDelay);

    // Close without removing the sockets, then reopen them for the next connection.
    self.Close(false);

    State = States.Reconnecting;

    int index = 0;
    while (index < Sockets.Count)
    {
        (Sockets[index] as ISocket).Open();
        ++index;
    }

    // In the Close() function we unregistered
    HTTPManager.Heartbeats.Subscribe(this);

    HTTPManager.Logger.Information("SocketManager", "Reconnecting");
}
/// <summary>
/// Called by transports when they are connected to the server. Switches the manager to the
/// Open state, resets the reconnect counter, flushes the offline packets and, when the server
/// offers it, starts upgrading to the WebSocket transport.
/// </summary>
/// <param name="trans">The transport that connected. Not used by this implementation.</param>
/// <returns>False when the manager is not in the Opening state, otherwise true.</returns>
bool IManager.OnTransportConnected(ITransport trans)
{
    if (State != States.Opening)
        return false;

    // The State setter stored Reconnecting as the previous state when Open() ran after a reconnect.
    if (PreviousState == States.Reconnecting)
        (this as IManager).EmitEvent("reconnect");

    State = States.Open;

    ReconnectAttempts = 0;

    // Send out packets that we collected while there were no available transport.
    SendOfflinePackets();

    HTTPManager.Logger.Information("SocketManager", "Open");

    // NOTE(review): this copy of the file contained only the closing #endif; the opening
    // directive was missing. The symbol below is presumed to be the library's WebSocket
    // feature toggle - confirm against the project's defines.
#if !BESTHTTP_DISABLE_WEBSOCKET
    // Can we upgrade to WebSocket transport?
    // A handshake whose parsing failed is only logged by OnPacket before this function is
    // called, so the handshake data is checked before it is used.
    if (Transport.Type != TransportTypes.WebSocket &&
        Handshake != null &&
        Handshake.Upgrades != null &&
        Handshake.Upgrades.Contains("websocket"))
    {
        UpgradingTransport = new WebSocketTransport(this);
        UpgradingTransport.Open();
    }
#endif

    return true;
}
/// <summary>
/// Handles a transport error: emits an Internal error event with the given message, closes the
/// failing transport and then tries to reconnect.
/// </summary>
/// <param name="trans">The transport that reported the error.</param>
/// <param name="err">Description of the error.</param>
void IManager.OnTransportError(ITransport trans, string err)
{
    IManager self = this;

    self.EmitError(SocketIOErrors.Internal, err);
    trans.Close();
    self.TryToReconnect();
}
/// <summary>
/// Handles the "probe" answer of an upgrading transport: remembers the transport's type for
/// later connects and pauses the manager.
/// </summary>
/// <param name="trans">The transport that received the probe packet.</param>
void IManager.OnTransportProbed(ITransport trans)
{
    HTTPManager.Logger.Information("SocketManager", "\"probe\" packet received");

    // Should a reconnect become necessary, connect directly with the transport that proved to work.
    Options.ConnectWith = trans.Type;

    // Pause the manager so that any send and receive turn in progress can finish.
    State = States.Paused;
}
- #endregion
- #region Packet Handling
/// <summary>
/// Selects the transport that can send out packets right now.
/// </summary>
/// <returns>
/// The current transport, or null when the manager is not open, has no transport, or the
/// transport has a request in progress.
/// </returns>
private ITransport SelectTransport()
{
    bool canSend = State == States.Open &&
                   Transport != null &&
                   !Transport.IsRequestInProgress;

    if (canSend)
        return Transport;

    return null;
}
/// <summary>
/// Selects a transport and sends out every packet stored in the OfflinePackets list, then
/// empties the list. Does nothing when there are no stored packets or no transport is available.
/// </summary>
private void SendOfflinePackets()
{
    ITransport target = SelectTransport();

    // These are the packets that could not be sent while no transport was available.
    // This function runs before the event handlers receive the 'connected' event, so in theory
    // the order of the packets is kept.
    if (target == null || OfflinePackets == null || OfflinePackets.Count == 0)
        return;

    target.Send(OfflinePackets);
    OfflinePackets.Clear();
}
/// <summary>
/// Internal function called from the Socket class. Sends the packet out instantly, or stores a
/// copy of it in the OfflinePackets list when no transport is available. An exception thrown
/// while sending is reported as an Internal error event.
/// </summary>
/// <param name="packet">The packet to send.</param>
void IManager.SendPacket(Packet packet)
{
    ITransport target = SelectTransport();

    if (target == null)
    {
        if (OfflinePackets == null)
            OfflinePackets = new List<Packet>();

        // A clone is stored, because the same packet can be sent through multiple Sockets.
        OfflinePackets.Add(packet.Clone());
        return;
    }

    try
    {
        target.Send(packet);
    }
    catch (Exception ex)
    {
        IManager self = this;
        self.EmitError(SocketIOErrors.Internal, ex.Message + " " + ex.StackTrace);
    }
}
/// <summary>
/// Called from the currently operating Transport. Handles the transport level events (Open,
/// Ping, Pong) and passes the packet forward to the Socket of the packet's namespace, which has
/// to call the callbacks. Packets are ignored while the manager is closed.
/// </summary>
/// <param name="packet">The received packet.</param>
void IManager.OnPacket(Packet packet)
{
    if (State == States.Closed)
        return;

    IManager self = this;
    TransportEventTypes transportEvent = packet.TransportEvent;

    if (transportEvent == TransportEventTypes.Open)
    {
        // The first Open packet carries the handshake data; it is not forwarded to a socket.
        if (Handshake == null)
        {
            Handshake = new HandshakeData();

            bool parsed = Handshake.Parse(packet.Payload);
            if (!parsed)
                HTTPManager.Logger.Warning("SocketManager", "Expected handshake data, but wasn't able to pars. Payload: " + packet.Payload);

            self.OnTransportConnected(Transport);
            return;
        }
    }
    else if (transportEvent == TransportEventTypes.Ping)
    {
        // Answer the ping with a pong.
        self.SendPacket(new Packet(TransportEventTypes.Pong, SocketIOEventTypes.Unknown, "/", string.Empty));
    }
    else if (transportEvent == TransportEventTypes.Pong)
    {
        IsWaitingPong = false;
    }

    Socket target = null;
    if (!Namespaces.TryGetValue(packet.Namespace, out target))
    {
        HTTPManager.Logger.Warning("SocketManager", "Namespace \"" + packet.Namespace + "\" not found!");
        return;
    }

    (target as ISocket).OnPacket(packet);
}
- #endregion
/// <summary>
/// Sends an event to every namespace registered in this manager.
/// </summary>
/// <param name="eventName">Name of the event.</param>
/// <param name="args">Arguments of the event.</param>
public void EmitAll(string eventName, params object[] args)
{
    // Indexed iteration: the list is re-read on every step.
    int index = 0;
    while (index < Sockets.Count)
    {
        Sockets[index].Emit(eventName, args);
        ++index;
    }
}
/// <summary>
/// Emits an internal, packet-less event to the root namespace. The root socket is not created
/// when it does not exist yet; in that case nothing happens.
/// </summary>
/// <param name="eventName">Name of the event.</param>
/// <param name="args">Arguments of the event.</param>
void IManager.EmitEvent(string eventName, params object[] args)
{
    Socket rootSocket = null;
    if (!Namespaces.TryGetValue("/", out rootSocket))
        return;

    (rootSocket as ISocket).EmitEvent(eventName, args);
}
/// <summary>
/// Emits an internal, packet-less event to the root namespace, identified by its event type.
/// The type is translated to its name and passed on to the name-based EmitEvent.
/// </summary>
/// <param name="type">Type of the event.</param>
/// <param name="args">Arguments of the event.</param>
void IManager.EmitEvent(SocketIOEventTypes type, params object[] args)
{
    IManager self = this;
    string eventName = EventNames.GetNameFor(type);
    self.EmitEvent(eventName, args);
}
/// <summary>
/// Emits an Error event to the root namespace, carrying an Error object built from the given
/// code and message.
/// </summary>
/// <param name="errCode">Code of the error.</param>
/// <param name="msg">Message of the error.</param>
void IManager.EmitError(SocketIOErrors errCode, string msg)
{
    IManager self = this;
    Error error = new Error(errCode, msg);
    self.EmitEvent(SocketIOEventTypes.Error, error);
}
/// <summary>
/// Emits an internal event to every socket registered in this manager.
/// </summary>
/// <param name="eventName">Name of the event.</param>
/// <param name="args">Arguments of the event.</param>
void IManager.EmitAll(string eventName, params object[] args)
{
    // Indexed iteration: the list is re-read on every step.
    int index = 0;
    while (index < Sockets.Count)
    {
        (Sockets[index] as ISocket).EmitEvent(eventName, args);
        ++index;
    }
}
- #region IHeartbeat Implementation
/// <summary>
/// Called from the HTTPManager's OnUpdate function every frame. Depending on the current state
/// it finishes a transport upgrade, detects a connection timeout, starts a scheduled reconnect
/// attempt, or polls the transport and takes care of the ping/pong heartbeat.
/// </summary>
/// <param name="dif">Not used by this implementation.</param>
void IHeartbeat.OnHeartbeatUpdate(TimeSpan dif)
{
    IManager self = this;

    switch (State)
    {
        case States.Paused:
            // To ensure no messages are lost, the upgrade packet is only sent once all the
            // buffers of the existing transport are flushed and the transport is considered paused.
            if (Transport.IsRequestInProgress || Transport.IsPollingInProgress)
                break;

            State = States.Open;

            // Close the old transport and switch to the newly upgraded one.
            Transport.Close();
            Transport = UpgradingTransport;
            UpgradingTransport = null;

            // Send an Upgrade("5") packet through the new transport.
            Transport.Send(new Packet(TransportEventTypes.Upgrade, SocketIOEventTypes.Unknown, "/", string.Empty));

            goto case States.Open;

        case States.Opening:
            bool timedOut = DateTime.UtcNow - ConnectionStarted >= Options.Timeout;
            if (timedOut)
            {
                self.EmitError(SocketIOErrors.Internal, "Connection timed out!");
                self.EmitEvent("connect_error");
                self.EmitEvent("connect_timeout");
                self.TryToReconnect();
            }
            break;

        case States.Reconnecting:
            // Nothing to do until a reconnect time is set and has been reached.
            if (ReconnectAt == DateTime.MinValue || DateTime.UtcNow < ReconnectAt)
                break;

            self.EmitEvent("reconnect_attempt");
            self.EmitEvent("reconnecting");

            Open();
            break;

        case States.Open:
            // Only a transport that is already open can be used.
            if (Transport == null || Transport.State != TransportStates.Open)
                return;

            ITransport active = Transport;

            // Start to poll the server for events
            active.Poll();

            // Start to send out unsent packets
            SendOfflinePackets();

            // First update after opening: only record the current time as the last heartbeat.
            if (LastHeartbeat == DateTime.MinValue)
            {
                LastHeartbeat = DateTime.UtcNow;
                return;
            }

            // It's time to send out a ping event to the server
            if (!IsWaitingPong && DateTime.UtcNow - LastHeartbeat > Handshake.PingInterval)
            {
                self.SendPacket(new Packet(TransportEventTypes.Ping, SocketIOEventTypes.Unknown, "/", string.Empty));

                LastHeartbeat = DateTime.UtcNow;
                IsWaitingPong = true;
            }

            // No pong event received in the given time, we are disconnected.
            if (IsWaitingPong && DateTime.UtcNow - LastHeartbeat > Handshake.PingTimeout)
            {
                IsWaitingPong = false;
                self.TryToReconnect();
            }
            break; // case States.Open:
    }
}
- #endregion
- }
- }
- #endif