// Copyright (c) 2012-2023 Wojciech Figat. All rights reserved. using System; using System.Collections.Generic; using System.Diagnostics; using System.IO; using System.Threading; using Flax.Build.Graph; namespace Flax.Build.BuildSystem.Graph { /// /// The local tasks executor. Uses thread pool to submit a tasks execution in parallel. /// /// public class LocalExecutor : TaskExecutor { private int _counter; private readonly object _locker = new object(); private readonly List _waitingTasks = new List(); private readonly HashSet _executedTasks = new HashSet(); /// /// The minimum amount of threads to be used for the parallel execution. /// public int ThreadCountMin = 1; /// /// The maximum amount of threads to be used for the parallel execution. /// public int ThreadCountMax = Configuration.MaxConcurrency; /// /// The amount of threads to allocate per processor. Use it to allocate more threads for faster execution or use less to keep reduce CPU usage during build. /// public float ProcessorCountScale = Configuration.ConcurrencyProcessorScale; /// public override int Execute(List tasks) { // Find tasks to executed _waitingTasks.Clear(); _executedTasks.Clear(); foreach (var task in tasks) { if (!task.HasValidCachedResults) _waitingTasks.Add(task); } int count = _waitingTasks.Count; if (count == 0) return 0; _counter = 0; // Calculate amount of threads to spawn for the tasks execution int logicalCoresCount = (int)(Environment.ProcessorCount * ProcessorCountScale); int threadsCount = Math.Min(Math.Max(ThreadCountMin, Math.Min(ThreadCountMax, logicalCoresCount)), count); Log.Info(string.Format("Executing {0} {2} using {1} {3}", count, threadsCount, count == 1 ? "task" : "tasks", threadsCount == 1 ? "thread" : "threads")); // Spawn threads var threads = new Thread[threadsCount]; for (int threadIndex = 0; threadIndex < threadsCount; threadIndex++) { threads[threadIndex] = new Thread(ThreadMain) { Name = "Local Executor " + threadIndex, }; threads[threadIndex].Start(threadIndex); } // Wait for the execution end for (int threadIndex = 0; threadIndex < threadsCount; threadIndex++) { threads[threadIndex].Join(); } return _counter; } private void ThreadMain(object obj) { var threadIndex = (int)obj; // TODO: set affinity mask? var failedTasks = new List(); while (true) { // Try to pick a task for the execution Task taskToRun = null; lock (_locker) { // End when run out of the tasks to perform if (_waitingTasks.Count == 0) break; failedTasks.Clear(); foreach (var task in _waitingTasks) { // Check if all its dependencies has been executed bool hasCompletedDependencies = true; bool hasAnyDependencyFailed = false; if (task.DependentTasks != null) { foreach (var dependentTask in task.DependentTasks) { if (_executedTasks.Contains(dependentTask)) { // Handle dependency task execution result if (dependentTask.Failed) { hasAnyDependencyFailed = true; } } else if (!dependentTask.HasValidCachedResults) { // Need to execute dependency task before this one hasCompletedDependencies = false; break; } } } if (hasAnyDependencyFailed) { // Cannot execute task if one of its dependencies has failed failedTasks.Add(task); } else if (hasCompletedDependencies) { // Pick this task for execution taskToRun = task; break; } } foreach (var task in failedTasks) { _waitingTasks.Remove(task); task.Result = -1; _executedTasks.Add(task); } if (taskToRun != null) { _waitingTasks.Remove(taskToRun); } failedTasks.Clear(); } if (taskToRun != null) { // Perform that task taskToRun.StartTime = DateTime.Now; var result = ExecuteTask(taskToRun); taskToRun.EndTime = DateTime.Now; if (result != 0) { Log.Error(string.Format("Task {0} {1} failed with exit code {2}", taskToRun.CommandPath, taskToRun.CommandArguments, result)); Log.Error(""); } // Cache execution result lock (_locker) { taskToRun.Result = result; _executedTasks.Add(taskToRun); _counter++; } } else { // Wait for other thread to process any dependency Thread.Sleep(10); } } } private int ExecuteTask(Task task) { string name = "Task"; if (task.ProducedFiles != null && task.ProducedFiles.Count != 0) name = Path.GetFileName(task.ProducedFiles[0]); var profilerEvent = Profiling.Begin(name); if (Configuration.Verbose) { lock (_locker) { Log.Verbose(""); Log.Verbose(task.CommandPath); Log.Verbose(task.CommandArguments); Log.Verbose(""); } } if (task.InfoMessage != null) { Log.Info(task.InfoMessage); } // Custom action execution (eg. instead of executable file run) if (task.Command != null) { try { task.Command(); } catch (Exception ex) { Log.Exception(ex); return -1; } } if (task.CommandPath == null) { Profiling.End(profilerEvent); return 0; } Process process = null; try { try { process = new Process(); process.StartInfo = new ProcessStartInfo { WorkingDirectory = task.WorkingDirectory, FileName = task.CommandPath, Arguments = task.CommandArguments, UseShellExecute = false, RedirectStandardInput = false, RedirectStandardOutput = true, RedirectStandardError = true, CreateNoWindow = true, }; process.OutputDataReceived += ProcessDebugOutput; process.ErrorDataReceived += ProcessDebugOutput; process.Start(); process.BeginOutputReadLine(); process.BeginErrorReadLine(); } catch (Exception ex) { Log.Error("Failed to start local process for task"); Log.Exception(ex); return -1; } // Hang until process end process.WaitForExit(); Profiling.End(profilerEvent); return process.ExitCode; } finally { Profiling.End(profilerEvent); // Ensure to cleanup data process?.Close(); } } private static void ProcessDebugOutput(object sender, DataReceivedEventArgs e) { string output = e.Data; if (output != null) { Log.Info(output); } } } }