You're breathtaking!

This commit is contained in:
Wojtek Figat
2020-12-07 23:40:54 +01:00
commit 6fb9eee74c
5143 changed files with 1153594 additions and 0 deletions

View File

@@ -0,0 +1,398 @@
// Copyright (c) 2012-2020 Wojciech Figat. All rights reserved.
#pragma once
#include "Engine/Platform/Platform.h"
#include "Engine/Platform/CriticalSection.h"
#include "Engine/Core/Memory/Allocation.h"
#include "Engine/Core/Math/Math.h"
#include "Engine/Core/Core.h"
/// <summary>
/// The concurrent data buffer allows to implement asynchronous data writing to the linear buffer by more than one worker thread at once.
/// Supports only value types that don't require constructor/destructor invocation.
/// </summary>
template<typename T>
class ConcurrentBuffer
{
friend ConcurrentBuffer;
private:
int64 _count;
int64 _capacity;
T* _data;
CriticalSection _resizeLocker;
public:
/// <summary>
/// Initializes a new instance of the <see cref="ConcurrentBuffer"/> class.
/// </summary>
ConcurrentBuffer()
: _count(0)
, _capacity(0)
, _data(nullptr)
{
}
/// <summary>
/// Initializes a new instance of the <see cref="ConcurrentBuffer"/> class.
/// </summary>
/// <param name="capacity">The capacity.</param>
ConcurrentBuffer(int32 capacity)
: _count(0)
, _capacity(capacity)
{
if (_capacity > 0)
_data = (T*)Allocator::Allocate(_capacity * sizeof(T));
else
_data = nullptr;
}
/// <summary>
/// Finalizes an instance of the <see cref="ConcurrentBuffer"/> class.
/// </summary>
~ConcurrentBuffer()
{
Allocator::Free(_data);
}
public:
/// <summary>
/// Gets the amount of the elements in the collection.
/// </summary>
/// <returns>The items count.</returns>
FORCE_INLINE int64 Count()
{
return Platform::AtomicRead(&_count);
}
/// <summary>
/// Get amount of the elements that can be holed by collection without resizing.
/// </summary>
/// <returns>the items capacity.</returns>
FORCE_INLINE int64 Capacity() const
{
return _capacity;
}
/// <summary>
/// Determines whether this collection isn't empty.
/// </summary>
/// <returns><c>true</c> if this collection has elements; otherwise, <c>false</c>.</returns>
FORCE_INLINE bool HasItems() const
{
return _count != 0;
}
/// <summary>
/// Determines whether this collection is empty.
/// </summary>
/// <returns><c>true</c> if this collection is empty; otherwise, <c>false</c>.</returns>
FORCE_INLINE bool IsEmpty() const
{
return _count == 0;
}
/// <summary>
/// Gets the pointer to the first element in the collection.
/// </summary>
/// <returns>The allocation start.</returns>
FORCE_INLINE T* Get()
{
return _data;
}
/// <summary>
/// Gets the pointer to the first element in the collection.
/// </summary>
/// <returns>The allocation start.</returns>
FORCE_INLINE const T* Get() const
{
return _data;
}
/// <summary>
/// Gets the last element.
/// </summary>
/// <returns>The last element reference.</returns>
FORCE_INLINE T& Last()
{
ASSERT(_count > 0);
return _data[_count - 1];
}
/// <summary>
/// Gets the last element.
/// </summary>
/// <returns>The last element reference.</returns>
FORCE_INLINE const T& Last() const
{
ASSERT(_count > 0);
return _data[_count - 1];
}
/// <summary>
/// Gets the first element.
/// </summary>
/// <returns>The first element reference.</returns>
FORCE_INLINE T& First()
{
ASSERT(_count > 0);
return _data[0];
}
/// <summary>
/// Gets the first element.
/// </summary>
/// <returns>The first element reference.</returns>
FORCE_INLINE const T& First() const
{
ASSERT(_count > 0);
return _data[0];
}
/// <summary>
/// Get or sets element by the index.
/// </summary>
/// <param name="index">The index.</param>
/// <returns>The item reference.</returns>
FORCE_INLINE T& operator[](int64 index)
{
ASSERT(index >= 0 && index < _count);
return _data[index];
}
/// <summary>
/// Get or sets element by the index.
/// </summary>
/// <param name="index">The index.</param>
/// <returns>The item reference (constant).</returns>
FORCE_INLINE const T& operator[](int64 index) const
{
ASSERT(index >= 0 && index < _count);
return _data[index];
}
public:
/// <summary>
/// Clear the collection but without changing its capacity.
/// </summary>
FORCE_INLINE void Clear()
{
Platform::InterlockedExchange(&_count, 0);
}
/// <summary>
/// Releases this buffer data.
/// </summary>
void Release()
{
_resizeLocker.Lock();
Allocator::Free(_data);
_data = nullptr;
_capacity = 0;
_count = 0;
_resizeLocker.Unlock();
}
/// <summary>
/// Sets the custom size of the collection. Only for the custom usage with dedicated data.
/// </summary>
/// <param name="size">The size.</param>
void SetSize(int32 size)
{
ASSERT(size >= 0 && size <= _capacity);
_count = size;
}
/// <summary>
/// Adds the single item to the collection. Handles automatic buffer resizing. Thread-safe function that can be called from many threads at once.
/// </summary>
/// <param name="item">The item to add.</param>
/// <returns>The index of the added item.</returns>
FORCE_INLINE int64 Add(const T& item)
{
return Add(&item, 1);
}
/// <summary>
/// Adds the array of items to the collection. Handles automatic buffer resizing. Thread-safe function that can be called from many threads at once.
/// </summary>
/// <param name="items">The collection of items to add.</param>
/// <param name="count">The items count.</param>
/// <returns>The index of the added first item.</returns>
int64 Add(const T* items, int32 count)
{
const int64 index = Platform::InterlockedAdd(&_count, (int64)count);
const int64 newCount = index + (int64)count;
EnsureCapacity(newCount);
Memory::CopyItems(_data + index, items, count);
return index;
}
// Add collection of items to the collection
// @param collection Array with the items to add
FORCE_INLINE void Add(ConcurrentBuffer<T>& collection)
{
Add(collection.Get(), collection.Count());
}
/// <summary>
/// Adds the given amount of items to the collection.
/// </summary>
/// <param name="count">The items count.</param>
/// <returns>The index of the added first item.</returns>
int64 AddDefault(int32 count = 1)
{
const int64 index = Platform::InterlockedAdd(&_count, (int64)count);
const int64 newCount = index + (int64)count;
EnsureCapacity(newCount);
Memory::ConstructItems(_data + newCount, count);
return index;
}
/// <summary>
/// Adds the one item to the collection and returns the reference to it.
/// </summary>
/// <returns>The reference to the added item.</returns>
FORCE_INLINE T& AddOne()
{
const int64 index = Platform::InterlockedAdd(&_count, 1);
const int64 newCount = index + 1;
EnsureCapacity(newCount);
Memory::ConstructItems(_data + index, 1);
return _data[index - 1];
}
/// <summary>
/// Adds the new items to the end of the collection, possibly reallocating the whole collection to fit. The new items will be zeroed.
/// </summary>
/// Warning! AddZeroed() will create items without calling the constructor and this is not appropriate for item types that require a constructor to function properly.
/// <remarks>
/// </remarks>
/// <param name="count">The number of new items to add.</param>
/// <returns>The index of the added first item.</returns>
int64 AddZeroed(int32 count = 1)
{
const int64 index = Platform::InterlockedAdd(&_count, 1);
const int64 newCount = index + 1;
EnsureCapacity(newCount);
Platform::MemoryClear(Get() + index, count * sizeof(T));
return _data[index - 1];
}
/// <summary>
/// Ensures that the buffer has the given the capacity (equal or more). Preserves the existing items by copy operation.
/// </summary>
/// <param name="minCapacity">The minimum capacity.</param>
void EnsureCapacity(int64 minCapacity)
{
// Early out
if (_capacity >= minCapacity)
return;
_resizeLocker.Lock();
// Skip if the other thread performed resizing
if (_capacity >= minCapacity)
{
_resizeLocker.Unlock();
return;
}
// Compute the new capacity
int64 newCapacity = _capacity == 0 ? 8 : Math::RoundUpToPowerOf2(_capacity) * 2;
if (newCapacity < minCapacity)
{
newCapacity = minCapacity;
}
ASSERT(newCapacity > _capacity);
// Allocate new data
T* newData = nullptr;
if (newCapacity > 0)
{
newData = (T*)Allocator::Allocate(newCapacity * sizeof(T));
}
// Check if has data
if (_data)
{
// Check if preserve contents
if (newData && _count > 0)
{
Platform::MemoryCopy(newData, _data, _capacity * sizeof(T));
}
// Delete old data
Allocator::Free(_data);
}
// Set state
_data = newData;
_capacity = newCapacity;
_resizeLocker.Unlock();
}
/// <summary>
/// Swaps the contents of buffer with the other object without copy operation. Performs fast internal data exchange.
/// </summary>
/// <param name="other">The other buffer.</param>
void Swap(ConcurrentBuffer& other)
{
const auto count = _count;
const auto capacity = _capacity;
const auto data = _data;
_count = other._count;
_capacity = other._capacity;
_data = other._data;
other._count = count;
other._capacity = capacity;
other._data = data;
}
public:
/// <summary>
/// Checks if the given element is in the collection.
/// </summary>
/// <param name="item">The item.</param>
/// <returns><c>true</c> if collection contains the specified item; otherwise, <c>false</c>.</returns>
bool Contains(const T& item) const
{
for (int32 i = 0; i < _count; i++)
{
if (_data[i] == item)
{
return true;
}
}
return false;
}
/// <summary>
/// Searches for the specified object and returns the zero-based index of the first occurrence within the entire collection.
/// </summary>
/// <param name="item">The item.</param>
/// <returns>The zero-based index of the first occurrence of itm within the entire collection, if found; otherwise, INVALID_INDEX.</returns>
int32 IndexOf(const T& item) const
{
for (int32 i = 0; i < _count; i++)
{
if (_data[i] == item)
{
return i;
}
}
return INVALID_INDEX;
}
};

