using System;
using System.Collections;
using System.Collections.Generic;
using System.Net.Security;
using System.Security.Cryptography.X509Certificates;
using System.Text;
using System.Threading;
using Blue;
using UnityEngine;
using uPLibrary.Networking.M2Mqtt;
using uPLibrary.Networking.M2Mqtt.Messages;
/// <summary>
/// Contract for an MQTT client service: broker settings, connection
/// lifecycle hooks, topic subscription and message publishing.
/// </summary>
public interface IMQTTService : IService
{
    #region MQTT broker configuration

    /// <summary>IP address or URL of the broker.</summary>
    string brokerAddress { get; set; }

    /// <summary>Port number of the broker.</summary>
    int brokerPort { get; set; }

    /// <summary>Whether to use an encrypted connection.</summary>
    bool isEncrypted { get; set; }

    #endregion

    #region MQTT connection parameters

    /// <summary>Delay, in milliseconds, applied before connecting to the broker.</summary>
    int connectionDelay { get; set; }

    /// <summary>Connection timeout in milliseconds.</summary>
    int timeoutOnConnection { get; set; }

    /// <summary>Whether to connect automatically at start-up.</summary>
    bool autoConnect { get; set; }

    /// <summary>User name for the MQTT broker. Leave empty if no user name is required.</summary>
    string mqttUserName { get; set; }

    /// <summary>Password for the MQTT broker. Leave empty if no password is required.</summary>
    string mqttPassword { get; set; }

    #endregion

    #region Connection

    /// <summary>Connects to the broker using the current settings.</summary>
    void Connect();

    /// <summary>Hook invoked while a connection attempt is in progress.</summary>
    void OnConnecting();

    /// <summary>Hook invoked when the connection has been established.</summary>
    void OnConnected();

    /// <summary>Disconnects from the broker (if connected).</summary>
    void Disconnect();

    /// <summary>Hook invoked when a connection attempt fails.</summary>
    /// <param name="errorMessage">Failure message.</param>
    void OnConnectionFailed(string errorMessage);

    #endregion

    /// <summary>Subscribes to MQTT topics.</summary>
    /// <param name="topics">Topics to subscribe to.</param>
    /// <param name="qosLevels">Requested QoS level for each topic.</param>
    void SubscribeTopics(string[] topics, byte[] qosLevels);

    /// <summary>Unsubscribes from MQTT topics.</summary>
    /// <param name="topics">Topics to unsubscribe from.</param>
    void UnsubscribeTopics(string[] topics);

    /// <summary>Publishes a message.</summary>
    /// <param name="topic">Topic to publish on.</param>
    /// <param name="message">Message text.</param>
    void Publish(string topic, string message);

    /// <summary>
    /// Pump entry point; MQTTService uses it to process queued MQTT events.
    /// NOTE(review): presumably driven once per frame by the host - confirm against the caller.
    /// </summary>
    void Update();

    /// <summary>
    /// Application shutdown hook; MQTTService closes the broker connection here.
    /// </summary>
    void OnApplicationQuit();
}
/// <summary>
/// <see cref="IMQTTService"/> implementation built on the M2Mqtt <c>MqttClient</c>.
/// The connection is opened on a worker thread started by <see cref="Connect"/>;
/// received messages are queued by the client callback and drained by <see cref="Update"/>.
/// </summary>
public class MQTTService : IMQTTService
{
    /// <summary>Underlying M2Mqtt client; null until a connection attempt creates it.</summary>
    protected MqttClient client;

    #region MQTT broker configuration

    /// <summary>IP address or URL of the broker.</summary>
    public string brokerAddress { get; set; }

    /// <summary>Port number of the broker.</summary>
    public int brokerPort { get; set; }

    /// <summary>Whether to use an encrypted connection.</summary>
    public bool isEncrypted { get; set; } = false;

    #endregion

    #region MQTT connection parameters

    /// <summary>
    /// Delay, in milliseconds, before connecting to the broker.
    /// NOTE(review): this value is currently not applied anywhere in this class.
    /// </summary>
    public int connectionDelay { get; set; } = 500;

