From dc2a9c070289eb1d2018abb4c23f1319c4b7de23 Mon Sep 17 00:00:00 2001 From: Ari Vuollet Date: Sun, 5 Jun 2022 11:23:11 +0300 Subject: [PATCH 1/2] Support dispatching jobs within jobs in JobSystem --- Source/Engine/Threading/JobSystem.cpp | 82 +++++++++++++++++++++++---- 1 file changed, 70 insertions(+), 12 deletions(-) diff --git a/Source/Engine/Threading/JobSystem.cpp b/Source/Engine/Threading/JobSystem.cpp index de3d1c301..0fcee47f9 100644 --- a/Source/Engine/Threading/JobSystem.cpp +++ b/Source/Engine/Threading/JobSystem.cpp @@ -53,6 +53,7 @@ struct JobData { Function Job; int32 Index; + int64 JobKey; }; template<> @@ -82,6 +83,12 @@ public: } }; +struct JobContext +{ + volatile int64 RefCount = 0; + volatile int64 JobsLeft = 0; +}; + namespace { JobSystemService JobSystemInstance; @@ -89,8 +96,9 @@ namespace int32 ThreadsCount = 0; bool JobStartingOnDispatch = true; volatile int64 ExitFlag = 0; - volatile int64 DoneLabel = 0; - volatile int64 NextLabel = 0; + volatile int64 JobLabel = 0; + Dictionary JobContexts; + CriticalSection JobContextMutex; ConditionVariable JobsSignal; CriticalSection JobsMutex; ConditionVariable WaitSignal; @@ -194,7 +202,17 @@ int32 JobSystemThread::Run() data.Job(data.Index); // Move forward with the job queue - Platform::InterlockedIncrement(&DoneLabel); + JobContextMutex.Lock(); + JobContext& context = JobContexts.At(data.JobKey); + Platform::InterlockedDecrement(&context.JobsLeft); + ASSERT(Platform::AtomicRead(&context.JobsLeft) >= 0) + if (context.JobsLeft <= 0) + { + if (Platform::AtomicRead(&context.RefCount) == 0) + JobContexts.Remove(data.JobKey); + } + JobContextMutex.Unlock(); + WaitSignal.NotifyAll(); data.Job.Unbind(); @@ -221,9 +239,18 @@ int64 JobSystem::Dispatch(const Function& job, int32 jobCount) #if JOB_SYSTEM_USE_STATS const auto start = Platform::GetTimeCycles(); #endif + const auto label = Platform::InterlockedAdd(&JobLabel, (int64)jobCount) + jobCount; JobData data; data.Job = job; + data.JobKey = label; + + JobContext context; + context.JobsLeft = jobCount; + + JobContextMutex.Lock(); + JobContexts.Add(label, context); + JobContextMutex.Unlock(); #if JOB_SYSTEM_USE_MUTEX JobsLocker.Lock(); @@ -234,7 +261,6 @@ int64 JobSystem::Dispatch(const Function& job, int32 jobCount) for (data.Index = 0; data.Index < jobCount; data.Index++) Jobs.enqueue(data); #endif - const auto label = Platform::InterlockedAdd(&NextLabel, (int64)jobCount) + jobCount; #if JOB_SYSTEM_USE_STATS LOG(Info, "Job enqueue time: {0} cycles", (int64)(Platform::GetTimeCycles() - start)); @@ -259,7 +285,20 @@ int64 JobSystem::Dispatch(const Function& job, int32 jobCount) void JobSystem::Wait() { #if JOB_SYSTEM_ENABLED - Wait(Platform::AtomicRead(&NextLabel)); + JobContextMutex.Lock(); + int32 numJobs = JobContexts.Count(); + JobContextMutex.Unlock(); + + while (numJobs > 0) + { + WaitMutex.Lock(); + WaitSignal.Wait(WaitMutex, 1); + WaitMutex.Unlock(); + + JobContextMutex.Lock(); + numJobs = JobContexts.Count(); + JobContextMutex.Unlock(); + } #endif } @@ -268,17 +307,32 @@ void JobSystem::Wait(int64 label) #if JOB_SYSTEM_ENABLED PROFILE_CPU(); + JobContextMutex.Lock(); + JobContext* context = JobContexts.TryGet(label); + if (context) + Platform::InterlockedIncrement(&context->RefCount); + JobContextMutex.Unlock(); + // Early out - if (label <= Platform::AtomicRead(&DoneLabel)) + if (!context) return; - // Wait on signal until input label is not yet done - do + if (Platform::AtomicRead(&context->JobsLeft) > 0) { - WaitMutex.Lock(); - WaitSignal.Wait(WaitMutex, 1); - WaitMutex.Unlock(); - } while (label > Platform::AtomicRead(&DoneLabel) && Platform::AtomicRead(&ExitFlag) == 0); + // Wait on signal until input label is not yet done + do + { + WaitMutex.Lock(); + WaitSignal.Wait(WaitMutex, 1); + WaitMutex.Unlock(); + } while (Platform::AtomicRead(&context->JobsLeft) > 0 && Platform::AtomicRead(&ExitFlag) == 0); + } + + JobContextMutex.Lock(); + Platform::InterlockedDecrement(&context->RefCount); + if (Platform::AtomicRead(&context->RefCount) == 0) + JobContexts.Remove(label); + JobContextMutex.Unlock(); #if JOB_SYSTEM_USE_STATS LOG(Info, "Job average dequeue time: {0} cycles", DequeueSum / DequeueCount); @@ -294,9 +348,13 @@ void JobSystem::SetJobStartingOnDispatch(bool value) if (value) { +#if JOB_SYSTEM_USE_MUTEX JobsLocker.Lock(); const int32 count = Jobs.Count(); JobsLocker.Unlock(); +#else + const int32 count = Jobs.Count(); +#endif if (count == 1) JobsSignal.NotifyOne(); else if (count != 0) From 1978949b22a7d34a4276047dde398f22f4aed7e7 Mon Sep 17 00:00:00 2001 From: Ari Vuollet Date: Sun, 12 Jun 2022 13:23:54 +0300 Subject: [PATCH 2/2] Allocate job context dynamically to prevent deadlocks --- Source/Engine/Threading/JobSystem.cpp | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/Source/Engine/Threading/JobSystem.cpp b/Source/Engine/Threading/JobSystem.cpp index 0fcee47f9..c3d4a074b 100644 --- a/Source/Engine/Threading/JobSystem.cpp +++ b/Source/Engine/Threading/JobSystem.cpp @@ -97,7 +97,7 @@ namespace bool JobStartingOnDispatch = true; volatile int64 ExitFlag = 0; volatile int64 JobLabel = 0; - Dictionary JobContexts; + Dictionary JobContexts; CriticalSection JobContextMutex; ConditionVariable JobsSignal; CriticalSection JobsMutex; @@ -203,14 +203,12 @@ int32 JobSystemThread::Run() // Move forward with the job queue JobContextMutex.Lock(); - JobContext& context = JobContexts.At(data.JobKey); - Platform::InterlockedDecrement(&context.JobsLeft); - ASSERT(Platform::AtomicRead(&context.JobsLeft) >= 0) - if (context.JobsLeft <= 0) - { - if (Platform::AtomicRead(&context.RefCount) == 0) - JobContexts.Remove(data.JobKey); - } + JobContext* context = JobContexts.At(data.JobKey); + Platform::InterlockedDecrement(&context->JobsLeft); + Platform::InterlockedDecrement(&context->RefCount); + if (context->JobsLeft <= 0 && Platform::AtomicRead(&context->RefCount) == 0) + JobContexts.Remove(data.JobKey); + JobContextMutex.Unlock(); WaitSignal.NotifyAll(); @@ -245,8 +243,10 @@ int64 JobSystem::Dispatch(const Function& job, int32 jobCount) data.Job = job; data.JobKey = label; - JobContext context; - context.JobsLeft = jobCount; + // Allocate dynamically in case the dictionary gets reallocated + JobContext* context = New(); + context->JobsLeft = jobCount; + Platform::InterlockedIncrement(&context->RefCount); JobContextMutex.Lock(); JobContexts.Add(label, context); @@ -308,7 +308,7 @@ void JobSystem::Wait(int64 label) PROFILE_CPU(); JobContextMutex.Lock(); - JobContext* context = JobContexts.TryGet(label); + JobContext* context = JobContexts.At(label); if (context) Platform::InterlockedIncrement(&context->RefCount); JobContextMutex.Unlock();