ThreadPoolWorkerThread.cpp 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315
  1. #include "il2cpp-config.h"
  2. #include "gc/GarbageCollector.h"
  3. #include "mono/ThreadPool/threadpool-ms.h"
  4. #include "mono/ThreadPool/ThreadPoolDataStructures.h"
  5. #include "mono/ThreadPool/ThreadPoolMacros.h"
  6. #include "vm/String.h"
  7. #include "vm/Object.h"
  8. #include "vm/Random.h"
  9. #include "vm/Runtime.h"
  10. #include "vm/Thread.h"
  11. #include "os/Time.h"
  12. #define WORKER_CREATION_MAX_PER_SEC 10
  13. static void remove_working_thread(Il2CppInternalThread *thread)
  14. {
  15. int index = -1;
  16. for (unsigned i = 0; i < g_ThreadPool->working_threads.size(); ++i)
  17. {
  18. if (g_ThreadPool->working_threads[i] == thread)
  19. {
  20. index = i;
  21. break;
  22. }
  23. }
  24. if (index != -1)
  25. g_ThreadPool->working_threads.erase(g_ThreadPool->working_threads.begin() + index);
  26. }
  27. /*
  28. * mono_thread_info_install_interrupt: install an interruption token for the current thread.
  29. *
  30. * - @callback: must be able to be called from another thread and always cancel the wait
  31. * - @data: passed to the callback
  32. * - @interrupted: will be set to TRUE if a token is already installed, FALSE otherwise
  33. * if set to TRUE, it must mean that the thread is in interrupted state
  34. */
  35. static void thread_info_install_interrupt(void(*callback)(void* data), void* data, bool *interrupted)
  36. {
  37. // We can get by without this for the time being. Not needed until we have cooperative threading
  38. }
  39. static void thread_info_uninstall_interrupt(bool *interrupted)
  40. {
  41. // We can get by without this for the time being. Not needed until we have cooperative threading
  42. }
  43. static void worker_wait_interrupt(void* data)
  44. {
  45. g_ThreadPool->active_threads_lock.Acquire();
  46. g_ThreadPool->parked_threads_cond.Notify(1);
  47. g_ThreadPool->active_threads_lock.Release();
  48. }
  49. /* return true if timeout, false otherwise (worker unpark or interrupt) */
  50. static bool worker_park(void)
  51. {
  52. bool timeout = false;
  53. //mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] current worker parking", mono_native_thread_id_get ());
  54. il2cpp::gc::GarbageCollector::SetSkipThread(true);
  55. g_ThreadPool->active_threads_lock.Acquire();
  56. if (!il2cpp::vm::Runtime::IsShuttingDown())
  57. {
  58. static void* rand_handle = NULL;
  59. Il2CppInternalThread *thread_internal;
  60. bool interrupted = false;
  61. if (!rand_handle)
  62. rand_handle = il2cpp::vm::Random::Create();
  63. IL2CPP_ASSERT(rand_handle);
  64. thread_internal = il2cpp::vm::Thread::CurrentInternal();
  65. IL2CPP_ASSERT(thread_internal);
  66. g_ThreadPool->parked_threads_count += 1;
  67. remove_working_thread(thread_internal);
  68. thread_info_install_interrupt(worker_wait_interrupt, NULL, &interrupted);
  69. if (interrupted)
  70. goto done;
  71. if (g_ThreadPool->parked_threads_cond.TimedWait(baselib::timeout_ms(il2cpp::vm::Random::Next(&rand_handle, 5 * 1000, 60 * 1000))) == false)
  72. timeout = true;
  73. thread_info_uninstall_interrupt(&interrupted);
  74. done:
  75. g_ThreadPool->working_threads.push_back(thread_internal);
  76. g_ThreadPool->parked_threads_count -= 1;
  77. }
  78. g_ThreadPool->active_threads_lock.Release();
  79. il2cpp::gc::GarbageCollector::SetSkipThread(false);
  80. //mono_trace (G_LOG_LEVEL_DEBUG, MONO_TRACE_THREADPOOL, "[%p] current worker unparking, timeout? %s", mono_native_thread_id_get (), timeout ? "yes" : "no");
  81. return timeout;
  82. }
  83. /* LOCKING: threadpool->domains_lock must be held */
  84. static ThreadPoolDomain* domain_get_next(ThreadPoolDomain *current)
  85. {
  86. ThreadPoolDomain *tpdomain = NULL;
  87. unsigned int len;
  88. len = (unsigned int)g_ThreadPool->domains.size();
  89. if (len > 0)
  90. {
  91. unsigned int i, current_idx = ~0u;
  92. if (current)
  93. {
  94. for (i = 0; i < len; ++i)
  95. {
  96. if (current == g_ThreadPool->domains[i])
  97. {
  98. current_idx = i;
  99. break;
  100. }
  101. }
  102. IL2CPP_ASSERT(current_idx != ~0u);
  103. }
  104. for (i = current_idx + 1; i < len + current_idx + 1; ++i)
  105. {
  106. ThreadPoolDomain *tmp = (ThreadPoolDomain*)g_ThreadPool->domains[i % len];
  107. if (tmp->outstanding_request > 0)
  108. {
  109. tpdomain = tmp;
  110. break;
  111. }
  112. }
  113. }
  114. return tpdomain;
  115. }
  116. struct WorkerThreadStateHolder
  117. {
  118. Il2CppInternalThread *thread;
  119. ThreadPoolDomain* tpdomain;
  120. ThreadPoolDomain* previous_tpdomain;
  121. ThreadPoolCounter counter;
  122. bool retire;
  123. WorkerThreadStateHolder() :
  124. thread(il2cpp::vm::Thread::CurrentInternal()),
  125. tpdomain(NULL),
  126. previous_tpdomain(NULL),
  127. retire(false)
  128. {
  129. IL2CPP_ASSERT(thread);
  130. il2cpp::vm::Thread::SetName(thread, il2cpp::vm::String::New("IL2CPP Threadpool worker"));
  131. g_ThreadPool->active_threads_lock.AcquireScoped([this] {
  132. g_ThreadPool->working_threads.push_back(thread);
  133. });
  134. }
  135. ~WorkerThreadStateHolder()
  136. {
  137. g_ThreadPool->active_threads_lock.AcquireScoped([this] {
  138. remove_working_thread(thread);
  139. });
  140. COUNTER_ATOMIC(counter,
  141. {
  142. counter._.working--;
  143. counter._.active--;
  144. });
  145. }
  146. };
  147. struct WorkerThreadParkStateHolder
  148. {
  149. ThreadPoolCounter& counter;
  150. il2cpp::os::FastAutoUnlock domainUnlock;
  151. WorkerThreadParkStateHolder(WorkerThreadStateHolder& workerThreadState) :
  152. counter(workerThreadState.counter),
  153. domainUnlock(&g_ThreadPool->domains_lock)
  154. {
  155. COUNTER_ATOMIC(counter,
  156. {
  157. counter._.working--;
  158. counter._.parked++;
  159. });
  160. }
  161. ~WorkerThreadParkStateHolder()
  162. {
  163. COUNTER_ATOMIC(counter,
  164. {
  165. counter._.working++;
  166. counter._.parked--;
  167. });
  168. }
  169. };
  170. struct WorkerThreadJobStateHolder
  171. {
  172. ThreadPoolDomain* tpdomain;
  173. WorkerThreadJobStateHolder(const WorkerThreadStateHolder& workerThreadState) :
  174. tpdomain(workerThreadState.tpdomain)
  175. {
  176. tpdomain->outstanding_request--;
  177. IL2CPP_ASSERT(tpdomain->outstanding_request >= 0);
  178. IL2CPP_ASSERT(tpdomain->domain);
  179. IL2CPP_ASSERT(tpdomain->domain->threadpool_jobs >= 0);
  180. tpdomain->domain->threadpool_jobs++;
  181. }
  182. ~WorkerThreadJobStateHolder()
  183. {
  184. tpdomain->domain->threadpool_jobs--;
  185. IL2CPP_ASSERT(tpdomain->domain->threadpool_jobs >= 0);
  186. }
  187. };
  188. static void worker_thread(void* data)
  189. {
  190. IL2CPP_ASSERT(g_ThreadPool);
  191. WorkerThreadStateHolder workerThreadState;
  192. il2cpp::os::FastAutoLock domainsLock(&g_ThreadPool->domains_lock);
  193. while (!il2cpp::vm::Runtime::IsShuttingDown())
  194. {
  195. workerThreadState.previous_tpdomain = workerThreadState.tpdomain;
  196. if (workerThreadState.retire || !(workerThreadState.tpdomain = domain_get_next(workerThreadState.previous_tpdomain)))
  197. {
  198. WorkerThreadParkStateHolder threadParkState(workerThreadState);
  199. if (worker_park())
  200. break;
  201. workerThreadState.retire = false;
  202. continue;
  203. }
  204. WorkerThreadJobStateHolder threadJobState(workerThreadState);
  205. il2cpp::os::FastAutoUnlock domainUnlock(&g_ThreadPool->domains_lock);
  206. Il2CppObject* res = il2cpp::vm::Runtime::InvokeWithThrow(il2cpp_defaults.threadpool_perform_wait_callback_method, NULL, NULL);
  207. if (res && *(bool*)il2cpp::vm::Object::Unbox(res) == false)
  208. workerThreadState.retire = true;
  209. il2cpp::vm::Thread::ClrState(workerThreadState.thread, static_cast<il2cpp::vm::ThreadState>(~il2cpp::vm::kThreadStateBackground));
  210. if (!il2cpp::vm::Thread::TestState(workerThreadState.thread, il2cpp::vm::kThreadStateBackground))
  211. il2cpp::vm::Thread::SetState(workerThreadState.thread, il2cpp::vm::kThreadStateBackground);
  212. }
  213. }
  214. bool worker_try_create()
  215. {
  216. ThreadPoolCounter counter;
  217. Il2CppInternalThread *thread;
  218. int64_t current_ticks;
  219. int32_t now;
  220. il2cpp::os::FastAutoLock lock(&g_ThreadPool->worker_creation_lock);
  221. current_ticks = il2cpp::os::Time::GetTicks100NanosecondsMonotonic();
  222. now = (int32_t)(current_ticks / (10 * 1000 * 1000));
  223. if (current_ticks != 0)
  224. {
  225. if (g_ThreadPool->worker_creation_current_second != now)
  226. {
  227. g_ThreadPool->worker_creation_current_second = now;
  228. g_ThreadPool->worker_creation_current_count = 0;
  229. }
  230. else
  231. {
  232. IL2CPP_ASSERT(g_ThreadPool->worker_creation_current_count <= WORKER_CREATION_MAX_PER_SEC);
  233. if (g_ThreadPool->worker_creation_current_count == WORKER_CREATION_MAX_PER_SEC)
  234. {
  235. // Worker creation failed because maximum number of workers already created in the last second
  236. return false;
  237. }
  238. }
  239. }
  240. COUNTER_ATOMIC(counter,
  241. {
  242. if (counter._.working >= counter._.max_working)
  243. {
  244. // Worked creation failed because maximum number of workers are running
  245. return false;
  246. }
  247. counter._.working++;
  248. counter._.active++;
  249. });
  250. if ((thread = il2cpp::vm::Thread::CreateInternal(worker_thread, NULL, true, 0)) != NULL)
  251. {
  252. g_ThreadPool->worker_creation_current_count += 1;
  253. return true;
  254. }
  255. // Failed creating native thread :(
  256. COUNTER_ATOMIC(counter,
  257. {
  258. counter._.working--;
  259. counter._.active--;
  260. });
  261. return false;
  262. }