using System;
using System.Collections;
using System.Collections.Generic;
using System.Net.Security;
using System.Security.Cryptography.X509Certificates;
using System.Text;
using System.Threading;
using Blue;
using UnityEngine;
using uPLibrary.Networking.M2Mqtt;
using uPLibrary.Networking.M2Mqtt.Messages;

/// <summary>
/// Service contract for an MQTT client: broker configuration, connection
/// life-cycle, topic subscription and message publishing.
/// </summary>
public interface IMQTTService : IService
{
    #region MQTT broker configuration

    /// <summary>Broker IP address or URL.</summary>
    string brokerAddress { get; set; }

    /// <summary>Broker port number.</summary>
    int brokerPort { get; set; }

    /// <summary>Whether to use an encrypted connection.</summary>
    bool isEncrypted { get; set; }

    #endregion

    #region MQTT connection parameters

    /// <summary>Delay, in milliseconds, before connecting to the broker.</summary>
    int connectionDelay { get; set; }

    /// <summary>Connection timeout in milliseconds.</summary>
    int timeoutOnConnection { get; set; }

    /// <summary>Whether to connect automatically on start-up.</summary>
    bool autoConnect { get; set; }

    /// <summary>User name for the broker. Leave empty if none is required.</summary>
    string mqttUserName { get; set; }

    /// <summary>Password for the broker. Leave empty if none is required.</summary>
    string mqttPassword { get; set; }

    #endregion

    #region Connection

    /// <summary>Connects to the broker using the current settings.</summary>
    void Connect();

    /// <summary>Called while a connection attempt is in progress.</summary>
    void OnConnecting();

    /// <summary>Called when the connection has been established.</summary>
    void OnConnected();

    /// <summary>Disconnects from the broker (if connected).</summary>
    void Disconnect();

    /// <summary>Called when a connection attempt fails.</summary>
    /// <param name="errorMessage">Failure message.</param>
    void OnConnectionFailed(string errorMessage);

    #endregion

    /// <summary>Subscribes to MQTT topics.</summary>
    void SubscribeTopics(string[] topics, byte[] qosLevels);

    /// <summary>Unsubscribes from MQTT topics.</summary>
    void UnsubscribeTopics(string[] topics);

    /// <summary>Publishes a message.</summary>
    void Publish(string topic, string message);

    void Update();

    void OnApplicationQuit();
}

/// <summary>
/// <see cref="IMQTTService"/> implementation on top of the M2Mqtt client.
/// The connection is opened on a worker thread; received messages are queued
/// and decoded when <see cref="Update"/> is called.
/// </summary>
public class MQTTService : IMQTTService
{
    protected MqttClient client;

    #region MQTT broker configuration

    public string brokerAddress { get; set; }
    public int brokerPort { get; set; }
    public bool isEncrypted { get; set; } = false;

    #endregion

    #region MQTT connection parameters

    // NOTE(review): connectionDelay and timeoutOnConnection are not applied anywhere
    // in this class (the code that used them is commented out in DoConnect).
    public int connectionDelay { get; set; } = 500;
    public int timeoutOnConnection { get; set; } = MqttSettings.MQTT_CONNECT_TIMEOUT;
    public bool autoConnect { get; set; } = false;
    public string mqttUserName { get; set; } = null;
    public string mqttPassword { get; set; } = null;

    #endregion

    #region Public events

    /// <summary>
    /// Raised from <see cref="OnConnected"/> when a connection is established.
    /// When triggered by <see cref="Connect"/> this runs on the connect worker thread.
    /// </summary>
    public event Action ConnectionSucceeded;

    /// <summary>
    /// Raised from <see cref="OnConnectionFailed"/> when a connection attempt fails.
    /// When triggered by <see cref="Connect"/> this runs on the connect worker thread.
    /// </summary>
    public event Action ConnectionFailed;

    #endregion

    /// <summary>
    /// Sets up the message queues and connects if <see cref="autoConnect"/> is set.
    /// </summary>
    public void OnInit()
    {
        frontMessageQueue = messageQueue1;
        backMessageQueue = messageQueue2;
        if (autoConnect)
        {
            Connect();
        }
    }

    #region Connection

