diff --git a/Source/Engine/Threading/JobSystem.cpp b/Source/Engine/Threading/JobSystem.cpp index 6637df4e4..234e05dab 100644 --- a/Source/Engine/Threading/JobSystem.cpp +++ b/Source/Engine/Threading/JobSystem.cpp @@ -38,7 +38,6 @@ class JobSystemService : public EngineService { public: - JobSystemService() : EngineService(TEXT("JobSystem"), -800) { @@ -68,7 +67,6 @@ public: uint64 Index; public: - // [IRunnable] String ToString() const override { @@ -85,8 +83,13 @@ public: struct JobContext { - volatile int64 RefCount = 0; - volatile int64 JobsLeft = 0; + volatile int64 JobsLeft; +}; + +template<> +struct TIsPODType +{ + enum { Value = true }; }; namespace @@ -97,14 +100,13 @@ namespace bool JobStartingOnDispatch = true; volatile int64 ExitFlag = 0; volatile int64 JobLabel = 0; - Dictionary JobContexts; - CriticalSection JobContextMutex; + Dictionary JobContexts; ConditionVariable JobsSignal; CriticalSection JobsMutex; ConditionVariable WaitSignal; CriticalSection WaitMutex; -#if JOB_SYSTEM_USE_MUTEX CriticalSection JobsLocker; +#if JOB_SYSTEM_USE_MUTEX RingBuffer> Jobs; #else ConcurrentQueue Jobs; @@ -202,14 +204,14 @@ int32 JobSystemThread::Run() data.Job(data.Index); // Move forward with the job queue - JobContextMutex.Lock(); - JobContext* context = JobContexts.At(data.JobKey); - Platform::InterlockedDecrement(&context->JobsLeft); - Platform::InterlockedDecrement(&context->RefCount); - if (context->JobsLeft <= 0 && Platform::AtomicRead(&context->RefCount) == 0) + JobsLocker.Lock(); + JobContext& context = JobContexts.At(data.JobKey); + if (Platform::InterlockedDecrement(&context.JobsLeft) <= 0) + { + ASSERT_LOW_LAYER(context.JobsLeft <= 0); JobContexts.Remove(data.JobKey); - - JobContextMutex.Unlock(); + } + JobsLocker.Unlock(); WaitSignal.NotifyAll(); @@ -260,21 +262,19 @@ int64 JobSystem::Dispatch(const Function& job, int32 jobCount) data.Job = job; data.JobKey = label; - // Allocate dynamically in case the dictionary gets reallocated - JobContext* context = New(); - context->JobsLeft = jobCount; - Platform::InterlockedIncrement(&context->RefCount); - - JobContextMutex.Lock(); - JobContexts.Add(label, context); - JobContextMutex.Unlock(); + JobContext context; + context.JobsLeft = jobCount; #if JOB_SYSTEM_USE_MUTEX JobsLocker.Lock(); + JobContexts.Add(label, context); for (data.Index = 0; data.Index < jobCount; data.Index++) Jobs.PushBack(data); JobsLocker.Unlock(); #else + JobsLocker.Lock(); + JobContexts.Add(label, context); + JobsLocker.Unlock(); for (data.Index = 0; data.Index < jobCount; data.Index++) Jobs.enqueue(data); #endif @@ -302,9 +302,9 @@ int64 JobSystem::Dispatch(const Function& job, int32 jobCount) void JobSystem::Wait() { #if JOB_SYSTEM_ENABLED - JobContextMutex.Lock(); + JobsLocker.Lock(); int32 numJobs = JobContexts.Count(); - JobContextMutex.Unlock(); + JobsLocker.Unlock(); while (numJobs > 0) { @@ -312,9 +312,9 @@ void JobSystem::Wait() WaitSignal.Wait(WaitMutex, 1); WaitMutex.Unlock(); - JobContextMutex.Lock(); + JobsLocker.Lock(); numJobs = JobContexts.Count(); - JobContextMutex.Unlock(); + JobsLocker.Unlock(); } #endif } @@ -324,32 +324,21 @@ void JobSystem::Wait(int64 label) #if JOB_SYSTEM_ENABLED PROFILE_CPU(); - JobContextMutex.Lock(); - JobContext* context = JobContexts.At(label); - if (context) - Platform::InterlockedIncrement(&context->RefCount); - JobContextMutex.Unlock(); - - // Early out - if (!context) - return; - - if (Platform::AtomicRead(&context->JobsLeft) > 0) + while (Platform::AtomicRead(&ExitFlag) == 0) { - // Wait on signal until input label is not yet done - do - { - WaitMutex.Lock(); - WaitSignal.Wait(WaitMutex, 1); - WaitMutex.Unlock(); - } while (Platform::AtomicRead(&context->JobsLeft) > 0 && Platform::AtomicRead(&ExitFlag) == 0); - } + JobsLocker.Lock(); + const JobContext* context = JobContexts.TryGet(label); + JobsLocker.Unlock(); - JobContextMutex.Lock(); - Platform::InterlockedDecrement(&context->RefCount); - if (Platform::AtomicRead(&context->RefCount) == 0) - JobContexts.Remove(label); - JobContextMutex.Unlock(); + // Skip if context has been already executed (last job removes it) + if (!context) + break; + + // Wait on signal until input label is not yet done + WaitMutex.Lock(); + WaitSignal.Wait(WaitMutex, 1); + WaitMutex.Unlock(); + } #if JOB_SYSTEM_USE_STATS LOG(Info, "Job average dequeue time: {0} cycles", DequeueSum / DequeueCount);