threadpool-ms.cpp 32 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934
  1. /*
  2. * threadpool-ms.c: Microsoft threadpool runtime support
  3. *
  4. * Author:
  5. * Ludovic Henry (ludovic.henry@xamarin.com)
  6. *
  7. * Copyright 2015 Xamarin, Inc (http://www.xamarin.com)
  8. * Licensed under the MIT license. See LICENSE file in the project root for full license information.
  9. */
  10. //
  11. // Copyright (c) Microsoft. All rights reserved.
  12. // Licensed under the MIT license. See LICENSE file in the project root for full license information.
  13. //
  14. // Files:
  15. // - src/vm/comthreadpool.cpp
  16. // - src/vm/win32threadpool.cpp
  17. // - src/vm/threadpoolrequest.cpp
  18. // - src/vm/hillclimbing.cpp
  19. //
  20. // Ported from C++ to C and adjusted to Mono runtime
  21. #include "il2cpp-config.h"
  22. #include <stdlib.h>
  23. #define _USE_MATH_DEFINES // needed by MSVC to define math constants
  24. #include <algorithm>
  25. #include <cmath>
  26. #include <complex>
  27. #include "math.h"
  28. #include "il2cpp-api.h"
  29. #include "gc/GarbageCollector.h"
  30. #include "gc/GCHandle.h"
  31. #include "gc/WriteBarrier.h"
  32. #include "icalls/mscorlib/System.Runtime.Remoting.Messaging/MonoMethodMessage.h"
  33. #include "mono/ThreadPool/threadpool-ms.h"
  34. #include "mono/ThreadPool/threadpool-ms-io.h"
  35. #include "mono/ThreadPool/ThreadPoolDataStructures.h"
  36. #include "mono/ThreadPool/ThreadPoolMacros.h"
  37. #include "mono/ThreadPool/ThreadPoolMonitorThread.h"
  38. #include "mono/ThreadPool/ThreadPoolWorkerThread.h"
  39. #include "il2cpp-object-internals.h"
  40. #include "os/CpuInfo.h"
  41. #include "os/Environment.h"
  42. #include "os/Mutex.h"
  43. #include "os/Time.h"
  44. #include "utils/CallOnce.h"
  45. #include "vm/Array.h"
  46. #include "vm/Class.h"
  47. #include "vm/Domain.h"
  48. #include "vm/Exception.h"
  49. #include "vm/Object.h"
  50. #include "vm/Reflection.h"
  51. #include "vm/Random.h"
  52. #include "vm/Runtime.h"
  53. #include "vm/String.h"
  54. #include "vm/Thread.h"
  55. #include "vm/WaitHandle.h"
  56. #include <icalls/mscorlib/System.Runtime.Remoting.Messaging/MonoMethodMessage.h>
  #ifndef CLAMP
  /* Clamps `a` into [low, high]. Arguments may be evaluated more than once, so avoid side effects. */
  #define CLAMP(a, low, high) ((a) < (low) ? (low) : ((a) > (high) ? (high) : (a)))
  #endif
  60. ThreadPool* g_ThreadPool;
  61. /* The exponent to apply to the gain. 1.0 means to use linear gain,
  62. * higher values will enhance large moves and damp small ones.
  63. * default: 2.0 */
  64. #define HILL_CLIMBING_GAIN_EXPONENT 2.0
  65. /* The 'cost' of a thread. 0 means drive for increased throughput regardless
  66. * of thread count, higher values bias more against higher thread counts.
  67. * default: 0.15 */
  68. #define HILL_CLIMBING_BIAS 0.15
  69. #define HILL_CLIMBING_WAVE_PERIOD 4
  70. #define HILL_CLIMBING_MAX_WAVE_MAGNITUDE 20
  71. #define HILL_CLIMBING_WAVE_MAGNITUDE_MULTIPLIER 1.0
  72. #define HILL_CLIMBING_WAVE_HISTORY_SIZE 8
  73. #define HILL_CLIMBING_TARGET_SIGNAL_TO_NOISE_RATIO 3.0
  74. #define HILL_CLIMBING_MAX_CHANGE_PER_SECOND 4
  75. #define HILL_CLIMBING_MAX_CHANGE_PER_SAMPLE 20
  76. #define HILL_CLIMBING_SAMPLE_INTERVAL_LOW 10
  77. #define HILL_CLIMBING_SAMPLE_INTERVAL_HIGH 200
  78. #define HILL_CLIMBING_ERROR_SMOOTHING_FACTOR 0.01
  79. #define HILL_CLIMBING_MAX_SAMPLE_ERROR_PERCENT 0.15
  80. static il2cpp::utils::OnceFlag lazy_init_status;
  81. static Il2CppMethodMessage *
  82. mono_method_call_message_new(MethodInfo *method, void* *params, MethodInfo *invoke,
  83. Il2CppDelegate **cb, Il2CppObject **state)
  84. {
  85. Il2CppDomain *domain = il2cpp::vm::Domain::GetCurrent();
  86. Il2CppMethodMessage *msg;
  87. int i, count;
  88. msg = (Il2CppMethodMessage *)il2cpp::vm::Object::New(il2cpp_defaults.mono_method_message_class);
  89. if (invoke) {
  90. Il2CppReflectionMethod *rm = il2cpp::vm::Reflection::GetMethodObject(invoke, NULL);
  91. il2cpp::icalls::mscorlib::System::Runtime::Remoting::Messaging::MonoMethodMessage::InitMessage(msg, rm, NULL);
  92. count = method->parameters_count - 2;
  93. }
  94. else {
  95. Il2CppReflectionMethod *rm = il2cpp::vm::Reflection::GetMethodObject(method, NULL);
  96. il2cpp::icalls::mscorlib::System::Runtime::Remoting::Messaging::MonoMethodMessage::InitMessage(msg, rm, NULL);
  97. count = method->parameters_count;
  98. }
  99. for (i = 0; i < count; i++) {
  100. void* vpos;
  101. Il2CppClass *klass;
  102. Il2CppObject *arg;
  103. vpos = params[i];
  104. klass = il2cpp_class_from_type(method->parameters[i]);
  105. arg = (Il2CppObject*)vpos;
  106. il2cpp_array_setref(msg->args, i, arg);
  107. }
  108. if (cb != NULL && state != NULL) {
  109. *cb = (Il2CppDelegate *)params[i];
  110. i++;
  111. *state = (Il2CppObject *)params[i];
  112. }
  113. return msg;
  114. }
  115. static void* cpu_info_create()
  116. {
  117. return il2cpp::os::CpuInfo::Create();
  118. }
  119. ThreadPool::ThreadPool() :
  120. parked_threads_count(0),
  121. worker_creation_current_second(-1),
  122. worker_creation_current_count(0),
  123. heuristic_completions(0),
  124. heuristic_sample_start(0),
  125. heuristic_last_dequeue(0),
  126. heuristic_last_adjustment(0),
  127. heuristic_adjustment_interval(10),
  128. limit_worker_min(0),
  129. limit_worker_max(0),
  130. limit_io_min(0),
  131. limit_io_max(0),
  132. cpu_usage(0),
  133. suspended(false),
  134. parked_threads_cond(active_threads_lock)
  135. {
  136. counters.as_int64_t = 0;
  137. cpu_usage_state = cpu_info_create();
  138. }
  139. static void initialize(void* arg)
  140. {
  141. ThreadPoolHillClimbing *hc;
  142. //const char *threads_per_cpu_env;
  143. int threads_per_cpu;
  144. int threads_count;
  145. IL2CPP_ASSERT(!g_ThreadPool);
  146. g_ThreadPool = new ThreadPool();
  147. IL2CPP_ASSERT(g_ThreadPool);
  148. il2cpp::vm::Random::Open();
  149. hc = &g_ThreadPool->heuristic_hill_climbing;
  150. hc->wave_period = HILL_CLIMBING_WAVE_PERIOD;
  151. hc->max_thread_wave_magnitude = HILL_CLIMBING_MAX_WAVE_MAGNITUDE;
  152. hc->thread_magnitude_multiplier = (double) HILL_CLIMBING_WAVE_MAGNITUDE_MULTIPLIER;
  153. hc->samples_to_measure = hc->wave_period * HILL_CLIMBING_WAVE_HISTORY_SIZE;
  154. hc->target_throughput_ratio = (double) HILL_CLIMBING_BIAS;
  155. hc->target_signal_to_noise_ratio = (double) HILL_CLIMBING_TARGET_SIGNAL_TO_NOISE_RATIO;
  156. hc->max_change_per_second = (double) HILL_CLIMBING_MAX_CHANGE_PER_SECOND;
  157. hc->max_change_per_sample = (double) HILL_CLIMBING_MAX_CHANGE_PER_SAMPLE;
  158. hc->sample_interval_low = HILL_CLIMBING_SAMPLE_INTERVAL_LOW;
  159. hc->sample_interval_high = HILL_CLIMBING_SAMPLE_INTERVAL_HIGH;
  160. hc->throughput_error_smoothing_factor = (double) HILL_CLIMBING_ERROR_SMOOTHING_FACTOR;
  161. hc->gain_exponent = (double) HILL_CLIMBING_GAIN_EXPONENT;
  162. hc->max_sample_error = (double) HILL_CLIMBING_MAX_SAMPLE_ERROR_PERCENT;
  163. hc->current_control_setting = 0;
  164. hc->total_samples = 0;
  165. hc->last_thread_count = 0;
  166. hc->average_throughput_noise = 0;
  167. hc->elapsed_since_last_change = 0;
  168. hc->accumulated_completion_count = 0;
  169. hc->accumulated_sample_duration = 0;
  170. hc->samples = (double*)IL2CPP_MALLOC_ZERO (sizeof(double) * hc->samples_to_measure);
  171. hc->thread_counts = (double*)IL2CPP_MALLOC_ZERO(sizeof(double) * hc->samples_to_measure);
  172. hc->random_interval_generator = il2cpp::vm::Random::Create ();
  173. hc->current_sample_interval = il2cpp::vm::Random::Next (&hc->random_interval_generator, hc->sample_interval_low, hc->sample_interval_high);
  174. //std::string threads_per_cpu_env = il2cpp::os::Environment::GetEnvironmentVariable("IL2CPP_THREADS_PER_CPU");
  175. //if (threads_per_cpu_env.empty())
  176. threads_per_cpu = 1;
  177. /*else
  178. threads_per_cpu = CLAMP (atoi (threads_per_cpu_env.c_str()), 1, 50);*/
  179. threads_count = il2cpp::os::Environment::GetProcessorCount() * threads_per_cpu;
  180. g_ThreadPool->limit_worker_min = g_ThreadPool->limit_io_min = threads_count;
  181. #if IL2CPP_TARGET_ANDROID || IL2CPP_TARGET_IOS
  182. g_ThreadPool->limit_worker_max = g_ThreadPool->limit_io_max = CLAMP (threads_count * 100, std::min (threads_count, 200), std::max (threads_count, 200));
  183. #else
  184. g_ThreadPool->limit_worker_max = g_ThreadPool->limit_io_max = threads_count * 100;
  185. #endif
  186. g_ThreadPool->counters._.max_working = g_ThreadPool->limit_worker_min;
  187. }
  188. static void lazy_initialize()
  189. {
  190. il2cpp::utils::CallOnce(lazy_init_status, initialize, NULL);
  191. }
  192. static void worker_kill(Il2CppInternalThread* thread)
  193. {
  194. if (thread == il2cpp::vm::Thread::CurrentInternal())
  195. return;
  196. il2cpp::vm::Thread::Stop(thread);
  197. }
  198. static void cleanup (void)
  199. {
  200. unsigned int i;
  201. /* we make the assumption along the code that we are
  202. * cleaning up only if the runtime is shutting down */
  203. IL2CPP_ASSERT(il2cpp::vm::Runtime::IsShuttingDown ());
  204. while (GetMonitorStatus() != MONITOR_STATUS_NOT_RUNNING)
  205. il2cpp::vm::Thread::Sleep(1);
  206. std::vector<Il2CppInternalThread*> working_threads;
  207. g_ThreadPool->active_threads_lock.Acquire();
  208. working_threads = g_ThreadPool->working_threads;
  209. g_ThreadPool->active_threads_lock.Release();
  210. /* stop all threadpool->working_threads */
  211. for (i = 0; i < working_threads.size(); ++i)
  212. worker_kill (working_threads[i]);
  213. /* unpark all g_ThreadPool->parked_threads */
  214. g_ThreadPool->parked_threads_cond.NotifyAll();
  215. }
  216. bool threadpool_ms_enqueue_work_item (Il2CppDomain *domain, Il2CppObject *work_item)
  217. {
  218. static Il2CppClass *threadpool_class = NULL;
  219. static MethodInfo *unsafe_queue_custom_work_item_method = NULL;
  220. //Il2CppDomain *current_domain;
  221. bool f;
  222. void* args [2];
  223. IL2CPP_ASSERT(work_item);
  224. if (!threadpool_class)
  225. threadpool_class = il2cpp::vm::Class::FromName(il2cpp_defaults.corlib, "System.Threading", "ThreadPool");
  226. if (!unsafe_queue_custom_work_item_method)
  227. unsafe_queue_custom_work_item_method = (MethodInfo*)il2cpp::vm::Class::GetMethodFromName(threadpool_class, "UnsafeQueueCustomWorkItem", 2);
  228. IL2CPP_ASSERT(unsafe_queue_custom_work_item_method);
  229. f = false;
  230. args [0] = (void*) work_item;
  231. args [1] = (void*) &f;
  232. Il2CppObject *result = il2cpp::vm::Runtime::InvokeWithThrow(unsafe_queue_custom_work_item_method, NULL, args);
  233. return true;
  234. }
  235. /* LOCKING: threadpool->domains_lock must be held */
  236. static ThreadPoolDomain* domain_get(Il2CppDomain *domain, bool create)
  237. {
  238. ThreadPoolDomain *tpdomain = NULL;
  239. unsigned int i;
  240. IL2CPP_ASSERT(domain);
  241. for (i = 0; i < g_ThreadPool->domains.size(); ++i) {
  242. tpdomain = g_ThreadPool->domains[i];
  243. if (tpdomain->domain == domain)
  244. return tpdomain;
  245. }
  246. if (create) {
  247. tpdomain = new ThreadPoolDomain();
  248. tpdomain->domain = domain;
  249. g_ThreadPool->domains.push_back(tpdomain);
  250. }
  251. return tpdomain;
  252. }
  253. bool worker_try_unpark()
  254. {
  255. bool worker_unparked = true;
  256. g_ThreadPool->active_threads_lock.AcquireScoped([&worker_unparked] {
  257. if (g_ThreadPool->parked_threads_count == 0)
  258. worker_unparked = false;
  259. else
  260. g_ThreadPool->parked_threads_cond.Notify(1);
  261. });
  262. return worker_unparked;
  263. }
  264. static bool worker_request (Il2CppDomain *domain)
  265. {
  266. ThreadPoolDomain *tpdomain;
  267. IL2CPP_ASSERT(domain);
  268. IL2CPP_ASSERT(g_ThreadPool);
  269. if (il2cpp::vm::Runtime::IsShuttingDown ())
  270. return false;
  271. g_ThreadPool->domains_lock.Acquire();
  272. /* synchronize check with worker_thread */
  273. //if (mono_domain_is_unloading (domain)) {
  274. //mono_coop_mutex_unlock (&threadpool->domains_lock);
  275. /*return false;
  276. }*/
  277. tpdomain = domain_get (domain, true);
  278. IL2CPP_ASSERT(tpdomain);
  279. tpdomain->outstanding_request ++;
  280. /*mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] request worker, domain = %p, outstanding_request = %d",
  281. mono_native_thread_id_get (), tpdomain->domain, tpdomain->outstanding_request);*/
  282. g_ThreadPool->domains_lock.Release();
  283. if (g_ThreadPool->suspended)
  284. return false;
  285. monitor_ensure_running ();
  286. if (worker_try_unpark ()) {
  287. //mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] request worker, unparked", mono_native_thread_id_get ());
  288. return true;
  289. }
  290. if (worker_try_create ()) {
  291. //mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] request worker, created", mono_native_thread_id_get ());
  292. return true;
  293. }
  294. //mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] request worker, failed", mono_native_thread_id_get ());
  295. return false;
  296. }
  297. static void hill_climbing_change_thread_count (int16_t new_thread_count, ThreadPoolHeuristicStateTransition transition)
  298. {
  299. ThreadPoolHillClimbing *hc;
  300. IL2CPP_ASSERT(g_ThreadPool);
  301. hc = &g_ThreadPool->heuristic_hill_climbing;
  302. //mono_trace (G_LOG_LEVEL_INFO, MONO_TRACE_THREADPOOL, "[%p] hill climbing, change max number of threads %d", mono_native_thread_id_get (), new_thread_count);
  303. hc->last_thread_count = new_thread_count;
  304. hc->current_sample_interval = il2cpp::vm::Random::Next(&hc->random_interval_generator, hc->sample_interval_low, hc->sample_interval_high);
  305. hc->elapsed_since_last_change = 0;
  306. hc->completions_since_last_change = 0;
  307. }
  308. void hill_climbing_force_change (int16_t new_thread_count, ThreadPoolHeuristicStateTransition transition)
  309. {
  310. ThreadPoolHillClimbing *hc;
  311. IL2CPP_ASSERT(g_ThreadPool);
  312. hc = &g_ThreadPool->heuristic_hill_climbing;
  313. if (new_thread_count != hc->last_thread_count) {
  314. hc->current_control_setting += new_thread_count - hc->last_thread_count;
  315. hill_climbing_change_thread_count (new_thread_count, transition);
  316. }
  317. }
  318. static std::complex<double> hill_climbing_get_wave_component (double *samples, unsigned int sample_count, double period)
  319. {
  320. ThreadPoolHillClimbing *hc;
  321. double w, cosine, sine, coeff, q0, q1, q2;
  322. unsigned int i;
  323. IL2CPP_ASSERT(g_ThreadPool);
  324. IL2CPP_ASSERT(sample_count >= period);
  325. IL2CPP_ASSERT(period >= 2);
  326. hc = &g_ThreadPool->heuristic_hill_climbing;
  327. w = 2.0 * M_PI / period;
  328. cosine = cos (w);
  329. sine = sin (w);
  330. coeff = 2.0 * cosine;
  331. q0 = q1 = q2 = 0;
  332. for (i = 0; i < sample_count; ++i) {
  333. q0 = coeff * q1 - q2 + samples [(hc->total_samples - sample_count + i) % hc->samples_to_measure];
  334. q2 = q1;
  335. q1 = q0;
  336. }
  337. return (std::complex<double> (q1 - q2 * cosine, (q2 * sine)) / ((double)sample_count));
  338. }
  339. static int16_t hill_climbing_update (int16_t current_thread_count, uint32_t sample_duration, int32_t completions, int64_t *adjustment_interval)
  340. {
  341. ThreadPoolHillClimbing *hc;
  342. ThreadPoolHeuristicStateTransition transition;
  343. double throughput;
  344. double throughput_error_estimate;
  345. double confidence;
  346. double move;
  347. double gain;
  348. int sample_index;
  349. int sample_count;
  350. int new_thread_wave_magnitude;
  351. int new_thread_count;
  352. std::complex<double> thread_wave_component;
  353. std::complex<double> throughput_wave_component;
  354. std::complex<double> ratio;
  355. IL2CPP_ASSERT(g_ThreadPool);
  356. IL2CPP_ASSERT(adjustment_interval);
  357. hc = &g_ThreadPool->heuristic_hill_climbing;
  358. /* If someone changed the thread count without telling us, update our records accordingly. */
  359. if (current_thread_count != hc->last_thread_count)
  360. hill_climbing_force_change (current_thread_count, TRANSITION_INITIALIZING);
  361. /* Update the cumulative stats for this thread count */
  362. hc->elapsed_since_last_change += sample_duration;
  363. hc->completions_since_last_change += completions;
  364. /* Add in any data we've already collected about this sample */
  365. sample_duration += (uint32_t)hc->accumulated_sample_duration;
  366. completions += hc->accumulated_completion_count;
  367. /* We need to make sure we're collecting reasonably accurate data. Since we're just counting the end
  368. * of each work item, we are goinng to be missing some data about what really happened during the
  369. * sample interval. The count produced by each thread includes an initial work item that may have
  370. * started well before the start of the interval, and each thread may have been running some new
  371. * work item for some time before the end of the interval, which did not yet get counted. So
  372. * our count is going to be off by +/- threadCount workitems.
  373. *
  374. * The exception is that the thread that reported to us last time definitely wasn't running any work
  375. * at that time, and the thread that's reporting now definitely isn't running a work item now. So
  376. * we really only need to consider threadCount-1 threads.
  377. *
  378. * Thus the percent error in our count is +/- (threadCount-1)/numCompletions.
  379. *
  380. * We cannot rely on the frequency-domain analysis we'll be doing later to filter out this error, because
  381. * of the way it accumulates over time. If this sample is off by, say, 33% in the negative direction,
  382. * then the next one likely will be too. The one after that will include the sum of the completions
  383. * we missed in the previous samples, and so will be 33% positive. So every three samples we'll have
  384. * two "low" samples and one "high" sample. This will appear as periodic variation right in the frequency
  385. * range we're targeting, which will not be filtered by the frequency-domain translation. */
  386. if (hc->total_samples > 0 && ((current_thread_count - 1.0) / completions) >= hc->max_sample_error) {
  387. /* Not accurate enough yet. Let's accumulate the data so
  388. * far, and tell the ThreadPool to collect a little more. */
  389. hc->accumulated_sample_duration = sample_duration;
  390. hc->accumulated_completion_count = completions;
  391. *adjustment_interval = 10;
  392. return current_thread_count;
  393. }
  394. /* We've got enouugh data for our sample; reset our accumulators for next time. */
  395. hc->accumulated_sample_duration = 0;
  396. hc->accumulated_completion_count = 0;
  397. /* Add the current thread count and throughput sample to our history. */
  398. throughput = ((double) completions) / sample_duration;
  399. sample_index = hc->total_samples % hc->samples_to_measure;
  400. hc->samples [sample_index] = throughput;
  401. hc->thread_counts [sample_index] = current_thread_count;
  402. hc->total_samples ++;
  403. /* Set up defaults for our metrics. */
  404. throughput_error_estimate = 0;
  405. confidence = 0;
  406. transition = TRANSITION_WARMUP;
  407. /* How many samples will we use? It must be at least the three wave periods we're looking for, and it must also
  408. * be a whole multiple of the primary wave's period; otherwise the frequency we're looking for will fall between
  409. * two frequency bands in the Fourier analysis, and we won't be able to measure it accurately. */
  410. sample_count = ((int) std::min (hc->total_samples - 1, (int64_t)hc->samples_to_measure) / hc->wave_period) * hc->wave_period;
  411. if (sample_count > hc->wave_period) {
  412. int i;
  413. double average_throughput;
  414. double average_thread_count;
  415. double sample_sum = 0;
  416. double thread_sum = 0;
  417. /* Average the throughput and thread count samples, so we can scale the wave magnitudes later. */
  418. for (i = 0; i < sample_count; ++i) {
  419. unsigned int j = (hc->total_samples - sample_count + i) % hc->samples_to_measure;
  420. sample_sum += hc->samples [j];
  421. thread_sum += hc->thread_counts [j];
  422. }
  423. average_throughput = sample_sum / sample_count;
  424. average_thread_count = thread_sum / sample_count;
  425. if (average_throughput > 0 && average_thread_count > 0) {
  426. double noise_for_confidence, adjacent_period_1, adjacent_period_2;
  427. /* Calculate the periods of the adjacent frequency bands we'll be using to
  428. * measure noise levels. We want the two adjacent Fourier frequency bands. */
  429. adjacent_period_1 = sample_count / (((double) sample_count) / ((double) hc->wave_period) + 1);
  430. adjacent_period_2 = sample_count / (((double) sample_count) / ((double) hc->wave_period) - 1);
  431. /* Get the the three different frequency components of the throughput (scaled by average
  432. * throughput). Our "error" estimate (the amount of noise that might be present in the
  433. * frequency band we're really interested in) is the average of the adjacent bands. */
  434. throughput_wave_component = hill_climbing_get_wave_component(hc->samples, sample_count, hc->wave_period) / average_throughput;
  435. //throughput_wave_component = mono_double_complex_scalar_div (hill_climbing_get_wave_component (hc->samples, sample_count, hc->wave_period), average_throughput);
  436. throughput_error_estimate = std::abs(hill_climbing_get_wave_component(hc->samples, sample_count, adjacent_period_1) / average_throughput);
  437. //throughput_error_estimate = cabs (mono_double_complex_scalar_div (hill_climbing_get_wave_component (hc->samples, sample_count, adjacent_period_1), average_throughput));
  438. if (adjacent_period_2 <= sample_count) {
  439. throughput_error_estimate = std::max (throughput_error_estimate, std::abs (hill_climbing_get_wave_component (
  440. hc->samples, sample_count, adjacent_period_2) / average_throughput));
  441. }
  442. /* Do the same for the thread counts, so we have something to compare to. We don't
  443. * measure thread count noise, because there is none; these are exact measurements. */
  444. thread_wave_component = hill_climbing_get_wave_component (hc->thread_counts, sample_count, hc->wave_period) / average_thread_count;
  445. /* Update our moving average of the throughput noise. We'll use this
  446. * later as feedback to determine the new size of the thread wave. */
  447. if (hc->average_throughput_noise == 0) {
  448. hc->average_throughput_noise = throughput_error_estimate;
  449. } else {
  450. hc->average_throughput_noise = (hc->throughput_error_smoothing_factor * throughput_error_estimate)
  451. + ((1.0 + hc->throughput_error_smoothing_factor) * hc->average_throughput_noise);
  452. }
  453. if (std::abs (thread_wave_component) > 0) {
  454. /* Adjust the throughput wave so it's centered around the target wave,
  455. * and then calculate the adjusted throughput/thread ratio. */
  456. ratio = ((throughput_wave_component - (thread_wave_component * hc->target_throughput_ratio)) / thread_wave_component);
  457. transition = TRANSITION_CLIMBING_MOVE;
  458. } else {
  459. //ratio = mono_double_complex_make (0, 0);
  460. transition = TRANSITION_STABILIZING;
  461. }
  462. noise_for_confidence = std::max (hc->average_throughput_noise, throughput_error_estimate);
  463. if (noise_for_confidence > 0) {
  464. confidence = std::abs (thread_wave_component) / noise_for_confidence / hc->target_signal_to_noise_ratio;
  465. } else {
  466. /* there is no noise! */
  467. confidence = 1.0;
  468. }
  469. }
  470. }
  471. /* We use just the real part of the complex ratio we just calculated. If the throughput signal
  472. * is exactly in phase with the thread signal, this will be the same as taking the magnitude of
  473. * the complex move and moving that far up. If they're 180 degrees out of phase, we'll move
  474. * backward (because this indicates that our changes are having the opposite of the intended effect).
  475. * If they're 90 degrees out of phase, we won't move at all, because we can't tell wether we're
  476. * having a negative or positive effect on throughput. */
  477. move = std::real (ratio);
  478. move = CLAMP (move, -1.0, 1.0);
  479. /* Apply our confidence multiplier. */
  480. move *= CLAMP (confidence, -1.0, 1.0);
  481. /* Now apply non-linear gain, such that values around zero are attenuated, while higher values
  482. * are enhanced. This allows us to move quickly if we're far away from the target, but more slowly
  483. * if we're getting close, giving us rapid ramp-up without wild oscillations around the target. */
  484. gain = hc->max_change_per_second * sample_duration;
  485. move = pow (fabs (move), hc->gain_exponent) * (move >= 0.0 ? 1 : -1) * gain;
  486. move = std::min (move, hc->max_change_per_sample);
  487. /* If the result was positive, and CPU is > 95%, refuse the move. */
  488. if (move > 0.0 && g_ThreadPool->cpu_usage > CPU_USAGE_HIGH)
  489. move = 0.0;
  490. /* Apply the move to our control setting. */
  491. hc->current_control_setting += move;
  492. /* Calculate the new thread wave magnitude, which is based on the moving average we've been keeping of the
  493. * throughput error. This average starts at zero, so we'll start with a nice safe little wave at first. */
  494. new_thread_wave_magnitude = (int)(0.5 + (hc->current_control_setting * hc->average_throughput_noise
  495. * hc->target_signal_to_noise_ratio * hc->thread_magnitude_multiplier * 2.0));
  496. new_thread_wave_magnitude = CLAMP (new_thread_wave_magnitude, 1, hc->max_thread_wave_magnitude);
  497. /* Make sure our control setting is within the ThreadPool's limits. */
  498. hc->current_control_setting = CLAMP (hc->current_control_setting, g_ThreadPool->limit_worker_min, g_ThreadPool->limit_worker_max - new_thread_wave_magnitude);
  499. /* Calculate the new thread count (control setting + square wave). */
  500. new_thread_count = (int)(hc->current_control_setting + new_thread_wave_magnitude * ((hc->total_samples / (hc->wave_period / 2)) % 2));
  501. /* Make sure the new thread count doesn't exceed the ThreadPool's limits. */
  502. new_thread_count = CLAMP (new_thread_count, g_ThreadPool->limit_worker_min, g_ThreadPool->limit_worker_max);
  503. if (new_thread_count != current_thread_count)
  504. hill_climbing_change_thread_count (new_thread_count, transition);
  505. if (std::real (ratio) < 0.0 && new_thread_count == g_ThreadPool->limit_worker_min)
  506. *adjustment_interval = (int)(0.5 + hc->current_sample_interval * (10.0 * std::max (-1.0 * std::real (ratio), 1.0)));
  507. else
  508. *adjustment_interval = hc->current_sample_interval;
  509. return new_thread_count;
  510. }
  511. static void heuristic_notify_work_completed (void)
  512. {
  513. IL2CPP_ASSERT(g_ThreadPool);
  514. g_ThreadPool->heuristic_completions++;
  515. g_ThreadPool->heuristic_last_dequeue = il2cpp::os::Time::GetTicksMillisecondsMonotonic();
  516. }
  517. static bool heuristic_should_adjust (void)
  518. {
  519. IL2CPP_ASSERT(g_ThreadPool);
  520. if (g_ThreadPool->heuristic_last_dequeue > g_ThreadPool->heuristic_last_adjustment + g_ThreadPool->heuristic_adjustment_interval) {
  521. ThreadPoolCounter counter;
  522. counter.as_int64_t = COUNTER_READ();
  523. if (counter._.working <= counter._.max_working)
  524. return true;
  525. }
  526. return false;
  527. }
  528. static void heuristic_adjust (void)
  529. {
  530. IL2CPP_ASSERT(g_ThreadPool);
  531. if (g_ThreadPool->heuristic_lock.TryAcquire()) {
  532. int32_t completions = g_ThreadPool->heuristic_completions.exchange(0);
  533. int64_t sample_end = il2cpp::os::Time::GetTicksMillisecondsMonotonic();
  534. int64_t sample_duration = sample_end - g_ThreadPool->heuristic_sample_start;
  535. if (sample_duration >= g_ThreadPool->heuristic_adjustment_interval / 2) {
  536. ThreadPoolCounter counter;
  537. int16_t new_thread_count;
  538. counter.as_int64_t = COUNTER_READ ();
  539. new_thread_count = hill_climbing_update (counter._.max_working, (uint32_t)sample_duration, completions, &g_ThreadPool->heuristic_adjustment_interval);
  540. COUNTER_ATOMIC (counter, { counter._.max_working = new_thread_count; });
  541. if (new_thread_count > counter._.max_working)
  542. worker_request (il2cpp::vm::Domain::GetCurrent());
  543. g_ThreadPool->heuristic_sample_start = sample_end;
  544. g_ThreadPool->heuristic_last_adjustment = il2cpp::os::Time::GetTicksMillisecondsMonotonic();
  545. }
  546. g_ThreadPool->heuristic_lock.Release();
  547. }
  548. }
  549. void threadpool_ms_cleanup (void)
  550. {
  551. #ifndef DISABLE_SOCKETS
  552. threadpool_ms_io_cleanup ();
  553. #endif
  554. if (lazy_init_status.IsSet())
  555. cleanup();
  556. }
  557. Il2CppAsyncResult* threadpool_ms_begin_invoke (Il2CppDomain *domain, Il2CppObject *target, MethodInfo *method, void* *params)
  558. {
  559. Il2CppMethodMessage *message;
  560. Il2CppDelegate *async_callback = NULL;
  561. Il2CppObject *state = NULL;
  562. Il2CppAsyncCall* async_call = (Il2CppAsyncCall*)il2cpp::vm::Object::New(il2cpp_defaults.async_call_class);
  563. lazy_initialize ();
  564. MethodInfo *invoke = NULL;
  565. if (il2cpp::vm::Class::HasParent(method->klass, il2cpp_defaults.multicastdelegate_class))
  566. invoke = (MethodInfo*)il2cpp::vm::Class::GetMethodFromName(method->klass, "Invoke", -1);
  567. message = mono_method_call_message_new (method, params, invoke, (params != NULL) ? (&async_callback) : NULL, (params != NULL) ? (&state) : NULL);
  568. IL2CPP_OBJECT_SETREF (async_call, msg, message);
  569. IL2CPP_OBJECT_SETREF (async_call, state, state);
  570. if (async_callback)
  571. {
  572. IL2CPP_OBJECT_SETREF (async_call, cb_method, const_cast<MethodInfo*>(il2cpp::vm::Runtime::GetDelegateInvoke(il2cpp::vm::Object::GetClass((Il2CppObject*)async_callback))));
  573. IL2CPP_OBJECT_SETREF (async_call, cb_target, async_callback);
  574. }
  575. Il2CppAsyncResult* async_result = (Il2CppAsyncResult*)il2cpp::vm::Object::New(il2cpp_defaults.asyncresult_class);
  576. IL2CPP_OBJECT_SETREF(async_result, async_delegate, (Il2CppDelegate*)target);
  577. IL2CPP_OBJECT_SETREF(async_result, object_data, async_call);
  578. IL2CPP_OBJECT_SETREF(async_result, async_state, async_call->state);
  579. threadpool_ms_enqueue_work_item (domain, (Il2CppObject*) async_result);
  580. return async_result;
  581. }
  582. Il2CppObject* threadpool_ms_end_invoke (Il2CppAsyncResult *ares, Il2CppArray **out_args, Il2CppObject **exc)
  583. {
  584. Il2CppAsyncCall *ac;
  585. IL2CPP_ASSERT(exc);
  586. IL2CPP_ASSERT(out_args);
  587. *exc = NULL;
  588. *out_args = NULL;
  589. /* check if already finished */
  590. il2cpp_monitor_enter((Il2CppObject*) ares);
  591. if (ares->endinvoke_called)
  592. {
  593. il2cpp::vm::Exception::Raise(il2cpp::vm::Exception::GetInvalidOperationException("Cannot call EndInvoke() repeatedly or concurrently on the same AsyncResult!"));
  594. il2cpp_monitor_exit((Il2CppObject*) ares);
  595. return NULL;
  596. }
  597. ares->endinvoke_called = 1;
  598. /* wait until we are really finished */
  599. if (ares->completed)
  600. {
  601. il2cpp_monitor_exit((Il2CppObject *) ares);
  602. }
  603. else
  604. {
  605. if (!ares->handle)
  606. {
  607. Il2CppWaitHandle *wait_handle = il2cpp::vm::WaitHandle::NewManualResetEvent(false);
  608. IL2CPP_OBJECT_SETREF(ares, handle, wait_handle);
  609. }
  610. il2cpp::os::Handle* wait_event = il2cpp::vm::WaitHandle::GetPlatformHandle((Il2CppWaitHandle*)ares->handle);
  611. il2cpp_monitor_exit((Il2CppObject*) ares);
  612. //MONO_ENTER_GC_SAFE;
  613. wait_event->Wait();
  614. //MONO_EXIT_GC_SAFE;
  615. }
  616. ac = (Il2CppAsyncCall*) ares->object_data;
  617. IL2CPP_ASSERT(ac);
  618. il2cpp::gc::WriteBarrier::GenericStore(exc, ((Il2CppMethodMessage*)ac->msg)->exc);
  619. *out_args = ac->out_args;
  620. return ac->res;
  621. }
  622. void threadpool_ms_suspend (void)
  623. {
  624. if (g_ThreadPool)
  625. g_ThreadPool->suspended = true;
  626. }
  627. void threadpool_ms_resume (void)
  628. {
  629. if (g_ThreadPool)
  630. g_ThreadPool->suspended = false;
  631. }
  632. void ves_icall_System_Threading_ThreadPool_GetAvailableThreadsNative (int32_t *worker_threads, int32_t *completion_port_threads)
  633. {
  634. ThreadPoolCounter counter;
  635. if (!worker_threads || !completion_port_threads)
  636. return;
  637. lazy_initialize ();
  638. counter.as_int64_t = COUNTER_READ ();
  639. *worker_threads = std::max (0, g_ThreadPool->limit_worker_max - counter._.active);
  640. *completion_port_threads = g_ThreadPool->limit_io_max;
  641. }
  642. void ves_icall_System_Threading_ThreadPool_GetMinThreadsNative (int32_t *worker_threads, int32_t *completion_port_threads)
  643. {
  644. if (!worker_threads || !completion_port_threads)
  645. return;
  646. lazy_initialize ();
  647. *worker_threads = g_ThreadPool->limit_worker_min;
  648. *completion_port_threads = g_ThreadPool->limit_io_min;
  649. }
  650. void ves_icall_System_Threading_ThreadPool_GetMaxThreadsNative (int32_t *worker_threads, int32_t *completion_port_threads)
  651. {
  652. if (!worker_threads || !completion_port_threads)
  653. return;
  654. lazy_initialize ();
  655. *worker_threads = g_ThreadPool->limit_worker_max;
  656. *completion_port_threads = g_ThreadPool->limit_io_max;
  657. }
  658. bool ves_icall_System_Threading_ThreadPool_SetMinThreadsNative (int32_t worker_threads, int32_t completion_port_threads)
  659. {
  660. lazy_initialize ();
  661. if (worker_threads <= 0 || worker_threads > g_ThreadPool->limit_worker_max)
  662. return false;
  663. if (completion_port_threads <= 0 || completion_port_threads > g_ThreadPool->limit_io_max)
  664. return false;
  665. g_ThreadPool->limit_worker_min = worker_threads;
  666. g_ThreadPool->limit_io_min = completion_port_threads;
  667. return true;
  668. }
  669. bool ves_icall_System_Threading_ThreadPool_SetMaxThreadsNative (int32_t worker_threads, int32_t completion_port_threads)
  670. {
  671. int cpu_count = il2cpp::os::Environment::GetProcessorCount ();
  672. lazy_initialize ();
  673. if (worker_threads < g_ThreadPool->limit_worker_min || worker_threads < cpu_count)
  674. return false;
  675. if (completion_port_threads < g_ThreadPool->limit_io_min || completion_port_threads < cpu_count)
  676. return false;
  677. g_ThreadPool->limit_worker_max = worker_threads;
  678. g_ThreadPool->limit_io_max = completion_port_threads;
  679. return true;
  680. }
  681. void ves_icall_System_Threading_ThreadPool_InitializeVMTp (bool *enable_worker_tracking)
  682. {
  683. if (enable_worker_tracking) {
  684. // TODO implement some kind of switch to have the possibily to use it
  685. *enable_worker_tracking = false;
  686. }
  687. lazy_initialize ();
  688. }
  689. bool ves_icall_System_Threading_ThreadPool_NotifyWorkItemComplete (void)
  690. {
  691. ThreadPoolCounter counter;
  692. if (il2cpp::vm::Runtime::IsShuttingDown ())
  693. return false;
  694. heuristic_notify_work_completed ();
  695. if (heuristic_should_adjust ())
  696. heuristic_adjust ();
  697. counter.as_int64_t = COUNTER_READ ();
  698. return counter._.working <= counter._.max_working;
  699. }
  700. void ves_icall_System_Threading_ThreadPool_NotifyWorkItemProgressNative (void)
  701. {
  702. heuristic_notify_work_completed ();
  703. if (heuristic_should_adjust ())
  704. heuristic_adjust ();
  705. }
  706. void ves_icall_System_Threading_ThreadPool_ReportThreadStatus (bool is_working)
  707. {
  708. // Mono raises a not implemented exception
  709. IL2CPP_NOT_IMPLEMENTED_ICALL(ves_icall_System_Threading_ThreadPool_PostQueuedCompletionStatus);
  710. IL2CPP_UNREACHABLE;
  711. }
  712. bool ves_icall_System_Threading_ThreadPool_RequestWorkerThread (void)
  713. {
  714. return worker_request (il2cpp::vm::Domain::GetCurrent());
  715. }
  716. bool ves_icall_System_Threading_ThreadPool_PostQueuedCompletionStatus (Il2CppNativeOverlapped *native_overlapped)
  717. {
  718. // Mono raises a not implemented exception
  719. IL2CPP_NOT_IMPLEMENTED_ICALL(ves_icall_System_Threading_ThreadPool_PostQueuedCompletionStatus);
  720. IL2CPP_UNREACHABLE;
  721. }
  /*
   * Accepts every binding request without doing anything, matching the
   * current Mono implementation. The handle is ignored.
   */
  bool ves_icall_System_Threading_ThreadPool_BindIOCompletionCallbackNative (void* file_handle)
  {
      (void)file_handle;
      const bool accepted = true;
      return accepted;
  }
  /* The IL2CPP thread pool never runs hosted; always answers false. */
  bool ves_icall_System_Threading_ThreadPool_IsThreadPoolHosted (void)
  {
      const bool hosted = false;
      return hosted;
  }
  /*
   * Intentionally a no-op. Mono uses this icall only to bump a performance
   * counter, and IL2CPP has no such counter.
   */
  void ves_icall_System_Threading_ThreadPool_NotifyWorkItemQueued (void)
  {
      return;
  }