using System;
using System.Collections;
using System.Collections.Generic;
using System.Net.Security;
using System.Security.Cryptography.X509Certificates;
using System.Text;
using System.Threading;
using Blue;
using UnityEngine;
using uPLibrary.Networking.M2Mqtt;
using uPLibrary.Networking.M2Mqtt.Messages;

/// <summary>
/// Service contract for an MQTT client: broker configuration, connection
/// lifecycle, topic subscription and message publishing.
/// </summary>
public interface IMQTTService : IService
{
    #region MQTT broker configuration

    /// <summary>
    /// IP address or URL of the broker.
    /// </summary>
    string brokerAddress { get; set; }

    /// <summary>
    /// Port number of the broker.
    /// </summary>
    int brokerPort { get; set; }

    /// <summary>
    /// Whether to use an encrypted connection.
    /// </summary>
    bool isEncrypted { get; set; }

    #endregion

    #region MQTT connection parameters

    /// <summary>
    /// Delay, in milliseconds, before connecting to the broker.
    /// </summary>
    int connectionDelay { get; set; }

    /// <summary>
    /// Connection timeout in milliseconds.
    /// </summary>
    int timeoutOnConnection { get; set; }

    /// <summary>
    /// Whether to connect automatically on start-up.
    /// </summary>
    bool autoConnect { get; set; }

    /// <summary>
    /// User name for the MQTT broker. Leave null/empty if none is required.
    /// </summary>
    string mqttUserName { get; set; }

    /// <summary>
    /// Password for the MQTT broker. Leave null/empty if none is required.
    /// </summary>
    string mqttPassword { get; set; }

    #endregion

    #region Connection

    /// <summary>
    /// Connects to the broker using the current settings.
    /// </summary>
    void Connect();

    /// <summary>
    /// Called when a connection attempt starts.
    /// </summary>
    void OnConnecting();

    /// <summary>
    /// Called when the connection has been established.
    /// </summary>
    void OnConnected();

    /// <summary>
    /// Disconnects from the broker (if connected).
    /// </summary>
    void Disconnect();

    /// <summary>
    /// Called when a connection attempt fails.
    /// </summary>
    /// <param name="errorMessage">Description of the failure.</param>
    void OnConnectionFailed(string errorMessage);

    #endregion

    /// <summary>
    /// Subscribes to MQTT topics.
    /// </summary>
    /// <param name="topics">Topics to subscribe to.</param>
    /// <param name="qosLevels">QoS level for each topic, in the same order as <paramref name="topics"/>.</param>
    void SubscribeTopics(string[] topics, byte[] qosLevels);

    /// <summary>
    /// Unsubscribes from MQTT topics.
    /// </summary>
    /// <param name="topics">Topics to unsubscribe from.</param>
    void UnsubscribeTopics(string[] topics);

    /// <summary>
    /// Publishes a message.
    /// </summary>
    /// <param name="topic">Topic to publish to.</param>
    /// <param name="message">Message text.</param>
    void Publish(string topic, string message);

    /// <summary>
    /// Per-frame tick; processes queued MQTT events.
    /// </summary>
    void Update();

    /// <summary>
    /// Called when the application is quitting.
    /// </summary>
    void OnApplicationQuit();
}

/// <summary>
/// MQTT service built on the M2Mqtt <see cref="MqttClient"/>.
/// The connection is established on a worker thread; received messages are
/// queued by the client's callback and drained when <see cref="Update"/> is called.
/// </summary>
public class MQTTService : IMQTTService
{
    /// <summary>Upper bound on the number of decoded messages kept in memory.</summary>
    private const int MaxStoredMessages = 1024;

    protected MqttClient client;

    #region MQTT broker configuration

    public string brokerAddress { get; set; }

    public int brokerPort { get; set; }

    public bool isEncrypted { get; set; } = false;

    #endregion

    #region MQTT connection parameters

    /// <summary>
    /// Delay in milliseconds before connecting.
    /// NOTE(review): this value is not applied by the current connection code.
    /// </summary>
    public int connectionDelay { get; set; } = 500;

    /// <summary>
    /// Connection timeout in milliseconds.
    /// NOTE(review): this value is not applied by the current connection code.
    /// </summary>
    public int timeoutOnConnection { get; set; } = MqttSettings.MQTT_CONNECT_TIMEOUT;

    public bool autoConnect { get; set; } = false;

    public string mqttUserName { get; set; } = null;

    public string mqttPassword { get; set; } = null;

    #endregion

    #region Public events

    /// <summary>
    /// Raised when a connection has been established.
    /// Raised from whichever thread calls <see cref="OnConnected"/>; the internal
    /// connection routine calls it from its worker thread.
    /// </summary>
    public event Action ConnectionSucceeded;

