#include #include #include #include namespace XCEngine { namespace Resources { namespace { std::string ToStdString(const Containers::String& value) { return std::string(value.CStr()); } bool ShouldTraceAsyncPath(const Containers::String& path) { const std::string text = ToStdString(path); return text.rfind("builtin://", 0) == 0 || text.find("backpack") != std::string::npos || text.find("New Material.mat") != std::string::npos; } void TraceAsyncLoad(const Containers::String& message) { Debug::Logger::Get().Info(Debug::LogCategory::FileSystem, message); } } // namespace Core::uint64 LoadRequest::GenerateRequestId() { static std::atomic s_requestId{0}; return ++s_requestId; } AsyncLoader& AsyncLoader::Get() { static AsyncLoader instance; return instance; } void AsyncLoader::Initialize(Core::uint32 workerThreadCount) { if (m_running.exchange(true)) { return; } if (workerThreadCount == 0) { workerThreadCount = 1; } m_workerThreads.reserve(workerThreadCount); for (Core::uint32 index = 0; index < workerThreadCount; ++index) { m_workerThreads.emplace_back(&AsyncLoader::WorkerThread, this); } } void AsyncLoader::Shutdown() { { std::lock_guard lock(m_queueMutex); m_running = false; m_pendingQueue.clear(); m_pendingCount = 0; } m_pendingCondition.notify_all(); for (std::thread& workerThread : m_workerThreads) { if (workerThread.joinable()) { workerThread.join(); } } m_workerThreads.clear(); { std::lock_guard lock(m_completedMutex); m_completedQueue.clear(); } m_completedCount = 0; m_totalRequested = 0; } void AsyncLoader::Submit(const Containers::String& path, ResourceType type, std::function callback) { Submit(path, type, nullptr, std::move(callback)); } void AsyncLoader::Submit(const Containers::String& path, ResourceType type, ImportSettings* settings, std::function callback) { LoadRequest request(path, type, std::move(callback), settings); SubmitInternal(std::move(request)); } void AsyncLoader::SubmitInternal(LoadRequest request) { IResourceLoader* loader = FindLoader(request.type); if (!loader) { if (request.callback) { LoadResult result(Containers::String("No loader for type: ") + GetResourceTypeName(request.type)); request.callback(result); } return; } { std::lock_guard lock(m_queueMutex); if (!m_running) { if (request.callback) { request.callback(LoadResult("Async loader is not initialized")); } return; } m_pendingQueue.emplace_back(std::move(request)); ++m_pendingCount; ++m_totalRequested; const LoadRequest& queuedRequest = m_pendingQueue.back(); if (ShouldTraceAsyncPath(queuedRequest.path)) { TraceAsyncLoad( Containers::String("[AsyncLoader] submit id=") + Containers::String(std::to_string(queuedRequest.requestId).c_str()) + " type=" + GetResourceTypeName(queuedRequest.type) + " path=" + queuedRequest.path + " pending=" + Containers::String(std::to_string(m_pendingCount.load()).c_str())); } } m_pendingCondition.notify_one(); } void AsyncLoader::Update() { std::deque completed; { std::lock_guard lock(m_completedMutex); completed.swap(m_completedQueue); } for (CompletedLoadRequest& entry : completed) { if (m_pendingCount > 0) { --m_pendingCount; } ++m_completedCount; if (ShouldTraceAsyncPath(entry.request.path)) { TraceAsyncLoad( Containers::String("[AsyncLoader] dispatch id=") + Containers::String(std::to_string(entry.request.requestId).c_str()) + " type=" + GetResourceTypeName(entry.request.type) + " path=" + entry.request.path + " success=" + Containers::String(entry.result && entry.result.resource != nullptr ? "1" : "0") + " pending=" + Containers::String(std::to_string(m_pendingCount.load()).c_str()) + (entry.result.errorMessage.Empty() ? Containers::String() : Containers::String(" error=") + entry.result.errorMessage)); } if (entry.request.callback) { entry.request.callback(std::move(entry.result)); } } } float AsyncLoader::GetProgress() const { const Core::uint64 totalRequested = m_totalRequested.load(); if (totalRequested == 0) { return 1.0f; } return static_cast(totalRequested - m_pendingCount.load()) / static_cast(totalRequested); } void AsyncLoader::CancelAll() { std::lock_guard lock(m_queueMutex); if (!m_pendingQueue.empty()) { const Core::uint32 queuedCount = static_cast(m_pendingQueue.size()); m_pendingQueue.clear(); if (m_pendingCount >= queuedCount) { m_pendingCount -= queuedCount; } else { m_pendingCount = 0; } } } void AsyncLoader::Cancel(Core::uint64 requestId) { std::lock_guard lock(m_queueMutex); for (auto it = m_pendingQueue.begin(); it != m_pendingQueue.end(); ++it) { if (it->requestId == requestId) { m_pendingQueue.erase(it); if (m_pendingCount > 0) { --m_pendingCount; } return; } } } IResourceLoader* AsyncLoader::FindLoader(ResourceType type) const { return ResourceManager::Get().GetLoader(type); } void AsyncLoader::WorkerThread() { for (;;) { LoadRequest request; { std::unique_lock lock(m_queueMutex); m_pendingCondition.wait(lock, [this]() { return !m_running || !m_pendingQueue.empty(); }); if (!m_running && m_pendingQueue.empty()) { return; } if (m_pendingQueue.empty()) { continue; } request = std::move(m_pendingQueue.front()); m_pendingQueue.pop_front(); } if (ShouldTraceAsyncPath(request.path)) { TraceAsyncLoad( Containers::String("[AsyncLoader] worker-begin id=") + Containers::String(std::to_string(request.requestId).c_str()) + " type=" + GetResourceTypeName(request.type) + " path=" + request.path); } LoadResult result; try { result = ResourceManager::Get().LoadResource(request.path, request.type, request.settings); } catch (const std::exception& exception) { Debug::Logger::Get().Error( Debug::LogCategory::FileSystem, Containers::String("Async load threw exception for resource: ") + request.path + " - " + exception.what()); result = LoadResult( Containers::String("Async load threw exception: ") + exception.what()); } catch (...) { Debug::Logger::Get().Error( Debug::LogCategory::FileSystem, Containers::String("Async load threw unknown exception for resource: ") + request.path); result = LoadResult("Async load threw unknown exception"); } if (ShouldTraceAsyncPath(request.path)) { TraceAsyncLoad( Containers::String("[AsyncLoader] worker-end id=") + Containers::String(std::to_string(request.requestId).c_str()) + " type=" + GetResourceTypeName(request.type) + " path=" + request.path + " success=" + Containers::String(result && result.resource != nullptr ? "1" : "0") + (result.errorMessage.Empty() ? Containers::String() : Containers::String(" error=") + result.errorMessage)); } QueueCompleted(std::move(request), std::move(result)); } } void AsyncLoader::QueueCompleted(LoadRequest request, LoadResult result) { std::lock_guard lock(m_completedMutex); m_completedQueue.emplace_back(std::move(request), std::move(result)); } } // namespace Resources } // namespace XCEngine