    /// <summary>
    /// Starts a worker thread that connects to the broker, unless the client is
    /// already connected or a connection attempt is still running.
    /// </summary>
    public void Connect()
    {
        if (isConnecting || (client != null && client.IsConnected))
        {
            return;
        }

        // Resolve the Unity path here, on the caller's thread, instead of inside the
        // worker thread: Unity engine APIs are generally restricted to the main thread.
        caCertificatePath = Application.streamingAssetsPath + "/emqxsl-ca.crt";

        isConnecting = true;
        Debug.LogWarning("开始连接");

        Thread worker = new Thread(DoConnect);
        // A background thread does not keep the process alive on shutdown.
        worker.IsBackground = true;
        worker.Start();
    }

    /// <summary>Logs that a connection attempt is in progress.</summary>
    public void OnConnecting()
    {
        Debug.LogWarning("连接中");
    }

    /// <summary>Logs the successful connection and raises <see cref="ConnectionSucceeded"/>.</summary>
    public void OnConnected()
    {
        Debug.LogWarning("连接完成");
        ConnectionSucceeded?.Invoke();
    }

    /// <summary>
    /// Schedules a disconnect through the coroutine system when a client exists.
    /// </summary>
    public void Disconnect()
    {
        if (client != null)
        {
            CoroutineSystem.Instance.Start_Coroutine(DoDisconnect());
        }
        Debug.LogWarning("断开连接");
    }

    /// <summary>Logs the failure and raises <see cref="ConnectionFailed"/>.</summary>
    /// <param name="errorMessage">Failure details, appended to the log entry.</param>
    public void OnConnectionFailed(string errorMessage)
    {
        Debug.LogError("MQTT连接失败: " + errorMessage);
        ConnectionFailed?.Invoke();
    }

    #endregion

    // Topics passed to the most recent SubscribeTopics call; unsubscribed on close.
    private string[] SubTopics;

    /// <summary>
    /// Subscribes to the given topics.
    /// </summary>
    /// <param name="topics">Topics to subscribe to. Null or empty is ignored.</param>
    /// <param name="qosLevels">
    /// QoS level per topic. When null or of a different length than
    /// <paramref name="topics"/>, every topic is subscribed with QoS "exactly once".
    /// </param>
    public void SubscribeTopics(string[] topics, byte[] qosLevels)
    {
        if (client == null)
        {
            Debug.LogError("MQTT客户端未初始化,无法订阅主题");
            return;
        }
        if (topics == null || topics.Length == 0)
        {
            return;
        }

        // The QoS array must have one entry per topic, so build a default array
        // whenever the caller did not supply a matching one.
        byte[] effectiveQos = qosLevels;
        if (effectiveQos == null || effectiveQos.Length != topics.Length)
        {
            effectiveQos = new byte[topics.Length];
            for (int i = 0; i < effectiveQos.Length; i++)
            {
                effectiveQos[i] = MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE;
            }
        }

        SubTopics = topics;
        client.Subscribe(topics, effectiveQos);

        StringBuilder topicList = new StringBuilder();
        foreach (string topic in topics)
        {
            topicList.Append(topic).Append("\n");
        }
        Debug.LogWarning("订阅主题:" + "\n" + topicList);
    }

    /// <summary>Unsubscribes from the given topics. Null or empty input is ignored.</summary>
    public void UnsubscribeTopics(string[] topics)
    {
        if (client == null || topics == null || topics.Length == 0)
        {
            return;
        }
        client.Unsubscribe(topics);
    }

    /// <summary>Publishes a UTF-8 encoded message with QoS "exactly once", not retained.</summary>
    public void Publish(string topic, string message)
    {
        if (client == null)
        {
            Debug.LogError("MQTT客户端未初始化,无法发布消息");
            return;
        }
        client.Publish(topic, Encoding.UTF8.GetBytes(message), MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE, false);
    }

    /// <summary>Processes queued MQTT messages and connection events.</summary>
    public void Update()
    {
        ProcessMqttEvents();
    }

    /// <summary>Closes the connection when the application quits.</summary>
    public void OnApplicationQuit()
    {
        CloseConnection();
    }

    #region Private

