MqttClient.cs 121 KB

  1. /*
  2. Copyright (c) 2013, 2014 Paolo Patierno
  3. All rights reserved. This program and the accompanying materials
  4. are made available under the terms of the Eclipse Public License v1.0
  5. and Eclipse Distribution License v1.0 which accompany this distribution.
  6. The Eclipse Public License is available at
  7. http://www.eclipse.org/legal/epl-v10.html
  8. and the Eclipse Distribution License is available at
  9. http://www.eclipse.org/org/documents/edl-v10.php.
  10. Contributors:
  11. Paolo Patierno - initial API and implementation and/or initial documentation
  12. ----------------------------------------------------------------------------
  13. Giovanni Paolo Vigano' - preprocessor directives for platform dependent compilation in Unity
  14. */
  15. using System;
  16. using System.Net;
  17. #if !(WINDOWS_APP || WINDOWS_PHONE_APP || (!UNITY_EDITOR&&UNITY_WSA_10_0&&!ENABLE_IL2CPP))
  18. using System.Net.Sockets;
  19. using System.Security.Cryptography.X509Certificates;
  20. #endif
  21. using System.Threading;
  22. using uPLibrary.Networking.M2Mqtt.Exceptions;
  23. using uPLibrary.Networking.M2Mqtt.Messages;
  24. using uPLibrary.Networking.M2Mqtt.Session;
  25. using uPLibrary.Networking.M2Mqtt.Utility;
  26. using uPLibrary.Networking.M2Mqtt.Internal;
  27. // if .Net Micro Framework
  28. #if (MF_FRAMEWORK_VERSION_V4_2 || MF_FRAMEWORK_VERSION_V4_3)
  29. using Microsoft.SPOT;
  30. #if SSL
  31. using Microsoft.SPOT.Net.Security;
  32. #endif
  33. // else other frameworks (.Net, .Net Compact, Mono, Windows Phone)
  34. #else
  35. using System.Collections.Generic;
  36. #if (SSL && !(WINDOWS_APP || WINDOWS_PHONE_APP || (!UNITY_EDITOR&&UNITY_WSA_10_0&&!ENABLE_IL2CPP)))
  37. using System.Security.Authentication;
  38. using System.Net.Security;
  39. #endif
  40. #endif
  41. #if (WINDOWS_APP || WINDOWS_PHONE_APP || (!UNITY_EDITOR && UNITY_WSA_10_0 && !ENABLE_IL2CPP))
  42. using Windows.Networking.Sockets;
  43. #endif
  44. using System.Collections;
  45. // alias needed due to Microsoft.SPOT.Trace in .Net Micro Framework
  46. // (it's ambiguous with uPLibrary.Networking.M2Mqtt.Utility.Trace)
  47. using MqttUtility = uPLibrary.Networking.M2Mqtt.Utility;
  48. using System.IO;
  49. using System.Net.Security;
  50. using UnityEngine;
  51. namespace uPLibrary.Networking.M2Mqtt
  52. {
  53. /// <summary>
  54. /// MQTT Client
  55. /// </summary>
  56. public class MqttClient
  57. {
  #if BROKER
  #region Constants ...
  // Thread names (only compiled into BROKER builds).
  private const string
      RECEIVE_THREAD_NAME = "ReceiveThread",
      RECEIVE_EVENT_THREAD_NAME = "DispatchEventThread",
      PROCESS_INFLIGHT_THREAD_NAME = "ProcessInflightThread",
      KEEP_ALIVE_THREAD = "KeepAliveThread";
  #endregion
  #endif
  /// <summary>
  /// Delegate that defines the event handler for a received PUBLISH message
  /// </summary>
  public delegate void MqttMsgPublishEventHandler(object sender, MqttMsgPublishEventArgs e);

  /// <summary>
  /// Delegate that defines the event handler for a published message
  /// </summary>
  public delegate void MqttMsgPublishedEventHandler(object sender, MqttMsgPublishedEventArgs e);

  /// <summary>
  /// Delegate that defines the event handler for a subscribed topic
  /// </summary>
  public delegate void MqttMsgSubscribedEventHandler(object sender, MqttMsgSubscribedEventArgs e);

  /// <summary>
  /// Delegate that defines the event handler for an unsubscribed topic
  /// </summary>
  public delegate void MqttMsgUnsubscribedEventHandler(object sender, MqttMsgUnsubscribedEventArgs e);

  #if BROKER
  /// <summary>
  /// Delegate that defines the event handler for a received SUBSCRIBE message
  /// </summary>
  public delegate void MqttMsgSubscribeEventHandler(object sender, MqttMsgSubscribeEventArgs e);

  /// <summary>
  /// Delegate that defines the event handler for a received UNSUBSCRIBE message
  /// </summary>
  public delegate void MqttMsgUnsubscribeEventHandler(object sender, MqttMsgUnsubscribeEventArgs e);

  /// <summary>
  /// Delegate that defines the event handler for a received CONNECT message
  /// </summary>
  public delegate void MqttMsgConnectEventHandler(object sender, MqttMsgConnectEventArgs e);

  /// <summary>
  /// Delegate that defines the event handler for a client disconnection
  /// (whether or not a DISCONNECT message was received)
  /// </summary>
  public delegate void MqttMsgDisconnectEventHandler(object sender, EventArgs e);
  #endif

  /// <summary>
  /// Delegate that defines the event handler for a client/peer disconnection
  /// </summary>
  public delegate void ConnectionClosedEventHandler(object sender, EventArgs e);
  // Broker endpoint: host name (or IP address) and port.
  private string brokerHostName;
  private int brokerPort;

  // Running status of the threads.
  private bool isRunning;

  // Event used for raising the "message received" event.
  private AutoResetEvent receiveEventWaitHandle;

  // Event used for starting the asynchronous processing of the inflight queue.
  private AutoResetEvent inflightWaitHandle;

  // Event used for signaling a synchronous receive.
  private AutoResetEvent syncEndReceiving;

  // Message received.
  private MqttMsgBase msgReceived;

  // Exception thrown during receiving.
  private Exception exReceiving;

  // Keep alive period (in ms).
  private int keepAlivePeriod;

  // Events used for signaling on the keep alive thread.
  private AutoResetEvent keepAliveEvent, keepAliveEventEnd;

  // Last communication time in ticks.
  private int lastCommTime;
  /// <summary>Event for a received PUBLISH message</summary>
  public event MqttMsgPublishEventHandler MqttMsgPublishReceived;

  /// <summary>Event for a published message</summary>
  public event MqttMsgPublishedEventHandler MqttMsgPublished;

  /// <summary>Event for a subscribed topic</summary>
  public event MqttMsgSubscribedEventHandler MqttMsgSubscribed;

  /// <summary>Event for an unsubscribed topic</summary>
  public event MqttMsgUnsubscribedEventHandler MqttMsgUnsubscribed;

  #if BROKER
  /// <summary>Event for a received SUBSCRIBE message</summary>
  public event MqttMsgSubscribeEventHandler MqttMsgSubscribeReceived;

  /// <summary>Event for a received UNSUBSCRIBE message</summary>
  public event MqttMsgUnsubscribeEventHandler MqttMsgUnsubscribeReceived;

  /// <summary>Event for a received CONNECT message</summary>
  public event MqttMsgConnectEventHandler MqttMsgConnected;

  /// <summary>Event for a received DISCONNECT message</summary>
  public event MqttMsgDisconnectEventHandler MqttMsgDisconnected;
  #endif

  /// <summary>Event for a peer/client disconnection</summary>
  public event ConnectionClosedEventHandler ConnectionClosed;
  // Channel used to communicate over the network.
  private IMqttNetworkChannel channel;

  // Queues: inflight messages, received messages about inflight messages
  // (internal), and events waiting to be dispatched.
  private Queue inflightQueue;
  private Queue internalQueue;
  private Queue eventQueue;

  // Session.
  private MqttClientSession session;

  // Cached reference, to avoid accessing the singleton via its property.
  private MqttSettings settings;

  // Current message identifier generated.
  private ushort messageIdCounter = 0;

  // True while the connection is closing due to the peer.
  private bool isConnectionClosing;
  /// <summary>
  /// Connection status between client and broker
  /// </summary>
  public bool IsConnected { get; private set; }

  /// <summary>
  /// Client identifier
  /// </summary>
  public string ClientId { get; private set; }

  /// <summary>
  /// Clean session flag
  /// </summary>
  public bool CleanSession { get; private set; }

  /// <summary>
  /// Will flag
  /// </summary>
  public bool WillFlag { get; private set; }

  /// <summary>
  /// Will QoS level
  /// </summary>
  public byte WillQosLevel { get; private set; }

  /// <summary>
  /// Will topic
  /// </summary>
  public string WillTopic { get; private set; }

  /// <summary>
  /// Will message
  /// </summary>
  public string WillMessage { get; private set; }

  /// <summary>
  /// MQTT protocol version
  /// </summary>
  public MqttProtocolVersion ProtocolVersion { get; set; }

  #if BROKER
  /// <summary>
  /// MQTT client session (readable and writable in BROKER builds only)
  /// </summary>
  public MqttClientSession Session
  {
      get
      {
          return this.session;
      }
      set
      {
          this.session = value;
      }
  }
  #endif

  /// <summary>
  /// MQTT client settings (read-only)
  /// </summary>
  public MqttSettings Settings
  {
      get
      {
          return this.settings;
      }
  }
  #if !(WINDOWS_APP || WINDOWS_PHONE_APP || (!UNITY_EDITOR&&UNITY_WSA_10_0&&!ENABLE_IL2CPP))
  /// <summary>
  /// Constructor (obsolete): connects to the broker at the given IP address using
  /// the default MQTT port, without a secure connection.
  /// </summary>
  /// <param name="brokerIpAddress">Broker IP address</param>
  /// <exception cref="ArgumentNullException"><paramref name="brokerIpAddress"/> is null</exception>
  [Obsolete("Use this ctor MqttClient(string brokerHostName) instead")]
  public MqttClient(IPAddress brokerIpAddress) :
      this(brokerIpAddress, MqttSettings.MQTT_BROKER_DEFAULT_PORT, false, null, null, MqttSslProtocols.None)
  {
  }

  /// <summary>
  /// Constructor (obsolete): the IP address is converted to its string form and
  /// handed to the common initialization.
  /// </summary>
  /// <param name="brokerIpAddress">Broker IP address</param>
  /// <param name="brokerPort">Broker port</param>
  /// <param name="secure">Using secure connection</param>
  /// <param name="caCert">CA certificate for secure connection</param>
  /// <param name="clientCert">Client certificate</param>
  /// <param name="sslProtocol">SSL/TLS protocol version</param>
  /// <exception cref="ArgumentNullException"><paramref name="brokerIpAddress"/> is null</exception>
  [Obsolete("Use this ctor MqttClient(string brokerHostName, int brokerPort, bool secure, X509Certificate caCert, X509Certificate clientCert, MqttSslProtocols sslProtocol) instead")]
  public MqttClient(IPAddress brokerIpAddress, int brokerPort, bool secure, X509Certificate caCert, X509Certificate clientCert, MqttSslProtocols sslProtocol)
  {
      // fail with a meaningful exception instead of a NullReferenceException from ToString()
      if (brokerIpAddress == null)
          throw new ArgumentNullException("brokerIpAddress");

  #if !(MF_FRAMEWORK_VERSION_V4_2 || MF_FRAMEWORK_VERSION_V4_3 || COMPACT_FRAMEWORK)
      // no user certificate validation/selection callbacks on this overload
      this.Init(brokerIpAddress.ToString(), brokerPort, secure, caCert, clientCert, sslProtocol, null, null);
  #else
      this.Init(brokerIpAddress.ToString(), brokerPort, secure, caCert, clientCert, sslProtocol);
  #endif
  }
  #endif
  /// <summary>
  /// Constructor: uses the default MQTT broker port and a non-secure connection.
  /// </summary>
  /// <param name="brokerHostName">Broker Host Name or IP Address</param>
  public MqttClient(string brokerHostName) :
  #if (WINDOWS_APP || WINDOWS_PHONE_APP || (!UNITY_EDITOR && UNITY_WSA_10_0 && !ENABLE_IL2CPP))
      this(brokerHostName, MqttSettings.MQTT_BROKER_DEFAULT_PORT, false, MqttSslProtocols.None)
  #else
      // no CA certificate and no client certificate
      this(brokerHostName, MqttSettings.MQTT_BROKER_DEFAULT_PORT, false, null, null, MqttSslProtocols.None)
  #endif
  {
  }
  /// <summary>
  /// Constructor
  /// </summary>
  /// <param name="brokerHostName">Broker Host Name or IP Address</param>
  /// <param name="brokerPort">Broker port</param>
  /// <param name="secure">Using secure connection</param>
  /// <param name="sslProtocol">SSL/TLS protocol version</param>
  #if (WINDOWS_APP || WINDOWS_PHONE_APP || (!UNITY_EDITOR && UNITY_WSA_10_0 && !ENABLE_IL2CPP))
  public MqttClient(string brokerHostName, int brokerPort, bool secure, MqttSslProtocols sslProtocol)
  #else
  /// <param name="caCert">CA certificate for secure connection</param>
  /// <param name="clientCert">Client certificate</param>
  public MqttClient(string brokerHostName, int brokerPort, bool secure, X509Certificate caCert, X509Certificate clientCert, MqttSslProtocols sslProtocol)
  #endif
  {
  #if (WINDOWS_APP || WINDOWS_PHONE_APP || (!UNITY_EDITOR && UNITY_WSA_10_0 && !ENABLE_IL2CPP))
      // these platforms take no certificate arguments
      this.Init(brokerHostName, brokerPort, secure, sslProtocol);
  #elif (MF_FRAMEWORK_VERSION_V4_2 || MF_FRAMEWORK_VERSION_V4_3 || COMPACT_FRAMEWORK)
      // Micro/Compact Framework: certificates, but no user callbacks
      this.Init(brokerHostName, brokerPort, secure, caCert, clientCert, sslProtocol);
  #else
      // other frameworks: certificates plus (unset) validation/selection callbacks
      this.Init(brokerHostName, brokerPort, secure, caCert, clientCert, sslProtocol, null, null);
  #endif
  }
  #if !(MF_FRAMEWORK_VERSION_V4_2 || MF_FRAMEWORK_VERSION_V4_3 || COMPACT_FRAMEWORK || WINDOWS_APP || WINDOWS_PHONE_APP || (!UNITY_EDITOR&&UNITY_WSA_10_0&&!ENABLE_IL2CPP))
  /// <summary>
  /// Constructor taking certificates and a remote certificate validation callback
  /// (no local certificate selection callback).
  /// </summary>
  /// <param name="brokerHostName">Broker Host Name or IP Address</param>
  /// <param name="brokerPort">Broker port</param>
  /// <param name="secure">Using secure connection</param>
  /// <param name="caCert">CA certificate for secure connection</param>
  /// <param name="clientCert">Client certificate</param>
  /// <param name="sslProtocol">SSL/TLS protocol version</param>
  /// <param name="userCertificateValidationCallback">A RemoteCertificateValidationCallback delegate responsible for validating the certificate supplied by the remote party</param>
  public MqttClient(
      string brokerHostName,
      int brokerPort,
      bool secure,
      X509Certificate caCert,
      X509Certificate clientCert,
      MqttSslProtocols sslProtocol,
      RemoteCertificateValidationCallback userCertificateValidationCallback)
      : this(brokerHostName, brokerPort, secure, caCert, clientCert, sslProtocol, userCertificateValidationCallback, null)
  {
  }

  /// <summary>
  /// Constructor taking both user callbacks but no certificates.
  /// </summary>
  /// <param name="brokerHostName">Broker Host Name or IP Address</param>
  /// <param name="brokerPort">Broker port</param>
  /// <param name="secure">Using secure connection</param>
  /// <param name="sslProtocol">SSL/TLS protocol version</param>
  /// <param name="userCertificateValidationCallback">A RemoteCertificateValidationCallback delegate responsible for validating the certificate supplied by the remote party</param>
  /// <param name="userCertificateSelectionCallback">A LocalCertificateSelectionCallback delegate responsible for selecting the certificate used for authentication</param>
  public MqttClient(
      string brokerHostName,
      int brokerPort,
      bool secure,
      MqttSslProtocols sslProtocol,
      RemoteCertificateValidationCallback userCertificateValidationCallback,
      LocalCertificateSelectionCallback userCertificateSelectionCallback)
      : this(brokerHostName, brokerPort, secure, null, null, sslProtocol, userCertificateValidationCallback, userCertificateSelectionCallback)
  {
  }

  /// <summary>
  /// Constructor taking certificates and both user callbacks; forwards every
  /// argument unchanged to the common initialization.
  /// </summary>
  /// <param name="brokerHostName">Broker Host Name or IP Address</param>
  /// <param name="brokerPort">Broker port</param>
  /// <param name="secure">Using secure connection</param>
  /// <param name="caCert">CA certificate for secure connection</param>
  /// <param name="clientCert">Client certificate</param>
  /// <param name="sslProtocol">SSL/TLS protocol version</param>
  /// <param name="userCertificateValidationCallback">A RemoteCertificateValidationCallback delegate responsible for validating the certificate supplied by the remote party</param>
  /// <param name="userCertificateSelectionCallback">A LocalCertificateSelectionCallback delegate responsible for selecting the certificate used for authentication</param>
  public MqttClient(
      string brokerHostName,
      int brokerPort,
      bool secure,
      X509Certificate caCert,
      X509Certificate clientCert,
      MqttSslProtocols sslProtocol,
      RemoteCertificateValidationCallback userCertificateValidationCallback,
      LocalCertificateSelectionCallback userCertificateSelectionCallback)
  {
      this.Init(
          brokerHostName,
          brokerPort,
          secure,
          caCert,
          clientCert,
          sslProtocol,
          userCertificateValidationCallback,
          userCertificateSelectionCallback);
  }
  #endif
  #if BROKER
  /// <summary>
  /// Constructor (BROKER builds only): wraps an already existing network channel.
  /// </summary>
  /// <param name="channel">Network channel for communication</param>
  public MqttClient(IMqttNetworkChannel channel)
  {
      // default MQTT protocol version is 3.1.1
      this.ProtocolVersion = MqttProtocolVersion.Version_3_1_1;

      this.channel = channel;

      // keep a reference to the MQTT settings
      this.settings = MqttSettings.Instance;

      // client not connected yet (CONNACK not sent), so start from default values
      this.IsConnected = false;
      this.ClientId = null;
      this.CleanSession = true;

      this.keepAliveEvent = new AutoResetEvent(false);

      // inflight messages (publishing and acknowledge): wake-up event and queue
      this.inflightWaitHandle = new AutoResetEvent(false);
      this.inflightQueue = new Queue();

      // received messages: wake-up event and queues
      this.receiveEventWaitHandle = new AutoResetEvent(false);
      this.eventQueue = new Queue();
      this.internalQueue = new Queue();

      // no session yet
      this.session = null;
  }
  #endif
  /// <summary>
  /// MqttClient initialization: stores the broker endpoint, records the port in
  /// the MQTT settings, creates the wait handles and queues, and creates the
  /// network channel.
  /// </summary>
  /// <param name="brokerHostName">Broker Host Name or IP Address</param>
  /// <param name="brokerPort">Broker port</param>
  /// <param name="secure">Using secure connection</param>
  /// <exception cref="ArgumentException">
  /// <paramref name="secure"/> is true but the library was compiled without SSL support
  /// </exception>
  #if (WINDOWS_APP || WINDOWS_PHONE_APP || (!UNITY_EDITOR && UNITY_WSA_10_0 && !ENABLE_IL2CPP))
  /// <param name="sslProtocol">SSL/TLS protocol version</param>
  private void Init(string brokerHostName, int brokerPort, bool secure, MqttSslProtocols sslProtocol)
  #elif (MF_FRAMEWORK_VERSION_V4_2 || MF_FRAMEWORK_VERSION_V4_3 || COMPACT_FRAMEWORK)
  /// <param name="caCert">CA certificate for secure connection</param>
  /// <param name="clientCert">Client certificate</param>
  /// <param name="sslProtocol">SSL/TLS protocol version</param>
  private void Init(string brokerHostName, int brokerPort, bool secure, X509Certificate caCert, X509Certificate clientCert, MqttSslProtocols sslProtocol)
  #else
  /// <param name="caCert">CA certificate for secure connection</param>
  /// <param name="clientCert">Client certificate</param>
  /// <param name="sslProtocol">SSL/TLS protocol version</param>
  /// <param name="userCertificateValidationCallback">A RemoteCertificateValidationCallback delegate responsible for validating the certificate supplied by the remote party</param>
  /// <param name="userCertificateSelectionCallback">A LocalCertificateSelectionCallback delegate responsible for selecting the certificate used for authentication</param>
  private void Init(
      string brokerHostName,
      int brokerPort,
      bool secure,
      X509Certificate caCert,
      X509Certificate clientCert,
      MqttSslProtocols sslProtocol,
      RemoteCertificateValidationCallback userCertificateValidationCallback,
      LocalCertificateSelectionCallback userCertificateSelectionCallback)
  #endif
  {
      // default MQTT protocol version is 3.1.1
      this.ProtocolVersion = MqttProtocolVersion.Version_3_1_1;

  #if !SSL
      // a secure connection cannot be provided by a build without SSL support
      if (secure)
      {
          throw new ArgumentException("Library compiled without SSL support");
      }
  #endif

      this.brokerHostName = brokerHostName;
      this.brokerPort = brokerPort;

      // keep a reference to the MQTT settings
      this.settings = MqttSettings.Instance;

      // record the port in the setting matching the connection kind
      if (secure)
      {
          this.settings.SslPort = this.brokerPort;
      }
      else
      {
          this.settings.Port = this.brokerPort;
      }

      this.syncEndReceiving = new AutoResetEvent(false);
      this.keepAliveEvent = new AutoResetEvent(false);

      // inflight messages (publishing and acknowledge): wake-up event and queue
      this.inflightWaitHandle = new AutoResetEvent(false);
      this.inflightQueue = new Queue();

      // received messages: wake-up event and queues
      this.receiveEventWaitHandle = new AutoResetEvent(false);
      this.eventQueue = new Queue();
      this.internalQueue = new Queue();

      // no session yet
      this.session = null;

      // create the network channel with the arguments available on this platform
  #if (WINDOWS_APP || WINDOWS_PHONE_APP || (!UNITY_EDITOR && UNITY_WSA_10_0 && !ENABLE_IL2CPP))
      this.channel = new MqttNetworkChannel(this.brokerHostName, this.brokerPort, secure, sslProtocol);
  #elif (MF_FRAMEWORK_VERSION_V4_2 || MF_FRAMEWORK_VERSION_V4_3 || COMPACT_FRAMEWORK)
      this.channel = new MqttNetworkChannel(this.brokerHostName, this.brokerPort, secure, caCert, clientCert, sslProtocol);
  #else
      this.channel = new MqttNetworkChannel(
          this.brokerHostName,
          this.brokerPort,
          secure,
          caCert,
          clientCert,
          sslProtocol,
          userCertificateValidationCallback,
          userCertificateSelectionCallback);
  #endif
  }
  /// <summary>
  /// Connect to broker without credentials and without a will message, using a
  /// clean session and the default keep alive period.
  /// </summary>
  /// <param name="clientId">Client identifier</param>
  /// <returns>Return code of CONNACK message from broker</returns>
  public byte Connect(string clientId)
  {
      return this.Connect(
          clientId,
          null,                                       // username
          null,                                       // password
          false,                                      // willRetain
          MqttMsgConnect.QOS_LEVEL_AT_MOST_ONCE,      // willQosLevel
          false,                                      // willFlag
          null,                                       // willTopic
          null,                                       // willMessage
          true,                                       // cleanSession
          MqttMsgConnect.KEEP_ALIVE_PERIOD_DEFAULT);  // keepAlivePeriod
  }
  /// <summary>
  /// Connect to broker with credentials but without a will message, using a
  /// clean session and the default keep alive period.
  /// </summary>
  /// <param name="clientId">Client identifier</param>
  /// <param name="username">Username</param>
  /// <param name="password">Password</param>
  /// <returns>Return code of CONNACK message from broker</returns>
  public byte Connect(string clientId, string username, string password)
  {
      return this.Connect(
          clientId,
          username,
          password,
          false,                                      // willRetain
          MqttMsgConnect.QOS_LEVEL_AT_MOST_ONCE,      // willQosLevel
          false,                                      // willFlag
          null,                                       // willTopic
          null,                                       // willMessage
          true,                                       // cleanSession
          MqttMsgConnect.KEEP_ALIVE_PERIOD_DEFAULT);  // keepAlivePeriod
  }
  /// <summary>
  /// Connect to broker with credentials and caller-chosen clean session flag and
  /// keep alive period, without a will message.
  /// </summary>
  /// <param name="clientId">Client identifier</param>
  /// <param name="username">Username</param>
  /// <param name="password">Password</param>
  /// <param name="cleanSession">Clean session flag</param>
  /// <param name="keepAlivePeriod">Keep alive period</param>
  /// <returns>Return code of CONNACK message from broker</returns>
  public byte Connect(string clientId, string username, string password, bool cleanSession, ushort keepAlivePeriod)
  {
      return this.Connect(
          clientId,
          username,
          password,
          false,                                  // willRetain
          MqttMsgConnect.QOS_LEVEL_AT_MOST_ONCE,  // willQosLevel
          false,                                  // willFlag
          null,                                   // willTopic
          null,                                   // willMessage
          cleanSession,
          keepAlivePeriod);
  }
  /// <summary>
  /// Connect to broker
  /// </summary>
  /// <remarks>
  /// Opens the network channel, starts the receiving thread, sends CONNECT and
  /// waits for CONNACK. Only when the broker accepts the connection are the
  /// client properties stored and the keep alive, event dispatching and
  /// inflight processing threads started.
  /// </remarks>
  /// <param name="clientId">Client identifier</param>
  /// <param name="username">Username</param>
  /// <param name="password">Password</param>
  /// <param name="willRetain">Will retain flag</param>
  /// <param name="willQosLevel">Will QOS level</param>
  /// <param name="willFlag">Will flag</param>
  /// <param name="willTopic">Will topic</param>
  /// <param name="willMessage">Will message</param>
  /// <param name="cleanSession">Clean session flag</param>
  /// <param name="keepAlivePeriod">Keep alive period (seconds; converted to milliseconds internally)</param>
  /// <returns>Return code of CONNACK message from broker</returns>
  /// <exception cref="MqttConnectionException">The network channel could not be connected</exception>
  public byte Connect(string clientId,
      string username,
      string password,
      bool willRetain,
      byte willQosLevel,
      bool willFlag,
      string willTopic,
      string willMessage,
      bool cleanSession,
      ushort keepAlivePeriod)
  {
      // create CONNECT message
      MqttMsgConnect connect = new MqttMsgConnect(clientId,
          username,
          password,
          willRetain,
          willQosLevel,
          willFlag,
          willTopic,
          willMessage,
          cleanSession,
          keepAlivePeriod,
          (byte)this.ProtocolVersion);

      try
      {
          // connect to the broker
          this.channel.Connect();
      }
      catch (Exception ex)
      {
          throw new MqttConnectionException("Exception connecting to the broker", ex);
      }

      this.lastCommTime = 0;
      this.isRunning = true;
      this.isConnectionClosing = false;
      // start thread for receiving messages from broker
      Fx.StartThread(this.ReceiveThread);

      MqttMsgConnack connack;
      try
      {
          connack = (MqttMsgConnack)this.SendReceive(connect);
      }
      catch (Exception)
      {
          // no usable CONNACK (send failure, timeout or receive error): clear the
          // running flag so the receive loop started above does not keep running
          // for a connection that was never established, then let the caller see
          // the original exception
          this.isRunning = false;
          throw;
      }

      // if connection accepted, store the client properties and start the worker threads
      if (connack.ReturnCode == MqttMsgConnack.CONN_ACCEPTED)
      {
          // set all client properties
          this.ClientId = clientId;
          this.CleanSession = cleanSession;
          this.WillFlag = willFlag;
          this.WillTopic = willTopic;
          this.WillMessage = willMessage;
          this.WillQosLevel = willQosLevel;

          this.keepAlivePeriod = keepAlivePeriod * 1000; // convert in ms

          // restore previous session
          this.RestoreSession();

          // keep alive period equals zero means turning off keep alive mechanism
          if (this.keepAlivePeriod != 0)
          {
              // start thread for sending keep alive message to the broker
              Fx.StartThread(this.KeepAliveThread);
          }

          // start thread for raising received message event from broker
          Fx.StartThread(this.DispatchEventThread);

          // start thread for handling inflight messages queue to broker asynchronously (publish and acknowledge)
          Fx.StartThread(this.ProcessInflightThread);

          this.IsConnected = true;
      }
      return connack.ReturnCode;
  }
  /// <summary>
  /// Disconnect from broker: send DISCONNECT, then flag the connection as closing
  /// </summary>
  public void Disconnect()
  {
      // notify the broker that this client is leaving
      this.Send(new MqttMsgDisconnect());

      // close client
      this.OnConnectionClosing();
  }
  538. #if BROKER
  /// <summary>
  /// Open client communication (broker side): mark the client as running and
  /// start its receiving, event dispatching and inflight processing threads
  /// </summary>
  public void Open()
  {
      // must be set before the threads start: the receive loop runs while this flag is true
      this.isRunning = true;

      // thread receiving messages from the client
      Fx.StartThread(this.ReceiveThread);

      // thread raising received message events
      Fx.StartThread(this.DispatchEventThread);

      // thread handling the inflight messages queue asynchronously (publish and acknowledge)
      Fx.StartThread(this.ProcessInflightThread);
  }
  552. #endif
  /// <summary>
  /// Close client: clear the running flag, signal the worker wait handles,
  /// empty all queues and close the network channel
  /// </summary>
#if BROKER
  public void Close()
#else
  private void Close()
#endif
  {
      // stop receiving thread
      this.isRunning = false;

      // signal the received-event wait handle (if it exists)
      if (this.receiveEventWaitHandle != null)
      {
          this.receiveEventWaitHandle.Set();
      }

      // signal the inflight wait handle (if it exists)
      if (this.inflightWaitHandle != null)
      {
          this.inflightWaitHandle.Set();
      }

      // unlock keep alive thread
      this.keepAliveEvent.Set();
#if !BROKER
      // client side only: block until the keep alive end event is signaled
      // (skipped when that event was never created)
      if (this.keepAliveEventEnd != null)
      {
          this.keepAliveEventEnd.WaitOne();
      }
#endif

      // clear all queues
      this.inflightQueue.Clear();
      this.internalQueue.Clear();
      this.eventQueue.Clear();

      // close network channel
      this.channel.Close();

      this.IsConnected = false;
  }
  /// <summary>
  /// Execute ping to broker for keep alive
  /// </summary>
  /// <returns>
  /// PINGRESP message from broker, or null when the exchange failed (in which
  /// case the connection is flagged as closing)
  /// </returns>
  private MqttMsgPingResp Ping()
  {
      MqttMsgPingReq pingreq = new MqttMsgPingReq();
      MqttMsgPingResp pingresp = null;

      try
      {
          // broker must send PINGRESP within timeout equal to keep alive period
          pingresp = (MqttMsgPingResp)this.SendReceive(pingreq, this.keepAlivePeriod);
      }
      catch (Exception e)
      {
#if TRACE
          MqttUtility.Trace.WriteLine(TraceLevel.Error, "Exception occurred: {0}", e.ToString());
#endif
          // client must close connection
          this.OnConnectionClosing();
      }

      return pingresp;
  }
  609. #if BROKER
  /// <summary>
  /// Send CONNACK message to the client (connection accepted or not)
  /// </summary>
  /// <remarks>
  /// When the connection is accepted the client properties are copied from the
  /// CONNECT message and the keep alive checking thread is started; otherwise
  /// the client is closed.
  /// </remarks>
  /// <param name="connect">CONNECT message with all client information</param>
  /// <param name="returnCode">Return code for CONNACK message</param>
  /// <param name="clientId">If not null, client id assigned by broker</param>
  /// <param name="sessionPresent">Session present on the broker</param>
  public void Connack(MqttMsgConnect connect, byte returnCode, string clientId, bool sessionPresent)
  {
      this.lastCommTime = 0;

      // build the CONNACK message ...
      MqttMsgConnack connack = new MqttMsgConnack();
      connack.ReturnCode = returnCode;
      // [v3.1.1] session present flag
      if (this.ProtocolVersion == MqttProtocolVersion.Version_3_1_1)
      {
          connack.SessionPresent = sessionPresent;
      }

      // ... and send it to the client
      this.Send(connack);

      // connection refused, close TCP/IP channel
      if (connack.ReturnCode != MqttMsgConnack.CONN_ACCEPTED)
      {
          this.Close();
          return;
      }

      // [v3.1.1] if client id isn't null, the CONNECT message has a client id with zero bytes length
      // and broker assigned a unique identifier to the client
      this.ClientId = (clientId != null) ? clientId : connect.ClientId;
      this.CleanSession = connect.CleanSession;
      this.WillFlag = connect.WillFlag;
      this.WillTopic = connect.WillTopic;
      this.WillMessage = connect.WillMessage;
      this.WillQosLevel = connect.WillQosLevel;

      this.keepAlivePeriod = connect.KeepAlivePeriod * 1000; // convert in ms
      // broker has a tolerance of 1.5 specified keep alive period
      this.keepAlivePeriod += (this.keepAlivePeriod / 2);

      // connection accepted, start thread for checking keep alive period timeout
      Fx.StartThread(this.KeepAliveThread);

      this.isConnectionClosing = false;
      this.IsConnected = true;
  }
  /// <summary>
  /// Send SUBACK message to the client
  /// </summary>
  /// <param name="messageId">Message Id for the SUBSCRIBE message that is being acknowledged</param>
  /// <param name="grantedQosLevels">Granted QoS Levels</param>
  public void Suback(ushort messageId, byte[] grantedQosLevels)
  {
      MqttMsgSuback suback = new MqttMsgSuback
      {
          MessageId = messageId,
          GrantedQoSLevels = grantedQosLevels
      };

      this.Send(suback);
  }
  /// <summary>
  /// Send UNSUBACK message to the client
  /// </summary>
  /// <param name="messageId">Message Id for the UNSUBSCRIBE message that is being acknowledged</param>
  public void Unsuback(ushort messageId)
  {
      MqttMsgUnsuback unsuback = new MqttMsgUnsuback
      {
          MessageId = messageId
      };

      this.Send(unsuback);
  }
  675. #endif
  /// <summary>
  /// Subscribe for message topics
  /// </summary>
  /// <remarks>
  /// The SUBSCRIBE request is only queued here (inflight queue); it is sent
  /// asynchronously.
  /// </remarks>
  /// <param name="topics">List of topics to subscribe</param>
  /// <param name="qosLevels">QOS levels related to topics</param>
  /// <returns>Message Id related to SUBSCRIBE message</returns>
  /// <exception cref="MqttClientException">
  /// The inflight queue is full, so the request could not be queued
  /// </exception>
  public ushort Subscribe(string[] topics, byte[] qosLevels)
  {
      MqttMsgSubscribe subscribe = new MqttMsgSubscribe(topics, qosLevels);
      subscribe.MessageId = this.GetMessageId();

      // enqueue subscribe request into the inflight queue; report a full queue the
      // same way Publish does instead of returning an id for a request that was dropped
      if (!this.EnqueueInflight(subscribe, MqttMsgFlow.ToPublish))
      {
          throw new MqttClientException(MqttClientErrorCode.InflightQueueFull);
      }

      return subscribe.MessageId;
  }
  /// <summary>
  /// Unsubscribe for message topics
  /// </summary>
  /// <remarks>
  /// The UNSUBSCRIBE request is only queued here (inflight queue); it is sent
  /// asynchronously.
  /// </remarks>
  /// <param name="topics">List of topics to unsubscribe</param>
  /// <returns>Message Id in UNSUBACK message from broker</returns>
  /// <exception cref="MqttClientException">
  /// The inflight queue is full, so the request could not be queued
  /// </exception>
  public ushort Unsubscribe(string[] topics)
  {
      MqttMsgUnsubscribe unsubscribe = new MqttMsgUnsubscribe(topics);
      unsubscribe.MessageId = this.GetMessageId();

      // enqueue unsubscribe request into the inflight queue; report a full queue the
      // same way Publish does instead of returning an id for a request that was dropped
      if (!this.EnqueueInflight(unsubscribe, MqttMsgFlow.ToPublish))
      {
          throw new MqttClientException(MqttClientErrorCode.InflightQueueFull);
      }

      return unsubscribe.MessageId;
  }
  /// <summary>
  /// Publish a message asynchronously (QoS Level 0 and not retained)
  /// </summary>
  /// <param name="topic">Message topic</param>
  /// <param name="message">Message data (payload)</param>
  /// <returns>Message Id related to PUBLISH message</returns>
  public ushort Publish(string topic, byte[] message)
  {
      const bool retain = false;

      return this.Publish(topic, message, MqttMsgBase.QOS_LEVEL_AT_MOST_ONCE, retain);
  }
  /// <summary>
  /// Publish a message asynchronously
  /// </summary>
  /// <param name="topic">Message topic</param>
  /// <param name="message">Message data (payload)</param>
  /// <param name="qosLevel">QoS Level</param>
  /// <param name="retain">Retain flag</param>
  /// <returns>Message Id related to PUBLISH message</returns>
  /// <exception cref="MqttClientException">The message was not enqueued into the inflight queue</exception>
  public ushort Publish(string topic, byte[] message, byte qosLevel, bool retain)
  {
      // DUP flag is false: this is the first delivery attempt
      MqttMsgPublish publish = new MqttMsgPublish(topic, message, false, qosLevel, retain);
      publish.MessageId = this.GetMessageId();

      // enqueue message to publish into the inflight queue
      if (!this.EnqueueInflight(publish, MqttMsgFlow.ToPublish))
      {
          // inflight queue full, message not enqueued
          throw new MqttClientException(MqttClientErrorCode.InflightQueueFull);
      }

      // message enqueued
      return publish.MessageId;
  }
  /// <summary>
  /// Queue an internal event and signal the received-event wait handle
  /// </summary>
  /// <param name="internalEvent">Internal event</param>
  private void OnInternalEvent(InternalEvent internalEvent)
  {
      // the event queue is shared between threads, so enqueue under its lock
      lock (this.eventQueue)
      {
          this.eventQueue.Enqueue(internalEvent);
      }

      // signal that a new event is available
      this.receiveEventWaitHandle.Set();
  }
  /// <summary>
  /// Flag the connection as closing and signal the received-event wait handle;
  /// does nothing when closing was already flagged
  /// </summary>
  private void OnConnectionClosing()
  {
      // already closing: nothing more to signal
      if (this.isConnectionClosing)
      {
          return;
      }

      this.isConnectionClosing = true;
      this.receiveEventWaitHandle.Set();
  }
  /// <summary>
  /// Wrapper method for raising PUBLISH message received event
  /// </summary>
  /// <param name="publish">PUBLISH message received</param>
  private void OnMqttMsgPublishReceived(MqttMsgPublish publish)
  {
      // snapshot the delegate: a handler removed by another thread between the
      // null check and the invocation must not cause a NullReferenceException
      var handler = this.MqttMsgPublishReceived;
      if (handler != null)
      {
          handler(this,
              new MqttMsgPublishEventArgs(publish.Topic, publish.Message, publish.DupFlag, publish.QosLevel, publish.Retain));
      }
  }
  /// <summary>
  /// Wrapper method for raising published message event
  /// </summary>
  /// <param name="messageId">Message identifier for published message</param>
  /// <param name="isPublished">Publish flag</param>
  private void OnMqttMsgPublished(ushort messageId, bool isPublished)
  {
      // snapshot the delegate: a handler removed by another thread between the
      // null check and the invocation must not cause a NullReferenceException
      var handler = this.MqttMsgPublished;
      if (handler != null)
      {
          handler(this,
              new MqttMsgPublishedEventArgs(messageId, isPublished));
      }
  }
  /// <summary>
  /// Wrapper method for raising subscribed topic event
  /// </summary>
  /// <param name="suback">SUBACK message received</param>
  private void OnMqttMsgSubscribed(MqttMsgSuback suback)
  {
      // snapshot the delegate: a handler removed by another thread between the
      // null check and the invocation must not cause a NullReferenceException
      var handler = this.MqttMsgSubscribed;
      if (handler != null)
      {
          handler(this,
              new MqttMsgSubscribedEventArgs(suback.MessageId, suback.GrantedQoSLevels));
      }
  }
  /// <summary>
  /// Wrapper method for raising unsubscribed topic event
  /// </summary>
  /// <param name="messageId">Message identifier for unsubscribed topic</param>
  private void OnMqttMsgUnsubscribed(ushort messageId)
  {
      // snapshot the delegate: a handler removed by another thread between the
      // null check and the invocation must not cause a NullReferenceException
      var handler = this.MqttMsgUnsubscribed;
      if (handler != null)
      {
          handler(this,
              new MqttMsgUnsubscribedEventArgs(messageId));
      }
  }
  810. #if BROKER
  /// <summary>
  /// Wrapper method for raising SUBSCRIBE message event
  /// </summary>
  /// <param name="messageId">Message identifier for subscribe topics request</param>
  /// <param name="topics">Topics requested to subscribe</param>
  /// <param name="qosLevels">List of QOS Levels requested</param>
  private void OnMqttMsgSubscribeReceived(ushort messageId, string[] topics, byte[] qosLevels)
  {
      // snapshot the delegate: a handler removed by another thread between the
      // null check and the invocation must not cause a NullReferenceException
      var handler = this.MqttMsgSubscribeReceived;
      if (handler != null)
      {
          handler(this,
              new MqttMsgSubscribeEventArgs(messageId, topics, qosLevels));
      }
  }
  /// <summary>
  /// Wrapper method for raising UNSUBSCRIBE message event
  /// </summary>
  /// <param name="messageId">Message identifier for unsubscribe topics request</param>
  /// <param name="topics">Topics requested to unsubscribe</param>
  private void OnMqttMsgUnsubscribeReceived(ushort messageId, string[] topics)
  {
      // snapshot the delegate: a handler removed by another thread between the
      // null check and the invocation must not cause a NullReferenceException
      var handler = this.MqttMsgUnsubscribeReceived;
      if (handler != null)
      {
          handler(this,
              new MqttMsgUnsubscribeEventArgs(messageId, topics));
      }
  }
  /// <summary>
  /// Wrapper method for raising CONNECT message event
  /// </summary>
  /// <remarks>
  /// The protocol version announced in the CONNECT message is adopted only
  /// when at least one handler is attached to the event.
  /// </remarks>
  /// <param name="connect">CONNECT message received</param>
  private void OnMqttMsgConnected(MqttMsgConnect connect)
  {
      // snapshot the delegate: a handler removed by another thread between the
      // null check and the invocation must not cause a NullReferenceException
      var handler = this.MqttMsgConnected;
      if (handler != null)
      {
          this.ProtocolVersion = (MqttProtocolVersion)connect.ProtocolVersion;
          handler(this, new MqttMsgConnectEventArgs(connect));
      }
  }
  /// <summary>
  /// Wrapper method for raising DISCONNECT message event
  /// </summary>
  private void OnMqttMsgDisconnected()
  {
      // snapshot the delegate: a handler removed by another thread between the
      // null check and the invocation must not cause a NullReferenceException
      var handler = this.MqttMsgDisconnected;
      if (handler != null)
      {
          handler(this, EventArgs.Empty);
      }
  }
  859. #endif
  /// <summary>
  /// Wrapper method for peer/client disconnection
  /// </summary>
  private void OnConnectionClosed()
  {
      // snapshot the delegate: a handler removed by another thread between the
      // null check and the invocation must not cause a NullReferenceException
      var handler = this.ConnectionClosed;
      if (handler != null)
      {
          handler(this, EventArgs.Empty);
      }
  }
  /// <summary>
  /// Send raw message bytes over the network channel
  /// </summary>
  /// <param name="msgBytes">Message bytes</param>
  /// <exception cref="MqttCommunicationException">Wraps any exception raised while sending</exception>
  private void Send(byte[] msgBytes)
  {
      try
      {
          this.channel.Send(msgBytes);

#if !BROKER
          // client side only: remember when the last message was sent (ticks)
          this.lastCommTime = Environment.TickCount;
#endif
      }
      catch (Exception ex)
      {
#if TRACE
          MqttUtility.Trace.WriteLine(TraceLevel.Error, "Exception occurred: {0}", ex.ToString());
#endif
          throw new MqttCommunicationException(ex);
      }
  }
  /// <summary>
  /// Serialize a message for the current protocol version and send it
  /// </summary>
  /// <param name="msg">Message</param>
  private void Send(MqttMsgBase msg)
  {
#if TRACE
      MqttUtility.Trace.WriteLine(TraceLevel.Frame, "SEND {0}", msg);
#endif
      byte[] msgBytes = msg.GetBytes((byte)this.ProtocolVersion);
      this.Send(msgBytes);
  }
  /// <summary>
  /// Send a message to the broker and wait for the answer, using the default timeout
  /// </summary>
  /// <param name="msgBytes">Message bytes</param>
  /// <returns>MQTT message response</returns>
  private MqttMsgBase SendReceive(byte[] msgBytes)
  {
      int timeout = MqttSettings.MQTT_DEFAULT_TIMEOUT;
      return this.SendReceive(msgBytes, timeout);
  }
  /// <summary>
  /// Send a message to the broker and wait for the answer
  /// </summary>
  /// <remarks>
  /// The answer is handed over by the receiving thread through
  /// <c>msgReceived</c> / <c>exReceiving</c>, signaled via <c>syncEndReceiving</c>.
  /// </remarks>
  /// <param name="msgBytes">Message bytes</param>
  /// <param name="timeout">Timeout for receiving answer</param>
  /// <returns>MQTT message response</returns>
  /// <exception cref="MqttCommunicationException">
  /// Sending failed, or no answer was signaled within the timeout
  /// </exception>
  private MqttMsgBase SendReceive(byte[] msgBytes, int timeout)
  {
      // reset handle before sending
      this.syncEndReceiving.Reset();

      try
      {
          // send message
          this.channel.Send(msgBytes);

          // update last message sent ticks
          this.lastCommTime = Environment.TickCount;
      }
      catch (Exception e)
      {
#if !(MF_FRAMEWORK_VERSION_V4_2 || MF_FRAMEWORK_VERSION_V4_3 || COMPACT_FRAMEWORK || WINDOWS_APP || WINDOWS_PHONE_APP || (!UNITY_EDITOR&&UNITY_WSA_10_0&&!ENABLE_IL2CPP))
          // connection reset by broker (exact SocketException type only)
          if ((e.GetType() == typeof(SocketException)) &&
              (((SocketException)e).SocketErrorCode == SocketError.ConnectionReset))
          {
              this.IsConnected = false;
          }
#endif
#if TRACE
          MqttUtility.Trace.WriteLine(TraceLevel.Error, "Exception occurred: {0}", e.ToString());
#endif
          throw new MqttCommunicationException(e);
      }

      // wait for answer from broker
      bool answered;
#if (MF_FRAMEWORK_VERSION_V4_2 || MF_FRAMEWORK_VERSION_V4_3 || COMPACT_FRAMEWORK)
      answered = this.syncEndReceiving.WaitOne(timeout, false);
#else
      answered = this.syncEndReceiving.WaitOne(timeout);
#endif

      if (!answered)
      {
          // throw timeout exception
          throw new MqttCommunicationException();
      }

      // receiving thread caught an exception: surface it to the caller
      if (this.exReceiving != null)
      {
          throw this.exReceiving;
      }

      // message received without exception
      return this.msgReceived;
  }
  /// <summary>
  /// Send a message to the broker and wait for the answer, using the default timeout
  /// </summary>
  /// <param name="msg">Message</param>
  /// <returns>MQTT message response</returns>
  private MqttMsgBase SendReceive(MqttMsgBase msg)
  {
      int timeout = MqttSettings.MQTT_DEFAULT_TIMEOUT;
      return this.SendReceive(msg, timeout);
  }
  /// <summary>
  /// Serialize a message for the current protocol version, send it to the
  /// broker and wait for the answer
  /// </summary>
  /// <param name="msg">Message</param>
  /// <param name="timeout">Timeout for receiving answer</param>
  /// <returns>MQTT message response</returns>
  private MqttMsgBase SendReceive(MqttMsgBase msg, int timeout)
  {
#if TRACE
      MqttUtility.Trace.WriteLine(TraceLevel.Frame, "SEND {0}", msg);
#endif
      byte[] msgBytes = msg.GetBytes((byte)this.ProtocolVersion);
      return this.SendReceive(msgBytes, timeout);
  }
  /// <summary>
  /// Enqueue a message into the inflight queue
  /// </summary>
  /// <remarks>
  /// A QoS 2 PUBLISH whose message id is already queued with the "to acknowledge"
  /// flow is not enqueued again; its existing context is reset so that PUBREC is
  /// re-sent. The inflight wait handle is signaled in every case.
  /// </remarks>
  /// <param name="msg">Message to enqueue</param>
  /// <param name="flow">Message flow (publish, acknowledge)</param>
  /// <returns>Message enqueued or not</returns>
  private bool EnqueueInflight(MqttMsgBase msg, MqttMsgFlow flow)
  {
      // enqueue is needed (or not)
      bool enqueue = true;
      bool isPublish = (msg.Type == MqttMsgBase.MQTT_MSG_PUBLISH_TYPE);

      // if it is a PUBLISH message with QoS Level 2
      if (isPublish && (msg.QosLevel == MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE))
      {
          lock (this.inflightQueue)
          {
              // a PUBLISH already in the inflight queue means the publisher re-sent it because
              // it didn't receive the PUBREC; in that case only the PUBREC has to be re-sent.
              // NOTE : the lookup uses message id AND flow because broker and client track their
              // own message ids, so the same id can exist once per direction
              MqttMsgContextFinder finder = new MqttMsgContextFinder(msg.MessageId, MqttMsgFlow.ToAcknowledge);
              MqttMsgContext queued = (MqttMsgContext)this.inflightQueue.Get(finder.Find);

              if (queued != null)
              {
                  // no re-enqueue, just change state to re-send PUBREC
                  queued.State = MqttMsgState.QueuedQos2;
                  queued.Flow = MqttMsgFlow.ToAcknowledge;
                  enqueue = false;
              }
          }
      }

      if (enqueue)
      {
          // initial state: driven by message type for SUBSCRIBE / UNSUBSCRIBE
          // ([v3.1.1] they aren't "officially" QOS = 1, so QueuedQos1 isn't valid for them),
          // otherwise by QoS level, with QueuedQos0 as the default
          MqttMsgState state;
          if (msg.Type == MqttMsgBase.MQTT_MSG_SUBSCRIBE_TYPE)
          {
              state = MqttMsgState.SendSubscribe;
          }
          else if (msg.Type == MqttMsgBase.MQTT_MSG_UNSUBSCRIBE_TYPE)
          {
              state = MqttMsgState.SendUnsubscribe;
          }
          else if (msg.QosLevel == MqttMsgBase.QOS_LEVEL_AT_LEAST_ONCE)
          {
              state = MqttMsgState.QueuedQos1;
          }
          else if (msg.QosLevel == MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE)
          {
              state = MqttMsgState.QueuedQos2;
          }
          else
          {
              state = MqttMsgState.QueuedQos0;
          }

          // queue message context
          MqttMsgContext msgContext = new MqttMsgContext();
          msgContext.Message = msg;
          msgContext.State = state;
          msgContext.Flow = flow;
          msgContext.Attempt = 0;

          lock (this.inflightQueue)
          {
              // check number of messages inside inflight queue
              enqueue = (this.inflightQueue.Count < this.settings.InflightQueueSize);

              if (enqueue)
              {
                  // enqueue message (the send thread is unlocked below)
                  this.inflightQueue.Enqueue(msgContext);
#if TRACE
                  MqttUtility.Trace.WriteLine(TraceLevel.Queuing, "enqueued {0}", msg);
#endif
                  // PUBLISH messages that need acknowledgement are also tracked in the session
                  if (isPublish)
                  {
                      bool isQos1 = (msg.QosLevel == MqttMsgBase.QOS_LEVEL_AT_LEAST_ONCE);
                      bool isQos2 = (msg.QosLevel == MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE);

                      // to publish and QoS level 1 or 2
                      bool trackOutgoing = (msgContext.Flow == MqttMsgFlow.ToPublish) && (isQos1 || isQos2);
                      // to acknowledge and QoS level 2
                      bool trackIncoming = (msgContext.Flow == MqttMsgFlow.ToAcknowledge) && isQos2;

                      if ((trackOutgoing || trackIncoming) && (this.session != null))
                      {
                          this.session.InflightMessages.Add(msgContext.Key, msgContext);
                      }
                  }
              }
          }
      }

      this.inflightWaitHandle.Set();

      return enqueue;
  }
  /// <summary>
  /// Enqueue a message into the internal queue
  /// </summary>
  /// <remarks>
  /// PUBREL, PUBCOMP and PUBREC (QoS Level 2) are enqueued only when the related
  /// PUBLISH is still in the inflight queue. An orphan PUBREL is answered
  /// directly with PUBCOMP; orphan PUBCOMP and PUBREC messages are ignored.
  /// </remarks>
  /// <param name="msg">Message to enqueue</param>
  private void EnqueueInternal(MqttMsgBase msg)
  {
      // enqueue is needed (or not)
      bool enqueue = true;

      bool isPubrel = (msg.Type == MqttMsgBase.MQTT_MSG_PUBREL_TYPE);
      bool isQos2Ack = isPubrel ||
          (msg.Type == MqttMsgBase.MQTT_MSG_PUBCOMP_TYPE) ||
          (msg.Type == MqttMsgBase.MQTT_MSG_PUBREC_TYPE);

      if (isQos2Ack)
      {
          // PUBREL relates to a PUBLISH we received (to acknowledge); PUBCOMP and PUBREC relate
          // to a PUBLISH we sent (to publish).
          // NOTE : the lookup uses message id AND flow because broker and client track their
          // own message ids, so the same id can exist once per direction
          MqttMsgFlow relatedFlow = isPubrel ? MqttMsgFlow.ToAcknowledge : MqttMsgFlow.ToPublish;

          lock (this.inflightQueue)
          {
              MqttMsgContextFinder finder = new MqttMsgContextFinder(msg.MessageId, relatedFlow);
              MqttMsgContext related = (MqttMsgContext)this.inflightQueue.Get(finder.Find);

              // the related PUBLISH isn't in the inflight queue anymore
              if (related == null)
              {
                  if (isPubrel)
                  {
                      // PUBLISH was already processed and PUBCOMP already sent, but the publisher
                      // didn't receive it and re-sent PUBREL: only PUBCOMP has to be re-sent
                      MqttMsgPubcomp pubcomp = new MqttMsgPubcomp();
                      pubcomp.MessageId = msg.MessageId;
                      this.Send(pubcomp);
                  }

                  // PUBCOMP : exchange already completed, duplicate answer -> ignore
                  // PUBREC  : publish already failed after its retries, late answer -> ignore
                  enqueue = false;
              }
          }
      }

      if (enqueue)
      {
          lock (this.internalQueue)
          {
              this.internalQueue.Enqueue(msg);
#if TRACE
              MqttUtility.Trace.WriteLine(TraceLevel.Queuing, "enqueued {0}", msg);
#endif
              this.inflightWaitHandle.Set();
          }
      }
  }
  1171. /// <summary>
  1172. /// Thread for receiving messages
  1173. /// </summary>
  1174. private void ReceiveThread()
  1175. {
  1176. int readBytes = 0;
  1177. byte[] fixedHeaderFirstByte = new byte[1];
  1178. byte msgType;
  1179. while (this.isRunning)
  1180. {
  1181. try
  1182. {
  1183. // read first byte (fixed header)
  1184. readBytes = this.channel.Receive(fixedHeaderFirstByte);
  1185. if (readBytes > 0)
  1186. {
  1187. #if BROKER
  1188. // update last message received ticks
  1189. this.lastCommTime = Environment.TickCount;
  1190. #endif
  1191. // extract message type from received byte
  1192. msgType = (byte)((fixedHeaderFirstByte[0] & MqttMsgBase.MSG_TYPE_MASK) >> MqttMsgBase.MSG_TYPE_OFFSET);
  1193. switch (msgType)
  1194. {
  1195. // CONNECT message received
  1196. case MqttMsgBase.MQTT_MSG_CONNECT_TYPE:
  1197. #if BROKER
  1198. MqttMsgConnect connect = MqttMsgConnect.Parse(fixedHeaderFirstByte[0], (byte)this.ProtocolVersion, this.channel);
  1199. #if TRACE
  1200. Trace.WriteLine(TraceLevel.Frame, "RECV {0}", connect);
  1201. #endif
  1202. // raise message received event
  1203. this.OnInternalEvent(new MsgInternalEvent(connect));
  1204. break;
  1205. #else
  1206. throw new MqttClientException(MqttClientErrorCode.WrongBrokerMessage);
  1207. #endif
  1208. // CONNACK message received
  1209. case MqttMsgBase.MQTT_MSG_CONNACK_TYPE:
  1210. #if BROKER
  1211. throw new MqttClientException(MqttClientErrorCode.WrongBrokerMessage);
  1212. #else
  1213. this.msgReceived = MqttMsgConnack.Parse(fixedHeaderFirstByte[0], (byte)this.ProtocolVersion, this.channel);
  1214. #if TRACE
  1215. MqttUtility.Trace.WriteLine(TraceLevel.Frame, "RECV {0}", this.msgReceived);
  1216. #endif
  1217. this.syncEndReceiving.Set();
  1218. break;
  1219. #endif
  1220. // PINGREQ message received
  1221. case MqttMsgBase.MQTT_MSG_PINGREQ_TYPE:
  1222. #if BROKER
  1223. this.msgReceived = MqttMsgPingReq.Parse(fixedHeaderFirstByte[0], (byte)this.ProtocolVersion, this.channel);
  1224. #if TRACE
  1225. Trace.WriteLine(TraceLevel.Frame, "RECV {0}", this.msgReceived);
  1226. #endif
  1227. MqttMsgPingResp pingresp = new MqttMsgPingResp();
  1228. this.Send(pingresp);
  1229. break;
  1230. #else
  1231. throw new MqttClientException(MqttClientErrorCode.WrongBrokerMessage);
  1232. #endif
  1233. // PINGRESP message received
  1234. case MqttMsgBase.MQTT_MSG_PINGRESP_TYPE:
  1235. #if BROKER
  1236. throw new MqttClientException(MqttClientErrorCode.WrongBrokerMessage);
  1237. #else
  1238. this.msgReceived = MqttMsgPingResp.Parse(fixedHeaderFirstByte[0], (byte)this.ProtocolVersion, this.channel);
  1239. #if TRACE
  1240. MqttUtility.Trace.WriteLine(TraceLevel.Frame, "RECV {0}", this.msgReceived);
  1241. #endif
  1242. this.syncEndReceiving.Set();
  1243. break;
  1244. #endif
  1245. // SUBSCRIBE message received
  1246. case MqttMsgBase.MQTT_MSG_SUBSCRIBE_TYPE:
  1247. #if BROKER
  1248. MqttMsgSubscribe subscribe = MqttMsgSubscribe.Parse(fixedHeaderFirstByte[0], (byte)this.ProtocolVersion, this.channel);
  1249. #if TRACE
  1250. Trace.WriteLine(TraceLevel.Frame, "RECV {0}", subscribe);
  1251. #endif
  1252. // raise message received event
  1253. this.OnInternalEvent(new MsgInternalEvent(subscribe));
  1254. break;
  1255. #else
  1256. throw new MqttClientException(MqttClientErrorCode.WrongBrokerMessage);
  1257. #endif
  1258. // SUBACK message received
  1259. case MqttMsgBase.MQTT_MSG_SUBACK_TYPE:
  1260. #if BROKER
  1261. throw new MqttClientException(MqttClientErrorCode.WrongBrokerMessage);
  1262. #else
  1263. // enqueue SUBACK message received (for QoS Level 1) into the internal queue
  1264. MqttMsgSuback suback = MqttMsgSuback.Parse(fixedHeaderFirstByte[0], (byte)this.ProtocolVersion, this.channel);
  1265. #if TRACE
  1266. MqttUtility.Trace.WriteLine(TraceLevel.Frame, "RECV {0}", suback);
  1267. #endif
  1268. // enqueue SUBACK message into the internal queue
  1269. this.EnqueueInternal(suback);
  1270. break;
  1271. #endif
  1272. // PUBLISH message received
  1273. case MqttMsgBase.MQTT_MSG_PUBLISH_TYPE:
  1274. MqttMsgPublish publish = MqttMsgPublish.Parse(fixedHeaderFirstByte[0], (byte)this.ProtocolVersion, this.channel);
  1275. #if TRACE
  1276. MqttUtility.Trace.WriteLine(TraceLevel.Frame, "RECV {0}", publish);
  1277. #endif
  1278. // enqueue PUBLISH message to acknowledge into the inflight queue
  1279. this.EnqueueInflight(publish, MqttMsgFlow.ToAcknowledge);
  1280. break;
  1281. // PUBACK message received
  1282. case MqttMsgBase.MQTT_MSG_PUBACK_TYPE:
  1283. // enqueue PUBACK message received (for QoS Level 1) into the internal queue
  1284. MqttMsgPuback puback = MqttMsgPuback.Parse(fixedHeaderFirstByte[0], (byte)this.ProtocolVersion, this.channel);
  1285. #if TRACE
  1286. MqttUtility.Trace.WriteLine(TraceLevel.Frame, "RECV {0}", puback);
  1287. #endif
  1288. // enqueue PUBACK message into the internal queue
  1289. this.EnqueueInternal(puback);
  1290. break;
  1291. // PUBREC message received
  1292. case MqttMsgBase.MQTT_MSG_PUBREC_TYPE:
  1293. // enqueue PUBREC message received (for QoS Level 2) into the internal queue
  1294. MqttMsgPubrec pubrec = MqttMsgPubrec.Parse(fixedHeaderFirstByte[0], (byte)this.ProtocolVersion, this.channel);
  1295. #if TRACE
  1296. MqttUtility.Trace.WriteLine(TraceLevel.Frame, "RECV {0}", pubrec);
  1297. #endif
  1298. // enqueue PUBREC message into the internal queue
  1299. this.EnqueueInternal(pubrec);
  1300. break;
  1301. // PUBREL message received
  1302. case MqttMsgBase.MQTT_MSG_PUBREL_TYPE:
  1303. // enqueue PUBREL message received (for QoS Level 2) into the internal queue
  1304. MqttMsgPubrel pubrel = MqttMsgPubrel.Parse(fixedHeaderFirstByte[0], (byte)this.ProtocolVersion, this.channel);
  1305. #if TRACE
  1306. MqttUtility.Trace.WriteLine(TraceLevel.Frame, "RECV {0}", pubrel);
  1307. #endif
  1308. // enqueue PUBREL message into the internal queue
  1309. this.EnqueueInternal(pubrel);
  1310. break;
  1311. // PUBCOMP message received
  1312. case MqttMsgBase.MQTT_MSG_PUBCOMP_TYPE:
  1313. // enqueue PUBCOMP message received (for QoS Level 2) into the internal queue
  1314. MqttMsgPubcomp pubcomp = MqttMsgPubcomp.Parse(fixedHeaderFirstByte[0], (byte)this.ProtocolVersion, this.channel);
  1315. #if TRACE
  1316. MqttUtility.Trace.WriteLine(TraceLevel.Frame, "RECV {0}", pubcomp);
  1317. #endif
  1318. // enqueue PUBCOMP message into the internal queue
  1319. this.EnqueueInternal(pubcomp);
  1320. break;
  1321. // UNSUBSCRIBE message received
  1322. case MqttMsgBase.MQTT_MSG_UNSUBSCRIBE_TYPE:
  1323. #if BROKER
  1324. MqttMsgUnsubscribe unsubscribe = MqttMsgUnsubscribe.Parse(fixedHeaderFirstByte[0], (byte)this.ProtocolVersion, this.channel);
  1325. #if TRACE
  1326. Trace.WriteLine(TraceLevel.Frame, "RECV {0}", unsubscribe);
  1327. #endif
  1328. // raise message received event
  1329. this.OnInternalEvent(new MsgInternalEvent(unsubscribe));
  1330. break;
  1331. #else
  1332. throw new MqttClientException(MqttClientErrorCode.WrongBrokerMessage);
  1333. #endif
  1334. // UNSUBACK message received
  1335. case MqttMsgBase.MQTT_MSG_UNSUBACK_TYPE:
  1336. #if BROKER
  1337. throw new MqttClientException(MqttClientErrorCode.WrongBrokerMessage);
  1338. #else
  1339. // enqueue UNSUBACK message received (for QoS Level 1) into the internal queue
  1340. MqttMsgUnsuback unsuback = MqttMsgUnsuback.Parse(fixedHeaderFirstByte[0], (byte)this.ProtocolVersion, this.channel);
  1341. #if TRACE
  1342. MqttUtility.Trace.WriteLine(TraceLevel.Frame, "RECV {0}", unsuback);
  1343. #endif
  1344. // enqueue UNSUBACK message into the internal queue
  1345. this.EnqueueInternal(unsuback);
  1346. break;
  1347. #endif
  1348. // DISCONNECT message received
  1349. case MqttMsgDisconnect.MQTT_MSG_DISCONNECT_TYPE:
  1350. #if BROKER
  1351. MqttMsgDisconnect disconnect = MqttMsgDisconnect.Parse(fixedHeaderFirstByte[0], (byte)this.ProtocolVersion, this.channel);
  1352. #if TRACE
  1353. Trace.WriteLine(TraceLevel.Frame, "RECV {0}", disconnect);
  1354. #endif
  1355. // raise message received event
  1356. this.OnInternalEvent(new MsgInternalEvent(disconnect));
  1357. break;
  1358. #else
  1359. throw new MqttClientException(MqttClientErrorCode.WrongBrokerMessage);
  1360. #endif
  1361. default:
  1362. throw new MqttClientException(MqttClientErrorCode.WrongBrokerMessage);
  1363. }
  1364. this.exReceiving = null;
  1365. }
  1366. // zero bytes read, peer gracefully closed socket
  1367. else
  1368. {
  1369. // wake up thread that will notify connection is closing
  1370. this.OnConnectionClosing();
  1371. }
  1372. }
  1373. catch (Exception e)
  1374. {
  1375. #if TRACE
  1376. MqttUtility.Trace.WriteLine(TraceLevel.Error, "Exception occurred: {0}", e.ToString());
  1377. #endif
  1378. this.exReceiving = new MqttCommunicationException(e);
  1379. bool close = false;
  1380. if (e.GetType() == typeof(MqttClientException))
  1381. {
  1382. // [v3.1.1] scenarios the receiver MUST close the network connection
  1383. MqttClientException ex = e as MqttClientException;
  1384. close = ((ex.ErrorCode == MqttClientErrorCode.InvalidFlagBits) ||
  1385. (ex.ErrorCode == MqttClientErrorCode.InvalidProtocolName) ||
  1386. (ex.ErrorCode == MqttClientErrorCode.InvalidConnectFlags));
  1387. }
  1388. #if !(WINDOWS_APP || WINDOWS_PHONE_APP || (!UNITY_EDITOR&&UNITY_WSA_10_0&&!ENABLE_IL2CPP))
  1389. else if ((e.GetType() == typeof(IOException)) || (e.GetType() == typeof(SocketException)) ||
  1390. ((e.InnerException != null) && (e.InnerException.GetType() == typeof(SocketException)))) // added for SSL/TLS incoming connection that use SslStream that wraps SocketException
  1391. {
  1392. close = true;
  1393. }
  1394. #endif
  1395. if (close)
  1396. {
  1397. // wake up thread that will notify connection is closing
  1398. this.OnConnectionClosing();
  1399. }
  1400. }
  1401. }
  1402. }
  /// <summary>
  /// Keep-alive worker loop. While the client is running it waits on the keep-alive
  /// event for the remaining wait time, then measures the ticks elapsed since the last
  /// communication. When at least one keep-alive period has elapsed it either requests
  /// connection closing (BROKER builds) or sends a ping (client builds); otherwise it
  /// shortens the next wait by the elapsed time. The end event is signalled on exit.
  /// </summary>
  private void KeepAliveThread()
  {
      int waitTime = this.keepAlivePeriod;

      // event used to signal that this thread has ended
      this.keepAliveEventEnd = new AutoResetEvent(false);

      while (this.isRunning)
      {
          // sleep until the wait time expires or the keep-alive event is signalled
#if (MF_FRAMEWORK_VERSION_V4_2 || MF_FRAMEWORK_VERSION_V4_3 || COMPACT_FRAMEWORK)
          this.keepAliveEvent.WaitOne(waitTime, false);
#else
          this.keepAliveEvent.WaitOne(waitTime);
#endif

          // nothing to do if the client stopped while waiting; the loop condition decides
          if (!this.isRunning)
          {
              continue;
          }

          int elapsed = Environment.TickCount - this.lastCommTime;

          // keep-alive period not yet exceeded: wait only for the remaining time
          if (elapsed < this.keepAlivePeriod)
          {
              waitTime = this.keepAlivePeriod - elapsed;
              continue;
          }

          // keep-alive period exceeded
#if BROKER
          // client must close connection
          this.OnConnectionClosing();
#else
          // send keep alive and restart a full waiting period
          this.Ping();
          waitTime = this.keepAlivePeriod;
#endif
      }

      // signal thread end
      this.keepAliveEventEnd.Set();
  }
  /// <summary>
  /// Event dispatching worker loop. While the client is running it waits for queued
  /// internal events (unless a connection closing is pending), dequeues one event per
  /// iteration and raises the handler matching the type of the message it carries.
  /// Once the queue is empty and a connection closing is pending, it closes the client
  /// and raises the connection closed notification.
  /// </summary>
  private void DispatchEventThread()
  {
      while (this.isRunning)
      {
          // nothing queued and no closing pending: block until something is received
          if ((this.eventQueue.Count == 0) && !this.isConnectionClosing)
          {
#if BROKER
              if (this.IsConnected)
              {
                  // wait on receiving message from client
                  this.receiveEventWaitHandle.WaitOne();
              }
              // broker needs to receive the first message (CONNECT) within a
              // reasonable amount of time after the TCP/IP connection
              else if (!this.receiveEventWaitHandle.WaitOne(this.settings.TimeoutOnConnection))
              {
                  // client must close connection
                  this.Close();
                  // client raw disconnection
                  this.OnConnectionClosed();
              }
#else
              // wait on receiving message from client
              this.receiveEventWaitHandle.WaitOne();
#endif
          }

          // woken up while the client is being closed: let the loop condition decide
          if (!this.isRunning)
          {
              continue;
          }

          // take the next event (if any) out of the queue
          InternalEvent queuedEvent = null;
          lock (this.eventQueue)
          {
              if (this.eventQueue.Count > 0)
              {
                  queuedEvent = (InternalEvent)this.eventQueue.Dequeue();
              }
          }

          // a dequeued event is expected to carry a message
          MqttMsgBase message = (queuedEvent != null) ? ((MsgInternalEvent)queuedEvent).Message : null;

          if (message != null)
          {
              switch (message.Type)
              {
                  // CONNECT message received
                  case MqttMsgBase.MQTT_MSG_CONNECT_TYPE:
#if BROKER
                      // raise connected client event
                      this.OnMqttMsgConnected((MqttMsgConnect)message);
                      break;
#else
                      throw new MqttClientException(MqttClientErrorCode.WrongBrokerMessage);
#endif

                  // SUBSCRIBE message received
                  case MqttMsgBase.MQTT_MSG_SUBSCRIBE_TYPE:
#if BROKER
                      MqttMsgSubscribe subscribeMsg = (MqttMsgSubscribe)message;
                      // raise subscribe topic event
                      this.OnMqttMsgSubscribeReceived(subscribeMsg.MessageId, subscribeMsg.Topics, subscribeMsg.QoSLevels);
                      break;
#else
                      throw new MqttClientException(MqttClientErrorCode.WrongBrokerMessage);
#endif

                  // SUBACK message received
                  case MqttMsgBase.MQTT_MSG_SUBACK_TYPE:
                      // raise subscribed topic event
                      this.OnMqttMsgSubscribed((MqttMsgSuback)message);
                      break;

                  // PUBLISH message received
                  case MqttMsgBase.MQTT_MSG_PUBLISH_TYPE:
                      if (queuedEvent.GetType() == typeof(MsgPublishedInternalEvent))
                      {
                          // PUBLISH carried by a "published" internal event: publish did not succeed
                          this.OnMqttMsgPublished(message.MessageId, false);
                      }
                      else
                      {
                          // raise PUBLISH message received event
                          this.OnMqttMsgPublishReceived((MqttMsgPublish)message);
                      }
                      break;

                  // PUBACK message received
                  case MqttMsgBase.MQTT_MSG_PUBACK_TYPE:
                      // raise published message event (PUBACK received for QoS Level 1)
                      this.OnMqttMsgPublished(message.MessageId, true);
                      break;

                  // PUBREL message received
                  case MqttMsgBase.MQTT_MSG_PUBREL_TYPE:
                      // raise message received event (PUBREL received for QoS Level 2)
                      // NOTE(review): the message is cast to MqttMsgPublish here - confirm
                      // which message instance is expected to reach this case
                      this.OnMqttMsgPublishReceived((MqttMsgPublish)message);
                      break;

                  // PUBCOMP message received
                  case MqttMsgBase.MQTT_MSG_PUBCOMP_TYPE:
                      // raise published message event (PUBCOMP received for QoS Level 2)
                      this.OnMqttMsgPublished(message.MessageId, true);
                      break;

                  // UNSUBSCRIBE message received from client
                  case MqttMsgBase.MQTT_MSG_UNSUBSCRIBE_TYPE:
#if BROKER
                      MqttMsgUnsubscribe unsubscribeMsg = (MqttMsgUnsubscribe)message;
                      // raise unsubscribe topic event
                      this.OnMqttMsgUnsubscribeReceived(unsubscribeMsg.MessageId, unsubscribeMsg.Topics);
                      break;
#else
                      throw new MqttClientException(MqttClientErrorCode.WrongBrokerMessage);
#endif

                  // UNSUBACK message received
                  case MqttMsgBase.MQTT_MSG_UNSUBACK_TYPE:
                      // raise unsubscribed topic event
                      this.OnMqttMsgUnsubscribed(message.MessageId);
                      break;

                  // DISCONNECT message received from client
                  case MqttMsgDisconnect.MQTT_MSG_DISCONNECT_TYPE:
#if BROKER
                      // raise disconnected client event
                      this.OnMqttMsgDisconnected();
                      break;
#else
                      throw new MqttClientException(MqttClientErrorCode.WrongBrokerMessage);
#endif
              }
          }

          // all events for received messages dispatched, check if a closing is pending
          if ((this.eventQueue.Count == 0) && this.isConnectionClosing)
          {
              // client must close connection
              this.Close();
              // client raw disconnection
              this.OnConnectionClosed();
          }
      }
  }
  1587. /// <summary>
  1588. /// Process inflight messages queue
  1589. /// </summary>
  1590. private void ProcessInflightThread()
  1591. {
  1592. MqttMsgContext msgContext = null;
  1593. MqttMsgBase msgInflight = null;
  1594. MqttMsgBase msgReceived = null;
  1595. InternalEvent internalEvent = null;
  1596. bool acknowledge = false;
  1597. int timeout = Timeout.Infinite;
  1598. int delta;
  1599. bool msgReceivedProcessed = false;
  1600. try
  1601. {
  1602. while (this.isRunning)
  1603. {
  1604. #if (MF_FRAMEWORK_VERSION_V4_2 || MF_FRAMEWORK_VERSION_V4_3 || COMPACT_FRAMEWORK)
  1605. // wait on message queueud to inflight
  1606. this.inflightWaitHandle.WaitOne(timeout, false);
  1607. #else
  1608. // wait on message queueud to inflight
  1609. this.inflightWaitHandle.WaitOne(timeout);
  1610. #endif
  1611. // it could be unblocked because Close() method is joining
  1612. if (this.isRunning)
  1613. {
  1614. lock (this.inflightQueue)
  1615. {
  1616. // message received and peeked from internal queue is processed
  1617. // NOTE : it has the corresponding message in inflight queue based on messageId
  1618. // (ex. a PUBREC for a PUBLISH, a SUBACK for a SUBSCRIBE, ...)
  1619. // if it's orphan we need to remove from internal queue
  1620. msgReceivedProcessed = false;
  1621. acknowledge = false;
  1622. msgReceived = null;
  1623. // set timeout to MaxValue instead of Infinite (-1) so that it can be
  1624. // compared with the calculated current msgTimeout
  1625. timeout = Int32.MaxValue;
  1626. // a message inflight could be re-enqueued but we have to
  1627. // analyze it only just one time for cycle
  1628. int count = this.inflightQueue.Count;
  1629. // process all inflight queued messages
  1630. while (count > 0)
  1631. {
  1632. count--;
  1633. acknowledge = false;
  1634. msgReceived = null;
  1635. // check to be sure that client isn't closing and all queues are now empty !
  1636. if (!this.isRunning)
  1637. break;
  1638. // dequeue message context from queue
  1639. msgContext = (MqttMsgContext)this.inflightQueue.Dequeue();
  1640. // get inflight message
  1641. msgInflight = (MqttMsgBase)msgContext.Message;
  1642. switch (msgContext.State)
  1643. {
  1644. case MqttMsgState.QueuedQos0:
  1645. // QoS 0, PUBLISH message to send to broker, no state change, no acknowledge
  1646. if (msgContext.Flow == MqttMsgFlow.ToPublish)
  1647. {
  1648. this.Send(msgInflight);
  1649. }
  1650. // QoS 0, no need acknowledge
  1651. else if (msgContext.Flow == MqttMsgFlow.ToAcknowledge)
  1652. {
  1653. internalEvent = new MsgInternalEvent(msgInflight);
  1654. // notify published message from broker (no need acknowledged)
  1655. this.OnInternalEvent(internalEvent);
  1656. }
  1657. #if TRACE
  1658. MqttUtility.Trace.WriteLine(TraceLevel.Queuing, "processed {0}", msgInflight);
  1659. #endif
  1660. break;
  1661. case MqttMsgState.QueuedQos1:
  1662. // [v3.1.1] SUBSCRIBE and UNSUBSCRIBE aren't "officially" QOS = 1
  1663. case MqttMsgState.SendSubscribe:
  1664. case MqttMsgState.SendUnsubscribe:
  1665. // QoS 1, PUBLISH or SUBSCRIBE/UNSUBSCRIBE message to send to broker, state change to wait PUBACK or SUBACK/UNSUBACK
  1666. if (msgContext.Flow == MqttMsgFlow.ToPublish)
  1667. {
  1668. msgContext.Timestamp = Environment.TickCount;
  1669. msgContext.Attempt++;
  1670. if (msgInflight.Type == MqttMsgBase.MQTT_MSG_PUBLISH_TYPE)
  1671. {
  1672. // PUBLISH message to send, wait for PUBACK
  1673. msgContext.State = MqttMsgState.WaitForPuback;
  1674. // retry ? set dup flag [v3.1.1] only for PUBLISH message
  1675. if (msgContext.Attempt > 1)
  1676. msgInflight.DupFlag = true;
  1677. }
  1678. else if (msgInflight.Type == MqttMsgBase.MQTT_MSG_SUBSCRIBE_TYPE)
  1679. // SUBSCRIBE message to send, wait for SUBACK
  1680. msgContext.State = MqttMsgState.WaitForSuback;
  1681. else if (msgInflight.Type == MqttMsgBase.MQTT_MSG_UNSUBSCRIBE_TYPE)
  1682. // UNSUBSCRIBE message to send, wait for UNSUBACK
  1683. msgContext.State = MqttMsgState.WaitForUnsuback;
  1684. this.Send(msgInflight);
  1685. // update timeout : minimum between delay (based on current message sent) or current timeout
  1686. timeout = (this.settings.DelayOnRetry < timeout) ? this.settings.DelayOnRetry : timeout;
  1687. // re-enqueue message (I have to re-analyze for receiving PUBACK, SUBACK or UNSUBACK)
  1688. this.inflightQueue.Enqueue(msgContext);
  1689. }
  1690. // QoS 1, PUBLISH message received from broker to acknowledge, send PUBACK
  1691. else if (msgContext.Flow == MqttMsgFlow.ToAcknowledge)
  1692. {
  1693. MqttMsgPuback puback = new MqttMsgPuback();
  1694. puback.MessageId = msgInflight.MessageId;
  1695. this.Send(puback);
  1696. internalEvent = new MsgInternalEvent(msgInflight);
  1697. // notify published message from broker and acknowledged
  1698. this.OnInternalEvent(internalEvent);
  1699. #if TRACE
  1700. MqttUtility.Trace.WriteLine(TraceLevel.Queuing, "processed {0}", msgInflight);
  1701. #endif
  1702. }
  1703. break;
  1704. case MqttMsgState.QueuedQos2:
  1705. // QoS 2, PUBLISH message to send to broker, state change to wait PUBREC
  1706. if (msgContext.Flow == MqttMsgFlow.ToPublish)
  1707. {
  1708. msgContext.Timestamp = Environment.TickCount;
  1709. msgContext.Attempt++;
  1710. msgContext.State = MqttMsgState.WaitForPubrec;
  1711. // retry ? set dup flag
  1712. if (msgContext.Attempt > 1)
  1713. msgInflight.DupFlag = true;
  1714. this.Send(msgInflight);
  1715. // update timeout : minimum between delay (based on current message sent) or current timeout
  1716. timeout = (this.settings.DelayOnRetry < timeout) ? this.settings.DelayOnRetry : timeout;
  1717. // re-enqueue message (I have to re-analyze for receiving PUBREC)
  1718. this.inflightQueue.Enqueue(msgContext);
  1719. }
  1720. // QoS 2, PUBLISH message received from broker to acknowledge, send PUBREC, state change to wait PUBREL
  1721. else if (msgContext.Flow == MqttMsgFlow.ToAcknowledge)
  1722. {
  1723. MqttMsgPubrec pubrec = new MqttMsgPubrec();
  1724. pubrec.MessageId = msgInflight.MessageId;
  1725. msgContext.State = MqttMsgState.WaitForPubrel;
  1726. this.Send(pubrec);
  1727. // re-enqueue message (I have to re-analyze for receiving PUBREL)
  1728. this.inflightQueue.Enqueue(msgContext);
  1729. }
  1730. break;
  1731. case MqttMsgState.WaitForPuback:
  1732. case MqttMsgState.WaitForSuback:
  1733. case MqttMsgState.WaitForUnsuback:
  1734. // QoS 1, waiting for PUBACK of a PUBLISH message sent or
  1735. // waiting for SUBACK of a SUBSCRIBE message sent or
  1736. // waiting for UNSUBACK of a UNSUBSCRIBE message sent or
  1737. if (msgContext.Flow == MqttMsgFlow.ToPublish)
  1738. {
  1739. acknowledge = false;
  1740. lock (this.internalQueue)
  1741. {
  1742. if (this.internalQueue.Count > 0)
  1743. msgReceived = (MqttMsgBase)this.internalQueue.Peek();
  1744. }
  1745. // it is a PUBACK message or a SUBACK/UNSUBACK message
  1746. if (msgReceived != null)
  1747. {
  1748. // PUBACK message or SUBACK/UNSUBACK message for the current message
  1749. if (((msgReceived.Type == MqttMsgBase.MQTT_MSG_PUBACK_TYPE) && (msgInflight.Type == MqttMsgBase.MQTT_MSG_PUBLISH_TYPE) && (msgReceived.MessageId == msgInflight.MessageId)) ||
  1750. ((msgReceived.Type == MqttMsgBase.MQTT_MSG_SUBACK_TYPE) && (msgInflight.Type == MqttMsgBase.MQTT_MSG_SUBSCRIBE_TYPE) && (msgReceived.MessageId == msgInflight.MessageId)) ||
  1751. ((msgReceived.Type == MqttMsgBase.MQTT_MSG_UNSUBACK_TYPE) && (msgInflight.Type == MqttMsgBase.MQTT_MSG_UNSUBSCRIBE_TYPE) && (msgReceived.MessageId == msgInflight.MessageId)))
  1752. {
  1753. lock (this.internalQueue)
  1754. {
  1755. // received message processed
  1756. this.internalQueue.Dequeue();
  1757. acknowledge = true;
  1758. msgReceivedProcessed = true;
  1759. #if TRACE
  1760. MqttUtility.Trace.WriteLine(TraceLevel.Queuing, "dequeued {0}", msgReceived);
  1761. #endif
  1762. }
  1763. // if PUBACK received, confirm published with flag
  1764. if (msgReceived.Type == MqttMsgBase.MQTT_MSG_PUBACK_TYPE)
  1765. internalEvent = new MsgPublishedInternalEvent(msgReceived, true);
  1766. else
  1767. internalEvent = new MsgInternalEvent(msgReceived);
  1768. // notify received acknowledge from broker of a published message or subscribe/unsubscribe message
  1769. this.OnInternalEvent(internalEvent);
  1770. // PUBACK received for PUBLISH message with QoS Level 1, remove from session state
  1771. if ((msgInflight.Type == MqttMsgBase.MQTT_MSG_PUBLISH_TYPE) &&
  1772. (this.session != null) &&
  1773. #if (MF_FRAMEWORK_VERSION_V4_2 || MF_FRAMEWORK_VERSION_V4_3 || COMPACT_FRAMEWORK)
  1774. (this.session.InflightMessages.Contains(msgContext.Key)))
  1775. #else
  1776. (this.session.InflightMessages.ContainsKey(msgContext.Key)))
  1777. #endif
  1778. {
  1779. this.session.InflightMessages.Remove(msgContext.Key);
  1780. }
  1781. #if TRACE
  1782. MqttUtility.Trace.WriteLine(TraceLevel.Queuing, "processed {0}", msgInflight);
  1783. #endif
  1784. }
  1785. }
  1786. // current message not acknowledged, no PUBACK or SUBACK/UNSUBACK or not equal messageid
  1787. if (!acknowledge)
  1788. {
  1789. delta = Environment.TickCount - msgContext.Timestamp;
  1790. // check timeout for receiving PUBACK since PUBLISH was sent or
  1791. // for receiving SUBACK since SUBSCRIBE was sent or
  1792. // for receiving UNSUBACK since UNSUBSCRIBE was sent
  1793. if (delta >= this.settings.DelayOnRetry)
  1794. {
  1795. // max retry not reached, resend
  1796. if (msgContext.Attempt < this.settings.AttemptsOnRetry)
  1797. {
  1798. msgContext.State = MqttMsgState.QueuedQos1;
  1799. // re-enqueue message
  1800. this.inflightQueue.Enqueue(msgContext);
  1801. // update timeout (0 -> reanalyze queue immediately)
  1802. timeout = 0;
  1803. }
  1804. else
  1805. {
  1806. // if PUBACK for a PUBLISH message not received after retries, raise event for not published
  1807. if (msgInflight.Type == MqttMsgBase.MQTT_MSG_PUBLISH_TYPE)
  1808. {
  1809. // PUBACK not received in time, PUBLISH retries failed, need to remove from session inflight messages too
  1810. if ((this.session != null) &&
  1811. #if (MF_FRAMEWORK_VERSION_V4_2 || MF_FRAMEWORK_VERSION_V4_3 || COMPACT_FRAMEWORK)
  1812. (this.session.InflightMessages.Contains(msgContext.Key)))
  1813. #else
  1814. (this.session.InflightMessages.ContainsKey(msgContext.Key)))
  1815. #endif
  1816. {
  1817. this.session.InflightMessages.Remove(msgContext.Key);
  1818. }
  1819. internalEvent = new MsgPublishedInternalEvent(msgInflight, false);
  1820. // notify not received acknowledge from broker and message not published
  1821. this.OnInternalEvent(internalEvent);
  1822. }
  1823. // NOTE : not raise events for SUBACK or UNSUBACK not received
  1824. // for the user no event raised means subscribe/unsubscribe failed
  1825. }
  1826. }
  1827. else
  1828. {
  1829. // re-enqueue message (I have to re-analyze for receiving PUBACK, SUBACK or UNSUBACK)
  1830. this.inflightQueue.Enqueue(msgContext);
  1831. // update timeout
  1832. int msgTimeout = (this.settings.DelayOnRetry - delta);
  1833. timeout = (msgTimeout < timeout) ? msgTimeout : timeout;
  1834. }
  1835. }
  1836. }
  1837. break;
  1838. case MqttMsgState.WaitForPubrec:
  1839. // QoS 2, waiting for PUBREC of a PUBLISH message sent
  1840. if (msgContext.Flow == MqttMsgFlow.ToPublish)
  1841. {
  1842. acknowledge = false;
  1843. lock (this.internalQueue)
  1844. {
  1845. if (this.internalQueue.Count > 0)
  1846. msgReceived = (MqttMsgBase)this.internalQueue.Peek();
  1847. }
  1848. // it is a PUBREC message
  1849. if ((msgReceived != null) && (msgReceived.Type == MqttMsgBase.MQTT_MSG_PUBREC_TYPE))
  1850. {
  1851. // PUBREC message for the current PUBLISH message, send PUBREL, wait for PUBCOMP
  1852. if (msgReceived.MessageId == msgInflight.MessageId)
  1853. {
  1854. lock (this.internalQueue)
  1855. {
  1856. // received message processed
  1857. this.internalQueue.Dequeue();
  1858. acknowledge = true;
  1859. msgReceivedProcessed = true;
  1860. #if TRACE
  1861. MqttUtility.Trace.WriteLine(TraceLevel.Queuing, "dequeued {0}", msgReceived);
  1862. #endif
  1863. }
  1864. MqttMsgPubrel pubrel = new MqttMsgPubrel();
  1865. pubrel.MessageId = msgInflight.MessageId;
  1866. msgContext.State = MqttMsgState.WaitForPubcomp;
  1867. msgContext.Timestamp = Environment.TickCount;
  1868. msgContext.Attempt = 1;
  1869. this.Send(pubrel);
  1870. // update timeout : minimum between delay (based on current message sent) or current timeout
  1871. timeout = (this.settings.DelayOnRetry < timeout) ? this.settings.DelayOnRetry : timeout;
  1872. // re-enqueue message
  1873. this.inflightQueue.Enqueue(msgContext);
  1874. }
  1875. }
  1876. // current message not acknowledged
  1877. if (!acknowledge)
  1878. {
  1879. delta = Environment.TickCount - msgContext.Timestamp;
  1880. // check timeout for receiving PUBREC since PUBLISH was sent
  1881. if (delta >= this.settings.DelayOnRetry)
  1882. {
  1883. // max retry not reached, resend
  1884. if (msgContext.Attempt < this.settings.AttemptsOnRetry)
  1885. {
  1886. msgContext.State = MqttMsgState.QueuedQos2;
  1887. // re-enqueue message
  1888. this.inflightQueue.Enqueue(msgContext);
  1889. // update timeout (0 -> reanalyze queue immediately)
  1890. timeout = 0;
  1891. }
  1892. else
  1893. {
  1894. // PUBREC not received in time, PUBLISH retries failed, need to remove from session inflight messages too
  1895. if ((this.session != null) &&
  1896. #if (MF_FRAMEWORK_VERSION_V4_2 || MF_FRAMEWORK_VERSION_V4_3 || COMPACT_FRAMEWORK)
  1897. (this.session.InflightMessages.Contains(msgContext.Key)))
  1898. #else
  1899. (this.session.InflightMessages.ContainsKey(msgContext.Key)))
  1900. #endif
  1901. {
  1902. this.session.InflightMessages.Remove(msgContext.Key);
  1903. }
  1904. // if PUBREC for a PUBLISH message not received after retries, raise event for not published
  1905. internalEvent = new MsgPublishedInternalEvent(msgInflight, false);
  1906. // notify not received acknowledge from broker and message not published
  1907. this.OnInternalEvent(internalEvent);
  1908. }
  1909. }
  1910. else
  1911. {
  1912. // re-enqueue message
  1913. this.inflightQueue.Enqueue(msgContext);
  1914. // update timeout
  1915. int msgTimeout = (this.settings.DelayOnRetry - delta);
  1916. timeout = (msgTimeout < timeout) ? msgTimeout : timeout;
  1917. }
  1918. }
  1919. }
  1920. break;
  1921. case MqttMsgState.WaitForPubrel:
  1922. // QoS 2, waiting for PUBREL of a PUBREC message sent
  1923. if (msgContext.Flow == MqttMsgFlow.ToAcknowledge)
  1924. {
  1925. lock (this.internalQueue)
  1926. {
  1927. if (this.internalQueue.Count > 0)
  1928. msgReceived = (MqttMsgBase)this.internalQueue.Peek();
  1929. }
  1930. // it is a PUBREL message
  1931. if ((msgReceived != null) && (msgReceived.Type == MqttMsgBase.MQTT_MSG_PUBREL_TYPE))
  1932. {
  1933. // PUBREL message for the current message, send PUBCOMP
  1934. if (msgReceived.MessageId == msgInflight.MessageId)
  1935. {
  1936. lock (this.internalQueue)
  1937. {
  1938. // received message processed
  1939. this.internalQueue.Dequeue();
  1940. msgReceivedProcessed = true;
  1941. #if TRACE
  1942. MqttUtility.Trace.WriteLine(TraceLevel.Queuing, "dequeued {0}", msgReceived);
  1943. #endif
  1944. }
  1945. MqttMsgPubcomp pubcomp = new MqttMsgPubcomp();
  1946. pubcomp.MessageId = msgInflight.MessageId;
  1947. this.Send(pubcomp);
  1948. internalEvent = new MsgInternalEvent(msgInflight);
  1949. // notify published message from broker and acknowledged
  1950. this.OnInternalEvent(internalEvent);
  1951. // PUBREL received (and PUBCOMP sent) for PUBLISH message with QoS Level 2, remove from session state
  1952. if ((msgInflight.Type == MqttMsgBase.MQTT_MSG_PUBLISH_TYPE) &&
  1953. (this.session != null) &&
  1954. #if (MF_FRAMEWORK_VERSION_V4_2 || MF_FRAMEWORK_VERSION_V4_3 || COMPACT_FRAMEWORK)
  1955. (this.session.InflightMessages.Contains(msgContext.Key)))
  1956. #else
  1957. (this.session.InflightMessages.ContainsKey(msgContext.Key)))
  1958. #endif
  1959. {
  1960. this.session.InflightMessages.Remove(msgContext.Key);
  1961. }
  1962. #if TRACE
  1963. MqttUtility.Trace.WriteLine(TraceLevel.Queuing, "processed {0}", msgInflight);
  1964. #endif
  1965. }
  1966. else
  1967. {
  1968. // re-enqueue message
  1969. this.inflightQueue.Enqueue(msgContext);
  1970. }
  1971. }
  1972. else
  1973. {
  1974. // re-enqueue message
  1975. this.inflightQueue.Enqueue(msgContext);
  1976. }
  1977. }
  1978. break;
  1979. case MqttMsgState.WaitForPubcomp:
  1980. // QoS 2, waiting for PUBCOMP of a PUBREL message sent
  1981. if (msgContext.Flow == MqttMsgFlow.ToPublish)
  1982. {
  1983. acknowledge = false;
  1984. lock (this.internalQueue)
  1985. {
  1986. if (this.internalQueue.Count > 0)
  1987. msgReceived = (MqttMsgBase)this.internalQueue.Peek();
  1988. }
  1989. // it is a PUBCOMP message
  1990. if ((msgReceived != null) && (msgReceived.Type == MqttMsgBase.MQTT_MSG_PUBCOMP_TYPE))
  1991. {
  1992. // PUBCOMP message for the current message
  1993. if (msgReceived.MessageId == msgInflight.MessageId)
  1994. {
  1995. lock (this.internalQueue)
  1996. {
  1997. // received message processed
  1998. this.internalQueue.Dequeue();
  1999. acknowledge = true;
  2000. msgReceivedProcessed = true;
  2001. #if TRACE
  2002. MqttUtility.Trace.WriteLine(TraceLevel.Queuing, "dequeued {0}", msgReceived);
  2003. #endif
  2004. }
  2005. internalEvent = new MsgPublishedInternalEvent(msgReceived, true);
  2006. // notify received acknowledge from broker of a published message
  2007. this.OnInternalEvent(internalEvent);
  2008. // PUBCOMP received for PUBLISH message with QoS Level 2, remove from session state
  2009. if ((msgInflight.Type == MqttMsgBase.MQTT_MSG_PUBLISH_TYPE) &&
  2010. (this.session != null) &&
  2011. #if (MF_FRAMEWORK_VERSION_V4_2 || MF_FRAMEWORK_VERSION_V4_3 || COMPACT_FRAMEWORK)
  2012. (this.session.InflightMessages.Contains(msgContext.Key)))
  2013. #else
  2014. (this.session.InflightMessages.ContainsKey(msgContext.Key)))
  2015. #endif
  2016. {
  2017. this.session.InflightMessages.Remove(msgContext.Key);
  2018. }
  2019. #if TRACE
  2020. MqttUtility.Trace.WriteLine(TraceLevel.Queuing, "processed {0}", msgInflight);
  2021. #endif
  2022. }
  2023. }
  2024. // it is a PUBREC message
  2025. else if ((msgReceived != null) && (msgReceived.Type == MqttMsgBase.MQTT_MSG_PUBREC_TYPE))
  2026. {
  2027. // another PUBREC message for the current message due to a retransmitted PUBLISH
  2028. // I'm in waiting for PUBCOMP, so I can discard this PUBREC
  2029. if (msgReceived.MessageId == msgInflight.MessageId)
  2030. {
  2031. lock (this.internalQueue)
  2032. {
  2033. // received message processed
  2034. this.internalQueue.Dequeue();
  2035. acknowledge = true;
  2036. msgReceivedProcessed = true;
  2037. #if TRACE
  2038. MqttUtility.Trace.WriteLine(TraceLevel.Queuing, "dequeued {0}", msgReceived);
  2039. #endif
  2040. // re-enqueue message
  2041. this.inflightQueue.Enqueue(msgContext);
  2042. }
  2043. }
  2044. }
  2045. // current message not acknowledged
  2046. if (!acknowledge)
  2047. {
  2048. delta = Environment.TickCount - msgContext.Timestamp;
  2049. // check timeout for receiving PUBCOMP since PUBREL was sent
  2050. if (delta >= this.settings.DelayOnRetry)
  2051. {
  2052. // max retry not reached, resend
  2053. if (msgContext.Attempt < this.settings.AttemptsOnRetry)
  2054. {
  2055. msgContext.State = MqttMsgState.SendPubrel;
  2056. // re-enqueue message
  2057. this.inflightQueue.Enqueue(msgContext);
  2058. // update timeout (0 -> reanalyze queue immediately)
  2059. timeout = 0;
  2060. }
  2061. else
  2062. {
  2063. // PUBCOMP not received, PUBREL retries failed, need to remove from session inflight messages too
  2064. if ((this.session != null) &&
  2065. #if (MF_FRAMEWORK_VERSION_V4_2 || MF_FRAMEWORK_VERSION_V4_3 || COMPACT_FRAMEWORK)
  2066. (this.session.InflightMessages.Contains(msgContext.Key)))
  2067. #else
  2068. (this.session.InflightMessages.ContainsKey(msgContext.Key)))
  2069. #endif
  2070. {
  2071. this.session.InflightMessages.Remove(msgContext.Key);
  2072. }
  2073. // if PUBCOMP for a PUBLISH message not received after retries, raise event for not published
  2074. internalEvent = new MsgPublishedInternalEvent(msgInflight, false);
  2075. // notify not received acknowledge from broker and message not published
  2076. this.OnInternalEvent(internalEvent);
  2077. }
  2078. }
  2079. else
  2080. {
  2081. // re-enqueue message
  2082. this.inflightQueue.Enqueue(msgContext);
  2083. // update timeout
  2084. int msgTimeout = (this.settings.DelayOnRetry - delta);
  2085. timeout = (msgTimeout < timeout) ? msgTimeout : timeout;
  2086. }
  2087. }
  2088. }
  2089. break;
  2090. case MqttMsgState.SendPubrec:
  2091. // TODO : impossible ? --> QueuedQos2 ToAcknowledge
  2092. break;
  2093. case MqttMsgState.SendPubrel:
  2094. // QoS 2, PUBREL message to send to broker, state change to wait PUBCOMP
  2095. if (msgContext.Flow == MqttMsgFlow.ToPublish)
  2096. {
  2097. MqttMsgPubrel pubrel = new MqttMsgPubrel();
  2098. pubrel.MessageId = msgInflight.MessageId;
  2099. msgContext.State = MqttMsgState.WaitForPubcomp;
  2100. msgContext.Timestamp = Environment.TickCount;
  2101. msgContext.Attempt++;
  2102. // retry ? set dup flag [v3.1.1] no needed
  2103. if (this.ProtocolVersion == MqttProtocolVersion.Version_3_1)
  2104. {
  2105. if (msgContext.Attempt > 1)
  2106. pubrel.DupFlag = true;
  2107. }
  2108. this.Send(pubrel);
  2109. // update timeout : minimum between delay (based on current message sent) or current timeout
  2110. timeout = (this.settings.DelayOnRetry < timeout) ? this.settings.DelayOnRetry : timeout;
  2111. // re-enqueue message
  2112. this.inflightQueue.Enqueue(msgContext);
  2113. }
  2114. break;
  2115. case MqttMsgState.SendPubcomp:
  2116. // TODO : impossible ?
  2117. break;
  2118. case MqttMsgState.SendPuback:
  2119. // TODO : impossible ? --> QueuedQos1 ToAcknowledge
  2120. break;
  2121. default:
  2122. break;
  2123. }
  2124. }
  2125. // if calculated timeout is MaxValue, it means that must be Infinite (-1)
  2126. if (timeout == Int32.MaxValue)
  2127. timeout = Timeout.Infinite;
  2128. // if message received is orphan, no corresponding message in inflight queue
  2129. // based on messageId, we need to remove from the queue
  2130. if ((msgReceived != null) && !msgReceivedProcessed)
  2131. {
  2132. this.internalQueue.Dequeue();
  2133. #if TRACE
  2134. MqttUtility.Trace.WriteLine(TraceLevel.Queuing, "dequeued {0} orphan", msgReceived);
  2135. #endif
  2136. }
  2137. }
  2138. }
  2139. }
  2140. }
  2141. catch (MqttCommunicationException e)
  2142. {
  2143. // possible exception on Send, I need to re-enqueue not sent message
  2144. if (msgContext != null)
  2145. // re-enqueue message
  2146. this.inflightQueue.Enqueue(msgContext);
  2147. #if TRACE
  2148. MqttUtility.Trace.WriteLine(TraceLevel.Error, "Exception occurred: {0}", e.ToString());
  2149. #endif
  2150. // raise disconnection client event
  2151. this.OnConnectionClosing();
  2152. }
  2153. }
  /// <summary>
  /// Restore the in-flight state kept by a previous session, or discard it when a clean session is in use
  /// </summary>
  /// <remarks>
  /// Clean session : an existing session (if any) is cleared.
  /// Not clean session, no session available : a new session is created for the current client id.
  /// Not clean session, session available : every stored message context goes back into the
  /// inflight queue; outgoing PUBLISH messages still waiting for an acknowledge are rewound to
  /// the state from which the pending packet is sent again, then the inflight wait handle is set.
  /// </remarks>
  private void RestoreSession()
  {
      // clean session : drop whatever a previous session left behind
      if (this.CleanSession)
      {
          if (this.session != null)
          {
              this.session.Clear();
          }
          return;
      }

      // nothing to restore : start with a new session
      if (this.session == null)
      {
          this.session = new MqttClientSession(this.ClientId);
          return;
      }

      lock (this.inflightQueue)
      {
          foreach (MqttMsgContext restored in this.session.InflightMessages.Values)
          {
              // every stored context goes back into the inflight queue, whatever its kind
              this.inflightQueue.Enqueue(restored);

              // only PUBLISH messages that are to be published need their state rewound
              if (restored.Message.Type != MqttMsgBase.MQTT_MSG_PUBLISH_TYPE)
              {
                  continue;
              }
              if (restored.Flow != MqttMsgFlow.ToPublish)
              {
                  continue;
              }

              if ((restored.Message.QosLevel == MqttMsgBase.QOS_LEVEL_AT_LEAST_ONCE) &&
                  (restored.State == MqttMsgState.WaitForPuback))
              {
                  // QoS 1 and PUBACK not received : the PUBLISH message has to be sent again
                  restored.State = MqttMsgState.QueuedQos1;
              }
              else if (restored.Message.QosLevel == MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE)
              {
                  switch (restored.State)
                  {
                      case MqttMsgState.WaitForPubrec:
                          // PUBREC not received : the PUBLISH message has to be sent again
                          restored.State = MqttMsgState.QueuedQos2;
                          break;
                      case MqttMsgState.WaitForPubcomp:
                          // PUBCOMP not received : the PUBREL has to be sent again
                          restored.State = MqttMsgState.SendPubrel;
                          break;
                      default:
                          // any other state is restored untouched
                          break;
                  }
              }
          }
      }

      // unlock the processing of the inflight queue
      this.inflightWaitHandle.Set();
  }
  #if BROKER
  /// <summary>
  /// Load a given session and restore its in-flight state
  /// </summary>
  /// <remarks>
  /// Nothing happens when the client uses a clean session.
  /// </remarks>
  /// <param name="session">MQTT Client session to load</param>
  public void LoadSession(MqttClientSession session)
  {
      // a clean session never takes over a stored one
      if (this.CleanSession)
      {
          return;
      }

      // adopt the session, then restore it
      this.session = session;
      this.RestoreSession();
  }
  #endif
  /// <summary>
  /// Generate the next message identifier
  /// </summary>
  /// <returns>Message identifier</returns>
  private ushort GetMessageId()
  {
      // a counter at 0 or at the maximum UInt16 value restarts from 1 (the first valid messageId)
      if ((this.messageIdCounter % UInt16.MaxValue) == 0)
      {
          this.messageIdCounter = 1;
      }
      else
      {
          // cannot overflow here : the maximum value is handled by the branch above
          this.messageIdCounter++;
      }

      return this.messageIdCounter;
  }
  /// <summary>
  /// Predicate holder used to look for a PUBLISH message context by message id and flow
  /// </summary>
  internal class MqttMsgContextFinder
  {
      /// <summary>
      /// Message id of the PUBLISH message to look for
      /// </summary>
      internal ushort MessageId { get; set; }

      /// <summary>
      /// Flow of the message inside the inflight queue
      /// </summary>
      internal MqttMsgFlow Flow { get; set; }

      /// <summary>
      /// Constructor
      /// </summary>
      /// <param name="messageId">Message Id</param>
      /// <param name="flow">Message flow inside inflight queue</param>
      internal MqttMsgContextFinder(ushort messageId, MqttMsgFlow flow)
      {
          this.MessageId = messageId;
          this.Flow = flow;
      }

      /// <summary>
      /// Check whether an item is the PUBLISH message context this finder is looking for
      /// </summary>
      /// <param name="item">Item to check; it is cast to MqttMsgContext without any type check</param>
      /// <returns>true if the item is a PUBLISH message with the same message id and flow, false otherwise</returns>
      internal bool Find(object item)
      {
          MqttMsgContext candidate = (MqttMsgContext)item;

          // the checks run in the same order as a single short-circuit AND expression
          if (candidate.Message.Type != MqttMsgBase.MQTT_MSG_PUBLISH_TYPE)
          {
              return false;
          }
          if (candidate.Message.MessageId != this.MessageId)
          {
              return false;
          }
          return candidate.Flow == this.Flow;
      }
  }
  2268. }
  /// <summary>
  /// MQTT protocol version
  /// </summary>
  /// <remarks>
  /// Each member takes its numeric value from the matching protocol version constant of MqttMsgConnect.
  /// </remarks>
  public enum MqttProtocolVersion
  {
      /// <summary>
      /// Version 3.1, valued as MqttMsgConnect.PROTOCOL_VERSION_V3_1
      /// </summary>
      Version_3_1 = MqttMsgConnect.PROTOCOL_VERSION_V3_1,

      /// <summary>
      /// Version 3.1.1, valued as MqttMsgConnect.PROTOCOL_VERSION_V3_1_1
      /// </summary>
      Version_3_1_1 = MqttMsgConnect.PROTOCOL_VERSION_V3_1_1
  }
  2277. }