/* * threadpool-ms.c: Microsoft threadpool runtime support * * Author: * Ludovic Henry (ludovic.henry@xamarin.com) * * Copyright 2015 Xamarin, Inc (http://www.xamarin.com) * Licensed under the MIT license. See LICENSE file in the project root for full license information. */ // // Copyright (c) Microsoft. All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. // // Files: // - src/vm/comthreadpool.cpp // - src/vm/win32threadpoolcpp // - src/vm/threadpoolrequest.cpp // - src/vm/hillclimbing.cpp // // Ported from C++ to C and adjusted to Mono runtime #include "il2cpp-config.h" #include #define _USE_MATH_DEFINES // needed by MSVC to define math constants #include #include #include #include "math.h" #include "il2cpp-api.h" #include "gc/GarbageCollector.h" #include "gc/GCHandle.h" #include "gc/WriteBarrier.h" #include "icalls/mscorlib/System.Runtime.Remoting.Messaging/MonoMethodMessage.h" #include "mono/ThreadPool/threadpool-ms.h" #include "mono/ThreadPool/threadpool-ms-io.h" #include "mono/ThreadPool/ThreadPoolDataStructures.h" #include "mono/ThreadPool/ThreadPoolMacros.h" #include "mono/ThreadPool/ThreadPoolMonitorThread.h" #include "mono/ThreadPool/ThreadPoolWorkerThread.h" #include "il2cpp-object-internals.h" #include "os/CpuInfo.h" #include "os/Environment.h" #include "os/Mutex.h" #include "os/Time.h" #include "utils/CallOnce.h" #include "vm/Array.h" #include "vm/Class.h" #include "vm/Domain.h" #include "vm/Exception.h" #include "vm/Object.h" #include "vm/Reflection.h" #include "vm/Random.h" #include "vm/Runtime.h" #include "vm/String.h" #include "vm/Thread.h" #include "vm/WaitHandle.h" #include #ifndef CLAMP #define CLAMP(a,low,high) (((a) < (low)) ? (low) : (((a) > (high)) ? (high) : (a))) #endif ThreadPool* g_ThreadPool; /* The exponent to apply to the gain. 1.0 means to use linear gain, * higher values will enhance large moves and damp small ones. * default: 2.0 */ #define HILL_CLIMBING_GAIN_EXPONENT 2.0 /* The 'cost' of a thread. 0 means drive for increased throughput regardless * of thread count, higher values bias more against higher thread counts. * default: 0.15 */ #define HILL_CLIMBING_BIAS 0.15 #define HILL_CLIMBING_WAVE_PERIOD 4 #define HILL_CLIMBING_MAX_WAVE_MAGNITUDE 20 #define HILL_CLIMBING_WAVE_MAGNITUDE_MULTIPLIER 1.0 #define HILL_CLIMBING_WAVE_HISTORY_SIZE 8 #define HILL_CLIMBING_TARGET_SIGNAL_TO_NOISE_RATIO 3.0 #define HILL_CLIMBING_MAX_CHANGE_PER_SECOND 4 #define HILL_CLIMBING_MAX_CHANGE_PER_SAMPLE 20 #define HILL_CLIMBING_SAMPLE_INTERVAL_LOW 10 #define HILL_CLIMBING_SAMPLE_INTERVAL_HIGH 200 #define HILL_CLIMBING_ERROR_SMOOTHING_FACTOR 0.01 #define HILL_CLIMBING_MAX_SAMPLE_ERROR_PERCENT 0.15 static il2cpp::utils::OnceFlag lazy_init_status; static Il2CppMethodMessage * mono_method_call_message_new(MethodInfo *method, void* *params, MethodInfo *invoke, Il2CppDelegate **cb, Il2CppObject **state) { Il2CppDomain *domain = il2cpp::vm::Domain::GetCurrent(); Il2CppMethodMessage *msg; int i, count; msg = (Il2CppMethodMessage *)il2cpp::vm::Object::New(il2cpp_defaults.mono_method_message_class); if (invoke) { Il2CppReflectionMethod *rm = il2cpp::vm::Reflection::GetMethodObject(invoke, NULL); il2cpp::icalls::mscorlib::System::Runtime::Remoting::Messaging::MonoMethodMessage::InitMessage(msg, rm, NULL); count = method->parameters_count - 2; } else { Il2CppReflectionMethod *rm = il2cpp::vm::Reflection::GetMethodObject(method, NULL); il2cpp::icalls::mscorlib::System::Runtime::Remoting::Messaging::MonoMethodMessage::InitMessage(msg, rm, NULL); count = method->parameters_count; } for (i = 0; i < count; i++) { void* vpos; Il2CppClass *klass; Il2CppObject *arg; vpos = params[i]; klass = il2cpp_class_from_type(method->parameters[i]); arg = (Il2CppObject*)vpos; il2cpp_array_setref(msg->args, i, arg); } if (cb != NULL && state != NULL) { *cb = (Il2CppDelegate *)params[i]; i++; *state = (Il2CppObject *)params[i]; } return msg; } static void* cpu_info_create() { return il2cpp::os::CpuInfo::Create(); } ThreadPool::ThreadPool() : parked_threads_count(0), worker_creation_current_second(-1), worker_creation_current_count(0), heuristic_completions(0), heuristic_sample_start(0), heuristic_last_dequeue(0), heuristic_last_adjustment(0), heuristic_adjustment_interval(10), limit_worker_min(0), limit_worker_max(0), limit_io_min(0), limit_io_max(0), cpu_usage(0), suspended(false), parked_threads_cond(active_threads_lock) { counters.as_int64_t = 0; cpu_usage_state = cpu_info_create(); } static void initialize(void* arg) { ThreadPoolHillClimbing *hc; //const char *threads_per_cpu_env; int threads_per_cpu; int threads_count; IL2CPP_ASSERT(!g_ThreadPool); g_ThreadPool = new ThreadPool(); IL2CPP_ASSERT(g_ThreadPool); il2cpp::vm::Random::Open(); hc = &g_ThreadPool->heuristic_hill_climbing; hc->wave_period = HILL_CLIMBING_WAVE_PERIOD; hc->max_thread_wave_magnitude = HILL_CLIMBING_MAX_WAVE_MAGNITUDE; hc->thread_magnitude_multiplier = (double) HILL_CLIMBING_WAVE_MAGNITUDE_MULTIPLIER; hc->samples_to_measure = hc->wave_period * HILL_CLIMBING_WAVE_HISTORY_SIZE; hc->target_throughput_ratio = (double) HILL_CLIMBING_BIAS; hc->target_signal_to_noise_ratio = (double) HILL_CLIMBING_TARGET_SIGNAL_TO_NOISE_RATIO; hc->max_change_per_second = (double) HILL_CLIMBING_MAX_CHANGE_PER_SECOND; hc->max_change_per_sample = (double) HILL_CLIMBING_MAX_CHANGE_PER_SAMPLE; hc->sample_interval_low = HILL_CLIMBING_SAMPLE_INTERVAL_LOW; hc->sample_interval_high = HILL_CLIMBING_SAMPLE_INTERVAL_HIGH; hc->throughput_error_smoothing_factor = (double) HILL_CLIMBING_ERROR_SMOOTHING_FACTOR; hc->gain_exponent = (double) HILL_CLIMBING_GAIN_EXPONENT; hc->max_sample_error = (double) HILL_CLIMBING_MAX_SAMPLE_ERROR_PERCENT; hc->current_control_setting = 0; hc->total_samples = 0; hc->last_thread_count = 0; hc->average_throughput_noise = 0; hc->elapsed_since_last_change = 0; hc->accumulated_completion_count = 0; hc->accumulated_sample_duration = 0; hc->samples = (double*)IL2CPP_MALLOC_ZERO (sizeof(double) * hc->samples_to_measure); hc->thread_counts = (double*)IL2CPP_MALLOC_ZERO(sizeof(double) * hc->samples_to_measure); hc->random_interval_generator = il2cpp::vm::Random::Create (); hc->current_sample_interval = il2cpp::vm::Random::Next (&hc->random_interval_generator, hc->sample_interval_low, hc->sample_interval_high); //std::string threads_per_cpu_env = il2cpp::os::Environment::GetEnvironmentVariable("IL2CPP_THREADS_PER_CPU"); //if (threads_per_cpu_env.empty()) threads_per_cpu = 1; /*else threads_per_cpu = CLAMP (atoi (threads_per_cpu_env.c_str()), 1, 50);*/ threads_count = il2cpp::os::Environment::GetProcessorCount() * threads_per_cpu; g_ThreadPool->limit_worker_min = g_ThreadPool->limit_io_min = threads_count; #if IL2CPP_TARGET_ANDROID || IL2CPP_TARGET_IOS g_ThreadPool->limit_worker_max = g_ThreadPool->limit_io_max = CLAMP (threads_count * 100, std::min (threads_count, 200), std::max (threads_count, 200)); #else g_ThreadPool->limit_worker_max = g_ThreadPool->limit_io_max = threads_count * 100; #endif g_ThreadPool->counters._.max_working = g_ThreadPool->limit_worker_min; } static void lazy_initialize() { il2cpp::utils::CallOnce(lazy_init_status, initialize, NULL); } static void worker_kill(Il2CppInternalThread* thread) { if (thread == il2cpp::vm::Thread::CurrentInternal()) return; il2cpp::vm::Thread::Stop(thread); } static void cleanup (void) { unsigned int i; /* we make the assumption along the code that we are * cleaning up only if the runtime is shutting down */ IL2CPP_ASSERT(il2cpp::vm::Runtime::IsShuttingDown ()); while (GetMonitorStatus() != MONITOR_STATUS_NOT_RUNNING) il2cpp::vm::Thread::Sleep(1); std::vector working_threads; g_ThreadPool->active_threads_lock.Acquire(); working_threads = g_ThreadPool->working_threads; g_ThreadPool->active_threads_lock.Release(); /* stop all threadpool->working_threads */ for (i = 0; i < working_threads.size(); ++i) worker_kill (working_threads[i]); /* unpark all g_ThreadPool->parked_threads */ g_ThreadPool->parked_threads_cond.NotifyAll(); } bool threadpool_ms_enqueue_work_item (Il2CppDomain *domain, Il2CppObject *work_item) { static Il2CppClass *threadpool_class = NULL; static MethodInfo *unsafe_queue_custom_work_item_method = NULL; //Il2CppDomain *current_domain; bool f; void* args [2]; IL2CPP_ASSERT(work_item); if (!threadpool_class) threadpool_class = il2cpp::vm::Class::FromName(il2cpp_defaults.corlib, "System.Threading", "ThreadPool"); if (!unsafe_queue_custom_work_item_method) unsafe_queue_custom_work_item_method = (MethodInfo*)il2cpp::vm::Class::GetMethodFromName(threadpool_class, "UnsafeQueueCustomWorkItem", 2); IL2CPP_ASSERT(unsafe_queue_custom_work_item_method); f = false; args [0] = (void*) work_item; args [1] = (void*) &f; Il2CppObject *result = il2cpp::vm::Runtime::InvokeWithThrow(unsafe_queue_custom_work_item_method, NULL, args); return true; } /* LOCKING: threadpool->domains_lock must be held */ static ThreadPoolDomain* domain_get(Il2CppDomain *domain, bool create) { ThreadPoolDomain *tpdomain = NULL; unsigned int i; IL2CPP_ASSERT(domain); for (i = 0; i < g_ThreadPool->domains.size(); ++i) { tpdomain = g_ThreadPool->domains[i]; if (tpdomain->domain == domain) return tpdomain; } if (create) { tpdomain = new ThreadPoolDomain(); tpdomain->domain = domain; g_ThreadPool->domains.push_back(tpdomain); } return tpdomain; } bool worker_try_unpark() { bool worker_unparked = true; g_ThreadPool->active_threads_lock.AcquireScoped([&worker_unparked] { if (g_ThreadPool->parked_threads_count == 0) worker_unparked = false; else g_ThreadPool->parked_threads_cond.Notify(1); }); return worker_unparked; } static bool worker_request (Il2CppDomain *domain) { ThreadPoolDomain *tpdomain; IL2CPP_ASSERT(domain); IL2CPP_ASSERT(g_ThreadPool); if (il2cpp::vm::Runtime::IsShuttingDown ()) return false; g_ThreadPool->domains_lock.Acquire(); /* synchronize check with worker_thread */ //if (mono_domain_is_unloading (domain)) { //mono_coop_mutex_unlock (&threadpool->domains_lock); /*return false; }*/ tpdomain = domain_get (domain, true); IL2CPP_ASSERT(tpdomain); tpdomain->outstanding_request ++; /*mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] request worker, domain = %p, outstanding_request = %d", mono_native_thread_id_get (), tpdomain->domain, tpdomain->outstanding_request);*/ g_ThreadPool->domains_lock.Release(); if (g_ThreadPool->suspended) return false; monitor_ensure_running (); if (worker_try_unpark ()) { //mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] request worker, unparked", mono_native_thread_id_get ()); return true; } if (worker_try_create ()) { //mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] request worker, created", mono_native_thread_id_get ()); return true; } //mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] request worker, failed", mono_native_thread_id_get ()); return false; } static void hill_climbing_change_thread_count (int16_t new_thread_count, ThreadPoolHeuristicStateTransition transition) { ThreadPoolHillClimbing *hc; IL2CPP_ASSERT(g_ThreadPool); hc = &g_ThreadPool->heuristic_hill_climbing; //mono_trace (G_LOG_LEVEL_INFO, MONO_TRACE_THREADPOOL, "[%p] hill climbing, change max number of threads %d", mono_native_thread_id_get (), new_thread_count); hc->last_thread_count = new_thread_count; hc->current_sample_interval = il2cpp::vm::Random::Next(&hc->random_interval_generator, hc->sample_interval_low, hc->sample_interval_high); hc->elapsed_since_last_change = 0; hc->completions_since_last_change = 0; } void hill_climbing_force_change (int16_t new_thread_count, ThreadPoolHeuristicStateTransition transition) { ThreadPoolHillClimbing *hc; IL2CPP_ASSERT(g_ThreadPool); hc = &g_ThreadPool->heuristic_hill_climbing; if (new_thread_count != hc->last_thread_count) { hc->current_control_setting += new_thread_count - hc->last_thread_count; hill_climbing_change_thread_count (new_thread_count, transition); } } static std::complex hill_climbing_get_wave_component (double *samples, unsigned int sample_count, double period) { ThreadPoolHillClimbing *hc; double w, cosine, sine, coeff, q0, q1, q2; unsigned int i; IL2CPP_ASSERT(g_ThreadPool); IL2CPP_ASSERT(sample_count >= period); IL2CPP_ASSERT(period >= 2); hc = &g_ThreadPool->heuristic_hill_climbing; w = 2.0 * M_PI / period; cosine = cos (w); sine = sin (w); coeff = 2.0 * cosine; q0 = q1 = q2 = 0; for (i = 0; i < sample_count; ++i) { q0 = coeff * q1 - q2 + samples [(hc->total_samples - sample_count + i) % hc->samples_to_measure]; q2 = q1; q1 = q0; } return (std::complex (q1 - q2 * cosine, (q2 * sine)) / ((double)sample_count)); } static int16_t hill_climbing_update (int16_t current_thread_count, uint32_t sample_duration, int32_t completions, int64_t *adjustment_interval) { ThreadPoolHillClimbing *hc; ThreadPoolHeuristicStateTransition transition; double throughput; double throughput_error_estimate; double confidence; double move; double gain; int sample_index; int sample_count; int new_thread_wave_magnitude; int new_thread_count; std::complex thread_wave_component; std::complex throughput_wave_component; std::complex ratio; IL2CPP_ASSERT(g_ThreadPool); IL2CPP_ASSERT(adjustment_interval); hc = &g_ThreadPool->heuristic_hill_climbing; /* If someone changed the thread count without telling us, update our records accordingly. */ if (current_thread_count != hc->last_thread_count) hill_climbing_force_change (current_thread_count, TRANSITION_INITIALIZING); /* Update the cumulative stats for this thread count */ hc->elapsed_since_last_change += sample_duration; hc->completions_since_last_change += completions; /* Add in any data we've already collected about this sample */ sample_duration += (uint32_t)hc->accumulated_sample_duration; completions += hc->accumulated_completion_count; /* We need to make sure we're collecting reasonably accurate data. Since we're just counting the end * of each work item, we are goinng to be missing some data about what really happened during the * sample interval. The count produced by each thread includes an initial work item that may have * started well before the start of the interval, and each thread may have been running some new * work item for some time before the end of the interval, which did not yet get counted. So * our count is going to be off by +/- threadCount workitems. * * The exception is that the thread that reported to us last time definitely wasn't running any work * at that time, and the thread that's reporting now definitely isn't running a work item now. So * we really only need to consider threadCount-1 threads. * * Thus the percent error in our count is +/- (threadCount-1)/numCompletions. * * We cannot rely on the frequency-domain analysis we'll be doing later to filter out this error, because * of the way it accumulates over time. If this sample is off by, say, 33% in the negative direction, * then the next one likely will be too. The one after that will include the sum of the completions * we missed in the previous samples, and so will be 33% positive. So every three samples we'll have * two "low" samples and one "high" sample. This will appear as periodic variation right in the frequency * range we're targeting, which will not be filtered by the frequency-domain translation. */ if (hc->total_samples > 0 && ((current_thread_count - 1.0) / completions) >= hc->max_sample_error) { /* Not accurate enough yet. Let's accumulate the data so * far, and tell the ThreadPool to collect a little more. */ hc->accumulated_sample_duration = sample_duration; hc->accumulated_completion_count = completions; *adjustment_interval = 10; return current_thread_count; } /* We've got enouugh data for our sample; reset our accumulators for next time. */ hc->accumulated_sample_duration = 0; hc->accumulated_completion_count = 0; /* Add the current thread count and throughput sample to our history. */ throughput = ((double) completions) / sample_duration; sample_index = hc->total_samples % hc->samples_to_measure; hc->samples [sample_index] = throughput; hc->thread_counts [sample_index] = current_thread_count; hc->total_samples ++; /* Set up defaults for our metrics. */ throughput_error_estimate = 0; confidence = 0; transition = TRANSITION_WARMUP; /* How many samples will we use? It must be at least the three wave periods we're looking for, and it must also * be a whole multiple of the primary wave's period; otherwise the frequency we're looking for will fall between * two frequency bands in the Fourier analysis, and we won't be able to measure it accurately. */ sample_count = ((int) std::min (hc->total_samples - 1, (int64_t)hc->samples_to_measure) / hc->wave_period) * hc->wave_period; if (sample_count > hc->wave_period) { int i; double average_throughput; double average_thread_count; double sample_sum = 0; double thread_sum = 0; /* Average the throughput and thread count samples, so we can scale the wave magnitudes later. */ for (i = 0; i < sample_count; ++i) { unsigned int j = (hc->total_samples - sample_count + i) % hc->samples_to_measure; sample_sum += hc->samples [j]; thread_sum += hc->thread_counts [j]; } average_throughput = sample_sum / sample_count; average_thread_count = thread_sum / sample_count; if (average_throughput > 0 && average_thread_count > 0) { double noise_for_confidence, adjacent_period_1, adjacent_period_2; /* Calculate the periods of the adjacent frequency bands we'll be using to * measure noise levels. We want the two adjacent Fourier frequency bands. */ adjacent_period_1 = sample_count / (((double) sample_count) / ((double) hc->wave_period) + 1); adjacent_period_2 = sample_count / (((double) sample_count) / ((double) hc->wave_period) - 1); /* Get the the three different frequency components of the throughput (scaled by average * throughput). Our "error" estimate (the amount of noise that might be present in the * frequency band we're really interested in) is the average of the adjacent bands. */ throughput_wave_component = hill_climbing_get_wave_component(hc->samples, sample_count, hc->wave_period) / average_throughput; //throughput_wave_component = mono_double_complex_scalar_div (hill_climbing_get_wave_component (hc->samples, sample_count, hc->wave_period), average_throughput); throughput_error_estimate = std::abs(hill_climbing_get_wave_component(hc->samples, sample_count, adjacent_period_1) / average_throughput); //throughput_error_estimate = cabs (mono_double_complex_scalar_div (hill_climbing_get_wave_component (hc->samples, sample_count, adjacent_period_1), average_throughput)); if (adjacent_period_2 <= sample_count) { throughput_error_estimate = std::max (throughput_error_estimate, std::abs (hill_climbing_get_wave_component ( hc->samples, sample_count, adjacent_period_2) / average_throughput)); } /* Do the same for the thread counts, so we have something to compare to. We don't * measure thread count noise, because there is none; these are exact measurements. */ thread_wave_component = hill_climbing_get_wave_component (hc->thread_counts, sample_count, hc->wave_period) / average_thread_count; /* Update our moving average of the throughput noise. We'll use this * later as feedback to determine the new size of the thread wave. */ if (hc->average_throughput_noise == 0) { hc->average_throughput_noise = throughput_error_estimate; } else { hc->average_throughput_noise = (hc->throughput_error_smoothing_factor * throughput_error_estimate) + ((1.0 + hc->throughput_error_smoothing_factor) * hc->average_throughput_noise); } if (std::abs (thread_wave_component) > 0) { /* Adjust the throughput wave so it's centered around the target wave, * and then calculate the adjusted throughput/thread ratio. */ ratio = ((throughput_wave_component - (thread_wave_component * hc->target_throughput_ratio)) / thread_wave_component); transition = TRANSITION_CLIMBING_MOVE; } else { //ratio = mono_double_complex_make (0, 0); transition = TRANSITION_STABILIZING; } noise_for_confidence = std::max (hc->average_throughput_noise, throughput_error_estimate); if (noise_for_confidence > 0) { confidence = std::abs (thread_wave_component) / noise_for_confidence / hc->target_signal_to_noise_ratio; } else { /* there is no noise! */ confidence = 1.0; } } } /* We use just the real part of the complex ratio we just calculated. If the throughput signal * is exactly in phase with the thread signal, this will be the same as taking the magnitude of * the complex move and moving that far up. If they're 180 degrees out of phase, we'll move * backward (because this indicates that our changes are having the opposite of the intended effect). * If they're 90 degrees out of phase, we won't move at all, because we can't tell wether we're * having a negative or positive effect on throughput. */ move = std::real (ratio); move = CLAMP (move, -1.0, 1.0); /* Apply our confidence multiplier. */ move *= CLAMP (confidence, -1.0, 1.0); /* Now apply non-linear gain, such that values around zero are attenuated, while higher values * are enhanced. This allows us to move quickly if we're far away from the target, but more slowly * if we're getting close, giving us rapid ramp-up without wild oscillations around the target. */ gain = hc->max_change_per_second * sample_duration; move = pow (fabs (move), hc->gain_exponent) * (move >= 0.0 ? 1 : -1) * gain; move = std::min (move, hc->max_change_per_sample); /* If the result was positive, and CPU is > 95%, refuse the move. */ if (move > 0.0 && g_ThreadPool->cpu_usage > CPU_USAGE_HIGH) move = 0.0; /* Apply the move to our control setting. */ hc->current_control_setting += move; /* Calculate the new thread wave magnitude, which is based on the moving average we've been keeping of the * throughput error. This average starts at zero, so we'll start with a nice safe little wave at first. */ new_thread_wave_magnitude = (int)(0.5 + (hc->current_control_setting * hc->average_throughput_noise * hc->target_signal_to_noise_ratio * hc->thread_magnitude_multiplier * 2.0)); new_thread_wave_magnitude = CLAMP (new_thread_wave_magnitude, 1, hc->max_thread_wave_magnitude); /* Make sure our control setting is within the ThreadPool's limits. */ hc->current_control_setting = CLAMP (hc->current_control_setting, g_ThreadPool->limit_worker_min, g_ThreadPool->limit_worker_max - new_thread_wave_magnitude); /* Calculate the new thread count (control setting + square wave). */ new_thread_count = (int)(hc->current_control_setting + new_thread_wave_magnitude * ((hc->total_samples / (hc->wave_period / 2)) % 2)); /* Make sure the new thread count doesn't exceed the ThreadPool's limits. */ new_thread_count = CLAMP (new_thread_count, g_ThreadPool->limit_worker_min, g_ThreadPool->limit_worker_max); if (new_thread_count != current_thread_count) hill_climbing_change_thread_count (new_thread_count, transition); if (std::real (ratio) < 0.0 && new_thread_count == g_ThreadPool->limit_worker_min) *adjustment_interval = (int)(0.5 + hc->current_sample_interval * (10.0 * std::max (-1.0 * std::real (ratio), 1.0))); else *adjustment_interval = hc->current_sample_interval; return new_thread_count; } static void heuristic_notify_work_completed (void) { IL2CPP_ASSERT(g_ThreadPool); g_ThreadPool->heuristic_completions++; g_ThreadPool->heuristic_last_dequeue = il2cpp::os::Time::GetTicksMillisecondsMonotonic(); } static bool heuristic_should_adjust (void) { IL2CPP_ASSERT(g_ThreadPool); if (g_ThreadPool->heuristic_last_dequeue > g_ThreadPool->heuristic_last_adjustment + g_ThreadPool->heuristic_adjustment_interval) { ThreadPoolCounter counter; counter.as_int64_t = COUNTER_READ(); if (counter._.working <= counter._.max_working) return true; } return false; } static void heuristic_adjust (void) { IL2CPP_ASSERT(g_ThreadPool); if (g_ThreadPool->heuristic_lock.TryAcquire()) { int32_t completions = g_ThreadPool->heuristic_completions.exchange(0); int64_t sample_end = il2cpp::os::Time::GetTicksMillisecondsMonotonic(); int64_t sample_duration = sample_end - g_ThreadPool->heuristic_sample_start; if (sample_duration >= g_ThreadPool->heuristic_adjustment_interval / 2) { ThreadPoolCounter counter; int16_t new_thread_count; counter.as_int64_t = COUNTER_READ (); new_thread_count = hill_climbing_update (counter._.max_working, (uint32_t)sample_duration, completions, &g_ThreadPool->heuristic_adjustment_interval); COUNTER_ATOMIC (counter, { counter._.max_working = new_thread_count; }); if (new_thread_count > counter._.max_working) worker_request (il2cpp::vm::Domain::GetCurrent()); g_ThreadPool->heuristic_sample_start = sample_end; g_ThreadPool->heuristic_last_adjustment = il2cpp::os::Time::GetTicksMillisecondsMonotonic(); } g_ThreadPool->heuristic_lock.Release(); } } void threadpool_ms_cleanup (void) { #ifndef DISABLE_SOCKETS threadpool_ms_io_cleanup (); #endif if (lazy_init_status.IsSet()) cleanup(); } Il2CppAsyncResult* threadpool_ms_begin_invoke (Il2CppDomain *domain, Il2CppObject *target, MethodInfo *method, void* *params) { Il2CppMethodMessage *message; Il2CppDelegate *async_callback = NULL; Il2CppObject *state = NULL; Il2CppAsyncCall* async_call = (Il2CppAsyncCall*)il2cpp::vm::Object::New(il2cpp_defaults.async_call_class); lazy_initialize (); MethodInfo *invoke = NULL; if (il2cpp::vm::Class::HasParent(method->klass, il2cpp_defaults.multicastdelegate_class)) invoke = (MethodInfo*)il2cpp::vm::Class::GetMethodFromName(method->klass, "Invoke", -1); message = mono_method_call_message_new (method, params, invoke, (params != NULL) ? (&async_callback) : NULL, (params != NULL) ? (&state) : NULL); IL2CPP_OBJECT_SETREF (async_call, msg, message); IL2CPP_OBJECT_SETREF (async_call, state, state); if (async_callback) { IL2CPP_OBJECT_SETREF (async_call, cb_method, const_cast(il2cpp::vm::Runtime::GetDelegateInvoke(il2cpp::vm::Object::GetClass((Il2CppObject*)async_callback)))); IL2CPP_OBJECT_SETREF (async_call, cb_target, async_callback); } Il2CppAsyncResult* async_result = (Il2CppAsyncResult*)il2cpp::vm::Object::New(il2cpp_defaults.asyncresult_class); IL2CPP_OBJECT_SETREF(async_result, async_delegate, (Il2CppDelegate*)target); IL2CPP_OBJECT_SETREF(async_result, object_data, async_call); IL2CPP_OBJECT_SETREF(async_result, async_state, async_call->state); threadpool_ms_enqueue_work_item (domain, (Il2CppObject*) async_result); return async_result; } Il2CppObject* threadpool_ms_end_invoke (Il2CppAsyncResult *ares, Il2CppArray **out_args, Il2CppObject **exc) { Il2CppAsyncCall *ac; IL2CPP_ASSERT(exc); IL2CPP_ASSERT(out_args); *exc = NULL; *out_args = NULL; /* check if already finished */ il2cpp_monitor_enter((Il2CppObject*) ares); if (ares->endinvoke_called) { il2cpp::vm::Exception::Raise(il2cpp::vm::Exception::GetInvalidOperationException("Cannot call EndInvoke() repeatedly or concurrently on the same AsyncResult!")); il2cpp_monitor_exit((Il2CppObject*) ares); return NULL; } ares->endinvoke_called = 1; /* wait until we are really finished */ if (ares->completed) { il2cpp_monitor_exit((Il2CppObject *) ares); } else { if (!ares->handle) { Il2CppWaitHandle *wait_handle = il2cpp::vm::WaitHandle::NewManualResetEvent(false); IL2CPP_OBJECT_SETREF(ares, handle, wait_handle); } il2cpp::os::Handle* wait_event = il2cpp::vm::WaitHandle::GetPlatformHandle((Il2CppWaitHandle*)ares->handle); il2cpp_monitor_exit((Il2CppObject*) ares); //MONO_ENTER_GC_SAFE; wait_event->Wait(); //MONO_EXIT_GC_SAFE; } ac = (Il2CppAsyncCall*) ares->object_data; IL2CPP_ASSERT(ac); il2cpp::gc::WriteBarrier::GenericStore(exc, ((Il2CppMethodMessage*)ac->msg)->exc); *out_args = ac->out_args; return ac->res; } void threadpool_ms_suspend (void) { if (g_ThreadPool) g_ThreadPool->suspended = true; } void threadpool_ms_resume (void) { if (g_ThreadPool) g_ThreadPool->suspended = false; } void ves_icall_System_Threading_ThreadPool_GetAvailableThreadsNative (int32_t *worker_threads, int32_t *completion_port_threads) { ThreadPoolCounter counter; if (!worker_threads || !completion_port_threads) return; lazy_initialize (); counter.as_int64_t = COUNTER_READ (); *worker_threads = std::max (0, g_ThreadPool->limit_worker_max - counter._.active); *completion_port_threads = g_ThreadPool->limit_io_max; } void ves_icall_System_Threading_ThreadPool_GetMinThreadsNative (int32_t *worker_threads, int32_t *completion_port_threads) { if (!worker_threads || !completion_port_threads) return; lazy_initialize (); *worker_threads = g_ThreadPool->limit_worker_min; *completion_port_threads = g_ThreadPool->limit_io_min; } void ves_icall_System_Threading_ThreadPool_GetMaxThreadsNative (int32_t *worker_threads, int32_t *completion_port_threads) { if (!worker_threads || !completion_port_threads) return; lazy_initialize (); *worker_threads = g_ThreadPool->limit_worker_max; *completion_port_threads = g_ThreadPool->limit_io_max; } bool ves_icall_System_Threading_ThreadPool_SetMinThreadsNative (int32_t worker_threads, int32_t completion_port_threads) { lazy_initialize (); if (worker_threads <= 0 || worker_threads > g_ThreadPool->limit_worker_max) return false; if (completion_port_threads <= 0 || completion_port_threads > g_ThreadPool->limit_io_max) return false; g_ThreadPool->limit_worker_min = worker_threads; g_ThreadPool->limit_io_min = completion_port_threads; return true; } bool ves_icall_System_Threading_ThreadPool_SetMaxThreadsNative (int32_t worker_threads, int32_t completion_port_threads) { int cpu_count = il2cpp::os::Environment::GetProcessorCount (); lazy_initialize (); if (worker_threads < g_ThreadPool->limit_worker_min || worker_threads < cpu_count) return false; if (completion_port_threads < g_ThreadPool->limit_io_min || completion_port_threads < cpu_count) return false; g_ThreadPool->limit_worker_max = worker_threads; g_ThreadPool->limit_io_max = completion_port_threads; return true; } void ves_icall_System_Threading_ThreadPool_InitializeVMTp (bool *enable_worker_tracking) { if (enable_worker_tracking) { // TODO implement some kind of switch to have the possibily to use it *enable_worker_tracking = false; } lazy_initialize (); } bool ves_icall_System_Threading_ThreadPool_NotifyWorkItemComplete (void) { ThreadPoolCounter counter; if (il2cpp::vm::Runtime::IsShuttingDown ()) return false; heuristic_notify_work_completed (); if (heuristic_should_adjust ()) heuristic_adjust (); counter.as_int64_t = COUNTER_READ (); return counter._.working <= counter._.max_working; } void ves_icall_System_Threading_ThreadPool_NotifyWorkItemProgressNative (void) { heuristic_notify_work_completed (); if (heuristic_should_adjust ()) heuristic_adjust (); } void ves_icall_System_Threading_ThreadPool_ReportThreadStatus (bool is_working) { // Mono raises a not implemented exception IL2CPP_NOT_IMPLEMENTED_ICALL(ves_icall_System_Threading_ThreadPool_PostQueuedCompletionStatus); IL2CPP_UNREACHABLE; } bool ves_icall_System_Threading_ThreadPool_RequestWorkerThread (void) { return worker_request (il2cpp::vm::Domain::GetCurrent()); } bool ves_icall_System_Threading_ThreadPool_PostQueuedCompletionStatus (Il2CppNativeOverlapped *native_overlapped) { // Mono raises a not implemented exception IL2CPP_NOT_IMPLEMENTED_ICALL(ves_icall_System_Threading_ThreadPool_PostQueuedCompletionStatus); IL2CPP_UNREACHABLE; } bool ves_icall_System_Threading_ThreadPool_BindIOCompletionCallbackNative (void* file_handle) { /* This copy the behavior of the current Mono implementation */ return true; } bool ves_icall_System_Threading_ThreadPool_IsThreadPoolHosted (void) { return false; } void ves_icall_System_Threading_ThreadPool_NotifyWorkItemQueued (void) { // We don't need an implementation here. The Mono code only uses this method to increment a performance counter that we don't have in IL2CPP. }