    /// <summary>
    /// Raised when a connection attempt fails.
    /// Raised from whichever thread calls <see cref="OnConnectionFailed"/>; the internal
    /// connection routine calls it from its worker thread.
    /// </summary>
    public event Action ConnectionFailed;

    #endregion

    /// <summary>
    /// Initializes the service and connects immediately when <see cref="autoConnect"/> is set.
    /// </summary>
    public void OnInit()
    {
        if (autoConnect)
        {
            Connect();
        }
    }

    #region Connection

    /// <summary>
    /// Starts a connection attempt on a background thread unless the client is
    /// already connected or another attempt is still running.
    /// </summary>
    public void Connect()
    {
        if (client == null || !client.IsConnected)
        {
            // Application.streamingAssetsPath is read here, on the caller's thread,
            // so that the worker thread does not have to touch the Unity API.
            string certificatePath = Application.streamingAssetsPath + "/emqxsl-ca.crt";

            if (Interlocked.CompareExchange(ref connectInProgress, 1, 0) == 0)
            {
                caCertificatePath = certificatePath;
                try
                {
                    // Background thread: must not keep the process alive on quit.
                    Thread worker = new Thread(DoConnect) { IsBackground = true };
                    worker.Start();
                }
                catch
                {
                    // The worker never ran, so release the guard ourselves.
                    Interlocked.Exchange(ref connectInProgress, 0);
                    throw;
                }
            }
            else
            {
                Debug.LogWarning("连接正在进行中,忽略重复的连接请求");
            }
        }

        Debug.LogWarning("开始连接");
    }

    /// <summary>
    /// Called when a connection attempt starts.
    /// </summary>
    public void OnConnecting()
    {
        Debug.LogWarning("连接中");
    }

    /// <summary>
    /// Called when the connection has been established; raises <see cref="ConnectionSucceeded"/>.
    /// </summary>
    public void OnConnected()
    {
        Debug.LogWarning("连接完成");
        ConnectionSucceeded?.Invoke();
    }

    /// <summary>
    /// Schedules a disconnect (via coroutine) if a client exists.
    /// </summary>
    public void Disconnect()
    {
        if (client != null)
        {
            CoroutineSystem.Instance.Start_Coroutine(DoDisconnect());
        }

        Debug.LogWarning("断开连接");
    }

    /// <summary>
    /// Called when a connection attempt fails; logs the reason and raises <see cref="ConnectionFailed"/>.
    /// </summary>
    /// <param name="errorMessage">Description of the failure.</param>
    public void OnConnectionFailed(string errorMessage)
    {
        Debug.LogError("MQTT连接失败:" + errorMessage);
        ConnectionFailed?.Invoke();
    }

    #endregion

    /// <summary>
    /// Subscribes to the given topics.
    /// </summary>
    /// <param name="topics">Topics to subscribe to. Null or empty is ignored.</param>
    /// <param name="qosLevels">
    /// QoS level per topic. When null or of a different length than
    /// <paramref name="topics"/>, every topic is subscribed with QoS 2 (exactly once).
    /// </param>
    public void SubscribeTopics(string[] topics, byte[] qosLevels)
    {
        if (topics == null || topics.Length == 0)
        {
            Debug.LogWarning("订阅主题为空,已忽略");
            return;
        }

        if (client == null)
        {
            Debug.LogError("MQTT客户端未创建,无法订阅主题");
            return;
        }

        // The topic and QoS arrays must have the same length; fall back to
        // QoS 2 for every topic when the caller's array cannot be used.
        byte[] effectiveQos = qosLevels;
        if (effectiveQos == null || effectiveQos.Length != topics.Length)
        {
            effectiveQos = new byte[topics.Length];
            for (int i = 0; i < effectiveQos.Length; i++)
            {
                effectiveQos[i] = MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE;
            }
        }

        client.Subscribe(topics, effectiveQos);

        StringBuilder topicList = new StringBuilder();
        foreach (string topic in topics)
        {
            // Remember every subscription so it can be released on close.
            subscribedTopics.Add(topic);
            topicList.Append(topic).Append("\n");
        }

        Debug.LogWarning("订阅主题:" + "\n" + topicList);
    }

    /// <summary>
    /// Unsubscribes from the given topics.
    /// </summary>
    /// <param name="topics">Topics to unsubscribe from. Null or empty is ignored.</param>
    public void UnsubscribeTopics(string[] topics)
    {
        if (topics == null || topics.Length == 0)
        {
            return;
        }

        if (client == null)
        {
            Debug.LogError("MQTT客户端未创建,无法取消订阅");
            return;
        }

        client.Unsubscribe(topics);

        foreach (string topic in topics)
        {
            subscribedTopics.Remove(topic);
        }
    }