    private List<MqttMsgPublishEventArgs> messageQueue1 = new List<MqttMsgPublishEventArgs>();
    private List<MqttMsgPublishEventArgs> messageQueue2 = new List<MqttMsgPublishEventArgs>();
    private List<MqttMsgPublishEventArgs> frontMessageQueue = null; // queue receiving new messages
    private List<MqttMsgPublishEventArgs> backMessageQueue = null;  // queue being processed

    // Guards the queue references: they are written by the receive callback and
    // swapped by Update(), which are assumed to run on different threads.
    private readonly object queueLock = new object();

    private volatile bool mqttClientConnectionClosed = false;
    private volatile bool mqttClientConnected = false;

    // True while a connect worker thread is running; prevents concurrent attempts.
    private volatile bool isConnecting = false;

    // CA certificate path, captured in Connect() before the worker thread starts.
    private string caCertificatePath;

    /// <summary>Decodes a payload as UTF-8, stores it and logs it.</summary>
    private void DecodeMessage(string topic, byte[] message)
    {
        string msg = Encoding.UTF8.GetString(message);
        StoreMessage(msg);
        Debug.LogFormat("收到{0}主题的消息:\n{1} ", topic, msg);
    }

    // NOTE(review): entries are only ever added here; nothing in this class reads
    // or clears the list, so it grows for the lifetime of the service.
    private List<string> eventMessages = new List<string>();

    /// <summary>Stores a decoded message.</summary>
    private void StoreMessage(string eventMsg)
    {
        eventMessages.Add(eventMsg);
    }

    /// <summary>Logs a controlled disconnect.</summary>
    private void OnDisconnected()
    {
        Debug.LogError("断开连接`");
    }

    /// <summary>Logs an unexpected connection loss.</summary>
    private void OnConnectionLost()
    {
        Debug.LogError("连接丢失");
    }

    /// <summary>
    /// Drains the message queues and reports a pending connection loss.
    /// </summary>
    private void ProcessMqttEvents()
    {
        // Process the messages received so far.
        SwapMqttMessageQueues();
        ProcessMqttMessageBackgroundQueue();
        // Process the messages that arrived in the meantime.
        SwapMqttMessageQueues();
        ProcessMqttMessageBackgroundQueue();

        if (mqttClientConnectionClosed)
        {
            mqttClientConnectionClosed = false;
            OnConnectionLost();
        }
    }

    /// <summary>Decodes every message in the back queue, then clears it.</summary>
    private void ProcessMqttMessageBackgroundQueue()
    {
        foreach (MqttMsgPublishEventArgs msg in backMessageQueue)
        {
            DecodeMessage(msg.Topic, msg.Message);
        }
        backMessageQueue.Clear();
    }

    /// <summary>
    /// Swaps the front and back queues so messages can keep arriving while the
    /// back queue is being processed.
    /// </summary>
    private void SwapMqttMessageQueues()
    {
        lock (queueLock)
        {
            frontMessageQueue = frontMessageQueue == messageQueue1 ? messageQueue2 : messageQueue1;
            backMessageQueue = backMessageQueue == messageQueue1 ? messageQueue2 : messageQueue1;
        }
    }

    /// <summary>Client callback: enqueues a received message on the front queue.</summary>
    private void OnMqttMessageReceived(object sender, MqttMsgPublishEventArgs msg)
    {
        lock (queueLock)
        {
            frontMessageQueue.Add(msg);
        }
    }

    /// <summary>Client callback: records a connection close.</summary>
    private void OnMqttConnectionClosed(object sender, EventArgs e)
    {
        // Only flag an unexpected close while we believe we are connected; a
        // controlled disconnect clears mqttClientConnected first.
        mqttClientConnectionClosed = mqttClientConnected;
        mqttClientConnected = false;
    }

