QTTManager.cs 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197
  1. using System;
  2. using System.IO;
  3. using System.Security.Cryptography.X509Certificates;
  4. using System.Threading;
  5. using UnityEngine;
  6. using uPLibrary.Networking.M2Mqtt;
  7. using uPLibrary.Networking.M2Mqtt.Messages;
  8. public class QTTManager
  9. {
  10. bool isConnect = false;
  11. bool isConnecting = false;
  12. /// <summary>
  13. /// 连接失败时激发的事件
  14. /// </summary>
  15. public event Action ConnectionFailed;
  16. /// <summary>
  17. /// 成功建立连接时激发的事件
  18. /// </summary>
  19. public event Action ConnectionSucceeded;
  20. MqttClient client;
  21. string brokerAddress;
  22. int brokerPort;
  23. bool isEncrypted;
  24. int timeoutOnConnection;
  25. string clientId;
  26. string mqttUserName = null; // MQTT代理的用户名。如果不需要用户名,请保留为空
  27. string mqttPassword = null; // MQTT代理的密码。如果不需要密码,请保持空
  28. // Start is called before the first frame update
  29. public QTTManager(string cid, string UName, string pwd, string url, string port)
  30. {
  31. brokerAddress = url;
  32. brokerPort = int.Parse(port);
  33. isEncrypted = false;
  34. timeoutOnConnection = MqttSettings.MQTT_CONNECT_TIMEOUT; // 连接超时(毫秒)
  35. this.clientId = cid;
  36. this.mqttUserName = UName;
  37. this.mqttPassword = pwd;
  38. }
  39. byte[] bytes;
  40. public Thread th;
  41. public void Connect(byte[] tbytes)
  42. {
  43. Debug.Log("HJJ Connect==>");
  44. if (isConnecting || isConnect)
  45. {
  46. return;
  47. }
  48. bytes = tbytes;//
  49. isConnecting = true;
  50. DoConnect();
  51. }
  52. public bool IsConnect()
  53. {
  54. if (isConnecting || isConnect)
  55. {
  56. return true;
  57. }
  58. return false;
  59. }
  60. // current message identifier generated
  61. private ushort messageIdCounter = 0;
  62. /// <summary>
  63. /// 订阅消息主题
  64. /// </summary>
  65. /// <param name="topics">要监听的主题数组</param>
  66. /// <param name="qosLevels">主题相关的 QOS levels</param>
  67. /// <returns>监听消息相关的消息ID</returns>
  68. public ushort Subscribe(string[] topics, byte[] qosLevels)
  69. {
  70. return client.Subscribe(topics, qosLevels);
  71. }
  72. /// <summary>
  73. /// 退订主题
  74. /// </summary>
  75. /// <param name="topics"></param>
  76. /// <returns></returns>
  77. public ushort Unsubscribe(string[] topics)
  78. {
  79. return client.Unsubscribe(topics);
  80. }
  81. public Action OnConnecting;
  82. /// <summary>
  83. /// 使用当前设置连接到代理。
  84. /// </summary>
  85. /// <returns>执行是在协程中完成的</returns>
  86. private void DoConnect()
  87. {
  88. Debug.Log("HJJ DoConnect==>");
  89. // 等待给定的延迟
  90. // yield return new WaitForSecondsRealtime(connectionDelay / 1000f);
  91. // 给Unity留点时间刷新UI
  92. // yield return new WaitForEndOfFrame();
  93. // create client instance
  94. X509Certificate cert = new X509Certificate(bytes);
  95. Debug.Log("IP地址或URL==>" + brokerAddress);
  96. isEncrypted = false;
  97. client = new MqttClient(brokerAddress, brokerPort, isEncrypted, cert, cert, isEncrypted ? MqttSslProtocols.SSLv3 : MqttSslProtocols.None);
  98. OnConnecting?.Invoke();
  99. // 给Unity留点时间刷新UI
  100. // yield return new WaitForEndOfFrame();
  101. // yield return new WaitForEndOfFrame();
  102. client.Settings.TimeoutOnConnection = timeoutOnConnection;
  103. client.Connect(clientId, mqttUserName, mqttPassword);
  104. if (client.IsConnected)
  105. {
  106. Debug.Log("HJJ IsConnected==>");
  107. isConnecting = false;
  108. isConnect = true;
  109. client.ConnectionClosed += OnMqttConnectionClosed;
  110. // register to message received
  111. client.MqttMsgPublishReceived += OnMqttMessageReceived;
  112. OnConnected();
  113. }
  114. else
  115. {
  116. Debug.Log("HJJ FAILED==>");
  117. isConnecting = false;
  118. isConnect = false;
  119. OnConnectionFailed("CONNECTION FAILED!");
  120. }
  121. }
  122. public void DisConnect()
  123. {
  124. client?.Disconnect();
  125. }
  126. /// <summary>
  127. /// 如果连接失败,请重写此方法以执行某些操作
  128. /// </summary>
  129. protected virtual void OnConnectionFailed(string errorMessage)
  130. {
  131. Debug.LogError("MQTT连接失败");
  132. if (ConnectionFailed != null)
  133. {
  134. ConnectionFailed();
  135. }
  136. }
  137. /// <summary>
  138. /// 如果连接成功,请重写此方法以执行某些操作。
  139. /// </summary>
  140. protected virtual void OnConnected()
  141. {
  142. Debug.LogFormat("已经连接到 {0}:{1}...\n", brokerAddress, brokerPort.ToString());
  143. if (ConnectionSucceeded != null)
  144. {
  145. ConnectionSucceeded();
  146. }
  147. }
  148. public Action<MqttMsgPublishEventArgs> OnReceived;
  149. private void OnMqttMessageReceived(object sender, MqttMsgPublishEventArgs e)
  150. {
  151. OnReceived?.Invoke(e);
  152. }
  153. public Action<EventArgs> OnClose;
  154. private void OnMqttConnectionClosed(object sender, EventArgs e)
  155. {
  156. isConnect = false;
  157. OnClose?.Invoke(e);
  158. }
  159. /// <summary>
  160. /// 异步发布消息
  161. /// </summary>
  162. /// <param name="topic">Message topic</param>
  163. /// <param name="message">Message data (payload)</param>
  164. /// <param name="qosLevel">QoS Level</param>
  165. /// <param name="retain">Retain flag</param>
  166. /// <returns>Message Id related to PUBLISH message</returns>
  167. public ushort Publish(string topic, byte[] message, byte qosLevel, bool retain)
  168. {
  169. return client.Publish(topic, message, qosLevel, retain);
  170. }
  171. public class MQttReceived
  172. {
  173. public string topic;
  174. public byte[] message;
  175. }
  176. }