    /// <summary>
    /// Publishes a UTF-8 encoded message with QoS 2 (exactly once), not retained.
    /// </summary>
    /// <param name="topic">Topic to publish to.</param>
    /// <param name="message">Message text; null is sent as an empty payload.</param>
    public void Publish(string topic, string message)
    {
        if (client == null)
        {
            Debug.LogError("MQTT客户端未创建,无法发布消息");
            return;
        }

        byte[] payload = Encoding.UTF8.GetBytes(message ?? string.Empty);
        client.Publish(topic, payload, MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE, false);
    }

    /// <summary>
    /// Per-frame tick: drains queued messages and reports a lost connection.
    /// </summary>
    public void Update()
    {
        ProcessMqttEvents();
    }

    /// <summary>
    /// Closes the connection when the application quits.
    /// </summary>
    public void OnApplicationQuit()
    {
        CloseConnection();
    }

    #region Private

    // Guards the front/back queue references and the front queue's contents,
    // which are touched both by the client's receive callback and by Update().
    private readonly object queueLock = new object();

    // Queue that the receive callback appends to.
    private List<MqttMsgPublishEventArgs> frontMessageQueue = new List<MqttMsgPublishEventArgs>();

    // Queue that is drained by ProcessMqttMessageBackgroundQueue().
    private List<MqttMsgPublishEventArgs> backMessageQueue = new List<MqttMsgPublishEventArgs>();

    // Topics currently subscribed through this service.
    private readonly HashSet<string> subscribedTopics = new HashSet<string>();

    // Most recent decoded messages, capped at MaxStoredMessages.
    private readonly Queue<string> eventMessages = new Queue<string>();

    // Written by the client's callback / worker thread, read from Update().
    private volatile bool mqttClientConnectionClosed = false;
    private volatile bool mqttClientConnected = false;

    // 1 while a DoConnect worker is running, 0 otherwise.
    private int connectInProgress = 0;

    // CA certificate path captured in Connect() for use on the worker thread.
    private string caCertificatePath;

    /// <summary>
    /// Decodes a received payload as UTF-8, stores it and logs it.
    /// </summary>
    private void DecodeMessage(string topic, byte[] message)
    {
        string msg = Encoding.UTF8.GetString(message);
        StoreMessage(msg);
        Debug.LogFormat("收到{0}主题的消息:\n{1} ", topic, msg);
    }

    /// <summary>
    /// Stores a decoded message, discarding the oldest one once the cap is reached.
    /// </summary>
    private void StoreMessage(string eventMsg)
    {
        while (eventMessages.Count >= MaxStoredMessages)
        {
            eventMessages.Dequeue();
        }

        eventMessages.Enqueue(eventMsg);
    }

    /// <summary>
    /// Called after a requested disconnect has completed.
    /// </summary>
    private void OnDisconnected()
    {
        Debug.LogError("断开连接`");
    }

    /// <summary>
    /// Called when the connection was closed while the client was marked as connected.
    /// </summary>
    private void OnConnectionLost()
    {
        Debug.LogError("连接丢失");
    }

    /// <summary>
    /// Drains the message queues and reports an unexpected connection close.
    /// </summary>
    private void ProcessMqttEvents()
    {
        // Process the messages collected so far.
        SwapMqttMessageQueues();
        ProcessMqttMessageBackgroundQueue();

        // Process the messages that arrived in the meantime.
        SwapMqttMessageQueues();
        ProcessMqttMessageBackgroundQueue();

        if (mqttClientConnectionClosed)
        {
            mqttClientConnectionClosed = false;
            OnConnectionLost();
        }
    }

    /// <summary>
    /// Decodes every message in the back queue, then empties it.
    /// </summary>
    private void ProcessMqttMessageBackgroundQueue()
    {
        foreach (MqttMsgPublishEventArgs msg in backMessageQueue)
        {
            DecodeMessage(msg.Topic, msg.Message);
        }

        backMessageQueue.Clear();
    }

    /// <summary>
    /// Swaps the front and back queues so that messages can keep arriving
    /// while the previously filled queue is being processed.
    /// </summary>
    private void SwapMqttMessageQueues()
    {
        lock (queueLock)
        {
            List<MqttMsgPublishEventArgs> previousFront = frontMessageQueue;
            frontMessageQueue = backMessageQueue;
            backMessageQueue = previousFront;
        }
    }

    /// <summary>
    /// Receive callback: appends the message to the front queue.
    /// </summary>
    private void OnMqttMessageReceived(object sender, MqttMsgPublishEventArgs msg)
    {
        lock (queueLock)
        {
            frontMessageQueue.Add(msg);
        }
    }