    /// <summary>Connection timeout in milliseconds.</summary>
    public int timeoutOnConnection { get; set; } = MqttSettings.MQTT_CONNECT_TIMEOUT;

    /// <summary>Whether <see cref="OnInit"/> should connect automatically.</summary>
    public bool autoConnect { get; set; } = false;

    /// <summary>User name for the MQTT broker. Leave null/empty if not required.</summary>
    public string mqttUserName { get; set; } = null;

    /// <summary>Password for the MQTT broker. Leave null/empty if not required.</summary>
    public string mqttPassword { get; set; } = null;

    #endregion

    #region Public events

    /// <summary>
    /// Raised when a connection has been established.
    /// Invoked from <see cref="OnConnected"/>, i.e. on the connect worker thread.
    /// </summary>
    public event Action ConnectionSucceeded;

    /// <summary>
    /// Raised when a connection attempt fails.
    /// Invoked from <see cref="OnConnectionFailed"/>, i.e. on the connect worker thread.
    /// </summary>
    public event Action ConnectionFailed;

    #endregion

    /// <summary>
    /// Service initialisation hook: connects immediately when <see cref="autoConnect"/> is set.
    /// </summary>
    public void OnInit()
    {
        if (autoConnect)
        {
            Connect();
        }
    }

    #region Connection

    /// <summary>
    /// Starts a connection attempt on a background thread unless the client is
    /// already connected. Returns immediately; the outcome is reported through
    /// <see cref="OnConnected"/> / <see cref="OnConnectionFailed"/>.
    /// </summary>
    public void Connect()
    {
        MqttClient current = client;
        if (current == null || !current.IsConnected)
        {
            // Unity's Application path getters are main-thread-only, so the
            // certificate path is resolved here rather than on the worker thread.
            caCertificatePath = Application.streamingAssetsPath + "/emqxsl-ca.crt";

            Thread worker = new Thread(new ThreadStart(DoConnect));
            // A background thread cannot keep the process alive while it is
            // blocked inside a connection attempt.
            worker.IsBackground = true;
            worker.Start();
        }
        Debug.LogWarning("开始连接");
    }

    /// <summary>Hook invoked right before the blocking connect call; logs only.</summary>
    public void OnConnecting()
    {
        Debug.LogWarning("连接中");
    }

    /// <summary>
    /// Hook invoked after a successful connection; logs and raises
    /// <see cref="ConnectionSucceeded"/>.
    /// </summary>
    public void OnConnected()
    {
        Debug.LogWarning("连接完成");
        ConnectionSucceeded?.Invoke();
    }

    /// <summary>
    /// Schedules a disconnect (via a coroutine) when a client exists.
    /// </summary>
    public void Disconnect()
    {
        if (client != null)
        {
            CoroutineSystem.Instance.Start_Coroutine(DoDisconnect());
        }
        Debug.LogWarning("断开连接");
    }

    /// <summary>
    /// Hook invoked when a connection attempt fails; logs the failure and raises
    /// <see cref="ConnectionFailed"/>.
    /// </summary>
    /// <param name="errorMessage">Failure message; appended to the log entry when not empty.</param>
    public void OnConnectionFailed(string errorMessage)
    {
        if (string.IsNullOrEmpty(errorMessage))
        {
            Debug.LogError("MQTT连接失败");
        }
        else
        {
            Debug.LogError("MQTT连接失败" + ": " + errorMessage);
        }
        ConnectionFailed?.Invoke();
    }

    #endregion

    /// <summary>Topics passed to the most recent <see cref="SubscribeTopics"/> call.</summary>
    private string[] SubTopics;