    /// <summary>
    /// Creates the client if necessary and connects to the broker with the current
    /// settings. Runs on the worker thread started by <see cref="Connect"/>. Any
    /// exception is reported through <see cref="OnConnectionFailed"/>.
    /// </summary>
    private void DoConnect()
    {
        MqttClient localClient = client;
        try
        {
            if (localClient == null)
            {
#if (!UNITY_EDITOR && UNITY_WSA_10_0 && !ENABLE_IL2CPP)
                localClient = new MqttClient(brokerAddress, brokerPort, isEncrypted, isEncrypted ? MqttSslProtocols.TLSv1_2 : MqttSslProtocols.None);
#else
                // The certificate is only needed (and only loaded) for encrypted
                // connections, so a missing file cannot break a plain connection.
                X509Certificate cert = isEncrypted ? new X509Certificate(caCertificatePath) : null;
                Debug.LogWarning("ip地址或url:" + brokerAddress);
                // SSLv3 is obsolete and insecure; TLS 1.2 is used instead.
                localClient = new MqttClient(brokerAddress, brokerPort, isEncrypted, cert, cert, isEncrypted ? MqttSslProtocols.TLSv1_2 : MqttSslProtocols.None);
#endif
                client = localClient;
            }
            else if (localClient.IsConnected)
            {
                Debug.LogError("IsConnected");
                return;
            }

            OnConnecting();

            string clientId = Guid.NewGuid().ToString();
            localClient.Connect(clientId, mqttUserName, mqttPassword);
        }
        catch (Exception e)
        {
            // Drop the client so the next attempt starts from a fresh instance.
            if (localClient != null)
            {
                localClient.MqttMsgPublishReceived -= OnMqttMessageReceived;
                localClient.ConnectionClosed -= OnMqttConnectionClosed;
            }
            client = null;
            OnConnectionFailed(e.ToString());
            return;
        }
        finally
        {
            isConnecting = false;
        }

        if (localClient.IsConnected)
        {
            // Detach first so a reconnect on the same client never registers the
            // handlers twice (which would duplicate every received message).
            localClient.ConnectionClosed -= OnMqttConnectionClosed;
            localClient.ConnectionClosed += OnMqttConnectionClosed;
            localClient.MqttMsgPublishReceived -= OnMqttMessageReceived;
            localClient.MqttMsgPublishReceived += OnMqttMessageReceived;
            mqttClientConnected = true;
            OnConnected();
        }
        else
        {
            OnConnectionFailed("CONNECTION FAILED!");
        }
    }

    /// <summary>User certificate selection callback (not implemented, currently unused).</summary>
    private bool userCertificateSelectionCallback(object sender, X509Certificate certificate, X509Chain chain, SslPolicyErrors sslPolicyErrors)
    {
        throw new NotImplementedException();
    }

    /// <summary>Remote certificate validation callback (not implemented, currently unused).</summary>
    private bool MyRemoteCertificateValidationCallback(object sender, X509Certificate certificate, X509Chain chain, SslPolicyErrors sslPolicyErrors)
    {
        throw new NotImplementedException();
    }

    /// <summary>Coroutine: waits for the end of the frame, then closes the connection.</summary>
    private IEnumerator DoDisconnect()
    {
        yield return new WaitForEndOfFrame();
        CloseConnection();
        OnDisconnected();
    }

    /// <summary>
    /// Unsubscribes from the tracked topics, disconnects, detaches the event
    /// handlers and releases the client.
    /// </summary>
    private void CloseConnection()
    {
        // Cleared first so the resulting ConnectionClosed event is not reported
        // as an unexpected connection loss.
        mqttClientConnected = false;
        if (client != null)
        {
            if (client.IsConnected)
            {
                // UnsubscribeTopics ignores null/empty, so this is safe even when
                // nothing was ever subscribed.
                UnsubscribeTopics(SubTopics);
                SubTopics = null;
                client.Disconnect();
            }
            client.MqttMsgPublishReceived -= OnMqttMessageReceived;
            client.ConnectionClosed -= OnMqttConnectionClosed;
            client = null;
        }
    }

#if ((!UNITY_EDITOR && UNITY_WSA_10_0))
    private void OnApplicationFocus(bool focus)
    {
        // On UWP 10 (HoloLens) we cannot tell whether the application was really
        // closed or only minimized
        // (https://forum.unity.com/threads/onapplicationquit-and-ondestroy-are-not-called-on-uwp-10.462597/)
        if (focus)
        {
            Connect();
        }
        else
        {
            CloseConnection();
        }
    }
#endif

    #endregion
}