Finish job system

This commit is contained in:
Wojtek Figat
2021-06-10 19:08:48 +02:00
parent 07ad94de13
commit 544cb1ff6d
3 changed files with 135 additions and 42 deletions

View File

@@ -46,7 +46,6 @@ public:
/// <summary>
/// Gets the amount of the elements in the collection.
/// </summary>
/// <returns>The amount of the elements in the collection.</returns>
FORCE_INLINE int32 Count() const
{
return _count;
@@ -55,7 +54,6 @@ public:
/// <summary>
/// Gets the number of elements that can be held by the collection without resizing.
/// </summary>
/// <returns>The current capacity of the collection.</returns>
FORCE_INLINE int32 Capacity() const
{
return _chunks.Count() * ChunkSize;
@@ -64,7 +62,6 @@ public:
/// <summary>
/// Returns true if array isn't empty.
/// </summary>
/// <returns>True if array has any elements added, otherwise it is empty.</returns>
FORCE_INLINE bool HasItems() const
{
return _count != 0;
@@ -73,7 +70,6 @@ public:
/// <summary>
/// Returns true if collection is empty.
/// </summary>
/// <returns>True if array is empty, otherwise it has any elements added.</returns>
FORCE_INLINE bool IsEmpty() const
{
return _count == 0;
@@ -154,20 +150,12 @@ public:
public:
/// <summary>
/// Checks if iterator is in the end of the collection.
/// </summary>
/// <returns>True if is in the end, otherwise false.</returns>
bool IsEnd() const
{
    ASSERT(_collection);
    // The iterator is at the end when its index equals the collection's element count
    const auto total = _collection->Count();
    return Index() == total;
}
/// <summary>
/// Checks if iterator is not in the end of the collection.
/// </summary>
/// <returns>True if is not in the end, otherwise false.</returns>
bool IsNotEnd() const
{
ASSERT(_collection);
@@ -331,6 +319,36 @@ public:
return &chunk->At(chunk->Count() - 1);
}
/// <summary>
/// Appends a single new item to the collection and gives back a reference to it.
/// </summary>
/// <returns>The reference to the newly added item.</returns>
T& AddOne()
{
    // Look for the first chunk that still has a free slot
    Chunk* target = nullptr;
    int32 chunkIndex = 0;
    while (target == nullptr && chunkIndex < _chunks.Count())
    {
        Chunk* candidate = _chunks[chunkIndex];
        if (candidate->Count() < ChunkSize)
            target = candidate;
        chunkIndex++;
    }
    // Every existing chunk is full (or there are none yet), so create a fresh one
    if (!target)
    {
        target = New<Chunk>();
        target->SetCapacity(ChunkSize);
        _chunks.Add(target);
    }
    // Account for the new item and place it in the selected chunk
    ++_count;
    return target->AddOne();
}
/// <summary>
/// Removes the element at specified iterator position.
/// </summary>
@@ -408,7 +426,6 @@ public:
/// <param name="newSize">The new size.</param>
void Resize(int32 newSize)
{
// Check if shrink
if (newSize < Count())
{
MISSING_CODE("shrinking ChunkedArray on Resize");
@@ -439,7 +456,6 @@ public:
chunkIndex++;
}
}
ASSERT(newSize == Count());
}

View File

@@ -48,7 +48,7 @@ public:
{
if (_capacity == 0 || _capacity == _count)
{
const int32 capacity = _allocation.CalculateCapacityGrow(_capacity, 0);
const int32 capacity = _allocation.CalculateCapacityGrow(_capacity, _count + 1);
AllocationData alloc;
alloc.Allocate(capacity);
const int32 frontCount = Math::Min(_capacity - _front, _count);
@@ -79,6 +79,18 @@ public:
return _allocation.Get()[_front];
}
FORCE_INLINE T& operator[](int32 index)
{
    ASSERT(index >= 0 && index < _count);
    // Offset from the front element, wrapped by the capacity of the storage
    const auto slot = (_front + index) % _capacity;
    return _allocation.Get()[slot];
}
FORCE_INLINE const T& operator[](int32 index) const
{
    ASSERT(index >= 0 && index < _count);
    // Offset from the front element, wrapped by the capacity of the storage
    const auto slot = (_front + index) % _capacity;
    return _allocation.Get()[slot];
}
void PopFront()
{
ASSERT(_front != _back);

View File

@@ -2,12 +2,35 @@
#include "JobSystem.h"
#include "IRunnable.h"
#include "Engine/Core/Collections/RingBuffer.h"
#include "Engine/Platform/CPUInfo.h"
#include "Engine/Platform/Thread.h"
#include "Engine/Platform/ConditionVariable.h"
#include "Engine/Engine/EngineService.h"
#include "Engine/Profiler/ProfilerCPU.h"
#include "Engine/Scripting/ManagedCLR/MCore.h"
#if USE_MONO
#include "Engine/Scripting/ManagedCLR/MDomain.h"
#include <ThirdParty/mono-2.0/mono/metadata/appdomain.h>
#include <ThirdParty/mono-2.0/mono/metadata/threads.h>
#endif
// Jobs storage perf info:
// (500 jobs, i7 9th gen)
// JOB_SYSTEM_USE_MUTEX=1, enqueue=130-280 cycles, dequeue=2-6 cycles
// JOB_SYSTEM_USE_MUTEX=0, enqueue=300-700 cycles, dequeue=10-16 cycles
// So using RingBuffer+Mutex+Signals is better than moodycamel::ConcurrentQueue
#define JOB_SYSTEM_USE_MUTEX 1
#define JOB_SYSTEM_USE_STATS 0
#if JOB_SYSTEM_USE_STATS
#include "Engine/Core/Log.h"
#endif
#if JOB_SYSTEM_USE_MUTEX
#include "Engine/Core/Collections/RingBuffer.h"
#else
#include "ConcurrentQueue.h"
#endif
class JobSystemService : public EngineService
{
@@ -27,7 +50,6 @@ struct JobData
{
Function<void(int32)> Job;
int32 Index;
int32 Count;
};
template<>
@@ -39,7 +61,7 @@ struct TIsPODType<JobData>
class JobSystemThread : public IRunnable
{
public:
int32 Index;
uint64 Index;
public:
@@ -65,10 +87,18 @@ namespace
volatile int64 ExitFlag = 0;
volatile int64 DoneLabel = 0;
volatile int64 NextLabel = 0;
CriticalSection JobsLocker;
ConditionVariable JobsSignal;
ConditionVariable WaitSignal;
#if JOB_SYSTEM_USE_MUTEX
CriticalSection JobsLocker;
RingBuffer<JobData, InlinedAllocation<256>> Jobs;
#else
ConcurrentQueue<JobData> Jobs;
#endif
#if JOB_SYSTEM_USE_STATS
int64 DequeueCount = 0;
int64 DequeueSum = 0;
#endif
}
bool JobSystemService::Init()
@@ -77,7 +107,7 @@ bool JobSystemService::Init()
for (int32 i = 0; i < ThreadsCount; i++)
{
auto runnable = New<JobSystemThread>();
runnable->Index = i;
runnable->Index = (uint64)i;
auto thread = Thread::Create(runnable, String::Format(TEXT("Job System {0}"), i), ThreadPriority::AboveNormal);
if (thread == nullptr)
return true;
@@ -108,38 +138,57 @@ void JobSystemService::Dispose()
int32 JobSystemThread::Run()
{
Platform::SetThreadAffinityMask(1 << Index);
Platform::SetThreadAffinityMask(1ull << Index);
JobData data;
CriticalSection mutex;
bool attachMonoThread = true;
#if !JOB_SYSTEM_USE_MUTEX
moodycamel::ConsumerToken consumerToken(Jobs);
#endif
while (Platform::AtomicRead(&ExitFlag) == 0)
{
// Try to get a job
#if JOB_SYSTEM_USE_STATS
const auto start = Platform::GetTimeCycles();
#endif
#if JOB_SYSTEM_USE_MUTEX
JobsLocker.Lock();
if (Jobs.Count() != 0)
{
auto& front = Jobs.PeekFront();
data = front;
front.Index++;
if (front.Index == front.Count)
{
Jobs.PopFront();
}
data = Jobs.PeekFront();
Jobs.PopFront();
}
JobsLocker.Unlock();
#else
if (!Jobs.try_dequeue(consumerToken, data))
data.Job.Unbind();
#endif
#if JOB_SYSTEM_USE_STATS
Platform::InterlockedIncrement(&DequeueCount);
Platform::InterlockedAdd(&DequeueSum, Platform::GetTimeCycles() - start);
#endif
if (data.Job.IsBinded())
{
#if USE_MONO
// Ensure to have C# thread attached to this thread (late init due to MCore being initialized after Job System)
if (attachMonoThread && !mono_domain_get())
{
const auto domain = MCore::Instance()->GetActiveDomain();
mono_thread_attach(domain->GetNative());
attachMonoThread = false;
}
#endif
// Run job
data.Job(data.Index);
data.Job.Unbind();
if (data.Index + 1 == data.Count)
{
// Move forward with the job queue
Platform::InterlockedIncrement(&DoneLabel);
WaitSignal.NotifyAll();
}
// Move forward with the job queue
Platform::InterlockedIncrement(&DoneLabel);
WaitSignal.NotifyAll();
data.Job.Unbind();
}
else
{
@@ -157,16 +206,27 @@ int64 JobSystem::Dispatch(const Function<void(int32)>& job, int32 jobCount)
PROFILE_CPU();
if (jobCount <= 0)
return 0;
#if JOB_SYSTEM_USE_STATS
const auto start = Platform::GetTimeCycles();
#endif
JobData data;
data.Job = job;
data.Index = 0;
data.Count = jobCount;
#if JOB_SYSTEM_USE_MUTEX
JobsLocker.Lock();
const auto label = Platform::InterlockedIncrement(&NextLabel);
Jobs.PushBack(data);
for (data.Index = 0; data.Index < jobCount; data.Index++)
Jobs.PushBack(data);
JobsLocker.Unlock();
#else
for (data.Index = 0; data.Index < jobCount; data.Index++)
Jobs.enqueue(data);
#endif
const auto label = Platform::InterlockedAdd(&NextLabel, (int64)jobCount) + jobCount;
#if JOB_SYSTEM_USE_STATS
LOG(Info, "Job enqueue time: {0} cycles", (int64)(Platform::GetTimeCycles() - start));
#endif
if (jobCount == 1)
JobsSignal.NotifyOne();
@@ -191,10 +251,15 @@ void JobSystem::Wait(int64 label)
// Keep waiting on the signal while the input label is not yet done
CriticalSection mutex;
while (label > Platform::AtomicRead(&DoneLabel) && Platform::AtomicRead(&ExitFlag) == 0)
do
{
mutex.Lock();
WaitSignal.Wait(mutex);
WaitSignal.Wait(mutex, 1);
mutex.Unlock();
}
} while (label > Platform::AtomicRead(&DoneLabel) && Platform::AtomicRead(&ExitFlag) == 0);
#if JOB_SYSTEM_USE_STATS
LOG(Info, "Job average dequeue time: {0} cycles", DequeueSum / DequeueCount);
DequeueSum = DequeueCount = 0;
#endif
}