WebSocketSignaling.cs 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316
  1. using System;
  2. using System.Collections.Generic;
  3. using System.IO;
  4. using System.Linq;
  5. using System.Security.Authentication;
  6. using System.Text;
  7. using System.Threading;
  8. using Unity.WebRTC;
  9. using UnityEngine;
  10. using WebSocketSharp;
  11. using ErrorEventArgs = WebSocketSharp.ErrorEventArgs;
  12. namespace Unity.RenderStreaming.Signaling
  13. {
  14. public class WebSocketSignaling : ISignaling
  15. {
  16. private static HashSet<WebSocketSignaling> instances = new HashSet<WebSocketSignaling>();
  17. private string m_url;
  18. private readonly float m_timeout;
  19. private readonly SynchronizationContext m_mainThreadContext;
  20. private bool m_running;
  21. private Thread m_signalingThread;
  22. private readonly AutoResetEvent m_wsCloseEvent;
  23. private WebSocket m_webSocket;
  24. public string Url { get { return m_url; } }
  25. public WebSocketSignaling(SignalingSettings signalingSettings, SynchronizationContext mainThreadContext)
  26. {
  27. if(signalingSettings == null)
  28. throw new ArgumentNullException(nameof(signalingSettings));
  29. if(!(signalingSettings is WebSocketSignalingSettings settings))
  30. throw new ArgumentException("signalingSettings is not WebSocketSignalingSettings");
  31. m_url = settings.url;
  32. m_timeout = 5.0f;
  33. m_mainThreadContext = mainThreadContext;
  34. m_wsCloseEvent = new AutoResetEvent(false);
  35. if (instances.Any(x => x.Url == m_url))
  36. {
  37. Debug.LogWarning($"Other {nameof(WebSocketSignaling)} exists with same URL:{m_url}. Signaling process may be in conflict.");
  38. }
  39. instances.Add(this);
  40. }
  41. ~WebSocketSignaling()
  42. {
  43. if (m_running)
  44. Stop();
  45. instances.Remove(this);
  46. }
  47. public void Start()
  48. {
  49. if (m_running)
  50. throw new InvalidOperationException("This object is already started.");
  51. m_running = true;
  52. m_signalingThread = new Thread(WSManage);
  53. m_signalingThread.Start();
  54. }
  55. public void Stop()
  56. {
  57. if (m_running)
  58. {
  59. m_running = false;
  60. m_webSocket?.Close();
  61. if (m_signalingThread.ThreadState == ThreadState.WaitSleepJoin)
  62. {
  63. m_signalingThread.Abort();
  64. }
  65. else
  66. {
  67. m_signalingThread.Join(1000);
  68. }
  69. m_signalingThread = null;
  70. }
  71. }
  72. public event OnStartHandler OnStart;
  73. public event OnConnectHandler OnCreateConnection;
  74. public event OnDisconnectHandler OnDestroyConnection;
  75. public event OnOfferHandler OnOffer;
  76. #pragma warning disable 0067
  77. // this event is never used in this class
  78. public event OnAnswerHandler OnAnswer;
  79. #pragma warning restore 0067
  80. public event OnIceCandidateHandler OnIceCandidate;
  81. public void SendOffer(string connectionId, RTCSessionDescription offer)
  82. {
  83. DescData data = new DescData();
  84. data.connectionId = connectionId;
  85. data.sdp = offer.sdp;
  86. data.type = "offer";
  87. RoutedMessage<DescData> routedMessage = new RoutedMessage<DescData>();
  88. routedMessage.from = connectionId;
  89. routedMessage.data = data;
  90. routedMessage.type = "offer";
  91. WSSend(routedMessage);
  92. }
  93. public void SendAnswer(string connectionId, RTCSessionDescription answer)
  94. {
  95. DescData data = new DescData();
  96. data.connectionId = connectionId;
  97. data.sdp = answer.sdp;
  98. data.type = "answer";
  99. RoutedMessage<DescData> routedMessage = new RoutedMessage<DescData>();
  100. routedMessage.from = connectionId;
  101. routedMessage.data = data;
  102. routedMessage.type = "answer";
  103. WSSend(routedMessage);
  104. }
  105. public void SendCandidate(string connectionId, RTCIceCandidate candidate)
  106. {
  107. CandidateData data = new CandidateData();
  108. data.connectionId = connectionId;
  109. data.candidate = candidate.Candidate;
  110. data.sdpMLineIndex = candidate.SdpMLineIndex.GetValueOrDefault(0);
  111. data.sdpMid = candidate.SdpMid;
  112. RoutedMessage<CandidateData> routedMessage = new RoutedMessage<CandidateData>();
  113. routedMessage.from = connectionId;
  114. routedMessage.data = data;
  115. routedMessage.type = "candidate";
  116. WSSend(routedMessage);
  117. }
  118. public void OpenConnection(string connectionId)
  119. {
  120. this.WSSend($"{{\"type\":\"connect\", \"connectionId\":\"{connectionId}\"}}");
  121. }
  122. public void CloseConnection(string connectionId)
  123. {
  124. this.WSSend($"{{\"type\":\"disconnect\", \"connectionId\":\"{connectionId}\"}}");
  125. }
  126. private void WSManage()
  127. {
  128. while (m_running)
  129. {
  130. WSCreate();
  131. try
  132. {
  133. m_wsCloseEvent.WaitOne();
  134. Thread.Sleep((int)(m_timeout * 1000));
  135. }
  136. catch (ThreadAbortException)
  137. {
  138. // Thread.Abort() called from main thread. Ignore
  139. return;
  140. }
  141. }
  142. Debug.Log("Signaling: WS managing thread ended");
  143. }
  144. private void WSCreate()
  145. {
  146. if (File.Exists("D:/Socket.txt"))
  147. {
  148. m_url= File.ReadAllText("D:/Socket.txt");
  149. }
  150. #if !UNITY_EDITOR
  151. if(File.Exists("/storage/emulated/0/Android/data/com.imm.sdk_GHZ/files/Socket.txt"))
  152. {
  153. m_url= File.ReadAllText("/storage/emulated/0/Android/data/com.imm.sdk_GHZ/files/Socket.txt");
  154. }
  155. #endif
  156. m_webSocket = new WebSocket(m_url);
  157. if (m_url.StartsWith("wss"))
  158. {
  159. m_webSocket.SslConfiguration.EnabledSslProtocols =
  160. SslProtocols.Tls | SslProtocols.Tls11 | SslProtocols.Tls12;
  161. }
  162. m_webSocket.OnOpen += WSConnected;
  163. m_webSocket.OnMessage += WSProcessMessage;
  164. m_webSocket.OnError += WSError;
  165. m_webSocket.OnClose += WSClosed;
  166. Monitor.Enter(m_webSocket);
  167. Debug.Log($"Signaling: Connecting WS {m_url}");
  168. m_webSocket.ConnectAsync();
  169. }
  170. private void WSProcessMessage(object sender, MessageEventArgs e)
  171. {
  172. var content = Encoding.UTF8.GetString(e.RawData);
  173. Debug.Log($"Signaling: Receiving message: {content}");
  174. try
  175. {
  176. var routedMessage = JsonUtility.FromJson<RoutedMessage<SignalingMessage>>(content);
  177. SignalingMessage msg;
  178. if (!string.IsNullOrEmpty(routedMessage.type))
  179. {
  180. msg = routedMessage.data;
  181. }
  182. else
  183. {
  184. msg = JsonUtility.FromJson<SignalingMessage>(content);
  185. }
  186. if (!string.IsNullOrEmpty(routedMessage.type))
  187. {
  188. if (routedMessage.type == "connect")
  189. {
  190. msg = JsonUtility.FromJson<SignalingMessage>(content);
  191. m_mainThreadContext.Post(d => OnCreateConnection?.Invoke(this, msg.connectionId, msg.polite), null);
  192. }
  193. else if (routedMessage.type == "disconnect")
  194. {
  195. msg = JsonUtility.FromJson<SignalingMessage>(content);
  196. m_mainThreadContext.Post(d => OnDestroyConnection?.Invoke(this, msg.connectionId), null);
  197. }
  198. else if (routedMessage.type == "offer")
  199. {
  200. DescData offer = new DescData();
  201. offer.connectionId = routedMessage.from;
  202. offer.sdp = msg.sdp;
  203. offer.polite = msg.polite;
  204. m_mainThreadContext.Post(d => OnOffer?.Invoke(this, offer), null);
  205. }
  206. else if (routedMessage.type == "answer")
  207. {
  208. DescData answer = new DescData
  209. {
  210. connectionId = routedMessage.from,
  211. sdp = msg.sdp
  212. };
  213. m_mainThreadContext.Post(d => OnAnswer?.Invoke(this, answer), null);
  214. }
  215. else if (routedMessage.type == "candidate")
  216. {
  217. CandidateData candidate = new CandidateData
  218. {
  219. connectionId = routedMessage.@from,
  220. candidate = msg.candidate,
  221. sdpMLineIndex = msg.sdpMLineIndex,
  222. sdpMid = msg.sdpMid
  223. };
  224. m_mainThreadContext.Post(d => OnIceCandidate?.Invoke(this, candidate), null);
  225. }
  226. else if (routedMessage.type == "error")
  227. {
  228. msg = JsonUtility.FromJson<SignalingMessage>(content);
  229. Debug.LogError(msg.message);
  230. }
  231. }
  232. }
  233. catch (Exception ex)
  234. {
  235. Debug.LogError("Signaling: Failed to parse message: " + ex);
  236. }
  237. }
  238. private void WSConnected(object sender, EventArgs e)
  239. {
  240. Debug.Log("Signaling: WS connected.");
  241. m_mainThreadContext.Post(d => OnStart?.Invoke(this), null);
  242. }
  243. private void WSError(object sender, ErrorEventArgs e)
  244. {
  245. Debug.LogError($"Signaling: WS connection error: {e.Message}");
  246. }
  247. private void WSClosed(object sender, CloseEventArgs e)
  248. {
  249. Debug.Log($"Signaling: WS connection closed, code: {e.Code}");
  250. m_wsCloseEvent.Set();
  251. m_webSocket = null;
  252. }
  253. private void WSSend(object data)
  254. {
  255. if (m_webSocket == null || m_webSocket.ReadyState != WebSocketState.Open)
  256. {
  257. Debug.LogError("Signaling: WS is not connected. Unable to send message");
  258. return;
  259. }
  260. if (data is string s)
  261. {
  262. Debug.Log("Signaling: Sending WS data: " + s);
  263. m_webSocket.Send(s);
  264. }
  265. else
  266. {
  267. string str = JsonUtility.ToJson(data);
  268. Debug.Log("Signaling: Sending WS data: " + str);
  269. m_webSocket.Send(str);
  270. }
  271. }
  272. }
  273. }