    /// <summary>
    /// Subscribes to the given topics.
    /// </summary>
    /// <param name="topics">Topics to subscribe to; null or empty is ignored.</param>
    /// <param name="qosLevels">
    /// QoS level per topic. When null or when its length does not match
    /// <paramref name="topics"/>, every topic is subscribed with "exactly once".
    /// </param>
    public void SubscribeTopics(string[] topics, byte[] qosLevels)
    {
        if (topics == null || topics.Length == 0)
        {
            Debug.LogWarning("MQTT subscribe ignored: no topics given");
            return;
        }

        MqttClient mqtt = client;
        if (mqtt == null)
        {
            Debug.LogError("MQTT subscribe ignored: client has not been created, call Connect() first");
            return;
        }

        // The QoS array must have one entry per topic; fall back to the
        // previous default (exactly once) when the caller's array is unusable.
        byte[] levels = qosLevels;
        if (levels == null || levels.Length != topics.Length)
        {
            levels = new byte[topics.Length];
            for (int i = 0; i < levels.Length; i++)
            {
                levels[i] = MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE;
            }
        }

        SubTopics = topics;
        mqtt.Subscribe(topics, levels);
        Debug.LogWarning("订阅主题:" + "\n" + string.Join("\n", topics) + "\n");
    }

    /// <summary>
    /// Unsubscribes from the given topics.
    /// </summary>
    /// <param name="topics">Topics to unsubscribe from; null or empty is ignored.</param>
    public void UnsubscribeTopics(string[] topics)
    {
        if (topics == null || topics.Length == 0)
        {
            return;
        }

        MqttClient mqtt = client;
        if (mqtt == null)
        {
            Debug.LogError("MQTT unsubscribe ignored: client has not been created");
            return;
        }
        mqtt.Unsubscribe(topics);
    }

    /// <summary>
    /// Publishes a UTF-8 encoded message with QoS "exactly once", not retained.
    /// </summary>
    /// <param name="topic">Topic to publish on.</param>
    /// <param name="message">Message text; null is sent as an empty payload.</param>
    public void Publish(string topic, string message)
    {
        MqttClient mqtt = client;
        if (mqtt == null)
        {
            Debug.LogError("MQTT publish ignored: client has not been created, call Connect() first");
            return;
        }
        byte[] payload = Encoding.UTF8.GetBytes(message ?? string.Empty);
        mqtt.Publish(topic, payload, MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE, false);
    }

    /// <summary>Pump entry point: processes queued MQTT events.</summary>
    public void Update()
    {
        ProcessMqttEvents();
    }

    /// <summary>Application shutdown hook: closes the connection.</summary>
    public void OnApplicationQuit()
    {
        CloseConnection();
    }

    #region Private

    // Guards the two queue references: the client callback appends to the
    // front queue while Update() swaps and drains.
    // NOTE(review): the callback is presumably raised off the main thread - confirm.
    private readonly object queueLock = new object();

    // Serialises overlapping connection attempts started by repeated Connect() calls.
    private readonly object connectLock = new object();

    // Front queue: receives incoming messages.
    private List<MqttMsgPublishEventArgs> frontMessageQueue = new List<MqttMsgPublishEventArgs>();

    // Back queue: the batch currently being processed by the pump.
    private List<MqttMsgPublishEventArgs> backMessageQueue = new List<MqttMsgPublishEventArgs>();

    // Both flags are written from the connect worker / client callback and read by the pump.
    private volatile bool mqttClientConnectionClosed = false;
    private volatile bool mqttClientConnected = false;

    // CA certificate path captured by Connect() for use on the worker thread.
    private string caCertificatePath;

    /// <summary>
    /// Decodes a received payload as UTF-8, stores it and logs it.
    /// </summary>
    private void DecodeMessage(string topic, byte[] message)
    {
        // A null payload is treated as empty so that one bad message cannot
        // abort the drain loop and leave the batch un-cleared.
        string msg = message == null ? string.Empty : Encoding.UTF8.GetString(message);
        StoreMessage(msg);
        Debug.LogFormat("收到{0}主题的消息:\n{1} ", topic, msg);
    }

    // NOTE(review): nothing in this class reads or trims this list, so it grows
    // with every received message - confirm whether a consumer is still planned.
    private List<string> eventMessages = new List<string>();

