Files
XCEngine/engine/src/Threading/TaskSystem.cpp
ssdfasd 34c75e7129 feat: 实现Containers、Memory、Threading核心模块及单元测试
- Containers: String, Array, HashMap 容器实现及测试
- Memory: Allocator, LinearAllocator, PoolAllocator, ProxyAllocator, MemoryManager 实现及测试
- Threading: Mutex, SpinLock, ReadWriteLock, Thread, Task, TaskSystem 实现及测试
- 修复Windows平台兼容性: _aligned_malloc, std::hash特化
- 修复构建错误和测试用例问题
2026-03-13 20:37:08 +08:00

163 lines
3.7 KiB
C++

#include "Threading/TaskSystem.h"
#include "Threading/LambdaTask.h"
#include <algorithm>
namespace XCEngine {
namespace Threading {
TaskSystem& TaskSystem::Get() {
static TaskSystem instance;
return instance;
}
void TaskSystem::Initialize(const TaskSystemConfig& config) {
m_workerThreadCount = config.workerThreadCount > 0
? config.workerThreadCount
: std::thread::hardware_concurrency();
if (m_workerThreadCount == 0) {
m_workerThreadCount = 2;
}
m_running = true;
for (uint32_t i = 0; i < m_workerThreadCount; ++i) {
m_workerThreads.emplace_back([this]() { WorkerThread(); });
}
}
void TaskSystem::Shutdown() {
m_running = false;
m_shutdown = true;
m_taskAvailable.notify_all();
for (auto& thread : m_workerThreads) {
if (thread.joinable()) {
thread.join();
}
}
m_workerThreads.clear();
}
uint64_t TaskSystem::Submit(std::unique_ptr<ITask> task) {
if (!task) return 0;
uint64_t taskId = ++m_nextTaskId;
task->SetId(taskId);
TaskWrapper wrapper;
wrapper.task = task.get();
wrapper.priority = task->GetPriority();
wrapper.id = taskId;
{
std::lock_guard<Mutex> lock(m_queueMutex);
m_taskQueue.push(wrapper);
}
m_taskAvailable.notify_one();
return taskId;
}
uint64_t TaskSystem::Submit(std::function<void()>&& func, TaskPriority priority) {
ITask* task = new LambdaTask<std::function<void()>>(std::move(func), priority);
return Submit(std::unique_ptr<ITask>(task));
}
TaskGroup* TaskSystem::CreateTaskGroup() {
TaskGroup* group = new TaskGroup();
std::lock_guard<SpinLock> lock(m_groupMutex);
m_taskGroups.push_back(group);
return group;
}
void TaskSystem::DestroyTaskGroup(TaskGroup* group) {
if (!group) return;
{
std::lock_guard<SpinLock> lock(m_groupMutex);
auto it = std::find(m_taskGroups.begin(), m_taskGroups.end(), group);
if (it != m_taskGroups.end()) {
m_taskGroups.erase(it);
}
}
delete group;
}
void TaskSystem::Wait(uint64_t taskId) {
}
uint32_t TaskSystem::GetWorkerThreadCount() const {
return m_workerThreadCount;
}
void TaskSystem::Update() {
std::vector<std::function<void()>> tasks;
{
std::lock_guard<Mutex> lock(m_queueMutex);
tasks = std::move(m_mainThreadQueue);
m_mainThreadQueue.clear();
}
for (auto& task : tasks) {
task();
}
}
void TaskSystem::RunOnMainThread(std::function<void()>&& func) {
{
std::lock_guard<Mutex> lock(m_queueMutex);
m_mainThreadQueue.push_back(std::move(func));
}
}
void TaskSystem::WorkerThread() {
while (m_running) {
TaskWrapper taskWrapper;
if (GetNextTask(taskWrapper)) {
ExecuteTask(taskWrapper);
}
}
}
bool TaskSystem::GetNextTask(TaskWrapper& outTask) {
std::unique_lock<std::mutex> lock(m_conditionMutex);
m_taskAvailable.wait(lock, [this] {
return !m_taskQueue.empty() || !m_running || m_shutdown;
});
if (m_shutdown) {
return false;
}
if (!m_taskQueue.empty()) {
outTask = m_taskQueue.top();
m_taskQueue.pop();
return true;
}
return false;
}
void TaskSystem::ExecuteTask(TaskWrapper& taskWrapper) {
if (!taskWrapper.task) return;
taskWrapper.task->SetStatus(TaskStatus::Running);
try {
taskWrapper.task->Execute();
taskWrapper.task->SetStatus(TaskStatus::Completed);
taskWrapper.task->OnComplete();
} catch (...) {
taskWrapper.task->SetStatus(TaskStatus::Failed);
}
taskWrapper.task->Release();
}
} // namespace Threading
} // namespace XCEngine