    /// <summary>
    /// Connection-closed callback.
    /// </summary>
    private void OnMqttConnectionClosed(object sender, EventArgs e)
    {
        // Flag an unexpected close only if we were connected; a controlled
        // disconnect clears mqttClientConnected first and is not reported.
        mqttClientConnectionClosed = mqttClientConnected;
        mqttClientConnected = false;
    }

    /// <summary>
    /// Connects to the broker using the current settings. Runs on the worker
    /// thread started by <see cref="Connect"/>; failures are reported through
    /// <see cref="OnConnectionFailed"/> instead of escaping the thread.
    /// </summary>
    private void DoConnect()
    {
        try
        {
            MqttClient mqtt = client;
            if (mqtt == null)
            {
                try
                {
                    mqtt = CreateClient();
                    client = mqtt;
                }
                catch (Exception e)
                {
                    client = null;
                    Debug.LogException(e);
                    OnConnectionFailed(e.Message);
                    return;
                }
            }
            else if (mqtt.IsConnected)
            {
                Debug.LogError("IsConnected");
                return;
            }

            OnConnecting();

            string clientId = Guid.NewGuid().ToString();
            try
            {
                mqtt.Connect(clientId, mqttUserName, mqttPassword);
            }
            catch (Exception e)
            {
                client = null;
                Debug.LogException(e);
                OnConnectionFailed(e.Message);
                return;
            }

            if (mqtt.IsConnected)
            {
                // Remove first so that reconnecting an existing client does not
                // register the handlers twice.
                mqtt.ConnectionClosed -= OnMqttConnectionClosed;
                mqtt.ConnectionClosed += OnMqttConnectionClosed;
                mqtt.MqttMsgPublishReceived -= OnMqttMessageReceived;
                mqtt.MqttMsgPublishReceived += OnMqttMessageReceived;

                mqttClientConnected = true;
                OnConnected();
            }
            else
            {
                OnConnectionFailed("CONNECTION FAILED!");
            }
        }
        finally
        {
            // Allow the next Connect() call to start a new attempt.
            Interlocked.Exchange(ref connectInProgress, 0);
        }
    }

    /// <summary>
    /// Creates the M2Mqtt client for the configured broker.
    /// </summary>
    private MqttClient CreateClient()
    {
        // SSLv3 is obsolete and insecure; use TLS 1.2 for encrypted connections.
        MqttSslProtocols sslProtocol = isEncrypted ? MqttSslProtocols.TLSv1_2 : MqttSslProtocols.None;

#if (!UNITY_EDITOR && UNITY_WSA_10_0 && !ENABLE_IL2CPP)
        return new MqttClient(brokerAddress, brokerPort, isEncrypted, sslProtocol);
#else
        // Only load the certificate when it is needed, so that an unencrypted
        // connection does not depend on the certificate file being present.
        X509Certificate cert = isEncrypted ? new X509Certificate(caCertificatePath) : null;
        Debug.LogWarning("ip地址或url:" + brokerAddress);
        return new MqttClient(brokerAddress, brokerPort, isEncrypted, cert, cert, sslProtocol);
#endif
    }

    /// <summary>
    /// Coroutine that closes the connection at the end of the current frame.
    /// </summary>
    private IEnumerator DoDisconnect()
    {
        yield return new WaitForEndOfFrame();
        CloseConnection();
        OnDisconnected();
    }

    /// <summary>
    /// Unsubscribes, disconnects, detaches the event handlers and drops the client.
    /// </summary>
    private void CloseConnection()
    {
        mqttClientConnected = false;

        if (client != null)
        {
            if (client.IsConnected)
            {
                if (subscribedTopics.Count > 0)
                {
                    // Copy first: UnsubscribeTopics mutates subscribedTopics.
                    string[] topics = new string[subscribedTopics.Count];
                    subscribedTopics.CopyTo(topics);
                    UnsubscribeTopics(topics);
                }

                client.Disconnect();
            }

            client.MqttMsgPublishReceived -= OnMqttMessageReceived;
            client.ConnectionClosed -= OnMqttConnectionClosed;
            client = null;
        }

        // Nothing is subscribed once the connection is closed.
        subscribedTopics.Clear();
    }

#if ((!UNITY_EDITOR && UNITY_WSA_10_0))
    private void OnApplicationFocus(bool focus)
    {
        // On UWP 10 (HoloLens) we cannot tell whether the application was really
        // closed or only minimized
        // (https://forum.unity.com/threads/onapplicationquit-and-ondestroy-are-not-called-on-uwp-10.462597/)
        if (focus)
        {
            Connect();
        }
        else
        {
            CloseConnection();
        }
    }
#endif

    #endregion
}