diff --git a/Source/Engine/Threading/JobSystem.cpp b/Source/Engine/Threading/JobSystem.cpp index 090bab2ea..d3f879124 100644 --- a/Source/Engine/Threading/JobSystem.cpp +++ b/Source/Engine/Threading/JobSystem.cpp @@ -5,6 +5,7 @@ #include "Engine/Platform/CPUInfo.h" #include "Engine/Platform/Thread.h" #include "Engine/Platform/ConditionVariable.h" +#include "Engine/Core/Types/Span.h" #include "Engine/Core/Collections/Dictionary.h" #include "Engine/Engine/EngineService.h" #include "Engine/Profiler/ProfilerCPU.h" @@ -48,13 +49,26 @@ public: struct JobData { - Function Job; int32 Index; int64 JobKey; }; template<> struct TIsPODType +{ + enum { Value = true }; +}; + +struct JobContext +{ + volatile int64 JobsLeft; + volatile int64 DependenciesLeft; + Function Job; + Array Dependants; +}; + +template<> +struct TIsPODType { enum { Value = false }; }; @@ -79,17 +93,6 @@ public: } }; -struct JobContext -{ - volatile int64 JobsLeft; -}; - -template<> -struct TIsPODType -{ - enum { Value = true }; -}; - namespace { JobSystemService JobSystemInstance; @@ -158,6 +161,7 @@ int32 JobSystemThread::Run() Platform::SetThreadAffinityMask(1ull << Index); JobData data; + Function job; bool attachCSharpThread = true; #if !JOB_SYSTEM_USE_MUTEX moodycamel::ConsumerToken consumerToken(Jobs); @@ -174,18 +178,23 @@ int32 JobSystemThread::Run() { data = Jobs.PeekFront(); Jobs.PopFront(); + const JobContext& context = ((const Dictionary&)JobContexts).At(data.JobKey); + job = context.Job; } JobsLocker.Unlock(); #else - if (!Jobs.try_dequeue(consumerToken, data)) - data.Job.Unbind(); + if (Jobs.try_dequeue(consumerToken, data)) + { + const JobContext& context = ((const Dictionary&)JobContexts).At(data.JobKey); + job = context.Job; + } #endif #if JOB_SYSTEM_USE_STATS Platform::InterlockedIncrement(&DequeueCount); Platform::InterlockedAdd(&DequeueSum, Platform::GetTimeCycles() - start); #endif - if (data.Job.IsBinded()) + if (job.IsBinded()) { #if USE_CSHARP // Ensure to have C# thread attached to this thead (late init due to MCore being initialized after Job System) @@ -197,7 +206,7 @@ int32 JobSystemThread::Run() #endif // Run job - data.Job(data.Index); + job(data.Index); // Move forward with the job queue bool notifyWaiting = false; @@ -205,16 +214,33 @@ int32 JobSystemThread::Run() JobContext& context = JobContexts.At(data.JobKey); if (Platform::InterlockedDecrement(&context.JobsLeft) <= 0) { - ASSERT_LOW_LAYER(context.JobsLeft <= 0); + // Update any dependant jobs + for (int64 dependant : context.Dependants) + { + JobContext& dependantContext = JobContexts.At(dependant); + if (Platform::InterlockedDecrement(&dependantContext.DependenciesLeft) <= 0) + { + // Dispatch dependency when it's ready + JobData dependantData; + dependantData.JobKey = dependant; + for (dependantData.Index = 0; dependantData.Index < dependantContext.JobsLeft; dependantData.Index++) +#if JOB_SYSTEM_USE_MUTEX + Jobs.PushBack(dependantData); +#else + Jobs.enqueue(dependantData); +#endif + } + } + + // Remove completed context JobContexts.Remove(data.JobKey); notifyWaiting = true; } JobsLocker.Unlock(); - if (notifyWaiting) WaitSignal.NotifyAll(); - data.Job.Unbind(); + job.Unbind(); } else { @@ -250,9 +276,9 @@ void JobSystem::Execute(const Function& job, int32 jobCount) int64 JobSystem::Dispatch(const Function& job, int32 jobCount) { - PROFILE_CPU(); if (jobCount <= 0) return 0; + PROFILE_CPU(); #if JOB_SYSTEM_ENABLED #if JOB_SYSTEM_USE_STATS const auto start = Platform::GetTimeCycles(); @@ -260,21 +286,20 @@ int64 JobSystem::Dispatch(const Function& job, int32 jobCount) const auto label = Platform::InterlockedAdd(&JobLabel, (int64)jobCount) + jobCount; JobData data; - data.Job = job; data.JobKey = label; JobContext context; + context.Job = job; context.JobsLeft = jobCount; + context.DependenciesLeft = 0; -#if JOB_SYSTEM_USE_MUTEX JobsLocker.Lock(); JobContexts.Add(label, context); +#if JOB_SYSTEM_USE_MUTEX for (data.Index = 0; data.Index < jobCount; data.Index++) Jobs.PushBack(data); JobsLocker.Unlock(); #else - JobsLocker.Lock(); - JobContexts.Add(label, context); JobsLocker.Unlock(); for (data.Index = 0; data.Index < jobCount; data.Index++) Jobs.enqueue(data); @@ -300,6 +325,64 @@ int64 JobSystem::Dispatch(const Function& job, int32 jobCount) #endif } +int64 JobSystem::Dispatch(const Function& job, Span dependencies, int32 jobCount) +{ + if (jobCount <= 0) + return 0; + PROFILE_CPU(); +#if JOB_SYSTEM_ENABLED + const auto label = Platform::InterlockedAdd(&JobLabel, (int64)jobCount) + jobCount; + + JobData data; + data.JobKey = label; + + JobContext context; + context.Job = job; + context.JobsLeft = jobCount; + context.DependenciesLeft = 0; + + JobsLocker.Lock(); + for (int64 dependency : dependencies) + { + if (JobContext* dependencyContext = JobContexts.TryGet(dependency)) + { + context.DependenciesLeft++; + dependencyContext->Dependants.Add(label); + } + } + JobContexts.Add(label, context); +#if JOB_SYSTEM_USE_MUTEX + if (context.DependenciesLeft == 0) + { + for (data.Index = 0; data.Index < jobCount; data.Index++) + Jobs.PushBack(data); + } + JobsLocker.Unlock(); +#else + JobsLocker.Unlock(); + if (dispatchNow) + { + for (data.Index = 0; data.Index < jobCount; data.Index++) + Jobs.enqueue(data); + } +#endif + + if (context.DependenciesLeft == 0 && JobStartingOnDispatch) + { + if (jobCount == 1) + JobsSignal.NotifyOne(); + else + JobsSignal.NotifyAll(); + } + + return label; +#else + for (int32 i = 0; i < jobCount; i++) + job(i); + return 0; +#endif +} + void JobSystem::Wait() { #if JOB_SYSTEM_ENABLED diff --git a/Source/Engine/Threading/JobSystem.h b/Source/Engine/Threading/JobSystem.h index d269aa196..c6b4500fa 100644 --- a/Source/Engine/Threading/JobSystem.h +++ b/Source/Engine/Threading/JobSystem.h @@ -4,6 +4,9 @@ #include "Engine/Core/Delegate.h" +template +class Span; + /// /// Lightweight multi-threaded jobs execution scheduler. Uses a pool of threads and supports work-stealing concept. /// @@ -26,6 +29,15 @@ API_CLASS(Static) class FLAXENGINE_API JobSystem /// The label identifying this dispatch. Can be used to wait for the execution end. API_FUNCTION() static int64 Dispatch(const Function& job, int32 jobCount = 1); + /// + /// Dispatches the job for the execution after all of dependant jobs will complete. + /// + /// The job. Argument is an index of the job execution. + /// The list of dependant jobs that need to complete in order to start executing this job. + /// The job executions count. + /// The label identifying this dispatch. Can be used to wait for the execution end. + API_FUNCTION() static int64 Dispatch(const Function& job, Span dependencies, int32 jobCount = 1); + /// /// Waits for all dispatched jobs to finish. ///