Support dispatching jobs within jobs in JobSystem

This commit is contained in:
2022-06-05 11:23:11 +03:00
parent 778e4e0ea7
commit dc2a9c0702

View File

@@ -53,6 +53,7 @@ struct JobData
{
Function<void(int32)> Job;
int32 Index;
int64 JobKey;
};
template<>
@@ -82,6 +83,12 @@ public:
}
};
struct JobContext
{
volatile int64 RefCount = 0;
volatile int64 JobsLeft = 0;
};
namespace
{
JobSystemService JobSystemInstance;
@@ -89,8 +96,9 @@ namespace
int32 ThreadsCount = 0;
bool JobStartingOnDispatch = true;
volatile int64 ExitFlag = 0;
volatile int64 DoneLabel = 0;
volatile int64 NextLabel = 0;
volatile int64 JobLabel = 0;
Dictionary<int64, JobContext> JobContexts;
CriticalSection JobContextMutex;
ConditionVariable JobsSignal;
CriticalSection JobsMutex;
ConditionVariable WaitSignal;
@@ -194,7 +202,17 @@ int32 JobSystemThread::Run()
data.Job(data.Index);
// Move forward with the job queue
Platform::InterlockedIncrement(&DoneLabel);
JobContextMutex.Lock();
JobContext& context = JobContexts.At(data.JobKey);
Platform::InterlockedDecrement(&context.JobsLeft);
ASSERT(Platform::AtomicRead(&context.JobsLeft) >= 0)
if (context.JobsLeft <= 0)
{
if (Platform::AtomicRead(&context.RefCount) == 0)
JobContexts.Remove(data.JobKey);
}
JobContextMutex.Unlock();
WaitSignal.NotifyAll();
data.Job.Unbind();
@@ -221,9 +239,18 @@ int64 JobSystem::Dispatch(const Function<void(int32)>& job, int32 jobCount)
#if JOB_SYSTEM_USE_STATS
const auto start = Platform::GetTimeCycles();
#endif
const auto label = Platform::InterlockedAdd(&JobLabel, (int64)jobCount) + jobCount;
JobData data;
data.Job = job;
data.JobKey = label;
JobContext context;
context.JobsLeft = jobCount;
JobContextMutex.Lock();
JobContexts.Add(label, context);
JobContextMutex.Unlock();
#if JOB_SYSTEM_USE_MUTEX
JobsLocker.Lock();
@@ -234,7 +261,6 @@ int64 JobSystem::Dispatch(const Function<void(int32)>& job, int32 jobCount)
for (data.Index = 0; data.Index < jobCount; data.Index++)
Jobs.enqueue(data);
#endif
const auto label = Platform::InterlockedAdd(&NextLabel, (int64)jobCount) + jobCount;
#if JOB_SYSTEM_USE_STATS
LOG(Info, "Job enqueue time: {0} cycles", (int64)(Platform::GetTimeCycles() - start));
@@ -259,7 +285,20 @@ int64 JobSystem::Dispatch(const Function<void(int32)>& job, int32 jobCount)
void JobSystem::Wait()
{
#if JOB_SYSTEM_ENABLED
Wait(Platform::AtomicRead(&NextLabel));
JobContextMutex.Lock();
int32 numJobs = JobContexts.Count();
JobContextMutex.Unlock();
while (numJobs > 0)
{
WaitMutex.Lock();
WaitSignal.Wait(WaitMutex, 1);
WaitMutex.Unlock();
JobContextMutex.Lock();
numJobs = JobContexts.Count();
JobContextMutex.Unlock();
}
#endif
}
@@ -268,17 +307,32 @@ void JobSystem::Wait(int64 label)
#if JOB_SYSTEM_ENABLED
PROFILE_CPU();
JobContextMutex.Lock();
JobContext* context = JobContexts.TryGet(label);
if (context)
Platform::InterlockedIncrement(&context->RefCount);
JobContextMutex.Unlock();
// Early out
if (label <= Platform::AtomicRead(&DoneLabel))
if (!context)
return;
// Wait on signal until input label is not yet done
do
if (Platform::AtomicRead(&context->JobsLeft) > 0)
{
WaitMutex.Lock();
WaitSignal.Wait(WaitMutex, 1);
WaitMutex.Unlock();
} while (label > Platform::AtomicRead(&DoneLabel) && Platform::AtomicRead(&ExitFlag) == 0);
// Wait on signal until input label is not yet done
do
{
WaitMutex.Lock();
WaitSignal.Wait(WaitMutex, 1);
WaitMutex.Unlock();
} while (Platform::AtomicRead(&context->JobsLeft) > 0 && Platform::AtomicRead(&ExitFlag) == 0);
}
JobContextMutex.Lock();
Platform::InterlockedDecrement(&context->RefCount);
if (Platform::AtomicRead(&context->RefCount) == 0)
JobContexts.Remove(label);
JobContextMutex.Unlock();
#if JOB_SYSTEM_USE_STATS
LOG(Info, "Job average dequeue time: {0} cycles", DequeueSum / DequeueCount);
@@ -294,9 +348,13 @@ void JobSystem::SetJobStartingOnDispatch(bool value)
if (value)
{
#if JOB_SYSTEM_USE_MUTEX
JobsLocker.Lock();
const int32 count = Jobs.Count();
JobsLocker.Unlock();
#else
const int32 count = Jobs.Count();
#endif
if (count == 1)
JobsSignal.NotifyOne();
else if (count != 0)