View File

@@ -0,0 +1,59 @@
// Copyright (c) 2012-2020 Wojciech Figat. All rights reserved.
#pragma once
#include "Engine/Core/Memory/Memory.h"
#include <ThirdParty/concurrentqueue.h>
/// <summary>
/// The default engine configuration for concurrentqueue.
/// </summary>
struct ConcurrentQueueSettings : public moodycamel::ConcurrentQueueDefaultTraits
{
// Use bigger blocks
static const size_t BLOCK_SIZE = 256;
// Use default engine memory allocator
static inline void* malloc(size_t size)
{
return Allocator::Allocate((uint64)size);
}
// Use default engine memory allocator
static inline void free(void* ptr)
{
return Allocator::Free(ptr);
}
};
/// <summary>
/// Lock-free implementation of thread-safe queue.
/// Based on: https://github.com/cameron314/concurrentqueue
/// </summary>
template<typename T>
class ConcurrentQueue : public moodycamel::ConcurrentQueue<T, ConcurrentQueueSettings>
{
public:
typedef moodycamel::ConcurrentQueue<T, ConcurrentQueueSettings> Base;
public:
/// <summary>
/// Gets an estimate of the total number of elements currently in the queue.
/// </summary>
/// <returns>The items count.</returns>
FORCE_INLINE int32 Count() const
{
return static_cast<int32>(Base::size_approx());
}
/// <summary>
/// Adds item to the collection.
/// </summary>
/// <param name="item">The item to add.</param>
FORCE_INLINE void Add(T& item)
{
enqueue(item);
}
};

View File

@@ -0,0 +1,40 @@
// Copyright (c) 2012-2020 Wojciech Figat. All rights reserved.
#pragma once
#include "ConcurrentQueue.h"
#include "Task.h"
/// <summary>
/// Lock-free implementation of thread-safe tasks queue.
/// </summary>
template<typename T = Task>
class ConcurrentTaskQueue : public ConcurrentQueue<T*>
{
public:
/// <summary>
/// Adds item to the collection (thread-safe).
/// </summary>
/// <param name="item">Item to add.</param>
FORCE_INLINE void Add(T* item)
{
ConcurrentQueue<T*>::enqueue(item);
}
/// <summary>
/// Cancels all the tasks from the queue and removes them.
/// </summary>
void CancelAll()
{
T* tasks[16];
std::size_t count;
while ((count = ConcurrentQueue<T*>::try_dequeue_bulk(tasks, ARRAY_COUNT(tasks))) != 0)
{
for (std::size_t i = 0; i != count; i++)
{
tasks[i]->Cancel();
}
}
}
};

