MQTTClient.cs 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335
  1. using LitJson;
  2. using login;
  3. using System;
  4. using System.Collections;
  5. using System.Collections.Generic;
  6. using System.Text;
  7. using UnityEngine;
  8. using UnityEngine.Android;
  9. using uPLibrary.Networking.M2Mqtt.Messages;
  10. using static QTTManager;
  /// <summary>
  /// Unity-side MQTT client. Owns a <see cref="QTTManager"/> connection, subscribes to the
  /// project's topics once connected, hands received messages to the rest of the app from
  /// Unity's <c>Update</c> loop, and runs a periodic reconnection watchdog.
  /// </summary>
  public class MQTTClient : MonoSingleton<MQTTClient>
  {
      /// <summary>Seconds between two connection checks of the reconnection watchdog.</summary>
      private const float ReconnectIntervalSeconds = 5f;

      /// <summary>Number of reconnection attempts after which the watchdog gives up.</summary>
      private const int MaxReconnectAttempts = 3;

      /// <summary>Id of the other party.</summary>
      public string rid = "mqttx_b4c02ddc";

      /// <summary>Never raised inside this class; kept so existing subscribers still compile.</summary>
      public event Action<JsonData> OnCoordinate;

      /// <summary>Raw bytes of the "emqxsl-ca" resource; passed to <c>QTTManager.Connect</c>.</summary>
      public static byte[] bytes;

      /// <summary>Current connection state as tracked by this component.</summary>
      public MqttState mqttState = MqttState.start;

      QTTManager qt;

      // Incremented on every watchdog-driven reconnect, reset to 0 in onSucceed.
      private int ReconnectionCount = 0;

      Queue<MqttMsgPublishEventArgs> rlist = new Queue<MqttMsgPublishEventArgs>();
      Queue<ShowError> errorList = new Queue<ShowError>();

      // Guards rlist and errorList. The queues exist to hand work from the client's callbacks
      // to Update(); this assumes those callbacks may arrive off the Unity main thread
      // (NOTE(review): confirm against QTTManager), and Queue<T> is not thread-safe.
      private readonly object queueGate = new object();

      // Handle of the single running watchdog coroutine, or null when none is running.
      private Coroutine reconnectRoutine;

      // True while the watchdog itself is calling Connect(), so Connect() does not restart it.
      private bool connectingFromWatchdog;

      /// <summary>
      /// Requests external-storage permissions if either is missing and loads the CA
      /// certificate resource into <see cref="bytes"/>.
      /// </summary>
      void Start()
      {
          // Request file read and write permissions.
          if (!Permission.HasUserAuthorizedPermission(Permission.ExternalStorageRead) ||
              !Permission.HasUserAuthorizedPermission(Permission.ExternalStorageWrite))
          {
              Permission.RequestUserPermission(Permission.ExternalStorageRead);
              Permission.RequestUserPermission(Permission.ExternalStorageWrite);
          }

          if (LoadCertificate())
          {
              Debug.Log(" DGJ ===> emqxsl-ca.bytes " + bytes.Length);
          }
      }

      /// <summary>
      /// Loads the "emqxsl-ca" text asset into <see cref="bytes"/>.
      /// </summary>
      /// <returns><c>true</c> when the asset was found; <c>false</c> (after logging) otherwise.</returns>
      private static bool LoadCertificate()
      {
          TextAsset asset = Resources.Load<TextAsset>("emqxsl-ca");
          if (asset == null)
          {
              // Previously this case surfaced as a NullReferenceException in Start().
              Debug.LogError("MQTT CA certificate resource 'emqxsl-ca' not found");
              return false;
          }

          bytes = asset.bytes;
          return true;
      }

      /// <summary>Client callback: queues a received message for processing in <c>Update</c>.</summary>
      private void OnReceived(MqttMsgPublishEventArgs obj)
      {
          lock (queueGate)
          {
              rlist.Enqueue(obj);
          }
      }

      /// <summary>Client callback: a connection attempt is in progress.</summary>
      private void OnConnecting()
      {
          Debug.Log("MQtt 连接中");
      }

      /// <summary>
      /// Client callback: connection established. Resets the retry counter, reports a
      /// successful reconnect if one was in progress, and subscribes to the project's topics.
      /// </summary>
      private void onSucceed()
      {
          ReconnectionCount = 0;
          if (mqttState == MqttState.reconnection)
          {
              EnqueueError(new ShowError("信令 重连成功", 3));
          }
          mqttState = MqttState.successfu;

          string front = MQTTManager.Instance.front;

          // Only the room topic is conditional; every other topic is always subscribed.
          if (!string.IsNullOrEmpty(MQTTManager.Instance.roomId))
          {
              Subscribe(front + "room/" + MQTTManager.Instance.roomId);
          }
          Subscribe(front + "personnel/" + MQTTManager.Instance.resourcePoolId);
          Subscribe(front + "online/" + MQTTManager.Instance.id);
          Subscribe(front + "sensor/VOICE"); // noise
          Subscribe(front + "sensor/TEMPERATURE_HUMIDITY"); // temperature / humidity sensor
          Subscribe(front + "sensor/VIBRATE"); // vibration sensor
      }

      /// <summary>Client callback: the connection attempt failed.</summary>
      private void onFaild()
      {
          Debug.Log("MQtt 连接失败");
      }

      /// <summary>
      /// Processes every message that was queued when the frame started, then discards at
      /// most one queued error notification.
      /// </summary>
      void Update()
      {
          // Snapshot the count first: the original loop compared a growing index against a
          // shrinking Count and therefore handled only about half of the queue per frame.
          int pending;
          lock (queueGate)
          {
              pending = rlist.Count;
          }

          for (; pending > 0; pending--)
          {
              MqttMsgPublishEventArgs message;
              lock (queueGate)
              {
                  // Update() is the only consumer, so at least `pending` items are present.
                  message = rlist.Dequeue();
              }
              // Called outside the lock so handlers cannot block the enqueueing callbacks.
              OnUnityReceived(message);
          }

          lock (queueGate)
          {
              if (errorList.Count > 0)
              {
                  // The popup that used to display this (ErrorPopup.ShowEmqxError) is disabled,
                  // so the entry is only removed from the queue.
                  errorList.Dequeue();
              }
          }
      }

      /// <summary>
      /// Handles one received MQTT message: logs it, forwards it to <c>MQTTManager</c>
      /// and, when the project is flagged as an AI device, to <c>AIDeviceManager</c>.
      /// </summary>
      /// <param name="obj">The publish event; its payload is decoded as UTF-8 text.</param>
      public void OnUnityReceived(MqttMsgPublishEventArgs obj)
      {
          string msg = Encoding.UTF8.GetString(obj.Message);
          Debug.Log("uid => " + obj.Topic + ":\n" + msg);
          MQTTManager.Instance.Received(obj);
          if (ProjectALLStateManager.Instance.isAIDevice)
          {
              AIDeviceManager.Instance.Received(msg);
          }
      }

      /// <summary>Detaches from and closes the client so no callback reaches a destroyed component.</summary>
      private void OnDestroy()
      {
          DetachClient(qt);
      }

      private void OnApplicationQuit()
      {
          DisConnect();
      }

      /// <summary>
      /// Creates a new <see cref="QTTManager"/> from the settings held by <c>MQTTManager</c>
      /// and <c>HttpActionLang</c>, connects it with the CA certificate, and makes sure
      /// exactly one reconnection watchdog is running. Failures are logged, not thrown.
      /// </summary>
      public void Connect()
      {
          // Connect() may run before Start(); load the certificate on demand in that case.
          if (bytes == null && !LoadCertificate())
          {
              return;
          }

          Debug.Log(" DGJ ===> emqxsl-ca.bytes 1 " + bytes.Length);
          Debug.Log("DGJ ===> MQTT " + HttpActionLang.Instance.mqttSocket );
          Debug.Log(" DGJ ===> username " + MQTTManager.Instance.username);
          Debug.Log(" DGJ ===> clientId " + MQTTManager.Instance.clientId);
          // NOTE(review): this writes MQTTManager.certification to the log; if that value is
          // a credential it should be removed or masked.
          Debug.Log(" DGJ ===> certification " + MQTTManager.certification);

          try
          {
              QTTManager client = new QTTManager(MQTTManager.Instance.clientId, MQTTManager.Instance.username, MQTTManager.certification, HttpActionLang.Instance.mqttSocket, "1883");

              // Drop the previous client only once the new one exists, so a failing
              // constructor leaves the old instance (and the watchdog's target) in place.
              DetachClient(qt);
              qt = client;

              // Parenthesised: without it the expression was ("..." + qt) != null, i.e. always True.
              Debug.Log("DGJ ===> MQTT " + (qt != null));
              qt.ConnectionFailed += onFaild;
              qt.ConnectionSucceeded += onSucceed;
              qt.OnConnecting += OnConnecting;
              qt.OnReceived += OnReceived;
              qt.OnClose += OnClose;
              qt.Connect(bytes);

              // The watchdog calls Connect() itself; starting another loop from there is
              // what used to multiply the coroutines on every retry.
              if (!connectingFromWatchdog)
              {
                  RestartWatchdog();
              }
          }
          catch (Exception e)
          {
              Debug.LogError("MQTT Connect Error "+ e.Message);
          }
      }

      /// <summary>Unhooks this component's handlers from <paramref name="client"/> and closes it if connected.</summary>
      private void DetachClient(QTTManager client)
      {
          if (client == null)
          {
              return;
          }

          client.ConnectionFailed -= onFaild;
          client.ConnectionSucceeded -= onSucceed;
          client.OnConnecting -= OnConnecting;
          client.OnReceived -= OnReceived;
          client.OnClose -= OnClose;

          if (client.IsConnect())
          {
              client.DisConnect();
          }
      }

      /// <summary>Stops a running watchdog, if any, and starts a fresh one.</summary>
      private void RestartWatchdog()
      {
          if (reconnectRoutine != null)
          {
              StopCoroutine(reconnectRoutine);
          }
          reconnectRoutine = StartCoroutine(Reconnection());
      }

      /// <summary>Queues an error notification under the queue lock.</summary>
      private void EnqueueError(ShowError error)
      {
          lock (queueGate)
          {
              errorList.Enqueue(error);
          }
      }

      /// <summary>Client callback: the connection was closed.</summary>
      private void OnClose(EventArgs obj)
      {
          Debug.Log("断开连接");
          mqttState = MqttState.disconnect;
          EnqueueError(new ShowError("信令 断开连接 重连中... ", -1));
      }

      /// <summary>
      /// Watchdog coroutine: every <see cref="ReconnectIntervalSeconds"/> seconds checks the
      /// client and reconnects when it is down. Once <see cref="MaxReconnectAttempts"/>
      /// attempts have been made without success, it reports the failure once and ends.
      /// </summary>
      private IEnumerator Reconnection()
      {
          WaitForSeconds interval = new WaitForSeconds(ReconnectIntervalSeconds);
          while (true)
          {
              yield return interval;

              if (qt == null || qt.IsConnect())
              {
                  continue;
              }

              mqttState = MqttState.reconnection;

              if (ReconnectionCount >= MaxReconnectAttempts)
              {
                  Debug.LogError(" DGJ RTC服务重连失败,请退出应用重试 ");
                  DisConnect();
                  EnqueueError(new ShowError("信令重连失败,请检测网络状态", -1));

                  // Stop here instead of repeating the same failure report every interval;
                  // a later external Connect() starts a new watchdog.
                  reconnectRoutine = null;
                  yield break;
              }

              DisConnect();
              Debug.Log(" DGJ ===> Reconnection ");
              ReconnectionCount++;

              connectingFromWatchdog = true;
              try
              {
                  Connect();
              }
              finally
              {
                  connectingFromWatchdog = false;
              }
          }
      }

      /// <summary>Disconnects the client if it is currently connected.</summary>
      public void DisConnect()
      {
          if (qt != null && qt.IsConnect())
          {
              qt.DisConnect();
          }
      }

      /// <summary>
      /// Subscribes to a topic with QoS "at most once". Logs an error when not connected.
      /// </summary>
      /// <param name="channelID">Full topic name to subscribe to.</param>
      public void Subscribe(string channelID)
      {
          Debug.Log(" DGJ =====> Subscribe " + channelID);
          if (qt == null || !qt.IsConnect())
          {
              Debug.LogError(" MQTT 未连接 !!!");
              return;
          }

          qt.Subscribe(
              new string[] { channelID },
              new byte[] { MqttMsgBase.QOS_LEVEL_AT_MOST_ONCE });
          Debug.Log(" DGJ =====> Subscribe2");
      }

      /// <summary>
      /// Currently a no-op: the unsubscribe call is disabled in this class.
      /// </summary>
      /// <param name="channelID">Topic that would be unsubscribed.</param>
      public void UnSubscribe(string channelID)
      {
          if (qt != null && qt.IsConnect())
          {
              // Intentionally empty: the qt.Unsubscribe call is disabled.
          }
      }

      /// <summary>
      /// Legacy overload without a topic. Publishing is disabled here; the only observable
      /// effect is an error log when the client is not connected.
      /// </summary>
      /// <param name="bs">Payload (unused).</param>
      public void publish(byte[] bs)
      {
          if (qt != null && qt.IsConnect())
          {
              // Intentionally empty: publishing to the room topic is disabled.
          }
          else
          {
              Debug.LogError(" MQTT 未连接 ");
          }
      }

      /// <summary>
      /// Publishes a payload to a topic with QoS "exactly once", not retained.
      /// Logs an error when the client is not connected.
      /// </summary>
      /// <param name="channelID">Full topic name.</param>
      /// <param name="bs">Payload bytes.</param>
      public void publish(string channelID, byte[] bs)
      {
          if (qt != null && qt.IsConnect())
          {
              Debug.Log(MQTTManager.Instance.clientId + MQTTManager.Instance.account + " DGJ publish =====> " + channelID + " " + bs.Length);
              qt.Publish(channelID, bs, MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE, false);
          }
          else
          {
              Debug.LogError(" MQTT 未连接 ");
          }
      }
  }
  /// <summary>
  /// Connection states tracked by <c>MQTTClient.mqttState</c>.
  /// Numeric values are the same ones the compiler assigned implicitly.
  /// </summary>
  public enum MqttState
  {
      /// <summary>Initial value of <c>MQTTClient.mqttState</c>.</summary>
      start = 0,

      /// <summary>Set when the client reports a successful connection.</summary>
      successfu = 1,

      /// <summary>Set by the reconnection loop when it finds the client not connected.</summary>
      reconnection = 2,

      /// <summary>Set when the client reports that the connection was closed.</summary>
      disconnect = 3,
  }
  /// <summary>
  /// A queued error/status notification: a message plus an associated time value.
  /// </summary>
  public class ShowError
  {
      /// <summary>Message text.</summary>
      public string error;

      /// <summary>
      /// Time value attached to the message; callers in this file pass 3 or -1.
      /// NOTE(review): presumably a display duration with -1 meaning "no timeout" - confirm
      /// against the popup that consumes it.
      /// </summary>
      public float timer;

      /// <summary>Creates a notification from its message and time value.</summary>
      /// <param name="error">Message text.</param>
      /// <param name="timer">Time value stored in <see cref="timer"/>.</param>
      public ShowError(string error, float timer)
      {
          this.timer = timer;
          this.error = error;
      }
  }
  271. }