diff --git a/Source/Engine/Threading/JobSystem.cpp b/Source/Engine/Threading/JobSystem.cpp index c16a1596d..6637df4e4 100644 --- a/Source/Engine/Threading/JobSystem.cpp +++ b/Source/Engine/Threading/JobSystem.cpp @@ -53,6 +53,7 @@ struct JobData { Function Job; int32 Index; + int64 JobKey; }; template<> @@ -82,6 +83,12 @@ public: } }; +struct JobContext +{ + volatile int64 RefCount = 0; + volatile int64 JobsLeft = 0; +}; + namespace { JobSystemService JobSystemInstance; @@ -89,8 +96,9 @@ namespace int32 ThreadsCount = 0; bool JobStartingOnDispatch = true; volatile int64 ExitFlag = 0; - volatile int64 DoneLabel = 0; - volatile int64 NextLabel = 0; + volatile int64 JobLabel = 0; + Dictionary JobContexts; + CriticalSection JobContextMutex; ConditionVariable JobsSignal; CriticalSection JobsMutex; ConditionVariable WaitSignal; @@ -194,7 +202,15 @@ int32 JobSystemThread::Run() data.Job(data.Index); // Move forward with the job queue - Platform::InterlockedIncrement(&DoneLabel); + JobContextMutex.Lock(); + JobContext* context = JobContexts.At(data.JobKey); + Platform::InterlockedDecrement(&context->JobsLeft); + Platform::InterlockedDecrement(&context->RefCount); + if (context->JobsLeft <= 0 && Platform::AtomicRead(&context->RefCount) == 0) + JobContexts.Remove(data.JobKey); + + JobContextMutex.Unlock(); + WaitSignal.NotifyAll(); data.Job.Unbind(); @@ -238,9 +254,20 @@ int64 JobSystem::Dispatch(const Function& job, int32 jobCount) #if JOB_SYSTEM_USE_STATS const auto start = Platform::GetTimeCycles(); #endif + const auto label = Platform::InterlockedAdd(&JobLabel, (int64)jobCount) + jobCount; JobData data; data.Job = job; + data.JobKey = label; + + // Allocate dynamically in case the dictionary gets reallocated + JobContext* context = New(); + context->JobsLeft = jobCount; + Platform::InterlockedIncrement(&context->RefCount); + + JobContextMutex.Lock(); + JobContexts.Add(label, context); + JobContextMutex.Unlock(); #if JOB_SYSTEM_USE_MUTEX JobsLocker.Lock(); @@ -251,7 +278,6 @@ int64 JobSystem::Dispatch(const Function& job, int32 jobCount) for (data.Index = 0; data.Index < jobCount; data.Index++) Jobs.enqueue(data); #endif - const auto label = Platform::InterlockedAdd(&NextLabel, (int64)jobCount) + jobCount; #if JOB_SYSTEM_USE_STATS LOG(Info, "Job enqueue time: {0} cycles", (int64)(Platform::GetTimeCycles() - start)); @@ -276,7 +302,20 @@ int64 JobSystem::Dispatch(const Function& job, int32 jobCount) void JobSystem::Wait() { #if JOB_SYSTEM_ENABLED - Wait(Platform::AtomicRead(&NextLabel)); + JobContextMutex.Lock(); + int32 numJobs = JobContexts.Count(); + JobContextMutex.Unlock(); + + while (numJobs > 0) + { + WaitMutex.Lock(); + WaitSignal.Wait(WaitMutex, 1); + WaitMutex.Unlock(); + + JobContextMutex.Lock(); + numJobs = JobContexts.Count(); + JobContextMutex.Unlock(); + } #endif } @@ -285,17 +324,32 @@ void JobSystem::Wait(int64 label) #if JOB_SYSTEM_ENABLED PROFILE_CPU(); + JobContextMutex.Lock(); + JobContext* context = JobContexts.At(label); + if (context) + Platform::InterlockedIncrement(&context->RefCount); + JobContextMutex.Unlock(); + // Early out - if (label <= Platform::AtomicRead(&DoneLabel)) + if (!context) return; - // Wait on signal until input label is not yet done - do + if (Platform::AtomicRead(&context->JobsLeft) > 0) { - WaitMutex.Lock(); - WaitSignal.Wait(WaitMutex, 1); - WaitMutex.Unlock(); - } while (label > Platform::AtomicRead(&DoneLabel) && Platform::AtomicRead(&ExitFlag) == 0); + // Wait on signal until input label is not yet done + do + { + WaitMutex.Lock(); + WaitSignal.Wait(WaitMutex, 1); + WaitMutex.Unlock(); + } while (Platform::AtomicRead(&context->JobsLeft) > 0 && Platform::AtomicRead(&ExitFlag) == 0); + } + + JobContextMutex.Lock(); + Platform::InterlockedDecrement(&context->RefCount); + if (Platform::AtomicRead(&context->RefCount) == 0) + JobContexts.Remove(label); + JobContextMutex.Unlock(); #if JOB_SYSTEM_USE_STATS LOG(Info, "Job average dequeue time: {0} cycles", DequeueSum / DequeueCount); @@ -311,9 +365,13 @@ void JobSystem::SetJobStartingOnDispatch(bool value) if (value) { +#if JOB_SYSTEM_USE_MUTEX JobsLocker.Lock(); const int32 count = Jobs.Count(); JobsLocker.Unlock(); +#else + const int32 count = Jobs.Count(); +#endif if (count == 1) JobsSignal.NotifyOne(); else if (count != 0)