Improve Job System changes to prevent dynamic memory allocation and ref counting (#721)

This commit is contained in:
Wojciech Figat
2022-07-05 08:52:53 +02:00
parent e4a4e7926e
commit ec7aece135

View File

@@ -38,7 +38,6 @@
class JobSystemService : public EngineService
{
public:
JobSystemService()
: EngineService(TEXT("JobSystem"), -800)
{
@@ -68,7 +67,6 @@ public:
uint64 Index;
public:
// [IRunnable]
String ToString() const override
{
@@ -85,8 +83,13 @@ public:
struct JobContext
{
volatile int64 RefCount = 0;
volatile int64 JobsLeft = 0;
volatile int64 JobsLeft;
};
template<>
struct TIsPODType<JobContext>
{
enum { Value = true };
};
namespace
@@ -97,14 +100,13 @@ namespace
bool JobStartingOnDispatch = true;
volatile int64 ExitFlag = 0;
volatile int64 JobLabel = 0;
Dictionary<int64, JobContext*> JobContexts;
CriticalSection JobContextMutex;
Dictionary<int64, JobContext> JobContexts;
ConditionVariable JobsSignal;
CriticalSection JobsMutex;
ConditionVariable WaitSignal;
CriticalSection WaitMutex;
#if JOB_SYSTEM_USE_MUTEX
CriticalSection JobsLocker;
#if JOB_SYSTEM_USE_MUTEX
RingBuffer<JobData, InlinedAllocation<256>> Jobs;
#else
ConcurrentQueue<JobData> Jobs;
@@ -202,14 +204,14 @@ int32 JobSystemThread::Run()
data.Job(data.Index);
// Move forward with the job queue
JobContextMutex.Lock();
JobContext* context = JobContexts.At(data.JobKey);
Platform::InterlockedDecrement(&context->JobsLeft);
Platform::InterlockedDecrement(&context->RefCount);
if (context->JobsLeft <= 0 && Platform::AtomicRead(&context->RefCount) == 0)
JobsLocker.Lock();
JobContext& context = JobContexts.At(data.JobKey);
if (Platform::InterlockedDecrement(&context.JobsLeft) <= 0)
{
ASSERT_LOW_LAYER(context.JobsLeft <= 0);
JobContexts.Remove(data.JobKey);
JobContextMutex.Unlock();
}
JobsLocker.Unlock();
WaitSignal.NotifyAll();
@@ -260,21 +262,19 @@ int64 JobSystem::Dispatch(const Function<void(int32)>& job, int32 jobCount)
data.Job = job;
data.JobKey = label;
// Allocate dynamically in case the dictionary gets reallocated
JobContext* context = New<JobContext>();
context->JobsLeft = jobCount;
Platform::InterlockedIncrement(&context->RefCount);
JobContextMutex.Lock();
JobContexts.Add(label, context);
JobContextMutex.Unlock();
JobContext context;
context.JobsLeft = jobCount;
#if JOB_SYSTEM_USE_MUTEX
JobsLocker.Lock();
JobContexts.Add(label, context);
for (data.Index = 0; data.Index < jobCount; data.Index++)
Jobs.PushBack(data);
JobsLocker.Unlock();
#else
JobsLocker.Lock();
JobContexts.Add(label, context);
JobsLocker.Unlock();
for (data.Index = 0; data.Index < jobCount; data.Index++)
Jobs.enqueue(data);
#endif
@@ -302,9 +302,9 @@ int64 JobSystem::Dispatch(const Function<void(int32)>& job, int32 jobCount)
void JobSystem::Wait()
{
#if JOB_SYSTEM_ENABLED
JobContextMutex.Lock();
JobsLocker.Lock();
int32 numJobs = JobContexts.Count();
JobContextMutex.Unlock();
JobsLocker.Unlock();
while (numJobs > 0)
{
@@ -312,9 +312,9 @@ void JobSystem::Wait()
WaitSignal.Wait(WaitMutex, 1);
WaitMutex.Unlock();
JobContextMutex.Lock();
JobsLocker.Lock();
numJobs = JobContexts.Count();
JobContextMutex.Unlock();
JobsLocker.Unlock();
}
#endif
}
@@ -324,32 +324,21 @@ void JobSystem::Wait(int64 label)
#if JOB_SYSTEM_ENABLED
PROFILE_CPU();
JobContextMutex.Lock();
JobContext* context = JobContexts.At(label);
if (context)
Platform::InterlockedIncrement(&context->RefCount);
JobContextMutex.Unlock();
// Early out
if (!context)
return;
if (Platform::AtomicRead(&context->JobsLeft) > 0)
while (Platform::AtomicRead(&ExitFlag) == 0)
{
// Wait on signal until input label is not yet done
do
{
WaitMutex.Lock();
WaitSignal.Wait(WaitMutex, 1);
WaitMutex.Unlock();
} while (Platform::AtomicRead(&context->JobsLeft) > 0 && Platform::AtomicRead(&ExitFlag) == 0);
}
JobsLocker.Lock();
const JobContext* context = JobContexts.TryGet(label);
JobsLocker.Unlock();
JobContextMutex.Lock();
Platform::InterlockedDecrement(&context->RefCount);
if (Platform::AtomicRead(&context->RefCount) == 0)
JobContexts.Remove(label);
JobContextMutex.Unlock();
// Skip if context has been already executed (last job removes it)
if (!context)
break;
// Wait on signal until input label is not yet done
WaitMutex.Lock();
WaitSignal.Wait(WaitMutex, 1);
WaitMutex.Unlock();
}
#if JOB_SYSTEM_USE_STATS
LOG(Info, "Job average dequeue time: {0} cycles", DequeueSum / DequeueCount);