MQTTClient.cs 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373
  1. using LitJson;
  2. using System;
  3. using System.Collections;
  4. using System.Collections.Generic;
  5. using System.Runtime.InteropServices;
  6. using System.Text;
  7. using UnityEngine;
  8. using UnityEngine.Android;
  9. using UnityEngine.UI;
  10. using uPLibrary.Networking.M2Mqtt.Messages;
  11. using static QTTManager;
  12. public class MQTTClient : MonoBehaviour
  13. {
  14. [DllImport("__Internal")]
  15. private static extern void Connect(string host, string port, string clientId, string username, string password, string destination);
  16. [DllImport("__Internal")]
  17. private static extern void Subscribe(string topic);
  18. [DllImport("__Internal")]
  19. private static extern void Send(string topic, string payload);
  20. [DllImport("__Internal")]
  21. private static extern void Unsubscribe(string topic);
  22. [DllImport("__Internal")]
  23. private static extern void Disconnect();
  24. public static MQTTClient Instance;
  25. //public string front = "client/";
  26. // public string id = "";
  27. //public string username = "u@unity3";// 需要根据服务器设置
  28. //public string password = null;// 需要根据服务器设置
  29. public string rid = "mqttx_b4c02ddc"; //其他人的ID
  30. //public string account;
  31. //public string roomId;
  32. //public string email;
  33. //public string phone;
  34. //public string roleId;
  35. //public string resourcePool;
  36. //public string expertType;
  37. //public string _username;
  38. public event Action<JsonData> OnCoordinate;
  39. public static byte[] bytes;
  40. // public static string certification;
  41. // public static byt ta;
  42. QTTManager qt;
  43. // Start is called before the first frame update
  44. private int ReconnectionCount = 0;
  45. public MqttState mqttState = MqttState.start;
  46. private void Awake()
  47. {
  48. Instance = this;
  49. }
  50. void Start()
  51. {
  52. // 请求文件读取和写入权限
  53. if (!Permission.HasUserAuthorizedPermission(Permission.ExternalStorageRead) ||
  54. !Permission.HasUserAuthorizedPermission(Permission.ExternalStorageWrite))
  55. {
  56. Permission.RequestUserPermission(Permission.ExternalStorageRead);
  57. Permission.RequestUserPermission(Permission.ExternalStorageWrite);
  58. }
  59. bytes = (Resources.Load("emqxsl-ca") as TextAsset).bytes;
  60. Debug.Log(" DGJ ===> emqxsl-ca.bytes " + bytes.Length);
  61. }
  62. Queue<MqttMsgPublishEventArgs> rlist = new Queue<MqttMsgPublishEventArgs>();
  63. Queue<ShowError> errorList = new Queue<ShowError>();
  64. private void OnReceived(MqttMsgPublishEventArgs obj)
  65. {
  66. rlist.Enqueue(obj);
  67. }
  68. private void OnConnecting()
  69. {
  70. Debug.Log("MQtt 连接中");
  71. }
  72. private void onSucceed()
  73. {
  74. // ErrorPopup.Instance.UpdateEmqxState(NetState.Connect);
  75. ReconnectionCount = 0;
  76. if (mqttState == MqttState.reconnection)
  77. {
  78. // ErrorPopup.Instance.ShowNetError("网络重新连接", 3);
  79. errorList.Enqueue(new ShowError("信令 重连成功", 3));
  80. }
  81. mqttState = MqttState.successfu;
  82. // Subscribe(MQTTManager.Instance.front + "online/" + MQTTManager.Instance.phone);
  83. //Subscribe(MQTTManager.Instance.front + "personnel/" + MQTTManager.Instance.resourcePool);
  84. if( !string.IsNullOrEmpty( MQTTManager.Instance.roomId))
  85. Subscribe(MQTTManager.Instance.front + "room/" + MQTTManager.Instance.roomId);
  86. Subscribe(MQTTManager.Instance.front + "personnel/" + MQTTManager.Instance.resourcePoolId);
  87. Subscribe(MQTTManager.Instance.front + "online/" + MQTTManager.Instance.id);
  88. Subscribe(MQTTManager.Instance.front + "sensor/VOICE"); //噪声
  89. Subscribe(MQTTManager.Instance.front + "sensor/TEMPERATURE_HUMIDITY"); //温湿度传感器
  90. Subscribe(MQTTManager.Instance.front + "sensor/VIBRATE"); //振动传感器
  91. //if (!string.IsNullOrEmpty( MQTTManager.Instance.topiceNavigation))
  92. //{
  93. // Subscribe(MQTTManager.Instance.topiceNavigation);
  94. // Subscribe(MQTTManager.Instance.topiceSync);
  95. // Subscribe(MQTTManager.Instance.topiceClose);
  96. //}
  97. // Subscribe((MQTTManager.Instance.front + "younuo"));
  98. // MQTTManager.Instance.isCreateRoom = true;
  99. }
  100. private void onFaild()
  101. {
  102. Debug.Log("MQtt 连接失败");
  103. }
  104. // Update is called once per frame
  105. void Update()
  106. {
  107. if (rlist.Count > 0)
  108. {
  109. for (int i = 0; i < rlist.Count; i++)
  110. {
  111. OnUnityReceived(rlist.Dequeue());
  112. }
  113. }
  114. if (errorList.Count > 0)
  115. {
  116. ShowError error = errorList.Dequeue();
  117. // ErrorPopup.Instance.ShowEmqxError(error.error, error.timer);
  118. }
  119. }
  120. public Text SENSO;
  121. public Text disp;
  122. public Text speed;
  123. public Text temperature;
  124. public Text humidity;
  125. //MQTT接收到的数据
  126. public void OnUnityReceived(MqttMsgPublishEventArgs obj)
  127. {
  128. string msg = Encoding.UTF8.GetString(obj.Message);
  129. Debug.Log("uid => " + obj.Topic + ":\n" + msg);
  130. JsonData data = JsonMapper.ToObject(msg);
  131. switch (data["device"]["deviceCode"].ToString())
  132. {
  133. case "VOICE_SENSO_JSTZ01"://噪声
  134. SENSO.text = data["value"].ToString()+ data["unit"].ToString();
  135. break;
  136. case "VIBRATE_SENSOR_JSTZ01"://震动
  137. disp.text = data["disp"]["value"].ToString() + data["disp"]["unit"].ToString();
  138. speed.text = data["speed"]["value"].ToString() + data["speed"]["unit"].ToString();
  139. break;
  140. case "TEMPERATURE_HUMIDITY_SENSO_JSTZ01"://温湿度
  141. temperature.text = data["temperature"]["value"].ToString() + data["temperature"]["unit"].ToString();
  142. humidity.text =data["humidity"]["value"].ToString() + data["humidity"]["unit"].ToString();
  143. break;
  144. }
  145. // UILogManager.Instance.text3.text = msg;
  146. // MQTTManager.Instance.Received(msg);
  147. MQTTManager.Instance.Received(obj);
  148. }
  149. private void OnDestroy()
  150. {
  151. }
  152. private void OnApplicationQuit()
  153. {
  154. DisConnect();
  155. }
  156. //连接
  157. public void Connect(string host)
  158. {
  159. Debug.Log(" DGJ ===> emqxsl-ca.bytes 1 " + bytes.Length);
  160. Debug.Log(" DGJ ===> username " + MQTTManager.Instance.username);
  161. Debug.Log(" DGJ ===> clientId " + MQTTManager.Instance.clientId);
  162. Debug.Log(" DGJ ===> certification " + MQTTManager.certification);
  163. //if (DeviceType.type == "Phone")
  164. //{
  165. // id =UserInfo.Instance.name + "_Phone";
  166. // rid = front+UserInfo.Instance.name + "_Glasses";
  167. //}
  168. //else
  169. //{
  170. // id = UserInfo.Instance.name + "_Glasses";
  171. // rid = front+UserInfo.Instance.name + "_Phone";
  172. //}
  173. // Debug.Log("DGJ ==>MQTT " + id + account + " " + username + " " + certification + " " + HttpActionLang.Instance.mqttSocket);
  174. Connect(host, "1883", MQTTManager.Instance.clientId, MQTTManager.Instance.username,"", MQTTManager.certification);
  175. return;
  176. try
  177. {
  178. qt = new QTTManager(MQTTManager.Instance.clientId, MQTTManager.Instance.username, MQTTManager.certification, host, "1883");
  179. Debug.Log("DGJ ===> MQTT " + qt!=null);
  180. qt.ConnectionFailed += onFaild;
  181. qt.ConnectionSucceeded += onSucceed;
  182. qt.OnConnecting += OnConnecting;
  183. qt.OnReceived += OnReceived;
  184. qt.OnClose += OnClose;
  185. qt.Connect(bytes);
  186. StartCoroutine(Reconnection());
  187. }
  188. catch (Exception e)
  189. {
  190. Debug.LogError("MQTT Connect Error "+ e.Message);
  191. }
  192. // StartSendCameraPos();
  193. }
  194. private void OnClose(EventArgs obj)
  195. {
  196. Debug.Log("断开连接");
  197. mqttState = MqttState.disconnect;
  198. // ErrorPopup.Instance.UpdateEmqxState(NetState.ReConnection);
  199. errorList.Enqueue(new ShowError("信令 断开连接 重连中... ", -1));
  200. }
  201. private IEnumerator Reconnection()
  202. {
  203. while (true)
  204. {
  205. yield return new WaitForSeconds(5);
  206. if (qt != null && !qt.IsConnect())
  207. {
  208. mqttState = MqttState.reconnection;
  209. if (ReconnectionCount >= 3)
  210. {
  211. Debug.LogError(" DGJ RTC服务重连失败,请退出应用重试 ");
  212. DisConnect();
  213. // ErrorPopup.Instance.UpdateEmqxState(NetState.DisConnect);
  214. errorList.Enqueue(new ShowError("信令重连失败,请检测网络状态", -1));
  215. // SignalClient.showErrorList.Enqueue(new ShowError("RTC服务重连失败,请退出应用重试", -1));
  216. }
  217. else
  218. {
  219. DisConnect();
  220. Debug.Log(" DGJ ===> Reconnection ");
  221. ReconnectionCount++;
  222. Connect(MQTTManager.host);
  223. }
  224. }
  225. }
  226. }
  227. //断开连接
  228. public void DisConnect()
  229. {
  230. if (qt != null && qt.IsConnect())
  231. qt.DisConnect();
  232. }
  233. /*
  234. /// <summary>
  235. /// 订阅频道
  236. /// </summary>
  237. /// <param name="channelID"></param>
  238. public void Subscribe(string channelID)
  239. {
  240. Debug.Log(" DGJ =====> Subscribe " + channelID);
  241. if (qt != null && qt.IsConnect())
  242. {
  243. ushort s = qt.Subscribe(
  244. new string[]
  245. {
  246. channelID
  247. //front+"room/"+roomId
  248. },
  249. new byte[]
  250. {
  251. MqttMsgBase.QOS_LEVEL_AT_MOST_ONCE,
  252. //MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE
  253. });
  254. Debug.Log(" DGJ =====> Subscribe2");
  255. }
  256. else
  257. Debug.LogError(" MQTT 未连接 !!!");
  258. }
  259. public void UnSubscribe(string channelID)
  260. {
  261. if (qt != null && qt.IsConnect())
  262. {
  263. // ushort s = qt.Unsubscribe(new string[] { channelID });
  264. }
  265. }*/
  266. public void publish(byte[] bs)
  267. {
  268. if (qt != null && qt.IsConnect())
  269. {
  270. // Debug.Log(id + account + " DGJ publish =====> " + front + "room/" + roomId + " " + bs.Length);
  271. // qt.Publish(front + "room/" + roomId, bs, MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE, false);
  272. }
  273. else
  274. {
  275. Debug.LogError(" MQTT 未连接 ");
  276. }
  277. }
  278. public void publish(string channelID, byte[] bs)
  279. {
  280. if (qt != null && qt.IsConnect())
  281. {
  282. Debug.Log(MQTTManager.Instance.clientId + MQTTManager.Instance.account + " DGJ publish =====> " + channelID + " " + bs.Length);
  283. qt.Publish(channelID, bs, MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE, false);
  284. }
  285. else
  286. {
  287. Debug.LogError(" MQTT 未连接 ");
  288. }
  289. }
  290. }
  291. public enum MqttState
  292. {
  293. start,
  294. successfu,
  295. reconnection,
  296. disconnect
  297. }
  298. public class ShowError
  299. {
  300. public string error;
  301. public float timer;
  302. public ShowError( string error,float timer)
  303. {
  304. this.error = error;
  305. this.timer = timer;
  306. }
  307. }