IMQTTService.cs 12 KB


  1. using System;
  2. using System.Collections;
  3. using System.Collections.Generic;
  4. using System.Net.Security;
  5. using System.Security.Cryptography.X509Certificates;
  6. using System.Text;
  7. using System.Threading;
  8. using Blue;
  9. using UnityEngine;
  10. using uPLibrary.Networking.M2Mqtt;
  11. using uPLibrary.Networking.M2Mqtt.Messages;
  /// <summary>
  /// Contract for a service that owns a connection to an MQTT broker:
  /// broker/connection settings, connection lifecycle callbacks, and
  /// subscribe / unsubscribe / publish operations.
  /// </summary>
  public interface IMQTTService : IService
  {
      #region MQTT broker configuration

      /// <summary>
      /// IP address or URL of the broker.
      /// </summary>
      string brokerAddress { get; set; }

      /// <summary>
      /// Port number of the broker.
      /// </summary>
      int brokerPort { get; set; }

      /// <summary>
      /// Whether to use an encrypted connection.
      /// </summary>
      bool isEncrypted { get; set; }

      #endregion

      #region MQTT connection parameters

      /// <summary>
      /// Delay, in milliseconds, applied before connecting to the broker.
      /// </summary>
      int connectionDelay { get; set; }

      /// <summary>
      /// Connection timeout, in milliseconds.
      /// </summary>
      int timeoutOnConnection { get; set; }

      /// <summary>
      /// Whether to connect automatically on start-up.
      /// </summary>
      bool autoConnect { get; set; }

      /// <summary>
      /// User name for the MQTT broker. Leave empty if no user name is required.
      /// </summary>
      string mqttUserName { get; set; }

      /// <summary>
      /// Password for the MQTT broker. Leave empty if no password is required.
      /// </summary>
      string mqttPassword { get; set; }

      #endregion

      #region Connection

      /// <summary>
      /// Connects to the broker using the current settings.
      /// </summary>
      void Connect();

      /// <summary>
      /// Callback: a connection attempt is in progress.
      /// </summary>
      void OnConnecting();

      /// <summary>
      /// Callback: the connection was established successfully.
      /// </summary>
      void OnConnected();

      /// <summary>
      /// Disconnects from the broker (if connected).
      /// </summary>
      void Disconnect();

      /// <summary>
      /// Callback: the connection attempt failed.
      /// </summary>
      /// <param name="errorMessage">Failure message.</param>
      void OnConnectionFailed(string errorMessage);

      #endregion

      /// <summary>
      /// Subscribes to MQTT topics.
      /// </summary>
      /// <param name="topics">Topic names to subscribe to.</param>
      /// <param name="qosLevels">QoS levels supplied alongside <paramref name="topics"/>.</param>
      void SubscribeTopics(string[] topics, byte[] qosLevels);

      /// <summary>
      /// Unsubscribes from MQTT topics.
      /// </summary>
      /// <param name="topics">Topic names to unsubscribe from.</param>
      void UnsubscribeTopics(string[] topics);

      /// <summary>
      /// Publishes a message.
      /// </summary>
      /// <param name="topic">Topic to publish on.</param>
      /// <param name="message">Message text to publish.</param>
      void Publish(string topic, string message);

      /// <summary>
      /// Periodic tick. The MQTTService implementation in this file uses it
      /// to process queued MQTT events.
      /// </summary>
      void Update();

      /// <summary>
      /// Application-shutdown hook. The MQTTService implementation in this
      /// file uses it to close the broker connection.
      /// </summary>
      void OnApplicationQuit();
  }
  /// <summary>
  /// <see cref="IMQTTService"/> implementation built on the M2Mqtt <see cref="MqttClient"/>.
  /// The connection is established on a worker thread started by <see cref="Connect"/>;
  /// received messages are queued and drained when <see cref="Update"/> is called.
  /// </summary>
  public class MQTTService : IMQTTService
  {
      /// <summary>
      /// Upper bound on the number of decoded messages kept in <see cref="eventMessages"/>,
      /// so the list cannot grow without limit while nothing consumes it.
      /// </summary>
      private const int MaxStoredMessages = 256;

      /// <summary>Underlying M2Mqtt client; null until the first connection attempt and after a close.</summary>
      protected MqttClient client;

      #region MQTT broker configuration

      /// <summary>IP address or URL of the broker.</summary>
      public string brokerAddress { get; set; }

      /// <summary>Port number of the broker.</summary>
      public int brokerPort { get; set; }

      /// <summary>Whether to use an encrypted connection.</summary>
      public bool isEncrypted { get; set; } = false;

      #endregion

      #region MQTT connection parameters

      /// <summary>Connection delay in milliseconds (stored only; not applied by this class).</summary>
      public int connectionDelay { get; set; } = 500;

      /// <summary>Connection timeout in milliseconds, copied into the client settings before connecting.</summary>
      public int timeoutOnConnection { get; set; } = MqttSettings.MQTT_CONNECT_TIMEOUT;

      /// <summary>When true, <see cref="OnInit"/> calls <see cref="Connect"/>.</summary>
      public bool autoConnect { get; set; } = false;

      /// <summary>User name for the broker; null when none is required.</summary>
      public string mqttUserName { get; set; } = null;

      /// <summary>Password for the broker; null when none is required.</summary>
      public string mqttPassword { get; set; } = null;

      #endregion

      #region Public events

      /// <summary>
      /// Raised when a connection is established successfully.
      /// Raised from the connect worker thread, not the caller's thread.
      /// </summary>
      public event Action ConnectionSucceeded;

      /// <summary>
      /// Raised when a connection attempt fails.
      /// Raised from the connect worker thread, not the caller's thread.
      /// </summary>
      public event Action ConnectionFailed;

      #endregion

      /// <summary>
      /// Service initialisation hook: connects immediately when <see cref="autoConnect"/> is set.
      /// </summary>
      public void OnInit()
      {
          // The message queues are created by their field initialisers, so they are
          // valid even if Update() happens to run before OnInit().
          if (autoConnect)
          {
              Connect();
          }
      }

      #region Connection

      /// <summary>
      /// Starts a connection attempt on a background thread unless the client is already connected.
      /// </summary>
      public void Connect()
      {
          if (client == null || !client.IsConnected)
          {
              // Unity's Application API is resolved here, on the calling thread, because
              // the worker thread must not touch it.
              // NOTE(review): assumes Connect() is invoked from the Unity main thread - confirm.
              caCertificatePath = Application.streamingAssetsPath + "/emqxsl-ca.crt";

              Thread worker = new Thread(DoConnect);
              // A background thread cannot keep the process alive while a connect is pending.
              worker.IsBackground = true;
              worker.Start();
          }
          Debug.LogWarning("开始连接");
      }

      /// <summary>
      /// Callback invoked right before the client tries to connect.
      /// </summary>
      public void OnConnecting()
      {
          Debug.LogWarning("连接中");
      }

      /// <summary>
      /// Callback invoked once the client reports it is connected; raises <see cref="ConnectionSucceeded"/>.
      /// </summary>
      public void OnConnected()
      {
          Debug.LogWarning("连接完成");
          ConnectionSucceeded?.Invoke();
      }

      /// <summary>
      /// Schedules a disconnect (through the coroutine system) when a client exists.
      /// </summary>
      public void Disconnect()
      {
          if (client != null)
          {
              CoroutineSystem.Instance.Start_Coroutine(DoDisconnect());
          }
          Debug.LogWarning("断开连接");
      }

      /// <summary>
      /// Callback invoked when a connection attempt fails; logs the failure and raises <see cref="ConnectionFailed"/>.
      /// </summary>
      /// <param name="errorMessage">Failure message; appended to the log entry when not empty.</param>
      public void OnConnectionFailed(string errorMessage)
      {
          if (string.IsNullOrEmpty(errorMessage))
          {
              Debug.LogError("MQTT连接失败");
          }
          else
          {
              Debug.LogError("MQTT连接失败: " + errorMessage);
          }
          ConnectionFailed?.Invoke();
      }

      #endregion

      /// <summary>Topics passed to the most recent successful <see cref="SubscribeTopics"/> call.</summary>
      private string[] subscribedTopics;

      /// <summary>
      /// Subscribes to the given topics.
      /// </summary>
      /// <param name="topics">Topic names; nothing happens when null or empty.</param>
      /// <param name="qosLevels">
      /// One QoS level per topic. When null or of a different length than
      /// <paramref name="topics"/>, every topic is subscribed with QoS "exactly once".
      /// </param>
      public void SubscribeTopics(string[] topics, byte[] qosLevels)
      {
          if (topics == null || topics.Length == 0)
          {
              Debug.LogWarning("订阅主题列表为空");
              return;
          }
          if (client == null)
          {
              Debug.LogWarning("MQTT客户端未创建,无法订阅主题");
              return;
          }

          // The QoS array must have exactly one entry per topic; fall back to
          // "exactly once" for all topics when the caller did not supply a matching array.
          byte[] levels = qosLevels;
          if (levels == null || levels.Length != topics.Length)
          {
              levels = new byte[topics.Length];
              for (int i = 0; i < levels.Length; i++)
              {
                  levels[i] = MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE;
              }
          }

          subscribedTopics = topics;
          client.Subscribe(topics, levels);
          Debug.LogWarning("订阅主题:" + "\n" + string.Join("\n", topics) + "\n");
      }

      /// <summary>
      /// Unsubscribes from the given topics.
      /// </summary>
      /// <param name="topics">Topic names; nothing happens when null or empty.</param>
      public void UnsubscribeTopics(string[] topics)
      {
          if (topics == null || topics.Length == 0)
          {
              return;
          }
          if (client == null)
          {
              Debug.LogWarning("MQTT客户端未创建,无法取消订阅");
              return;
          }
          client.Unsubscribe(topics);
      }

      /// <summary>
      /// Publishes a UTF-8 encoded text message with QoS "exactly once" and retain off.
      /// </summary>
      /// <param name="topic">Topic to publish on.</param>
      /// <param name="message">Message text; null is sent as an empty payload.</param>
      public void Publish(string topic, string message)
      {
          if (client == null)
          {
              Debug.LogWarning("MQTT客户端未创建,无法发布消息");
              return;
          }
          byte[] payload = Encoding.UTF8.GetBytes(message ?? string.Empty);
          client.Publish(topic, payload, MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE, false);
      }

      /// <summary>
      /// Periodic tick: drains the received-message queues and reports a lost connection.
      /// </summary>
      public void Update()
      {
          ProcessMqttEvents();
      }

      /// <summary>
      /// Application-shutdown hook: closes the broker connection.
      /// </summary>
      public void OnApplicationQuit()
      {
          CloseConnection();
      }

      #region Private

      /// <summary>Guards the front/back queue references and additions to the front queue.</summary>
      private readonly object queueLock = new object();

      /// <summary>Queue that incoming messages are appended to.</summary>
      private List<MqttMsgPublishEventArgs> frontMessageQueue = new List<MqttMsgPublishEventArgs>();

      /// <summary>Queue currently being drained by <see cref="ProcessMqttMessageBackgroundQueue"/>.</summary>
      private List<MqttMsgPublishEventArgs> backMessageQueue = new List<MqttMsgPublishEventArgs>();

      // Both flags are written from client/worker callbacks and read in Update().
      private volatile bool mqttClientConnectionClosed = false;
      private volatile bool mqttClientConnected = false;

      /// <summary>CA certificate path captured by <see cref="Connect"/> for use on the worker thread.</summary>
      private string caCertificatePath;

      /// <summary>Decoded message texts, capped at <see cref="MaxStoredMessages"/> entries.</summary>
      private List<string> eventMessages = new List<string>();

      /// <summary>
      /// Decodes a received payload as UTF-8 text, stores it and logs it.
      /// </summary>
      private void DecodeMessage(string topic, byte[] message)
      {
          string msg = Encoding.UTF8.GetString(message);
          StoreMessage(msg);
          Debug.LogFormat("收到{0}主题的消息:\n{1} ", topic, msg);
      }

      /// <summary>
      /// Stores a decoded message, discarding the oldest entry once the cap is reached.
      /// </summary>
      private void StoreMessage(string eventMsg)
      {
          if (eventMessages.Count >= MaxStoredMessages)
          {
              eventMessages.RemoveAt(0);
          }
          eventMessages.Add(eventMsg);
      }

      /// <summary>
      /// Called after a requested disconnect has completed.
      /// </summary>
      private void OnDisconnected()
      {
          Debug.LogError("断开连接`");
      }

      /// <summary>
      /// Called when the connection was closed while it was believed to be connected.
      /// </summary>
      private void OnConnectionLost()
      {
          Debug.LogError("连接丢失");
      }

      /// <summary>
      /// Processes pending MQTT events: received messages first, then a connection-lost notification.
      /// </summary>
      private void ProcessMqttEvents()
      {
          // Process the messages collected so far.
          SwapMqttMessageQueues();
          ProcessMqttMessageBackgroundQueue();
          // Process the messages that arrived in the meantime.
          SwapMqttMessageQueues();
          ProcessMqttMessageBackgroundQueue();

          if (mqttClientConnectionClosed)
          {
              mqttClientConnectionClosed = false;
              OnConnectionLost();
          }
      }

      /// <summary>
      /// Decodes every message in the back queue, then empties it.
      /// </summary>
      private void ProcessMqttMessageBackgroundQueue()
      {
          foreach (MqttMsgPublishEventArgs msg in backMessageQueue)
          {
              DecodeMessage(msg.Topic, msg.Message);
          }
          backMessageQueue.Clear();
      }

      /// <summary>
      /// Swaps the front and back queues so messages can keep arriving while the back queue is processed.
      /// </summary>
      private void SwapMqttMessageQueues()
      {
          lock (queueLock)
          {
              List<MqttMsgPublishEventArgs> previousFront = frontMessageQueue;
              frontMessageQueue = backMessageQueue;
              backMessageQueue = previousFront;
          }
      }

      /// <summary>
      /// Client callback for a received publish message: appends it to the front queue.
      /// </summary>
      private void OnMqttMessageReceived(object sender, MqttMsgPublishEventArgs msg)
      {
          // NOTE(review): presumably raised on the client's receive thread rather than the
          // thread calling Update(); the lock keeps Add and the queue swap from interleaving.
          lock (queueLock)
          {
              frontMessageQueue.Add(msg);
          }
      }

      /// <summary>
      /// Client callback for a closed connection.
      /// </summary>
      private void OnMqttConnectionClosed(object sender, EventArgs e)
      {
          // Flag an unexpected close only if we were connected (a controlled
          // disconnect clears mqttClientConnected first, so it is not reported).
          mqttClientConnectionClosed = mqttClientConnected;
          mqttClientConnected = false;
      }

      /// <summary>
      /// Connects to the broker using the current settings. Runs on the worker thread started by <see cref="Connect"/>.
      /// </summary>
      private void DoConnect()
      {
          // Work on a local reference so a concurrent CloseConnection() that nulls
          // the field cannot cause a NullReferenceException half-way through.
          MqttClient mqtt = client;
          if (mqtt != null && mqtt.IsConnected)
          {
              Debug.LogError("IsConnected");
              return;
          }

          try
          {
              if (mqtt == null)
              {
                  mqtt = CreateClient();
                  client = mqtt;
              }

              OnConnecting();

              mqtt.Settings.TimeoutOnConnection = timeoutOnConnection;
              string clientId = Guid.NewGuid().ToString();
              mqtt.Connect(clientId, mqttUserName, mqttPassword);
          }
          catch (Exception e)
          {
              // Drop the client so the next attempt starts from a fresh instance, and
              // report the failure instead of letting the exception end the worker thread unnoticed.
              client = null;
              OnConnectionFailed(e.Message);
              return;
          }

          if (mqtt.IsConnected)
          {
              // Remove before adding: a reused client must not end up with duplicate handlers.
              mqtt.ConnectionClosed -= OnMqttConnectionClosed;
              mqtt.ConnectionClosed += OnMqttConnectionClosed;
              mqtt.MqttMsgPublishReceived -= OnMqttMessageReceived;
              mqtt.MqttMsgPublishReceived += OnMqttMessageReceived;
              mqttClientConnected = true;
              OnConnected();
          }
          else
          {
              OnConnectionFailed("CONNECTION FAILED!");
          }
      }

      /// <summary>
      /// Creates the M2Mqtt client for the configured broker address, port and encryption setting.
      /// </summary>
      private MqttClient CreateClient()
      {
          // NOTE(review): SSLv3 is a deprecated protocol; consider MqttSslProtocols.TLSv1_2
          // if the bundled M2Mqtt version and the broker support it.
          MqttSslProtocols sslProtocol = isEncrypted ? MqttSslProtocols.SSLv3 : MqttSslProtocols.None;
  #if (!UNITY_EDITOR && UNITY_WSA_10_0 && !ENABLE_IL2CPP)
          return new MqttClient(brokerAddress, brokerPort, isEncrypted, sslProtocol);
  #else
          // The CA certificate is only needed, and only loaded, for encrypted connections.
          X509Certificate cert = isEncrypted ? new X509Certificate(caCertificatePath) : null;
          Debug.LogWarning("ip地址或url:" + brokerAddress);
          return new MqttClient(brokerAddress, brokerPort, isEncrypted, cert, cert, sslProtocol);
  #endif
      }

      /// <summary>
      /// User certificate selection callback (not implemented; currently unused).
      /// </summary>
      private bool userCertificateSelectionCallback(object sender, X509Certificate certificate, X509Chain chain, SslPolicyErrors sslPolicyErrors)
      {
          throw new NotImplementedException();
      }

      /// <summary>
      /// Remote certificate validation callback (not implemented; currently unused).
      /// </summary>
      private bool MyRemoteCertificateValidationCallback(object sender, X509Certificate certificate, X509Chain chain, SslPolicyErrors sslPolicyErrors)
      {
          throw new NotImplementedException();
      }

      /// <summary>
      /// Coroutine that waits for the end of the frame, then closes the connection.
      /// </summary>
      private IEnumerator DoDisconnect()
      {
          yield return new WaitForEndOfFrame();
          CloseConnection();
          OnDisconnected();
      }

      /// <summary>
      /// Unsubscribes, disconnects, detaches the event handlers and releases the client.
      /// </summary>
      private void CloseConnection()
      {
          mqttClientConnected = false;

          MqttClient mqtt = client;
          if (mqtt == null)
          {
              return;
          }

          if (mqtt.IsConnected)
          {
              // UnsubscribeTopics ignores a null/empty list, so closing a connection
              // that never subscribed to anything is safe.
              UnsubscribeTopics(subscribedTopics);
              subscribedTopics = null;
              mqtt.Disconnect();
          }

          mqtt.MqttMsgPublishReceived -= OnMqttMessageReceived;
          mqtt.ConnectionClosed -= OnMqttConnectionClosed;
          client = null;
      }

  #if ((!UNITY_EDITOR && UNITY_WSA_10_0))
      private void OnApplicationFocus(bool focus)
      {
          // On UWP 10 (HoloLens) we cannot tell whether the application was really closed or only minimised
          // (https://forum.unity.com/threads/onapplicationquit-and-ondestroy-are-not-called-on-uwp-10.462597/)
          if (focus)
          {
              Connect();
          }
          else
          {
              CloseConnection();
          }
      }
  #endif

      #endregion
  }