Add customizable per-platform affinity for content and pool threads

This commit is contained in:
Wojtek Figat
2025-07-29 10:41:41 +02:00
parent 17c0892ff1
commit 99323c1d2f
4 changed files with 27 additions and 10 deletions

View File

@@ -4,8 +4,13 @@
#include "Engine/Core/Config.h" #include "Engine/Core/Config.h"
// Amount of content loading threads per single physical CPU core // Amount of content loading threads per single logical CPU core
#ifndef LOADING_THREAD_PER_LOGICAL_CORE
#define LOADING_THREAD_PER_LOGICAL_CORE 0.5f #define LOADING_THREAD_PER_LOGICAL_CORE 0.5f
#endif
// Enables pinning loading threads to logical CPU cores via an affinity mask (the macro receives the index of the loading thread)
//#define LOADING_THREAD_AFFINITY_MASK(thread) (1ull << ((thread) + 1))
// Enables additional assets metadata verification // Enables additional assets metadata verification
#define ASSETS_LOADING_EXTRA_VERIFICATION (BUILD_DEBUG || USE_EDITOR) #define ASSETS_LOADING_EXTRA_VERIFICATION (BUILD_DEBUG || USE_EDITOR)

View File

@@ -129,17 +129,17 @@ bool ContentService::Init()
LOG(Info, "Creating {0} content loading threads...", count); LOG(Info, "Creating {0} content loading threads...", count);
MainLoadThread = New<LoadingThread>(); MainLoadThread = New<LoadingThread>();
ThisLoadThread = MainLoadThread; ThisLoadThread = MainLoadThread;
LoadThreads.EnsureCapacity(count); LoadThreads.Resize(count);
for (int32 i = 0; i < count; i++) for (int32 i = 0; i < count; i++)
{ {
auto thread = New<LoadingThread>(); auto thread = New<LoadingThread>();
LoadThreads[i] = thread;
if (thread->Start(String::Format(TEXT("Load Thread {0}"), i))) if (thread->Start(String::Format(TEXT("Load Thread {0}"), i)))
{ {
LOG(Fatal, "Cannot spawn content thread {0}/{1}", i, count); LOG(Fatal, "Cannot spawn content thread {0}/{1}", i, count);
Delete(thread); Delete(thread);
return true; return true;
} }
LoadThreads.Add(thread);
} }
return false; return false;
@@ -339,6 +339,9 @@ int32 LoadingThread::Run()
return -1; return -1;
} }
#endif #endif
#ifdef LOADING_THREAD_AFFINITY_MASK
Platform::SetThreadAffinityMask(LOADING_THREAD_AFFINITY_MASK(LoadThreads.Find(this)));
#endif
ContentLoadTask* task; ContentLoadTask* task;
ThisLoadThread = this; ThisLoadThread = this;

View File

@@ -28,6 +28,9 @@ namespace ThreadPoolImpl
ConcurrentTaskQueue<ThreadPoolTask> Jobs; // Hello Steve! ConcurrentTaskQueue<ThreadPoolTask> Jobs; // Hello Steve!
ConditionVariable JobsSignal; ConditionVariable JobsSignal;
CriticalSection JobsMutex; CriticalSection JobsMutex;
#ifdef THREAD_POOL_AFFINITY_MASK
volatile int64 ThreadIndex = 0;
#endif
} }
String ThreadPoolTask::ToString() const String ThreadPoolTask::ToString() const
@@ -63,11 +66,12 @@ bool ThreadPoolService::Init()
PROFILE_MEM(EngineThreading); PROFILE_MEM(EngineThreading);
// Spawn threads // Spawn threads
const int32 numThreads = Math::Clamp<int32>(Platform::GetCPUInfo().ProcessorCoreCount - 1, 2, PLATFORM_THREADS_LIMIT / 2); const CPUInfo cpuInfo = Platform::GetCPUInfo();
LOG(Info, "Spawning {0} Thread Pool workers", numThreads); const int32 count = Math::Clamp<int32>(cpuInfo.ProcessorCoreCount - 1, 2, PLATFORM_THREADS_LIMIT / 2);
for (int32 i = ThreadPoolImpl::Threads.Count(); i < numThreads; i++) LOG(Info, "Spawning {0} Thread Pool workers", count);
ThreadPoolImpl::Threads.Resize(count);
for (int32 i = 0; i < count; i++)
{ {
// Create tread
auto runnable = New<SimpleRunnable>(true); auto runnable = New<SimpleRunnable>(true);
runnable->OnWork.Bind(ThreadPool::ThreadProc); runnable->OnWork.Bind(ThreadPool::ThreadProc);
auto thread = Thread::Create(runnable, String::Format(TEXT("Thread Pool {0}"), i)); auto thread = Thread::Create(runnable, String::Format(TEXT("Thread Pool {0}"), i));
@@ -76,9 +80,7 @@ bool ThreadPoolService::Init()
LOG(Error, "Failed to spawn {0} thread in the Thread Pool", i + 1); LOG(Error, "Failed to spawn {0} thread in the Thread Pool", i + 1);
return true; return true;
} }
ThreadPoolImpl::Threads[i] = thread;
// Add to the list
ThreadPoolImpl::Threads.Add(thread);
} }
return false; return false;
@@ -110,6 +112,10 @@ void ThreadPoolService::Dispose()
int32 ThreadPool::ThreadProc() int32 ThreadPool::ThreadProc()
{ {
#ifdef THREAD_POOL_AFFINITY_MASK
const int64 index = Platform::InterlockedIncrement(&ThreadPoolImpl::ThreadIndex) - 1;
Platform::SetThreadAffinityMask(THREAD_POOL_AFFINITY_MASK((int32)index));
#endif
ThreadPoolTask* task; ThreadPoolTask* task;
// Work until end // Work until end

View File

@@ -4,6 +4,9 @@
#include "Engine/Core/Types/BaseTypes.h" #include "Engine/Core/Types/BaseTypes.h"
// Enables pinning thread pool workers to logical CPU cores via an affinity mask (the macro receives the index of the worker thread)
//#define THREAD_POOL_AFFINITY_MASK(thread) (1ull << ((thread) + 1))
/// <summary> /// <summary>
/// Main engine thread pool for threaded tasks system. /// Main engine thread pool for threaded tasks system.
/// </summary> /// </summary>