// Copyright (c) 2012-2023 Wojciech Figat. All rights reserved. #include "JobSystem.h" #include "IRunnable.h" #include "Engine/Platform/CPUInfo.h" #include "Engine/Platform/Thread.h" #include "Engine/Platform/ConditionVariable.h" #include "Engine/Core/Collections/Dictionary.h" #include "Engine/Engine/EngineService.h" #include "Engine/Profiler/ProfilerCPU.h" #if USE_CSHARP #include "Engine/Scripting/ManagedCLR/MCore.h" #endif // Jobs storage perf info: // (500 jobs, i7 9th gen) // JOB_SYSTEM_USE_MUTEX=1, enqueue=130-280 cycles, dequeue=2-6 cycles // JOB_SYSTEM_USE_MUTEX=0, enqueue=300-700 cycles, dequeue=10-16 cycles // So using RingBuffer+Mutex+Signals is better than moodycamel::ConcurrentQueue #define JOB_SYSTEM_ENABLED 1 #define JOB_SYSTEM_USE_MUTEX 1 #define JOB_SYSTEM_USE_STATS 0 #if JOB_SYSTEM_USE_STATS #include "Engine/Core/Log.h" #endif #if JOB_SYSTEM_USE_MUTEX #include "Engine/Core/Collections/RingBuffer.h" #else #include "ConcurrentQueue.h" #endif #if JOB_SYSTEM_ENABLED class JobSystemService : public EngineService { public: JobSystemService() : EngineService(TEXT("JobSystem"), -800) { } bool Init() override; void BeforeExit() override; void Dispose() override; }; struct JobData { Function Job; int32 Index; int64 JobKey; }; template<> struct TIsPODType { enum { Value = false }; }; class JobSystemThread : public IRunnable { public: uint64 Index; public: // [IRunnable] String ToString() const override { return TEXT("JobSystemThread"); } int32 Run() override; void AfterWork(bool wasKilled) override { Delete(this); } }; struct JobContext { volatile int64 JobsLeft; }; template<> struct TIsPODType { enum { Value = true }; }; namespace { JobSystemService JobSystemInstance; Thread* Threads[PLATFORM_THREADS_LIMIT] = {}; int32 ThreadsCount = 0; bool JobStartingOnDispatch = true; volatile int64 ExitFlag = 0; volatile int64 JobLabel = 0; Dictionary JobContexts; ConditionVariable JobsSignal; CriticalSection JobsMutex; ConditionVariable WaitSignal; CriticalSection WaitMutex; CriticalSection JobsLocker; #if JOB_SYSTEM_USE_MUTEX RingBuffer Jobs; #else ConcurrentQueue Jobs; #endif #if JOB_SYSTEM_USE_STATS int64 DequeueCount = 0; int64 DequeueSum = 0; #endif } bool JobSystemService::Init() { ThreadsCount = Math::Min(Platform::GetCPUInfo().LogicalProcessorCount, ARRAY_COUNT(Threads)); for (int32 i = 0; i < ThreadsCount; i++) { auto runnable = New(); runnable->Index = (uint64)i; auto thread = Thread::Create(runnable, String::Format(TEXT("Job System {0}"), i), ThreadPriority::AboveNormal); if (thread == nullptr) return true; Threads[i] = thread; } return false; } void JobSystemService::BeforeExit() { Platform::AtomicStore(&ExitFlag, 1); JobsSignal.NotifyAll(); } void JobSystemService::Dispose() { Platform::AtomicStore(&ExitFlag, 1); JobsSignal.NotifyAll(); Platform::Sleep(1); for (int32 i = 0; i < ThreadsCount; i++) { if (Threads[i]) { Threads[i]->Kill(true); Delete(Threads[i]); Threads[i] = nullptr; } } } int32 JobSystemThread::Run() { Platform::SetThreadAffinityMask(1ull << Index); JobData data; bool attachCSharpThread = true; #if !JOB_SYSTEM_USE_MUTEX moodycamel::ConsumerToken consumerToken(Jobs); #endif while (Platform::AtomicRead(&ExitFlag) == 0) { // Try to get a job #if JOB_SYSTEM_USE_STATS const auto start = Platform::GetTimeCycles(); #endif #if JOB_SYSTEM_USE_MUTEX JobsLocker.Lock(); if (Jobs.Count() != 0) { data = Jobs.PeekFront(); Jobs.PopFront(); } JobsLocker.Unlock(); #else if (!Jobs.try_dequeue(consumerToken, data)) data.Job.Unbind(); #endif #if JOB_SYSTEM_USE_STATS Platform::InterlockedIncrement(&DequeueCount); Platform::InterlockedAdd(&DequeueSum, Platform::GetTimeCycles() - start); #endif if (data.Job.IsBinded()) { #if USE_CSHARP // Ensure to have C# thread attached to this thead (late init due to MCore being initialized after Job System) if (attachCSharpThread) { MCore::Thread::Attach(); attachCSharpThread = false; } #endif // Run job data.Job(data.Index); // Move forward with the job queue JobsLocker.Lock(); JobContext& context = JobContexts.At(data.JobKey); if (Platform::InterlockedDecrement(&context.JobsLeft) <= 0) { ASSERT_LOW_LAYER(context.JobsLeft <= 0); JobContexts.Remove(data.JobKey); } JobsLocker.Unlock(); WaitSignal.NotifyAll(); data.Job.Unbind(); } else { // Wait for signal JobsMutex.Lock(); JobsSignal.Wait(JobsMutex); JobsMutex.Unlock(); } } return 0; } #endif void JobSystem::Execute(const Function& job, int32 jobCount) { #if JOB_SYSTEM_ENABLED // TODO: disable async if called on job thread? or maybe Wait should handle waiting in job thread to do the processing? if (jobCount > 1) { // Async const int64 jobWaitHandle = Dispatch(job, jobCount); Wait(jobWaitHandle); } else #endif { // Sync for (int32 i = 0; i < jobCount; i++) job(i); } } int64 JobSystem::Dispatch(const Function& job, int32 jobCount) { PROFILE_CPU(); if (jobCount <= 0) return 0; #if JOB_SYSTEM_ENABLED #if JOB_SYSTEM_USE_STATS const auto start = Platform::GetTimeCycles(); #endif const auto label = Platform::InterlockedAdd(&JobLabel, (int64)jobCount) + jobCount; JobData data; data.Job = job; data.JobKey = label; JobContext context; context.JobsLeft = jobCount; #if JOB_SYSTEM_USE_MUTEX JobsLocker.Lock(); JobContexts.Add(label, context); for (data.Index = 0; data.Index < jobCount; data.Index++) Jobs.PushBack(data); JobsLocker.Unlock(); #else JobsLocker.Lock(); JobContexts.Add(label, context); JobsLocker.Unlock(); for (data.Index = 0; data.Index < jobCount; data.Index++) Jobs.enqueue(data); #endif #if JOB_SYSTEM_USE_STATS LOG(Info, "Job enqueue time: {0} cycles", (int64)(Platform::GetTimeCycles() - start)); #endif if (JobStartingOnDispatch) { if (jobCount == 1) JobsSignal.NotifyOne(); else JobsSignal.NotifyAll(); } return label; #else for (int32 i = 0; i < jobCount; i++) job(i); return 0; #endif } void JobSystem::Wait() { #if JOB_SYSTEM_ENABLED JobsLocker.Lock(); int32 numJobs = JobContexts.Count(); JobsLocker.Unlock(); while (numJobs > 0) { WaitMutex.Lock(); WaitSignal.Wait(WaitMutex, 1); WaitMutex.Unlock(); JobsLocker.Lock(); numJobs = JobContexts.Count(); JobsLocker.Unlock(); } #endif } void JobSystem::Wait(int64 label) { #if JOB_SYSTEM_ENABLED PROFILE_CPU(); while (Platform::AtomicRead(&ExitFlag) == 0) { JobsLocker.Lock(); const JobContext* context = JobContexts.TryGet(label); JobsLocker.Unlock(); // Skip if context has been already executed (last job removes it) if (!context) break; // Wait on signal until input label is not yet done WaitMutex.Lock(); WaitSignal.Wait(WaitMutex, 1); WaitMutex.Unlock(); // Wake up any thread to prevent stalling in highly multi-threaded environment JobsSignal.NotifyOne(); } #if JOB_SYSTEM_USE_STATS LOG(Info, "Job average dequeue time: {0} cycles", DequeueSum / DequeueCount); DequeueSum = DequeueCount = 0; #endif #endif } void JobSystem::SetJobStartingOnDispatch(bool value) { #if JOB_SYSTEM_ENABLED JobStartingOnDispatch = value; if (value) { #if JOB_SYSTEM_USE_MUTEX JobsLocker.Lock(); const int32 count = Jobs.Count(); JobsLocker.Unlock(); #else const int32 count = Jobs.Count(); #endif if (count == 1) JobsSignal.NotifyOne(); else if (count != 0) JobsSignal.NotifyAll(); } #endif } int32 JobSystem::GetThreadsCount() { #if JOB_SYSTEM_ENABLED return ThreadsCount; #else return 0; #endif }