Files
XCEngine/engine/src/Core/Asset/AsyncLoader.cpp

285 lines
8.9 KiB
C++
Raw Normal View History

#include <XCEngine/Core/Asset/AsyncLoader.h>
#include <XCEngine/Core/Asset/ResourceManager.h>
#include <XCEngine/Core/Asset/ResourceTypes.h>
2026-04-02 18:50:41 +08:00
#include <XCEngine/Debug/Logger.h>
namespace XCEngine {
namespace Resources {
2026-04-02 18:50:41 +08:00
namespace {
std::string ToStdString(const Containers::String& value) {
return std::string(value.CStr());
}
bool ShouldTraceAsyncPath(const Containers::String& path) {
const std::string text = ToStdString(path);
return text.rfind("builtin://", 0) == 0 ||
text.find("backpack") != std::string::npos ||
text.find("New Material.mat") != std::string::npos;
}
void TraceAsyncLoad(const Containers::String& message) {
Debug::Logger::Get().Info(Debug::LogCategory::FileSystem, message);
}
} // namespace
Core::uint64 LoadRequest::GenerateRequestId() {
static std::atomic<Core::uint64> s_requestId{0};
return ++s_requestId;
}
AsyncLoader& AsyncLoader::Get() {
static AsyncLoader instance;
return instance;
}
void AsyncLoader::Initialize(Core::uint32 workerThreadCount) {
2026-04-02 18:50:41 +08:00
if (m_running.exchange(true)) {
return;
}
if (workerThreadCount == 0) {
workerThreadCount = 1;
}
m_workerThreads.reserve(workerThreadCount);
for (Core::uint32 index = 0; index < workerThreadCount; ++index) {
m_workerThreads.emplace_back(&AsyncLoader::WorkerThread, this);
}
}
void AsyncLoader::Shutdown() {
2026-04-02 18:50:41 +08:00
{
std::lock_guard<std::mutex> lock(m_queueMutex);
m_running = false;
m_pendingQueue.clear();
m_pendingCount = 0;
}
m_pendingCondition.notify_all();
for (std::thread& workerThread : m_workerThreads) {
if (workerThread.joinable()) {
workerThread.join();
}
}
m_workerThreads.clear();
{
std::lock_guard<std::mutex> lock(m_completedMutex);
m_completedQueue.clear();
}
m_completedCount = 0;
m_totalRequested = 0;
}
void AsyncLoader::Submit(const Containers::String& path, ResourceType type,
std::function<void(LoadResult)> callback) {
Submit(path, type, nullptr, std::move(callback));
}
void AsyncLoader::Submit(const Containers::String& path, ResourceType type, ImportSettings* settings,
std::function<void(LoadResult)> callback) {
LoadRequest request(path, type, std::move(callback), settings);
SubmitInternal(std::move(request));
}
void AsyncLoader::SubmitInternal(LoadRequest request) {
IResourceLoader* loader = FindLoader(request.type);
2026-04-02 18:50:41 +08:00
if (!loader) {
if (request.callback) {
LoadResult result(Containers::String("No loader for type: ") +
GetResourceTypeName(request.type));
request.callback(result);
}
return;
}
2026-04-02 18:50:41 +08:00
{
2026-04-02 18:50:41 +08:00
std::lock_guard<std::mutex> lock(m_queueMutex);
if (!m_running) {
if (request.callback) {
request.callback(LoadResult("Async loader is not initialized"));
}
return;
}
m_pendingQueue.emplace_back(std::move(request));
++m_pendingCount;
++m_totalRequested;
const LoadRequest& queuedRequest = m_pendingQueue.back();
if (ShouldTraceAsyncPath(queuedRequest.path)) {
TraceAsyncLoad(
Containers::String("[AsyncLoader] submit id=") +
Containers::String(std::to_string(queuedRequest.requestId).c_str()) +
" type=" +
GetResourceTypeName(queuedRequest.type) +
" path=" +
queuedRequest.path +
" pending=" +
Containers::String(std::to_string(m_pendingCount.load()).c_str()));
}
}
2026-04-02 18:50:41 +08:00
m_pendingCondition.notify_one();
}
void AsyncLoader::Update() {
2026-04-02 18:50:41 +08:00
std::deque<CompletedLoadRequest> completed;
{
2026-04-02 18:50:41 +08:00
std::lock_guard<std::mutex> lock(m_completedMutex);
completed.swap(m_completedQueue);
}
2026-04-02 18:50:41 +08:00
for (CompletedLoadRequest& entry : completed) {
if (m_pendingCount > 0) {
--m_pendingCount;
}
++m_completedCount;
if (ShouldTraceAsyncPath(entry.request.path)) {
TraceAsyncLoad(
Containers::String("[AsyncLoader] dispatch id=") +
Containers::String(std::to_string(entry.request.requestId).c_str()) +
" type=" +
GetResourceTypeName(entry.request.type) +
" path=" +
entry.request.path +
" success=" +
Containers::String(entry.result && entry.result.resource != nullptr ? "1" : "0") +
" pending=" +
Containers::String(std::to_string(m_pendingCount.load()).c_str()) +
(entry.result.errorMessage.Empty()
? Containers::String()
: Containers::String(" error=") + entry.result.errorMessage));
}
if (entry.request.callback) {
entry.request.callback(std::move(entry.result));
}
}
}
float AsyncLoader::GetProgress() const {
2026-04-02 18:50:41 +08:00
const Core::uint64 totalRequested = m_totalRequested.load();
if (totalRequested == 0) {
return 1.0f;
}
return static_cast<float>(totalRequested - m_pendingCount.load()) /
static_cast<float>(totalRequested);
}
void AsyncLoader::CancelAll() {
2026-04-02 18:50:41 +08:00
std::lock_guard<std::mutex> lock(m_queueMutex);
if (!m_pendingQueue.empty()) {
const Core::uint32 queuedCount = static_cast<Core::uint32>(m_pendingQueue.size());
m_pendingQueue.clear();
if (m_pendingCount >= queuedCount) {
m_pendingCount -= queuedCount;
} else {
m_pendingCount = 0;
}
}
}
void AsyncLoader::Cancel(Core::uint64 requestId) {
2026-04-02 18:50:41 +08:00
std::lock_guard<std::mutex> lock(m_queueMutex);
for (auto it = m_pendingQueue.begin(); it != m_pendingQueue.end(); ++it) {
if (it->requestId == requestId) {
m_pendingQueue.erase(it);
if (m_pendingCount > 0) {
--m_pendingCount;
}
return;
}
}
}
IResourceLoader* AsyncLoader::FindLoader(ResourceType type) const {
return ResourceManager::Get().GetLoader(type);
}
2026-04-02 18:50:41 +08:00
void AsyncLoader::WorkerThread() {
for (;;) {
LoadRequest request;
{
std::unique_lock<std::mutex> lock(m_queueMutex);
m_pendingCondition.wait(lock, [this]() {
return !m_running || !m_pendingQueue.empty();
});
if (!m_running && m_pendingQueue.empty()) {
return;
}
if (m_pendingQueue.empty()) {
continue;
}
request = std::move(m_pendingQueue.front());
m_pendingQueue.pop_front();
}
if (ShouldTraceAsyncPath(request.path)) {
TraceAsyncLoad(
Containers::String("[AsyncLoader] worker-begin id=") +
Containers::String(std::to_string(request.requestId).c_str()) +
" type=" +
GetResourceTypeName(request.type) +
" path=" +
request.path);
}
LoadResult result;
try {
result = ResourceManager::Get().LoadResource(request.path, request.type, request.settings);
} catch (const std::exception& exception) {
Debug::Logger::Get().Error(
Debug::LogCategory::FileSystem,
Containers::String("Async load threw exception for resource: ") +
request.path +
" - " +
exception.what());
result = LoadResult(
Containers::String("Async load threw exception: ") + exception.what());
} catch (...) {
Debug::Logger::Get().Error(
Debug::LogCategory::FileSystem,
Containers::String("Async load threw unknown exception for resource: ") +
request.path);
result = LoadResult("Async load threw unknown exception");
}
if (ShouldTraceAsyncPath(request.path)) {
TraceAsyncLoad(
Containers::String("[AsyncLoader] worker-end id=") +
Containers::String(std::to_string(request.requestId).c_str()) +
" type=" +
GetResourceTypeName(request.type) +
" path=" +
request.path +
" success=" +
Containers::String(result && result.resource != nullptr ? "1" : "0") +
(result.errorMessage.Empty()
? Containers::String()
: Containers::String(" error=") + result.errorMessage));
}
QueueCompleted(std::move(request), std::move(result));
}
}
void AsyncLoader::QueueCompleted(LoadRequest request, LoadResult result) {
2026-04-02 18:50:41 +08:00
std::lock_guard<std::mutex> lock(m_completedMutex);
m_completedQueue.emplace_back(std::move(request), std::move(result));
}
} // namespace Resources
} // namespace XCEngine