    /// <summary>Stores a decoded message.</summary>
    private void StoreMessage(string eventMsg)
    {
        eventMessages.Add(eventMsg);
    }

    /// <summary>Logs that a requested disconnect has completed.</summary>
    private void OnDisconnected()
    {
        Debug.LogError("断开连接`");
    }

    /// <summary>Logs that the connection was lost unexpectedly.</summary>
    private void OnConnectionLost()
    {
        Debug.LogError("连接丢失");
    }

    /// <summary>
    /// Drains the message queues and reports an unexpected connection loss, if any.
    /// </summary>
    private void ProcessMqttEvents()
    {
        // Process the messages queued so far.
        SwapMqttMessageQueues();
        ProcessMqttMessageBackgroundQueue();
        // Process the messages that arrived in the meantime.
        SwapMqttMessageQueues();
        ProcessMqttMessageBackgroundQueue();

        if (mqttClientConnectionClosed)
        {
            mqttClientConnectionClosed = false;
            OnConnectionLost();
        }
    }

    /// <summary>Decodes every message in the back queue, then clears it.</summary>
    private void ProcessMqttMessageBackgroundQueue()
    {
        // Only the pump touches the back queue after a swap, so it can be
        // enumerated without holding the lock.
        foreach (MqttMsgPublishEventArgs msg in backMessageQueue)
        {
            DecodeMessage(msg.Topic, msg.Message);
        }
        backMessageQueue.Clear();
    }

    /// <summary>
    /// Swaps the front and back queues so messages can keep arriving while a batch is processed.
    /// </summary>
    private void SwapMqttMessageQueues()
    {
        lock (queueLock)
        {
            List<MqttMsgPublishEventArgs> previousFront = frontMessageQueue;
            frontMessageQueue = backMessageQueue;
            backMessageQueue = previousFront;
        }
    }

    /// <summary>Client callback: queues a received message for the pump.</summary>
    private void OnMqttMessageReceived(object sender, MqttMsgPublishEventArgs msg)
    {
        lock (queueLock)
        {
            frontMessageQueue.Add(msg);
        }
    }

    /// <summary>Client callback: records that the connection was closed.</summary>
    private void OnMqttConnectionClosed(object sender, EventArgs e)
    {
        // Flag the close as unexpected only if we were connected; a controlled
        // disconnect clears mqttClientConnected first.
        mqttClientConnectionClosed = mqttClientConnected;
        mqttClientConnected = false;
    }

    /// <summary>
    /// Connects to the broker using the current settings. Runs on the worker
    /// thread started by <see cref="Connect"/>; every failure, including thrown
    /// exceptions, is reported through <see cref="OnConnectionFailed"/>.
    /// </summary>
    private void DoConnect()
    {
        bool connected = false;
        string failure = null;

        lock (connectLock)
        {
            try
            {
                MqttClient mqtt = client;
                if (mqtt == null)
                {
                    mqtt = CreateClient();
                    client = mqtt;
                }
                else if (mqtt.IsConnected)
                {
                    // An overlapping attempt already connected; nothing to do.
                    Debug.LogError("IsConnected");
                    return;
                }

                OnConnecting();

                mqtt.Settings.TimeoutOnConnection = timeoutOnConnection;
                string clientId = Guid.NewGuid().ToString();
                mqtt.Connect(clientId, mqttUserName, mqttPassword);

                if (!mqtt.IsConnected)
                {
                    failure = "CONNECTION FAILED!";
                }
                else if (!ReferenceEquals(client, mqtt))
                {
                    // CloseConnection() ran while we were connecting: drop the
                    // connection instead of leaving it orphaned.
                    mqtt.Disconnect();
                    return;
                }
                else
                {
                    // Remove before adding so a reused client never ends up with
                    // duplicate handlers.
                    mqtt.ConnectionClosed -= OnMqttConnectionClosed;
                    mqtt.ConnectionClosed += OnMqttConnectionClosed;
                    mqtt.MqttMsgPublishReceived -= OnMqttMessageReceived;
                    mqtt.MqttMsgPublishReceived += OnMqttMessageReceived;
                    mqttClientConnected = true;
                    connected = true;
                }
            }
            catch (Exception ex)
            {
                // An unhandled exception here would end the worker thread
                // without ever notifying listeners.
                Debug.LogException(ex);
                failure = ex.Message;
            }
        }

        // Callbacks run outside the lock/try so a throwing subscriber is not
        // misreported as a connection failure.
        if (connected)
        {
            OnConnected();
        }
        else
        {
            OnConnectionFailed(failure);
        }
    }

