Optimize job system memory allocations

This commit is contained in:
Wojtek Figat
2024-06-24 23:19:01 +02:00
parent 3bbaa8dad0
commit b545d8800c
5 changed files with 145 additions and 95 deletions

View File

@@ -6,6 +6,8 @@
#include "Engine/Platform/Thread.h"
#include "Engine/Platform/ConditionVariable.h"
#include "Engine/Core/Types/Span.h"
#include "Engine/Core/Types/Pair.h"
#include "Engine/Core/Memory/SimpleHeapAllocation.h"
#include "Engine/Core/Collections/Dictionary.h"
#include "Engine/Core/Collections/RingBuffer.h"
#include "Engine/Engine/EngineService.h"
@@ -18,6 +20,14 @@
#if JOB_SYSTEM_ENABLED
// Local allocator for job system memory that uses internal pooling and assumes that JobsLocker is taken (write access owned by the calling thread).
class JobSystemAllocation : public SimpleHeapAllocation<JobSystemAllocation>
{
public:
static void* Allocate(uintptr size);
static void Free(void* ptr, uintptr size);
};
class JobSystemService : public EngineService
{
public:
@@ -46,9 +56,9 @@ struct TIsPODType<JobData>
struct JobContext
{
volatile int64 JobsLeft;
volatile int64 DependenciesLeft;
int32 DependenciesLeft;
Function<void(int32)> Job;
Array<int64> Dependants;
Array<int64, JobSystemAllocation> Dependants;
};
template<>
@@ -80,12 +90,13 @@ public:
namespace
{
JobSystemService JobSystemInstance;
Array<Pair<void*, uintptr>> MemPool;
Thread* Threads[PLATFORM_THREADS_LIMIT / 2] = {};
int32 ThreadsCount = 0;
bool JobStartingOnDispatch = true;
volatile int64 ExitFlag = 0;
volatile int64 JobLabel = 0;
Dictionary<int64, JobContext> JobContexts;
Dictionary<int64, JobContext, JobSystemAllocation> JobContexts;
ConditionVariable JobsSignal;
CriticalSection JobsMutex;
ConditionVariable WaitSignal;
@@ -94,6 +105,28 @@ namespace
RingBuffer<JobData> Jobs;
}
void* JobSystemAllocation::Allocate(uintptr size)
{
void* result = nullptr;
for (int32 i = 0; i < MemPool.Count(); i++)
{
if (MemPool.Get()[i].Second == size)
{
result = MemPool.Get()[i].First;
MemPool.RemoveAt(i);
break;
}
}
if (!result)
result = Platform::Allocate(size, 16);
return result;
}
void JobSystemAllocation::Free(void* ptr, uintptr size)
{
MemPool.Add({ ptr, size });
}
bool JobSystemService::Init()
{
ThreadsCount = Math::Min<int32>(Platform::GetCPUInfo().LogicalProcessorCount, ARRAY_COUNT(Threads));
@@ -130,6 +163,12 @@ void JobSystemService::Dispose()
Threads[i] = nullptr;
}
}
JobContexts.SetCapacity(0);
Jobs.Release();
for (auto& e : MemPool)
Platform::Free(e.First);
MemPool.Clear();
}
int32 JobSystemThread::Run()
@@ -176,7 +215,7 @@ int32 JobSystemThread::Run()
for (int64 dependant : context.Dependants)
{
JobContext& dependantContext = JobContexts.At(dependant);
if (Platform::InterlockedDecrement(&dependantContext.DependenciesLeft) <= 0)
if (--dependantContext.DependenciesLeft <= 0)
{
// Dispatch dependency when it's ready
JobData dependantData;
@@ -245,7 +284,7 @@ int64 JobSystem::Dispatch(const Function<void(int32)>& job, int32 jobCount)
context.DependenciesLeft = 0;
JobsLocker.Lock();
JobContexts.Add(label, context);
JobContexts.Add(label, MoveTemp(context));
for (data.Index = 0; data.Index < jobCount; data.Index++)
Jobs.PushBack(data);
JobsLocker.Unlock();
@@ -291,9 +330,10 @@ int64 JobSystem::Dispatch(const Function<void(int32)>& job, Span<int64> dependen
dependencyContext->Dependants.Add(label);
}
}
JobContexts.Add(label, context);
JobContexts.Add(label, MoveTemp(context));
if (context.DependenciesLeft == 0)
{
// No dependencies left to complete so dispatch now
for (data.Index = 0; data.Index < jobCount; data.Index++)
Jobs.PushBack(data);
}