|
- using System;
- using System.Collections;
- using System.Collections.Generic;
- using System.Net.Security;
- using System.Security.Cryptography.X509Certificates;
- using System.Text;
- using System.Threading;
- using Blue;
- using UnityEngine;
- using uPLibrary.Networking.M2Mqtt;
- using uPLibrary.Networking.M2Mqtt.Messages;
- /// <summary>
- /// Contract for an MQTT client service: broker settings, connection parameters,
- /// connection lifecycle hooks, topic subscription/publishing, and the per-frame and
- /// shutdown entry points that the host application is expected to call.
- /// </summary>
- public interface IMQTTService : IService
- {
-     #region MQTT broker configuration
-
-     /// <summary>Host name or IP address of the MQTT broker.</summary>
-     string brokerAddress { get; set; }
-
-     /// <summary>TCP port of the MQTT broker.</summary>
-     int brokerPort { get; set; }
-
-     /// <summary>Whether the connection to the broker should be encrypted.</summary>
-     bool isEncrypted { get; set; }
-
-     #endregion
-
-     #region MQTT connection parameters
-
-     /// <summary>
-     /// Delay associated with connecting. Presumably milliseconds; the MQTTService
-     /// implementation in this file defaults it to 500 and does not read it.
-     /// </summary>
-     int connectionDelay { get; set; }
-
-     /// <summary>
-     /// Connection timeout. The MQTTService implementation in this file defaults it to
-     /// MqttSettings.MQTT_CONNECT_TIMEOUT and does not read it.
-     /// </summary>
-     int timeoutOnConnection { get; set; }
-
-     /// <summary>Whether the service should connect on its own during initialisation.</summary>
-     bool autoConnect { get; set; }
-
-     /// <summary>User name presented to the broker; may be null.</summary>
-     string mqttUserName { get; set; }
-
-     /// <summary>Password presented to the broker; may be null.</summary>
-     string mqttPassword { get; set; }
-
-     #endregion
-
-     #region Connection
-
-     /// <summary>Starts a connection attempt to the configured broker.</summary>
-     void Connect();
-
-     /// <summary>Notification hook for the start of a connection attempt.</summary>
-     void OnConnecting();
-
-     /// <summary>Notification hook for a successfully established connection.</summary>
-     void OnConnected();
-
-     /// <summary>Requests that the connection to the broker be closed.</summary>
-     void Disconnect();
-
-     /// <summary>Notification hook for a failed connection attempt.</summary>
-     /// <param name="errorMessage">Description of the failure.</param>
-     void OnConnectionFailed(string errorMessage);
-
-     #endregion
-
-     #region Messaging
-
-     /// <summary>Subscribes to the given topics.</summary>
-     /// <param name="topics">Topic filters to subscribe to.</param>
-     /// <param name="qosLevels">QoS levels accompanying <paramref name="topics"/>.</param>
-     void SubscribeTopics(string[] topics, byte[] qosLevels);
-
-     /// <summary>Unsubscribes from the given topics.</summary>
-     /// <param name="topics">Topic filters to unsubscribe from.</param>
-     void UnsubscribeTopics(string[] topics);
-
-     /// <summary>Publishes a text message to a topic.</summary>
-     /// <param name="topic">Destination topic.</param>
-     /// <param name="message">Message text.</param>
-     void Publish(string topic, string message);
-
-     #endregion
-
-     #region Host callbacks
-
-     /// <summary>
-     /// Periodic tick. The MQTTService implementation in this file processes its buffered
-     /// incoming messages here.
-     /// </summary>
-     void Update();
-
-     /// <summary>
-     /// Shutdown hook. The MQTTService implementation in this file closes its connection here.
-     /// </summary>
-     void OnApplicationQuit();
-
-     #endregion
- }
- /// <summary>
- /// <see cref="IMQTTService"/> implementation built on the M2Mqtt <see cref="MqttClient"/>.
- /// Connection attempts run on a worker thread. Received messages are appended to a front
- /// queue by the client's callback and drained from <see cref="Update"/>, which swaps the
- /// front and back queues under a lock before processing.
- /// </summary>
- public class MQTTService : IMQTTService
- {
-     /// <summary>Underlying M2Mqtt client; null until a connection attempt creates it and after the connection is closed.</summary>
-     protected MqttClient client;
-
-     #region MQTT broker configuration
-     /// <summary>Host name or IP address passed to the <see cref="MqttClient"/> constructor.</summary>
-     public string brokerAddress { get; set; }
-     /// <summary>Broker port passed to the <see cref="MqttClient"/> constructor.</summary>
-     public int brokerPort { get; set; }
-     /// <summary>When true the client is created in secure mode and the CA certificate is loaded.</summary>
-     public bool isEncrypted { get; set; } = false;
-     #endregion
-
-     #region MQTT connection parameters
-     /// <summary>Defaults to 500. NOTE(review): not read anywhere in this class.</summary>
-     public int connectionDelay { get; set; } = 500;
-     /// <summary>Defaults to MqttSettings.MQTT_CONNECT_TIMEOUT. NOTE(review): not applied to the client anywhere in this class.</summary>
-     public int timeoutOnConnection { get; set; } = MqttSettings.MQTT_CONNECT_TIMEOUT;
-     /// <summary>When true, <see cref="OnInit"/> calls <see cref="Connect"/>.</summary>
-     public bool autoConnect { get; set; } = false;
-     /// <summary>User name passed to <c>MqttClient.Connect</c>; may be null.</summary>
-     public string mqttUserName { get; set; } = null;
-     /// <summary>Password passed to <c>MqttClient.Connect</c>; may be null.</summary>
-     public string mqttPassword { get; set; } = null;
-     #endregion
-
-     #region Public events
-     /// <summary>
-     /// Raised from <see cref="OnConnected"/>. Like <see cref="ConnectionFailed"/>, it is raised
-     /// on the connection worker thread, not on the thread that called <see cref="Connect"/>.
-     /// </summary>
-     public event Action ConnectionSucceeded;
-
-     /// <summary>Raised from <see cref="OnConnectionFailed"/> (on the connection worker thread).</summary>
-     public event Action ConnectionFailed;
-     #endregion
-
-     /// <summary>Service initialisation hook: connects immediately when <see cref="autoConnect"/> is set.</summary>
-     public void OnInit()
-     {
-         // The message queues are created by their field initialisers, so nothing to set up here.
-         if (autoConnect)
-         {
-             Connect();
-         }
-     }
-
-     #region Connection
-     /// <summary>
-     /// Starts a connection attempt on a background thread unless the client is already connected.
-     /// </summary>
-     public void Connect()
-     {
-         if (client == null || !client.IsConnected)
-         {
-             // Resolve the certificate path on the calling thread: Unity's Application API is
-             // generally restricted to the main thread, so it must not be read by the worker.
-             caCertificatePath = Application.streamingAssetsPath + "/emqxsl-ca.crt";
-
-             // Background thread so a blocked connect cannot keep the process alive on exit.
-             Thread worker = new Thread(DoConnect) { IsBackground = true };
-             worker.Start();
-         }
-         Debug.LogWarning("开始连接");
-     }
-
-     /// <summary>Called by the worker just before the blocking connect call; logs only.</summary>
-     public void OnConnecting()
-     {
-         Debug.LogWarning("连接中");
-     }
-
-     /// <summary>Called by the worker once the client reports it is connected; logs and raises <see cref="ConnectionSucceeded"/>.</summary>
-     public void OnConnected()
-     {
-         Debug.LogWarning("连接完成");
-         ConnectionSucceeded?.Invoke();
-     }
-
-     /// <summary>
-     /// Schedules the connection to be closed at the end of the current frame (via the project's
-     /// coroutine system). Does nothing but log when there is no client.
-     /// </summary>
-     public void Disconnect()
-     {
-         if (client != null)
-         {
-             CoroutineSystem.Instance.Start_Coroutine(DoDisconnect());
-         }
-         Debug.LogWarning("断开连接");
-     }
-
-     /// <summary>Logs the failure and raises <see cref="ConnectionFailed"/>.</summary>
-     /// <param name="errorMessage">Description of the failure; included in the log entry.</param>
-     public void OnConnectionFailed(string errorMessage)
-     {
-         Debug.LogError("MQTT连接失败: " + errorMessage);
-         ConnectionFailed?.Invoke();
-     }
-     #endregion
-
-     #region Messaging
-     // Topics passed to the most recent SubscribeTopics call; unsubscribed again in CloseConnection.
-     private string[] SubTopics;
-
-     /// <summary>Subscribes to <paramref name="topics"/> and remembers them for cleanup on close.</summary>
-     /// <param name="topics">Topic filters; a null or empty array is ignored.</param>
-     /// <param name="qosLevels">
-     /// One QoS level per topic. When null or of a different length than <paramref name="topics"/>,
-     /// every topic is subscribed with QoS 2 (exactly once).
-     /// </param>
-     public void SubscribeTopics(string[] topics, byte[] qosLevels)
-     {
-         if (topics == null || topics.Length == 0)
-         {
-             Debug.LogWarning("SubscribeTopics called without topics; ignored.");
-             return;
-         }
-         if (client == null)
-         {
-             Debug.LogError("SubscribeTopics called before the MQTT client was created; ignored.");
-             return;
-         }
-
-         // The QoS array must have exactly one entry per topic, so never send a fixed-size array.
-         byte[] qos = qosLevels;
-         if (qos == null || qos.Length != topics.Length)
-         {
-             qos = new byte[topics.Length];
-             for (int i = 0; i < qos.Length; i++)
-             {
-                 qos[i] = MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE;
-             }
-         }
-
-         SubTopics = topics;
-         client.Subscribe(topics, qos);
-         Debug.LogWarning("订阅主题:" + "\n" + string.Join("\n", topics) + "\n");
-     }
-
-     /// <summary>Unsubscribes from <paramref name="topics"/>; ignored when there is no client or no topics.</summary>
-     /// <param name="topics">Topic filters to unsubscribe from.</param>
-     public void UnsubscribeTopics(string[] topics)
-     {
-         if (client == null || topics == null || topics.Length == 0)
-         {
-             return;
-         }
-         client.Unsubscribe(topics);
-     }
-
-     /// <summary>Publishes <paramref name="message"/> as UTF-8 with QoS 2 and the retain flag off.</summary>
-     /// <param name="topic">Destination topic.</param>
-     /// <param name="message">Message text; null is sent as an empty payload.</param>
-     public void Publish(string topic, string message)
-     {
-         if (client == null)
-         {
-             Debug.LogError("Publish called before the MQTT client was created; ignored. Topic: " + topic);
-             return;
-         }
-         client.Publish(topic, Encoding.UTF8.GetBytes(message ?? string.Empty), MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE, false);
-     }
-     #endregion
-
-     /// <summary>Tick entry point: drains buffered messages and reports a lost connection.</summary>
-     public void Update()
-     {
-         ProcessMqttEvents();
-     }
-
-     /// <summary>Shutdown entry point: closes the connection synchronously.</summary>
-     public void OnApplicationQuit()
-     {
-         CloseConnection();
-     }
-
-     #region Private
-     // Guards the front/back queue references and appends to the front queue; the receive
-     // callback and Update() run on different threads.
-     private readonly object queueLock = new object();
-     // Queue the receive callback appends to.
-     private List<MqttMsgPublishEventArgs> frontMessageQueue = new List<MqttMsgPublishEventArgs>();
-     // Queue Update() drains; only touched by the draining thread between swaps.
-     private List<MqttMsgPublishEventArgs> backMessageQueue = new List<MqttMsgPublishEventArgs>();
-     // Set by the ConnectionClosed callback when an established connection drops; consumed in ProcessMqttEvents.
-     private volatile bool mqttClientConnectionClosed = false;
-     // True between a successful connect and a close/drop.
-     private volatile bool mqttClientConnected = false;
-     // Certificate file path captured in Connect() for use by the worker thread.
-     private string caCertificatePath;
-
-     /// <summary>Decodes a payload as UTF-8, stores it and logs it together with its topic.</summary>
-     private void DecodeMessage(string topic, byte[] message)
-     {
-         string msg = Encoding.UTF8.GetString(message);
-         StoreMessage(msg);
-         Debug.LogFormat("收到{0}主题的消息:\n{1} ", topic,msg);
-     }
-
-     // NOTE(review): this list is only ever appended to within this class, so it grows for the
-     // lifetime of the service. Confirm whether a consumer is planned, or bound/remove it.
-     private List<string> eventMessages = new List<string>();
-
-     /// <summary>Appends a decoded message to <see cref="eventMessages"/>.</summary>
-     private void StoreMessage(string eventMsg)
-     {
-         eventMessages.Add(eventMsg);
-     }
-
-     /// <summary>Called after a requested disconnect has completed; logs only.</summary>
-     private void OnDisconnected()
-     {
-         Debug.LogError("断开连接`");
-     }
-
-     /// <summary>Called from Update() when an established connection was closed unexpectedly; logs only.</summary>
-     private void OnConnectionLost()
-     {
-         Debug.LogError("连接丢失");
-     }
-
-     /// <summary>
-     /// Drains both queues (swap, process, swap, process) so messages that arrive while the
-     /// first batch is being handled are picked up in the same tick, then reports a lost connection.
-     /// </summary>
-     private void ProcessMqttEvents()
-     {
-         SwapMqttMessageQueues();
-         ProcessMqttMessageBackgroundQueue();
-
-         SwapMqttMessageQueues();
-         ProcessMqttMessageBackgroundQueue();
-
-         if (mqttClientConnectionClosed)
-         {
-             mqttClientConnectionClosed = false;
-             OnConnectionLost();
-         }
-     }
-
-     /// <summary>Decodes every message in the back queue and then empties it.</summary>
-     private void ProcessMqttMessageBackgroundQueue()
-     {
-         foreach (MqttMsgPublishEventArgs msg in backMessageQueue)
-         {
-             DecodeMessage(msg.Topic, msg.Message);
-         }
-         backMessageQueue.Clear();
-     }
-
-     /// <summary>Exchanges the front and back queues under the queue lock.</summary>
-     private void SwapMqttMessageQueues()
-     {
-         lock (queueLock)
-         {
-             List<MqttMsgPublishEventArgs> previousFront = frontMessageQueue;
-             frontMessageQueue = backMessageQueue;
-             backMessageQueue = previousFront;
-         }
-     }
-
-     /// <summary>MqttMsgPublishReceived handler: buffers the message for the next Update().</summary>
-     private void OnMqttMessageReceived(object sender, MqttMsgPublishEventArgs msg)
-     {
-         lock (queueLock)
-         {
-             frontMessageQueue.Add(msg);
-         }
-     }
-
-     /// <summary>
-     /// ConnectionClosed handler: flags a lost connection only if the connection was established
-     /// and not closed deliberately (CloseConnection clears mqttClientConnected first).
-     /// </summary>
-     private void OnMqttConnectionClosed(object sender, EventArgs e)
-     {
-         mqttClientConnectionClosed = mqttClientConnected;
-         mqttClientConnected = false;
-     }
-
-     /// <summary>
-     /// Worker-thread body: creates the client if needed, performs the blocking connect and
-     /// reports the outcome through OnConnected / OnConnectionFailed. Any exception raised while
-     /// creating or connecting the client is reported as a connection failure instead of
-     /// escaping the thread.
-     /// </summary>
-     private void DoConnect()
-     {
-         // Work on a local reference so a concurrent CloseConnection (which nulls the field)
-         // cannot cause a null dereference half-way through.
-         MqttClient mqtt = client;
-         try
-         {
-             if (mqtt == null)
-             {
-                 // NOTE(review): SSLv3 is obsolete and refused by many brokers; consider
-                 // MqttSslProtocols.TLSv1_2 if the M2Mqtt build in use provides it.
- #if (!UNITY_EDITOR && UNITY_WSA_10_0 && !ENABLE_IL2CPP)
-                 mqtt = new MqttClient(brokerAddress,brokerPort,isEncrypted, isEncrypted ? MqttSslProtocols.SSLv3 : MqttSslProtocols.None);
- #else
-                 // Only load the certificate when it is needed, so an unencrypted connection
-                 // does not fail because the file is absent.
-                 X509Certificate cert = isEncrypted ? new X509Certificate(caCertificatePath) : null;
-                 Debug.LogWarning("ip地址或url:"+ brokerAddress);
-                 mqtt = new MqttClient(brokerAddress, brokerPort, isEncrypted, cert, cert, isEncrypted ? MqttSslProtocols.SSLv3 : MqttSslProtocols.None);
- #endif
-                 client = mqtt;
-             }
-             else if (mqtt.IsConnected)
-             {
-                 // Already connected: do not issue a second CONNECT on the same client.
-                 Debug.LogError("IsConnected");
-                 return;
-             }
-
-             OnConnecting();
-
-             string clientId = Guid.NewGuid().ToString();
-             mqtt.Connect(clientId, mqttUserName, mqttPassword);
-         }
-         catch (Exception e)
-         {
-             OnConnectionFailed(e.Message);
-             return;
-         }
-
-         if (!mqtt.IsConnected)
-         {
-             OnConnectionFailed("CONNECTION FAILED!");
-             return;
-         }
-
-         // Remove before adding so reconnecting an existing client never registers a handler twice.
-         mqtt.ConnectionClosed -= OnMqttConnectionClosed;
-         mqtt.ConnectionClosed += OnMqttConnectionClosed;
-         mqtt.MqttMsgPublishReceived -= OnMqttMessageReceived;
-         mqtt.MqttMsgPublishReceived += OnMqttMessageReceived;
-         mqttClientConnected = true;
-         OnConnected();
-     }
-
-     // Placeholder; not referenced anywhere in this class.
-     private bool userCertificateSelectionCallback(object sender, X509Certificate certificate, X509Chain chain, SslPolicyErrors sslPolicyErrors)
-     {
-         throw new NotImplementedException();
-     }
-
-     // Placeholder; not referenced anywhere in this class.
-     private bool MyRemoteCertificateValidationCallback(object sender, X509Certificate certificate, X509Chain chain, SslPolicyErrors sslPolicyErrors)
-     {
-         throw new NotImplementedException();
-     }
-
-     /// <summary>Coroutine: waits for the end of the frame, closes the connection, then calls OnDisconnected.</summary>
-     private IEnumerator DoDisconnect()
-     {
-         yield return new WaitForEndOfFrame();
-         CloseConnection();
-         OnDisconnected();
-     }
-
-     /// <summary>
-     /// Unsubscribes remembered topics, disconnects if connected, detaches the event handlers
-     /// and drops the client reference.
-     /// </summary>
-     private void CloseConnection()
-     {
-         // Cleared first so the ConnectionClosed callback does not report this as a lost connection.
-         mqttClientConnected = false;
-
-         MqttClient mqtt = client;
-         if (mqtt == null)
-         {
-             return;
-         }
-
-         if (mqtt.IsConnected)
-         {
-             // SubTopics is null when nothing was ever subscribed.
-             if (SubTopics != null && SubTopics.Length > 0)
-             {
-                 mqtt.Unsubscribe(SubTopics);
-             }
-             mqtt.Disconnect();
-         }
-         SubTopics = null;
-
-         mqtt.MqttMsgPublishReceived -= OnMqttMessageReceived;
-         mqtt.ConnectionClosed -= OnMqttConnectionClosed;
-         client = null;
-     }
-
- #if ((!UNITY_EDITOR && UNITY_WSA_10_0))
-     /// <summary>On WSA builds: reconnect when the application gains focus, close the connection when it loses it.</summary>
-     private void OnApplicationFocus(bool focus)
-     {
-         if (focus)
-         {
-             Connect();
-         }
-         else
-         {
-             CloseConnection();
-         }
-     }
- #endif
-     #endregion
- }
|