View File

@@ -0,0 +1,98 @@
// Copyright (c) 2012-2020 Wojciech Figat. All rights reserved.
#pragma once
#include "Engine/Core/Types/BaseTypes.h"
#include "Engine/Core/Types/String.h"
#include "Engine/Core/Object.h"
#include "Engine/Core/Delegate.h"
/// <summary>
/// Interface for runnable objects for multi-threading purposes.
/// </summary>
class IRunnable : public Object
{
public:
// Virtual destructor
virtual ~IRunnable()
{
}
// Initializes the runnable object
// @return True if initialization was successful, otherwise false
virtual bool Init()
{
return true;
}
// Runs the runnable object.
// @return The exit code
virtual int32 Run() = 0;
// Stops the runnable object. Called when thread is being terminated
virtual void Stop()
{
}
// Exits the runnable object
virtual void Exit()
{
}
// Called when thread ends work (via Kill or normally)
// @param wasKilled True if thead has been killed
virtual void AfterWork(bool wasKilled)
{
}
};
/// <summary>
/// Simple runnable object for single function bind
/// </summary>
class SimpleRunnable : public IRunnable
{
private:
bool _autoDelete;
public:
/// <summary>
/// Working function
/// </summary>
Function<int32()> OnWork;
public:
/// <summary>
/// Init
/// </summary>
/// <param name="autoDelete">True if delete itself after work.</param>
SimpleRunnable(bool autoDelete)
: _autoDelete(autoDelete)
{
}
public:
// [IRunnable]
String ToString() const override
{
return TEXT("SimpleRunnable");
}
int32 Run() override
{
int32 result = -1;
if (OnWork.IsBinded())
result = OnWork();
return result;
}
void AfterWork(bool wasKilled) override
{
if (_autoDelete)
Delete(this);
}
};

View File

@@ -0,0 +1,25 @@
// Copyright (c) 2012-2020 Wojciech Figat. All rights reserved.
#include "MainThreadTask.h"
#include "ConcurrentTaskQueue.h"
#include "Engine/Profiler/ProfilerCPU.h"
ConcurrentTaskQueue<MainThreadTask> MainThreadTasks;
void MainThreadTask::RunAll()
{
// TODO: use bulk dequeue
PROFILE_CPU();
MainThreadTask* task;
while (MainThreadTasks.try_dequeue(task))
{
task->Execute();
}
}
void MainThreadTask::Enqueue()
{
MainThreadTasks.Add(this);
}

View File

@@ -0,0 +1,153 @@
// Copyright (c) 2012-2020 Wojciech Figat. All rights reserved.
#pragma once
#include "Task.h"
#include "Engine/Core/Types/String.h"
// Invokes a target method on a main thread (using task or directly if already on main thread)
// Example: INVOKE_ON_MAIN_THREAD(Collector, Collector::SyncData, this);
#define INVOKE_ON_MAIN_THREAD(targetType, targetMethod, targetObject) \
if (IsInMainThread()) \
{ \
targetObject->targetMethod(); \
} else { \
Function<void()> action; \
action.Bind<targetType, &targetMethod>(targetObject); \
Task::StartNew(New<MainThreadActionTask>(action))->Wait(); \
}
/// <summary>
/// General purpose task executed on Main Thread in the beginning of the next frame.
/// </summary>
/// <seealso cref="Task" />
class FLAXENGINE_API MainThreadTask : public Task
{
friend class Engine;
protected:
/// <summary>
/// Initializes a new instance of the <see cref="MainThreadTask"/> class.
/// </summary>
MainThreadTask()
: Task()
{
}
private:
/// <summary>
/// Runs all main thread tasks. Called only by the Engine class.
/// </summary>
static void RunAll();
public:
// [Task]
String ToString() const override
{
return String::Format(TEXT("Main Thread Task ({0})"), ::ToString(GetState()));
}
bool HasReference(Object* obj) const override
{
return false;
}
protected:
// [Task]
void Enqueue() override;
};
/// <summary>
/// General purpose task executing custom action using Main Thread in the beginning of the next frame.
/// </summary>
/// <seealso cref="MainThreadTask" />
/// <seealso cref="Task" />
class FLAXENGINE_API MainThreadActionTask : public MainThreadTask
{
protected:
Function<void()> _action1;
Function<bool()> _action2;
Object* _target;
public:
/// <summary>
/// Initializes a new instance of the <see cref="MainThreadActionTask"/> class.
/// </summary>
/// <param name="action">The action.</param>
/// <param name="target">The target object.</param>
MainThreadActionTask(Function<void()>& action, Object* target = nullptr)
: MainThreadTask()
, _action1(action)
, _target(target)
{
}
/// <summary>
/// Initializes a new instance of the <see cref="MainThreadActionTask"/> class.
/// </summary>
/// <param name="action">The action.</param>
/// <param name="target">The target object.</param>
MainThreadActionTask(Function<void()>::Signature action, Object* target = nullptr)
: MainThreadTask()
, _action1(action)
, _target(target)
{
}
/// <summary>
/// Initializes a new instance of the <see cref="MainThreadActionTask"/> class.
/// </summary>
/// <param name="action">The action.</param>
/// <param name="target">The target object.</param>
MainThreadActionTask(Function<bool()>& action, Object* target = nullptr)
: MainThreadTask()
, _action2(action)
, _target(target)
{
}
/// <summary>
/// Initializes a new instance of the <see cref="MainThreadActionTask"/> class.
/// </summary>
/// <param name="action">The action.</param>
/// <param name="target">The target object.</param>
MainThreadActionTask(Function<bool()>::Signature action, Object* target = nullptr)
: MainThreadTask()
, _action2(action)
, _target(target)
{
}
protected:
// [MainThreadTask]
bool Run() override
{
if (_action1.IsBinded())
{
_action1();
return false;
}
if (_action2.IsBinded())
{
return _action2();
}
return true;
}
public:
// [MainThreadTask]
bool HasReference(Object* obj) const override
{
return obj == _target;
}
};

