Files
XCEngine/engine/include/XCEngine/Threading/TaskSystem.h

114 lines
2.9 KiB
C++

#pragma once
#include "TaskSystemConfig.h"
#include "Task.h"
#include "LambdaTask.h"
#include "TaskGroup.h"
#include "Mutex.h"
#include "SpinLock.h"
#include <algorithm>
#include <memory>
#include <vector>
#include <queue>
#include <thread>
#include <atomic>
#include <condition_variable>
#include <functional>
namespace XCEngine {
namespace Threading {
class TaskSystem {
public:
static TaskSystem& Get();
void Initialize(const TaskSystemConfig& config);
void Shutdown();
uint64_t Submit(std::unique_ptr<ITask> task);
uint64_t Submit(std::function<void()>&& func, TaskPriority priority = TaskPriority::Normal);
TaskGroup* CreateTaskGroup();
void DestroyTaskGroup(TaskGroup* group);
void Wait(uint64_t taskId);
uint32_t GetWorkerThreadCount() const;
void Update();
template<typename Func>
void ParallelFor(int32_t start, int32_t end, Func&& func);
void RunOnMainThread(std::function<void()>&& func);
private:
TaskSystem() = default;
~TaskSystem() = default;
struct TaskWrapper {
ITask* task;
TaskPriority priority;
uint64_t id;
bool operator<(const TaskWrapper& other) const {
return priority > other.priority;
}
};
void WorkerThread();
bool GetNextTask(TaskWrapper& outTask);
void ExecuteTask(TaskWrapper& task);
std::vector<std::thread> m_workerThreads;
std::priority_queue<TaskWrapper> m_taskQueue;
std::vector<TaskGroup*> m_taskGroups;
std::vector<std::function<void()>> m_mainThreadQueue;
Mutex m_queueMutex;
std::mutex m_conditionMutex;
SpinLock m_groupMutex;
std::condition_variable m_taskAvailable;
std::condition_variable m_mainThreadCondition;
std::atomic<bool> m_running{false};
std::atomic<uint64_t> m_nextTaskId{0};
uint32_t m_workerThreadCount = 0;
bool m_shutdown = false;
};
template<typename Func>
void TaskSystem::ParallelFor(int32_t start, int32_t end, Func&& func) {
int32_t count = end - start;
if (count <= 0) return;
uint32_t numThreads = std::thread::hardware_concurrency();
if (numThreads == 0) numThreads = 2;
int32_t chunkSize = (count + numThreads - 1) / numThreads;
if (chunkSize < 1) chunkSize = 1;
auto parallelTask = [&func, start, chunkSize, count](int32_t threadIndex) {
int32_t begin = start + threadIndex * chunkSize;
int32_t endIndex = std::min(begin + chunkSize, start + count);
for (int32_t i = begin; i < endIndex; ++i) {
func(i);
}
};
std::vector<std::function<void()>> tasks;
tasks.reserve(numThreads);
for (uint32_t i = 0; i < numThreads; ++i) {
tasks.emplace_back([=]() { parallelTask(i); });
}
for (auto& task : tasks) {
Submit(std::make_unique<::XCEngine::Threading::LambdaTask<std::function<void()>>>(
std::move(task),
TaskPriority::High));
}
}
} // namespace Threading
} // namespace XCEngine