    /// <summary>Creates the M2Mqtt client for the current broker settings.</summary>
    private MqttClient CreateClient()
    {
        // NOTE(review): SSLv3 is obsolete and rejected by many brokers; consider a
        // TLS 1.2 protocol value if the bundled M2Mqtt build offers one.
        MqttSslProtocols protocol = isEncrypted ? MqttSslProtocols.SSLv3 : MqttSslProtocols.None;
#if (!UNITY_EDITOR && UNITY_WSA_10_0 && !ENABLE_IL2CPP)
        return new MqttClient(brokerAddress, brokerPort, isEncrypted, protocol);
#else
        // The certificate is only needed for encrypted connections; skipping it
        // otherwise avoids failing on a missing file.
        // NOTE(review): on platforms where StreamingAssets is not a plain file
        // path, loading by path may not work - confirm for the target platforms.
        X509Certificate cert = isEncrypted ? new X509Certificate(caCertificatePath) : null;
        Debug.LogWarning("ip地址或url:"+ brokerAddress);
        return new MqttClient(brokerAddress, brokerPort, isEncrypted, cert, cert, protocol);
#endif
    }

    /// <summary>
    /// User certificate selection callback. Placeholder: not wired to the client
    /// and not implemented.
    /// </summary>
    private bool userCertificateSelectionCallback(object sender, X509Certificate certificate, X509Chain chain, SslPolicyErrors sslPolicyErrors)
    {
        throw new NotImplementedException();
    }

    /// <summary>
    /// Remote certificate validation callback. Placeholder: not wired to the
    /// client and not implemented.
    /// </summary>
    private bool MyRemoteCertificateValidationCallback(object sender, X509Certificate certificate, X509Chain chain, SslPolicyErrors sslPolicyErrors)
    {
        throw new NotImplementedException();
    }

    /// <summary>Coroutine: waits for the end of the frame, then closes the connection.</summary>
    private IEnumerator DoDisconnect()
    {
        yield return new WaitForEndOfFrame();
        CloseConnection();
        OnDisconnected();
    }

    /// <summary>
    /// Unsubscribes, disconnects and releases the client. Safe to call when no
    /// client exists or nothing was subscribed.
    /// </summary>
    private void CloseConnection()
    {
        // Cleared first so the ConnectionClosed callback treats this as a
        // controlled disconnect.
        mqttClientConnected = false;

        MqttClient mqtt = client;
        if (mqtt == null)
        {
            return;
        }

        try
        {
            if (mqtt.IsConnected)
            {
                string[] subscribed = SubTopics;
                if (subscribed != null && subscribed.Length > 0)
                {
                    mqtt.Unsubscribe(subscribed);
                }
                mqtt.Disconnect();
            }
        }
        finally
        {
            // Always detach handlers and drop the client, even if the
            // disconnect itself throws.
            SubTopics = null;
            mqtt.MqttMsgPublishReceived -= OnMqttMessageReceived;
            mqtt.ConnectionClosed -= OnMqttConnectionClosed;
            client = null;
        }
    }

#if ((!UNITY_EDITOR && UNITY_WSA_10_0))
    private void OnApplicationFocus(bool focus)
    {
        // On UWP 10 (HoloLens) we cannot tell whether the application was really
        // closed or only minimised
        // (https://forum.unity.com/threads/onapplicationquit-and-ondestroy-are-not-called-on-uwp-10.462597/)
        if (focus)
        {
            Connect();
        }
        else
        {
            CloseConnection();
        }
    }
#endif

    #endregion
}