diff --git a/Source/Engine/Core/Collections/Array.h b/Source/Engine/Core/Collections/Array.h index 9b8c41245..5069c0ac2 100644 --- a/Source/Engine/Core/Collections/Array.h +++ b/Source/Engine/Core/Collections/Array.h @@ -493,17 +493,8 @@ public: /// Adds the other collection to the collection. /// /// The other collection to add. - FORCE_INLINE void Add(const Array& other) - { - Add(other.Get(), other.Count()); - } - - /// - /// Adds the other collection to the collection. - /// - /// The other collection to add. - template - FORCE_INLINE void Add(const Array& other) + template + FORCE_INLINE void Add(const Array& other) { Add(other.Get(), other.Count()); } diff --git a/Source/Engine/Threading/TaskGraph.cpp b/Source/Engine/Threading/TaskGraph.cpp new file mode 100644 index 000000000..aa86a94a9 --- /dev/null +++ b/Source/Engine/Threading/TaskGraph.cpp @@ -0,0 +1,118 @@ +// Copyright (c) 2012-2021 Wojciech Figat. All rights reserved. + +#include "TaskGraph.h" +#include "JobSystem.h" +#include "Engine/Core/Collections/Sorting.h" +#include "Engine/Profiler/ProfilerCPU.h" + +namespace +{ + bool SortTaskGraphSystem(TaskGraphSystem* const& a, TaskGraphSystem* const& b) + { + return b->Order < a->Order; + }; +} + +TaskGraphSystem::TaskGraphSystem(const SpawnParams& params) + : PersistentScriptingObject(params) +{ +} + +void TaskGraphSystem::AddDependency(TaskGraphSystem* system) +{ + _dependencies.Add(system); +} + +void TaskGraphSystem::PreExecute(TaskGraph* graph) +{ +} + +void TaskGraphSystem::Execute(TaskGraph* graph) +{ +} + +void TaskGraphSystem::PostExecute(TaskGraph* graph) +{ +} + +TaskGraph::TaskGraph(const SpawnParams& params) + : PersistentScriptingObject(params) +{ +} + +const Array>& TaskGraph::GetSystems() const +{ + return _systems; +} + +void TaskGraph::AddSystem(TaskGraphSystem* system) +{ + _systems.Add(system); +} + +void TaskGraph::RemoveSystem(TaskGraphSystem* system) +{ + _systems.Remove(system); +} + +void TaskGraph::Execute() +{ + PROFILE_CPU(); + + for (auto system : _systems) + system->PreExecute(this); + + _queue.Clear(); + _remaining.Clear(); + _remaining.Add(_systems); + + while (_remaining.HasItems()) + { + // Find systems without dependencies or with already executed dependencies + for (int32 i = _remaining.Count() - 1; i >= 0; i--) + { + auto e = _remaining[i]; + bool hasReadyDependencies = true; + for (auto d : e->_dependencies) + { + if (_remaining.Contains(d)) + { + hasReadyDependencies = false; + break; + } + } + if (hasReadyDependencies) + { + _queue.Add(e); + _remaining.RemoveAt(i); + } + } + + // End if no systems left + if (_queue.IsEmpty()) + break; + + // Execute in order + Sorting::QuickSort(_queue.Get(), _queue.Count(), &SortTaskGraphSystem); + _currentLabel = 0; + for (int32 i = 0; i < _queue.Count(); i++) + { + _currentSystem = _queue[i]; + _currentSystem->Execute(this); + } + _currentSystem = nullptr; + _queue.Clear(); + + // Wait for async jobs to finish + JobSystem::Wait(_currentLabel); + } + + for (auto system : _systems) + system->PostExecute(this); +} + +void TaskGraph::DispatchJob(const Function& job, int32 jobCount) +{ + ASSERT(_currentSystem); + _currentLabel = JobSystem::Dispatch(job, jobCount); +} diff --git a/Source/Engine/Threading/TaskGraph.h b/Source/Engine/Threading/TaskGraph.h new file mode 100644 index 000000000..7268e644c --- /dev/null +++ b/Source/Engine/Threading/TaskGraph.h @@ -0,0 +1,95 @@ +// Copyright (c) 2012-2021 Wojciech Figat. All rights reserved. + +#pragma once + +#include "Engine/Scripting/ScriptingObject.h" +#include "Engine/Core/Collections/Array.h" + +class TaskGraph; + +/// +/// System that can generate work into Task Graph for asynchronous execution. +/// +API_CLASS(Abstract) class FLAXENGINE_API TaskGraphSystem : public PersistentScriptingObject +{ +DECLARE_SCRIPTING_TYPE(TaskGraphSystem); + friend TaskGraph; +private: + Array> _dependencies; + +public: + /// + /// The execution order of the system (systems with higher order are executed earlier). + /// + API_FIELD() int32 Order = 0; + +public: + /// + /// Adds the dependency on the system execution. Before this system can be executed the given dependant system has to be executed first. + /// + /// The system to depend on. + API_FUNCTION() void AddDependency(TaskGraphSystem* system); + + /// + /// Called before executing any systems of the graph. Can be used to initialize data (synchronous). + /// + /// The graph executing the system. + API_FUNCTION() virtual void PreExecute(TaskGraph* graph); + + /// + /// Executes the system logic and schedules the asynchronous work. + /// + /// The graph executing the system. + API_FUNCTION() virtual void Execute(TaskGraph* graph); + + /// + /// Called after executing all systems of the graph. Can be used to cleanup data (synchronous). + /// + /// The graph executing the system. + API_FUNCTION() virtual void PostExecute(TaskGraph* graph); +}; + +/// +/// Graph-based asynchronous tasks scheduler for high-performance computing and processing. +/// +API_CLASS() class FLAXENGINE_API TaskGraph : public PersistentScriptingObject +{ +DECLARE_SCRIPTING_TYPE(TaskGraph); +private: + Array> _systems; + Array> _remaining; + Array> _queue; + TaskGraphSystem* _currentSystem = nullptr; + int64 _currentLabel = 0; + +public: + /// + /// Gets the list of systems. + /// + API_PROPERTY() const Array>& GetSystems() const; + + /// + /// Adds the system to the graph for the execution. + /// + /// The system to add. + API_FUNCTION() void AddSystem(TaskGraphSystem* system); + + /// + /// Removes the system from the graph. + /// + /// The system to add. + API_FUNCTION() void RemoveSystem(TaskGraphSystem* system); + + /// + /// Schedules the asynchronous systems execution including ordering and dependencies handling. + /// + API_FUNCTION() void Execute(); + + /// + /// Dispatches the job for the execution. + /// + /// Call only from system's Execute method to properly schedule job. + /// The job. Argument is an index of the job execution. + /// The job executions count. + API_FUNCTION() void DispatchJob(const Function& job, int32 jobCount = 1); +};