View File

@@ -0,0 +1,263 @@
// Copyright (c) 2012-2020 Wojciech Figat. All rights reserved.
#include "Task.h"
#include "ThreadPoolTask.h"
#include "Engine/Core/Log.h"
#include "Engine/Platform/Platform.h"
#include "Engine/Core/Types/DateTime.h"
#include "Engine/Core/Collections/Array.h"
#include "Engine/Core/Math/Math.h"
void Task::Start()
{
ASSERT(GetState() == TaskState::Created);
OnStart();
// Change state
_state = TaskState::Queued;
// Add task to the execution queue
Enqueue();
}
void Task::Cancel()
{
if (Platform::AtomicRead(&_cancelFlag) == 0)
{
// Send event
OnCancel();
// Cancel child
if (_continueWith)
_continueWith->Cancel();
}
}
bool Task::Wait(double timeoutMilliseconds) const
{
double startTime = Platform::GetTimeSeconds() * 0.001;
// TODO: no active waiting! use a semaphore!
do
{
auto state = GetState();
// Finished
if (state == TaskState::Finished)
{
// Wait for child if has
if (_continueWith)
{
auto spendTime = Platform::GetTimeSeconds() * 0.001 - startTime;
return _continueWith->Wait(timeoutMilliseconds - spendTime);
}
return false;
}
// Failed or canceled
if (state == TaskState::Failed || state == TaskState::Canceled)
return true;
Platform::Sleep(1);
} while (timeoutMilliseconds <= 0.0 || Platform::GetTimeSeconds() * 0.001 - startTime < timeoutMilliseconds);
// Timeout reached!
LOG(Warning, "\'{0}\' has timed out. Wait time: {1} ms", ToString(), timeoutMilliseconds);
return true;
}
Task* Task::ContinueWith(Task* task)
{
ASSERT(task != nullptr && task != this);
if (_continueWith)
return _continueWith->ContinueWith(task);
_continueWith = task;
return task;
}
Task* Task::ContinueWith(const Action& action, Object* target)
{
// Get binded functions
Array<Action::FunctionType> bindings;
bindings.Resize(action.Count());
action.GetBindings(bindings.Get(), bindings.Count());
// Continue with every action
Task* result = this;
for (int32 i = 0; i < bindings.Count(); i++)
result = result->ContinueWith(bindings[i], target);
return result;
}
Task* Task::ContinueWith(Function<void()> action, Object* target)
{
ASSERT(action.IsBinded());
return ContinueWith(New<ThreadPoolActionTask>(action, target));
}
Task* Task::ContinueWith(Function<bool()> action, Object* target)
{
ASSERT(action.IsBinded());
return ContinueWith(New<ThreadPoolActionTask>(action, target));
}
Task* Task::Delay(int32 milliseconds)
{
class DelayTask : public ThreadPoolTask
{
private:
int32 _milliseconds;
DateTime _startTimeUTC;
public:
DelayTask(int32 milliseconds)
: _milliseconds(milliseconds)
{
}
protected:
// [ThreadPoolTask]
bool Run() override
{
// Take into account the different between task enqueue (OnStart event) and the actual task execution
auto diff = DateTime::NowUTC() - _startTimeUTC;
auto ms = Math::Max(0, _milliseconds - (int32)diff.GetTotalMilliseconds());
Platform::Sleep(ms);
return false;
}
void OnStart() override
{
_startTimeUTC = DateTime::NowUTC();
}
};
return New<DelayTask>(milliseconds);
}
Task* Task::StartNew(Task* task)
{
ASSERT(task);
task->Start();
return task;
}
Task* Task::StartNew(Function<void()>& action, Object* target)
{
return StartNew(New<ThreadPoolActionTask>(action, target));
}
Task* Task::StartNew(Function<void()>::Signature action, Object* target)
{
return StartNew(New<ThreadPoolActionTask>(action, target));
}
Task* Task::StartNew(Function<bool()>& action, Object* target)
{
return StartNew(New<ThreadPoolActionTask>(action, target));
}
Task* Task::StartNew(Function<bool()>::Signature& action, Object* target)
{
return StartNew(New<ThreadPoolActionTask>(action, target));
}
void Task::Execute()
{
// Begin
if (IsCanceled())
return;
ASSERT(IsQueued());
_state = TaskState::Running;
// Perform an operation
bool failed = Run();
// Process result
if (IsCancelRequested())
{
_state = TaskState::Canceled;
}
else if (failed)
{
OnFail();
}
else
{
OnFinish();
}
}
void Task::OnStart()
{
}
void Task::OnFinish()
{
ASSERT(IsRunning());
ASSERT(!IsCancelRequested());
_state = TaskState::Finished;
// Send event further
if (_continueWith)
_continueWith->Start();
OnEnd();
}
void Task::OnFail()
{
_state = TaskState::Failed;
// Send event further
if (_continueWith)
_continueWith->OnFail();
OnEnd();
}
void Task::OnCancel()
{
// Set flag
Platform::InterlockedIncrement(&_cancelFlag);
Platform::MemoryBarrier();
// If task is active try to wait for a while
if (IsRunning())
{
// Wait for it a little bit
const double timeout = 2000.0;
LOG(Warning, "Cannot cancel \'{0}\' because it's still running, waiting for end with timeout: {1} ms", ToString(), timeout);
Wait(timeout);
}
// Don't call OnEnd twice
const auto state = GetState();
if (state != TaskState::Finished && state != TaskState::Failed)
{
_state = TaskState::Canceled;
OnEnd();
}
}
void Task::OnEnd()
{
ASSERT(!IsRunning());
// Add to delete
DeleteObject(30.0f, false);
}

View File

