diff --git a/Source/Engine/Core/Collections/ChunkedArray.h b/Source/Engine/Core/Collections/ChunkedArray.h index 1ed78ffe2..0d24e980d 100644 --- a/Source/Engine/Core/Collections/ChunkedArray.h +++ b/Source/Engine/Core/Collections/ChunkedArray.h @@ -46,7 +46,6 @@ public: /// /// Gets the amount of the elements in the collection. /// - /// The amount of the elements in the collection. FORCE_INLINE int32 Count() const { return _count; @@ -55,7 +54,6 @@ public: /// /// Gets the amount of the elements that can be hold by collection without resizing. /// - /// The current capacity of the collection. FORCE_INLINE int32 Capacity() const { return _chunks.Count() * ChunkSize; @@ -64,7 +62,6 @@ public: /// /// Returns true if array isn't empty. /// - /// True if array has any elements added, otherwise it is empty. FORCE_INLINE bool HasItems() const { return _count != 0; @@ -73,7 +70,6 @@ public: /// /// Returns true if collection is empty. /// - /// True if array is empty, otherwise it has any elements added. FORCE_INLINE bool IsEmpty() const { return _count == 0; @@ -154,20 +150,12 @@ public: public: - /// - /// Checks if iterator is in the end of the collection. - /// - /// True if is in the end, otherwise false. bool IsEnd() const { ASSERT(_collection); return Index() == _collection->Count(); } - /// - /// Checks if iterator is not in the end of the collection. - /// - /// True if is not in the end, otherwise false. bool IsNotEnd() const { ASSERT(_collection); @@ -331,6 +319,36 @@ public: return &chunk->At(chunk->Count() - 1); } + /// + /// Adds the one item to the collection and returns the reference to it. + /// + /// The reference to the added item. + T& AddOne() + { + // Find first chunk with some space + Chunk* chunk = nullptr; + for (int32 i = 0; i < _chunks.Count(); i++) + { + if (_chunks[i]->Count() < ChunkSize) + { + chunk = _chunks[i]; + break; + } + } + + // Allocate chunk if missing + if (chunk == nullptr) + { + chunk = New(); + chunk->SetCapacity(ChunkSize); + _chunks.Add(chunk); + } + + // Add item + _count++; + return chunk->AddOne(); + } + /// /// Removes the element at specified iterator position. /// @@ -408,7 +426,6 @@ public: /// The new size. void Resize(int32 newSize) { - // Check if shrink if (newSize < Count()) { MISSING_CODE("shrinking ChunkedArray on Resize"); @@ -439,7 +456,6 @@ public: chunkIndex++; } } - ASSERT(newSize == Count()); } diff --git a/Source/Engine/Core/Collections/RingBuffer.h b/Source/Engine/Core/Collections/RingBuffer.h index bffc70e40..8a8deb86f 100644 --- a/Source/Engine/Core/Collections/RingBuffer.h +++ b/Source/Engine/Core/Collections/RingBuffer.h @@ -48,7 +48,7 @@ public: { if (_capacity == 0 || _capacity == _count) { - const int32 capacity = _allocation.CalculateCapacityGrow(_capacity, 0); + const int32 capacity = _allocation.CalculateCapacityGrow(_capacity, _count + 1); AllocationData alloc; alloc.Allocate(capacity); const int32 frontCount = Math::Min(_capacity - _front, _count); @@ -79,6 +79,18 @@ public: return _allocation.Get()[_front]; } + FORCE_INLINE T& operator[](int32 index) + { + ASSERT(index >= 0 && index < _count); + return _allocation.Get()[(_front + index) % _capacity]; + } + + FORCE_INLINE const T& operator[](int32 index) const + { + ASSERT(index >= 0 && index < _count); + return _allocation.Get()[(_front + index) % _capacity]; + } + void PopFront() { ASSERT(_front != _back); diff --git a/Source/Engine/Threading/JobSystem.cpp b/Source/Engine/Threading/JobSystem.cpp index b878bbdea..0a29bdecf 100644 --- a/Source/Engine/Threading/JobSystem.cpp +++ b/Source/Engine/Threading/JobSystem.cpp @@ -2,12 +2,35 @@ #include "JobSystem.h" #include "IRunnable.h" -#include "Engine/Core/Collections/RingBuffer.h" #include "Engine/Platform/CPUInfo.h" #include "Engine/Platform/Thread.h" #include "Engine/Platform/ConditionVariable.h" #include "Engine/Engine/EngineService.h" #include "Engine/Profiler/ProfilerCPU.h" +#include "Engine/Scripting/ManagedCLR/MCore.h" +#if USE_MONO +#include "Engine/Scripting/ManagedCLR/MDomain.h" +#include +#include +#endif + +// Jobs storage perf info: +// (500 jobs, i7 9th gen) +// JOB_SYSTEM_USE_MUTEX=1, enqueue=130-280 cycles, dequeue=2-6 cycles +// JOB_SYSTEM_USE_MUTEX=0, enqueue=300-700 cycles, dequeue=10-16 cycles +// So using RingBuffer+Mutex+Signals is better than moodycamel::ConcurrentQueue + +#define JOB_SYSTEM_USE_MUTEX 1 +#define JOB_SYSTEM_USE_STATS 0 + +#if JOB_SYSTEM_USE_STATS +#include "Engine/Core/Log.h" +#endif +#if JOB_SYSTEM_USE_MUTEX +#include "Engine/Core/Collections/RingBuffer.h" +#else +#include "ConcurrentQueue.h" +#endif class JobSystemService : public EngineService { @@ -27,7 +50,6 @@ struct JobData { Function Job; int32 Index; - int32 Count; }; template<> @@ -39,7 +61,7 @@ struct TIsPODType class JobSystemThread : public IRunnable { public: - int32 Index; + uint64 Index; public: @@ -65,10 +87,18 @@ namespace volatile int64 ExitFlag = 0; volatile int64 DoneLabel = 0; volatile int64 NextLabel = 0; - CriticalSection JobsLocker; ConditionVariable JobsSignal; ConditionVariable WaitSignal; +#if JOB_SYSTEM_USE_MUTEX + CriticalSection JobsLocker; RingBuffer> Jobs; +#else + ConcurrentQueue Jobs; +#endif +#if JOB_SYSTEM_USE_STATS + int64 DequeueCount = 0; + int64 DequeueSum = 0; +#endif } bool JobSystemService::Init() @@ -77,7 +107,7 @@ bool JobSystemService::Init() for (int32 i = 0; i < ThreadsCount; i++) { auto runnable = New(); - runnable->Index = i; + runnable->Index = (uint64)i; auto thread = Thread::Create(runnable, String::Format(TEXT("Job System {0}"), i), ThreadPriority::AboveNormal); if (thread == nullptr) return true; @@ -108,38 +138,57 @@ void JobSystemService::Dispose() int32 JobSystemThread::Run() { - Platform::SetThreadAffinityMask(1 << Index); + Platform::SetThreadAffinityMask(1ull << Index); JobData data; CriticalSection mutex; + bool attachMonoThread = true; +#if !JOB_SYSTEM_USE_MUTEX + moodycamel::ConsumerToken consumerToken(Jobs); +#endif while (Platform::AtomicRead(&ExitFlag) == 0) { // Try to get a job +#if JOB_SYSTEM_USE_STATS + const auto start = Platform::GetTimeCycles(); +#endif +#if JOB_SYSTEM_USE_MUTEX JobsLocker.Lock(); if (Jobs.Count() != 0) { - auto& front = Jobs.PeekFront(); - data = front; - front.Index++; - if (front.Index == front.Count) - { - Jobs.PopFront(); - } + data = Jobs.PeekFront(); + Jobs.PopFront(); } JobsLocker.Unlock(); +#else + if (!Jobs.try_dequeue(consumerToken, data)) + data.Job.Unbind(); +#endif +#if JOB_SYSTEM_USE_STATS + Platform::InterlockedIncrement(&DequeueCount); + Platform::InterlockedAdd(&DequeueSum, Platform::GetTimeCycles() - start); +#endif if (data.Job.IsBinded()) { +#if USE_MONO + // Ensure to have C# thread attached to this thead (late init due to MCore being initialized after Job System) + if (attachMonoThread && !mono_domain_get()) + { + const auto domain = MCore::Instance()->GetActiveDomain(); + mono_thread_attach(domain->GetNative()); + attachMonoThread = false; + } +#endif + // Run job data.Job(data.Index); - data.Job.Unbind(); - if (data.Index + 1 == data.Count) - { - // Move forward with the job queue - Platform::InterlockedIncrement(&DoneLabel); - WaitSignal.NotifyAll(); - } + // Move forward with the job queue + Platform::InterlockedIncrement(&DoneLabel); + WaitSignal.NotifyAll(); + + data.Job.Unbind(); } else { @@ -157,16 +206,27 @@ int64 JobSystem::Dispatch(const Function& job, int32 jobCount) PROFILE_CPU(); if (jobCount <= 0) return 0; +#if JOB_SYSTEM_USE_STATS + const auto start = Platform::GetTimeCycles(); +#endif JobData data; data.Job = job; - data.Index = 0; - data.Count = jobCount; +#if JOB_SYSTEM_USE_MUTEX JobsLocker.Lock(); - const auto label = Platform::InterlockedIncrement(&NextLabel); - Jobs.PushBack(data); + for (data.Index = 0; data.Index < jobCount; data.Index++) + Jobs.PushBack(data); JobsLocker.Unlock(); +#else + for (data.Index = 0; data.Index < jobCount; data.Index++) + Jobs.enqueue(data); +#endif + const auto label = Platform::InterlockedAdd(&NextLabel, (int64)jobCount) + jobCount; + +#if JOB_SYSTEM_USE_STATS + LOG(Info, "Job enqueue time: {0} cycles", (int64)(Platform::GetTimeCycles() - start)); +#endif if (jobCount == 1) JobsSignal.NotifyOne(); @@ -191,10 +251,15 @@ void JobSystem::Wait(int64 label) // Wait on signal until input label is not yet done CriticalSection mutex; - while (label > Platform::AtomicRead(&DoneLabel) && Platform::AtomicRead(&ExitFlag) == 0) + do { mutex.Lock(); - WaitSignal.Wait(mutex); + WaitSignal.Wait(mutex, 1); mutex.Unlock(); - } + } while (label > Platform::AtomicRead(&DoneLabel) && Platform::AtomicRead(&ExitFlag) == 0); + +#if JOB_SYSTEM_USE_STATS + LOG(Info, "Job average dequeue time: {0} cycles", DequeueSum / DequeueCount); + DequeueSum = DequeueCount = 0; +#endif }