Refactor ThreadLocal when running on hardware with more cores than PLATFORM_THREADS_LIMIT
This commit is contained in:
@@ -6,14 +6,14 @@
|
||||
#include "Engine/Platform/Platform.h"
|
||||
|
||||
/// <summary>
|
||||
/// Per-thread local variable storage.
|
||||
/// Implemented using atomic with per-thread storage indexed via thread id hashing.
|
||||
/// ForConsider using 'THREADLOCAL' define before the variable instead.
|
||||
/// Per-thread local variable storage for basic types (POD). Implemented using atomic with per-thread storage indexed via thread id hashing. Consider using 'THREADLOCAL' define before the variable instead.
|
||||
/// </summary>
|
||||
template<typename T, int32 MaxThreads = PLATFORM_THREADS_LIMIT, bool ClearMemory = true>
|
||||
template<typename T, int32 MaxThreads = PLATFORM_THREADS_LIMIT>
|
||||
class ThreadLocal
|
||||
{
|
||||
protected:
|
||||
constexpr static int32 DynamicMaxThreads = 1024;
|
||||
static_assert(TIsPODType<T>::Value, "Only POD types are supported");
|
||||
|
||||
struct Bucket
|
||||
{
|
||||
@@ -21,34 +21,29 @@ protected:
|
||||
T Value;
|
||||
};
|
||||
|
||||
Bucket _buckets[MaxThreads];
|
||||
Bucket _staticBuckets[MaxThreads];
|
||||
Bucket* _dynamicBuckets = nullptr;
|
||||
|
||||
public:
|
||||
|
||||
ThreadLocal()
|
||||
{
|
||||
// Clear buckets
|
||||
if (ClearMemory)
|
||||
{
|
||||
Platform::MemoryClear(_buckets, sizeof(_buckets));
|
||||
}
|
||||
else
|
||||
{
|
||||
for (int32 i = 0; i < MaxThreads; i++)
|
||||
_buckets[i].ThreadID = 0;
|
||||
}
|
||||
Platform::MemoryClear(_staticBuckets, sizeof(_staticBuckets));
|
||||
}
|
||||
|
||||
~ThreadLocal()
|
||||
{
|
||||
Platform::Free(_dynamicBuckets);
|
||||
}
|
||||
|
||||
public:
|
||||
|
||||
T& Get()
|
||||
FORCE_INLINE T& Get()
|
||||
{
|
||||
return _buckets[GetIndex()].Value;
|
||||
return GetBucket().Value;
|
||||
}
|
||||
|
||||
void Set(const T& value)
|
||||
FORCE_INLINE void Set(const T& value)
|
||||
{
|
||||
_buckets[GetIndex()].Value = value;
|
||||
GetBucket().Value = value;
|
||||
}
|
||||
|
||||
int32 Count() const
|
||||
@@ -56,9 +51,17 @@ public:
|
||||
int32 result = 0;
|
||||
for (int32 i = 0; i < MaxThreads; i++)
|
||||
{
|
||||
if (Platform::AtomicRead((int64 volatile*)&_buckets[i].ThreadID) != 0)
|
||||
if (Platform::AtomicRead((int64 volatile*)&_staticBuckets[i].ThreadID) != 0)
|
||||
result++;
|
||||
}
|
||||
if (auto dynamicBuckets = (Bucket*)Platform::AtomicRead((intptr volatile*)&_dynamicBuckets))
|
||||
{
|
||||
for (int32 i = 0; i < MaxThreads; i++)
|
||||
{
|
||||
if (Platform::AtomicRead((int64 volatile*)&dynamicBuckets[i].ThreadID) != 0)
|
||||
result++;
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
@@ -67,89 +70,72 @@ public:
|
||||
{
|
||||
for (int32 i = 0; i < MaxThreads; i++)
|
||||
{
|
||||
if (Platform::AtomicRead((int64 volatile*)&_buckets[i].ThreadID) != 0)
|
||||
result.Add(_buckets[i].Value);
|
||||
if (Platform::AtomicRead((int64 volatile*)&_staticBuckets[i].ThreadID) != 0)
|
||||
result.Add(_staticBuckets[i].Value);
|
||||
}
|
||||
if (auto dynamicBuckets = (Bucket*)Platform::AtomicRead((intptr volatile*)&_dynamicBuckets))
|
||||
{
|
||||
for (int32 i = 0; i < MaxThreads; i++)
|
||||
{
|
||||
if (Platform::AtomicRead((int64 volatile*)&dynamicBuckets[i].ThreadID) != 0)
|
||||
result.Add(dynamicBuckets[i].Value);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void Clear()
|
||||
{
|
||||
Platform::MemoryClear(_staticBuckets, sizeof(_staticBuckets));
|
||||
Platform::Free(_dynamicBuckets);
|
||||
_dynamicBuckets = nullptr;
|
||||
}
|
||||
|
||||
protected:
|
||||
|
||||
FORCE_INLINE static int32 Hash(const int64 value)
|
||||
Bucket& GetBucket()
|
||||
{
|
||||
return value & (MaxThreads - 1);
|
||||
}
|
||||
const int64 key = (int64)Platform::GetCurrentThreadID();
|
||||
|
||||
FORCE_INLINE int32 GetIndex()
|
||||
{
|
||||
int64 key = (int64)Platform::GetCurrentThreadID();
|
||||
auto index = Hash(key);
|
||||
while (true)
|
||||
// Search statically allocated buckets
|
||||
int32 index = (int32)(key & (MaxThreads - 1));
|
||||
int32 spaceLeft = MaxThreads;
|
||||
while (spaceLeft)
|
||||
{
|
||||
const int64 value = Platform::AtomicRead(&_buckets[index].ThreadID);
|
||||
const int64 value = Platform::AtomicRead(&_staticBuckets[index].ThreadID);
|
||||
if (value == key)
|
||||
break;
|
||||
if (value == 0 && Platform::InterlockedCompareExchange(&_buckets[index].ThreadID, key, 0) == 0)
|
||||
break;
|
||||
index = Hash(index + 1);
|
||||
return _staticBuckets[index];
|
||||
if (value == 0 && Platform::InterlockedCompareExchange(&_staticBuckets[index].ThreadID, key, 0) == 0)
|
||||
return _staticBuckets[index];
|
||||
index = (index + 1) & (MaxThreads - 1);
|
||||
spaceLeft--;
|
||||
}
|
||||
return index;
|
||||
}
|
||||
};
|
||||
|
||||
/// <summary>
|
||||
/// Per thread local object
|
||||
/// </summary>
|
||||
template<typename T, int32 MaxThreads = PLATFORM_THREADS_LIMIT>
|
||||
class ThreadLocalObject : public ThreadLocal<T*, MaxThreads>
|
||||
{
|
||||
public:
|
||||
|
||||
typedef ThreadLocal<T*, MaxThreads> Base;
|
||||
|
||||
public:
|
||||
|
||||
void Delete()
|
||||
{
|
||||
auto value = Base::Get();
|
||||
Base::SetAll(nullptr);
|
||||
::Delete(value);
|
||||
}
|
||||
|
||||
void DeleteAll()
|
||||
{
|
||||
for (int32 i = 0; i < MaxThreads; i++)
|
||||
// Allocate dynamic buckets if missing
|
||||
DYNAMIC:
|
||||
auto dynamicBuckets = (Bucket*)Platform::AtomicRead((intptr volatile*)&_dynamicBuckets);
|
||||
if (!dynamicBuckets)
|
||||
{
|
||||
auto& bucket = Base::_buckets[i];
|
||||
if (bucket.Value != nullptr)
|
||||
dynamicBuckets = (Bucket*)Platform::Allocate(DynamicMaxThreads * sizeof(Bucket), 16);
|
||||
Platform::MemoryClear(dynamicBuckets, DynamicMaxThreads * sizeof(Bucket));
|
||||
if (Platform::InterlockedCompareExchange((intptr volatile*)&_dynamicBuckets, (intptr)dynamicBuckets, 0) != 0)
|
||||
{
|
||||
::Delete(bucket.Value);
|
||||
bucket.ThreadID = 0;
|
||||
bucket.Value = nullptr;
|
||||
Platform::Free(dynamicBuckets);
|
||||
goto DYNAMIC;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
template<typename AllocationType = HeapAllocation>
|
||||
void GetNotNullValues(Array<T*, AllocationType>& result) const
|
||||
{
|
||||
result.EnsureCapacity(MaxThreads);
|
||||
for (int32 i = 0; i < MaxThreads; i++)
|
||||
// Search dynamically allocated buckets
|
||||
index = (int32)(key & (DynamicMaxThreads - 1));
|
||||
spaceLeft = DynamicMaxThreads;
|
||||
while (spaceLeft)
|
||||
{
|
||||
if (Base::_buckets[i].Value != nullptr)
|
||||
{
|
||||
result.Add(Base::_buckets[i].Value);
|
||||
}
|
||||
const int64 value = Platform::AtomicRead(&dynamicBuckets[index].ThreadID);
|
||||
if (value == key)
|
||||
return dynamicBuckets[index];
|
||||
if (value == 0 && Platform::InterlockedCompareExchange(&dynamicBuckets[index].ThreadID, key, 0) == 0)
|
||||
return dynamicBuckets[index];
|
||||
index = (index + 1) & (DynamicMaxThreads - 1);
|
||||
spaceLeft--;
|
||||
}
|
||||
}
|
||||
|
||||
int32 CountNotNullValues() const
|
||||
{
|
||||
int32 result = 0;
|
||||
for (int32 i = 0; i < MaxThreads; i++)
|
||||
{
|
||||
if (Base::_buckets[i].Value != nullptr)
|
||||
result++;
|
||||
}
|
||||
return result;
|
||||
return *(Bucket*)nullptr;
|
||||
}
|
||||
};
|
||||
|
||||
Reference in New Issue
Block a user