#if !BESTHTTP_DISABLE_SOCKETIO
using System;
using System.Collections.Generic;
using BestHTTP.SocketIO.Transports;
using BestHTTP.Extensions;
using BestHTTP.SocketIO.JsonEncoders;
using BestHTTP.SocketIO.Events;
namespace BestHTTP.SocketIO
{
public sealed class SocketManager : IHeartbeat, IManager
{
/// <summary>
/// Possible states of a SocketManager instance.
/// </summary>
public enum States
{
/// <summary>
/// Initial state of the SocketManager. Assigned by the constructor.
/// </summary>
Initial,
/// <summary>
/// The SocketManager is closed, initiated by the user or by the server.
/// </summary>
Closed,
/// <summary>
/// The SocketManager is currently opening. Entered in Open() after the transport has been asked to open.
/// </summary>
Opening,
/// <summary>
/// The SocketManager is open, events can be sent to the server.
/// </summary>
Open,
/// <summary>
/// Paused for transport upgrade. Entered when the upgrading transport reports a successful probe.
/// </summary>
Paused,
/// <summary>
/// An error occurred, the SocketManager is now trying to connect to the server again.
/// </summary>
Reconnecting
}
/// <summary>
/// The default Json encode/decoder that will be used to encode/decode the event arguments.
/// Every newly constructed SocketManager gets this instance as its initial Encoder.
/// </summary>
public static IJsonEncoder DefaultEncoder = new DefaultJSonEncoder();
/// <summary>
/// Supported Socket.IO protocol version.
/// </summary>
/// <remarks>NOTE(review): not referenced inside this class; presumably consumed by the transports - confirm.</remarks>
public const int MinProtocolVersion = 4;
#region Public Properties
/// <summary>
/// The current state of this Socket.IO manager.
/// Assigning a new value first copies the current one into PreviousState.
/// </summary>
public States State { get { return state; } private set { PreviousState = state; state = value; } }
private States state;
/// <summary>
/// The SocketOptions instance that this manager will use.
/// </summary>
public SocketOptions Options { get; private set; }
/// <summary>
/// The Uri to the Socket.IO endpoint.
/// </summary>
public Uri Uri { get; private set; }
/// <summary>
/// The server sent and parsed Handshake data. Null until an Open packet arrived, and null again after the manager closed.
/// </summary>
public HandshakeData Handshake { get; private set; }
/// <summary>
/// The currently used main transport instance. Null while the manager is closed.
/// </summary>
public ITransport Transport { get; private set; }
/// <summary>
/// The Request counter for request-based transports.
/// </summary>
public ulong RequestCounter { get; internal set; }
/// <summary>
/// The root("/") Socket. Reading it creates and opens the root socket if it doesn't exist yet.
/// </summary>
public Socket Socket { get { return GetSocket(); } }
/// <summary>
/// Indexer to access socket associated to the given namespace. The socket is created and opened on first access.
/// </summary>
public Socket this[string nsp] { get { return GetSocket(nsp); } }
/// <summary>
/// How many reconnect attempts made. Incremented on every reconnect try and reset to 0 when a transport connected.
/// </summary>
public int ReconnectAttempts { get; private set; }
/// <summary>
/// The JSon encoder that will be used to encode the sent data to json and decode the received json to an object list.
/// Initialized to SocketManager.DefaultEncoder by the constructor.
/// </summary>
public IJsonEncoder Encoder { get; set; }
#endregion
#region Internal Properties
/// <summary>
/// Timestamp support to the request based transports: the number of milliseconds elapsed since
/// 1970-01-01 (UTC), truncated to its low 32 bits.
/// </summary>
internal UInt32 Timestamp
{
    get
    {
        // The millisecond count does not fit into 32 bits any more, and converting an out-of-range
        // double directly to UInt32 yields an unspecified value in an unchecked context (some runtimes
        // saturate, which would make the timestamp constant). Convert to Int64 first, then truncate
        // explicitly so the value wraps around the same way on every platform.
        long elapsedMilliseconds = (long)DateTime.UtcNow.Subtract(new DateTime(1970, 1, 1)).TotalMilliseconds;
        return unchecked((UInt32)elapsedMilliseconds);
    }
}
/// <summary>
/// Auto-incrementing property to return Ack ids. Every read atomically increments the backing
/// counter and returns the incremented value, so the first id handed out is 1.
/// </summary>
internal int NextAckId { get { return System.Threading.Interlocked.Increment(ref nextAckId); } }
private int nextAckId;
/// <summary>
/// Internal property to store the previous state of the manager. Updated by the State setter.
/// </summary>
internal States PreviousState { get; private set; }
/// <summary>
/// Transport currently upgrading. Becomes the main Transport once the paused manager switches over to it.
/// </summary>
internal ITransport UpgradingTransport { get; set; }
#endregion
#region Privates
/// <summary>
/// Namespace name -> Socket mapping
/// </summary>
private Dictionary<string, Socket> Namespaces = new Dictionary<string, Socket>();
/// <summary>
/// List of the sockets to be able to iterate over them easily.
/// </summary>
private List<Socket> Sockets = new List<Socket>();
/// <summary>
/// List of unsent packets. Only instantiated when we have to use it.
/// </summary>
private List<Packet> OfflinePackets;
/// <summary>
/// When we sent out the last heartbeat(Ping) message. DateTime.MinValue means that no heartbeat
/// baseline has been taken yet for the current connection.
/// </summary>
private DateTime LastHeartbeat = DateTime.MinValue;
/// <summary>
/// When we have to try to do a reconnect attempt. Reset to DateTime.MinValue when Open() starts.
/// </summary>
private DateTime ReconnectAt;
/// <summary>
/// When we started to connect to the server. Used to detect a connection timeout while opening.
/// </summary>
private DateTime ConnectionStarted;
/// <summary>
/// Private flag to avoid multiple Close call. Set for the duration of IManager.Close so that
/// nested calls (for example through IManager.Remove) return immediately.
/// </summary>
private bool closing;
/// <summary>
/// Whether the connection is waiting for a ping response. Set after a Ping was sent and cleared when a Pong arrives.
/// </summary>
private bool IsWaitingPong;
#endregion
#region Constructors
/// <summary>
/// Creates a SocketManager that will connect to the given uri, using a freshly created SocketOptions instance.
/// </summary>
/// <param name="uri">The Uri of the Socket.IO endpoint.</param>
public SocketManager(Uri uri)
    : this(uri, new SocketOptions())
{
}
/// <summary>
/// Creates a SocketManager instance with the given endpoint and options. The manager starts in the
/// Initial state and uses SocketManager.DefaultEncoder until another Encoder is assigned.
/// </summary>
/// <param name="uri">The Uri of the Socket.IO endpoint.</param>
/// <param name="options">The options this manager will use.</param>
public SocketManager(Uri uri, SocketOptions options)
{
    this.Uri = uri;
    this.Options = options;

    // The State setter copies the old state into PreviousState, so both are pinned to Initial here.
    this.State = States.Initial;
    this.PreviousState = States.Initial;

    this.Encoder = DefaultEncoder;
}
#endregion
/// <summary>
/// Returns the socket of the root ("/") namespace; this is what the Socket property returns too.
/// </summary>
public Socket GetSocket()
{
    const string rootNamespace = "/";
    return this.GetSocket(rootNamespace);
}
/// <summary>
/// Returns the socket of the specified namespace. If there is no socket for the namespace yet,
/// one is created, registered in this manager and opened.
/// </summary>
/// <param name="nsp">Name of the namespace. It is used verbatim as the lookup key.</param>
/// <returns>The existing or newly created Socket of the namespace.</returns>
/// <exception cref="ArgumentNullException">Thrown when <paramref name="nsp"/> is null or empty.</exception>
public Socket GetSocket(string nsp)
{
    // The single-string constructor of ArgumentNullException expects the parameter name, so the
    // (paramName, message) overload is used to keep the message readable.
    if (string.IsNullOrEmpty(nsp))
        throw new ArgumentNullException("nsp", "Namespace parameter is null or empty!");

    // Note: the namespace is deliberately not normalized; no leading '/' is prepended.
    Socket socket = null;
    if (!Namespaces.TryGetValue(nsp, out socket))
    {
        // No socket found, create one
        socket = new Socket(nsp, this);
        Namespaces.Add(nsp, socket);
        Sockets.Add(socket);

        (socket as ISocket).Open();
    }

    return socket;
}
/// <summary>
/// Internal function to unregister a Socket instance from this manager.
/// When the last socket is gone, the manager closes itself.
/// </summary>
/// <param name="socket">The socket to remove.</param>
void IManager.Remove(Socket socket)
{
    Namespaces.Remove(socket.Namespace);
    Sockets.Remove(socket);

    bool noSocketsLeft = Sockets.Count == 0;
    if (noSocketsLeft)
    {
        Close();
    }
}
#region Connection to the server, and upgrading
/// <summary>
/// This function will begin to open the Socket.IO connection by creating and opening the transport
/// selected by Options.ConnectWith. It does nothing unless the manager is in the Initial, Closed or
/// Reconnecting state.
/// According to the original author it is called automatically when the Options' AutoConnect is true
/// (that call is not part of this class).
/// </summary>
public void Open()
{
    if (State != States.Initial &&
        State != States.Closed &&
        State != States.Reconnecting)
        return;

    HTTPManager.Logger.Information("SocketManager", "Opening");

    ReconnectAt = DateTime.MinValue;

    switch (Options.ConnectWith)
    {
        case TransportTypes.Polling:
            Transport = new PollingTransport(this);
            break;
#if !BESTHTTP_DISABLE_WEBSOCKET
        case TransportTypes.WebSocket:
            Transport = new WebSocketTransport(this);
            break;
#endif
        default:
            // Reached when the requested transport isn't available in this build (e.g. WebSocket
            // with BESTHTTP_DISABLE_WEBSOCKET defined). Without a fallback Transport would stay
            // null and the Open call below would throw a NullReferenceException.
            HTTPManager.Logger.Warning("SocketManager", "Transport '" + Options.ConnectWith + "' is not available, falling back to Polling.");
            Transport = new PollingTransport(this);
            break;
    }

    Transport.Open();

    // Only reaches subscribers if the root socket already exists; EmitEvent doesn't create it.
    (this as IManager).EmitEvent("connecting");

    State = States.Opening;

    // Baseline for the connection timeout check done in the heartbeat update.
    ConnectionStarted = DateTime.UtcNow;

    HTTPManager.Heartbeats.Subscribe(this);

    // The root namespace will be opened by default
    GetSocket("/");
}
/// <summary>
/// Closes this Socket.IO connection and removes all of its sockets.
/// </summary>
public void Close()
{
    IManager self = this;
    self.Close(true);
}
/// <summary>
/// Closes this Socket.IO connection: unsubscribes from the heartbeats, disconnects the sockets,
/// drops the unsent packets and the handshake data, and closes the transport(s).
/// Does nothing when the manager is already closed or a close is in progress.
/// </summary>
/// <param name="removeSockets">
/// True to disconnect and remove every socket from this manager; false to disconnect them but keep
/// them registered (used while reconnecting).
/// </param>
void IManager.Close(bool removeSockets)
{
    if (State == States.Closed || closing)
        return;
    closing = true;

    HTTPManager.Logger.Information("SocketManager", "Closing");

    HTTPManager.Heartbeats.Unsubscribe(this);

    // Disconnect the sockets. The Disconnect function will call the Remove function to remove it from the Sockets list.
    if (removeSockets)
        while (Sockets.Count > 0)
            (Sockets[Sockets.Count - 1] as ISocket).Disconnect(removeSockets);
    else
        for (int i = 0; i < Sockets.Count; ++i)
            (Sockets[i] as ISocket).Disconnect(removeSockets);

    // Set State to Closed after Socket's Disconnect. This way we can send the disconnect events to the server.
    State = States.Closed;

    LastHeartbeat = DateTime.MinValue;

    // A ping that was in flight belongs to the connection we are closing. Leaving the flag set
    // would suppress pings on the next connection and trigger a bogus ping-timeout reconnect.
    IsWaitingPong = false;

    if (OfflinePackets != null)
        OfflinePackets.Clear();

    // Remove the references from the dictionary too.
    if (removeSockets)
        Namespaces.Clear();

    Handshake = null;

    if (Transport != null)
        Transport.Close();
    Transport = null;

    // A transport upgrade may still be in progress; close that transport too and release it.
    if (UpgradingTransport != null)
        UpgradingTransport.Close();
    UpgradingTransport = null;

    closing = false;
}
/// <summary>
/// Called from a ITransport implementation when an error occurs and we may have to try to reconnect.
/// Closes the manager for good when reconnection is disabled, the application is quitting or the
/// attempt limit is reached; otherwise schedules the next attempt and enters the Reconnecting state.
/// </summary>
void IManager.TryToReconnect()
{
    switch (State)
    {
        case States.Reconnecting:
        case States.Closed:
            return;
    }

    bool reconnectAllowed = Options.Reconnection && !HTTPManager.IsQuitting;
    if (!reconnectAllowed)
    {
        Close();
        return;
    }

    ReconnectAttempts = ReconnectAttempts + 1;
    if (ReconnectAttempts >= Options.ReconnectionAttempts)
    {
        ((IManager)this).EmitEvent("reconnect_failed");
        Close();
        return;
    }

    // The base delay grows linearly with the attempt count, is randomized by +/- RandomizationFactor
    // and finally capped at ReconnectionDelayMax.
    Random jitterSource = new Random();
    int baseDelayMs = (int)Options.ReconnectionDelay.TotalMilliseconds * ReconnectAttempts;

    DateTime now = DateTime.UtcNow;
    int lowestDelayMs = (int)(baseDelayMs - (baseDelayMs * Options.RandomizationFactor));
    int highestDelayMs = (int)(baseDelayMs + (baseDelayMs * Options.RandomizationFactor));
    int randomizedDelayMs = jitterSource.Next(lowestDelayMs, highestDelayMs);
    int delayCapMs = (int)Options.ReconnectionDelayMax.TotalMilliseconds;

    ReconnectAt = now + TimeSpan.FromMilliseconds(Math.Min(randomizedDelayMs, delayCapMs));

    // Close without removing the sockets, then flag the manager as reconnecting.
    ((IManager)this).Close(false);
    State = States.Reconnecting;

    for (int socketIndex = 0; socketIndex < Sockets.Count; ++socketIndex)
    {
        ISocket socket = Sockets[socketIndex] as ISocket;
        socket.Open();
    }

    // In the Close() function we unregistered
    HTTPManager.Heartbeats.Subscribe(this);

    HTTPManager.Logger.Information("SocketManager", "Reconnecting");
}
/// <summary>
/// Called by transports when they are connected to the server. Moves the manager from Opening to
/// Open, flushes the packets collected while offline and, when possible, starts a WebSocket upgrade.
/// </summary>
/// <param name="trans">The transport that connected. Not used by this implementation.</param>
/// <returns>False when the manager wasn't in the Opening state, otherwise true.</returns>
bool IManager.OnTransportConnected(ITransport trans)
{
    if (State != States.Opening)
    {
        return false;
    }

    // Must be checked before State is assigned, because the setter overwrites PreviousState.
    bool cameFromReconnect = PreviousState == States.Reconnecting;
    if (cameFromReconnect)
    {
        ((IManager)this).EmitEvent("reconnect");
    }

    State = States.Open;
    ReconnectAttempts = 0;

    // Send out packets that we collected while there were no available transport.
    SendOfflinePackets();

    HTTPManager.Logger.Information("SocketManager", "Open");

#if !BESTHTTP_DISABLE_WEBSOCKET
    // Can we upgrade to WebSocket transport?
    bool alreadyOnWebSocket = Transport.Type == TransportTypes.WebSocket;
    if (!alreadyOnWebSocket && Handshake.Upgrades.Contains("websocket"))
    {
        ITransport upgradeCandidate = new WebSocketTransport(this);
        UpgradingTransport = upgradeCandidate;
        upgradeCandidate.Open();
    }
#endif

    return true;
}
/// <summary>
/// Called by a transport when it runs into an error: emits an Internal error event with the given
/// message, closes the failing transport and tries to reconnect.
/// </summary>
/// <param name="trans">The transport that reported the error.</param>
/// <param name="err">Description of the error.</param>
void IManager.OnTransportError(ITransport trans, string err)
{
    IManager self = this;

    self.EmitError(SocketIOErrors.Internal, err);
    trans.Close();
    self.TryToReconnect();
}
/// <summary>
/// Called when a "probe" packet has been received on the given transport. Remembers the transport's
/// type for later connects and pauses the manager.
/// </summary>
/// <param name="trans">The transport that was probed.</param>
void IManager.OnTransportProbed(ITransport trans)
{
    HTTPManager.Logger.Information("SocketManager", "\"probe\" packet received");

    // If we have to reconnect, we will go straight with the transport we were able to upgrade
    TransportTypes probedType = trans.Type;
    Options.ConnectWith = probedType;

    // Pause ourself to wait for any send and receive turn to finish.
    State = States.Paused;
}
#endregion
#region Packet Handling
/// <summary>
/// Selects the transport to send out packets with.
/// </summary>
/// <returns>
/// The main Transport when the manager is Open, a transport exists and it has no request in
/// progress; null otherwise.
/// </returns>
private ITransport SelectTransport()
{
    bool transportUsable = State == States.Open && Transport != null;
    if (transportUsable && !Transport.IsRequestInProgress)
    {
        return Transport;
    }

    return null;
}
/// <summary>
/// Selects a transport and, if one is available, sends out every packet stored in the
/// OfflinePackets list, then empties the list.
/// </summary>
private void SendOfflinePackets()
{
    ITransport trans = SelectTransport();

    bool nothingQueued = OfflinePackets == null || OfflinePackets.Count == 0;
    if (nothingQueued || trans == null)
    {
        return;
    }

    // Send out packets that were not sent while no transport was available.
    // This function is called before the event handlers get the 'connected' event, so
    // theoretically the order of the packets is kept.
    trans.Send(OfflinePackets);
    OfflinePackets.Clear();
}
/// <summary>
/// Internal function that is called from the Socket class. It will send out the packet instantly, or if
/// no transport is available it will store a copy of the packet in the OfflinePackets list.
/// An exception thrown while sending is reported through an Internal error event instead of
/// propagating to the caller.
/// </summary>
/// <param name="packet">The packet to send.</param>
void IManager.SendPacket(Packet packet)
{
    ITransport trans = SelectTransport();

    if (trans != null)
    {
        try
        {
            trans.Send(packet);
        }
        catch (Exception ex)
        {
            (this as IManager).EmitError(SocketIOErrors.Internal, ex.Message + " " + ex.StackTrace);
        }
    }
    else
    {
        // The offline list is created lazily, on the first packet that can't be sent.
        if (OfflinePackets == null)
            OfflinePackets = new List<Packet>();

        // The same packet can be sent through multiple Sockets, so a clone is stored.
        OfflinePackets.Add(packet.Clone());
    }
}
/// <summary>
/// Called from the currently operating Transport. Handles the transport level events (Open, Ping,
/// Pong) and passes the packet forward to the Socket of the packet's namespace.
/// Packets are ignored while the manager is closed.
/// </summary>
/// <param name="packet">The received packet.</param>
void IManager.OnPacket(Packet packet)
{
    if (State == States.Closed)
    {
        return;
    }

    IManager self = this;
    TransportEventTypes transportEvent = packet.TransportEvent;

    if (transportEvent == TransportEventTypes.Open)
    {
        // The first Open packet carries the handshake data and isn't forwarded to any socket.
        if (Handshake == null)
        {
            HandshakeData handshake = new HandshakeData();
            Handshake = handshake;

            if (!handshake.Parse(packet.Payload))
            {
                HTTPManager.Logger.Warning("SocketManager", "Expected handshake data, but wasn't able to pars. Payload: " + packet.Payload);
            }

            self.OnTransportConnected(Transport);
            return;
        }
    }
    else if (transportEvent == TransportEventTypes.Ping)
    {
        // Answer the ping with a pong.
        self.SendPacket(new Packet(TransportEventTypes.Pong, SocketIOEventTypes.Unknown, "/", string.Empty));
    }
    else if (transportEvent == TransportEventTypes.Pong)
    {
        IsWaitingPong = false;
    }

    Socket target = null;
    if (!Namespaces.TryGetValue(packet.Namespace, out target))
    {
        HTTPManager.Logger.Warning("SocketManager", "Namespace \"" + packet.Namespace + "\" not found!");
        return;
    }

    (target as ISocket).OnPacket(packet);
}
#endregion
/// <summary>
/// Emits the given event with the given arguments on every socket registered in this manager.
/// </summary>
/// <param name="eventName">Name of the event.</param>
/// <param name="args">Arguments of the event.</param>
public void EmitAll(string eventName, params object[] args)
{
    int socketIndex = 0;
    while (socketIndex < Sockets.Count)
    {
        Sockets[socketIndex].Emit(eventName, args);
        ++socketIndex;
    }
}
/// <summary>
/// Emits an internal, packet-less event to the root namespace. The root socket isn't created if it
/// doesn't exist yet; in that case the event is dropped.
/// </summary>
/// <param name="eventName">Name of the event.</param>
/// <param name="args">Arguments of the event.</param>
void IManager.EmitEvent(string eventName, params object[] args)
{
    Socket rootSocket = null;
    if (!Namespaces.TryGetValue("/", out rootSocket))
    {
        return;
    }

    (rootSocket as ISocket).EmitEvent(eventName, args);
}
/// <summary>
/// Emits an internal, packet-less event to the root namespace, identified by its event type. The
/// type is translated to an event name with EventNames.GetNameFor, then the name based overload is used.
/// </summary>
/// <param name="type">Type of the event.</param>
/// <param name="args">Arguments of the event.</param>
void IManager.EmitEvent(SocketIOEventTypes type, params object[] args)
{
    string eventName = EventNames.GetNameFor(type);
    ((IManager)this).EmitEvent(eventName, args);
}
/// <summary>
/// Emits an Error event to the root namespace, carrying an Error object built from the given code and message.
/// </summary>
/// <param name="errCode">Code of the error.</param>
/// <param name="msg">Description of the error.</param>
void IManager.EmitError(SocketIOErrors errCode, string msg)
{
    Error error = new Error(errCode, msg);
    ((IManager)this).EmitEvent(SocketIOEventTypes.Error, error);
}
/// <summary>
/// Emits an internal event, through ISocket.EmitEvent, on every socket registered in this manager.
/// </summary>
/// <param name="eventName">Name of the event.</param>
/// <param name="args">Arguments of the event.</param>
void IManager.EmitAll(string eventName, params object[] args)
{
    int socketIndex = 0;
    while (socketIndex < Sockets.Count)
    {
        ISocket socket = Sockets[socketIndex] as ISocket;
        socket.EmitEvent(eventName, args);
        ++socketIndex;
    }
}
#region IHeartbeat Implementation
/// <summary>
/// Heartbeat callback of the manager; this instance subscribes to HTTPManager.Heartbeats in Open().
/// According to the original author it is called from the HTTPManager's OnUpdate function every frame.
/// Its main function is to send out heartbeat messages, but it drives the rest of the state machine too:
/// it finishes a transport upgrade (Paused), detects a connection timeout (Opening), starts the
/// scheduled reconnect attempt (Reconnecting) and polls the transport (Open).
/// </summary>
/// <param name="dif">Time passed since the previous call. Not used by this implementation.</param>
void IHeartbeat.OnHeartbeatUpdate(TimeSpan dif)
{
switch (State)
{
case States.Paused:
// To ensure no messages are lost, the upgrade packet will only be sent once all the buffers of the existing transport are flushed and the transport is considered paused.
if (!Transport.IsRequestInProgress &&
!Transport.IsPollingInProgress)
{
State = States.Open;
// Close the current transport
Transport.Close();
// and switch to the newly upgraded one
Transport = UpgradingTransport;
UpgradingTransport = null;
// We will send an Upgrade("5") packet.
Transport.Send(new Packet(TransportEventTypes.Upgrade, SocketIOEventTypes.Unknown, "/", string.Empty));
// Continue with the regular Open processing in the same update.
goto case States.Open;
}
break;
case States.Opening:
// The transport didn't connect within Options.Timeout: report it and try to reconnect.
if (DateTime.UtcNow - ConnectionStarted >= Options.Timeout)
{
(this as IManager).EmitError(SocketIOErrors.Internal, "Connection timed out!");
(this as IManager).EmitEvent("connect_error");
(this as IManager).EmitEvent("connect_timeout");
(this as IManager).TryToReconnect();
}
break;
case States.Reconnecting:
// ReconnectAt is reset to DateTime.MinValue by Open(), so an attempt is started only once per schedule.
if (ReconnectAt != DateTime.MinValue && DateTime.UtcNow >= ReconnectAt)
{
(this as IManager).EmitEvent("reconnect_attempt");
(this as IManager).EmitEvent("reconnecting");
Open();
}
break;
case States.Open:
ITransport trans = null;
// Select transport to use
if (Transport != null && Transport.State == TransportStates.Open)
trans = Transport;
// not yet open?
if (trans == null || trans.State != TransportStates.Open)
return;
// Start to poll the server for events
trans.Poll();
// Start to send out unsent packets
SendOfflinePackets();
// First time we reached this point. Set the LastHeartbeat to the current time, 'cause we are just opened.
if (LastHeartbeat == DateTime.MinValue)
{
LastHeartbeat = DateTime.UtcNow;
return;
}
// It's time to send out a ping event to the server
if (!IsWaitingPong && DateTime.UtcNow - LastHeartbeat > Handshake.PingInterval)
{
(this as IManager).SendPacket(new Packet(TransportEventTypes.Ping, SocketIOEventTypes.Unknown, "/", string.Empty));
LastHeartbeat = DateTime.UtcNow;
IsWaitingPong = true;
}
// No pong event received in the given time, we are disconnected.
if (IsWaitingPong && DateTime.UtcNow - LastHeartbeat > Handshake.PingTimeout)
{
IsWaitingPong = false;
(this as IManager).TryToReconnect();
}
break; // case States.Open:
}
}
#endregion
}
}
#endif