123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197 |
- using System;
- using System.IO;
- using System.Security.Cryptography.X509Certificates;
- using System.Threading;
- using UnityEngine;
- using uPLibrary.Networking.M2Mqtt;
- using uPLibrary.Networking.M2Mqtt.Messages;
- public class QTTManager
- {
- bool isConnect = false;
- bool isConnecting = false;
- /// <summary>
- /// 连接失败时激发的事件
- /// </summary>
- public event Action ConnectionFailed;
- /// <summary>
- /// 成功建立连接时激发的事件
- /// </summary>
- public event Action ConnectionSucceeded;
- MqttClient client;
- string brokerAddress;
- int brokerPort;
- bool isEncrypted;
- int timeoutOnConnection;
- string clientId;
- string mqttUserName = null; // MQTT代理的用户名。如果不需要用户名,请保留为空
- string mqttPassword = null; // MQTT代理的密码。如果不需要密码,请保持空
- // Start is called before the first frame update
- public QTTManager(string cid, string UName, string pwd, string url, string port)
- {
- brokerAddress = url;
- brokerPort = int.Parse(port);
- isEncrypted = false;
- timeoutOnConnection = MqttSettings.MQTT_CONNECT_TIMEOUT; // 连接超时(毫秒)
- this.clientId = cid;
- this.mqttUserName = UName;
- this.mqttPassword = pwd;
- }
- byte[] bytes;
- public Thread th;
- public void Connect(byte[] tbytes)
- {
- Debug.Log("HJJ Connect==>");
- if (isConnecting || isConnect)
- {
- return;
- }
- bytes = tbytes;//
- isConnecting = true;
- DoConnect();
- }
- public bool IsConnect()
- {
- if (isConnecting || isConnect)
- {
- return true;
- }
- return false;
- }
- // current message identifier generated
- private ushort messageIdCounter = 0;
- /// <summary>
- /// 订阅消息主题
- /// </summary>
- /// <param name="topics">要监听的主题数组</param>
- /// <param name="qosLevels">主题相关的 QOS levels</param>
- /// <returns>监听消息相关的消息ID</returns>
- public ushort Subscribe(string[] topics, byte[] qosLevels)
- {
- return client.Subscribe(topics, qosLevels);
- }
- /// <summary>
- /// 退订主题
- /// </summary>
- /// <param name="topics"></param>
- /// <returns></returns>
- public ushort Unsubscribe(string[] topics)
- {
- return client.Unsubscribe(topics);
- }
- public Action OnConnecting;
- /// <summary>
- /// 使用当前设置连接到代理。
- /// </summary>
- /// <returns>执行是在协程中完成的</returns>
- private void DoConnect()
- {
- Debug.Log("HJJ DoConnect==>");
- // 等待给定的延迟
- // yield return new WaitForSecondsRealtime(connectionDelay / 1000f);
- // 给Unity留点时间刷新UI
- // yield return new WaitForEndOfFrame();
- // create client instance
- X509Certificate cert = new X509Certificate(bytes);
- Debug.Log("IP地址或URL==>" + brokerAddress);
- isEncrypted = false;
- client = new MqttClient(brokerAddress, brokerPort, isEncrypted, cert, cert, isEncrypted ? MqttSslProtocols.SSLv3 : MqttSslProtocols.None);
- OnConnecting?.Invoke();
- // 给Unity留点时间刷新UI
- // yield return new WaitForEndOfFrame();
- // yield return new WaitForEndOfFrame();
- client.Settings.TimeoutOnConnection = timeoutOnConnection;
- client.Connect(clientId, mqttUserName, mqttPassword);
- if (client.IsConnected)
- {
- Debug.Log("HJJ IsConnected==>");
- isConnecting = false;
- isConnect = true;
- client.ConnectionClosed += OnMqttConnectionClosed;
- // register to message received
- client.MqttMsgPublishReceived += OnMqttMessageReceived;
- OnConnected();
- }
- else
- {
- Debug.Log("HJJ FAILED==>");
- isConnecting = false;
- isConnect = false;
- OnConnectionFailed("CONNECTION FAILED!");
- }
- }
- public void DisConnect()
- {
- client?.Disconnect();
- }
- /// <summary>
- /// 如果连接失败,请重写此方法以执行某些操作
- /// </summary>
- protected virtual void OnConnectionFailed(string errorMessage)
- {
- Debug.LogError("MQTT连接失败");
- if (ConnectionFailed != null)
- {
- ConnectionFailed();
- }
- }
- /// <summary>
- /// 如果连接成功,请重写此方法以执行某些操作。
- /// </summary>
- protected virtual void OnConnected()
- {
- Debug.LogFormat("已经连接到 {0}:{1}...\n", brokerAddress, brokerPort.ToString());
- if (ConnectionSucceeded != null)
- {
- ConnectionSucceeded();
- }
- }
- public Action<MqttMsgPublishEventArgs> OnReceived;
- private void OnMqttMessageReceived(object sender, MqttMsgPublishEventArgs e)
- {
- OnReceived?.Invoke(e);
- }
- public Action<EventArgs> OnClose;
- private void OnMqttConnectionClosed(object sender, EventArgs e)
- {
- isConnect = false;
- OnClose?.Invoke(e);
- }
- /// <summary>
- /// 异步发布消息
- /// </summary>
- /// <param name="topic">Message topic</param>
- /// <param name="message">Message data (payload)</param>
- /// <param name="qosLevel">QoS Level</param>
- /// <param name="retain">Retain flag</param>
- /// <returns>Message Id related to PUBLISH message</returns>
- public ushort Publish(string topic, byte[] message, byte qosLevel, bool retain)
- {
- return client.Publish(topic, message, qosLevel, retain);
- }
- public class MQttReceived
- {
- public string topic;
- public byte[] message;
- }
- }
|