MQTTClient.cs 10 KB


  1. using LitJson;
  2. using login;
  3. using System;
  4. using System.Collections;
  5. using System.Collections.Generic;
  6. using System.Text;
  7. using UnityEngine;
  8. using UnityEngine.Android;
  9. using uPLibrary.Networking.M2Mqtt.Messages;
  10. using static QTTManager;
  11. public class MQTTClient : MonoSingleton<MQTTClient>
  12. {
  13. //public string front = "client/";
  14. // public string id = "";
  15. //public string username = "u@unity3";// 需要根据服务器设置
  16. //public string password = null;// 需要根据服务器设置
  17. public string rid = "mqttx_b4c02ddc"; //其他人的ID
  18. //public string account;
  19. //public string roomId;
  20. //public string email;
  21. //public string phone;
  22. //public string roleId;
  23. //public string resourcePool;
  24. //public string expertType;
  25. //public string _username;
  26. public event Action<JsonData> OnCoordinate;
  27. public static byte[] bytes;
  28. // public static string certification;
  29. // public static byt ta;
  30. QTTManager qt;
  31. // Start is called before the first frame update
  32. private int ReconnectionCount = 0;
  33. public MqttState mqttState = MqttState.start;
  34. void Start()
  35. {
  36. // 请求文件读取和写入权限
  37. if (!Permission.HasUserAuthorizedPermission(Permission.ExternalStorageRead) ||
  38. !Permission.HasUserAuthorizedPermission(Permission.ExternalStorageWrite))
  39. {
  40. Permission.RequestUserPermission(Permission.ExternalStorageRead);
  41. Permission.RequestUserPermission(Permission.ExternalStorageWrite);
  42. }
  43. bytes = (Resources.Load("emqxsl-ca") as TextAsset).bytes;
  44. Debug.Log(" DGJ ===> emqxsl-ca.bytes " + bytes.Length);
  45. }
  46. Queue<MqttMsgPublishEventArgs> rlist = new Queue<MqttMsgPublishEventArgs>();
  47. Queue<ShowError> errorList = new Queue<ShowError>();
  48. private void OnReceived(MqttMsgPublishEventArgs obj)
  49. {
  50. rlist.Enqueue(obj);
  51. }
  52. private void OnConnecting()
  53. {
  54. Debug.Log("MQtt 连接中");
  55. }
  56. private void onSucceed()
  57. {
  58. // ErrorPopup.Instance.UpdateEmqxState(NetState.Connect);
  59. ReconnectionCount = 0;
  60. if (mqttState == MqttState.reconnection)
  61. {
  62. // ErrorPopup.Instance.ShowNetError("网络重新连接", 3);
  63. errorList.Enqueue(new ShowError("信令 重连成功", 3));
  64. }
  65. mqttState = MqttState.successfu;
  66. // Subscribe(MQTTManager.Instance.front + "online/" + MQTTManager.Instance.phone);
  67. //Subscribe(MQTTManager.Instance.front + "personnel/" + MQTTManager.Instance.resourcePool);
  68. if( !string.IsNullOrEmpty( MQTTManager.Instance.roomId))
  69. Subscribe(MQTTManager.Instance.front + "room/" + MQTTManager.Instance.roomId);
  70. Subscribe(MQTTManager.Instance.front + "personnel/" + MQTTManager.Instance.resourcePoolId);
  71. Subscribe(MQTTManager.Instance.front + "online/" + MQTTManager.Instance.id);
  72. Subscribe(MQTTManager.Instance.front + "sensor/VOICE"); //噪声
  73. Subscribe(MQTTManager.Instance.front + "sensor/TEMPERATURE_HUMIDITY"); //温湿度传感器
  74. Subscribe(MQTTManager.Instance.front + "sensor/VIBRATE"); //振动传感器
  75. Subscribe(MQTTManager.Instance.front + "room/RTC"); //RTC
  76. // Subscribe((MQTTManager.Instance.front + "younuo"));
  77. // MQTTManager.Instance.isCreateRoom = true;
  78. }
  79. private void onFaild()
  80. {
  81. Debug.Log("MQtt 连接失败");
  82. }
  83. // Update is called once per frame
  84. void Update()
  85. {
  86. if (rlist.Count > 0)
  87. {
  88. for (int i = 0; i < rlist.Count; i++)
  89. {
  90. OnUnityReceived(rlist.Dequeue());
  91. }
  92. }
  93. if (errorList.Count > 0)
  94. {
  95. ShowError error = errorList.Dequeue();
  96. // ErrorPopup.Instance.ShowEmqxError(error.error, error.timer);
  97. }
  98. }
  99. //MQTT接收到的数据
  100. public void OnUnityReceived(MqttMsgPublishEventArgs obj)
  101. {
  102. string msg = Encoding.UTF8.GetString(obj.Message);
  103. Debug.Log("uid => " + obj.Topic + ":\n" + msg);
  104. // UILogManager.Instance.text3.text = msg;
  105. // MQTTManager.Instance.Received(msg);
  106. MQTTManager.Instance.Received(obj);
  107. JsonData data = JsonMapper.ToObject(msg);
  108. if(data["device"].ToString()=="ROOM2")
  109. {
  110. if (RTCDemoManager.Instance&&RTCDemoManager.Instance.roomId!="")
  111. {
  112. JsonData obj2 = new JsonData();
  113. obj2["device"] = new JsonData();
  114. obj2["device"]["deviceCode"] = "ROOM";
  115. obj2["roomId"] = RTCDemoManager.Instance.roomId;
  116. // 用MQTT 向中考发送邀请
  117. MQTTManager.Instance.PushMsg(MQTTManager.Instance.front + "room/RTC", obj2.ToJson());
  118. }else
  119. {
  120. Debug.Log("创建房间");
  121. RTCDemoManager.Instance.CreatRoom();
  122. }
  123. }
  124. if (ProjectALLStateManager.Instance.isAIDevice)
  125. AIDeviceManager.Instance.Received(msg);
  126. }
  127. private void OnDestroy()
  128. {
  129. }
  130. private void OnApplicationQuit()
  131. {
  132. DisConnect();
  133. }
  134. //连接
  135. public void Connect()
  136. {
  137. Debug.Log(" DGJ ===> emqxsl-ca.bytes 1 " + bytes.Length);
  138. Debug.Log("DGJ ===> MQTT " + HttpActionLang.Instance.mqttSocket );
  139. Debug.Log(" DGJ ===> username " + MQTTManager.Instance.username);
  140. Debug.Log(" DGJ ===> clientId " + MQTTManager.Instance.clientId);
  141. Debug.Log(" DGJ ===> certification " + MQTTManager.certification);
  142. //if (DeviceType.type == "Phone")
  143. //{
  144. // id =UserInfo.Instance.name + "_Phone";
  145. // rid = front+UserInfo.Instance.name + "_Glasses";
  146. //}
  147. //else
  148. //{
  149. // id = UserInfo.Instance.name + "_Glasses";
  150. // rid = front+UserInfo.Instance.name + "_Phone";
  151. //}
  152. // Debug.Log("DGJ ==>MQTT " + id + account + " " + username + " " + certification + " " + HttpActionLang.Instance.mqttSocket);
  153. try
  154. {
  155. qt = new QTTManager(MQTTManager.Instance.clientId, MQTTManager.Instance.username, MQTTManager.certification, HttpActionLang.Instance.mqttSocket, "1883");
  156. Debug.Log("DGJ ===> MQTT " + qt!=null);
  157. qt.ConnectionFailed += onFaild;
  158. qt.ConnectionSucceeded += onSucceed;
  159. qt.OnConnecting += OnConnecting;
  160. qt.OnReceived += OnReceived;
  161. qt.OnClose += OnClose;
  162. qt.Connect(bytes);
  163. StartCoroutine(Reconnection());
  164. }
  165. catch (Exception e)
  166. {
  167. Debug.LogError("MQTT Connect Error "+ e.Message);
  168. }
  169. // StartSendCameraPos();
  170. }
  171. private void OnClose(EventArgs obj)
  172. {
  173. Debug.Log("断开连接");
  174. mqttState = MqttState.disconnect;
  175. // ErrorPopup.Instance.UpdateEmqxState(NetState.ReConnection);
  176. errorList.Enqueue(new ShowError("信令 断开连接 重连中... ", -1));
  177. }
  178. private IEnumerator Reconnection()
  179. {
  180. while (true)
  181. {
  182. yield return new WaitForSeconds(5);
  183. if (qt != null && !qt.IsConnect())
  184. {
  185. mqttState = MqttState.reconnection;
  186. if (ReconnectionCount >= 3)
  187. {
  188. Debug.LogError(" DGJ RTC服务重连失败,请退出应用重试 ");
  189. DisConnect();
  190. // ErrorPopup.Instance.UpdateEmqxState(NetState.DisConnect);
  191. errorList.Enqueue(new ShowError("信令重连失败,请检测网络状态", -1));
  192. // SignalClient.showErrorList.Enqueue(new ShowError("RTC服务重连失败,请退出应用重试", -1));
  193. }
  194. else
  195. {
  196. DisConnect();
  197. Debug.Log(" DGJ ===> Reconnection ");
  198. ReconnectionCount++;
  199. Connect();
  200. }
  201. }
  202. }
  203. }
  204. //断开连接
  205. public void DisConnect()
  206. {
  207. if (qt != null && qt.IsConnect())
  208. qt.DisConnect();
  209. }
  210. /// <summary>
  211. /// 订阅频道
  212. /// </summary>
  213. /// <param name="channelID"></param>
  214. public void Subscribe(string channelID)
  215. {
  216. Debug.Log(" DGJ =====> Subscribe " + channelID);
  217. if (qt != null && qt.IsConnect())
  218. {
  219. ushort s = qt.Subscribe(
  220. new string[]
  221. {
  222. channelID
  223. //front+"room/"+roomId
  224. },
  225. new byte[]
  226. {
  227. MqttMsgBase.QOS_LEVEL_AT_MOST_ONCE,
  228. //MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE
  229. });
  230. Debug.Log(" DGJ =====> Subscribe2");
  231. }
  232. else
  233. Debug.LogError(" MQTT 未连接 !!!");
  234. }
  235. public void UnSubscribe(string channelID)
  236. {
  237. if (qt != null && qt.IsConnect())
  238. {
  239. // ushort s = qt.Unsubscribe(new string[] { channelID });
  240. }
  241. }
  242. public void publish(byte[] bs)
  243. {
  244. if (qt != null && qt.IsConnect())
  245. {
  246. // Debug.Log(id + account + " DGJ publish =====> " + front + "room/" + roomId + " " + bs.Length);
  247. // qt.Publish(front + "room/" + roomId, bs, MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE, false);
  248. }
  249. else
  250. {
  251. Debug.LogError(" MQTT 未连接 ");
  252. }
  253. }
  254. public void publish(string channelID, byte[] bs)
  255. {
  256. if (qt != null && qt.IsConnect())
  257. {
  258. Debug.Log(MQTTManager.Instance.clientId + MQTTManager.Instance.account + " DGJ publish =====> " + channelID + " " + bs.Length);
  259. qt.Publish(channelID, bs, MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE, false);
  260. }
  261. else
  262. {
  263. Debug.LogError(" MQTT 未连接 ");
  264. }
  265. }
  266. }
  267. public enum MqttState
  268. {
  269. start,
  270. successfu,
  271. reconnection,
  272. disconnect
  273. }
  274. public class ShowError
  275. {
  276. public string error;
  277. public float timer;
  278. public ShowError( string error,float timer)
  279. {
  280. this.error = error;
  281. this.timer = timer;
  282. }
  283. }