123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934 |
- /*
- * threadpool-ms.c: Microsoft threadpool runtime support
- *
- * Author:
- * Ludovic Henry (ludovic.henry@xamarin.com)
- *
- * Copyright 2015 Xamarin, Inc (http://www.xamarin.com)
- * Licensed under the MIT license. See LICENSE file in the project root for full license information.
- */
- //
- // Copyright (c) Microsoft. All rights reserved.
- // Licensed under the MIT license. See LICENSE file in the project root for full license information.
- //
- // Files:
- // - src/vm/comthreadpool.cpp
- // - src/vm/win32threadpoolcpp
- // - src/vm/threadpoolrequest.cpp
- // - src/vm/hillclimbing.cpp
- //
- // Ported from C++ to C and adjusted to Mono runtime
- #include "il2cpp-config.h"
- #include <stdlib.h>
- #define _USE_MATH_DEFINES // needed by MSVC to define math constants
- #include <algorithm>
- #include <cmath>
- #include <complex>
- #include "math.h"
- #include "il2cpp-api.h"
- #include "gc/GarbageCollector.h"
- #include "gc/GCHandle.h"
- #include "gc/WriteBarrier.h"
- #include "icalls/mscorlib/System.Runtime.Remoting.Messaging/MonoMethodMessage.h"
- #include "mono/ThreadPool/threadpool-ms.h"
- #include "mono/ThreadPool/threadpool-ms-io.h"
- #include "mono/ThreadPool/ThreadPoolDataStructures.h"
- #include "mono/ThreadPool/ThreadPoolMacros.h"
- #include "mono/ThreadPool/ThreadPoolMonitorThread.h"
- #include "mono/ThreadPool/ThreadPoolWorkerThread.h"
- #include "il2cpp-object-internals.h"
- #include "os/CpuInfo.h"
- #include "os/Environment.h"
- #include "os/Mutex.h"
- #include "os/Time.h"
- #include "utils/CallOnce.h"
- #include "vm/Array.h"
- #include "vm/Class.h"
- #include "vm/Domain.h"
- #include "vm/Exception.h"
- #include "vm/Object.h"
- #include "vm/Reflection.h"
- #include "vm/Random.h"
- #include "vm/Runtime.h"
- #include "vm/String.h"
- #include "vm/Thread.h"
- #include "vm/WaitHandle.h"
- #include <icalls/mscorlib/System.Runtime.Remoting.Messaging/MonoMethodMessage.h>
- #ifndef CLAMP
- #define CLAMP(a,low,high) (((a) < (low)) ? (low) : (((a) > (high)) ? (high) : (a)))
- #endif
- ThreadPool* g_ThreadPool;
- /* The exponent to apply to the gain. 1.0 means to use linear gain,
- * higher values will enhance large moves and damp small ones.
- * default: 2.0 */
- #define HILL_CLIMBING_GAIN_EXPONENT 2.0
- /* The 'cost' of a thread. 0 means drive for increased throughput regardless
- * of thread count, higher values bias more against higher thread counts.
- * default: 0.15 */
- #define HILL_CLIMBING_BIAS 0.15
- #define HILL_CLIMBING_WAVE_PERIOD 4
- #define HILL_CLIMBING_MAX_WAVE_MAGNITUDE 20
- #define HILL_CLIMBING_WAVE_MAGNITUDE_MULTIPLIER 1.0
- #define HILL_CLIMBING_WAVE_HISTORY_SIZE 8
- #define HILL_CLIMBING_TARGET_SIGNAL_TO_NOISE_RATIO 3.0
- #define HILL_CLIMBING_MAX_CHANGE_PER_SECOND 4
- #define HILL_CLIMBING_MAX_CHANGE_PER_SAMPLE 20
- #define HILL_CLIMBING_SAMPLE_INTERVAL_LOW 10
- #define HILL_CLIMBING_SAMPLE_INTERVAL_HIGH 200
- #define HILL_CLIMBING_ERROR_SMOOTHING_FACTOR 0.01
- #define HILL_CLIMBING_MAX_SAMPLE_ERROR_PERCENT 0.15
- static il2cpp::utils::OnceFlag lazy_init_status;
- static Il2CppMethodMessage *
- mono_method_call_message_new(MethodInfo *method, void* *params, MethodInfo *invoke,
- Il2CppDelegate **cb, Il2CppObject **state)
- {
- Il2CppDomain *domain = il2cpp::vm::Domain::GetCurrent();
- Il2CppMethodMessage *msg;
- int i, count;
- msg = (Il2CppMethodMessage *)il2cpp::vm::Object::New(il2cpp_defaults.mono_method_message_class);
- if (invoke) {
- Il2CppReflectionMethod *rm = il2cpp::vm::Reflection::GetMethodObject(invoke, NULL);
- il2cpp::icalls::mscorlib::System::Runtime::Remoting::Messaging::MonoMethodMessage::InitMessage(msg, rm, NULL);
- count = method->parameters_count - 2;
- }
- else {
- Il2CppReflectionMethod *rm = il2cpp::vm::Reflection::GetMethodObject(method, NULL);
- il2cpp::icalls::mscorlib::System::Runtime::Remoting::Messaging::MonoMethodMessage::InitMessage(msg, rm, NULL);
- count = method->parameters_count;
- }
- for (i = 0; i < count; i++) {
- void* vpos;
- Il2CppClass *klass;
- Il2CppObject *arg;
- vpos = params[i];
- klass = il2cpp_class_from_type(method->parameters[i]);
- arg = (Il2CppObject*)vpos;
- il2cpp_array_setref(msg->args, i, arg);
- }
- if (cb != NULL && state != NULL) {
- *cb = (Il2CppDelegate *)params[i];
- i++;
- *state = (Il2CppObject *)params[i];
- }
- return msg;
- }
- static void* cpu_info_create()
- {
- return il2cpp::os::CpuInfo::Create();
- }
- ThreadPool::ThreadPool() :
- parked_threads_count(0),
- worker_creation_current_second(-1),
- worker_creation_current_count(0),
- heuristic_completions(0),
- heuristic_sample_start(0),
- heuristic_last_dequeue(0),
- heuristic_last_adjustment(0),
- heuristic_adjustment_interval(10),
- limit_worker_min(0),
- limit_worker_max(0),
- limit_io_min(0),
- limit_io_max(0),
- cpu_usage(0),
- suspended(false),
- parked_threads_cond(active_threads_lock)
- {
- counters.as_int64_t = 0;
- cpu_usage_state = cpu_info_create();
- }
- static void initialize(void* arg)
- {
- ThreadPoolHillClimbing *hc;
- //const char *threads_per_cpu_env;
- int threads_per_cpu;
- int threads_count;
- IL2CPP_ASSERT(!g_ThreadPool);
- g_ThreadPool = new ThreadPool();
- IL2CPP_ASSERT(g_ThreadPool);
- il2cpp::vm::Random::Open();
- hc = &g_ThreadPool->heuristic_hill_climbing;
- hc->wave_period = HILL_CLIMBING_WAVE_PERIOD;
- hc->max_thread_wave_magnitude = HILL_CLIMBING_MAX_WAVE_MAGNITUDE;
- hc->thread_magnitude_multiplier = (double) HILL_CLIMBING_WAVE_MAGNITUDE_MULTIPLIER;
- hc->samples_to_measure = hc->wave_period * HILL_CLIMBING_WAVE_HISTORY_SIZE;
- hc->target_throughput_ratio = (double) HILL_CLIMBING_BIAS;
- hc->target_signal_to_noise_ratio = (double) HILL_CLIMBING_TARGET_SIGNAL_TO_NOISE_RATIO;
- hc->max_change_per_second = (double) HILL_CLIMBING_MAX_CHANGE_PER_SECOND;
- hc->max_change_per_sample = (double) HILL_CLIMBING_MAX_CHANGE_PER_SAMPLE;
- hc->sample_interval_low = HILL_CLIMBING_SAMPLE_INTERVAL_LOW;
- hc->sample_interval_high = HILL_CLIMBING_SAMPLE_INTERVAL_HIGH;
- hc->throughput_error_smoothing_factor = (double) HILL_CLIMBING_ERROR_SMOOTHING_FACTOR;
- hc->gain_exponent = (double) HILL_CLIMBING_GAIN_EXPONENT;
- hc->max_sample_error = (double) HILL_CLIMBING_MAX_SAMPLE_ERROR_PERCENT;
- hc->current_control_setting = 0;
- hc->total_samples = 0;
- hc->last_thread_count = 0;
- hc->average_throughput_noise = 0;
- hc->elapsed_since_last_change = 0;
- hc->accumulated_completion_count = 0;
- hc->accumulated_sample_duration = 0;
- hc->samples = (double*)IL2CPP_MALLOC_ZERO (sizeof(double) * hc->samples_to_measure);
- hc->thread_counts = (double*)IL2CPP_MALLOC_ZERO(sizeof(double) * hc->samples_to_measure);
- hc->random_interval_generator = il2cpp::vm::Random::Create ();
- hc->current_sample_interval = il2cpp::vm::Random::Next (&hc->random_interval_generator, hc->sample_interval_low, hc->sample_interval_high);
- //std::string threads_per_cpu_env = il2cpp::os::Environment::GetEnvironmentVariable("IL2CPP_THREADS_PER_CPU");
- //if (threads_per_cpu_env.empty())
- threads_per_cpu = 1;
- /*else
- threads_per_cpu = CLAMP (atoi (threads_per_cpu_env.c_str()), 1, 50);*/
- threads_count = il2cpp::os::Environment::GetProcessorCount() * threads_per_cpu;
- g_ThreadPool->limit_worker_min = g_ThreadPool->limit_io_min = threads_count;
- #if IL2CPP_TARGET_ANDROID || IL2CPP_TARGET_IOS
- g_ThreadPool->limit_worker_max = g_ThreadPool->limit_io_max = CLAMP (threads_count * 100, std::min (threads_count, 200), std::max (threads_count, 200));
- #else
- g_ThreadPool->limit_worker_max = g_ThreadPool->limit_io_max = threads_count * 100;
- #endif
- g_ThreadPool->counters._.max_working = g_ThreadPool->limit_worker_min;
- }
- static void lazy_initialize()
- {
- il2cpp::utils::CallOnce(lazy_init_status, initialize, NULL);
- }
- static void worker_kill(Il2CppInternalThread* thread)
- {
- if (thread == il2cpp::vm::Thread::CurrentInternal())
- return;
- il2cpp::vm::Thread::Stop(thread);
- }
- static void cleanup (void)
- {
- unsigned int i;
- /* we make the assumption along the code that we are
- * cleaning up only if the runtime is shutting down */
- IL2CPP_ASSERT(il2cpp::vm::Runtime::IsShuttingDown ());
- while (GetMonitorStatus() != MONITOR_STATUS_NOT_RUNNING)
- il2cpp::vm::Thread::Sleep(1);
- std::vector<Il2CppInternalThread*> working_threads;
- g_ThreadPool->active_threads_lock.Acquire();
- working_threads = g_ThreadPool->working_threads;
- g_ThreadPool->active_threads_lock.Release();
- /* stop all threadpool->working_threads */
- for (i = 0; i < working_threads.size(); ++i)
- worker_kill (working_threads[i]);
- /* unpark all g_ThreadPool->parked_threads */
- g_ThreadPool->parked_threads_cond.NotifyAll();
- }
- bool threadpool_ms_enqueue_work_item (Il2CppDomain *domain, Il2CppObject *work_item)
- {
- static Il2CppClass *threadpool_class = NULL;
- static MethodInfo *unsafe_queue_custom_work_item_method = NULL;
- //Il2CppDomain *current_domain;
- bool f;
- void* args [2];
- IL2CPP_ASSERT(work_item);
- if (!threadpool_class)
- threadpool_class = il2cpp::vm::Class::FromName(il2cpp_defaults.corlib, "System.Threading", "ThreadPool");
- if (!unsafe_queue_custom_work_item_method)
- unsafe_queue_custom_work_item_method = (MethodInfo*)il2cpp::vm::Class::GetMethodFromName(threadpool_class, "UnsafeQueueCustomWorkItem", 2);
- IL2CPP_ASSERT(unsafe_queue_custom_work_item_method);
- f = false;
- args [0] = (void*) work_item;
- args [1] = (void*) &f;
- Il2CppObject *result = il2cpp::vm::Runtime::InvokeWithThrow(unsafe_queue_custom_work_item_method, NULL, args);
- return true;
- }
- /* LOCKING: threadpool->domains_lock must be held */
- static ThreadPoolDomain* domain_get(Il2CppDomain *domain, bool create)
- {
- ThreadPoolDomain *tpdomain = NULL;
- unsigned int i;
- IL2CPP_ASSERT(domain);
- for (i = 0; i < g_ThreadPool->domains.size(); ++i) {
- tpdomain = g_ThreadPool->domains[i];
- if (tpdomain->domain == domain)
- return tpdomain;
- }
- if (create) {
- tpdomain = new ThreadPoolDomain();
- tpdomain->domain = domain;
- g_ThreadPool->domains.push_back(tpdomain);
- }
- return tpdomain;
- }
- bool worker_try_unpark()
- {
- bool worker_unparked = true;
- g_ThreadPool->active_threads_lock.AcquireScoped([&worker_unparked] {
- if (g_ThreadPool->parked_threads_count == 0)
- worker_unparked = false;
- else
- g_ThreadPool->parked_threads_cond.Notify(1);
- });
-
- return worker_unparked;
- }
- static bool worker_request (Il2CppDomain *domain)
- {
- ThreadPoolDomain *tpdomain;
- IL2CPP_ASSERT(domain);
- IL2CPP_ASSERT(g_ThreadPool);
- if (il2cpp::vm::Runtime::IsShuttingDown ())
- return false;
- g_ThreadPool->domains_lock.Acquire();
- /* synchronize check with worker_thread */
- //if (mono_domain_is_unloading (domain)) {
- //mono_coop_mutex_unlock (&threadpool->domains_lock);
- /*return false;
- }*/
- tpdomain = domain_get (domain, true);
- IL2CPP_ASSERT(tpdomain);
- tpdomain->outstanding_request ++;
- /*mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] request worker, domain = %p, outstanding_request = %d",
- mono_native_thread_id_get (), tpdomain->domain, tpdomain->outstanding_request);*/
- g_ThreadPool->domains_lock.Release();
- if (g_ThreadPool->suspended)
- return false;
- monitor_ensure_running ();
- if (worker_try_unpark ()) {
- //mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] request worker, unparked", mono_native_thread_id_get ());
- return true;
- }
- if (worker_try_create ()) {
- //mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] request worker, created", mono_native_thread_id_get ());
- return true;
- }
- //mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] request worker, failed", mono_native_thread_id_get ());
- return false;
- }
- static void hill_climbing_change_thread_count (int16_t new_thread_count, ThreadPoolHeuristicStateTransition transition)
- {
- ThreadPoolHillClimbing *hc;
- IL2CPP_ASSERT(g_ThreadPool);
- hc = &g_ThreadPool->heuristic_hill_climbing;
- //mono_trace (G_LOG_LEVEL_INFO, MONO_TRACE_THREADPOOL, "[%p] hill climbing, change max number of threads %d", mono_native_thread_id_get (), new_thread_count);
- hc->last_thread_count = new_thread_count;
- hc->current_sample_interval = il2cpp::vm::Random::Next(&hc->random_interval_generator, hc->sample_interval_low, hc->sample_interval_high);
- hc->elapsed_since_last_change = 0;
- hc->completions_since_last_change = 0;
- }
- void hill_climbing_force_change (int16_t new_thread_count, ThreadPoolHeuristicStateTransition transition)
- {
- ThreadPoolHillClimbing *hc;
- IL2CPP_ASSERT(g_ThreadPool);
- hc = &g_ThreadPool->heuristic_hill_climbing;
- if (new_thread_count != hc->last_thread_count) {
- hc->current_control_setting += new_thread_count - hc->last_thread_count;
- hill_climbing_change_thread_count (new_thread_count, transition);
- }
- }
- static std::complex<double> hill_climbing_get_wave_component (double *samples, unsigned int sample_count, double period)
- {
- ThreadPoolHillClimbing *hc;
- double w, cosine, sine, coeff, q0, q1, q2;
- unsigned int i;
- IL2CPP_ASSERT(g_ThreadPool);
- IL2CPP_ASSERT(sample_count >= period);
- IL2CPP_ASSERT(period >= 2);
- hc = &g_ThreadPool->heuristic_hill_climbing;
- w = 2.0 * M_PI / period;
- cosine = cos (w);
- sine = sin (w);
- coeff = 2.0 * cosine;
- q0 = q1 = q2 = 0;
- for (i = 0; i < sample_count; ++i) {
- q0 = coeff * q1 - q2 + samples [(hc->total_samples - sample_count + i) % hc->samples_to_measure];
- q2 = q1;
- q1 = q0;
- }
- return (std::complex<double> (q1 - q2 * cosine, (q2 * sine)) / ((double)sample_count));
- }
- static int16_t hill_climbing_update (int16_t current_thread_count, uint32_t sample_duration, int32_t completions, int64_t *adjustment_interval)
- {
- ThreadPoolHillClimbing *hc;
- ThreadPoolHeuristicStateTransition transition;
- double throughput;
- double throughput_error_estimate;
- double confidence;
- double move;
- double gain;
- int sample_index;
- int sample_count;
- int new_thread_wave_magnitude;
- int new_thread_count;
- std::complex<double> thread_wave_component;
- std::complex<double> throughput_wave_component;
- std::complex<double> ratio;
- IL2CPP_ASSERT(g_ThreadPool);
- IL2CPP_ASSERT(adjustment_interval);
- hc = &g_ThreadPool->heuristic_hill_climbing;
- /* If someone changed the thread count without telling us, update our records accordingly. */
- if (current_thread_count != hc->last_thread_count)
- hill_climbing_force_change (current_thread_count, TRANSITION_INITIALIZING);
- /* Update the cumulative stats for this thread count */
- hc->elapsed_since_last_change += sample_duration;
- hc->completions_since_last_change += completions;
- /* Add in any data we've already collected about this sample */
- sample_duration += (uint32_t)hc->accumulated_sample_duration;
- completions += hc->accumulated_completion_count;
- /* We need to make sure we're collecting reasonably accurate data. Since we're just counting the end
- * of each work item, we are goinng to be missing some data about what really happened during the
- * sample interval. The count produced by each thread includes an initial work item that may have
- * started well before the start of the interval, and each thread may have been running some new
- * work item for some time before the end of the interval, which did not yet get counted. So
- * our count is going to be off by +/- threadCount workitems.
- *
- * The exception is that the thread that reported to us last time definitely wasn't running any work
- * at that time, and the thread that's reporting now definitely isn't running a work item now. So
- * we really only need to consider threadCount-1 threads.
- *
- * Thus the percent error in our count is +/- (threadCount-1)/numCompletions.
- *
- * We cannot rely on the frequency-domain analysis we'll be doing later to filter out this error, because
- * of the way it accumulates over time. If this sample is off by, say, 33% in the negative direction,
- * then the next one likely will be too. The one after that will include the sum of the completions
- * we missed in the previous samples, and so will be 33% positive. So every three samples we'll have
- * two "low" samples and one "high" sample. This will appear as periodic variation right in the frequency
- * range we're targeting, which will not be filtered by the frequency-domain translation. */
- if (hc->total_samples > 0 && ((current_thread_count - 1.0) / completions) >= hc->max_sample_error) {
- /* Not accurate enough yet. Let's accumulate the data so
- * far, and tell the ThreadPool to collect a little more. */
- hc->accumulated_sample_duration = sample_duration;
- hc->accumulated_completion_count = completions;
- *adjustment_interval = 10;
- return current_thread_count;
- }
- /* We've got enouugh data for our sample; reset our accumulators for next time. */
- hc->accumulated_sample_duration = 0;
- hc->accumulated_completion_count = 0;
- /* Add the current thread count and throughput sample to our history. */
- throughput = ((double) completions) / sample_duration;
- sample_index = hc->total_samples % hc->samples_to_measure;
- hc->samples [sample_index] = throughput;
- hc->thread_counts [sample_index] = current_thread_count;
- hc->total_samples ++;
- /* Set up defaults for our metrics. */
- throughput_error_estimate = 0;
- confidence = 0;
- transition = TRANSITION_WARMUP;
- /* How many samples will we use? It must be at least the three wave periods we're looking for, and it must also
- * be a whole multiple of the primary wave's period; otherwise the frequency we're looking for will fall between
- * two frequency bands in the Fourier analysis, and we won't be able to measure it accurately. */
- sample_count = ((int) std::min (hc->total_samples - 1, (int64_t)hc->samples_to_measure) / hc->wave_period) * hc->wave_period;
- if (sample_count > hc->wave_period) {
- int i;
- double average_throughput;
- double average_thread_count;
- double sample_sum = 0;
- double thread_sum = 0;
- /* Average the throughput and thread count samples, so we can scale the wave magnitudes later. */
- for (i = 0; i < sample_count; ++i) {
- unsigned int j = (hc->total_samples - sample_count + i) % hc->samples_to_measure;
- sample_sum += hc->samples [j];
- thread_sum += hc->thread_counts [j];
- }
- average_throughput = sample_sum / sample_count;
- average_thread_count = thread_sum / sample_count;
- if (average_throughput > 0 && average_thread_count > 0) {
- double noise_for_confidence, adjacent_period_1, adjacent_period_2;
- /* Calculate the periods of the adjacent frequency bands we'll be using to
- * measure noise levels. We want the two adjacent Fourier frequency bands. */
- adjacent_period_1 = sample_count / (((double) sample_count) / ((double) hc->wave_period) + 1);
- adjacent_period_2 = sample_count / (((double) sample_count) / ((double) hc->wave_period) - 1);
- /* Get the the three different frequency components of the throughput (scaled by average
- * throughput). Our "error" estimate (the amount of noise that might be present in the
- * frequency band we're really interested in) is the average of the adjacent bands. */
- throughput_wave_component = hill_climbing_get_wave_component(hc->samples, sample_count, hc->wave_period) / average_throughput;
- //throughput_wave_component = mono_double_complex_scalar_div (hill_climbing_get_wave_component (hc->samples, sample_count, hc->wave_period), average_throughput);
- throughput_error_estimate = std::abs(hill_climbing_get_wave_component(hc->samples, sample_count, adjacent_period_1) / average_throughput);
- //throughput_error_estimate = cabs (mono_double_complex_scalar_div (hill_climbing_get_wave_component (hc->samples, sample_count, adjacent_period_1), average_throughput));
- if (adjacent_period_2 <= sample_count) {
- throughput_error_estimate = std::max (throughput_error_estimate, std::abs (hill_climbing_get_wave_component (
- hc->samples, sample_count, adjacent_period_2) / average_throughput));
- }
- /* Do the same for the thread counts, so we have something to compare to. We don't
- * measure thread count noise, because there is none; these are exact measurements. */
- thread_wave_component = hill_climbing_get_wave_component (hc->thread_counts, sample_count, hc->wave_period) / average_thread_count;
- /* Update our moving average of the throughput noise. We'll use this
- * later as feedback to determine the new size of the thread wave. */
- if (hc->average_throughput_noise == 0) {
- hc->average_throughput_noise = throughput_error_estimate;
- } else {
- hc->average_throughput_noise = (hc->throughput_error_smoothing_factor * throughput_error_estimate)
- + ((1.0 + hc->throughput_error_smoothing_factor) * hc->average_throughput_noise);
- }
- if (std::abs (thread_wave_component) > 0) {
- /* Adjust the throughput wave so it's centered around the target wave,
- * and then calculate the adjusted throughput/thread ratio. */
- ratio = ((throughput_wave_component - (thread_wave_component * hc->target_throughput_ratio)) / thread_wave_component);
- transition = TRANSITION_CLIMBING_MOVE;
- } else {
- //ratio = mono_double_complex_make (0, 0);
- transition = TRANSITION_STABILIZING;
- }
- noise_for_confidence = std::max (hc->average_throughput_noise, throughput_error_estimate);
- if (noise_for_confidence > 0) {
- confidence = std::abs (thread_wave_component) / noise_for_confidence / hc->target_signal_to_noise_ratio;
- } else {
- /* there is no noise! */
- confidence = 1.0;
- }
- }
- }
- /* We use just the real part of the complex ratio we just calculated. If the throughput signal
- * is exactly in phase with the thread signal, this will be the same as taking the magnitude of
- * the complex move and moving that far up. If they're 180 degrees out of phase, we'll move
- * backward (because this indicates that our changes are having the opposite of the intended effect).
- * If they're 90 degrees out of phase, we won't move at all, because we can't tell wether we're
- * having a negative or positive effect on throughput. */
- move = std::real (ratio);
- move = CLAMP (move, -1.0, 1.0);
- /* Apply our confidence multiplier. */
- move *= CLAMP (confidence, -1.0, 1.0);
- /* Now apply non-linear gain, such that values around zero are attenuated, while higher values
- * are enhanced. This allows us to move quickly if we're far away from the target, but more slowly
- * if we're getting close, giving us rapid ramp-up without wild oscillations around the target. */
- gain = hc->max_change_per_second * sample_duration;
- move = pow (fabs (move), hc->gain_exponent) * (move >= 0.0 ? 1 : -1) * gain;
- move = std::min (move, hc->max_change_per_sample);
- /* If the result was positive, and CPU is > 95%, refuse the move. */
- if (move > 0.0 && g_ThreadPool->cpu_usage > CPU_USAGE_HIGH)
- move = 0.0;
- /* Apply the move to our control setting. */
- hc->current_control_setting += move;
- /* Calculate the new thread wave magnitude, which is based on the moving average we've been keeping of the
- * throughput error. This average starts at zero, so we'll start with a nice safe little wave at first. */
- new_thread_wave_magnitude = (int)(0.5 + (hc->current_control_setting * hc->average_throughput_noise
- * hc->target_signal_to_noise_ratio * hc->thread_magnitude_multiplier * 2.0));
- new_thread_wave_magnitude = CLAMP (new_thread_wave_magnitude, 1, hc->max_thread_wave_magnitude);
- /* Make sure our control setting is within the ThreadPool's limits. */
- hc->current_control_setting = CLAMP (hc->current_control_setting, g_ThreadPool->limit_worker_min, g_ThreadPool->limit_worker_max - new_thread_wave_magnitude);
- /* Calculate the new thread count (control setting + square wave). */
- new_thread_count = (int)(hc->current_control_setting + new_thread_wave_magnitude * ((hc->total_samples / (hc->wave_period / 2)) % 2));
- /* Make sure the new thread count doesn't exceed the ThreadPool's limits. */
- new_thread_count = CLAMP (new_thread_count, g_ThreadPool->limit_worker_min, g_ThreadPool->limit_worker_max);
- if (new_thread_count != current_thread_count)
- hill_climbing_change_thread_count (new_thread_count, transition);
- if (std::real (ratio) < 0.0 && new_thread_count == g_ThreadPool->limit_worker_min)
- *adjustment_interval = (int)(0.5 + hc->current_sample_interval * (10.0 * std::max (-1.0 * std::real (ratio), 1.0)));
- else
- *adjustment_interval = hc->current_sample_interval;
- return new_thread_count;
- }
- static void heuristic_notify_work_completed (void)
- {
- IL2CPP_ASSERT(g_ThreadPool);
- g_ThreadPool->heuristic_completions++;
- g_ThreadPool->heuristic_last_dequeue = il2cpp::os::Time::GetTicksMillisecondsMonotonic();
- }
- static bool heuristic_should_adjust (void)
- {
- IL2CPP_ASSERT(g_ThreadPool);
- if (g_ThreadPool->heuristic_last_dequeue > g_ThreadPool->heuristic_last_adjustment + g_ThreadPool->heuristic_adjustment_interval) {
- ThreadPoolCounter counter;
- counter.as_int64_t = COUNTER_READ();
- if (counter._.working <= counter._.max_working)
- return true;
- }
- return false;
- }
- static void heuristic_adjust (void)
- {
- IL2CPP_ASSERT(g_ThreadPool);
- if (g_ThreadPool->heuristic_lock.TryAcquire()) {
- int32_t completions = g_ThreadPool->heuristic_completions.exchange(0);
- int64_t sample_end = il2cpp::os::Time::GetTicksMillisecondsMonotonic();
- int64_t sample_duration = sample_end - g_ThreadPool->heuristic_sample_start;
- if (sample_duration >= g_ThreadPool->heuristic_adjustment_interval / 2) {
- ThreadPoolCounter counter;
- int16_t new_thread_count;
- counter.as_int64_t = COUNTER_READ ();
- new_thread_count = hill_climbing_update (counter._.max_working, (uint32_t)sample_duration, completions, &g_ThreadPool->heuristic_adjustment_interval);
- COUNTER_ATOMIC (counter, { counter._.max_working = new_thread_count; });
- if (new_thread_count > counter._.max_working)
- worker_request (il2cpp::vm::Domain::GetCurrent());
- g_ThreadPool->heuristic_sample_start = sample_end;
- g_ThreadPool->heuristic_last_adjustment = il2cpp::os::Time::GetTicksMillisecondsMonotonic();
- }
- g_ThreadPool->heuristic_lock.Release();
- }
- }
- void threadpool_ms_cleanup (void)
- {
- #ifndef DISABLE_SOCKETS
- threadpool_ms_io_cleanup ();
- #endif
- if (lazy_init_status.IsSet())
- cleanup();
- }
- Il2CppAsyncResult* threadpool_ms_begin_invoke (Il2CppDomain *domain, Il2CppObject *target, MethodInfo *method, void* *params)
- {
- Il2CppMethodMessage *message;
- Il2CppDelegate *async_callback = NULL;
- Il2CppObject *state = NULL;
- Il2CppAsyncCall* async_call = (Il2CppAsyncCall*)il2cpp::vm::Object::New(il2cpp_defaults.async_call_class);
- lazy_initialize ();
- MethodInfo *invoke = NULL;
- if (il2cpp::vm::Class::HasParent(method->klass, il2cpp_defaults.multicastdelegate_class))
- invoke = (MethodInfo*)il2cpp::vm::Class::GetMethodFromName(method->klass, "Invoke", -1);
- message = mono_method_call_message_new (method, params, invoke, (params != NULL) ? (&async_callback) : NULL, (params != NULL) ? (&state) : NULL);
- IL2CPP_OBJECT_SETREF (async_call, msg, message);
- IL2CPP_OBJECT_SETREF (async_call, state, state);
- if (async_callback)
- {
- IL2CPP_OBJECT_SETREF (async_call, cb_method, const_cast<MethodInfo*>(il2cpp::vm::Runtime::GetDelegateInvoke(il2cpp::vm::Object::GetClass((Il2CppObject*)async_callback))));
- IL2CPP_OBJECT_SETREF (async_call, cb_target, async_callback);
- }
- Il2CppAsyncResult* async_result = (Il2CppAsyncResult*)il2cpp::vm::Object::New(il2cpp_defaults.asyncresult_class);
- IL2CPP_OBJECT_SETREF(async_result, async_delegate, (Il2CppDelegate*)target);
- IL2CPP_OBJECT_SETREF(async_result, object_data, async_call);
- IL2CPP_OBJECT_SETREF(async_result, async_state, async_call->state);
- threadpool_ms_enqueue_work_item (domain, (Il2CppObject*) async_result);
- return async_result;
- }
- Il2CppObject* threadpool_ms_end_invoke (Il2CppAsyncResult *ares, Il2CppArray **out_args, Il2CppObject **exc)
- {
- Il2CppAsyncCall *ac;
- IL2CPP_ASSERT(exc);
- IL2CPP_ASSERT(out_args);
- *exc = NULL;
- *out_args = NULL;
- /* check if already finished */
- il2cpp_monitor_enter((Il2CppObject*) ares);
- if (ares->endinvoke_called)
- {
- il2cpp::vm::Exception::Raise(il2cpp::vm::Exception::GetInvalidOperationException("Cannot call EndInvoke() repeatedly or concurrently on the same AsyncResult!"));
- il2cpp_monitor_exit((Il2CppObject*) ares);
- return NULL;
- }
- ares->endinvoke_called = 1;
- /* wait until we are really finished */
- if (ares->completed)
- {
- il2cpp_monitor_exit((Il2CppObject *) ares);
- }
- else
- {
- if (!ares->handle)
- {
- Il2CppWaitHandle *wait_handle = il2cpp::vm::WaitHandle::NewManualResetEvent(false);
- IL2CPP_OBJECT_SETREF(ares, handle, wait_handle);
- }
- il2cpp::os::Handle* wait_event = il2cpp::vm::WaitHandle::GetPlatformHandle((Il2CppWaitHandle*)ares->handle);
- il2cpp_monitor_exit((Il2CppObject*) ares);
- //MONO_ENTER_GC_SAFE;
- wait_event->Wait();
- //MONO_EXIT_GC_SAFE;
- }
- ac = (Il2CppAsyncCall*) ares->object_data;
- IL2CPP_ASSERT(ac);
- il2cpp::gc::WriteBarrier::GenericStore(exc, ((Il2CppMethodMessage*)ac->msg)->exc);
- *out_args = ac->out_args;
- return ac->res;
- }
- void threadpool_ms_suspend (void)
- {
- if (g_ThreadPool)
- g_ThreadPool->suspended = true;
- }
- void threadpool_ms_resume (void)
- {
- if (g_ThreadPool)
- g_ThreadPool->suspended = false;
- }
- void ves_icall_System_Threading_ThreadPool_GetAvailableThreadsNative (int32_t *worker_threads, int32_t *completion_port_threads)
- {
- ThreadPoolCounter counter;
- if (!worker_threads || !completion_port_threads)
- return;
- lazy_initialize ();
- counter.as_int64_t = COUNTER_READ ();
- *worker_threads = std::max (0, g_ThreadPool->limit_worker_max - counter._.active);
- *completion_port_threads = g_ThreadPool->limit_io_max;
- }
- void ves_icall_System_Threading_ThreadPool_GetMinThreadsNative (int32_t *worker_threads, int32_t *completion_port_threads)
- {
- if (!worker_threads || !completion_port_threads)
- return;
- lazy_initialize ();
- *worker_threads = g_ThreadPool->limit_worker_min;
- *completion_port_threads = g_ThreadPool->limit_io_min;
- }
- void ves_icall_System_Threading_ThreadPool_GetMaxThreadsNative (int32_t *worker_threads, int32_t *completion_port_threads)
- {
- if (!worker_threads || !completion_port_threads)
- return;
- lazy_initialize ();
- *worker_threads = g_ThreadPool->limit_worker_max;
- *completion_port_threads = g_ThreadPool->limit_io_max;
- }
- bool ves_icall_System_Threading_ThreadPool_SetMinThreadsNative (int32_t worker_threads, int32_t completion_port_threads)
- {
- lazy_initialize ();
- if (worker_threads <= 0 || worker_threads > g_ThreadPool->limit_worker_max)
- return false;
- if (completion_port_threads <= 0 || completion_port_threads > g_ThreadPool->limit_io_max)
- return false;
- g_ThreadPool->limit_worker_min = worker_threads;
- g_ThreadPool->limit_io_min = completion_port_threads;
- return true;
- }
- bool ves_icall_System_Threading_ThreadPool_SetMaxThreadsNative (int32_t worker_threads, int32_t completion_port_threads)
- {
- int cpu_count = il2cpp::os::Environment::GetProcessorCount ();
- lazy_initialize ();
- if (worker_threads < g_ThreadPool->limit_worker_min || worker_threads < cpu_count)
- return false;
- if (completion_port_threads < g_ThreadPool->limit_io_min || completion_port_threads < cpu_count)
- return false;
- g_ThreadPool->limit_worker_max = worker_threads;
- g_ThreadPool->limit_io_max = completion_port_threads;
- return true;
- }
- void ves_icall_System_Threading_ThreadPool_InitializeVMTp (bool *enable_worker_tracking)
- {
- if (enable_worker_tracking) {
- // TODO implement some kind of switch to have the possibily to use it
- *enable_worker_tracking = false;
- }
- lazy_initialize ();
- }
- bool ves_icall_System_Threading_ThreadPool_NotifyWorkItemComplete (void)
- {
- ThreadPoolCounter counter;
- if (il2cpp::vm::Runtime::IsShuttingDown ())
- return false;
- heuristic_notify_work_completed ();
- if (heuristic_should_adjust ())
- heuristic_adjust ();
- counter.as_int64_t = COUNTER_READ ();
- return counter._.working <= counter._.max_working;
- }
- void ves_icall_System_Threading_ThreadPool_NotifyWorkItemProgressNative (void)
- {
- heuristic_notify_work_completed ();
- if (heuristic_should_adjust ())
- heuristic_adjust ();
- }
- void ves_icall_System_Threading_ThreadPool_ReportThreadStatus (bool is_working)
- {
- // Mono raises a not implemented exception
- IL2CPP_NOT_IMPLEMENTED_ICALL(ves_icall_System_Threading_ThreadPool_PostQueuedCompletionStatus);
- IL2CPP_UNREACHABLE;
- }
- bool ves_icall_System_Threading_ThreadPool_RequestWorkerThread (void)
- {
- return worker_request (il2cpp::vm::Domain::GetCurrent());
- }
- bool ves_icall_System_Threading_ThreadPool_PostQueuedCompletionStatus (Il2CppNativeOverlapped *native_overlapped)
- {
- // Mono raises a not implemented exception
- IL2CPP_NOT_IMPLEMENTED_ICALL(ves_icall_System_Threading_ThreadPool_PostQueuedCompletionStatus);
- IL2CPP_UNREACHABLE;
- }
- bool ves_icall_System_Threading_ThreadPool_BindIOCompletionCallbackNative (void* file_handle)
- {
- /* This copy the behavior of the current Mono implementation */
- return true;
- }
- bool ves_icall_System_Threading_ThreadPool_IsThreadPoolHosted (void)
- {
- return false;
- }
- void ves_icall_System_Threading_ThreadPool_NotifyWorkItemQueued (void)
- {
- // We don't need an implementation here. The Mono code only uses this method to increment a performance counter that we don't have in IL2CPP.
- }
|