MqttClient.cs 121 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762277227822792280228122822283228422852286228722882289229022912292229322942295229622972298229923002301230223032304230523062307230823092310231123122313231423152316231723182319232023212322232323242325232623272328232923302331233223332334233523362337233823392340234123422343234423452346234723482349235023512352235323542355235623572358235923602361236223632364236523662367236823692370237123722373237423752376237723782379238023812382238323842385238623872388238923902391239223932394239523962397239823992400240124022403240424052406240724082409241024112412241324142415241624172418241924202421242224232424242524262427242824292430243124322433243424352436243724382439244024412442244324442445244624472448244924502451245224532454245524562457245824592460246124622463246424652466246724682469247024712472247324742475247624772478247924802481248224832484248524862487248824892490249124922493249424952496249724982499250025012502250325042505250625072508250925102511251225132514251525162517251825192520252125222523252425252526252725282529253025312532253325342535253625372538253925402541254225432544254525462547254825492550255125522553255425552556255725582559256025612562256325642565256625672568256925702571257225732574257525762577257825792580258125822583258425852586258725882589259025912592259325942595259625972598259926002601260226032604260526062607260826092610261126122613261426152616261726182619262026212622262326242625262626272628262926302631263226332634263526362637
  1. /*
  2. Copyright (c) 2013, 2014 Paolo Patierno
  3. All rights reserved. This program and the accompanying materials
  4. are made available under the terms of the Eclipse Public License v1.0
  5. and Eclipse Distribution License v1.0 which accompany this distribution.
  6. The Eclipse Public License is available at
  7. http://www.eclipse.org/legal/epl-v10.html
  8. and the Eclipse Distribution License is available at
  9. http://www.eclipse.org/org/documents/edl-v10.php.
  10. Contributors:
  11. Paolo Patierno - initial API and implementation and/or initial documentation
  12. ----------------------------------------------------------------------------
  13. Giovanni Paolo Vigano' - preprocessor directives for platform dependent compilation in Unity
  14. */
  15. using System;
  16. using System.Net;
  17. #if !(WINDOWS_APP || WINDOWS_PHONE_APP || (!UNITY_EDITOR&&UNITY_WSA_10_0&&!ENABLE_IL2CPP))
  18. using System.Net.Sockets;
  19. using System.Security.Cryptography.X509Certificates;
  20. #endif
  21. using System.Threading;
  22. using uPLibrary.Networking.M2Mqtt.Exceptions;
  23. using uPLibrary.Networking.M2Mqtt.Messages;
  24. using uPLibrary.Networking.M2Mqtt.Session;
  25. using uPLibrary.Networking.M2Mqtt.Utility;
  26. using uPLibrary.Networking.M2Mqtt.Internal;
  27. // if .Net Micro Framework
  28. #if (MF_FRAMEWORK_VERSION_V4_2 || MF_FRAMEWORK_VERSION_V4_3)
  29. using Microsoft.SPOT;
  30. #if SSL
  31. using Microsoft.SPOT.Net.Security;
  32. #endif
  33. // else other frameworks (.Net, .Net Compact, Mono, Windows Phone)
  34. #else
  35. using System.Collections.Generic;
  36. #if (SSL && !(WINDOWS_APP || WINDOWS_PHONE_APP || (!UNITY_EDITOR&&UNITY_WSA_10_0&&!ENABLE_IL2CPP)))
  37. using System.Security.Authentication;
  38. using System.Net.Security;
  39. #endif
  40. #endif
  41. #if (WINDOWS_APP || WINDOWS_PHONE_APP || (!UNITY_EDITOR && UNITY_WSA_10_0 && !ENABLE_IL2CPP))
  42. using Windows.Networking.Sockets;
  43. #endif
  44. using System.Collections;
  45. // alias needed due to Microsoft.SPOT.Trace in .Net Micro Framework
  46. // (it's ambiguos with uPLibrary.Networking.M2Mqtt.Utility.Trace)
  47. using MqttUtility = uPLibrary.Networking.M2Mqtt.Utility;
  48. using System.IO;
  49. using System.Net.Security;
  50. namespace uPLibrary.Networking.M2Mqtt
  51. {
  52. /// <summary>
  53. /// MQTT Client
  54. /// </summary>
  55. public class MqttClient
  56. {
  57. #if BROKER
  58. #region Constants ...
  59. // thread names
  60. private const string RECEIVE_THREAD_NAME = "ReceiveThread";
  61. private const string RECEIVE_EVENT_THREAD_NAME = "DispatchEventThread";
  62. private const string PROCESS_INFLIGHT_THREAD_NAME = "ProcessInflightThread";
  63. private const string KEEP_ALIVE_THREAD = "KeepAliveThread";
  64. #endregion
  65. #endif
  66. /// <summary>
  67. /// Delagate that defines event handler for PUBLISH message received
  68. /// </summary>
  69. public delegate void MqttMsgPublishEventHandler(object sender, MqttMsgPublishEventArgs e);
  70. /// <summary>
  71. /// Delegate that defines event handler for published message
  72. /// </summary>
  73. public delegate void MqttMsgPublishedEventHandler(object sender, MqttMsgPublishedEventArgs e);
  74. /// <summary>
  75. /// Delagate that defines event handler for subscribed topic
  76. /// </summary>
  77. public delegate void MqttMsgSubscribedEventHandler(object sender, MqttMsgSubscribedEventArgs e);
  78. /// <summary>
  79. /// Delagate that defines event handler for unsubscribed topic
  80. /// </summary>
  81. public delegate void MqttMsgUnsubscribedEventHandler(object sender, MqttMsgUnsubscribedEventArgs e);
  82. #if BROKER
  83. /// <summary>
  84. /// Delagate that defines event handler for SUBSCRIBE message received
  85. /// </summary>
  86. public delegate void MqttMsgSubscribeEventHandler(object sender, MqttMsgSubscribeEventArgs e);
  87. /// <summary>
  88. /// Delagate that defines event handler for UNSUBSCRIBE message received
  89. /// </summary>
  90. public delegate void MqttMsgUnsubscribeEventHandler(object sender, MqttMsgUnsubscribeEventArgs e);
  91. /// <summary>
  92. /// Delagate that defines event handler for CONNECT message received
  93. /// </summary>
  94. public delegate void MqttMsgConnectEventHandler(object sender, MqttMsgConnectEventArgs e);
  95. /// <summary>
  96. /// Delegate that defines event handler for client disconnection (DISCONNECT message or not)
  97. /// </summary>
  98. public delegate void MqttMsgDisconnectEventHandler(object sender, EventArgs e);
  99. #endif
  100. /// <summary>
  101. /// Delegate that defines event handler for cliet/peer disconnection
  102. /// </summary>
  103. public delegate void ConnectionClosedEventHandler(object sender, EventArgs e);
  104. // broker hostname (or ip address) and port
  105. private string brokerHostName;
  106. private int brokerPort;
  107. // running status of threads
  108. private bool isRunning;
  109. // event for raising received message event
  110. private AutoResetEvent receiveEventWaitHandle;
  111. // event for starting process inflight queue asynchronously
  112. private AutoResetEvent inflightWaitHandle;
  113. // event for signaling synchronous receive
  114. AutoResetEvent syncEndReceiving;
  115. // message received
  116. MqttMsgBase msgReceived;
  117. // exeption thrown during receiving
  118. Exception exReceiving;
  119. // keep alive period (in ms)
  120. private int keepAlivePeriod;
  121. // events for signaling on keep alive thread
  122. private AutoResetEvent keepAliveEvent;
  123. private AutoResetEvent keepAliveEventEnd;
  124. // last communication time in ticks
  125. private int lastCommTime;
  126. // event for PUBLISH message received
  127. public event MqttMsgPublishEventHandler MqttMsgPublishReceived;
  128. // event for published message
  129. public event MqttMsgPublishedEventHandler MqttMsgPublished;
  130. // event for subscribed topic
  131. public event MqttMsgSubscribedEventHandler MqttMsgSubscribed;
  132. // event for unsubscribed topic
  133. public event MqttMsgUnsubscribedEventHandler MqttMsgUnsubscribed;
  134. #if BROKER
  135. // event for SUBSCRIBE message received
  136. public event MqttMsgSubscribeEventHandler MqttMsgSubscribeReceived;
  137. // event for USUBSCRIBE message received
  138. public event MqttMsgUnsubscribeEventHandler MqttMsgUnsubscribeReceived;
  139. // event for CONNECT message received
  140. public event MqttMsgConnectEventHandler MqttMsgConnected;
  141. // event for DISCONNECT message received
  142. public event MqttMsgDisconnectEventHandler MqttMsgDisconnected;
  143. #endif
  144. // event for peer/client disconnection
  145. public event ConnectionClosedEventHandler ConnectionClosed;
  146. // channel to communicate over the network
  147. private IMqttNetworkChannel channel;
  148. // inflight messages queue
  149. private Queue inflightQueue;
  150. // internal queue for received messages about inflight messages
  151. private Queue internalQueue;
  152. // internal queue for dispatching events
  153. private Queue eventQueue;
  154. // session
  155. private MqttClientSession session;
  156. // reference to avoid access to singleton via property
  157. private MqttSettings settings;
  158. // current message identifier generated
  159. private ushort messageIdCounter = 0;
  160. // connection is closing due to peer
  161. private bool isConnectionClosing;
  162. /// <summary>
  163. /// Connection status between client and broker
  164. /// </summary>
  165. public bool IsConnected { get; private set; }
  166. /// <summary>
  167. /// Client identifier
  168. /// </summary>
  169. public string ClientId { get; private set; }
  170. /// <summary>
  171. /// Clean session flag
  172. /// </summary>
  173. public bool CleanSession { get; private set; }
  174. /// <summary>
  175. /// Will flag
  176. /// </summary>
  177. public bool WillFlag { get; private set; }
  178. /// <summary>
  179. /// Will QOS level
  180. /// </summary>
  181. public byte WillQosLevel { get; private set; }
  182. /// <summary>
  183. /// Will topic
  184. /// </summary>
  185. public string WillTopic { get; private set; }
  186. /// <summary>
  187. /// Will message
  188. /// </summary>
  189. public string WillMessage { get; private set; }
  190. /// <summary>
  191. /// MQTT protocol version
  192. /// </summary>
  193. public MqttProtocolVersion ProtocolVersion { get; set; }
  194. #if BROKER
  195. /// <summary>
  196. /// MQTT Client Session
  197. /// </summary>
  198. public MqttClientSession Session
  199. {
  200. get { return this.session; }
  201. set { this.session = value; }
  202. }
  203. #endif
  204. /// <summary>
  205. /// MQTT client settings
  206. /// </summary>
  207. public MqttSettings Settings
  208. {
  209. get { return this.settings; }
  210. }
  211. #if !(WINDOWS_APP || WINDOWS_PHONE_APP || (!UNITY_EDITOR&&UNITY_WSA_10_0&&!ENABLE_IL2CPP))
  212. /// <summary>
  213. /// Constructor
  214. /// </summary>
  215. /// <param name="brokerIpAddress">Broker IP address</param>
  216. [Obsolete("Use this ctor MqttClient(string brokerHostName) insted")]
  217. public MqttClient(IPAddress brokerIpAddress) :
  218. this(brokerIpAddress, MqttSettings.MQTT_BROKER_DEFAULT_PORT, false, null, null, MqttSslProtocols.None)
  219. {
  220. }
  221. /// <summary>
  222. /// Constructor
  223. /// </summary>
  224. /// <param name="brokerIpAddress">Broker IP address</param>
  225. /// <param name="brokerPort">Broker port</param>
  226. /// <param name="secure">Using secure connection</param>
  227. /// <param name="caCert">CA certificate for secure connection</param>
  228. /// <param name="clientCert">Client certificate</param>
  229. /// <param name="sslProtocol">SSL/TLS protocol version</param>
  230. [Obsolete("Use this ctor MqttClient(string brokerHostName, int brokerPort, bool secure, X509Certificate caCert) insted")]
  231. public MqttClient(IPAddress brokerIpAddress, int brokerPort, bool secure, X509Certificate caCert, X509Certificate clientCert, MqttSslProtocols sslProtocol)
  232. {
  233. #if !(MF_FRAMEWORK_VERSION_V4_2 || MF_FRAMEWORK_VERSION_V4_3 || COMPACT_FRAMEWORK)
  234. this.Init(brokerIpAddress.ToString(), brokerPort, secure, caCert, clientCert, sslProtocol, null, null);
  235. #else
  236. this.Init(brokerIpAddress.ToString(), brokerPort, secure, caCert, clientCert, sslProtocol);
  237. #endif
  238. }
  239. #endif
  240. /// <summary>
  241. /// Constructor
  242. /// </summary>
  243. /// <param name="brokerHostName">Broker Host Name or IP Address</param>
  244. public MqttClient(string brokerHostName) :
  245. #if !(WINDOWS_APP || WINDOWS_PHONE_APP || (!UNITY_EDITOR&&UNITY_WSA_10_0&&!ENABLE_IL2CPP))
  246. this(brokerHostName, MqttSettings.MQTT_BROKER_DEFAULT_PORT, false, null, null, MqttSslProtocols.None)
  247. #else
  248. this(brokerHostName, MqttSettings.MQTT_BROKER_DEFAULT_PORT, false, MqttSslProtocols.None)
  249. #endif
  250. {
  251. }
  252. /// <summary>
  253. /// Constructor
  254. /// </summary>
  255. /// <param name="brokerHostName">Broker Host Name or IP Address</param>
  256. /// <param name="brokerPort">Broker port</param>
  257. /// <param name="secure">使用安全连接</param>
  258. /// <param name="sslProtocol">SSL/TLS protocol version</param>
  259. #if !(WINDOWS_APP || WINDOWS_PHONE_APP || (!UNITY_EDITOR && UNITY_WSA_10_0 && !ENABLE_IL2CPP))
  260. /// <param name="caCert">CA certificate for secure connection</param>
  261. /// <param name="clientCert">Client certificate</param>
  262. public MqttClient(string brokerHostName, int brokerPort, bool secure, X509Certificate caCert, X509Certificate clientCert, MqttSslProtocols sslProtocol)
  263. #else
  264. public MqttClient(string brokerHostName, int brokerPort, bool secure, MqttSslProtocols sslProtocol)
  265. #endif
  266. {
  267. #if !(MF_FRAMEWORK_VERSION_V4_2 || MF_FRAMEWORK_VERSION_V4_3 || COMPACT_FRAMEWORK || WINDOWS_APP || WINDOWS_PHONE_APP || (!UNITY_EDITOR&&UNITY_WSA_10_0&&!ENABLE_IL2CPP))
  268. this.Init(brokerHostName, brokerPort, secure, caCert, clientCert, sslProtocol, null, null);
  269. #elif (WINDOWS_APP || WINDOWS_PHONE_APP || (!UNITY_EDITOR&&UNITY_WSA_10_0&&!ENABLE_IL2CPP))
  270. this.Init(brokerHostName, brokerPort, secure, sslProtocol);
  271. #else
  272. this.Init(brokerHostName, brokerPort, secure, caCert, clientCert, sslProtocol);
  273. #endif
  274. }
  275. #if !(MF_FRAMEWORK_VERSION_V4_2 || MF_FRAMEWORK_VERSION_V4_3 || COMPACT_FRAMEWORK || WINDOWS_APP || WINDOWS_PHONE_APP || (!UNITY_EDITOR&&UNITY_WSA_10_0&&!ENABLE_IL2CPP))
  276. /// <summary>
  277. /// Constructor
  278. /// </summary>
  279. /// <param name="brokerHostName">Broker Host Name or IP Address</param>
  280. /// <param name="brokerPort">Broker port</param>
  281. /// <param name="secure">Using secure connection</param>
  282. /// <param name="caCert">CA certificate for secure connection</param>
  283. /// <param name="clientCert">Client certificate</param>
  284. /// <param name="sslProtocol">SSL/TLS protocol version</param>
  285. /// <param name="userCertificateValidationCallback">A RemoteCertificateValidationCallback delegate responsible for validating the certificate supplied by the remote party</param>
  286. public MqttClient(string brokerHostName, int brokerPort, bool secure, X509Certificate caCert, X509Certificate clientCert, MqttSslProtocols sslProtocol,
  287. RemoteCertificateValidationCallback userCertificateValidationCallback)
  288. : this(brokerHostName, brokerPort, secure, caCert, clientCert, sslProtocol, userCertificateValidationCallback, null)
  289. {
  290. }
  291. /// <summary>
  292. /// Constructor
  293. /// </summary>
  294. /// <param name="brokerHostName">Broker Host Name or IP Address</param>
  295. /// <param name="brokerPort">Broker port</param>
  296. /// <param name="secure">Using secure connection</param>
  297. /// <param name="sslProtocol">SSL/TLS protocol version</param>
  298. /// <param name="userCertificateValidationCallback">A RemoteCertificateValidationCallback delegate responsible for validating the certificate supplied by the remote party</param>
  299. /// <param name="userCertificateSelectionCallback">A LocalCertificateSelectionCallback delegate responsible for selecting the certificate used for authentication</param>
  300. public MqttClient(string brokerHostName, int brokerPort, bool secure, MqttSslProtocols sslProtocol,
  301. RemoteCertificateValidationCallback userCertificateValidationCallback,
  302. LocalCertificateSelectionCallback userCertificateSelectionCallback)
  303. : this(brokerHostName, brokerPort, secure, null, null, sslProtocol, userCertificateValidationCallback, userCertificateSelectionCallback)
  304. {
  305. }
  306. /// <summary>
  307. /// Constructor
  308. /// </summary>
  309. /// <param name="brokerHostName">Broker Host Name or IP Address</param>
  310. /// <param name="brokerPort">Broker port</param>
  311. /// <param name="secure">Using secure connection</param>
  312. /// <param name="caCert">CA certificate for secure connection</param>
  313. /// <param name="clientCert">Client certificate</param>
  314. /// <param name="sslProtocol">SSL/TLS protocol version</param>
  315. /// <param name="userCertificateValidationCallback">A RemoteCertificateValidationCallback delegate responsible for validating the certificate supplied by the remote party</param>
  316. /// <param name="userCertificateSelectionCallback">A LocalCertificateSelectionCallback delegate responsible for selecting the certificate used for authentication</param>
  317. public MqttClient(string brokerHostName, int brokerPort, bool secure, X509Certificate caCert, X509Certificate clientCert, MqttSslProtocols sslProtocol,
  318. RemoteCertificateValidationCallback userCertificateValidationCallback,
  319. LocalCertificateSelectionCallback userCertificateSelectionCallback)
  320. {
  321. this.Init(brokerHostName, brokerPort, secure, caCert, clientCert, sslProtocol, userCertificateValidationCallback, userCertificateSelectionCallback);
  322. }
  323. #endif
  324. #if BROKER
  325. /// <summary>
  326. /// Constructor
  327. /// </summary>
  328. /// <param name="channel">Network channel for communication</param>
  329. public MqttClient(IMqttNetworkChannel channel)
  330. {
  331. // set default MQTT protocol version (default is 3.1.1)
  332. this.ProtocolVersion = MqttProtocolVersion.Version_3_1_1;
  333. this.channel = channel;
  334. // reference to MQTT settings
  335. this.settings = MqttSettings.Instance;
  336. // client not connected yet (CONNACK not send from client), some default values
  337. this.IsConnected = false;
  338. this.ClientId = null;
  339. this.CleanSession = true;
  340. this.keepAliveEvent = new AutoResetEvent(false);
  341. // queue for handling inflight messages (publishing and acknowledge)
  342. this.inflightWaitHandle = new AutoResetEvent(false);
  343. this.inflightQueue = new Queue();
  344. // queue for received message
  345. this.receiveEventWaitHandle = new AutoResetEvent(false);
  346. this.eventQueue = new Queue();
  347. this.internalQueue = new Queue();
  348. // session
  349. this.session = null;
  350. }
  351. #endif
  352. /// <summary>
  353. /// MqttClient initialization
  354. /// </summary>
  355. /// <param name="brokerHostName">Broker Host Name or IP Address</param>
  356. /// <param name="brokerPort">Broker port</param>
  357. /// <param name="secure">>Using secure connection</param>
  358. /// <param name="caCert">CA certificate for secure connection</param>
  359. /// <param name="clientCert">Client certificate</param>
  360. /// <param name="sslProtocol">SSL/TLS protocol version</param>
  361. #if !(MF_FRAMEWORK_VERSION_V4_2 || MF_FRAMEWORK_VERSION_V4_3 || COMPACT_FRAMEWORK || WINDOWS_APP || WINDOWS_PHONE_APP || ((!UNITY_EDITOR && UNITY_WSA_10_0 && !ENABLE_IL2CPP)))
  362. /// <param name="userCertificateSelectionCallback">A RemoteCertificateValidationCallback delegate responsible for validating the certificate supplied by the remote party</param>
  363. /// <param name="userCertificateValidationCallback">A LocalCertificateSelectionCallback delegate responsible for selecting the certificate used for authentication</param>
  364. private void Init(string brokerHostName, int brokerPort, bool secure, X509Certificate caCert, X509Certificate clientCert, MqttSslProtocols sslProtocol,
  365. RemoteCertificateValidationCallback userCertificateValidationCallback,
  366. LocalCertificateSelectionCallback userCertificateSelectionCallback)
  367. #elif (WINDOWS_APP || WINDOWS_PHONE_APP || (!UNITY_EDITOR&&UNITY_WSA_10_0&&!ENABLE_IL2CPP))
  368. private void Init(string brokerHostName, int brokerPort, bool secure, MqttSslProtocols sslProtocol)
  369. #else
  370. private void Init(string brokerHostName, int brokerPort, bool secure, X509Certificate caCert, X509Certificate clientCert, MqttSslProtocols sslProtocol)
  371. #endif
  372. {
  373. // set default MQTT protocol version (default is 3.1.1)
  374. this.ProtocolVersion = MqttProtocolVersion.Version_3_1_1;
  375. #if !SSL
  376. // check security parameters
  377. if (secure)
  378. throw new ArgumentException("Library compiled without SSL support");
  379. #endif
  380. this.brokerHostName = brokerHostName;
  381. this.brokerPort = brokerPort;
  382. // reference to MQTT settings
  383. this.settings = MqttSettings.Instance;
  384. // set settings port based on secure connection or not
  385. if (!secure)
  386. this.settings.Port = this.brokerPort;
  387. else
  388. this.settings.SslPort = this.brokerPort;
  389. this.syncEndReceiving = new AutoResetEvent(false);
  390. this.keepAliveEvent = new AutoResetEvent(false);
  391. // queue for handling inflight messages (publishing and acknowledge)
  392. this.inflightWaitHandle = new AutoResetEvent(false);
  393. this.inflightQueue = new Queue();
  394. // queue for received message
  395. this.receiveEventWaitHandle = new AutoResetEvent(false);
  396. this.eventQueue = new Queue();
  397. this.internalQueue = new Queue();
  398. // session
  399. this.session = null;
  400. // create network channel
  401. #if !(MF_FRAMEWORK_VERSION_V4_2 || MF_FRAMEWORK_VERSION_V4_3 || COMPACT_FRAMEWORK || WINDOWS_APP || WINDOWS_PHONE_APP || (!UNITY_EDITOR&&UNITY_WSA_10_0&&!ENABLE_IL2CPP))
  402. this.channel = new MqttNetworkChannel(this.brokerHostName, this.brokerPort, secure, caCert, clientCert, sslProtocol, userCertificateValidationCallback, userCertificateSelectionCallback);
  403. #elif (WINDOWS_APP || WINDOWS_PHONE_APP || (!UNITY_EDITOR&&UNITY_WSA_10_0&&!ENABLE_IL2CPP))
  404. this.channel = new MqttNetworkChannel(this.brokerHostName, this.brokerPort, secure, sslProtocol);
  405. #else
  406. this.channel = new MqttNetworkChannel(this.brokerHostName, this.brokerPort, secure, caCert, clientCert, sslProtocol);
  407. #endif
  408. }
  409. /// <summary>
  410. /// Connect to broker
  411. /// </summary>
  412. /// <param name="clientId">Client identifier</param>
  413. /// <returns>Return code of CONNACK message from broker</returns>
  414. public byte Connect(string clientId)
  415. {
  416. return this.Connect(clientId, null, null, false, MqttMsgConnect.QOS_LEVEL_AT_MOST_ONCE, false, null, null, true, MqttMsgConnect.KEEP_ALIVE_PERIOD_DEFAULT);
  417. }
  418. /// <summary>
  419. /// Connect to broker
  420. /// </summary>
  421. /// <param name="clientId">Client identifier</param>
  422. /// <param name="username">Username</param>
  423. /// <param name="password">Password</param>
  424. /// <returns>Return code of CONNACK message from broker</returns>
  425. public byte Connect(string clientId,
  426. string username,
  427. string password)
  428. {
  429. return this.Connect(clientId, username, password, false, MqttMsgConnect.QOS_LEVEL_AT_MOST_ONCE, false, null, null, true, MqttMsgConnect.KEEP_ALIVE_PERIOD_DEFAULT);
  430. }
  431. /// <summary>
  432. /// Connect to broker
  433. /// </summary>
  434. /// <param name="clientId">Client identifier</param>
  435. /// <param name="username">Username</param>
  436. /// <param name="password">Password</param>
  437. /// <param name="cleanSession">Clean sessione flag</param>
  438. /// <param name="keepAlivePeriod">Keep alive period</param>
  439. /// <returns>Return code of CONNACK message from broker</returns>
  440. public byte Connect(string clientId,
  441. string username,
  442. string password,
  443. bool cleanSession,
  444. ushort keepAlivePeriod)
  445. {
  446. return this.Connect(clientId, username, password, false, MqttMsgConnect.QOS_LEVEL_AT_MOST_ONCE, false, null, null, cleanSession, keepAlivePeriod);
  447. }
  448. /// <summary>
  449. /// Connect to broker
  450. /// </summary>
  451. /// <param name="clientId">Client identifier</param>
  452. /// <param name="username">Username</param>
  453. /// <param name="password">Password</param>
  454. /// <param name="willRetain">Will retain flag</param>
  455. /// <param name="willQosLevel">Will QOS level</param>
  456. /// <param name="willFlag">Will flag</param>
  457. /// <param name="willTopic">Will topic</param>
  458. /// <param name="willMessage">Will message</param>
  459. /// <param name="cleanSession">Clean sessione flag</param>
  460. /// <param name="keepAlivePeriod">Keep alive period</param>
  461. /// <returns>Return code of CONNACK message from broker</returns>
  462. public byte Connect(string clientId,
  463. string username,
  464. string password,
  465. bool willRetain,
  466. byte willQosLevel,
  467. bool willFlag,
  468. string willTopic,
  469. string willMessage,
  470. bool cleanSession,
  471. ushort keepAlivePeriod)
  472. {
  473. // create CONNECT message
  474. MqttMsgConnect connect = new MqttMsgConnect(clientId,
  475. username,
  476. password,
  477. willRetain,
  478. willQosLevel,
  479. willFlag,
  480. willTopic,
  481. willMessage,
  482. cleanSession,
  483. keepAlivePeriod,
  484. (byte)this.ProtocolVersion);
  485. try
  486. {
  487. // connect to the broker
  488. this.channel.Connect();
  489. }
  490. catch (Exception ex)
  491. {
  492. throw new MqttConnectionException("Exception connecting to the broker", ex);
  493. }
  494. this.lastCommTime = 0;
  495. this.isRunning = true;
  496. this.isConnectionClosing = false;
  497. // start thread for receiving messages from broker
  498. Fx.StartThread(this.ReceiveThread);
  499. MqttMsgConnack connack = (MqttMsgConnack)this.SendReceive(connect);
  500. // if connection accepted, start keep alive timer and
  501. if (connack.ReturnCode == MqttMsgConnack.CONN_ACCEPTED)
  502. {
  503. // set all client properties
  504. this.ClientId = clientId;
  505. this.CleanSession = cleanSession;
  506. this.WillFlag = willFlag;
  507. this.WillTopic = willTopic;
  508. this.WillMessage = willMessage;
  509. this.WillQosLevel = willQosLevel;
  510. this.keepAlivePeriod = keepAlivePeriod * 1000; // convert in ms
  511. // restore previous session
  512. this.RestoreSession();
  513. // keep alive period equals zero means turning off keep alive mechanism
  514. if (this.keepAlivePeriod != 0)
  515. {
  516. // start thread for sending keep alive message to the broker
  517. Fx.StartThread(this.KeepAliveThread);
  518. }
  519. // start thread for raising received message event from broker
  520. Fx.StartThread(this.DispatchEventThread);
  521. // start thread for handling inflight messages queue to broker asynchronously (publish and acknowledge)
  522. Fx.StartThread(this.ProcessInflightThread);
  523. this.IsConnected = true;
  524. }
  525. return connack.ReturnCode;
  526. }
  527. /// <summary>
  528. /// Disconnect from broker
  529. /// </summary>
  530. public void Disconnect()
  531. {
  532. MqttMsgDisconnect disconnect = new MqttMsgDisconnect();
  533. this.Send(disconnect);
  534. // close client
  535. this.OnConnectionClosing();
  536. }
  537. #if BROKER
  538. /// <summary>
  539. /// Open client communication
  540. /// </summary>
  541. public void Open()
  542. {
  543. this.isRunning = true;
  544. // start thread for receiving messages from client
  545. Fx.StartThread(this.ReceiveThread);
  546. // start thread for raising received message event from client
  547. Fx.StartThread(this.DispatchEventThread);
  548. // start thread for handling inflight messages queue to client asynchronously (publish and acknowledge)
  549. Fx.StartThread(this.ProcessInflightThread);
  550. }
  551. #endif
  552. /// <summary>
  553. /// Close client
  554. /// </summary>
  555. #if BROKER
  556. public void Close()
  557. #else
  558. private void Close()
  559. #endif
  560. {
  561. // stop receiving thread
  562. this.isRunning = false;
  563. // wait end receive event thread
  564. if (this.receiveEventWaitHandle != null)
  565. this.receiveEventWaitHandle.Set();
  566. // wait end process inflight thread
  567. if (this.inflightWaitHandle != null)
  568. this.inflightWaitHandle.Set();
  569. #if BROKER
  570. // unlock keep alive thread
  571. this.keepAliveEvent.Set();
  572. #else
  573. // unlock keep alive thread and wait
  574. this.keepAliveEvent.Set();
  575. if (this.keepAliveEventEnd != null)
  576. this.keepAliveEventEnd.WaitOne();
  577. #endif
  578. // clear all queues
  579. this.inflightQueue.Clear();
  580. this.internalQueue.Clear();
  581. this.eventQueue.Clear();
  582. // close network channel
  583. this.channel.Close();
  584. this.IsConnected = false;
  585. }
  586. /// <summary>
  587. /// Execute ping to broker for keep alive
  588. /// </summary>
  589. /// <returns>PINGRESP message from broker</returns>
  590. private MqttMsgPingResp Ping()
  591. {
  592. MqttMsgPingReq pingreq = new MqttMsgPingReq();
  593. try
  594. {
  595. // broker must send PINGRESP within timeout equal to keep alive period
  596. return (MqttMsgPingResp)this.SendReceive(pingreq, this.keepAlivePeriod);
  597. }
  598. catch (Exception e)
  599. {
  600. #if TRACE
  601. MqttUtility.Trace.WriteLine(TraceLevel.Error, "Exception occurred: {0}", e.ToString());
  602. #endif
  603. // client must close connection
  604. this.OnConnectionClosing();
  605. return null;
  606. }
  607. }
  608. #if BROKER
  609. /// <summary>
  610. /// Send CONNACK message to the client (connection accepted or not)
  611. /// </summary>
  612. /// <param name="connect">CONNECT message with all client information</param>
  613. /// <param name="returnCode">Return code for CONNACK message</param>
  614. /// <param name="clientId">If not null, client id assigned by broker</param>
  615. /// <param name="sessionPresent">Session present on the broker</param>
  616. public void Connack(MqttMsgConnect connect, byte returnCode, string clientId, bool sessionPresent)
  617. {
  618. this.lastCommTime = 0;
  619. // create CONNACK message and ...
  620. MqttMsgConnack connack = new MqttMsgConnack();
  621. connack.ReturnCode = returnCode;
  622. // [v3.1.1] session present flag
  623. if (this.ProtocolVersion == MqttProtocolVersion.Version_3_1_1)
  624. connack.SessionPresent = sessionPresent;
  625. // ... send it to the client
  626. this.Send(connack);
  627. // connection accepted, start keep alive thread checking
  628. if (connack.ReturnCode == MqttMsgConnack.CONN_ACCEPTED)
  629. {
  630. // [v3.1.1] if client id isn't null, the CONNECT message has a cliend id with zero bytes length
  631. // and broker assigned a unique identifier to the client
  632. this.ClientId = (clientId == null) ? connect.ClientId : clientId;
  633. this.CleanSession = connect.CleanSession;
  634. this.WillFlag = connect.WillFlag;
  635. this.WillTopic = connect.WillTopic;
  636. this.WillMessage = connect.WillMessage;
  637. this.WillQosLevel = connect.WillQosLevel;
  638. this.keepAlivePeriod = connect.KeepAlivePeriod * 1000; // convert in ms
  639. // broker has a tolerance of 1.5 specified keep alive period
  640. this.keepAlivePeriod += (this.keepAlivePeriod / 2);
  641. // start thread for checking keep alive period timeout
  642. Fx.StartThread(this.KeepAliveThread);
  643. this.isConnectionClosing = false;
  644. this.IsConnected = true;
  645. }
  646. // connection refused, close TCP/IP channel
  647. else
  648. {
  649. this.Close();
  650. }
  651. }
  652. /// <summary>
  653. /// Send SUBACK message to the client
  654. /// </summary>
  655. /// <param name="messageId">Message Id for the SUBSCRIBE message that is being acknowledged</param>
  656. /// <param name="grantedQosLevels">Granted QoS Levels</param>
  657. public void Suback(ushort messageId, byte[] grantedQosLevels)
  658. {
  659. MqttMsgSuback suback = new MqttMsgSuback();
  660. suback.MessageId = messageId;
  661. suback.GrantedQoSLevels = grantedQosLevels;
  662. this.Send(suback);
  663. }
  664. /// <summary>
  665. /// Send UNSUBACK message to the client
  666. /// </summary>
  667. /// <param name="messageId">Message Id for the UNSUBSCRIBE message that is being acknowledged</param>
  668. public void Unsuback(ushort messageId)
  669. {
  670. MqttMsgUnsuback unsuback = new MqttMsgUnsuback();
  671. unsuback.MessageId = messageId;
  672. this.Send(unsuback);
  673. }
  674. #endif
  675. /// <summary>
  676. /// Subscribe for message topics
  677. /// </summary>
  678. /// <param name="topics">List of topics to subscribe</param>
  679. /// <param name="qosLevels">QOS levels related to topics</param>
  680. /// <returns>Message Id related to SUBSCRIBE message</returns>
  681. public ushort Subscribe(string[] topics, byte[] qosLevels)
  682. {
  683. MqttMsgSubscribe subscribe =
  684. new MqttMsgSubscribe(topics, qosLevels);
  685. subscribe.MessageId = this.GetMessageId();
  686. // enqueue subscribe request into the inflight queue
  687. this.EnqueueInflight(subscribe, MqttMsgFlow.ToPublish);
  688. return subscribe.MessageId;
  689. }
  690. /// <summary>
  691. /// Unsubscribe for message topics
  692. /// </summary>
  693. /// <param name="topics">List of topics to unsubscribe</param>
  694. /// <returns>Message Id in UNSUBACK message from broker</returns>
  695. public ushort Unsubscribe(string[] topics)
  696. {
  697. MqttMsgUnsubscribe unsubscribe =
  698. new MqttMsgUnsubscribe(topics);
  699. unsubscribe.MessageId = this.GetMessageId();
  700. // enqueue unsubscribe request into the inflight queue
  701. this.EnqueueInflight(unsubscribe, MqttMsgFlow.ToPublish);
  702. return unsubscribe.MessageId;
  703. }
  704. /// <summary>
  705. /// 异步发布消息(QoS级别0且未保留)
  706. /// Publish a message asynchronously (QoS Level 0 and not retained)
  707. /// </summary>
  708. /// <param name="topic">Message topic</param>
  709. /// <param name="message">Message data (payload)</param>
  710. /// <returns>Message Id related to PUBLISH message</returns>
  711. public ushort Publish(string topic, byte[] message)
  712. {
  713. return this.Publish(topic, message, MqttMsgBase.QOS_LEVEL_AT_MOST_ONCE, false);
  714. }
  715. /// <summary>
  716. /// 异步发布消息
  717. /// </summary>
  718. /// <param name="topic">Message topic</param>
  719. /// <param name="message">Message data (payload)</param>
  720. /// <param name="qosLevel">QoS Level</param>
  721. /// <param name="retain">Retain flag</param>
  722. /// <returns>Message Id related to PUBLISH message</returns>
  723. public ushort Publish(string topic, byte[] message, byte qosLevel, bool retain)
  724. {
  725. MqttMsgPublish publish =
  726. new MqttMsgPublish(topic, message, false, qosLevel, retain);
  727. publish.MessageId = this.GetMessageId();
  728. // enqueue message to publish into the inflight queue
  729. bool enqueue = this.EnqueueInflight(publish, MqttMsgFlow.ToPublish);
  730. // message enqueued
  731. if (enqueue)
  732. return publish.MessageId;
  733. // infligh queue full, message not enqueued
  734. else
  735. throw new MqttClientException(MqttClientErrorCode.InflightQueueFull);
  736. }
  737. /// <summary>
  738. /// Wrapper method for raising events
  739. /// </summary>
  740. /// <param name="internalEvent">Internal event</param>
  741. private void OnInternalEvent(InternalEvent internalEvent)
  742. {
  743. lock (this.eventQueue)
  744. {
  745. this.eventQueue.Enqueue(internalEvent);
  746. }
  747. this.receiveEventWaitHandle.Set();
  748. }
  749. /// <summary>
  750. /// Wrapper method for raising closing connection event
  751. /// </summary>
  752. private void OnConnectionClosing()
  753. {
  754. if (!this.isConnectionClosing)
  755. {
  756. this.isConnectionClosing = true;
  757. this.receiveEventWaitHandle.Set();
  758. }
  759. }
  760. /// <summary>
  761. /// Wrapper method for raising PUBLISH message received event
  762. /// </summary>
  763. /// <param name="publish">PUBLISH message received</param>
  764. private void OnMqttMsgPublishReceived(MqttMsgPublish publish)
  765. {
  766. if (this.MqttMsgPublishReceived != null)
  767. {
  768. this.MqttMsgPublishReceived(this,
  769. new MqttMsgPublishEventArgs(publish.Topic, publish.Message, publish.DupFlag, publish.QosLevel, publish.Retain));
  770. }
  771. }
  772. /// <summary>
  773. /// Wrapper method for raising published message event
  774. /// </summary>
  775. /// <param name="messageId">Message identifier for published message</param>
  776. /// <param name="isPublished">Publish flag</param>
  777. private void OnMqttMsgPublished(ushort messageId, bool isPublished)
  778. {
  779. if (this.MqttMsgPublished != null)
  780. {
  781. this.MqttMsgPublished(this,
  782. new MqttMsgPublishedEventArgs(messageId, isPublished));
  783. }
  784. }
  785. /// <summary>
  786. /// Wrapper method for raising subscribed topic event
  787. /// </summary>
  788. /// <param name="suback">SUBACK message received</param>
  789. private void OnMqttMsgSubscribed(MqttMsgSuback suback)
  790. {
  791. if (this.MqttMsgSubscribed != null)
  792. {
  793. this.MqttMsgSubscribed(this,
  794. new MqttMsgSubscribedEventArgs(suback.MessageId, suback.GrantedQoSLevels));
  795. }
  796. }
  797. /// <summary>
  798. /// Wrapper method for raising unsubscribed topic event
  799. /// </summary>
  800. /// <param name="messageId">Message identifier for unsubscribed topic</param>
  801. private void OnMqttMsgUnsubscribed(ushort messageId)
  802. {
  803. if (this.MqttMsgUnsubscribed != null)
  804. {
  805. this.MqttMsgUnsubscribed(this,
  806. new MqttMsgUnsubscribedEventArgs(messageId));
  807. }
  808. }
  809. #if BROKER
  810. /// <summary>
  811. /// Wrapper method for raising SUBSCRIBE message event
  812. /// </summary>
  813. /// <param name="messageId">Message identifier for subscribe topics request</param>
  814. /// <param name="topics">Topics requested to subscribe</param>
  815. /// <param name="qosLevels">List of QOS Levels requested</param>
  816. private void OnMqttMsgSubscribeReceived(ushort messageId, string[] topics, byte[] qosLevels)
  817. {
  818. if (this.MqttMsgSubscribeReceived != null)
  819. {
  820. this.MqttMsgSubscribeReceived(this,
  821. new MqttMsgSubscribeEventArgs(messageId, topics, qosLevels));
  822. }
  823. }
  824. /// <summary>
  825. /// Wrapper method for raising UNSUBSCRIBE message event
  826. /// </summary>
  827. /// <param name="messageId">Message identifier for unsubscribe topics request</param>
  828. /// <param name="topics">Topics requested to unsubscribe</param>
  829. private void OnMqttMsgUnsubscribeReceived(ushort messageId, string[] topics)
  830. {
  831. if (this.MqttMsgUnsubscribeReceived != null)
  832. {
  833. this.MqttMsgUnsubscribeReceived(this,
  834. new MqttMsgUnsubscribeEventArgs(messageId, topics));
  835. }
  836. }
  837. /// <summary>
  838. /// Wrapper method for raising CONNECT message event
  839. /// </summary>
  840. private void OnMqttMsgConnected(MqttMsgConnect connect)
  841. {
  842. if (this.MqttMsgConnected != null)
  843. {
  844. this.ProtocolVersion = (MqttProtocolVersion)connect.ProtocolVersion;
  845. this.MqttMsgConnected(this, new MqttMsgConnectEventArgs(connect));
  846. }
  847. }
  848. /// <summary>
  849. /// Wrapper method for raising DISCONNECT message event
  850. /// </summary>
  851. private void OnMqttMsgDisconnected()
  852. {
  853. if (this.MqttMsgDisconnected != null)
  854. {
  855. this.MqttMsgDisconnected(this, EventArgs.Empty);
  856. }
  857. }
  858. #endif
  859. /// <summary>
  860. /// Wrapper method for peer/client disconnection
  861. /// </summary>
  862. private void OnConnectionClosed()
  863. {
  864. if (this.ConnectionClosed != null)
  865. {
  866. this.ConnectionClosed(this, EventArgs.Empty);
  867. }
  868. }
  869. /// <summary>
  870. /// Send a message
  871. /// </summary>
  872. /// <param name="msgBytes">Message bytes</param>
  873. private void Send(byte[] msgBytes)
  874. {
  875. try
  876. {
  877. // send message
  878. this.channel.Send(msgBytes);
  879. #if !BROKER
  880. // update last message sent ticks
  881. this.lastCommTime = Environment.TickCount;
  882. #endif
  883. }
  884. catch (Exception e)
  885. {
  886. #if TRACE
  887. MqttUtility.Trace.WriteLine(TraceLevel.Error, "Exception occurred: {0}", e.ToString());
  888. #endif
  889. throw new MqttCommunicationException(e);
  890. }
  891. }
  892. /// <summary>
  893. /// Send a message
  894. /// </summary>
  895. /// <param name="msg">Message</param>
  896. private void Send(MqttMsgBase msg)
  897. {
  898. #if TRACE
  899. MqttUtility.Trace.WriteLine(TraceLevel.Frame, "SEND {0}", msg);
  900. #endif
  901. this.Send(msg.GetBytes((byte)this.ProtocolVersion));
  902. }
  903. /// <summary>
  904. /// Send a message to the broker and wait answer
  905. /// </summary>
  906. /// <param name="msgBytes">Message bytes</param>
  907. /// <returns>MQTT message response</returns>
  908. private MqttMsgBase SendReceive(byte[] msgBytes)
  909. {
  910. return this.SendReceive(msgBytes, MqttSettings.MQTT_DEFAULT_TIMEOUT);
  911. }
  912. /// <summary>
  913. /// Send a message to the broker and wait answer
  914. /// </summary>
  915. /// <param name="msgBytes">Message bytes</param>
  916. /// <param name="timeout">Timeout for receiving answer</param>
  917. /// <returns>MQTT message response</returns>
  918. private MqttMsgBase SendReceive(byte[] msgBytes, int timeout)
  919. {
  920. // reset handle before sending
  921. this.syncEndReceiving.Reset();
  922. try
  923. {
  924. // send message
  925. this.channel.Send(msgBytes);
  926. // update last message sent ticks
  927. this.lastCommTime = Environment.TickCount;
  928. }
  929. catch (Exception e)
  930. {
  931. #if !(MF_FRAMEWORK_VERSION_V4_2 || MF_FRAMEWORK_VERSION_V4_3 || COMPACT_FRAMEWORK || WINDOWS_APP || WINDOWS_PHONE_APP || (!UNITY_EDITOR&&UNITY_WSA_10_0&&!ENABLE_IL2CPP))
  932. if (typeof(SocketException) == e.GetType())
  933. {
  934. // connection reset by broker
  935. if (((SocketException)e).SocketErrorCode == SocketError.ConnectionReset)
  936. this.IsConnected = false;
  937. }
  938. #endif
  939. #if TRACE
  940. MqttUtility.Trace.WriteLine(TraceLevel.Error, "Exception occurred: {0}", e.ToString());
  941. #endif
  942. throw new MqttCommunicationException(e);
  943. }
  944. #if (MF_FRAMEWORK_VERSION_V4_2 || MF_FRAMEWORK_VERSION_V4_3 || COMPACT_FRAMEWORK)
  945. // wait for answer from broker
  946. if (this.syncEndReceiving.WaitOne(timeout, false))
  947. #else
  948. // wait for answer from broker
  949. if (this.syncEndReceiving.WaitOne(timeout))
  950. #endif
  951. {
  952. // message received without exception
  953. if (this.exReceiving == null)
  954. return this.msgReceived;
  955. // receiving thread catched exception
  956. else
  957. throw this.exReceiving;
  958. }
  959. else
  960. {
  961. // throw timeout exception
  962. throw new MqttCommunicationException();
  963. }
  964. }
  965. /// <summary>
  966. /// Send a message to the broker and wait answer
  967. /// </summary>
  968. /// <param name="msg">Message</param>
  969. /// <returns>MQTT message response</returns>
  970. private MqttMsgBase SendReceive(MqttMsgBase msg)
  971. {
  972. return this.SendReceive(msg, MqttSettings.MQTT_DEFAULT_TIMEOUT);
  973. }
  974. /// <summary>
  975. /// Send a message to the broker and wait answer
  976. /// </summary>
  977. /// <param name="msg">Message</param>
  978. /// <param name="timeout">Timeout for receiving answer</param>
  979. /// <returns>MQTT message response</returns>
  980. private MqttMsgBase SendReceive(MqttMsgBase msg, int timeout)
  981. {
  982. #if TRACE
  983. MqttUtility.Trace.WriteLine(TraceLevel.Frame, "SEND {0}", msg);
  984. #endif
  985. return this.SendReceive(msg.GetBytes((byte)this.ProtocolVersion), timeout);
  986. }
  987. /// <summary>
  988. /// Enqueue a message into the inflight queue
  989. /// </summary>
  990. /// <param name="msg">Message to enqueue</param>
  991. /// <param name="flow">Message flow (publish, acknowledge)</param>
  992. /// <returns>Message enqueued or not</returns>
  993. private bool EnqueueInflight(MqttMsgBase msg, MqttMsgFlow flow)
  994. {
  995. // enqueue is needed (or not)
  996. bool enqueue = true;
  997. // if it is a PUBLISH message with QoS Level 2
  998. if ((msg.Type == MqttMsgBase.MQTT_MSG_PUBLISH_TYPE) &&
  999. (msg.QosLevel == MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE))
  1000. {
  1001. lock (this.inflightQueue)
  1002. {
  1003. // if it is a PUBLISH message already received (it is in the inflight queue), the publisher
  1004. // re-sent it because it didn't received the PUBREC. In this case, we have to re-send PUBREC
  1005. // NOTE : I need to find on message id and flow because the broker could be publish/received
  1006. // to/from client and message id could be the same (one tracked by broker and the other by client)
  1007. MqttMsgContextFinder msgCtxFinder = new MqttMsgContextFinder(msg.MessageId, MqttMsgFlow.ToAcknowledge);
  1008. MqttMsgContext msgCtx = (MqttMsgContext)this.inflightQueue.Get(msgCtxFinder.Find);
  1009. // the PUBLISH message is alredy in the inflight queue, we don't need to re-enqueue but we need
  1010. // to change state to re-send PUBREC
  1011. if (msgCtx != null)
  1012. {
  1013. msgCtx.State = MqttMsgState.QueuedQos2;
  1014. msgCtx.Flow = MqttMsgFlow.ToAcknowledge;
  1015. enqueue = false;
  1016. }
  1017. }
  1018. }
  1019. if (enqueue)
  1020. {
  1021. // set a default state
  1022. MqttMsgState state = MqttMsgState.QueuedQos0;
  1023. // based on QoS level, the messages flow between broker and client changes
  1024. switch (msg.QosLevel)
  1025. {
  1026. // QoS Level 0
  1027. case MqttMsgBase.QOS_LEVEL_AT_MOST_ONCE:
  1028. state = MqttMsgState.QueuedQos0;
  1029. break;
  1030. // QoS Level 1
  1031. case MqttMsgBase.QOS_LEVEL_AT_LEAST_ONCE:
  1032. state = MqttMsgState.QueuedQos1;
  1033. break;
  1034. // QoS Level 2
  1035. case MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE:
  1036. state = MqttMsgState.QueuedQos2;
  1037. break;
  1038. }
  1039. // [v3.1.1] SUBSCRIBE and UNSUBSCRIBE aren't "officially" QOS = 1
  1040. // so QueuedQos1 state isn't valid for them
  1041. if (msg.Type == MqttMsgBase.MQTT_MSG_SUBSCRIBE_TYPE)
  1042. state = MqttMsgState.SendSubscribe;
  1043. else if (msg.Type == MqttMsgBase.MQTT_MSG_UNSUBSCRIBE_TYPE)
  1044. state = MqttMsgState.SendUnsubscribe;
  1045. // queue message context
  1046. MqttMsgContext msgContext = new MqttMsgContext()
  1047. {
  1048. Message = msg,
  1049. State = state,
  1050. Flow = flow,
  1051. Attempt = 0
  1052. };
  1053. lock (this.inflightQueue)
  1054. {
  1055. // check number of messages inside inflight queue
  1056. enqueue = (this.inflightQueue.Count < this.settings.InflightQueueSize);
  1057. if (enqueue)
  1058. {
  1059. // enqueue message and unlock send thread
  1060. this.inflightQueue.Enqueue(msgContext);
  1061. #if TRACE
  1062. MqttUtility.Trace.WriteLine(TraceLevel.Queuing, "enqueued {0}", msg);
  1063. #endif
  1064. // PUBLISH message
  1065. if (msg.Type == MqttMsgBase.MQTT_MSG_PUBLISH_TYPE)
  1066. {
  1067. // to publish and QoS level 1 or 2
  1068. if ((msgContext.Flow == MqttMsgFlow.ToPublish) &&
  1069. ((msg.QosLevel == MqttMsgBase.QOS_LEVEL_AT_LEAST_ONCE) ||
  1070. (msg.QosLevel == MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE)))
  1071. {
  1072. if (this.session != null)
  1073. this.session.InflightMessages.Add(msgContext.Key, msgContext);
  1074. }
  1075. // to acknowledge and QoS level 2
  1076. else if ((msgContext.Flow == MqttMsgFlow.ToAcknowledge) &&
  1077. (msg.QosLevel == MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE))
  1078. {
  1079. if (this.session != null)
  1080. this.session.InflightMessages.Add(msgContext.Key, msgContext);
  1081. }
  1082. }
  1083. }
  1084. }
  1085. }
  1086. this.inflightWaitHandle.Set();
  1087. return enqueue;
  1088. }
  1089. /// <summary>
  1090. /// Enqueue a message into the internal queue
  1091. /// </summary>
  1092. /// <param name="msg">Message to enqueue</param>
  1093. private void EnqueueInternal(MqttMsgBase msg)
  1094. {
  1095. // enqueue is needed (or not)
  1096. bool enqueue = true;
  1097. // if it is a PUBREL message (for QoS Level 2)
  1098. if (msg.Type == MqttMsgBase.MQTT_MSG_PUBREL_TYPE)
  1099. {
  1100. lock (this.inflightQueue)
  1101. {
  1102. // if it is a PUBREL but the corresponding PUBLISH isn't in the inflight queue,
  1103. // it means that we processed PUBLISH message and received PUBREL and we sent PUBCOMP
  1104. // but publisher didn't receive PUBCOMP so it re-sent PUBREL. We need only to re-send PUBCOMP.
  1105. // NOTE : I need to find on message id and flow because the broker could be publish/received
  1106. // to/from client and message id could be the same (one tracked by broker and the other by client)
  1107. MqttMsgContextFinder msgCtxFinder = new MqttMsgContextFinder(msg.MessageId, MqttMsgFlow.ToAcknowledge);
  1108. MqttMsgContext msgCtx = (MqttMsgContext)this.inflightQueue.Get(msgCtxFinder.Find);
  1109. // the PUBLISH message isn't in the inflight queue, it was already processed so
  1110. // we need to re-send PUBCOMP only
  1111. if (msgCtx == null)
  1112. {
  1113. MqttMsgPubcomp pubcomp = new MqttMsgPubcomp();
  1114. pubcomp.MessageId = msg.MessageId;
  1115. this.Send(pubcomp);
  1116. enqueue = false;
  1117. }
  1118. }
  1119. }
  1120. // if it is a PUBCOMP message (for QoS Level 2)
  1121. else if (msg.Type == MqttMsgBase.MQTT_MSG_PUBCOMP_TYPE)
  1122. {
  1123. lock (this.inflightQueue)
  1124. {
  1125. // if it is a PUBCOMP but the corresponding PUBLISH isn't in the inflight queue,
  1126. // it means that we sent PUBLISH message, sent PUBREL (after receiving PUBREC) and already received PUBCOMP
  1127. // but publisher didn't receive PUBREL so it re-sent PUBCOMP. We need only to ignore this PUBCOMP.
  1128. // NOTE : I need to find on message id and flow because the broker could be publish/received
  1129. // to/from client and message id could be the same (one tracked by broker and the other by client)
  1130. MqttMsgContextFinder msgCtxFinder = new MqttMsgContextFinder(msg.MessageId, MqttMsgFlow.ToPublish);
  1131. MqttMsgContext msgCtx = (MqttMsgContext)this.inflightQueue.Get(msgCtxFinder.Find);
  1132. // the PUBLISH message isn't in the inflight queue, it was already sent so we need to ignore this PUBCOMP
  1133. if (msgCtx == null)
  1134. {
  1135. enqueue = false;
  1136. }
  1137. }
  1138. }
  1139. // if it is a PUBREC message (for QoS Level 2)
  1140. else if (msg.Type == MqttMsgBase.MQTT_MSG_PUBREC_TYPE)
  1141. {
  1142. lock (this.inflightQueue)
  1143. {
  1144. // if it is a PUBREC but the corresponding PUBLISH isn't in the inflight queue,
  1145. // it means that we sent PUBLISH message more times (retries) but broker didn't send PUBREC in time
  1146. // the publish is failed and we need only to ignore this PUBREC.
  1147. // NOTE : I need to find on message id and flow because the broker could be publish/received
  1148. // to/from client and message id could be the same (one tracked by broker and the other by client)
  1149. MqttMsgContextFinder msgCtxFinder = new MqttMsgContextFinder(msg.MessageId, MqttMsgFlow.ToPublish);
  1150. MqttMsgContext msgCtx = (MqttMsgContext)this.inflightQueue.Get(msgCtxFinder.Find);
  1151. // the PUBLISH message isn't in the inflight queue, it was already sent so we need to ignore this PUBREC
  1152. if (msgCtx == null)
  1153. {
  1154. enqueue = false;
  1155. }
  1156. }
  1157. }
  1158. if (enqueue)
  1159. {
  1160. lock (this.internalQueue)
  1161. {
  1162. this.internalQueue.Enqueue(msg);
  1163. #if TRACE
  1164. MqttUtility.Trace.WriteLine(TraceLevel.Queuing, "enqueued {0}", msg);
  1165. #endif
  1166. this.inflightWaitHandle.Set();
  1167. }
  1168. }
  1169. }
  1170. /// <summary>
  1171. /// Thread for receiving messages
  1172. /// </summary>
  1173. private void ReceiveThread()
  1174. {
  1175. int readBytes = 0;
  1176. byte[] fixedHeaderFirstByte = new byte[1];
  1177. byte msgType;
  1178. while (this.isRunning)
  1179. {
  1180. try
  1181. {
  1182. // read first byte (fixed header)
  1183. readBytes = this.channel.Receive(fixedHeaderFirstByte);
  1184. if (readBytes > 0)
  1185. {
  1186. #if BROKER
  1187. // update last message received ticks
  1188. this.lastCommTime = Environment.TickCount;
  1189. #endif
  1190. // extract message type from received byte
  1191. msgType = (byte)((fixedHeaderFirstByte[0] & MqttMsgBase.MSG_TYPE_MASK) >> MqttMsgBase.MSG_TYPE_OFFSET);
  1192. switch (msgType)
  1193. {
  1194. // CONNECT message received
  1195. case MqttMsgBase.MQTT_MSG_CONNECT_TYPE:
  1196. #if BROKER
  1197. MqttMsgConnect connect = MqttMsgConnect.Parse(fixedHeaderFirstByte[0], (byte)this.ProtocolVersion, this.channel);
  1198. #if TRACE
  1199. Trace.WriteLine(TraceLevel.Frame, "RECV {0}", connect);
  1200. #endif
  1201. // raise message received event
  1202. this.OnInternalEvent(new MsgInternalEvent(connect));
  1203. break;
  1204. #else
  1205. throw new MqttClientException(MqttClientErrorCode.WrongBrokerMessage);
  1206. #endif
  1207. // CONNACK message received
  1208. case MqttMsgBase.MQTT_MSG_CONNACK_TYPE:
  1209. #if BROKER
  1210. throw new MqttClientException(MqttClientErrorCode.WrongBrokerMessage);
  1211. #else
  1212. this.msgReceived = MqttMsgConnack.Parse(fixedHeaderFirstByte[0], (byte)this.ProtocolVersion, this.channel);
  1213. #if TRACE
  1214. MqttUtility.Trace.WriteLine(TraceLevel.Frame, "RECV {0}", this.msgReceived);
  1215. #endif
  1216. this.syncEndReceiving.Set();
  1217. break;
  1218. #endif
  1219. // PINGREQ message received
  1220. case MqttMsgBase.MQTT_MSG_PINGREQ_TYPE:
  1221. #if BROKER
  1222. this.msgReceived = MqttMsgPingReq.Parse(fixedHeaderFirstByte[0], (byte)this.ProtocolVersion, this.channel);
  1223. #if TRACE
  1224. Trace.WriteLine(TraceLevel.Frame, "RECV {0}", this.msgReceived);
  1225. #endif
  1226. MqttMsgPingResp pingresp = new MqttMsgPingResp();
  1227. this.Send(pingresp);
  1228. break;
  1229. #else
  1230. throw new MqttClientException(MqttClientErrorCode.WrongBrokerMessage);
  1231. #endif
  1232. // PINGRESP message received
  1233. case MqttMsgBase.MQTT_MSG_PINGRESP_TYPE:
  1234. #if BROKER
  1235. throw new MqttClientException(MqttClientErrorCode.WrongBrokerMessage);
  1236. #else
  1237. this.msgReceived = MqttMsgPingResp.Parse(fixedHeaderFirstByte[0], (byte)this.ProtocolVersion, this.channel);
  1238. #if TRACE
  1239. MqttUtility.Trace.WriteLine(TraceLevel.Frame, "RECV {0}", this.msgReceived);
  1240. #endif
  1241. this.syncEndReceiving.Set();
  1242. break;
  1243. #endif
  1244. // SUBSCRIBE message received
  1245. case MqttMsgBase.MQTT_MSG_SUBSCRIBE_TYPE:
  1246. #if BROKER
  1247. MqttMsgSubscribe subscribe = MqttMsgSubscribe.Parse(fixedHeaderFirstByte[0], (byte)this.ProtocolVersion, this.channel);
  1248. #if TRACE
  1249. Trace.WriteLine(TraceLevel.Frame, "RECV {0}", subscribe);
  1250. #endif
  1251. // raise message received event
  1252. this.OnInternalEvent(new MsgInternalEvent(subscribe));
  1253. break;
  1254. #else
  1255. throw new MqttClientException(MqttClientErrorCode.WrongBrokerMessage);
  1256. #endif
  1257. // SUBACK message received
  1258. case MqttMsgBase.MQTT_MSG_SUBACK_TYPE:
  1259. #if BROKER
  1260. throw new MqttClientException(MqttClientErrorCode.WrongBrokerMessage);
  1261. #else
  1262. // enqueue SUBACK message received (for QoS Level 1) into the internal queue
  1263. MqttMsgSuback suback = MqttMsgSuback.Parse(fixedHeaderFirstByte[0], (byte)this.ProtocolVersion, this.channel);
  1264. #if TRACE
  1265. MqttUtility.Trace.WriteLine(TraceLevel.Frame, "RECV {0}", suback);
  1266. #endif
  1267. // enqueue SUBACK message into the internal queue
  1268. this.EnqueueInternal(suback);
  1269. break;
  1270. #endif
  1271. // PUBLISH message received
  1272. case MqttMsgBase.MQTT_MSG_PUBLISH_TYPE:
  1273. MqttMsgPublish publish = MqttMsgPublish.Parse(fixedHeaderFirstByte[0], (byte)this.ProtocolVersion, this.channel);
  1274. #if TRACE
  1275. MqttUtility.Trace.WriteLine(TraceLevel.Frame, "RECV {0}", publish);
  1276. #endif
  1277. // enqueue PUBLISH message to acknowledge into the inflight queue
  1278. this.EnqueueInflight(publish, MqttMsgFlow.ToAcknowledge);
  1279. break;
  1280. // PUBACK message received
  1281. case MqttMsgBase.MQTT_MSG_PUBACK_TYPE:
  1282. // enqueue PUBACK message received (for QoS Level 1) into the internal queue
  1283. MqttMsgPuback puback = MqttMsgPuback.Parse(fixedHeaderFirstByte[0], (byte)this.ProtocolVersion, this.channel);
  1284. #if TRACE
  1285. MqttUtility.Trace.WriteLine(TraceLevel.Frame, "RECV {0}", puback);
  1286. #endif
  1287. // enqueue PUBACK message into the internal queue
  1288. this.EnqueueInternal(puback);
  1289. break;
  1290. // PUBREC message received
  1291. case MqttMsgBase.MQTT_MSG_PUBREC_TYPE:
  1292. // enqueue PUBREC message received (for QoS Level 2) into the internal queue
  1293. MqttMsgPubrec pubrec = MqttMsgPubrec.Parse(fixedHeaderFirstByte[0], (byte)this.ProtocolVersion, this.channel);
  1294. #if TRACE
  1295. MqttUtility.Trace.WriteLine(TraceLevel.Frame, "RECV {0}", pubrec);
  1296. #endif
  1297. // enqueue PUBREC message into the internal queue
  1298. this.EnqueueInternal(pubrec);
  1299. break;
  1300. // PUBREL message received
  1301. case MqttMsgBase.MQTT_MSG_PUBREL_TYPE:
  1302. // enqueue PUBREL message received (for QoS Level 2) into the internal queue
  1303. MqttMsgPubrel pubrel = MqttMsgPubrel.Parse(fixedHeaderFirstByte[0], (byte)this.ProtocolVersion, this.channel);
  1304. #if TRACE
  1305. MqttUtility.Trace.WriteLine(TraceLevel.Frame, "RECV {0}", pubrel);
  1306. #endif
  1307. // enqueue PUBREL message into the internal queue
  1308. this.EnqueueInternal(pubrel);
  1309. break;
  1310. // PUBCOMP message received
  1311. case MqttMsgBase.MQTT_MSG_PUBCOMP_TYPE:
  1312. // enqueue PUBCOMP message received (for QoS Level 2) into the internal queue
  1313. MqttMsgPubcomp pubcomp = MqttMsgPubcomp.Parse(fixedHeaderFirstByte[0], (byte)this.ProtocolVersion, this.channel);
  1314. #if TRACE
  1315. MqttUtility.Trace.WriteLine(TraceLevel.Frame, "RECV {0}", pubcomp);
  1316. #endif
  1317. // enqueue PUBCOMP message into the internal queue
  1318. this.EnqueueInternal(pubcomp);
  1319. break;
  1320. // UNSUBSCRIBE message received
  1321. case MqttMsgBase.MQTT_MSG_UNSUBSCRIBE_TYPE:
  1322. #if BROKER
  1323. MqttMsgUnsubscribe unsubscribe = MqttMsgUnsubscribe.Parse(fixedHeaderFirstByte[0], (byte)this.ProtocolVersion, this.channel);
  1324. #if TRACE
  1325. Trace.WriteLine(TraceLevel.Frame, "RECV {0}", unsubscribe);
  1326. #endif
  1327. // raise message received event
  1328. this.OnInternalEvent(new MsgInternalEvent(unsubscribe));
  1329. break;
  1330. #else
  1331. throw new MqttClientException(MqttClientErrorCode.WrongBrokerMessage);
  1332. #endif
  1333. // UNSUBACK message received
  1334. case MqttMsgBase.MQTT_MSG_UNSUBACK_TYPE:
  1335. #if BROKER
  1336. throw new MqttClientException(MqttClientErrorCode.WrongBrokerMessage);
  1337. #else
  1338. // enqueue UNSUBACK message received (for QoS Level 1) into the internal queue
  1339. MqttMsgUnsuback unsuback = MqttMsgUnsuback.Parse(fixedHeaderFirstByte[0], (byte)this.ProtocolVersion, this.channel);
  1340. #if TRACE
  1341. MqttUtility.Trace.WriteLine(TraceLevel.Frame, "RECV {0}", unsuback);
  1342. #endif
  1343. // enqueue UNSUBACK message into the internal queue
  1344. this.EnqueueInternal(unsuback);
  1345. break;
  1346. #endif
  1347. // DISCONNECT message received
  1348. case MqttMsgDisconnect.MQTT_MSG_DISCONNECT_TYPE:
  1349. #if BROKER
  1350. MqttMsgDisconnect disconnect = MqttMsgDisconnect.Parse(fixedHeaderFirstByte[0], (byte)this.ProtocolVersion, this.channel);
  1351. #if TRACE
  1352. Trace.WriteLine(TraceLevel.Frame, "RECV {0}", disconnect);
  1353. #endif
  1354. // raise message received event
  1355. this.OnInternalEvent(new MsgInternalEvent(disconnect));
  1356. break;
  1357. #else
  1358. throw new MqttClientException(MqttClientErrorCode.WrongBrokerMessage);
  1359. #endif
  1360. default:
  1361. throw new MqttClientException(MqttClientErrorCode.WrongBrokerMessage);
  1362. }
  1363. this.exReceiving = null;
  1364. }
  1365. // zero bytes read, peer gracefully closed socket
  1366. else
  1367. {
  1368. // wake up thread that will notify connection is closing
  1369. this.OnConnectionClosing();
  1370. }
  1371. }
  1372. catch (Exception e)
  1373. {
  1374. #if TRACE
  1375. MqttUtility.Trace.WriteLine(TraceLevel.Error, "Exception occurred: {0}", e.ToString());
  1376. #endif
  1377. this.exReceiving = new MqttCommunicationException(e);
  1378. bool close = false;
  1379. if (e.GetType() == typeof(MqttClientException))
  1380. {
  1381. // [v3.1.1] scenarios the receiver MUST close the network connection
  1382. MqttClientException ex = e as MqttClientException;
  1383. close = ((ex.ErrorCode == MqttClientErrorCode.InvalidFlagBits) ||
  1384. (ex.ErrorCode == MqttClientErrorCode.InvalidProtocolName) ||
  1385. (ex.ErrorCode == MqttClientErrorCode.InvalidConnectFlags));
  1386. }
  1387. #if !(WINDOWS_APP || WINDOWS_PHONE_APP || (!UNITY_EDITOR&&UNITY_WSA_10_0&&!ENABLE_IL2CPP))
  1388. else if ((e.GetType() == typeof(IOException)) || (e.GetType() == typeof(SocketException)) ||
  1389. ((e.InnerException != null) && (e.InnerException.GetType() == typeof(SocketException)))) // added for SSL/TLS incoming connection that use SslStream that wraps SocketException
  1390. {
  1391. close = true;
  1392. }
  1393. #endif
  1394. if (close)
  1395. {
  1396. // wake up thread that will notify connection is closing
  1397. this.OnConnectionClosing();
  1398. }
  1399. }
  1400. }
  1401. }
  1402. /// <summary>
  1403. /// Thread for handling keep alive message
  1404. /// </summary>
  1405. private void KeepAliveThread()
  1406. {
  1407. int delta = 0;
  1408. int wait = this.keepAlivePeriod;
  1409. // create event to signal that current thread is end
  1410. this.keepAliveEventEnd = new AutoResetEvent(false);
  1411. while (this.isRunning)
  1412. {
  1413. #if (MF_FRAMEWORK_VERSION_V4_2 || MF_FRAMEWORK_VERSION_V4_3 || COMPACT_FRAMEWORK)
  1414. // waiting...
  1415. this.keepAliveEvent.WaitOne(wait, false);
  1416. #else
  1417. // waiting...
  1418. this.keepAliveEvent.WaitOne(wait);
  1419. #endif
  1420. if (this.isRunning)
  1421. {
  1422. delta = Environment.TickCount - this.lastCommTime;
  1423. // if timeout exceeded ...
  1424. if (delta >= this.keepAlivePeriod)
  1425. {
  1426. #if BROKER
  1427. // client must close connection
  1428. this.OnConnectionClosing();
  1429. #else
  1430. // ... send keep alive
  1431. this.Ping();
  1432. wait = this.keepAlivePeriod;
  1433. #endif
  1434. }
  1435. else
  1436. {
  1437. // update waiting time
  1438. wait = this.keepAlivePeriod - delta;
  1439. }
  1440. }
  1441. }
  1442. // signal thread end
  1443. this.keepAliveEventEnd.Set();
  1444. }
  1445. /// <summary>
  1446. /// Thread for raising event
  1447. /// </summary>
  1448. private void DispatchEventThread()
  1449. {
  1450. while (this.isRunning)
  1451. {
  1452. #if BROKER
  1453. if ((this.eventQueue.Count == 0) && !this.isConnectionClosing)
  1454. {
  1455. // broker need to receive the first message (CONNECT)
  1456. // within a reasonable amount of time after TCP/IP connection
  1457. if (!this.IsConnected)
  1458. {
  1459. // wait on receiving message from client with a connection timeout
  1460. if (!this.receiveEventWaitHandle.WaitOne(this.settings.TimeoutOnConnection))
  1461. {
  1462. // client must close connection
  1463. this.Close();
  1464. // client raw disconnection
  1465. this.OnConnectionClosed();
  1466. }
  1467. }
  1468. else
  1469. {
  1470. // wait on receiving message from client
  1471. this.receiveEventWaitHandle.WaitOne();
  1472. }
  1473. }
  1474. #else
  1475. if ((this.eventQueue.Count == 0) && !this.isConnectionClosing)
  1476. // wait on receiving message from client
  1477. this.receiveEventWaitHandle.WaitOne();
  1478. #endif
  1479. // check if it is running or we are closing client
  1480. if (this.isRunning)
  1481. {
  1482. // get event from queue
  1483. InternalEvent internalEvent = null;
  1484. lock (this.eventQueue)
  1485. {
  1486. if (this.eventQueue.Count > 0)
  1487. internalEvent = (InternalEvent)this.eventQueue.Dequeue();
  1488. }
  1489. // it's an event with a message inside
  1490. if (internalEvent != null)
  1491. {
  1492. MqttMsgBase msg = ((MsgInternalEvent)internalEvent).Message;
  1493. if (msg != null)
  1494. {
  1495. switch (msg.Type)
  1496. {
  1497. // CONNECT message received
  1498. case MqttMsgBase.MQTT_MSG_CONNECT_TYPE:
  1499. #if BROKER
  1500. // raise connected client event (CONNECT message received)
  1501. this.OnMqttMsgConnected((MqttMsgConnect)msg);
  1502. break;
  1503. #else
  1504. throw new MqttClientException(MqttClientErrorCode.WrongBrokerMessage);
  1505. #endif
  1506. // SUBSCRIBE message received
  1507. case MqttMsgBase.MQTT_MSG_SUBSCRIBE_TYPE:
  1508. #if BROKER
  1509. MqttMsgSubscribe subscribe = (MqttMsgSubscribe)msg;
  1510. // raise subscribe topic event (SUBSCRIBE message received)
  1511. this.OnMqttMsgSubscribeReceived(subscribe.MessageId, subscribe.Topics, subscribe.QoSLevels);
  1512. break;
  1513. #else
  1514. throw new MqttClientException(MqttClientErrorCode.WrongBrokerMessage);
  1515. #endif
  1516. // SUBACK message received
  1517. case MqttMsgBase.MQTT_MSG_SUBACK_TYPE:
  1518. // raise subscribed topic event (SUBACK message received)
  1519. this.OnMqttMsgSubscribed((MqttMsgSuback)msg);
  1520. break;
  1521. // PUBLISH message received
  1522. case MqttMsgBase.MQTT_MSG_PUBLISH_TYPE:
  1523. // PUBLISH message received in a published internal event, no publish succeeded
  1524. if (internalEvent.GetType() == typeof(MsgPublishedInternalEvent))
  1525. this.OnMqttMsgPublished(msg.MessageId, false);
  1526. else
  1527. // raise PUBLISH message received event
  1528. this.OnMqttMsgPublishReceived((MqttMsgPublish)msg);
  1529. break;
  1530. // PUBACK message received
  1531. case MqttMsgBase.MQTT_MSG_PUBACK_TYPE:
  1532. // raise published message event
  1533. // (PUBACK received for QoS Level 1)
  1534. this.OnMqttMsgPublished(msg.MessageId, true);
  1535. break;
  1536. // PUBREL message received
  1537. case MqttMsgBase.MQTT_MSG_PUBREL_TYPE:
  1538. // raise message received event
  1539. // (PUBREL received for QoS Level 2)
  1540. this.OnMqttMsgPublishReceived((MqttMsgPublish)msg);
  1541. break;
  1542. // PUBCOMP message received
  1543. case MqttMsgBase.MQTT_MSG_PUBCOMP_TYPE:
  1544. // raise published message event
  1545. // (PUBCOMP received for QoS Level 2)
  1546. this.OnMqttMsgPublished(msg.MessageId, true);
  1547. break;
  1548. // UNSUBSCRIBE message received from client
  1549. case MqttMsgBase.MQTT_MSG_UNSUBSCRIBE_TYPE:
  1550. #if BROKER
  1551. MqttMsgUnsubscribe unsubscribe = (MqttMsgUnsubscribe)msg;
  1552. // raise unsubscribe topic event (UNSUBSCRIBE message received)
  1553. this.OnMqttMsgUnsubscribeReceived(unsubscribe.MessageId, unsubscribe.Topics);
  1554. break;
  1555. #else
  1556. throw new MqttClientException(MqttClientErrorCode.WrongBrokerMessage);
  1557. #endif
  1558. // UNSUBACK message received
  1559. case MqttMsgBase.MQTT_MSG_UNSUBACK_TYPE:
  1560. // raise unsubscribed topic event
  1561. this.OnMqttMsgUnsubscribed(msg.MessageId);
  1562. break;
  1563. // DISCONNECT message received from client
  1564. case MqttMsgDisconnect.MQTT_MSG_DISCONNECT_TYPE:
  1565. #if BROKER
  1566. // raise disconnected client event (DISCONNECT message received)
  1567. this.OnMqttMsgDisconnected();
  1568. break;
  1569. #else
  1570. throw new MqttClientException(MqttClientErrorCode.WrongBrokerMessage);
  1571. #endif
  1572. }
  1573. }
  1574. }
  1575. // all events for received messages dispatched, check if there is closing connection
  1576. if ((this.eventQueue.Count == 0) && this.isConnectionClosing)
  1577. {
  1578. // client must close connection
  1579. this.Close();
  1580. // client raw disconnection
  1581. this.OnConnectionClosed();
  1582. }
  1583. }
  1584. }
  1585. }
  1586. /// <summary>
  1587. /// Process inflight messages queue
  1588. /// </summary>
  1589. private void ProcessInflightThread()
  1590. {
  1591. MqttMsgContext msgContext = null;
  1592. MqttMsgBase msgInflight = null;
  1593. MqttMsgBase msgReceived = null;
  1594. InternalEvent internalEvent = null;
  1595. bool acknowledge = false;
  1596. int timeout = Timeout.Infinite;
  1597. int delta;
  1598. bool msgReceivedProcessed = false;
  1599. try
  1600. {
  1601. while (this.isRunning)
  1602. {
  1603. #if (MF_FRAMEWORK_VERSION_V4_2 || MF_FRAMEWORK_VERSION_V4_3 || COMPACT_FRAMEWORK)
  1604. // wait on message queueud to inflight
  1605. this.inflightWaitHandle.WaitOne(timeout, false);
  1606. #else
  1607. // wait on message queueud to inflight
  1608. this.inflightWaitHandle.WaitOne(timeout);
  1609. #endif
  1610. // it could be unblocked because Close() method is joining
  1611. if (this.isRunning)
  1612. {
  1613. lock (this.inflightQueue)
  1614. {
  1615. // message received and peeked from internal queue is processed
  1616. // NOTE : it has the corresponding message in inflight queue based on messageId
  1617. // (ex. a PUBREC for a PUBLISH, a SUBACK for a SUBSCRIBE, ...)
  1618. // if it's orphan we need to remove from internal queue
  1619. msgReceivedProcessed = false;
  1620. acknowledge = false;
  1621. msgReceived = null;
  1622. // set timeout tu MaxValue instead of Infinte (-1) to perform
  1623. // compare with calcultad current msgTimeout
  1624. timeout = Int32.MaxValue;
  1625. // a message inflight could be re-enqueued but we have to
  1626. // analyze it only just one time for cycle
  1627. int count = this.inflightQueue.Count;
  1628. // process all inflight queued messages
  1629. while (count > 0)
  1630. {
  1631. count--;
  1632. acknowledge = false;
  1633. msgReceived = null;
  1634. // check to be sure that client isn't closing and all queues are now empty !
  1635. if (!this.isRunning)
  1636. break;
  1637. // dequeue message context from queue
  1638. msgContext = (MqttMsgContext)this.inflightQueue.Dequeue();
  1639. // get inflight message
  1640. msgInflight = (MqttMsgBase)msgContext.Message;
  1641. switch (msgContext.State)
  1642. {
  1643. case MqttMsgState.QueuedQos0:
  1644. // QoS 0, PUBLISH message to send to broker, no state change, no acknowledge
  1645. if (msgContext.Flow == MqttMsgFlow.ToPublish)
  1646. {
  1647. this.Send(msgInflight);
  1648. }
  1649. // QoS 0, no need acknowledge
  1650. else if (msgContext.Flow == MqttMsgFlow.ToAcknowledge)
  1651. {
  1652. internalEvent = new MsgInternalEvent(msgInflight);
  1653. // notify published message from broker (no need acknowledged)
  1654. this.OnInternalEvent(internalEvent);
  1655. }
  1656. #if TRACE
  1657. MqttUtility.Trace.WriteLine(TraceLevel.Queuing, "processed {0}", msgInflight);
  1658. #endif
  1659. break;
  1660. case MqttMsgState.QueuedQos1:
  1661. // [v3.1.1] SUBSCRIBE and UNSIBSCRIBE aren't "officially" QOS = 1
  1662. case MqttMsgState.SendSubscribe:
  1663. case MqttMsgState.SendUnsubscribe:
  1664. // QoS 1, PUBLISH or SUBSCRIBE/UNSUBSCRIBE message to send to broker, state change to wait PUBACK or SUBACK/UNSUBACK
  1665. if (msgContext.Flow == MqttMsgFlow.ToPublish)
  1666. {
  1667. msgContext.Timestamp = Environment.TickCount;
  1668. msgContext.Attempt++;
  1669. if (msgInflight.Type == MqttMsgBase.MQTT_MSG_PUBLISH_TYPE)
  1670. {
  1671. // PUBLISH message to send, wait for PUBACK
  1672. msgContext.State = MqttMsgState.WaitForPuback;
  1673. // retry ? set dup flag [v3.1.1] only for PUBLISH message
  1674. if (msgContext.Attempt > 1)
  1675. msgInflight.DupFlag = true;
  1676. }
  1677. else if (msgInflight.Type == MqttMsgBase.MQTT_MSG_SUBSCRIBE_TYPE)
  1678. // SUBSCRIBE message to send, wait for SUBACK
  1679. msgContext.State = MqttMsgState.WaitForSuback;
  1680. else if (msgInflight.Type == MqttMsgBase.MQTT_MSG_UNSUBSCRIBE_TYPE)
  1681. // UNSUBSCRIBE message to send, wait for UNSUBACK
  1682. msgContext.State = MqttMsgState.WaitForUnsuback;
  1683. this.Send(msgInflight);
  1684. // update timeout : minimum between delay (based on current message sent) or current timeout
  1685. timeout = (this.settings.DelayOnRetry < timeout) ? this.settings.DelayOnRetry : timeout;
  1686. // re-enqueue message (I have to re-analyze for receiving PUBACK, SUBACK or UNSUBACK)
  1687. this.inflightQueue.Enqueue(msgContext);
  1688. }
  1689. // QoS 1, PUBLISH message received from broker to acknowledge, send PUBACK
  1690. else if (msgContext.Flow == MqttMsgFlow.ToAcknowledge)
  1691. {
  1692. MqttMsgPuback puback = new MqttMsgPuback();
  1693. puback.MessageId = msgInflight.MessageId;
  1694. this.Send(puback);
  1695. internalEvent = new MsgInternalEvent(msgInflight);
  1696. // notify published message from broker and acknowledged
  1697. this.OnInternalEvent(internalEvent);
  1698. #if TRACE
  1699. MqttUtility.Trace.WriteLine(TraceLevel.Queuing, "processed {0}", msgInflight);
  1700. #endif
  1701. }
  1702. break;
  1703. case MqttMsgState.QueuedQos2:
  1704. // QoS 2, PUBLISH message to send to broker, state change to wait PUBREC
  1705. if (msgContext.Flow == MqttMsgFlow.ToPublish)
  1706. {
  1707. msgContext.Timestamp = Environment.TickCount;
  1708. msgContext.Attempt++;
  1709. msgContext.State = MqttMsgState.WaitForPubrec;
  1710. // retry ? set dup flag
  1711. if (msgContext.Attempt > 1)
  1712. msgInflight.DupFlag = true;
  1713. this.Send(msgInflight);
  1714. // update timeout : minimum between delay (based on current message sent) or current timeout
  1715. timeout = (this.settings.DelayOnRetry < timeout) ? this.settings.DelayOnRetry : timeout;
  1716. // re-enqueue message (I have to re-analyze for receiving PUBREC)
  1717. this.inflightQueue.Enqueue(msgContext);
  1718. }
  1719. // QoS 2, PUBLISH message received from broker to acknowledge, send PUBREC, state change to wait PUBREL
  1720. else if (msgContext.Flow == MqttMsgFlow.ToAcknowledge)
  1721. {
  1722. MqttMsgPubrec pubrec = new MqttMsgPubrec();
  1723. pubrec.MessageId = msgInflight.MessageId;
  1724. msgContext.State = MqttMsgState.WaitForPubrel;
  1725. this.Send(pubrec);
  1726. // re-enqueue message (I have to re-analyze for receiving PUBREL)
  1727. this.inflightQueue.Enqueue(msgContext);
  1728. }
  1729. break;
  1730. case MqttMsgState.WaitForPuback:
  1731. case MqttMsgState.WaitForSuback:
  1732. case MqttMsgState.WaitForUnsuback:
  1733. // QoS 1, waiting for PUBACK of a PUBLISH message sent or
  1734. // waiting for SUBACK of a SUBSCRIBE message sent or
  1735. // waiting for UNSUBACK of a UNSUBSCRIBE message sent or
  1736. if (msgContext.Flow == MqttMsgFlow.ToPublish)
  1737. {
  1738. acknowledge = false;
  1739. lock (this.internalQueue)
  1740. {
  1741. if (this.internalQueue.Count > 0)
  1742. msgReceived = (MqttMsgBase)this.internalQueue.Peek();
  1743. }
  1744. // it is a PUBACK message or a SUBACK/UNSUBACK message
  1745. if (msgReceived != null)
  1746. {
  1747. // PUBACK message or SUBACK/UNSUBACK message for the current message
  1748. if (((msgReceived.Type == MqttMsgBase.MQTT_MSG_PUBACK_TYPE) && (msgInflight.Type == MqttMsgBase.MQTT_MSG_PUBLISH_TYPE) && (msgReceived.MessageId == msgInflight.MessageId)) ||
  1749. ((msgReceived.Type == MqttMsgBase.MQTT_MSG_SUBACK_TYPE) && (msgInflight.Type == MqttMsgBase.MQTT_MSG_SUBSCRIBE_TYPE) && (msgReceived.MessageId == msgInflight.MessageId)) ||
  1750. ((msgReceived.Type == MqttMsgBase.MQTT_MSG_UNSUBACK_TYPE) && (msgInflight.Type == MqttMsgBase.MQTT_MSG_UNSUBSCRIBE_TYPE) && (msgReceived.MessageId == msgInflight.MessageId)))
  1751. {
  1752. lock (this.internalQueue)
  1753. {
  1754. // received message processed
  1755. this.internalQueue.Dequeue();
  1756. acknowledge = true;
  1757. msgReceivedProcessed = true;
  1758. #if TRACE
  1759. MqttUtility.Trace.WriteLine(TraceLevel.Queuing, "dequeued {0}", msgReceived);
  1760. #endif
  1761. }
  1762. // if PUBACK received, confirm published with flag
  1763. if (msgReceived.Type == MqttMsgBase.MQTT_MSG_PUBACK_TYPE)
  1764. internalEvent = new MsgPublishedInternalEvent(msgReceived, true);
  1765. else
  1766. internalEvent = new MsgInternalEvent(msgReceived);
  1767. // notify received acknowledge from broker of a published message or subscribe/unsubscribe message
  1768. this.OnInternalEvent(internalEvent);
  1769. // PUBACK received for PUBLISH message with QoS Level 1, remove from session state
  1770. if ((msgInflight.Type == MqttMsgBase.MQTT_MSG_PUBLISH_TYPE) &&
  1771. (this.session != null) &&
  1772. #if (MF_FRAMEWORK_VERSION_V4_2 || MF_FRAMEWORK_VERSION_V4_3 || COMPACT_FRAMEWORK)
  1773. (this.session.InflightMessages.Contains(msgContext.Key)))
  1774. #else
  1775. (this.session.InflightMessages.ContainsKey(msgContext.Key)))
  1776. #endif
  1777. {
  1778. this.session.InflightMessages.Remove(msgContext.Key);
  1779. }
  1780. #if TRACE
  1781. MqttUtility.Trace.WriteLine(TraceLevel.Queuing, "processed {0}", msgInflight);
  1782. #endif
  1783. }
  1784. }
  1785. // current message not acknowledged, no PUBACK or SUBACK/UNSUBACK or not equal messageid
  1786. if (!acknowledge)
  1787. {
  1788. delta = Environment.TickCount - msgContext.Timestamp;
  1789. // check timeout for receiving PUBACK since PUBLISH was sent or
  1790. // for receiving SUBACK since SUBSCRIBE was sent or
  1791. // for receiving UNSUBACK since UNSUBSCRIBE was sent
  1792. if (delta >= this.settings.DelayOnRetry)
  1793. {
  1794. // max retry not reached, resend
  1795. if (msgContext.Attempt < this.settings.AttemptsOnRetry)
  1796. {
  1797. msgContext.State = MqttMsgState.QueuedQos1;
  1798. // re-enqueue message
  1799. this.inflightQueue.Enqueue(msgContext);
  1800. // update timeout (0 -> reanalyze queue immediately)
  1801. timeout = 0;
  1802. }
  1803. else
  1804. {
  1805. // if PUBACK for a PUBLISH message not received after retries, raise event for not published
  1806. if (msgInflight.Type == MqttMsgBase.MQTT_MSG_PUBLISH_TYPE)
  1807. {
  1808. // PUBACK not received in time, PUBLISH retries failed, need to remove from session inflight messages too
  1809. if ((this.session != null) &&
  1810. #if (MF_FRAMEWORK_VERSION_V4_2 || MF_FRAMEWORK_VERSION_V4_3 || COMPACT_FRAMEWORK)
  1811. (this.session.InflightMessages.Contains(msgContext.Key)))
  1812. #else
  1813. (this.session.InflightMessages.ContainsKey(msgContext.Key)))
  1814. #endif
  1815. {
  1816. this.session.InflightMessages.Remove(msgContext.Key);
  1817. }
  1818. internalEvent = new MsgPublishedInternalEvent(msgInflight, false);
  1819. // notify not received acknowledge from broker and message not published
  1820. this.OnInternalEvent(internalEvent);
  1821. }
  1822. // NOTE : not raise events for SUBACK or UNSUBACK not received
  1823. // for the user no event raised means subscribe/unsubscribe failed
  1824. }
  1825. }
  1826. else
  1827. {
  1828. // re-enqueue message (I have to re-analyze for receiving PUBACK, SUBACK or UNSUBACK)
  1829. this.inflightQueue.Enqueue(msgContext);
  1830. // update timeout
  1831. int msgTimeout = (this.settings.DelayOnRetry - delta);
  1832. timeout = (msgTimeout < timeout) ? msgTimeout : timeout;
  1833. }
  1834. }
  1835. }
  1836. break;
  1837. case MqttMsgState.WaitForPubrec:
  1838. // QoS 2, waiting for PUBREC of a PUBLISH message sent
  1839. if (msgContext.Flow == MqttMsgFlow.ToPublish)
  1840. {
  1841. acknowledge = false;
  1842. lock (this.internalQueue)
  1843. {
  1844. if (this.internalQueue.Count > 0)
  1845. msgReceived = (MqttMsgBase)this.internalQueue.Peek();
  1846. }
  1847. // it is a PUBREC message
  1848. if ((msgReceived != null) && (msgReceived.Type == MqttMsgBase.MQTT_MSG_PUBREC_TYPE))
  1849. {
  1850. // PUBREC message for the current PUBLISH message, send PUBREL, wait for PUBCOMP
  1851. if (msgReceived.MessageId == msgInflight.MessageId)
  1852. {
  1853. lock (this.internalQueue)
  1854. {
  1855. // received message processed
  1856. this.internalQueue.Dequeue();
  1857. acknowledge = true;
  1858. msgReceivedProcessed = true;
  1859. #if TRACE
  1860. MqttUtility.Trace.WriteLine(TraceLevel.Queuing, "dequeued {0}", msgReceived);
  1861. #endif
  1862. }
  1863. MqttMsgPubrel pubrel = new MqttMsgPubrel();
  1864. pubrel.MessageId = msgInflight.MessageId;
  1865. msgContext.State = MqttMsgState.WaitForPubcomp;
  1866. msgContext.Timestamp = Environment.TickCount;
  1867. msgContext.Attempt = 1;
  1868. this.Send(pubrel);
  1869. // update timeout : minimum between delay (based on current message sent) or current timeout
  1870. timeout = (this.settings.DelayOnRetry < timeout) ? this.settings.DelayOnRetry : timeout;
  1871. // re-enqueue message
  1872. this.inflightQueue.Enqueue(msgContext);
  1873. }
  1874. }
  1875. // current message not acknowledged
  1876. if (!acknowledge)
  1877. {
  1878. delta = Environment.TickCount - msgContext.Timestamp;
  1879. // check timeout for receiving PUBREC since PUBLISH was sent
  1880. if (delta >= this.settings.DelayOnRetry)
  1881. {
  1882. // max retry not reached, resend
  1883. if (msgContext.Attempt < this.settings.AttemptsOnRetry)
  1884. {
  1885. msgContext.State = MqttMsgState.QueuedQos2;
  1886. // re-enqueue message
  1887. this.inflightQueue.Enqueue(msgContext);
  1888. // update timeout (0 -> reanalyze queue immediately)
  1889. timeout = 0;
  1890. }
  1891. else
  1892. {
  1893. // PUBREC not received in time, PUBLISH retries failed, need to remove from session inflight messages too
  1894. if ((this.session != null) &&
  1895. #if (MF_FRAMEWORK_VERSION_V4_2 || MF_FRAMEWORK_VERSION_V4_3 || COMPACT_FRAMEWORK)
  1896. (this.session.InflightMessages.Contains(msgContext.Key)))
  1897. #else
  1898. (this.session.InflightMessages.ContainsKey(msgContext.Key)))
  1899. #endif
  1900. {
  1901. this.session.InflightMessages.Remove(msgContext.Key);
  1902. }
  1903. // if PUBREC for a PUBLISH message not received after retries, raise event for not published
  1904. internalEvent = new MsgPublishedInternalEvent(msgInflight, false);
  1905. // notify not received acknowledge from broker and message not published
  1906. this.OnInternalEvent(internalEvent);
  1907. }
  1908. }
  1909. else
  1910. {
  1911. // re-enqueue message
  1912. this.inflightQueue.Enqueue(msgContext);
  1913. // update timeout
  1914. int msgTimeout = (this.settings.DelayOnRetry - delta);
  1915. timeout = (msgTimeout < timeout) ? msgTimeout : timeout;
  1916. }
  1917. }
  1918. }
  1919. break;
  1920. case MqttMsgState.WaitForPubrel:
  1921. // QoS 2, waiting for PUBREL of a PUBREC message sent
  1922. if (msgContext.Flow == MqttMsgFlow.ToAcknowledge)
  1923. {
  1924. lock (this.internalQueue)
  1925. {
  1926. if (this.internalQueue.Count > 0)
  1927. msgReceived = (MqttMsgBase)this.internalQueue.Peek();
  1928. }
  1929. // it is a PUBREL message
  1930. if ((msgReceived != null) && (msgReceived.Type == MqttMsgBase.MQTT_MSG_PUBREL_TYPE))
  1931. {
  1932. // PUBREL message for the current message, send PUBCOMP
  1933. if (msgReceived.MessageId == msgInflight.MessageId)
  1934. {
  1935. lock (this.internalQueue)
  1936. {
  1937. // received message processed
  1938. this.internalQueue.Dequeue();
  1939. msgReceivedProcessed = true;
  1940. #if TRACE
  1941. MqttUtility.Trace.WriteLine(TraceLevel.Queuing, "dequeued {0}", msgReceived);
  1942. #endif
  1943. }
  1944. MqttMsgPubcomp pubcomp = new MqttMsgPubcomp();
  1945. pubcomp.MessageId = msgInflight.MessageId;
  1946. this.Send(pubcomp);
  1947. internalEvent = new MsgInternalEvent(msgInflight);
  1948. // notify published message from broker and acknowledged
  1949. this.OnInternalEvent(internalEvent);
  1950. // PUBREL received (and PUBCOMP sent) for PUBLISH message with QoS Level 2, remove from session state
  1951. if ((msgInflight.Type == MqttMsgBase.MQTT_MSG_PUBLISH_TYPE) &&
  1952. (this.session != null) &&
  1953. #if (MF_FRAMEWORK_VERSION_V4_2 || MF_FRAMEWORK_VERSION_V4_3 || COMPACT_FRAMEWORK)
  1954. (this.session.InflightMessages.Contains(msgContext.Key)))
  1955. #else
  1956. (this.session.InflightMessages.ContainsKey(msgContext.Key)))
  1957. #endif
  1958. {
  1959. this.session.InflightMessages.Remove(msgContext.Key);
  1960. }
  1961. #if TRACE
  1962. MqttUtility.Trace.WriteLine(TraceLevel.Queuing, "processed {0}", msgInflight);
  1963. #endif
  1964. }
  1965. else
  1966. {
  1967. // re-enqueue message
  1968. this.inflightQueue.Enqueue(msgContext);
  1969. }
  1970. }
  1971. else
  1972. {
  1973. // re-enqueue message
  1974. this.inflightQueue.Enqueue(msgContext);
  1975. }
  1976. }
  1977. break;
  1978. case MqttMsgState.WaitForPubcomp:
  1979. // QoS 2, waiting for PUBCOMP of a PUBREL message sent
  1980. if (msgContext.Flow == MqttMsgFlow.ToPublish)
  1981. {
  1982. acknowledge = false;
  1983. lock (this.internalQueue)
  1984. {
  1985. if (this.internalQueue.Count > 0)
  1986. msgReceived = (MqttMsgBase)this.internalQueue.Peek();
  1987. }
  1988. // it is a PUBCOMP message
  1989. if ((msgReceived != null) && (msgReceived.Type == MqttMsgBase.MQTT_MSG_PUBCOMP_TYPE))
  1990. {
  1991. // PUBCOMP message for the current message
  1992. if (msgReceived.MessageId == msgInflight.MessageId)
  1993. {
  1994. lock (this.internalQueue)
  1995. {
  1996. // received message processed
  1997. this.internalQueue.Dequeue();
  1998. acknowledge = true;
  1999. msgReceivedProcessed = true;
  2000. #if TRACE
  2001. MqttUtility.Trace.WriteLine(TraceLevel.Queuing, "dequeued {0}", msgReceived);
  2002. #endif
  2003. }
  2004. internalEvent = new MsgPublishedInternalEvent(msgReceived, true);
  2005. // notify received acknowledge from broker of a published message
  2006. this.OnInternalEvent(internalEvent);
  2007. // PUBCOMP received for PUBLISH message with QoS Level 2, remove from session state
  2008. if ((msgInflight.Type == MqttMsgBase.MQTT_MSG_PUBLISH_TYPE) &&
  2009. (this.session != null) &&
  2010. #if (MF_FRAMEWORK_VERSION_V4_2 || MF_FRAMEWORK_VERSION_V4_3 || COMPACT_FRAMEWORK)
  2011. (this.session.InflightMessages.Contains(msgContext.Key)))
  2012. #else
  2013. (this.session.InflightMessages.ContainsKey(msgContext.Key)))
  2014. #endif
  2015. {
  2016. this.session.InflightMessages.Remove(msgContext.Key);
  2017. }
  2018. #if TRACE
  2019. MqttUtility.Trace.WriteLine(TraceLevel.Queuing, "processed {0}", msgInflight);
  2020. #endif
  2021. }
  2022. }
  2023. // it is a PUBREC message
  2024. else if ((msgReceived != null) && (msgReceived.Type == MqttMsgBase.MQTT_MSG_PUBREC_TYPE))
  2025. {
  2026. // another PUBREC message for the current message due to a retransmitted PUBLISH
  2027. // I'm in waiting for PUBCOMP, so I can discard this PUBREC
  2028. if (msgReceived.MessageId == msgInflight.MessageId)
  2029. {
  2030. lock (this.internalQueue)
  2031. {
  2032. // received message processed
  2033. this.internalQueue.Dequeue();
  2034. acknowledge = true;
  2035. msgReceivedProcessed = true;
  2036. #if TRACE
  2037. MqttUtility.Trace.WriteLine(TraceLevel.Queuing, "dequeued {0}", msgReceived);
  2038. #endif
  2039. // re-enqueue message
  2040. this.inflightQueue.Enqueue(msgContext);
  2041. }
  2042. }
  2043. }
  2044. // current message not acknowledged
  2045. if (!acknowledge)
  2046. {
  2047. delta = Environment.TickCount - msgContext.Timestamp;
  2048. // check timeout for receiving PUBCOMP since PUBREL was sent
  2049. if (delta >= this.settings.DelayOnRetry)
  2050. {
  2051. // max retry not reached, resend
  2052. if (msgContext.Attempt < this.settings.AttemptsOnRetry)
  2053. {
  2054. msgContext.State = MqttMsgState.SendPubrel;
  2055. // re-enqueue message
  2056. this.inflightQueue.Enqueue(msgContext);
  2057. // update timeout (0 -> reanalyze queue immediately)
  2058. timeout = 0;
  2059. }
  2060. else
  2061. {
  2062. // PUBCOMP not received, PUBREL retries failed, need to remove from session inflight messages too
  2063. if ((this.session != null) &&
  2064. #if (MF_FRAMEWORK_VERSION_V4_2 || MF_FRAMEWORK_VERSION_V4_3 || COMPACT_FRAMEWORK)
  2065. (this.session.InflightMessages.Contains(msgContext.Key)))
  2066. #else
  2067. (this.session.InflightMessages.ContainsKey(msgContext.Key)))
  2068. #endif
  2069. {
  2070. this.session.InflightMessages.Remove(msgContext.Key);
  2071. }
  2072. // if PUBCOMP for a PUBLISH message not received after retries, raise event for not published
  2073. internalEvent = new MsgPublishedInternalEvent(msgInflight, false);
  2074. // notify not received acknowledge from broker and message not published
  2075. this.OnInternalEvent(internalEvent);
  2076. }
  2077. }
  2078. else
  2079. {
  2080. // re-enqueue message
  2081. this.inflightQueue.Enqueue(msgContext);
  2082. // update timeout
  2083. int msgTimeout = (this.settings.DelayOnRetry - delta);
  2084. timeout = (msgTimeout < timeout) ? msgTimeout : timeout;
  2085. }
  2086. }
  2087. }
  2088. break;
  2089. case MqttMsgState.SendPubrec:
  2090. // TODO : impossible ? --> QueuedQos2 ToAcknowledge
  2091. break;
  2092. case MqttMsgState.SendPubrel:
  2093. // QoS 2, PUBREL message to send to broker, state change to wait PUBCOMP
  2094. if (msgContext.Flow == MqttMsgFlow.ToPublish)
  2095. {
  2096. MqttMsgPubrel pubrel = new MqttMsgPubrel();
  2097. pubrel.MessageId = msgInflight.MessageId;
  2098. msgContext.State = MqttMsgState.WaitForPubcomp;
  2099. msgContext.Timestamp = Environment.TickCount;
  2100. msgContext.Attempt++;
  2101. // retry ? set dup flag [v3.1.1] no needed
  2102. if (this.ProtocolVersion == MqttProtocolVersion.Version_3_1)
  2103. {
  2104. if (msgContext.Attempt > 1)
  2105. pubrel.DupFlag = true;
  2106. }
  2107. this.Send(pubrel);
  2108. // update timeout : minimum between delay (based on current message sent) or current timeout
  2109. timeout = (this.settings.DelayOnRetry < timeout) ? this.settings.DelayOnRetry : timeout;
  2110. // re-enqueue message
  2111. this.inflightQueue.Enqueue(msgContext);
  2112. }
  2113. break;
  2114. case MqttMsgState.SendPubcomp:
  2115. // TODO : impossible ?
  2116. break;
  2117. case MqttMsgState.SendPuback:
  2118. // TODO : impossible ? --> QueuedQos1 ToAcknowledge
  2119. break;
  2120. default:
  2121. break;
  2122. }
  2123. }
  2124. // if calculated timeout is MaxValue, it means that must be Infinite (-1)
  2125. if (timeout == Int32.MaxValue)
  2126. timeout = Timeout.Infinite;
  2127. // if message received is orphan, no corresponding message in inflight queue
  2128. // based on messageId, we need to remove from the queue
  2129. if ((msgReceived != null) && !msgReceivedProcessed)
  2130. {
  2131. this.internalQueue.Dequeue();
  2132. #if TRACE
  2133. MqttUtility.Trace.WriteLine(TraceLevel.Queuing, "dequeued {0} orphan", msgReceived);
  2134. #endif
  2135. }
  2136. }
  2137. }
  2138. }
  2139. }
  2140. catch (MqttCommunicationException e)
  2141. {
  2142. // possible exception on Send, I need to re-enqueue not sent message
  2143. if (msgContext != null)
  2144. // re-enqueue message
  2145. this.inflightQueue.Enqueue(msgContext);
  2146. #if TRACE
  2147. MqttUtility.Trace.WriteLine(TraceLevel.Error, "Exception occurred: {0}", e.ToString());
  2148. #endif
  2149. // raise disconnection client event
  2150. this.OnConnectionClosing();
  2151. }
  2152. }
  2153. /// <summary>
  2154. /// Restore session
  2155. /// </summary>
  2156. private void RestoreSession()
  2157. {
  2158. // if not clean session
  2159. if (!this.CleanSession)
  2160. {
  2161. // there is a previous session
  2162. if (this.session != null)
  2163. {
  2164. lock (this.inflightQueue)
  2165. {
  2166. foreach (MqttMsgContext msgContext in this.session.InflightMessages.Values)
  2167. {
  2168. this.inflightQueue.Enqueue(msgContext);
  2169. // if it is a PUBLISH message to publish
  2170. if ((msgContext.Message.Type == MqttMsgBase.MQTT_MSG_PUBLISH_TYPE) &&
  2171. (msgContext.Flow == MqttMsgFlow.ToPublish))
  2172. {
  2173. // it's QoS 1 and we haven't received PUBACK
  2174. if ((msgContext.Message.QosLevel == MqttMsgBase.QOS_LEVEL_AT_LEAST_ONCE) &&
  2175. (msgContext.State == MqttMsgState.WaitForPuback))
  2176. {
  2177. // we haven't received PUBACK, we need to resend PUBLISH message
  2178. msgContext.State = MqttMsgState.QueuedQos1;
  2179. }
  2180. // it's QoS 2
  2181. else if (msgContext.Message.QosLevel == MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE)
  2182. {
  2183. // we haven't received PUBREC, we need to resend PUBLISH message
  2184. if (msgContext.State == MqttMsgState.WaitForPubrec)
  2185. {
  2186. msgContext.State = MqttMsgState.QueuedQos2;
  2187. }
  2188. // we haven't received PUBCOMP, we need to resend PUBREL for it
  2189. else if (msgContext.State == MqttMsgState.WaitForPubcomp)
  2190. {
  2191. msgContext.State = MqttMsgState.SendPubrel;
  2192. }
  2193. }
  2194. }
  2195. }
  2196. }
  2197. // unlock process inflight queue
  2198. this.inflightWaitHandle.Set();
  2199. }
  2200. else
  2201. {
  2202. // create new session
  2203. this.session = new MqttClientSession(this.ClientId);
  2204. }
  2205. }
  2206. // clean any previous session
  2207. else
  2208. {
  2209. if (this.session != null)
  2210. this.session.Clear();
  2211. }
  2212. }
  2213. #if BROKER
  2214. /// <summary>
  2215. /// Load a given session
  2216. /// </summary>
  2217. /// <param name="session">MQTT Client session to load</param>
  2218. public void LoadSession(MqttClientSession session)
  2219. {
  2220. // if not clean session
  2221. if (!this.CleanSession)
  2222. {
  2223. // set the session ...
  2224. this.session = session;
  2225. // ... and restore it
  2226. this.RestoreSession();
  2227. }
  2228. }
  2229. #endif
  2230. /// <summary>
  2231. /// Generate the next message identifier
  2232. /// </summary>
  2233. /// <returns>Message identifier</returns>
  2234. private ushort GetMessageId()
  2235. {
  2236. // if 0 or max UInt16, it becomes 1 (first valid messageId)
  2237. this.messageIdCounter = ((this.messageIdCounter % UInt16.MaxValue) != 0) ? (ushort)(this.messageIdCounter + 1) : (ushort)1;
  2238. return this.messageIdCounter;
  2239. }
  2240. /// <summary>
  2241. /// Finder class for PUBLISH message inside a queue
  2242. /// </summary>
  2243. internal class MqttMsgContextFinder
  2244. {
  2245. // PUBLISH message id
  2246. internal ushort MessageId { get; set; }
  2247. // message flow into inflight queue
  2248. internal MqttMsgFlow Flow { get; set; }
  2249. /// <summary>
  2250. /// Constructor
  2251. /// </summary>
  2252. /// <param name="messageId">Message Id</param>
  2253. /// <param name="flow">Message flow inside inflight queue</param>
  2254. internal MqttMsgContextFinder(ushort messageId, MqttMsgFlow flow)
  2255. {
  2256. this.MessageId = messageId;
  2257. this.Flow = flow;
  2258. }
  2259. internal bool Find(object item)
  2260. {
  2261. MqttMsgContext msgCtx = (MqttMsgContext)item;
  2262. return ((msgCtx.Message.Type == MqttMsgBase.MQTT_MSG_PUBLISH_TYPE) &&
  2263. (msgCtx.Message.MessageId == this.MessageId) &&
  2264. msgCtx.Flow == this.Flow);
  2265. }
  2266. }
  2267. }
  2268. /// <summary>
  2269. /// MQTT protocol version
  2270. /// </summary>
  2271. public enum MqttProtocolVersion
  2272. {
  2273. Version_3_1 = MqttMsgConnect.PROTOCOL_VERSION_V3_1,
  2274. Version_3_1_1 = MqttMsgConnect.PROTOCOL_VERSION_V3_1_1
  2275. }
  2276. }