@@ -0,0 +1,357 @@
// Copyright (c) 2012-2020 Wojciech Figat. All rights reserved.
#pragma once
#include "Engine/Core/Object.h"
#include "Engine/Core/Delegate.h"
#include "Engine/Core/NonCopyable.h"
#include "Engine/Core/Enums.h"
#include "Engine/Core/Types/TimeSpan.h"
#include "Engine/Core/Collections/Array.h"
#include "Engine/Platform/Platform.h"
/// <summary>
/// Represents the current stage in the lifecycle of a Task.
/// </summary>
DECLARE_ENUM_EX_6(TaskState, int64, 0, Created, Failed, Canceled, Queued, Running, Finished);
/// <summary>
/// Represents an asynchronous operation.
/// </summary>
class FLAXENGINE_API Task : public RemovableObject, public NonCopyable
{
//
// Tasks execution and states flow:
//
// Task() [Created]
// \/
// Start() [Queued]
// \/
// Run() [Running]
// |
// ------------------------
// \/ \/
// Finish() [Finished] Fail/Cancel() [Failed/Canceled]
// \/ \/
// child.Start() child.Cancel()
// | \/
// -------------------------
// \/
// End()
//
protected:
/// <summary>
/// The cancel flag used to indicate that there is request to cancel task operation.
/// </summary>
volatile int64 _cancelFlag;
/// <summary>
/// The current task state.
/// </summary>
volatile TaskState _state;
/// <summary>
/// The task to start after finish.
/// </summary>
Task* _continueWith;
protected:
/// <summary>
/// Initializes a new instance of the <see cref="Task"/> class.
/// </summary>
Task()
: _cancelFlag(0)
, _state(TaskState::Created)
, _continueWith(nullptr)
{
}
public:
/// <summary>
/// Gets work state
/// </summary>
/// <returns>State</returns>
FORCE_INLINE TaskState GetState() const
{
return static_cast<TaskState>(Platform::AtomicRead((int64 volatile*)&_state));
}
/// <summary>
/// Determines whether the specified object has reference to the given object.
/// </summary>
/// <param name="obj">The object.</param>
/// <returns>True if the specified object has reference to the given object, otherwise false.</returns>
virtual bool HasReference(Object* obj) const
{
return false;
}
/// <summary>
/// Gets the task to start after this one.
/// </summary>
/// <returns>The next task.</returns>
FORCE_INLINE Task* GetContinueWithTask() const
{
return _continueWith;
}
public:
/// <summary>
/// Checks if operation failed
/// </summary>
/// <returns>True if operation failed, otherwise false</returns>
FORCE_INLINE bool IsFailed() const
{
return GetState() == TaskState::Failed;
}
/// <summary>
/// Checks if operation has been canceled
/// </summary>
/// <returns>True if operation has been canceled, otherwise false</returns>
FORCE_INLINE bool IsCanceled() const
{
return GetState() == TaskState::Canceled;
}
/// <summary>
/// Checks if operation has been queued
/// </summary>
/// <returns>True if operation has been queued, otherwise false</returns>
FORCE_INLINE bool IsQueued() const
{
return GetState() == TaskState::Queued;
}
/// <summary>
/// Checks if operation is running
/// </summary>
/// <returns>True if operation is running, otherwise false</returns>
FORCE_INLINE bool IsRunning() const
{
return GetState() == TaskState::Running;
}
/// <summary>
/// Checks if operation has been finished
/// </summary>
/// <returns>True if operation has been finished, otherwise false</returns>
FORCE_INLINE bool IsFinished() const
{
return GetState() == TaskState::Finished;
}
/// <summary>
/// Checks if operation has been ended (via cancel, fail or finish).
/// </summary>
/// <returns>True if operation has been ended, otherwise false</returns>
bool IsEnded() const
{
auto state = GetState();
return state == TaskState::Failed || state == TaskState::Canceled || state == TaskState::Finished;
}
/// <summary>
/// Returns true if task has been requested to cancel it's operation.
/// </summary>
/// <returns>True if task has been canceled and should stop it's work without calling child task start.</returns>
FORCE_INLINE bool IsCancelRequested()
{
return Platform::AtomicRead(&_cancelFlag) != 0;
}
public:
/// <summary>
/// Starts this task execution (and will continue with all children).
/// </summary>
void Start();
/// <summary>
/// Cancels this task (and all child tasks).
/// </summary>
void Cancel();
/// <summary>
/// Waits the specified timeout for the task to be finished.
/// </summary>
/// <param name="timeout">The maximum amount of time to wait for the task to finish it's job. Timeout smaller/equal 0 will result in infinite waiting.</param>
/// <returns>True if task failed or has been canceled or has timeout, otherwise false.</returns>
FORCE_INLINE bool Wait(const TimeSpan& timeout) const
{
return Wait(timeout.GetTotalMilliseconds());
}
/// <summary>
/// Waits the specified timeout for the task to be finished.
/// </summary>
/// <param name="timeoutMilliseconds">The maximum amount of milliseconds to wait for the task to finish it's job. Timeout smaller/equal 0 will result in infinite waiting.</param>
/// <returns>True if task failed or has been canceled or has timeout, otherwise false.</returns>
bool Wait(double timeoutMilliseconds = -1) const;
/// <summary>
/// Waits for all the tasks from the list.
/// </summary>
/// <param name="tasks">The tasks list to wait for.</param>
/// <param name="timeoutMilliseconds">The maximum amount of milliseconds to wait for the task to finish it's job. Timeout smaller/equal 0 will result in infinite waiting.</param>
/// <returns>True if any task failed or has been canceled or has timeout, otherwise false.</returns>
template<class T = Task>
static bool WaitAll(Array<T*>& tasks, double timeoutMilliseconds = -1)
{
for (int32 i = 0; i < tasks.Count(); i++)
{
if (tasks[i]->Wait())
return true;
}
return false;
}
public:
/// <summary>
/// Continues that task execution with a given task (will call Start on given task after finishing that one).
/// </summary>
/// <param name="task">The task to Start after current finish (will propagate OnCancel event if need to).</param>
/// <returns>Enqueued task.</returns>
Task* ContinueWith(Task* task);
/// <summary>
/// Continues that task execution with a given action (will spawn new async action).
/// </summary>
/// <param name="action">Action to run.</param>
/// <param name="target">The action target object.</param>
/// <returns>Enqueued task.</returns>
Task* ContinueWith(const Action& action, Object* target = nullptr);
/// <summary>
/// Continues that task execution with a given action (will spawn new async action).
/// </summary>
/// <param name="action">Action to run.</param>
/// <param name="target">The action target object.</param>
/// <returns>Enqueued task.</returns>
Task* ContinueWith(Function<void()> action, Object* target = nullptr);
/// <summary>
/// Continues that task execution with a given action (will spawn new async action).
/// </summary>
/// <param name="action">Action to run.</param>
/// <param name="target">The action target object.</param>
/// <returns>Enqueued task.</returns>
Task* ContinueWith(Function<bool()> action, Object* target = nullptr);
public:
/// <summary>
/// Creates a task that completes after a specified time interval (not started).
/// </summary>
/// <param name="delay">The time span to wait before completing the returned task.</param>
/// <returns>A task that represents the time delay (not started).</returns>
FORCE_INLINE static Task* Delay(const TimeSpan& delay)
{
return Delay(static_cast<int32>(delay.GetTotalMilliseconds()));
}
/// <summary>
/// Creates a task that completes after a specified time interval (not started).
/// </summary>
/// <param name="milliseconds">The amount of milliseconds to wait before completing the returned task.</param>
/// <returns>A task that represents the time delay (not started).</returns>
static Task* Delay(int32 milliseconds);
public:
/// <summary>
/// Starts the new task.
/// </summary>
/// <param name="task">The task.</param>
/// <returns>Task</returns>
static Task* StartNew(Task* task);
/// <summary>
/// Starts the new task.
/// </summary>
/// <param name="action">The action.</param>
/// <param name="target">The action target object.</param>
/// <returns>Task</returns>
static Task* StartNew(Function<void()>& action, Object* target = nullptr);
/// <summary>
/// Starts the new task.
/// </summary>
/// <param name="action">The action.</param>
/// <param name="target">The action target object.</param>
/// <returns>Task</returns>
static Task* StartNew(Function<void()>::Signature action, Object* target = nullptr);
/// <summary>
/// Starts the new task.
/// </summary>
/// <param name="action">The action.</param>
/// <param name="target">The action target object.</param>
/// <returns>Task</returns>
static Task* StartNew(Function<bool()>& action, Object* target = nullptr);
/// <summary>
/// Starts the new task.
/// </summary>
/// <param name="action">The action.</param>
/// <param name="target">The action target object.</param>
/// <returns>Task</returns>
static Task* StartNew(Function<bool()>::Signature& action, Object* target = nullptr);
/// <summary>
/// Starts the new task.
/// </summary>
/// <param name="callee">The callee object.</param>
/// <returns>Task</returns>
template<class T, bool(T::*Method)()>
static Task* StartNew(T* callee)
{
Function<bool()> action;
action.Bind<T, Method>(callee);
return StartNew(action, dynamic_cast<Object*>(callee));
}
/// <summary>
/// Cancels all the tasks from the list and clears it.
/// </summary>
template<class T = Task>
static void CancelAll(Array<T*>& tasks)
{
for (int32 i = 0; i < tasks.Count(); i++)
{
tasks[i]->Cancel();
}
tasks.Clear();
}
protected:
/// <summary>
/// Executes this task.
/// It should be called by the task consumer (thread pool or other executor of this task type).
/// It calls run() and handles result).
/// </summary>
void Execute();
/// <summary>
/// Runs the task specified operations
/// Does not handles any task related logic, only performs the actual job.
/// </summary>
/// <returns>The task execution result. Returns true if failed, otherwise false.</returns>
virtual bool Run() = 0;
protected:
virtual void Enqueue() = 0;
virtual void OnStart();
virtual void OnFinish();
virtual void OnFail();
virtual void OnCancel();
virtual void OnEnd();
};

