Files
FlaxEngine/Source/Engine/Threading/ThreadPool.cpp
Ari Vuollet b0bc1fa310 Fix error when joining exited threads
The internal thread handles were cleared prematurely when attempting to join them. The handles should be also cleared when trying to kill already exited threads.
2023-01-29 21:30:01 +02:00

128 lines
3.2 KiB
C++

// Copyright (c) 2012-2023 Wojciech Figat. All rights reserved.
#include "ThreadPool.h"
#include "IRunnable.h"
#include "Threading.h"
#include "ThreadPoolTask.h"
#include "ConcurrentTaskQueue.h"
#include "Engine/Core/Log.h"
#include "Engine/Core/Math/Math.h"
#include "Engine/Core/Types/String.h"
#include "Engine/Engine/Globals.h"
#include "Engine/Engine/EngineService.h"
#include "Engine/Platform/ConditionVariable.h"
#include "Engine/Platform/CPUInfo.h"
#include "Engine/Platform/Thread.h"
FLAXENGINE_API bool IsInMainThread()
{
return Globals::MainThreadID == Platform::GetCurrentThreadID();
}
namespace ThreadPoolImpl
{
volatile int64 ExitFlag = 0;
Array<Thread*> Threads;
ConcurrentTaskQueue<ThreadPoolTask> Jobs; // Hello Steve!
ConditionVariable JobsSignal;
CriticalSection JobsMutex;
}
String ThreadPoolTask::ToString() const
{
return String::Format(TEXT("Thread Pool Task ({0})"), (int32)GetState());
}
void ThreadPoolTask::Enqueue()
{
ThreadPoolImpl::Jobs.Add(this);
ThreadPoolImpl::JobsSignal.NotifyOne();
}
class ThreadPoolService : public EngineService
{
public:
ThreadPoolService()
: EngineService(TEXT("Thread Pool"), -900)
{
}
bool Init() override;
void BeforeExit() override;
void Dispose() override;
};
ThreadPoolService ThreadPoolServiceInstance;
bool ThreadPoolService::Init()
{
// Spawn threads
const int32 numThreads = Math::Clamp<int32>(Platform::GetCPUInfo().ProcessorCoreCount - 1, 2, PLATFORM_THREADS_LIMIT);
LOG(Info, "Spawning {0} Thread Pool workers", numThreads);
for (int32 i = ThreadPoolImpl::Threads.Count(); i < numThreads; i++)
{
// Create tread
auto runnable = New<SimpleRunnable>(true);
runnable->OnWork.Bind(ThreadPool::ThreadProc);
auto thread = Thread::Create(runnable, String::Format(TEXT("Thread Pool {0}"), i));
if (thread == nullptr)
{
LOG(Error, "Failed to spawn {0} thread in the Thread Pool", i + 1);
return true;
}
// Add to the list
ThreadPoolImpl::Threads.Add(thread);
}
return false;
}
void ThreadPoolService::BeforeExit()
{
// Set exit flag and wake up threads
Platform::AtomicStore(&ThreadPoolImpl::ExitFlag, 1);
ThreadPoolImpl::JobsSignal.NotifyAll();
}
void ThreadPoolService::Dispose()
{
// Set exit flag and wake up threads
Platform::AtomicStore(&ThreadPoolImpl::ExitFlag, 1);
ThreadPoolImpl::JobsSignal.NotifyAll();
// Wait some time
Platform::Sleep(10);
// Delete threads
for (int32 i = 0; i < ThreadPoolImpl::Threads.Count(); i++)
{
ThreadPoolImpl::Threads[i]->Kill(true);
}
ThreadPoolImpl::Threads.ClearDelete();
}
int32 ThreadPool::ThreadProc()
{
ThreadPoolTask* task;
// Work until end
while (Platform::AtomicRead(&ThreadPoolImpl::ExitFlag) == 0)
{
// Try to get a job
if (ThreadPoolImpl::Jobs.try_dequeue(task))
{
task->Execute();
}
else
{
ThreadPoolImpl::JobsMutex.Lock();
ThreadPoolImpl::JobsSignal.Wait(ThreadPoolImpl::JobsMutex);
ThreadPoolImpl::JobsMutex.Unlock();
}
}
return 0;
}