Files
FlaxEngine/Source/Engine/Threading/JobSystem.cpp
Ari Vuollet b0bc1fa310 Fix error when joining exited threads
The internal thread handles were cleared prematurely when attempting to join them. The handles should be also cleared when trying to kill already exited threads.
2023-01-29 21:30:01 +02:00

383 lines
8.8 KiB
C++

// Copyright (c) 2012-2023 Wojciech Figat. All rights reserved.
#include "JobSystem.h"
#include "IRunnable.h"
#include "Engine/Platform/CPUInfo.h"
#include "Engine/Platform/Thread.h"
#include "Engine/Platform/ConditionVariable.h"
#include "Engine/Core/Collections/Dictionary.h"
#include "Engine/Engine/EngineService.h"
#include "Engine/Profiler/ProfilerCPU.h"
#include "Engine/Scripting/ManagedCLR/MCore.h"
#if USE_MONO
#include "Engine/Scripting/ManagedCLR/MDomain.h"
#include <ThirdParty/mono-2.0/mono/metadata/appdomain.h>
#include <ThirdParty/mono-2.0/mono/metadata/threads.h>
#endif
// Jobs storage perf info:
// (500 jobs, i7 9th gen)
// JOB_SYSTEM_USE_MUTEX=1, enqueue=130-280 cycles, dequeue=2-6 cycles
// JOB_SYSTEM_USE_MUTEX=0, enqueue=300-700 cycles, dequeue=10-16 cycles
// So using RingBuffer+Mutex+Signals is better than moodycamel::ConcurrentQueue
#define JOB_SYSTEM_ENABLED 1
#define JOB_SYSTEM_USE_MUTEX 1
#define JOB_SYSTEM_USE_STATS 0
#if JOB_SYSTEM_USE_STATS
#include "Engine/Core/Log.h"
#endif
#if JOB_SYSTEM_USE_MUTEX
#include "Engine/Core/Collections/RingBuffer.h"
#else
#include "ConcurrentQueue.h"
#endif
#if JOB_SYSTEM_ENABLED
class JobSystemService : public EngineService
{
public:
JobSystemService()
: EngineService(TEXT("JobSystem"), -800)
{
}
bool Init() override;
void BeforeExit() override;
void Dispose() override;
};
struct JobData
{
Function<void(int32)> Job;
int32 Index;
int64 JobKey;
};
template<>
struct TIsPODType<JobData>
{
enum { Value = false };
};
class JobSystemThread : public IRunnable
{
public:
uint64 Index;
public:
// [IRunnable]
String ToString() const override
{
return TEXT("JobSystemThread");
}
int32 Run() override;
void AfterWork(bool wasKilled) override
{
Delete(this);
}
};
struct JobContext
{
volatile int64 JobsLeft;
};
template<>
struct TIsPODType<JobContext>
{
enum { Value = true };
};
namespace
{
JobSystemService JobSystemInstance;
Thread* Threads[PLATFORM_THREADS_LIMIT] = {};
int32 ThreadsCount = 0;
bool JobStartingOnDispatch = true;
volatile int64 ExitFlag = 0;
volatile int64 JobLabel = 0;
Dictionary<int64, JobContext> JobContexts;
ConditionVariable JobsSignal;
CriticalSection JobsMutex;
ConditionVariable WaitSignal;
CriticalSection WaitMutex;
CriticalSection JobsLocker;
#if JOB_SYSTEM_USE_MUTEX
RingBuffer<JobData, InlinedAllocation<256>> Jobs;
#else
ConcurrentQueue<JobData> Jobs;
#endif
#if JOB_SYSTEM_USE_STATS
int64 DequeueCount = 0;
int64 DequeueSum = 0;
#endif
}
bool JobSystemService::Init()
{
ThreadsCount = Math::Min<int32>(Platform::GetCPUInfo().LogicalProcessorCount, ARRAY_COUNT(Threads));
for (int32 i = 0; i < ThreadsCount; i++)
{
auto runnable = New<JobSystemThread>();
runnable->Index = (uint64)i;
auto thread = Thread::Create(runnable, String::Format(TEXT("Job System {0}"), i), ThreadPriority::AboveNormal);
if (thread == nullptr)
return true;
Threads[i] = thread;
}
return false;
}
void JobSystemService::BeforeExit()
{
Platform::AtomicStore(&ExitFlag, 1);
JobsSignal.NotifyAll();
}
void JobSystemService::Dispose()
{
Platform::AtomicStore(&ExitFlag, 1);
JobsSignal.NotifyAll();
Platform::Sleep(1);
for (int32 i = 0; i < ThreadsCount; i++)
{
if (Threads[i])
{
Threads[i]->Kill(true);
Delete(Threads[i]);
Threads[i] = nullptr;
}
}
}
int32 JobSystemThread::Run()
{
Platform::SetThreadAffinityMask(1ull << Index);
JobData data;
bool attachMonoThread = true;
#if !JOB_SYSTEM_USE_MUTEX
moodycamel::ConsumerToken consumerToken(Jobs);
#endif
while (Platform::AtomicRead(&ExitFlag) == 0)
{
// Try to get a job
#if JOB_SYSTEM_USE_STATS
const auto start = Platform::GetTimeCycles();
#endif
#if JOB_SYSTEM_USE_MUTEX
JobsLocker.Lock();
if (Jobs.Count() != 0)
{
data = Jobs.PeekFront();
Jobs.PopFront();
}
JobsLocker.Unlock();
#else
if (!Jobs.try_dequeue(consumerToken, data))
data.Job.Unbind();
#endif
#if JOB_SYSTEM_USE_STATS
Platform::InterlockedIncrement(&DequeueCount);
Platform::InterlockedAdd(&DequeueSum, Platform::GetTimeCycles() - start);
#endif
if (data.Job.IsBinded())
{
#if USE_MONO
// Ensure to have C# thread attached to this thead (late init due to MCore being initialized after Job System)
if (attachMonoThread && !mono_domain_get())
{
const auto domain = MCore::GetActiveDomain();
mono_thread_attach(domain->GetNative());
attachMonoThread = false;
}
#endif
// Run job
data.Job(data.Index);
// Move forward with the job queue
JobsLocker.Lock();
JobContext& context = JobContexts.At(data.JobKey);
if (Platform::InterlockedDecrement(&context.JobsLeft) <= 0)
{
ASSERT_LOW_LAYER(context.JobsLeft <= 0);
JobContexts.Remove(data.JobKey);
}
JobsLocker.Unlock();
WaitSignal.NotifyAll();
data.Job.Unbind();
}
else
{
// Wait for signal
JobsMutex.Lock();
JobsSignal.Wait(JobsMutex);
JobsMutex.Unlock();
}
}
return 0;
}
#endif
void JobSystem::Execute(const Function<void(int32)>& job, int32 jobCount)
{
// TODO: disable async if called on job thread? or maybe Wait should handle waiting in job thread to do the processing?
if (jobCount > 1)
{
// Async
const int64 jobWaitHandle = Dispatch(job, jobCount);
Wait(jobWaitHandle);
}
else if (jobCount > 0)
{
// Sync
for (int32 i = 0; i < jobCount; i++)
job(i);
}
}
int64 JobSystem::Dispatch(const Function<void(int32)>& job, int32 jobCount)
{
PROFILE_CPU();
if (jobCount <= 0)
return 0;
#if JOB_SYSTEM_ENABLED
#if JOB_SYSTEM_USE_STATS
const auto start = Platform::GetTimeCycles();
#endif
const auto label = Platform::InterlockedAdd(&JobLabel, (int64)jobCount) + jobCount;
JobData data;
data.Job = job;
data.JobKey = label;
JobContext context;
context.JobsLeft = jobCount;
#if JOB_SYSTEM_USE_MUTEX
JobsLocker.Lock();
JobContexts.Add(label, context);
for (data.Index = 0; data.Index < jobCount; data.Index++)
Jobs.PushBack(data);
JobsLocker.Unlock();
#else
JobsLocker.Lock();
JobContexts.Add(label, context);
JobsLocker.Unlock();
for (data.Index = 0; data.Index < jobCount; data.Index++)
Jobs.enqueue(data);
#endif
#if JOB_SYSTEM_USE_STATS
LOG(Info, "Job enqueue time: {0} cycles", (int64)(Platform::GetTimeCycles() - start));
#endif
if (JobStartingOnDispatch)
{
if (jobCount == 1)
JobsSignal.NotifyOne();
else
JobsSignal.NotifyAll();
}
return label;
#else
for (int32 i = 0; i < jobCount; i++)
job(i);
return 0;
#endif
}
void JobSystem::Wait()
{
#if JOB_SYSTEM_ENABLED
JobsLocker.Lock();
int32 numJobs = JobContexts.Count();
JobsLocker.Unlock();
while (numJobs > 0)
{
WaitMutex.Lock();
WaitSignal.Wait(WaitMutex, 1);
WaitMutex.Unlock();
JobsLocker.Lock();
numJobs = JobContexts.Count();
JobsLocker.Unlock();
}
#endif
}
void JobSystem::Wait(int64 label)
{
#if JOB_SYSTEM_ENABLED
PROFILE_CPU();
while (Platform::AtomicRead(&ExitFlag) == 0)
{
JobsLocker.Lock();
const JobContext* context = JobContexts.TryGet(label);
JobsLocker.Unlock();
// Skip if context has been already executed (last job removes it)
if (!context)
break;
// Wait on signal until input label is not yet done
WaitMutex.Lock();
WaitSignal.Wait(WaitMutex, 1);
WaitMutex.Unlock();
// Wake up any thread to prevent stalling in highly multi-threaded environment
JobsSignal.NotifyOne();
}
#if JOB_SYSTEM_USE_STATS
LOG(Info, "Job average dequeue time: {0} cycles", DequeueSum / DequeueCount);
DequeueSum = DequeueCount = 0;
#endif
#endif
}
void JobSystem::SetJobStartingOnDispatch(bool value)
{
#if JOB_SYSTEM_ENABLED
JobStartingOnDispatch = value;
if (value)
{
#if JOB_SYSTEM_USE_MUTEX
JobsLocker.Lock();
const int32 count = Jobs.Count();
JobsLocker.Unlock();
#else
const int32 count = Jobs.Count();
#endif
if (count == 1)
JobsSignal.NotifyOne();
else if (count != 0)
JobsSignal.NotifyAll();
}
#endif
}
int32 JobSystem::GetThreadsCount()
{
#if JOB_SYSTEM_ENABLED
return ThreadsCount;
#else
return 0;
#endif
}