View File

@@ -0,0 +1,159 @@
// Copyright (c) 2012-2020 Wojciech Figat. All rights reserved.
#pragma once
#include "Engine/Core/Types/BaseTypes.h"
#include "Engine/Core/Collections/Array.h"
#include "Threading.h"
// Maximum amount of threads with an access to the thread local variable (we use static limit due to engine threading design)
#define THREAD_LOCAL_MAX_CAPACITY 16
/// <summary>
/// Per thread local variable
/// </summary>
template<typename T, int32 MaxThreads = THREAD_LOCAL_MAX_CAPACITY, bool ClearMemory = true>
class ThreadLocal
{
// Note: this is kind of weak-implementation. We don't want to use locks/semaphores.
// For better performance use 'THREADLOCAL' define before the variable
protected:
struct Bucket
{
uint64 ThreadID;
T Value;
};
Bucket _buckets[MaxThreads];
public:
ThreadLocal()
{
// Clear buckets
if (ClearMemory)
{
Platform::MemoryClear(_buckets, sizeof(_buckets));
}
else
{
for (int32 i = 0; i < MaxThreads; i++)
_buckets[i].ThreadID = 0;
}
}
public:
T& Get()
{
return _buckets[GetIndex()].Value;
}
void Set(const T& value)
{
_buckets[GetIndex()].Value = value;
}
public:
int32 Count() const
{
int32 result = 0;
for (int32 i = 0; i < MaxThreads; i++)
{
if (_buckets[i].ThreadID != 0)
result++;
}
return result;
}
void GetValues(Array<T>& result) const
{
result.EnsureCapacity(MaxThreads);
for (int32 i = 0; i < MaxThreads; i++)
{
result.Add(_buckets[i].Value);
}
}
protected:
FORCE_INLINE static int32 Hash(const uint64 value)
{
return value & (MaxThreads - 1);
}
FORCE_INLINE int32 GetIndex()
{
// TODO: fix it because now we can use only (MaxThreads-1) buckets
ASSERT(Count() < MaxThreads);
auto key = Platform::GetCurrentThreadID();
auto index = Hash(key);
while (_buckets[index].ThreadID != key && _buckets[index].ThreadID != 0)
index = Hash(index + 1);
_buckets[index].ThreadID = key;
return index;
}
};
/// <summary>
/// Per thread local object
/// </summary>
template<typename T, int32 MaxThreads = THREAD_LOCAL_MAX_CAPACITY>
class ThreadLocalObject : public ThreadLocal<T*, MaxThreads>
{
public:
typedef ThreadLocal<T*, MaxThreads> Base;
public:
void Delete()
{
auto value = Base::Get();
Base::SetAll(nullptr);
::Delete(value);
}
void DeleteAll()
{
for (int32 i = 0; i < MaxThreads; i++)
{
auto& bucket = Base::_buckets[i];
if (bucket.Value != nullptr)
{
::Delete(bucket.Value);
bucket.ThreadID = 0;
bucket.Value = nullptr;
}
}
}
void GetNotNullValues(Array<T*>& result) const
{
result.EnsureCapacity(MaxThreads);
for (int32 i = 0; i < MaxThreads; i++)
{
if (Base::_buckets[i].Value != nullptr)
{
result.Add(Base::_buckets[i].Value);
}
}
}
int32 CountNotNullValues() const
{
int32 result = 0;
for (int32 i = 0; i < MaxThreads; i++)
{
if (Base::_buckets[i].Value != nullptr)
result++;
}
return result;
}
};

