Refactor Job System to reduce mutex usage with more atomic operations

#3917
This commit is contained in:
Wojtek Figat
2026-02-06 09:11:12 +01:00
parent 4833c19366
commit c18b9163ca

View File

@@ -8,7 +8,6 @@
#include "Engine/Core/Types/Span.h"
#include "Engine/Core/Types/Pair.h"
#include "Engine/Core/Memory/SimpleHeapAllocation.h"
#include "Engine/Core/Collections/Dictionary.h"
#include "Engine/Core/Collections/RingBuffer.h"
#include "Engine/Engine/EngineService.h"
#include "Engine/Profiler/ProfilerCPU.h"
@@ -22,14 +21,6 @@
#if JOB_SYSTEM_ENABLED
// Local allocator for job system memory that uses internal pooling and assumes that JobsLocker is taken (write access owned by the calling thread).
class JobSystemAllocation : public SimpleHeapAllocation<JobSystemAllocation>
{
public:
// Returns a memory block for the given size. A block previously passed to Free with exactly the same size is reused
// when available; otherwise a new block is requested from Platform::Allocate with 16-byte alignment.
static void* Allocate(uintptr size);
// Does not release the memory: the pointer is stored in the pool together with its size so a later Allocate call
// with the same size can reuse it. Pooled blocks are released in JobSystemService::Dispose.
static void Free(void* ptr, uintptr size);
};
class JobSystemService : public EngineService
{
public:
@@ -43,24 +34,25 @@ public:
void Dispose() override;
};
// A single queued job invocation stored in the Jobs ring buffer.
struct JobData
{
// Index of the invocation that is passed to the job function when it gets executed.
int32 Index;
// Key of the job context (inside JobContexts) this invocation belongs to.
int64 JobKey;
};
// Marks JobData as a POD type for the engine type traits.
// NOTE(review): presumably this lets containers such as RingBuffer<JobData> move items as raw memory - confirm against TIsPODType usage.
template<>
struct TIsPODType<JobData>
{
enum { Value = true };
};
struct JobContext
// Holds the data of a single job dispatch. Contexts are stored in the JobContexts array and are located by masking the job label (see GET_CONTEXT_INDEX).
struct alignas(int64) JobContext
{
// The next index of the job to process. Worker threads claim an index with an atomic increment and compare the result against JobsCount.
volatile int64 JobIndex;
// The number of job invocations left to finish. Decremented atomically after each invocation; the thread that brings it down to zero finalizes the context.
volatile int64 JobsLeft;
int32 DependenciesLeft;
// The unique label of this job used to identify it. Set to -1 when job is done.
volatile int64 JobLabel;
// Atomic counter of jobs that wait for this one to finish. When it is non-zero the finishing thread takes JobsLocker and walks the Dependants list.
volatile int64 DependantsCount;
// The number of dependency jobs left to be finished before starting this job. Worker threads skip the context while it is above zero. Set to -999 when the context gets cleaned up.
volatile int64 DependenciesLeft;
// The total number of jobs to process (in this context).
int32 JobsCount;
// The job function to execute. It is called with the claimed job index.
Function<void(int32)> Job;
Array<int64, JobSystemAllocation> Dependants;
// List of labels of the dependant jobs to signal when this job is done (modified under JobsLocker).
Array<int64> Dependants;
};
template<>
@@ -92,50 +84,36 @@ public:
// File-local state of the job system.
namespace
{
JobSystemService JobSystemInstance;
// Pool of blocks (pointer and size) returned via JobSystemAllocation::Free and reused by JobSystemAllocation::Allocate.
Array<Pair<void*, uintptr>> MemPool;
// Worker threads spawned in JobSystemService::Init (only the first ThreadsCount entries are used).
Thread* Threads[PLATFORM_THREADS_LIMIT / 2] = {};
int32 ThreadsCount = 0;
// Toggled via JobSystem::SetJobStartingOnDispatch and checked by Dispatch after a job has been queued.
bool JobStartingOnDispatch = true;
// Worker threads and JobSystem::Wait(label) keep looping for as long as this reads zero.
volatile int64 ExitFlag = 0;
// Counter used to generate job labels.
volatile int64 JobLabel = 0;
// Job contexts keyed by the job label.
Dictionary<int64, JobContext, JobSystemAllocation> JobContexts;
// Incremented by Dispatch once a job context has been filled in; worker threads do not look at labels past it.
volatile int64 JobEndLabel = 0;
// The label worker threads start scanning from; moved forward with a compare-exchange when a context has no more job indices to hand out.
volatile int64 JobStartLabel = 0;
// The number of job contexts in flight: incremented by Dispatch, decremented when a context completes, polled by JobSystem::Wait.
volatile int64 JobContextsCount = 0;
// The number of slots in JobContexts and the matching mask (size - 1), both set in JobSystemService::Init.
uint32 JobContextsSize = 0;
uint32 JobContextsMask = 0;
// Storage for job contexts allocated in JobSystemService::Init and freed in JobSystemService::Dispose. Indexed with GET_CONTEXT_INDEX(label).
JobContext* JobContexts = nullptr;
// Notified by JobSystem::SetJobStartingOnDispatch to wake up worker threads.
ConditionVariable JobsSignal;
CriticalSection JobsMutex;
// Notified by worker threads when a job context completes; JobSystem::Wait sleeps on it using WaitMutex.
ConditionVariable WaitSignal;
CriticalSection WaitMutex;
// Guards the access to the Dependants lists of job contexts.
CriticalSection JobsLocker;
// Queue of job invocations waiting to be picked up by worker threads.
RingBuffer<JobData> Jobs;
}
// Returns a block for the requested size. The first pooled block whose recorded size matches exactly is taken out of
// the pool and reused; when there is none (or the pooled pointer is null) a new block is allocated with 16-byte alignment.
void* JobSystemAllocation::Allocate(uintptr size)
{
    // Find the first pooled entry with a matching size
    int32 slot = 0;
    while (slot < MemPool.Count() && MemPool.Get()[slot].Second != size)
        slot++;

    // Take the entry out of the pool if the search has found one
    void* block = nullptr;
    if (slot < MemPool.Count())
    {
        block = MemPool.Get()[slot].First;
        MemPool.RemoveAt(slot);
    }

    // Fallback to the platform allocator
    if (block == nullptr)
    {
        PROFILE_MEM(EngineThreading);
        block = Platform::Allocate(size, 16);
    }
    return block;
}
// Puts the block back into the pool (together with its size) instead of releasing it, so that Allocate can reuse it.
void JobSystemAllocation::Free(void* ptr, uintptr size)
{
PROFILE_MEM(EngineThreading);
MemPool.Add({ ptr, size });
// Maps a job label onto a slot index of the JobContexts array by masking the label with JobContextsMask.
#define GET_CONTEXT_INDEX(label) (uint32)((label) & (int64)JobContextsMask)
}
// Allocates the job contexts storage and spawns the worker threads.
// Returns false at the end of the function; the early `return true` sits inside the thread spawning loop
// (its condition is outside of the visible lines - presumably a thread creation failure, confirm).
bool JobSystemService::Init()
{
PROFILE_MEM(EngineThreading);
// Initialize job context storage (fixed-size ring buffer for active jobs tracking)
// The size has to be a power of two because labels are mapped onto slots by masking them with JobContextsMask
JobContextsSize = 256;
JobContextsMask = JobContextsSize - 1;
JobContexts = (JobContext*)Platform::Allocate(JobContextsSize * sizeof(JobContext), alignof(JobContext));
// Zero all contexts. The byte count is passed directly: wrapping it in another sizeof would yield the size of
// the expression type (size_t) and leave all but the first few bytes of the array uninitialized.
Platform::MemoryClear(JobContexts, JobContextsSize * sizeof(JobContext));
// Spawn threads
ThreadsCount = Math::Min<int32>(Platform::GetCPUInfo().LogicalProcessorCount, ARRAY_COUNT(Threads));
for (int32 i = 0; i < ThreadsCount; i++)
{
@@ -146,6 +124,7 @@ bool JobSystemService::Init()
return true;
Threads[i] = thread;
}
return false;
}
@@ -171,35 +150,66 @@ void JobSystemService::Dispose()
}
}
JobContexts.SetCapacity(0);
Jobs.Release();
for (auto& e : MemPool)
Platform::Free(e.First);
MemPool.Clear();
Platform::Free(JobContexts);
JobContexts = nullptr;
}
int32 JobSystemThread::Run()
{
// Pin thread to the physical core
Platform::SetThreadAffinityMask(1ull << Index);
JobData data;
Function<void(int32)> job;
bool attachCSharpThread = true;
MONO_THREAD_INFO_TYPE* monoThreadInfo = nullptr;
while (Platform::AtomicRead(&ExitFlag) == 0)
{
// Try to get a job
JobsLocker.Lock();
if (Jobs.Count() != 0)
int32 jobIndex;
JobContext* jobContext = nullptr;
{
data = Jobs.PeekFront();
Jobs.PopFront();
const JobContext& context = ((const Dictionary<int64, JobContext>&)JobContexts).At(data.JobKey);
job = context.Job;
}
JobsLocker.Unlock();
int64 jobOffset = 0;
RETRY:
int64 jobStartLabel = Platform::AtomicRead(&JobStartLabel) + jobOffset;
int64 jobEndLabel = Platform::AtomicRead(&JobEndLabel);
if (jobStartLabel <= jobEndLabel && jobEndLabel > 0)
{
jobContext = &JobContexts[GET_CONTEXT_INDEX(jobStartLabel)];
if (Platform::AtomicRead(&jobContext->DependenciesLeft) > 0)
{
// This job still waits for dependency so skip it for now and try the next one
jobOffset++;
jobContext = nullptr;
goto RETRY;
}
if (job.IsBinded())
// Move forward with index for a job
jobIndex = (int32)(Platform::InterlockedIncrement(&jobContext->JobIndex) - 1);
if (jobIndex < jobContext->JobsCount)
{
// Index is valid
}
else if (jobStartLabel < jobEndLabel && jobOffset == 0)
{
// No more jobs inside this context, move to the next one
Platform::InterlockedCompareExchange(&JobStartLabel, jobStartLabel + 1, jobStartLabel);
jobContext = nullptr;
goto RETRY;
}
else
{
// No more jobs
jobContext = nullptr;
if (jobStartLabel < jobEndLabel)
{
// Try with a different one before going to sleep
jobOffset++;
goto RETRY;
}
}
}
}
if (jobContext)
{
#if USE_CSHARP
// Ensure to have C# thread attached to this thread (late init due to MCore being initialized after Job System)
@@ -212,37 +222,39 @@ int32 JobSystemThread::Run()
#endif
// Run job
job(data.Index);
jobContext->Job(jobIndex);
// Move forward with the job queue
bool notifyWaiting = false;
JobsLocker.Lock();
JobContext& context = JobContexts.At(data.JobKey);
if (Platform::InterlockedDecrement(&context.JobsLeft) <= 0)
if (Platform::InterlockedDecrement(&jobContext->JobsLeft) <= 0)
{
// Update any dependant jobs
for (int64 dependant : context.Dependants)
// Mark job as done before processing dependants
Platform::AtomicStore(&jobContext->JobLabel, -1);
// Check if any other job waits on this one
if (Platform::AtomicRead(&jobContext->DependantsCount) != 0)
{
JobContext& dependantContext = JobContexts.At(dependant);
if (--dependantContext.DependenciesLeft <= 0)
// Update dependant jobs
JobsLocker.Lock();
for (int64 dependant : jobContext->Dependants)
{
// Dispatch dependency when it's ready
JobData dependantData;
dependantData.JobKey = dependant;
for (dependantData.Index = 0; dependantData.Index < dependantContext.JobsLeft; dependantData.Index++)
Jobs.PushBack(dependantData);
JobContext& dependantContext = JobContexts[GET_CONTEXT_INDEX(dependant)];
if (dependantContext.JobLabel == dependant)
Platform::InterlockedDecrement(&dependantContext.DependenciesLeft);
}
JobsLocker.Unlock();
}
// Remove completed context
JobContexts.Remove(data.JobKey);
notifyWaiting = true;
}
JobsLocker.Unlock();
if (notifyWaiting)
WaitSignal.NotifyAll();
// Cleanup completed context
jobContext->Job.Unbind();
jobContext->Dependants.Clear();
Platform::AtomicStore(&jobContext->DependantsCount, 0);
Platform::AtomicStore(&jobContext->DependenciesLeft, -999); // Mark to indicate deleted context
Platform::AtomicStore(&jobContext->JobLabel, -1);
Platform::InterlockedDecrement(&JobContextsCount);
job.Unbind();
// Wakeup any thread waiting for the jobs to complete
WaitSignal.NotifyAll();
}
}
else
{
@@ -266,8 +278,8 @@ void JobSystem::Execute(const Function<void(int32)>& job, int32 jobCount)
if (jobCount > 1)
{
// Async
const int64 jobWaitHandle = Dispatch(job, jobCount);
Wait(jobWaitHandle);
const int64 label = Dispatch(job, jobCount);
Wait(label);
}
else
#endif
@@ -284,21 +296,32 @@ int64 JobSystem::Dispatch(const Function<void(int32)>& job, int32 jobCount)
return 0;
PROFILE_CPU();
#if JOB_SYSTEM_ENABLED
const auto label = Platform::InterlockedAdd(&JobLabel, (int64)jobCount) + jobCount;
while (Platform::InterlockedIncrement(&JobContextsCount) >= JobContextsSize)
{
// Too many jobs in flight, wait for some to complete to free up contexts
PROFILE_CPU_NAMED("JOB SYSTEM OVERFLOW");
ZoneColor(TracyWaitZoneColor);
Platform::InterlockedDecrement(&JobContextsCount);
Platform::Sleep(1);
}
JobData data;
data.JobKey = label;
// Get a new label
const int64 label = Platform::InterlockedIncrement(&JobLabel);
JobContext context;
// Build job
JobContext& context = JobContexts[GET_CONTEXT_INDEX(label)];
context.Job = job;
context.JobIndex = 0;
context.JobsLeft = jobCount;
context.JobLabel = label;
context.DependantsCount = 0;
context.DependenciesLeft = 0;
context.JobsCount = jobCount;
ASSERT(context.Dependants.IsEmpty());
context.Dependants.Clear();
JobsLocker.Lock();
JobContexts.Add(label, MoveTemp(context));
for (data.Index = 0; data.Index < jobCount; data.Index++)
Jobs.PushBack(data);
JobsLocker.Unlock();
// Move the job queue forward
Platform::InterlockedIncrement(&JobEndLabel);
if (JobStartingOnDispatch)
{
@@ -321,34 +344,48 @@ int64 JobSystem::Dispatch(const Function<void(int32)>& job, Span<int64> dependen
if (jobCount <= 0)
return 0;
PROFILE_CPU();
PROFILE_MEM(EngineThreading);
#if JOB_SYSTEM_ENABLED
const auto label = Platform::InterlockedAdd(&JobLabel, (int64)jobCount) + jobCount;
while (Platform::InterlockedIncrement(&JobContextsCount) >= JobContextsSize)
{
// Too many jobs in flight, wait for some to complete to free up contexts
PROFILE_CPU_NAMED("JOB SYSTEM OVERFLOW");
ZoneColor(TracyWaitZoneColor);
Platform::InterlockedDecrement(&JobContextsCount);
Platform::Sleep(1);
}
JobData data;
data.JobKey = label;
// Get a new label
const int64 label = Platform::InterlockedIncrement(&JobLabel);
JobContext context;
// Build job
JobContext& context = JobContexts[GET_CONTEXT_INDEX(label)];
context.Job = job;
context.JobIndex = 0;
context.JobsLeft = jobCount;
context.JobLabel = label;
context.DependantsCount = 0;
context.DependenciesLeft = 0;
JobsLocker.Lock();
for (int64 dependency : dependencies)
context.JobsCount = jobCount;
ASSERT(context.Dependants.IsEmpty());
context.Dependants.Clear();
{
if (JobContext* dependencyContext = JobContexts.TryGet(dependency))
JobsLocker.Lock();
for (int64 dependency : dependencies)
{
context.DependenciesLeft++;
dependencyContext->Dependants.Add(label);
JobContext& dependencyContext = JobContexts[GET_CONTEXT_INDEX(dependency)];
if (Platform::AtomicRead(&dependencyContext.JobLabel) == dependency)
{
Platform::InterlockedIncrement(&dependencyContext.DependantsCount);
dependencyContext.Dependants.Add(label);
context.DependenciesLeft++;
}
}
JobsLocker.Unlock();
}
JobContexts.Add(label, MoveTemp(context));
if (context.DependenciesLeft == 0)
{
// No dependencies left to complete so dispatch now
for (data.Index = 0; data.Index < jobCount; data.Index++)
Jobs.PushBack(data);
}
JobsLocker.Unlock();
// Move the job queue forward
Platform::InterlockedIncrement(&JobEndLabel);
if (context.DependenciesLeft == 0 && JobStartingOnDispatch)
{
@@ -369,19 +406,17 @@ int64 JobSystem::Dispatch(const Function<void(int32)>& job, Span<int64> dependen
// Blocks the calling thread until there are no job contexts left in flight.
void JobSystem::Wait()
{
#if JOB_SYSTEM_ENABLED
JobsLocker.Lock();
int32 numJobs = JobContexts.Count();
JobsLocker.Unlock();
PROFILE_CPU();
ZoneColor(TracyWaitZoneColor);
// Read the number of contexts in flight without taking any lock
int64 numJobs = Platform::AtomicRead(&JobContextsCount);
while (numJobs > 0)
{
// Sleep on the signal that worker threads notify when a job context completes
// NOTE(review): the second argument is presumably a timeout in milliseconds so the counter gets polled again even without a notification - confirm
WaitMutex.Lock();
WaitSignal.Wait(WaitMutex, 1);
WaitMutex.Unlock();
JobsLocker.Lock();
numJobs = JobContexts.Count();
JobsLocker.Unlock();
// Refresh the counter to check if anything is still running
numJobs = Platform::AtomicRead(&JobContextsCount);
}
#endif
}
@@ -394,12 +429,11 @@ void JobSystem::Wait(int64 label)
while (Platform::AtomicRead(&ExitFlag) == 0)
{
JobsLocker.Lock();
const JobContext* context = JobContexts.TryGet(label);
JobsLocker.Unlock();
const JobContext& context = JobContexts[GET_CONTEXT_INDEX(label)];
const bool finished = Platform::AtomicRead(&context.JobLabel) != label || Platform::AtomicRead(&context.JobsLeft) <= 0;
// Skip if context has been already executed (last job removes it)
if (!context)
if (finished)
break;
// Wait on the signal for as long as the input label is not done yet
@@ -417,15 +451,10 @@ void JobSystem::SetJobStartingOnDispatch(bool value)
{
#if JOB_SYSTEM_ENABLED
JobStartingOnDispatch = value;
if (value)
if (value && (Platform::AtomicRead(&JobEndLabel) - Platform::AtomicRead(&JobStartLabel)) > 0)
{
JobsLocker.Lock();
const int32 count = Jobs.Count();
JobsLocker.Unlock();
if (count == 1)
JobsSignal.NotifyOne();
else if (count != 0)
JobsSignal.NotifyAll();
// Wake up threads to start processing jobs that may be already in the queue
JobsSignal.NotifyAll();
}
#endif
}