Add JobSystem::Dispatch that accepts dependent jobs that needs to be completed before

This commit is contained in:
Wojtek Figat
2024-06-24 13:12:48 +02:00
parent 2773949197
commit 861d8a683f
2 changed files with 119 additions and 24 deletions

View File

@@ -5,6 +5,7 @@
#include "Engine/Platform/CPUInfo.h"
#include "Engine/Platform/Thread.h"
#include "Engine/Platform/ConditionVariable.h"
#include "Engine/Core/Types/Span.h"
#include "Engine/Core/Collections/Dictionary.h"
#include "Engine/Engine/EngineService.h"
#include "Engine/Profiler/ProfilerCPU.h"
@@ -48,13 +49,26 @@ public:
struct JobData
{
Function<void(int32)> Job;
int32 Index;
int64 JobKey;
};
template<>
struct TIsPODType<JobData>
{
enum { Value = true };
};
struct JobContext
{
volatile int64 JobsLeft;
volatile int64 DependenciesLeft;
Function<void(int32)> Job;
Array<int64> Dependants;
};
template<>
struct TIsPODType<JobContext>
{
enum { Value = false };
};
@@ -79,17 +93,6 @@ public:
}
};
struct JobContext
{
volatile int64 JobsLeft;
};
template<>
struct TIsPODType<JobContext>
{
enum { Value = true };
};
namespace
{
JobSystemService JobSystemInstance;
@@ -158,6 +161,7 @@ int32 JobSystemThread::Run()
Platform::SetThreadAffinityMask(1ull << Index);
JobData data;
Function<void(int32)> job;
bool attachCSharpThread = true;
#if !JOB_SYSTEM_USE_MUTEX
moodycamel::ConsumerToken consumerToken(Jobs);
@@ -174,18 +178,23 @@ int32 JobSystemThread::Run()
{
data = Jobs.PeekFront();
Jobs.PopFront();
const JobContext& context = ((const Dictionary<int64, JobContext>&)JobContexts).At(data.JobKey);
job = context.Job;
}
JobsLocker.Unlock();
#else
if (!Jobs.try_dequeue(consumerToken, data))
data.Job.Unbind();
if (Jobs.try_dequeue(consumerToken, data))
{
const JobContext& context = ((const Dictionary<int64, JobContext>&)JobContexts).At(data.JobKey);
job = context.Job;
}
#endif
#if JOB_SYSTEM_USE_STATS
Platform::InterlockedIncrement(&DequeueCount);
Platform::InterlockedAdd(&DequeueSum, Platform::GetTimeCycles() - start);
#endif
if (data.Job.IsBinded())
if (job.IsBinded())
{
#if USE_CSHARP
// Ensure to have C# thread attached to this thead (late init due to MCore being initialized after Job System)
@@ -197,7 +206,7 @@ int32 JobSystemThread::Run()
#endif
// Run job
data.Job(data.Index);
job(data.Index);
// Move forward with the job queue
bool notifyWaiting = false;
@@ -205,16 +214,33 @@ int32 JobSystemThread::Run()
JobContext& context = JobContexts.At(data.JobKey);
if (Platform::InterlockedDecrement(&context.JobsLeft) <= 0)
{
ASSERT_LOW_LAYER(context.JobsLeft <= 0);
// Update any dependant jobs
for (int64 dependant : context.Dependants)
{
JobContext& dependantContext = JobContexts.At(dependant);
if (Platform::InterlockedDecrement(&dependantContext.DependenciesLeft) <= 0)
{
// Dispatch dependency when it's ready
JobData dependantData;
dependantData.JobKey = dependant;
for (dependantData.Index = 0; dependantData.Index < dependantContext.JobsLeft; dependantData.Index++)
#if JOB_SYSTEM_USE_MUTEX
Jobs.PushBack(dependantData);
#else
Jobs.enqueue(dependantData);
#endif
}
}
// Remove completed context
JobContexts.Remove(data.JobKey);
notifyWaiting = true;
}
JobsLocker.Unlock();
if (notifyWaiting)
WaitSignal.NotifyAll();
data.Job.Unbind();
job.Unbind();
}
else
{
@@ -250,9 +276,9 @@ void JobSystem::Execute(const Function<void(int32)>& job, int32 jobCount)
int64 JobSystem::Dispatch(const Function<void(int32)>& job, int32 jobCount)
{
PROFILE_CPU();
if (jobCount <= 0)
return 0;
PROFILE_CPU();
#if JOB_SYSTEM_ENABLED
#if JOB_SYSTEM_USE_STATS
const auto start = Platform::GetTimeCycles();
@@ -260,21 +286,20 @@ int64 JobSystem::Dispatch(const Function<void(int32)>& job, int32 jobCount)
const auto label = Platform::InterlockedAdd(&JobLabel, (int64)jobCount) + jobCount;
JobData data;
data.Job = job;
data.JobKey = label;
JobContext context;
context.Job = job;
context.JobsLeft = jobCount;
context.DependenciesLeft = 0;
#if JOB_SYSTEM_USE_MUTEX
JobsLocker.Lock();
JobContexts.Add(label, context);
#if JOB_SYSTEM_USE_MUTEX
for (data.Index = 0; data.Index < jobCount; data.Index++)
Jobs.PushBack(data);
JobsLocker.Unlock();
#else
JobsLocker.Lock();
JobContexts.Add(label, context);
JobsLocker.Unlock();
for (data.Index = 0; data.Index < jobCount; data.Index++)
Jobs.enqueue(data);
@@ -300,6 +325,64 @@ int64 JobSystem::Dispatch(const Function<void(int32)>& job, int32 jobCount)
#endif
}
int64 JobSystem::Dispatch(const Function<void(int32)>& job, Span<int64> dependencies, int32 jobCount)
{
if (jobCount <= 0)
return 0;
PROFILE_CPU();
#if JOB_SYSTEM_ENABLED
const auto label = Platform::InterlockedAdd(&JobLabel, (int64)jobCount) + jobCount;
JobData data;
data.JobKey = label;
JobContext context;
context.Job = job;
context.JobsLeft = jobCount;
context.DependenciesLeft = 0;
JobsLocker.Lock();
for (int64 dependency : dependencies)
{
if (JobContext* dependencyContext = JobContexts.TryGet(dependency))
{
context.DependenciesLeft++;
dependencyContext->Dependants.Add(label);
}
}
JobContexts.Add(label, context);
#if JOB_SYSTEM_USE_MUTEX
if (context.DependenciesLeft == 0)
{
for (data.Index = 0; data.Index < jobCount; data.Index++)
Jobs.PushBack(data);
}
JobsLocker.Unlock();
#else
JobsLocker.Unlock();
if (dispatchNow)
{
for (data.Index = 0; data.Index < jobCount; data.Index++)
Jobs.enqueue(data);
}
#endif
if (context.DependenciesLeft == 0 && JobStartingOnDispatch)
{
if (jobCount == 1)
JobsSignal.NotifyOne();
else
JobsSignal.NotifyAll();
}
return label;
#else
for (int32 i = 0; i < jobCount; i++)
job(i);
return 0;
#endif
}
void JobSystem::Wait()
{
#if JOB_SYSTEM_ENABLED