Merge branch 'jobsystem_changes' of https://github.com/GoaLitiuM/FlaxEngine into GoaLitiuM-jobsystem_changes
This commit is contained in:
@@ -53,6 +53,7 @@ struct JobData
|
||||
{
|
||||
Function<void(int32)> Job;
|
||||
int32 Index;
|
||||
int64 JobKey;
|
||||
};
|
||||
|
||||
template<>
|
||||
@@ -82,6 +83,12 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
struct JobContext
|
||||
{
|
||||
volatile int64 RefCount = 0;
|
||||
volatile int64 JobsLeft = 0;
|
||||
};
|
||||
|
||||
namespace
|
||||
{
|
||||
JobSystemService JobSystemInstance;
|
||||
@@ -89,8 +96,9 @@ namespace
|
||||
int32 ThreadsCount = 0;
|
||||
bool JobStartingOnDispatch = true;
|
||||
volatile int64 ExitFlag = 0;
|
||||
volatile int64 DoneLabel = 0;
|
||||
volatile int64 NextLabel = 0;
|
||||
volatile int64 JobLabel = 0;
|
||||
Dictionary<int64, JobContext*> JobContexts;
|
||||
CriticalSection JobContextMutex;
|
||||
ConditionVariable JobsSignal;
|
||||
CriticalSection JobsMutex;
|
||||
ConditionVariable WaitSignal;
|
||||
@@ -194,7 +202,15 @@ int32 JobSystemThread::Run()
|
||||
data.Job(data.Index);
|
||||
|
||||
// Move forward with the job queue
|
||||
Platform::InterlockedIncrement(&DoneLabel);
|
||||
JobContextMutex.Lock();
|
||||
JobContext* context = JobContexts.At(data.JobKey);
|
||||
Platform::InterlockedDecrement(&context->JobsLeft);
|
||||
Platform::InterlockedDecrement(&context->RefCount);
|
||||
if (context->JobsLeft <= 0 && Platform::AtomicRead(&context->RefCount) == 0)
|
||||
JobContexts.Remove(data.JobKey);
|
||||
|
||||
JobContextMutex.Unlock();
|
||||
|
||||
WaitSignal.NotifyAll();
|
||||
|
||||
data.Job.Unbind();
|
||||
@@ -238,9 +254,20 @@ int64 JobSystem::Dispatch(const Function<void(int32)>& job, int32 jobCount)
|
||||
#if JOB_SYSTEM_USE_STATS
|
||||
const auto start = Platform::GetTimeCycles();
|
||||
#endif
|
||||
const auto label = Platform::InterlockedAdd(&JobLabel, (int64)jobCount) + jobCount;
|
||||
|
||||
JobData data;
|
||||
data.Job = job;
|
||||
data.JobKey = label;
|
||||
|
||||
// Allocate dynamically in case the dictionary gets reallocated
|
||||
JobContext* context = New<JobContext>();
|
||||
context->JobsLeft = jobCount;
|
||||
Platform::InterlockedIncrement(&context->RefCount);
|
||||
|
||||
JobContextMutex.Lock();
|
||||
JobContexts.Add(label, context);
|
||||
JobContextMutex.Unlock();
|
||||
|
||||
#if JOB_SYSTEM_USE_MUTEX
|
||||
JobsLocker.Lock();
|
||||
@@ -251,7 +278,6 @@ int64 JobSystem::Dispatch(const Function<void(int32)>& job, int32 jobCount)
|
||||
for (data.Index = 0; data.Index < jobCount; data.Index++)
|
||||
Jobs.enqueue(data);
|
||||
#endif
|
||||
const auto label = Platform::InterlockedAdd(&NextLabel, (int64)jobCount) + jobCount;
|
||||
|
||||
#if JOB_SYSTEM_USE_STATS
|
||||
LOG(Info, "Job enqueue time: {0} cycles", (int64)(Platform::GetTimeCycles() - start));
|
||||
@@ -276,7 +302,20 @@ int64 JobSystem::Dispatch(const Function<void(int32)>& job, int32 jobCount)
|
||||
void JobSystem::Wait()
|
||||
{
|
||||
#if JOB_SYSTEM_ENABLED
|
||||
Wait(Platform::AtomicRead(&NextLabel));
|
||||
JobContextMutex.Lock();
|
||||
int32 numJobs = JobContexts.Count();
|
||||
JobContextMutex.Unlock();
|
||||
|
||||
while (numJobs > 0)
|
||||
{
|
||||
WaitMutex.Lock();
|
||||
WaitSignal.Wait(WaitMutex, 1);
|
||||
WaitMutex.Unlock();
|
||||
|
||||
JobContextMutex.Lock();
|
||||
numJobs = JobContexts.Count();
|
||||
JobContextMutex.Unlock();
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
@@ -285,17 +324,32 @@ void JobSystem::Wait(int64 label)
|
||||
#if JOB_SYSTEM_ENABLED
|
||||
PROFILE_CPU();
|
||||
|
||||
JobContextMutex.Lock();
|
||||
JobContext* context = JobContexts.At(label);
|
||||
if (context)
|
||||
Platform::InterlockedIncrement(&context->RefCount);
|
||||
JobContextMutex.Unlock();
|
||||
|
||||
// Early out
|
||||
if (label <= Platform::AtomicRead(&DoneLabel))
|
||||
if (!context)
|
||||
return;
|
||||
|
||||
// Wait on signal until input label is not yet done
|
||||
do
|
||||
if (Platform::AtomicRead(&context->JobsLeft) > 0)
|
||||
{
|
||||
WaitMutex.Lock();
|
||||
WaitSignal.Wait(WaitMutex, 1);
|
||||
WaitMutex.Unlock();
|
||||
} while (label > Platform::AtomicRead(&DoneLabel) && Platform::AtomicRead(&ExitFlag) == 0);
|
||||
// Wait on signal until input label is not yet done
|
||||
do
|
||||
{
|
||||
WaitMutex.Lock();
|
||||
WaitSignal.Wait(WaitMutex, 1);
|
||||
WaitMutex.Unlock();
|
||||
} while (Platform::AtomicRead(&context->JobsLeft) > 0 && Platform::AtomicRead(&ExitFlag) == 0);
|
||||
}
|
||||
|
||||
JobContextMutex.Lock();
|
||||
Platform::InterlockedDecrement(&context->RefCount);
|
||||
if (Platform::AtomicRead(&context->RefCount) == 0)
|
||||
JobContexts.Remove(label);
|
||||
JobContextMutex.Unlock();
|
||||
|
||||
#if JOB_SYSTEM_USE_STATS
|
||||
LOG(Info, "Job average dequeue time: {0} cycles", DequeueSum / DequeueCount);
|
||||
@@ -311,9 +365,13 @@ void JobSystem::SetJobStartingOnDispatch(bool value)
|
||||
|
||||
if (value)
|
||||
{
|
||||
#if JOB_SYSTEM_USE_MUTEX
|
||||
JobsLocker.Lock();
|
||||
const int32 count = Jobs.Count();
|
||||
JobsLocker.Unlock();
|
||||
#else
|
||||
const int32 count = Jobs.Count();
|
||||
#endif
|
||||
if (count == 1)
|
||||
JobsSignal.NotifyOne();
|
||||
else if (count != 0)
|
||||
|
||||
Reference in New Issue
Block a user