|
- using System;
- using System.Collections;
- using System.Collections.Generic;
- using System.Net.Security;
- using System.Security.Cryptography.X509Certificates;
- using System.Text;
- using System.Threading;
- using Blue;
- using UnityEngine;
- using uPLibrary.Networking.M2Mqtt;
- using uPLibrary.Networking.M2Mqtt.Messages;
- /// <summary>
- /// Contract for a service that owns one MQTT client connection: broker settings,
- /// connection lifecycle, topic subscription and message publishing.
- /// </summary>
- public interface IMQTTService : IService
- {
-     #region MQTT broker configuration
-     /// <summary>
-     /// IP address or URL of the broker.
-     /// </summary>
-     string brokerAddress { get; set; }
-     /// <summary>
-     /// Port number of the broker.
-     /// </summary>
-     int brokerPort { get; set; }
-     /// <summary>
-     /// Whether to use an encrypted connection.
-     /// </summary>
-     bool isEncrypted { get; set; }
-     #endregion
-     #region MQTT connection parameters
-     /// <summary>
-     /// Delay, in milliseconds, applied before connecting to the broker.
-     /// </summary>
-     int connectionDelay { get; set; }
-     /// <summary>
-     /// Connection timeout in milliseconds.
-     /// </summary>
-     int timeoutOnConnection { get; set; }
-     /// <summary>
-     /// Whether to connect automatically on start-up.
-     /// </summary>
-     bool autoConnect { get; set; }
-     /// <summary>
-     /// User name for the MQTT broker. Leave empty if no user name is required.
-     /// </summary>
-     string mqttUserName { get; set; }
-     /// <summary>
-     /// Password for the MQTT broker. Leave empty if no password is required.
-     /// </summary>
-     string mqttPassword { get; set; }
-     #endregion
-     #region Connection lifecycle
-     /// <summary>
-     /// Connects to the broker using the current settings.
-     /// </summary>
-     void Connect();
-     /// <summary>
-     /// Notification hook: a connection attempt is in progress.
-     /// </summary>
-     void OnConnecting();
-     /// <summary>
-     /// Notification hook: the connection was established.
-     /// </summary>
-     void OnConnected();
-     /// <summary>
-     /// Disconnects from the broker, if connected.
-     /// </summary>
-     void Disconnect();
-     /// <summary>
-     /// Notification hook: the connection attempt failed.
-     /// </summary>
-     /// <param name="errorMessage">Description of the failure.</param>
-     void OnConnectionFailed(string errorMessage);
-     #endregion
-     #region Messaging
-     /// <summary>
-     /// Subscribes to MQTT topics.
-     /// </summary>
-     /// <param name="topics">Topic filters to subscribe to.</param>
-     /// <param name="qosLevels">Requested QoS level for each topic.</param>
-     void SubscribeTopics(string[] topics, byte[] qosLevels);
-     /// <summary>
-     /// Unsubscribes from MQTT topics.
-     /// </summary>
-     /// <param name="topics">Topic filters to unsubscribe from.</param>
-     void UnsubscribeTopics(string[] topics);
-     /// <summary>
-     /// Publishes a message.
-     /// </summary>
-     /// <param name="topic">Topic to publish on.</param>
-     /// <param name="message">Message text.</param>
-     void Publish(string topic, string message);
-     #endregion
-     #region Host callbacks
-     /// <summary>
-     /// Periodic tick supplied by the host; the implementation in this file uses it
-     /// to process queued incoming messages.
-     /// </summary>
-     void Update();
-     /// <summary>
-     /// Shutdown notification supplied by the host; the implementation in this file
-     /// closes the broker connection.
-     /// </summary>
-     void OnApplicationQuit();
-     #endregion
- }
- /// <summary>
- /// <see cref="IMQTTService"/> implementation built on the M2Mqtt <c>MqttClient</c>.
- /// The connection is opened on a background thread. Incoming messages are queued
- /// by the client callback and processed when <see cref="Update"/> is called.
- /// </summary>
- public class MQTTService : IMQTTService
- {
-     /// <summary>Underlying M2Mqtt client; null until the first connection attempt and after a close.</summary>
-     protected MqttClient client;
-     #region MQTT broker configuration
-     /// <summary>IP address or URL of the broker.</summary>
-     public string brokerAddress { get; set; }
-     /// <summary>Port number of the broker.</summary>
-     public int brokerPort { get; set; }
-     /// <summary>Whether to use an encrypted connection.</summary>
-     public bool isEncrypted { get; set; } = false;
-     #endregion
-     #region MQTT connection parameters
-     /// <summary>Delay, in milliseconds, applied on the connect thread before contacting the broker.</summary>
-     public int connectionDelay { get; set; } = 500;
-     /// <summary>Connection timeout in milliseconds.</summary>
-     public int timeoutOnConnection { get; set; } = MqttSettings.MQTT_CONNECT_TIMEOUT;
-     /// <summary>Whether <see cref="OnInit"/> should connect automatically.</summary>
-     public bool autoConnect { get; set; } = false;
-     /// <summary>User name for the broker; null when none is required.</summary>
-     public string mqttUserName { get; set; } = null;
-     /// <summary>Password for the broker; null when none is required.</summary>
-     public string mqttPassword { get; set; } = null;
-     #endregion
-     #region Public events
-     /// <summary>
-     /// Raised when a connection has been established.
-     /// Invoked from the connect thread, not necessarily the Unity main thread.
-     /// </summary>
-     public event Action ConnectionSucceeded;
-     /// <summary>
-     /// Raised when a connection attempt fails.
-     /// Invoked from the connect thread, not necessarily the Unity main thread.
-     /// </summary>
-     public event Action ConnectionFailed;
-     #endregion
-     /// <summary>
-     /// Service initialisation hook: connects immediately when <see cref="autoConnect"/> is set.
-     /// </summary>
-     public void OnInit()
-     {
-         if (autoConnect)
-         {
-             Connect();
-         }
-     }
-     #region Connection lifecycle
-     /// <summary>
-     /// Starts a background connection attempt unless the client is already connected
-     /// or another attempt is still running.
-     /// </summary>
-     public void Connect()
-     {
-         MqttClient current = client;
-         if (current == null || !current.IsConnected)
-         {
-             // Resolve the certificate path here, on the caller's thread: Unity path
-             // properties are generally not safe to read from a worker thread.
-             string caCertPath = isEncrypted ? Application.streamingAssetsPath + "/emqxsl-ca.crt" : null;
-             // Only one connect thread at a time; concurrent attempts would race on `client`.
-             if (Interlocked.CompareExchange(ref connectInProgress, 1, 0) == 0)
-             {
-                 try
-                 {
-                     Thread worker = new Thread(() => DoConnect(caCertPath));
-                     // Background thread so a hanging connect cannot keep the process alive on quit.
-                     worker.IsBackground = true;
-                     worker.Start();
-                 }
-                 catch
-                 {
-                     Interlocked.Exchange(ref connectInProgress, 0);
-                     throw;
-                 }
-             }
-         }
-         Debug.LogWarning("开始连接");
-     }
-     /// <summary>
-     /// Called when a connection attempt is about to contact the broker.
-     /// </summary>
-     public void OnConnecting()
-     {
-         Debug.LogWarning("连接中");
-     }
-     /// <summary>
-     /// Called after the connection has been established; raises <see cref="ConnectionSucceeded"/>.
-     /// </summary>
-     public void OnConnected()
-     {
-         Debug.LogWarning("连接完成");
-         ConnectionSucceeded?.Invoke();
-     }
-     /// <summary>
-     /// Schedules a disconnect (via the project's coroutine system) when a client exists.
-     /// </summary>
-     public void Disconnect()
-     {
-         if (client != null)
-         {
-             CoroutineSystem.Instance.Start_Coroutine(DoDisconnect());
-         }
-         Debug.LogWarning("断开连接");
-     }
-     /// <summary>
-     /// Called when a connection attempt fails; logs the reason and raises <see cref="ConnectionFailed"/>.
-     /// </summary>
-     /// <param name="errorMessage">Description of the failure; may be null or empty.</param>
-     public void OnConnectionFailed(string errorMessage)
-     {
-         string detail = string.IsNullOrEmpty(errorMessage) ? string.Empty : ": " + errorMessage;
-         Debug.LogError("MQTT连接失败" + detail);
-         ConnectionFailed?.Invoke();
-     }
-     #endregion
-     /// <summary>Topics passed to the most recent successful <see cref="SubscribeTopics"/> call.</summary>
-     private string[] SubTopics;
-     /// <summary>
-     /// Subscribes to the given topics.
-     /// </summary>
-     /// <param name="topics">Topic filters to subscribe to; a null or empty array is ignored.</param>
-     /// <param name="qosLevels">
-     /// QoS level per topic. Entries that are missing (null array or fewer entries than
-     /// topics) default to "exactly once"; surplus entries are ignored.
-     /// </param>
-     public void SubscribeTopics(string[] topics, byte[] qosLevels)
-     {
-         if (topics == null || topics.Length == 0)
-         {
-             Debug.LogWarning("SubscribeTopics: no topics given");
-             return;
-         }
-         MqttClient mqtt = client;
-         if (mqtt == null)
-         {
-             Debug.LogError("SubscribeTopics: MQTT client is not connected");
-             return;
-         }
-         // The QoS array must have exactly one entry per topic.
-         mqtt.Subscribe(topics, ResolveQosLevels(topics.Length, qosLevels));
-         SubTopics = topics;
-         StringBuilder listing = new StringBuilder();
-         foreach (string topic in topics)
-         {
-             listing.Append(topic).Append("\n");
-         }
-         Debug.LogWarning("订阅主题:" + "\n" + listing.ToString());
-     }
-     /// <summary>
-     /// Builds a QoS array with one entry per topic, taking requested values where
-     /// present and "exactly once" otherwise.
-     /// </summary>
-     private static byte[] ResolveQosLevels(int topicCount, byte[] requested)
-     {
-         if (requested != null && requested.Length == topicCount)
-         {
-             return requested;
-         }
-         byte[] resolved = new byte[topicCount];
-         for (int i = 0; i < topicCount; i++)
-         {
-             bool hasRequested = requested != null && i < requested.Length;
-             resolved[i] = hasRequested ? requested[i] : MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE;
-         }
-         return resolved;
-     }
-     /// <summary>
-     /// Unsubscribes from the given topics; ignored when there is no client or no topics.
-     /// </summary>
-     /// <param name="topics">Topic filters to unsubscribe from.</param>
-     public void UnsubscribeTopics(string[] topics)
-     {
-         MqttClient mqtt = client;
-         if (mqtt == null || topics == null || topics.Length == 0)
-         {
-             return;
-         }
-         mqtt.Unsubscribe(topics);
-     }
-     /// <summary>
-     /// Publishes a UTF-8 encoded message with QoS "exactly once" and no retain flag.
-     /// </summary>
-     /// <param name="topic">Topic to publish on.</param>
-     /// <param name="message">Message text; null is sent as an empty payload.</param>
-     public void Publish(string topic, string message)
-     {
-         MqttClient mqtt = client;
-         if (mqtt == null)
-         {
-             Debug.LogError("Publish: MQTT client is not connected");
-             return;
-         }
-         byte[] payload = Encoding.UTF8.GetBytes(message ?? string.Empty);
-         mqtt.Publish(topic, payload, MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE, false);
-     }
-     /// <summary>
-     /// Host tick: processes queued MQTT messages and connection-lost notifications.
-     /// </summary>
-     public void Update()
-     {
-         ProcessMqttEvents();
-     }
-     /// <summary>
-     /// Host shutdown hook: closes the broker connection synchronously.
-     /// </summary>
-     public void OnApplicationQuit()
-     {
-         CloseConnection();
-     }
-     #region Private
-     /// <summary>Guards swapping of and appending to the message queues across threads.</summary>
-     private readonly object queueLock = new object();
-     /// <summary>Queue the client callback appends to.</summary>
-     private List<MqttMsgPublishEventArgs> frontMessageQueue = new List<MqttMsgPublishEventArgs>();
-     /// <summary>Queue drained by <see cref="Update"/>.</summary>
-     private List<MqttMsgPublishEventArgs> backMessageQueue = new List<MqttMsgPublishEventArgs>();
-     /// <summary>Set by the client callback when an established connection closes unexpectedly.</summary>
-     private volatile bool mqttClientConnectionClosed = false;
-     /// <summary>True between a successful connect and a close.</summary>
-     private volatile bool mqttClientConnected = false;
-     /// <summary>1 while a connect thread is running, otherwise 0.</summary>
-     private int connectInProgress = 0;
-     /// <summary>
-     /// Decodes a received payload as UTF-8, stores it and logs it.
-     /// </summary>
-     private void DecodeMessage(string topic, byte[] message)
-     {
-         string msg = Encoding.UTF8.GetString(message);
-         StoreMessage(msg);
-         Debug.LogFormat("收到{0}主题的消息:\n{1} ", topic, msg);
-     }
-     // NOTE(review): nothing in this class reads or clears this list, so it grows with
-     // every received message. Confirm the intended consumer or bound its size.
-     private List<string> eventMessages = new List<string>();
-     /// <summary>
-     /// Appends a decoded message to the stored message list.
-     /// </summary>
-     private void StoreMessage(string eventMsg)
-     {
-         eventMessages.Add(eventMsg);
-     }
-     /// <summary>
-     /// Called after a requested disconnect has completed.
-     /// </summary>
-     private void OnDisconnected()
-     {
-         Debug.LogError("断开连接`");
-     }
-     /// <summary>
-     /// Called when an established connection was closed unexpectedly.
-     /// </summary>
-     private void OnConnectionLost()
-     {
-         Debug.LogError("连接丢失");
-     }
-     /// <summary>
-     /// Drains both message queues and reports an unexpected connection loss, if any.
-     /// </summary>
-     private void ProcessMqttEvents()
-     {
-         // Process what has accumulated so far...
-         SwapMqttMessageQueues();
-         ProcessMqttMessageBackgroundQueue();
-         // ...then whatever arrived while that was being processed.
-         SwapMqttMessageQueues();
-         ProcessMqttMessageBackgroundQueue();
-         if (mqttClientConnectionClosed)
-         {
-             mqttClientConnectionClosed = false;
-             OnConnectionLost();
-         }
-     }
-     /// <summary>
-     /// Decodes every message in the back queue, then empties it.
-     /// </summary>
-     private void ProcessMqttMessageBackgroundQueue()
-     {
-         // Only the thread calling Update touches the back queue between swaps,
-         // so it can be iterated without holding the lock.
-         List<MqttMsgPublishEventArgs> pending = backMessageQueue;
-         foreach (MqttMsgPublishEventArgs msg in pending)
-         {
-             DecodeMessage(msg.Topic, msg.Message);
-         }
-         pending.Clear();
-     }
-     /// <summary>
-     /// Swaps the two queues so messages can keep arriving while the other queue is processed.
-     /// </summary>
-     private void SwapMqttMessageQueues()
-     {
-         lock (queueLock)
-         {
-             List<MqttMsgPublishEventArgs> previousFront = frontMessageQueue;
-             frontMessageQueue = backMessageQueue;
-             backMessageQueue = previousFront;
-         }
-     }
-     /// <summary>
-     /// Client callback: queues a received message for processing in <see cref="Update"/>.
-     /// </summary>
-     private void OnMqttMessageReceived(object sender, MqttMsgPublishEventArgs msg)
-     {
-         lock (queueLock)
-         {
-             frontMessageQueue.Add(msg);
-         }
-     }
-     /// <summary>
-     /// Client callback: records a connection close.
-     /// </summary>
-     private void OnMqttConnectionClosed(object sender, EventArgs e)
-     {
-         // Flag the close as unexpected only if we were connected; a controlled
-         // disconnect clears mqttClientConnected first and must not be reported as a loss.
-         mqttClientConnectionClosed = mqttClientConnected;
-         mqttClientConnected = false;
-     }
-     /// <summary>
-     /// Connects to the broker using the current settings. Runs on the thread started
-     /// by <see cref="Connect"/> and reports the outcome through
-     /// <see cref="OnConnected"/> or <see cref="OnConnectionFailed"/>.
-     /// </summary>
-     /// <param name="caCertPath">Path of the CA certificate file, or null for unencrypted connections.</param>
-     private void DoConnect(string caCertPath)
-     {
-         string failure = null;
-         try
-         {
-             // Honour the configured delay before contacting the broker.
-             if (connectionDelay > 0)
-             {
-                 Thread.Sleep(connectionDelay);
-             }
-             MqttClient mqtt = client;
-             if (mqtt == null)
-             {
-                 mqtt = CreateClient(caCertPath);
-                 client = mqtt;
-             }
-             else if (mqtt.IsConnected)
-             {
-                 Debug.LogError("IsConnected");
-                 // Already connected: nothing to do.
-                 return;
-             }
-             OnConnecting();
-             mqtt.Settings.TimeoutOnConnection = timeoutOnConnection;
-             string clientId = Guid.NewGuid().ToString();
-             mqtt.Connect(clientId, mqttUserName, mqttPassword);
-             if (mqtt.IsConnected)
-             {
-                 // Remove before adding: a reused client may still carry the handlers
-                 // from a previous connection, which would deliver every message twice.
-                 mqtt.ConnectionClosed -= OnMqttConnectionClosed;
-                 mqtt.ConnectionClosed += OnMqttConnectionClosed;
-                 mqtt.MqttMsgPublishReceived -= OnMqttMessageReceived;
-                 mqtt.MqttMsgPublishReceived += OnMqttMessageReceived;
-                 mqttClientConnected = true;
-             }
-             else
-             {
-                 failure = "CONNECTION FAILED!";
-             }
-         }
-         catch (Exception e)
-         {
-             // An exception escaping a worker thread would otherwise be lost without
-             // notifying anyone that the attempt failed.
-             Debug.LogException(e);
-             failure = string.IsNullOrEmpty(e.Message) ? "CONNECTION FAILED!" : e.Message;
-         }
-         finally
-         {
-             Interlocked.Exchange(ref connectInProgress, 0);
-         }
-         if (failure == null)
-         {
-             OnConnected();
-         }
-         else
-         {
-             OnConnectionFailed(failure);
-         }
-     }
-     /// <summary>
-     /// Creates the M2Mqtt client for the current broker settings.
-     /// </summary>
-     /// <param name="caCertPath">Path of the CA certificate file, or null when none should be loaded.</param>
-     private MqttClient CreateClient(string caCertPath)
-     {
-         // NOTE(review): SSLv3 is obsolete and rejected by many brokers; consider a TLS
-         // protocol value supported by the bundled M2Mqtt build.
-         #if (!UNITY_EDITOR && UNITY_WSA_10_0 && !ENABLE_IL2CPP)
-         return new MqttClient(brokerAddress, brokerPort, isEncrypted, isEncrypted ? MqttSslProtocols.SSLv3 : MqttSslProtocols.None);
-         #else
-         // Load the certificate only when a path was supplied, i.e. for encrypted connections.
-         X509Certificate cert = caCertPath != null ? new X509Certificate(caCertPath) : null;
-         Debug.LogWarning("ip地址或url:" + brokerAddress);
-         return new MqttClient(brokerAddress, brokerPort, isEncrypted, cert, cert, isEncrypted ? MqttSslProtocols.SSLv3 : MqttSslProtocols.None);
-         #endif
-     }
-     /// <summary>
-     /// User certificate selection callback. Not implemented and not referenced in this class.
-     /// </summary>
-     private bool userCertificateSelectionCallback(object sender, X509Certificate certificate, X509Chain chain, SslPolicyErrors sslPolicyErrors)
-     {
-         throw new NotImplementedException();
-     }
-     /// <summary>
-     /// Remote certificate validation callback. Not implemented and not referenced in this class.
-     /// </summary>
-     private bool MyRemoteCertificateValidationCallback(object sender, X509Certificate certificate, X509Chain chain, SslPolicyErrors sslPolicyErrors)
-     {
-         throw new NotImplementedException();
-     }
-     /// <summary>
-     /// Coroutine body for <see cref="Disconnect"/>: waits for the end of the frame,
-     /// closes the connection and reports the disconnect.
-     /// </summary>
-     private IEnumerator DoDisconnect()
-     {
-         yield return new WaitForEndOfFrame();
-         CloseConnection();
-         OnDisconnected();
-     }
-     /// <summary>
-     /// Unsubscribes, disconnects and releases the client, detaching this service's handlers.
-     /// </summary>
-     private void CloseConnection()
-     {
-         // Cleared first so the resulting close event is not reported as a connection loss.
-         mqttClientConnected = false;
-         MqttClient mqtt = client;
-         if (mqtt == null)
-         {
-             return;
-         }
-         client = null;
-         try
-         {
-             if (mqtt.IsConnected)
-             {
-                 string[] subscribed = SubTopics;
-                 // Nothing to unsubscribe if SubscribeTopics was never called.
-                 if (subscribed != null && subscribed.Length > 0)
-                 {
-                     mqtt.Unsubscribe(subscribed);
-                 }
-                 mqtt.Disconnect();
-             }
-         }
-         finally
-         {
-             // Always detach, even if the broker round-trip above threw.
-             SubTopics = null;
-             mqtt.MqttMsgPublishReceived -= OnMqttMessageReceived;
-             mqtt.ConnectionClosed -= OnMqttConnectionClosed;
-         }
-     }
-     #if ((!UNITY_EDITOR && UNITY_WSA_10_0))
-     /// <summary>
-     /// Focus handler for UWP 10 (HoloLens): reconnects on focus, closes on loss of focus.
-     /// </summary>
-     private void OnApplicationFocus(bool focus)
-     {
-         // On UWP 10 (HoloLens) we cannot tell whether the application was really closed or only minimised
-         // (https://forum.unity.com/threads/onapplicationquit-and-ondestroy-are-not-called-on-uwp-10.462597/)
-         if (focus)
-         {
-             Connect();
-         }
-         else
-         {
-             CloseConnection();
-         }
-     }
-     #endif
-     #endregion
- }
|