View File

@@ -0,0 +1,118 @@
// Copyright (c) 2012-2020 Wojciech Figat. All rights reserved.
#include "ThreadPool.h"
#include "IRunnable.h"
#include "Threading.h"
#include "ThreadPoolTask.h"
#include "ConcurrentTaskQueue.h"
#include "Engine/Core/Log.h"
#include "Engine/Core/Math/Math.h"
#include "Engine/Engine/EngineService.h"
#include "Engine/Platform/ConditionVariable.h"
#include "Engine/Platform/CPUInfo.h"
#include "Engine/Platform/Thread.h"
namespace ThreadPoolImpl
{
volatile int64 ExitFlag = 0;
Array<Thread*> Threads;
ConcurrentTaskQueue<ThreadPoolTask> Jobs; // Hello Steve!
ConditionVariable JobsSignal;
}
void ThreadPoolTask::Enqueue()
{
ThreadPoolImpl::Jobs.Add(this);
ThreadPoolImpl::JobsSignal.NotifyOne();
}
class ThreadPoolService : public EngineService
{
public:
ThreadPoolService()
: EngineService(TEXT("Thread Pool"), -900)
{
}
bool Init() override;
void BeforeExit() override;
void Dispose() override;
};
ThreadPoolService ThreadPoolServiceInstance;
bool ThreadPoolService::Init()
{
// Spawn threads
const int32 numThreads = Math::Max<int32>(2, Platform::GetCPUInfo().ProcessorCoreCount - 1);
LOG(Info, "Spawning {0} Thread Pool workers", numThreads);
for (int32 i = ThreadPoolImpl::Threads.Count(); i < numThreads; i++)
{
// Create tread
auto runnable = New<SimpleRunnable>(true);
runnable->OnWork.Bind(ThreadPool::ThreadProc);
auto thread = Thread::Create(runnable, String::Format(TEXT("Therad Pool {0}"), i));
if (thread == nullptr)
{
LOG(Error, "Failed to spawn {0} thread in the Thread Pool", i + 1);
return true;
}
// Add to the list
ThreadPoolImpl::Threads.Add(thread);
}
return false;
}
void ThreadPoolService::BeforeExit()
{
// Set exit flag and wake up threads
Platform::AtomicStore(&ThreadPoolImpl::ExitFlag, 1);
ThreadPoolImpl::JobsSignal.NotifyAll();
}
void ThreadPoolService::Dispose()
{
// Set exit flag and wake up threads
Platform::AtomicStore(&ThreadPoolImpl::ExitFlag, 1);
ThreadPoolImpl::JobsSignal.NotifyAll();
// Wait some time
Platform::Sleep(10);
// Delete threads
for (int32 i = 0; i < ThreadPoolImpl::Threads.Count(); i++)
{
if (ThreadPoolImpl::Threads[i]->IsRunning())
{
ThreadPoolImpl::Threads[i]->Kill(true);
}
}
ThreadPoolImpl::Threads.ClearDelete();
}
int32 ThreadPool::ThreadProc()
{
ThreadPoolTask* task;
// Work until end
CriticalSection mutex;
while (Platform::AtomicRead(&ThreadPoolImpl::ExitFlag) == 0)
{
// Try to get a job
if (ThreadPoolImpl::Jobs.try_dequeue(task))
{
task->Execute();
}
else
{
mutex.Lock();
ThreadPoolImpl::JobsSignal.Wait(mutex);
mutex.Unlock();
}
}
return 0;
}

View File

@@ -0,0 +1,17 @@
// Copyright (c) 2012-2020 Wojciech Figat. All rights reserved.
#pragma once
#include "Engine/Core/Types/BaseTypes.h"
/// <summary>
/// Main engine thread pool for threaded tasks system.
/// </summary>
class ThreadPool
{
friend class ThreadPoolTask;
friend class ThreadPoolService;
private:
static int32 ThreadProc();
};

View File

@@ -0,0 +1,131 @@
// Copyright (c) 2012-2020 Wojciech Figat. All rights reserved.
#pragma once
#include "Task.h"
#include "Engine/Core/Types/String.h"
class ThreadPool;
/// <summary>
/// General purpose task executed using Thread Pool.
/// </summary>
/// <seealso cref="Task" />
class ThreadPoolTask : public Task
{
friend ThreadPool;
protected:
/// <summary>
/// Initializes a new instance of the <see cref="ThreadPoolTask"/> class.
/// </summary>
ThreadPoolTask()
: Task()
{
}
public:
// [Task]
String ToString() const override
{
return String::Format(TEXT("Thread Pool Task ({0})"), ::ToString(GetState()));
}
protected:
// [Task]
void Enqueue() override;
};
/// <summary>
/// General purpose task executing custom action using Thread Pool.
/// </summary>
/// <seealso cref="ThreadPoolTask" />
/// <seealso cref="Task" />
class ThreadPoolActionTask : public ThreadPoolTask
{
protected:
Function<void()> _action1;
Function<bool()> _action2;
Object* _target;
public:
/// <summary>
/// Initializes a new instance of the <see cref="ThreadPoolActionTask"/> class.
/// </summary>
/// <param name="action">The action.</param>
/// <param name="target">The target object.</param>
ThreadPoolActionTask(Function<void()>& action, Object* target = nullptr)
: ThreadPoolTask()
, _action1(action)
, _target(target)
{
}
/// <summary>
/// Initializes a new instance of the <see cref="ThreadPoolActionTask"/> class.
/// </summary>
/// <param name="action">The action.</param>
/// <param name="target">The target object.</param>
ThreadPoolActionTask(Function<void()>::Signature action, Object* target = nullptr)
: ThreadPoolTask()
, _action1(action)
, _target(target)
{
}
/// <summary>
/// Initializes a new instance of the <see cref="ThreadPoolActionTask"/> class.
/// </summary>
/// <param name="action">The action.</param>
/// <param name="target">The target object.</param>
ThreadPoolActionTask(Function<bool()>& action, Object* target = nullptr)
: ThreadPoolTask()
, _action2(action)
, _target(target)
{
}
/// <summary>
/// Initializes a new instance of the <see cref="ThreadPoolActionTask"/> class.
/// </summary>
/// <param name="action">The action.</param>
/// <param name="target">The target object.</param>
ThreadPoolActionTask(Function<bool()>::Signature action, Object* target = nullptr)
: ThreadPoolTask()
, _action2(action)
, _target(target)
{
}
public:
// [ThreadPoolTask]
bool HasReference(Object* obj) const override
{
return obj == _target;
}
protected:
// [ThreadPoolTask]
bool Run() override
{
if (_action1.IsBinded())
{
_action1();
return false;
}
if (_action2.IsBinded())
{
return _action2();
}
return false;
}
};

