diff --git a/Source/Engine/Content/Content.cpp b/Source/Engine/Content/Content.cpp index 59d2dfffa..1e2e267d9 100644 --- a/Source/Engine/Content/Content.cpp +++ b/Source/Engine/Content/Content.cpp @@ -80,10 +80,14 @@ namespace // Loading assets THREADLOCAL LoadingThread* ThisLoadThread = nullptr; LoadingThread* MainLoadThread = nullptr; +#if PLATFORM_THREADS_LIMIT > 1 Array LoadThreads; ConcurrentTaskQueue LoadTasks; ConditionVariable LoadTasksSignal; CriticalSection LoadTasksMutex; +#else + Array LoadTasks; +#endif // Unloading assets Dictionary UnloadQueue; @@ -125,11 +129,12 @@ bool ContentService::Init() Cache.Init(); // Create loading threads + MainLoadThread = New(); + ThisLoadThread = MainLoadThread; +#if PLATFORM_THREADS_LIMIT > 1 const CPUInfo cpuInfo = Platform::GetCPUInfo(); const int32 count = Math::Clamp(Math::CeilToInt(LOADING_THREAD_PER_LOGICAL_CORE * (float)cpuInfo.LogicalProcessorCount), 1, 12); LOG(Info, "Creating {0} content loading threads...", count); - MainLoadThread = New(); - ThisLoadThread = MainLoadThread; LoadThreads.Resize(count); for (int32 i = 0; i < count; i++) { @@ -142,6 +147,7 @@ bool ContentService::Init() return true; } } +#endif return false; } @@ -150,14 +156,29 @@ void ContentService::Update() { PROFILE_CPU(); - ScopeLock lock(LoadedAssetsToInvokeLocker); +#if PLATFORM_THREADS_LIMIT == 1 + // Run content-streaming tasks on a main thread + if (LoadTasks.HasItems()) + { + double timeLimit = 0.01; // 10ms + double startTime = Platform::GetTimeSeconds(); + do + { + auto task = LoadTasks[0]; + LoadTasks.RemoveAt(0); + MainLoadThread->Run(task); + } while (LoadTasks.HasItems() && Platform::GetTimeSeconds() - startTime < timeLimit); + } +#endif // Broadcast `OnLoaded` events + LoadedAssetsToInvokeLocker.Lock(); while (LoadedAssetsToInvoke.HasItems()) { auto asset = LoadedAssetsToInvoke.Dequeue(); asset->onLoaded_MainThread(); } + LoadedAssetsToInvokeLocker.Unlock(); } void ContentService::LateUpdate() @@ -219,10 +240,12 @@ void ContentService::LateUpdate() void ContentService::BeforeExit() { +#if PLATFORM_THREADS_LIMIT > 1 // Signal threads to end work soon for (auto thread : LoadThreads) thread->NotifyExit(); LoadTasksSignal.NotifyAll(); +#endif } void ContentService::Dispose() @@ -251,6 +274,7 @@ void ContentService::Dispose() // NOW dispose graphics device - where there is no loaded assets at all Graphics::DisposeDevice(); +#if PLATFORM_THREADS_LIMIT > 1 // Exit all load threads for (auto thread : LoadThreads) thread->NotifyExit(); @@ -258,12 +282,19 @@ void ContentService::Dispose() for (auto thread : LoadThreads) thread->Join(); LoadThreads.ClearDelete(); +#endif Delete(MainLoadThread); MainLoadThread = nullptr; ThisLoadThread = nullptr; +#if PLATFORM_THREADS_LIMIT > 1 // Cancel all remaining tasks (no chance to execute them) LoadTasks.CancelAll(); +#else + for (auto* e : LoadTasks) + e->Cancel(); + LoadTasks.Clear(); +#endif } IAssetFactory::Collection& IAssetFactory::Get() @@ -329,6 +360,7 @@ String LoadingThread::ToString() const int32 LoadingThread::Run() { +#if PLATFORM_THREADS_LIMIT > 1 PROFILE_MEM(Content); #if USE_EDITOR && PLATFORM_WINDOWS // Initialize COM @@ -366,6 +398,7 @@ int32 LoadingThread::Run() } ThisLoadThread = nullptr; +#endif return 0; } @@ -382,7 +415,9 @@ String ContentLoadTask::ToString() const void ContentLoadTask::Enqueue() { LoadTasks.Add(this); +#if PLATFORM_THREADS_LIMIT > 1 LoadTasksSignal.NotifyOne(); +#endif } bool ContentLoadTask::Run() @@ -1137,6 +1172,7 @@ void Content::WaitForTask(ContentLoadTask* loadingTask, double timeoutInMillisec #define CHECK_CONDITIONS() (!Engine::ShouldExit() && (timeoutInSeconds <= 0.0 || Platform::GetTimeSeconds() - startTime < timeoutInSeconds)) do { +#if PLATFORM_THREADS_LIMIT > 1 // Give opportunity for other threads to use the current core if (loopCounter == 0) ; // First run is fast @@ -1183,6 +1219,34 @@ void Content::WaitForTask(ContentLoadTask* loadingTask, double timeoutInMillisec LoadTasks.enqueue_bulk(localQueue.Get(), localQueue.Count()); localQueue.Clear(); } +#else + // Try to execute content tasks + if (task->IsQueued() && CHECK_CONDITIONS() && !LoadTasks.Remove((ContentLoadTask*)task)) + { + PROFILE_CPU_NAMED("Inline"); + ZoneColor(0xffaaaaaa); + thread->Run((ContentLoadTask*)task); + } + while (!task->IsQueued() && CHECK_CONDITIONS() && LoadTasks.HasItems()) + { + // Find a task that can be executed (some tasks may be waiting for other tasks to finish so they are not queued yet) + int32 index = 0; + for (int32 i = 0; i < LoadTasks.Count(); i++) + { + if (LoadTasks[i]->GetContinueWithTask() == task) + { + index = i; + break; + } + } + ContentLoadTask* tmp = LoadTasks[index]; + LoadTasks.RemoveAt(index); + + PROFILE_CPU_NAMED("Inline"); + ZoneColor(0xffaaaaaa); + thread->Run(tmp); + } +#endif // Check if task is done if (task->IsEnded()) diff --git a/Source/Engine/Content/Loading/ContentLoadTask.h b/Source/Engine/Content/Loading/ContentLoadTask.h index e04c090d6..6f82b883f 100644 --- a/Source/Engine/Content/Loading/ContentLoadTask.h +++ b/Source/Engine/Content/Loading/ContentLoadTask.h @@ -5,14 +5,14 @@ #include "Engine/Threading/Task.h" class Asset; -class LoadingThread; /// /// Describes content loading task object. /// class ContentLoadTask : public Task { - friend LoadingThread; + friend class LoadingThread; + friend class ContentService; public: /// diff --git a/Source/Engine/Level/Scene/SceneRendering.cpp b/Source/Engine/Level/Scene/SceneRendering.cpp index dc3a57ea8..d87e5580c 100644 --- a/Source/Engine/Level/Scene/SceneRendering.cpp +++ b/Source/Engine/Level/Scene/SceneRendering.cpp @@ -88,6 +88,7 @@ void SceneRendering::Draw(RenderContextBatch& renderContextBatch, DrawCategory c // Draw all visual components _drawListIndex = -1; +#if PLATFORM_THREADS_LIMIT > 1 if (_drawListSize >= 64 && category == SceneDrawAsync && renderContextBatch.EnableAsync) { // Run in async via Job System @@ -97,6 +98,7 @@ void SceneRendering::Draw(RenderContextBatch& renderContextBatch, DrawCategory c renderContextBatch.WaitLabels.Add(waitLabel); } else +#endif { // Scene is small so draw on a main-thread DrawActorsJob(0); diff --git a/Source/Engine/Physics/PhysX/PhysicsBackendPhysX.cpp b/Source/Engine/Physics/PhysX/PhysicsBackendPhysX.cpp index 8c3d3610e..a849791aa 100644 --- a/Source/Engine/Physics/PhysX/PhysicsBackendPhysX.cpp +++ b/Source/Engine/Physics/PhysX/PhysicsBackendPhysX.cpp @@ -539,11 +539,35 @@ protected: hitInfo = hit.shape ? static_cast(hit.shape->userData) : nullptr; \ } +#if PLATFORM_THREADS_LIMIT <= 1 + +class DummyCpuDispatcher : public PxCpuDispatcher +{ +public: + void submitTask(PxBaseTask& task) override + { + // Run directly + PROFILE_CPU_NAMED("Physics"); + task.run(); + task.release(); + } + uint32_t getWorkerCount() const override + { + return 1; + } +}; + +#endif + namespace { PxFoundation* Foundation = nullptr; PxPhysics* PhysX = nullptr; +#if PLATFORM_THREADS_LIMIT > 1 PxDefaultCpuDispatcher* CpuDispatcher = nullptr; +#else + DummyCpuDispatcher* CpuDispatcher = nullptr; +#endif #if WITH_PVD PxPvd* PVD = nullptr; #endif @@ -1736,7 +1760,11 @@ void PhysicsBackend::Shutdown() #if WITH_PVD RELEASE_PHYSX(PVD); #endif +#if PLATFORM_THREADS_LIMIT > 1 RELEASE_PHYSX(CpuDispatcher); +#else + SAFE_DELETE(CpuDispatcher); +#endif RELEASE_PHYSX(Foundation); SceneOrigins.Clear(); } @@ -1796,9 +1824,14 @@ void* PhysicsBackend::CreateScene(const PhysicsSettings& settings) { if (CpuDispatcher == nullptr) { +#if PLATFORM_THREADS_LIMIT > 1 uint32 threads = Math::Clamp(Platform::GetCPUInfo().ProcessorCoreCount - 1, 1, 8); CpuDispatcher = PxDefaultCpuDispatcherCreate(threads); CHECK_INIT(CpuDispatcher, "PxDefaultCpuDispatcherCreate failed!"); +#else + CpuDispatcher = New(); +#endif + CHECK_INIT(CpuDispatcher, "PxDefaultCpuDispatcherCreate failed!"); } sceneDesc.cpuDispatcher = CpuDispatcher; } diff --git a/Source/Engine/Threading/JobSystem.cpp b/Source/Engine/Threading/JobSystem.cpp index 7f65e5210..4e895ff34 100644 --- a/Source/Engine/Threading/JobSystem.cpp +++ b/Source/Engine/Threading/JobSystem.cpp @@ -14,7 +14,7 @@ #endif #include "Engine/Scripting/Internal/InternalCalls.h" -#define JOB_SYSTEM_ENABLED 1 +#define JOB_SYSTEM_ENABLED (PLATFORM_THREADS_LIMIT > 1) #if JOB_SYSTEM_ENABLED diff --git a/Source/Engine/Threading/ThreadPool.cpp b/Source/Engine/Threading/ThreadPool.cpp index d7410d0fc..032f42bc1 100644 --- a/Source/Engine/Threading/ThreadPool.cpp +++ b/Source/Engine/Threading/ThreadPool.cpp @@ -22,9 +22,16 @@ FLAXENGINE_API bool IsInMainThread() return Globals::MainThreadID == Platform::GetCurrentThreadID(); } +String ThreadPoolTask::ToString() const +{ + return String::Format(TEXT("Thread Pool Task ({0})"), (int32)GetState()); +} + +#if PLATFORM_THREADS_LIMIT > 1 + namespace ThreadPoolImpl { - volatile int64 ExitFlag = 0; + volatile intptr ExitFlag = 0; Array Threads; ConcurrentTaskQueue Jobs; // Hello Steve! ConditionVariable JobsSignal; @@ -34,11 +41,6 @@ namespace ThreadPoolImpl #endif } -String ThreadPoolTask::ToString() const -{ - return String::Format(TEXT("Thread Pool Task ({0})"), (int32)GetState()); -} - void ThreadPoolTask::Enqueue() { PROFILE_MEM(EngineThreading); @@ -141,3 +143,13 @@ int32 ThreadPool::ThreadProc() return 0; } + +#else + +void ThreadPoolTask::Enqueue() +{ + // Run task on the main thread (fallback when no threading is supported) + Execute(); +} + +#endif