Add deferred async scene asset loading

This commit is contained in:
2026-04-02 18:50:41 +08:00
parent dd08d8969e
commit 86144416af
18 changed files with 1806 additions and 97 deletions

View File

@@ -1,10 +1,30 @@
#include <XCEngine/Core/Asset/AsyncLoader.h>
#include <XCEngine/Core/Asset/ResourceManager.h>
#include <XCEngine/Core/Asset/ResourceTypes.h>
#include <XCEngine/Debug/Logger.h>
namespace XCEngine {
namespace Resources {
namespace {
std::string ToStdString(const Containers::String& value) {
return std::string(value.CStr());
}
bool ShouldTraceAsyncPath(const Containers::String& path) {
const std::string text = ToStdString(path);
return text.rfind("builtin://", 0) == 0 ||
text.find("backpack") != std::string::npos ||
text.find("New Material.mat") != std::string::npos;
}
void TraceAsyncLoad(const Containers::String& message) {
Debug::Logger::Get().Info(Debug::LogCategory::FileSystem, message);
}
} // namespace
Core::uint64 LoadRequest::GenerateRequestId() {
static std::atomic<Core::uint64> s_requestId{0};
return ++s_requestId;
@@ -16,11 +36,43 @@ AsyncLoader& AsyncLoader::Get() {
}
void AsyncLoader::Initialize(Core::uint32 workerThreadCount) {
(void)workerThreadCount;
if (m_running.exchange(true)) {
return;
}
if (workerThreadCount == 0) {
workerThreadCount = 1;
}
m_workerThreads.reserve(workerThreadCount);
for (Core::uint32 index = 0; index < workerThreadCount; ++index) {
m_workerThreads.emplace_back(&AsyncLoader::WorkerThread, this);
}
}
void AsyncLoader::Shutdown() {
CancelAll();
{
std::lock_guard<std::mutex> lock(m_queueMutex);
m_running = false;
m_pendingQueue.clear();
m_pendingCount = 0;
}
m_pendingCondition.notify_all();
for (std::thread& workerThread : m_workerThreads) {
if (workerThread.joinable()) {
workerThread.join();
}
}
m_workerThreads.clear();
{
std::lock_guard<std::mutex> lock(m_completedMutex);
m_completedQueue.clear();
}
m_completedCount = 0;
m_totalRequested = 0;
}
void AsyncLoader::Submit(const Containers::String& path, ResourceType type,
@@ -36,7 +88,7 @@ void AsyncLoader::Submit(const Containers::String& path, ResourceType type, Impo
void AsyncLoader::SubmitInternal(LoadRequest request) {
IResourceLoader* loader = FindLoader(request.type);
if (!loader) {
if (request.callback) {
LoadResult result(Containers::String("No loader for type: ") +
@@ -45,58 +97,187 @@ void AsyncLoader::SubmitInternal(LoadRequest request) {
}
return;
}
{
std::lock_guard lock(m_queueMutex);
m_pendingQueue.PushBack(std::move(request));
m_pendingCount++;
m_totalRequested++;
std::lock_guard<std::mutex> lock(m_queueMutex);
if (!m_running) {
if (request.callback) {
request.callback(LoadResult("Async loader is not initialized"));
}
return;
}
m_pendingQueue.emplace_back(std::move(request));
++m_pendingCount;
++m_totalRequested;
const LoadRequest& queuedRequest = m_pendingQueue.back();
if (ShouldTraceAsyncPath(queuedRequest.path)) {
TraceAsyncLoad(
Containers::String("[AsyncLoader] submit id=") +
Containers::String(std::to_string(queuedRequest.requestId).c_str()) +
" type=" +
GetResourceTypeName(queuedRequest.type) +
" path=" +
queuedRequest.path +
" pending=" +
Containers::String(std::to_string(m_pendingCount.load()).c_str()));
}
}
m_pendingCondition.notify_one();
}
void AsyncLoader::Update() {
Containers::Array<LoadRequest> completed;
std::deque<CompletedLoadRequest> completed;
{
std::lock_guard lock(m_completedMutex);
completed = std::move(m_completedQueue);
m_completedQueue.Clear();
std::lock_guard<std::mutex> lock(m_completedMutex);
completed.swap(m_completedQueue);
}
for (auto& request : completed) {
m_pendingCount--;
if (request.callback) {
LoadResult result(true);
request.callback(result);
for (CompletedLoadRequest& entry : completed) {
if (m_pendingCount > 0) {
--m_pendingCount;
}
++m_completedCount;
if (ShouldTraceAsyncPath(entry.request.path)) {
TraceAsyncLoad(
Containers::String("[AsyncLoader] dispatch id=") +
Containers::String(std::to_string(entry.request.requestId).c_str()) +
" type=" +
GetResourceTypeName(entry.request.type) +
" path=" +
entry.request.path +
" success=" +
Containers::String(entry.result && entry.result.resource != nullptr ? "1" : "0") +
" pending=" +
Containers::String(std::to_string(m_pendingCount.load()).c_str()) +
(entry.result.errorMessage.Empty()
? Containers::String()
: Containers::String(" error=") + entry.result.errorMessage));
}
if (entry.request.callback) {
entry.request.callback(std::move(entry.result));
}
}
}
float AsyncLoader::GetProgress() const {
if (m_totalRequested == 0) return 1.0f;
return static_cast<float>(m_totalRequested - m_pendingCount.load()) / m_totalRequested;
const Core::uint64 totalRequested = m_totalRequested.load();
if (totalRequested == 0) {
return 1.0f;
}
return static_cast<float>(totalRequested - m_pendingCount.load()) /
static_cast<float>(totalRequested);
}
void AsyncLoader::CancelAll() {
std::lock_guard lock(m_queueMutex);
m_pendingQueue.Clear();
m_pendingCount = 0;
std::lock_guard<std::mutex> lock(m_queueMutex);
if (!m_pendingQueue.empty()) {
const Core::uint32 queuedCount = static_cast<Core::uint32>(m_pendingQueue.size());
m_pendingQueue.clear();
if (m_pendingCount >= queuedCount) {
m_pendingCount -= queuedCount;
} else {
m_pendingCount = 0;
}
}
}
void AsyncLoader::Cancel(Core::uint64 requestId) {
std::lock_guard lock(m_queueMutex);
(void)requestId;
std::lock_guard<std::mutex> lock(m_queueMutex);
for (auto it = m_pendingQueue.begin(); it != m_pendingQueue.end(); ++it) {
if (it->requestId == requestId) {
m_pendingQueue.erase(it);
if (m_pendingCount > 0) {
--m_pendingCount;
}
return;
}
}
}
IResourceLoader* AsyncLoader::FindLoader(ResourceType type) const {
return ResourceManager::Get().GetLoader(type);
}
void AsyncLoader::WorkerThread() {
for (;;) {
LoadRequest request;
{
std::unique_lock<std::mutex> lock(m_queueMutex);
m_pendingCondition.wait(lock, [this]() {
return !m_running || !m_pendingQueue.empty();
});
if (!m_running && m_pendingQueue.empty()) {
return;
}
if (m_pendingQueue.empty()) {
continue;
}
request = std::move(m_pendingQueue.front());
m_pendingQueue.pop_front();
}
if (ShouldTraceAsyncPath(request.path)) {
TraceAsyncLoad(
Containers::String("[AsyncLoader] worker-begin id=") +
Containers::String(std::to_string(request.requestId).c_str()) +
" type=" +
GetResourceTypeName(request.type) +
" path=" +
request.path);
}
LoadResult result;
try {
result = ResourceManager::Get().LoadResource(request.path, request.type, request.settings);
} catch (const std::exception& exception) {
Debug::Logger::Get().Error(
Debug::LogCategory::FileSystem,
Containers::String("Async load threw exception for resource: ") +
request.path +
" - " +
exception.what());
result = LoadResult(
Containers::String("Async load threw exception: ") + exception.what());
} catch (...) {
Debug::Logger::Get().Error(
Debug::LogCategory::FileSystem,
Containers::String("Async load threw unknown exception for resource: ") +
request.path);
result = LoadResult("Async load threw unknown exception");
}
if (ShouldTraceAsyncPath(request.path)) {
TraceAsyncLoad(
Containers::String("[AsyncLoader] worker-end id=") +
Containers::String(std::to_string(request.requestId).c_str()) +
" type=" +
GetResourceTypeName(request.type) +
" path=" +
request.path +
" success=" +
Containers::String(result && result.resource != nullptr ? "1" : "0") +
(result.errorMessage.Empty()
? Containers::String()
: Containers::String(" error=") + result.errorMessage));
}
QueueCompleted(std::move(request), std::move(result));
}
}
void AsyncLoader::QueueCompleted(LoadRequest request, LoadResult result) {
std::lock_guard lock(m_completedMutex);
(void)request;
(void)result;
std::lock_guard<std::mutex> lock(m_completedMutex);
m_completedQueue.emplace_back(std::move(request), std::move(result));
}
} // namespace Resources