Add support for running engine with a single-thread only (content, jobs, drawing, physics)

This commit is contained in:
Wojtek Figat
2026-02-16 16:10:31 +01:00
parent 6f13a33be2
commit d1557e5292
6 changed files with 123 additions and 12 deletions

View File

@@ -80,10 +80,14 @@ namespace
// Loading assets
THREADLOCAL LoadingThread* ThisLoadThread = nullptr;
LoadingThread* MainLoadThread = nullptr;
#if PLATFORM_THREADS_LIMIT > 1
Array<LoadingThread*> LoadThreads;
ConcurrentTaskQueue<ContentLoadTask> LoadTasks;
ConditionVariable LoadTasksSignal;
CriticalSection LoadTasksMutex;
#else
Array<ContentLoadTask*> LoadTasks;
#endif
// Unloading assets
Dictionary<Asset*, TimeSpan> UnloadQueue;
@@ -125,11 +129,12 @@ bool ContentService::Init()
Cache.Init();
// Create loading threads
MainLoadThread = New<LoadingThread>();
ThisLoadThread = MainLoadThread;
#if PLATFORM_THREADS_LIMIT > 1
const CPUInfo cpuInfo = Platform::GetCPUInfo();
const int32 count = Math::Clamp(Math::CeilToInt(LOADING_THREAD_PER_LOGICAL_CORE * (float)cpuInfo.LogicalProcessorCount), 1, 12);
LOG(Info, "Creating {0} content loading threads...", count);
MainLoadThread = New<LoadingThread>();
ThisLoadThread = MainLoadThread;
LoadThreads.Resize(count);
for (int32 i = 0; i < count; i++)
{
@@ -142,6 +147,7 @@ bool ContentService::Init()
return true;
}
}
#endif
return false;
}
@@ -150,14 +156,29 @@ void ContentService::Update()
{
PROFILE_CPU();
ScopeLock lock(LoadedAssetsToInvokeLocker);
#if PLATFORM_THREADS_LIMIT == 1
// Run content-streaming tasks on a main thread
if (LoadTasks.HasItems())
{
double timeLimit = 0.01; // 10ms
double startTime = Platform::GetTimeSeconds();
do
{
auto task = LoadTasks[0];
LoadTasks.RemoveAt(0);
MainLoadThread->Run(task);
} while (LoadTasks.HasItems() && Platform::GetTimeSeconds() - startTime < timeLimit);
}
#endif
// Broadcast `OnLoaded` events
LoadedAssetsToInvokeLocker.Lock();
while (LoadedAssetsToInvoke.HasItems())
{
auto asset = LoadedAssetsToInvoke.Dequeue();
asset->onLoaded_MainThread();
}
LoadedAssetsToInvokeLocker.Unlock();
}
void ContentService::LateUpdate()
@@ -219,10 +240,12 @@ void ContentService::LateUpdate()
void ContentService::BeforeExit()
{
#if PLATFORM_THREADS_LIMIT > 1
// Signal threads to end work soon
for (auto thread : LoadThreads)
thread->NotifyExit();
LoadTasksSignal.NotifyAll();
#endif
}
void ContentService::Dispose()
@@ -251,6 +274,7 @@ void ContentService::Dispose()
// NOW dispose graphics device - where there is no loaded assets at all
Graphics::DisposeDevice();
#if PLATFORM_THREADS_LIMIT > 1
// Exit all load threads
for (auto thread : LoadThreads)
thread->NotifyExit();
@@ -258,12 +282,19 @@ void ContentService::Dispose()
for (auto thread : LoadThreads)
thread->Join();
LoadThreads.ClearDelete();
#endif
Delete(MainLoadThread);
MainLoadThread = nullptr;
ThisLoadThread = nullptr;
#if PLATFORM_THREADS_LIMIT > 1
// Cancel all remaining tasks (no chance to execute them)
LoadTasks.CancelAll();
#else
for (auto* e : LoadTasks)
e->Cancel();
LoadTasks.Clear();
#endif
}
IAssetFactory::Collection& IAssetFactory::Get()
@@ -329,6 +360,7 @@ String LoadingThread::ToString() const
int32 LoadingThread::Run()
{
#if PLATFORM_THREADS_LIMIT > 1
PROFILE_MEM(Content);
#if USE_EDITOR && PLATFORM_WINDOWS
// Initialize COM
@@ -366,6 +398,7 @@ int32 LoadingThread::Run()
}
ThisLoadThread = nullptr;
#endif
return 0;
}
@@ -382,7 +415,9 @@ String ContentLoadTask::ToString() const
void ContentLoadTask::Enqueue()
{
LoadTasks.Add(this);
#if PLATFORM_THREADS_LIMIT > 1
LoadTasksSignal.NotifyOne();
#endif
}
bool ContentLoadTask::Run()
@@ -1137,6 +1172,7 @@ void Content::WaitForTask(ContentLoadTask* loadingTask, double timeoutInMillisec
#define CHECK_CONDITIONS() (!Engine::ShouldExit() && (timeoutInSeconds <= 0.0 || Platform::GetTimeSeconds() - startTime < timeoutInSeconds))
do
{
#if PLATFORM_THREADS_LIMIT > 1
// Give opportunity for other threads to use the current core
if (loopCounter == 0)
; // First run is fast
@@ -1183,6 +1219,34 @@ void Content::WaitForTask(ContentLoadTask* loadingTask, double timeoutInMillisec
LoadTasks.enqueue_bulk(localQueue.Get(), localQueue.Count());
localQueue.Clear();
}
#else
// Try to execute content tasks
if (task->IsQueued() && CHECK_CONDITIONS() && !LoadTasks.Remove((ContentLoadTask*)task))
{
PROFILE_CPU_NAMED("Inline");
ZoneColor(0xffaaaaaa);
thread->Run((ContentLoadTask*)task);
}
while (!task->IsQueued() && CHECK_CONDITIONS() && LoadTasks.HasItems())
{
// Find a task that can be executed (some tasks may be waiting for other tasks to finish so they are not queued yet)
int32 index = 0;
for (int32 i = 0; i < LoadTasks.Count(); i++)
{
if (LoadTasks[i]->GetContinueWithTask() == task)
{
index = i;
break;
}
}
ContentLoadTask* tmp = LoadTasks[index];
LoadTasks.RemoveAt(index);
PROFILE_CPU_NAMED("Inline");
ZoneColor(0xffaaaaaa);
thread->Run(tmp);
}
#endif
// Check if task is done
if (task->IsEnded())

View File

@@ -5,14 +5,14 @@
#include "Engine/Threading/Task.h"
class Asset;
class LoadingThread;
/// <summary>
/// Describes content loading task object.
/// </summary>
class ContentLoadTask : public Task
{
friend LoadingThread;
friend class LoadingThread;
friend class ContentService;
public:
/// <summary>

View File

@@ -88,6 +88,7 @@ void SceneRendering::Draw(RenderContextBatch& renderContextBatch, DrawCategory c
// Draw all visual components
_drawListIndex = -1;
#if PLATFORM_THREADS_LIMIT > 1
if (_drawListSize >= 64 && category == SceneDrawAsync && renderContextBatch.EnableAsync)
{
// Run in async via Job System
@@ -97,6 +98,7 @@ void SceneRendering::Draw(RenderContextBatch& renderContextBatch, DrawCategory c
renderContextBatch.WaitLabels.Add(waitLabel);
}
else
#endif
{
// Scene is small so draw on a main-thread
DrawActorsJob(0);

View File

@@ -539,11 +539,35 @@ protected:
hitInfo = hit.shape ? static_cast<PhysicsColliderActor*>(hit.shape->userData) : nullptr; \
}
#if PLATFORM_THREADS_LIMIT <= 1
class DummyCpuDispatcher : public PxCpuDispatcher
{
public:
void submitTask(PxBaseTask& task) override
{
// Run directly
PROFILE_CPU_NAMED("Physics");
task.run();
task.release();
}
uint32_t getWorkerCount() const override
{
return 1;
}
};
#endif
namespace
{
PxFoundation* Foundation = nullptr;
PxPhysics* PhysX = nullptr;
#if PLATFORM_THREADS_LIMIT > 1
PxDefaultCpuDispatcher* CpuDispatcher = nullptr;
#else
DummyCpuDispatcher* CpuDispatcher = nullptr;
#endif
#if WITH_PVD
PxPvd* PVD = nullptr;
#endif
@@ -1736,7 +1760,11 @@ void PhysicsBackend::Shutdown()
#if WITH_PVD
RELEASE_PHYSX(PVD);
#endif
#if PLATFORM_THREADS_LIMIT > 1
RELEASE_PHYSX(CpuDispatcher);
#else
SAFE_DELETE(CpuDispatcher);
#endif
RELEASE_PHYSX(Foundation);
SceneOrigins.Clear();
}
@@ -1796,9 +1824,14 @@ void* PhysicsBackend::CreateScene(const PhysicsSettings& settings)
{
if (CpuDispatcher == nullptr)
{
#if PLATFORM_THREADS_LIMIT > 1
uint32 threads = Math::Clamp<uint32>(Platform::GetCPUInfo().ProcessorCoreCount - 1, 1, 8);
CpuDispatcher = PxDefaultCpuDispatcherCreate(threads);
CHECK_INIT(CpuDispatcher, "PxDefaultCpuDispatcherCreate failed!");
#else
CpuDispatcher = New<DummyCpuDispatcher>();
#endif
CHECK_INIT(CpuDispatcher, "PxDefaultCpuDispatcherCreate failed!");
}
sceneDesc.cpuDispatcher = CpuDispatcher;
}

View File

@@ -14,7 +14,7 @@
#endif
#include "Engine/Scripting/Internal/InternalCalls.h"
#define JOB_SYSTEM_ENABLED 1
#define JOB_SYSTEM_ENABLED (PLATFORM_THREADS_LIMIT > 1)
#if JOB_SYSTEM_ENABLED

View File

@@ -22,9 +22,16 @@ FLAXENGINE_API bool IsInMainThread()
return Globals::MainThreadID == Platform::GetCurrentThreadID();
}
String ThreadPoolTask::ToString() const
{
return String::Format(TEXT("Thread Pool Task ({0})"), (int32)GetState());
}
#if PLATFORM_THREADS_LIMIT > 1
namespace ThreadPoolImpl
{
volatile int64 ExitFlag = 0;
volatile intptr ExitFlag = 0;
Array<Thread*> Threads;
ConcurrentTaskQueue<ThreadPoolTask> Jobs; // Hello Steve!
ConditionVariable JobsSignal;
@@ -34,11 +41,6 @@ namespace ThreadPoolImpl
#endif
}
String ThreadPoolTask::ToString() const
{
return String::Format(TEXT("Thread Pool Task ({0})"), (int32)GetState());
}
void ThreadPoolTask::Enqueue()
{
PROFILE_MEM(EngineThreading);
@@ -141,3 +143,13 @@ int32 ThreadPool::ThreadProc()
return 0;
}
#else
void ThreadPoolTask::Enqueue()
{
// Run task on the main thread (fallback when no threading is supported)
Execute();
}
#endif