diff --git a/Source/Engine/Threading/JobSystem.cpp b/Source/Engine/Threading/JobSystem.cpp index 692a088b7..cbdf53136 100644 --- a/Source/Engine/Threading/JobSystem.cpp +++ b/Source/Engine/Threading/JobSystem.cpp @@ -8,7 +8,6 @@ #include "Engine/Core/Types/Span.h" #include "Engine/Core/Types/Pair.h" #include "Engine/Core/Memory/SimpleHeapAllocation.h" -#include "Engine/Core/Collections/Dictionary.h" #include "Engine/Core/Collections/RingBuffer.h" #include "Engine/Engine/EngineService.h" #include "Engine/Profiler/ProfilerCPU.h" @@ -22,14 +21,6 @@ #if JOB_SYSTEM_ENABLED -// Local allocator for job system memory that uses internal pooling and assumes that JobsLocker is taken (write access owned by the calling thread). -class JobSystemAllocation : public SimpleHeapAllocation -{ -public: - static void* Allocate(uintptr size); - static void Free(void* ptr, uintptr size); -}; - class JobSystemService : public EngineService { public: @@ -43,24 +34,25 @@ public: void Dispose() override; }; -struct JobData -{ - int32 Index; - int64 JobKey; -}; - -template<> -struct TIsPODType -{ - enum { Value = true }; -}; - -struct JobContext +// Holds a single job dispatch data +struct alignas(int64) JobContext { + // The next index of the job to process updated when picking a job by the thread. + volatile int64 JobIndex; + // The number of jobs left to process updated after job completion by the thread. volatile int64 JobsLeft; - int32 DependenciesLeft; + // The unique label of this job used to identify it. Set to -1 when job is done. + volatile int64 JobLabel; + // Utility atomic counter used to indicate that any job is waiting for this one to finish. Then Dependants can be accessed within thread-safe JobsLocker. + volatile int64 DependantsCount; + // The number of dependency jobs left to be finished before starting this job. + volatile int64 DependenciesLeft; + // The total number of jobs to process (in this context). + int32 JobsCount; + // The job function to execute. Function Job; - Array Dependants; + // List of dependant jobs to signal when this job is done. + Array Dependants; }; template<> @@ -92,50 +84,36 @@ public: namespace { JobSystemService JobSystemInstance; - Array> MemPool; Thread* Threads[PLATFORM_THREADS_LIMIT / 2] = {}; int32 ThreadsCount = 0; bool JobStartingOnDispatch = true; volatile int64 ExitFlag = 0; volatile int64 JobLabel = 0; - Dictionary JobContexts; + volatile int64 JobEndLabel = 0; + volatile int64 JobStartLabel = 0; + volatile int64 JobContextsCount = 0; + uint32 JobContextsSize = 0; + uint32 JobContextsMask = 0; + JobContext* JobContexts = nullptr; ConditionVariable JobsSignal; CriticalSection JobsMutex; ConditionVariable WaitSignal; CriticalSection WaitMutex; CriticalSection JobsLocker; - RingBuffer Jobs; -} - -void* JobSystemAllocation::Allocate(uintptr size) -{ - void* result = nullptr; - for (int32 i = 0; i < MemPool.Count(); i++) - { - if (MemPool.Get()[i].Second == size) - { - result = MemPool.Get()[i].First; - MemPool.RemoveAt(i); - break; - } - } - if (!result) - { - PROFILE_MEM(EngineThreading); - result = Platform::Allocate(size, 16); - } - return result; -} - -void JobSystemAllocation::Free(void* ptr, uintptr size) -{ - PROFILE_MEM(EngineThreading); - MemPool.Add({ ptr, size }); +#define GET_CONTEXT_INDEX(label) (uint32)((label) & (int64)JobContextsMask) } bool JobSystemService::Init() { PROFILE_MEM(EngineThreading); + + // Initialize job context storage (fixed-size ring buffer for active jobs tracking) + JobContextsSize = 256; + JobContextsMask = JobContextsSize - 1; + JobContexts = (JobContext*)Platform::Allocate(JobContextsSize * sizeof(JobContext), alignof(JobContext)); + Platform::MemoryClear(JobContexts, sizeof(JobContextsSize * sizeof(JobContext))); + + // Spawn threads ThreadsCount = Math::Min(Platform::GetCPUInfo().LogicalProcessorCount, ARRAY_COUNT(Threads)); for (int32 i = 0; i < ThreadsCount; i++) { @@ -146,6 +124,7 @@ bool JobSystemService::Init() return true; Threads[i] = thread; } + return false; } @@ -171,35 +150,66 @@ void JobSystemService::Dispose() } } - JobContexts.SetCapacity(0); - Jobs.Release(); - for (auto& e : MemPool) - Platform::Free(e.First); - MemPool.Clear(); + Platform::Free(JobContexts); + JobContexts = nullptr; } int32 JobSystemThread::Run() { + // Pin thread to the physical core Platform::SetThreadAffinityMask(1ull << Index); - JobData data; - Function job; bool attachCSharpThread = true; MONO_THREAD_INFO_TYPE* monoThreadInfo = nullptr; while (Platform::AtomicRead(&ExitFlag) == 0) { // Try to get a job - JobsLocker.Lock(); - if (Jobs.Count() != 0) + int32 jobIndex; + JobContext* jobContext = nullptr; { - data = Jobs.PeekFront(); - Jobs.PopFront(); - const JobContext& context = ((const Dictionary&)JobContexts).At(data.JobKey); - job = context.Job; - } - JobsLocker.Unlock(); + int64 jobOffset = 0; + RETRY: + int64 jobStartLabel = Platform::AtomicRead(&JobStartLabel) + jobOffset; + int64 jobEndLabel = Platform::AtomicRead(&JobEndLabel); + if (jobStartLabel <= jobEndLabel && jobEndLabel > 0) + { + jobContext = &JobContexts[GET_CONTEXT_INDEX(jobStartLabel)]; + if (Platform::AtomicRead(&jobContext->DependenciesLeft) > 0) + { + // This job still waits for dependency so skip it for now and try the next one + jobOffset++; + jobContext = nullptr; + goto RETRY; + } - if (job.IsBinded()) + // Move forward with index for a job + jobIndex = (int32)(Platform::InterlockedIncrement(&jobContext->JobIndex) - 1); + if (jobIndex < jobContext->JobsCount) + { + // Index is valid + } + else if (jobStartLabel < jobEndLabel && jobOffset == 0) + { + // No more jobs inside this context, move to the next one + Platform::InterlockedCompareExchange(&JobStartLabel, jobStartLabel + 1, jobStartLabel); + jobContext = nullptr; + goto RETRY; + } + else + { + // No more jobs + jobContext = nullptr; + if (jobStartLabel < jobEndLabel) + { + // Try with a different one before going to sleep + jobOffset++; + goto RETRY; + } + } + } + } + + if (jobContext) { #if USE_CSHARP // Ensure to have C# thread attached to this thead (late init due to MCore being initialized after Job System) @@ -212,37 +222,39 @@ int32 JobSystemThread::Run() #endif // Run job - job(data.Index); + jobContext->Job(jobIndex); // Move forward with the job queue - bool notifyWaiting = false; - JobsLocker.Lock(); - JobContext& context = JobContexts.At(data.JobKey); - if (Platform::InterlockedDecrement(&context.JobsLeft) <= 0) + if (Platform::InterlockedDecrement(&jobContext->JobsLeft) <= 0) { - // Update any dependant jobs - for (int64 dependant : context.Dependants) + // Mark job as done before processing dependants + Platform::AtomicStore(&jobContext->JobLabel, -1); + + // Check if any other job waits on this one + if (Platform::AtomicRead(&jobContext->DependantsCount) != 0) { - JobContext& dependantContext = JobContexts.At(dependant); - if (--dependantContext.DependenciesLeft <= 0) + // Update dependant jobs + JobsLocker.Lock(); + for (int64 dependant : jobContext->Dependants) { - // Dispatch dependency when it's ready - JobData dependantData; - dependantData.JobKey = dependant; - for (dependantData.Index = 0; dependantData.Index < dependantContext.JobsLeft; dependantData.Index++) - Jobs.PushBack(dependantData); + JobContext& dependantContext = JobContexts[GET_CONTEXT_INDEX(dependant)]; + if (dependantContext.JobLabel == dependant) + Platform::InterlockedDecrement(&dependantContext.DependenciesLeft); } + JobsLocker.Unlock(); } - // Remove completed context - JobContexts.Remove(data.JobKey); - notifyWaiting = true; - } - JobsLocker.Unlock(); - if (notifyWaiting) - WaitSignal.NotifyAll(); + // Cleanup completed context + jobContext->Job.Unbind(); + jobContext->Dependants.Clear(); + Platform::AtomicStore(&jobContext->DependantsCount, 0); + Platform::AtomicStore(&jobContext->DependenciesLeft, -999); // Mark to indicate deleted context + Platform::AtomicStore(&jobContext->JobLabel, -1); + Platform::InterlockedDecrement(&JobContextsCount); - job.Unbind(); + // Wakeup any thread waiting for the jobs to complete + WaitSignal.NotifyAll(); + } } else { @@ -266,8 +278,8 @@ void JobSystem::Execute(const Function& job, int32 jobCount) if (jobCount > 1) { // Async - const int64 jobWaitHandle = Dispatch(job, jobCount); - Wait(jobWaitHandle); + const int64 label = Dispatch(job, jobCount); + Wait(label); } else #endif @@ -284,21 +296,32 @@ int64 JobSystem::Dispatch(const Function& job, int32 jobCount) return 0; PROFILE_CPU(); #if JOB_SYSTEM_ENABLED - const auto label = Platform::InterlockedAdd(&JobLabel, (int64)jobCount) + jobCount; + while (Platform::InterlockedIncrement(&JobContextsCount) >= JobContextsSize) + { + // Too many jobs in flight, wait for some to complete to free up contexts + PROFILE_CPU_NAMED("JOB SYSTEM OVERFLOW"); + ZoneColor(TracyWaitZoneColor); + Platform::InterlockedDecrement(&JobContextsCount); + Platform::Sleep(1); + } - JobData data; - data.JobKey = label; + // Get a new label + const int64 label = Platform::InterlockedIncrement(&JobLabel); - JobContext context; + // Build job + JobContext& context = JobContexts[GET_CONTEXT_INDEX(label)]; context.Job = job; + context.JobIndex = 0; context.JobsLeft = jobCount; + context.JobLabel = label; + context.DependantsCount = 0; context.DependenciesLeft = 0; + context.JobsCount = jobCount; + ASSERT(context.Dependants.IsEmpty()); + context.Dependants.Clear(); - JobsLocker.Lock(); - JobContexts.Add(label, MoveTemp(context)); - for (data.Index = 0; data.Index < jobCount; data.Index++) - Jobs.PushBack(data); - JobsLocker.Unlock(); + // Move the job queue forward + Platform::InterlockedIncrement(&JobEndLabel); if (JobStartingOnDispatch) { @@ -321,34 +344,48 @@ int64 JobSystem::Dispatch(const Function& job, Span dependen if (jobCount <= 0) return 0; PROFILE_CPU(); + PROFILE_MEM(EngineThreading); #if JOB_SYSTEM_ENABLED - const auto label = Platform::InterlockedAdd(&JobLabel, (int64)jobCount) + jobCount; + while (Platform::InterlockedIncrement(&JobContextsCount) >= JobContextsSize) + { + // Too many jobs in flight, wait for some to complete to free up contexts + PROFILE_CPU_NAMED("JOB SYSTEM OVERFLOW"); + ZoneColor(TracyWaitZoneColor); + Platform::InterlockedDecrement(&JobContextsCount); + Platform::Sleep(1); + } - JobData data; - data.JobKey = label; + // Get a new label + const int64 label = Platform::InterlockedIncrement(&JobLabel); - JobContext context; + // Build job + JobContext& context = JobContexts[GET_CONTEXT_INDEX(label)]; context.Job = job; + context.JobIndex = 0; context.JobsLeft = jobCount; + context.JobLabel = label; + context.DependantsCount = 0; context.DependenciesLeft = 0; - - JobsLocker.Lock(); - for (int64 dependency : dependencies) + context.JobsCount = jobCount; + ASSERT(context.Dependants.IsEmpty()); + context.Dependants.Clear(); { - if (JobContext* dependencyContext = JobContexts.TryGet(dependency)) + JobsLocker.Lock(); + for (int64 dependency : dependencies) { - context.DependenciesLeft++; - dependencyContext->Dependants.Add(label); + JobContext& dependencyContext = JobContexts[GET_CONTEXT_INDEX(dependency)]; + if (Platform::AtomicRead(&dependencyContext.JobLabel) == dependency) + { + Platform::InterlockedIncrement(&dependencyContext.DependantsCount); + dependencyContext.Dependants.Add(label); + context.DependenciesLeft++; + } } + JobsLocker.Unlock(); } - JobContexts.Add(label, MoveTemp(context)); - if (context.DependenciesLeft == 0) - { - // No dependencies left to complete so dispatch now - for (data.Index = 0; data.Index < jobCount; data.Index++) - Jobs.PushBack(data); - } - JobsLocker.Unlock(); + + // Move the job queue forward + Platform::InterlockedIncrement(&JobEndLabel); if (context.DependenciesLeft == 0 && JobStartingOnDispatch) { @@ -369,19 +406,17 @@ int64 JobSystem::Dispatch(const Function& job, Span dependen void JobSystem::Wait() { #if JOB_SYSTEM_ENABLED - JobsLocker.Lock(); - int32 numJobs = JobContexts.Count(); - JobsLocker.Unlock(); + PROFILE_CPU(); + ZoneColor(TracyWaitZoneColor); + int64 numJobs = Platform::AtomicRead(&JobContextsCount); while (numJobs > 0) { WaitMutex.Lock(); WaitSignal.Wait(WaitMutex, 1); WaitMutex.Unlock(); - JobsLocker.Lock(); - numJobs = JobContexts.Count(); - JobsLocker.Unlock(); + numJobs = Platform::AtomicRead(&JobContextsCount); } #endif } @@ -394,12 +429,11 @@ void JobSystem::Wait(int64 label) while (Platform::AtomicRead(&ExitFlag) == 0) { - JobsLocker.Lock(); - const JobContext* context = JobContexts.TryGet(label); - JobsLocker.Unlock(); + const JobContext& context = JobContexts[GET_CONTEXT_INDEX(label)]; + const bool finished = Platform::AtomicRead(&context.JobLabel) != label || Platform::AtomicRead(&context.JobsLeft) <= 0; // Skip if context has been already executed (last job removes it) - if (!context) + if (finished) break; // Wait on signal until input label is not yet done @@ -417,15 +451,10 @@ void JobSystem::SetJobStartingOnDispatch(bool value) { #if JOB_SYSTEM_ENABLED JobStartingOnDispatch = value; - if (value) + if (value && (Platform::AtomicRead(&JobEndLabel) - Platform::AtomicRead(&JobStartLabel)) > 0) { - JobsLocker.Lock(); - const int32 count = Jobs.Count(); - JobsLocker.Unlock(); - if (count == 1) - JobsSignal.NotifyOne(); - else if (count != 0) - JobsSignal.NotifyAll(); + // Wake up threads to start processing jobs that may be already in the queue + JobsSignal.NotifyAll(); } #endif }