View File

@@ -0,0 +1,65 @@
// Copyright (c) 2012-2020 Wojciech Figat. All rights reserved.
#include "ThreadRegistry.h"
#include "Engine/Core/Collections/Dictionary.h"
#include "Engine/Platform/CriticalSection.h"
namespace ThreadRegistryImpl
{
Dictionary<uint64, Thread*> Registry(64);
CriticalSection Locker;
}
using namespace ThreadRegistryImpl;
Thread* ThreadRegistry::GetThread(uint64 id)
{
Thread* result = nullptr;
Locker.Lock();
Registry.TryGet(id, result);
Locker.Unlock();
return result;
}
int32 ThreadRegistry::Count()
{
Locker.Lock();
int32 count = Registry.Count();
Locker.Unlock();
return count;
}
void ThreadRegistry::KillEmAll()
{
Locker.Lock();
for (auto i = Registry.Begin(); i.IsNotEnd(); ++i)
{
i->Value->Kill(false);
}
Locker.Unlock();
// Now album Kill'Em All from Metallica...
}
void ThreadRegistry::Add(Thread* thread)
{
ASSERT(thread && thread->GetID() != 0);
Locker.Lock();
ASSERT(!Registry.ContainsKey(thread->GetID()) && !Registry.ContainsValue(thread));
Registry.Add(thread->GetID(), thread);
Locker.Unlock();
}
void ThreadRegistry::Remove(Thread* thread)
{
ASSERT(thread && thread->GetID() != 0);
Locker.Lock();
ASSERT_LOW_LAYER(Registry.ContainsKey(thread->GetID()) && Registry[thread->GetID()] == thread);
Registry.Remove(thread->GetID());
Locker.Unlock();
}

View File

@@ -0,0 +1,37 @@
// Copyright (c) 2012-2020 Wojciech Figat. All rights reserved.
#pragma once
#include "Engine/Core/Types/BaseTypes.h"
#include "Engine/Platform/Thread.h"
/// <summary>
/// Holds all created threads (except the main thread)
/// </summary>
class FLAXENGINE_API ThreadRegistry
{
public:
/// <summary>
/// Gets thread with given ID
/// </summary>
/// <param name="id">Thread ID</param>
/// <returns>Founded thread, or null if is missing</returns>
static Thread* GetThread(uint64 id);
/// <summary>
/// Gets amount of threads in a registry
/// </summary>
/// <returns>The amount of threads used by the engine.</returns>
static int32 Count();
/// <summary>
/// Attempts to kill all threads. Also starts playing Metallica album Kill'Em All. Hit the Lights...
/// </summary>
static void KillEmAll();
public:
static void Add(Thread* thread);
static void Remove(Thread* thread);
};

View File

@@ -0,0 +1,29 @@
// Copyright (c) 2012-2020 Wojciech Figat. All rights reserved.
#pragma once
#include "Engine/Platform/Thread.h"
#include "Threading.h"
#include "IRunnable.h"
/// <summary>
/// Helper class to spawn custom thread for performing long-time action. Don't use it for short tasks.
/// </summary>
class FLAXENGINE_API ThreadSpawner
{
public:
/// <summary>
/// Starts a new thread the specified callback.
/// </summary>
/// <param name="callback">The callback function.</param>
/// <param name="threadName">Name of the thread.</param>
/// <param name="priority">The thread priority.</param>
/// <returns>The created thread.</returns>
static Thread* Start(const Function<int32()>& callback, const String& threadName, ThreadPriority priority = ThreadPriority::Normal)
{
auto runnable = New<SimpleRunnable>(true);
runnable->OnWork = callback;
return Thread::Create(runnable, threadName, priority);
}
};

View File

@@ -0,0 +1,10 @@
// Copyright (c) 2012-2020 Wojciech Figat. All rights reserved.
using Flax.Build;
/// <summary>
/// Threading module.
/// </summary>
public class Threading : EngineModule
{
}

View File

@@ -0,0 +1,61 @@
// Copyright (c) 2012-2020 Wojciech Figat. All rights reserved.
#pragma once
#include "Engine/Platform/Platform.h"
#include "Engine/Platform/CriticalSection.h"
#include "Engine/Engine/Globals.h"
/// <summary>
/// Checks if current execution in on the main thread.
/// </summary>
/// <returns>True if running on the main thread, otherwise false.</returns>
inline bool IsInMainThread()
{
return Globals::MainThreadID == Platform::GetCurrentThreadID();
}
/// <summary>
/// Scope locker for critical section.
/// </summary>
class ScopeLock
{
private:
const CriticalSection* _section;
ScopeLock() = default;
ScopeLock(const ScopeLock&) = delete;
ScopeLock& operator=(const ScopeLock&) = delete;
public:
/// <summary>
/// Init, enters critical section
/// </summary>
/// <param name="section">The synchronization object to manage</param>
ScopeLock(const CriticalSection* section)
: _section(section)
{
ASSERT_LOW_LAYER(_section);
_section->Lock();
}
/// <summary>
/// Init, enters critical section
/// </summary>
/// <param name="section">The synchronization object to manage</param>
ScopeLock(const CriticalSection& section)
: _section(&section)
{
_section->Lock();
}
/// <summary>
/// Destructor, releases critical section
/// </summary>
~ScopeLock()
{
_section->Unlock();
}
};