210 lines
7.0 KiB
Python
210 lines
7.0 KiB
Python
|
|
"""RenderDoc session manager - singleton managing capture file lifecycle."""
|
||
|
|
|
||
|
|
import os
|
||
|
|
from typing import Optional
|
||
|
|
|
||
|
|
from .util import rd, make_error
|
||
|
|
|
||
|
|
|
||
|
|
class RenderDocSession:
|
||
|
|
"""Manages a single RenderDoc capture file's lifecycle."""
|
||
|
|
|
||
|
|
def __init__(self):
|
||
|
|
self._initialized: bool = False
|
||
|
|
self._cap = None # CaptureFile
|
||
|
|
self._controller = None # ReplayController
|
||
|
|
self._filepath: Optional[str] = None
|
||
|
|
self._current_event: Optional[int] = None
|
||
|
|
self._action_map = {}
|
||
|
|
self._structured_file = None
|
||
|
|
self._resource_id_cache = {} # str(resourceId) → resourceId
|
||
|
|
self._texture_desc_cache = {} # str(resourceId) → TextureDescription
|
||
|
|
|
||
|
|
def _ensure_initialized(self):
|
||
|
|
"""Initialize the replay system if not already done."""
|
||
|
|
if not self._initialized:
|
||
|
|
rd.InitialiseReplay(rd.GlobalEnvironment(), [])
|
||
|
|
self._initialized = True
|
||
|
|
|
||
|
|
@property
|
||
|
|
def is_open(self) -> bool:
|
||
|
|
return self._controller is not None
|
||
|
|
|
||
|
|
@property
|
||
|
|
def filepath(self) -> Optional[str]:
|
||
|
|
return self._filepath
|
||
|
|
|
||
|
|
@property
|
||
|
|
def controller(self):
|
||
|
|
return self._controller
|
||
|
|
|
||
|
|
@property
|
||
|
|
def structured_file(self):
|
||
|
|
return self._structured_file
|
||
|
|
|
||
|
|
@property
|
||
|
|
def current_event(self) -> Optional[int]:
|
||
|
|
return self._current_event
|
||
|
|
|
||
|
|
@property
|
||
|
|
def driver_name(self) -> str:
|
||
|
|
"""Get the API/driver name of the current capture."""
|
||
|
|
return self._cap.DriverName() if self._cap else "unknown"
|
||
|
|
|
||
|
|
@property
|
||
|
|
def action_map(self):
|
||
|
|
"""Get the event_id -> ActionDescription mapping."""
|
||
|
|
return self._action_map
|
||
|
|
|
||
|
|
def require_open(self) -> Optional[dict]:
|
||
|
|
"""Return an error dict if no capture is open, else None."""
|
||
|
|
if not self.is_open:
|
||
|
|
return make_error(
|
||
|
|
"No capture file is open. Use open_capture first.", "NO_CAPTURE_OPEN"
|
||
|
|
)
|
||
|
|
return None
|
||
|
|
|
||
|
|
def open(self, filepath: str) -> dict:
|
||
|
|
"""Open a .rdc capture file. Closes any previously open capture."""
|
||
|
|
self._ensure_initialized()
|
||
|
|
|
||
|
|
if not os.path.isfile(filepath):
|
||
|
|
return make_error(f"File not found: {filepath}", "API_ERROR")
|
||
|
|
|
||
|
|
# Close any existing capture
|
||
|
|
if self.is_open:
|
||
|
|
self.close()
|
||
|
|
|
||
|
|
cap = rd.OpenCaptureFile()
|
||
|
|
result = cap.OpenFile(filepath, "", None)
|
||
|
|
if result != rd.ResultCode.Succeeded:
|
||
|
|
cap.Shutdown()
|
||
|
|
return make_error(f"Failed to open file: {result}", "API_ERROR")
|
||
|
|
|
||
|
|
if not cap.LocalReplaySupport():
|
||
|
|
cap.Shutdown()
|
||
|
|
return make_error("Capture cannot be replayed on this machine", "API_ERROR")
|
||
|
|
|
||
|
|
result, controller = cap.OpenCapture(rd.ReplayOptions(), None)
|
||
|
|
if result != rd.ResultCode.Succeeded:
|
||
|
|
cap.Shutdown()
|
||
|
|
return make_error(f"Failed to initialize replay: {result}", "API_ERROR")
|
||
|
|
|
||
|
|
self._cap = cap
|
||
|
|
self._controller = controller
|
||
|
|
self._filepath = filepath
|
||
|
|
self._current_event = None
|
||
|
|
self._structured_file = controller.GetStructuredFile()
|
||
|
|
|
||
|
|
# Build action map
|
||
|
|
self._action_map = {}
|
||
|
|
self._build_action_map(controller.GetRootActions())
|
||
|
|
|
||
|
|
# Build resource caches
|
||
|
|
self._resource_id_cache = {}
|
||
|
|
self._texture_desc_cache = {}
|
||
|
|
textures = controller.GetTextures()
|
||
|
|
buffers = controller.GetBuffers()
|
||
|
|
for tex in textures:
|
||
|
|
key = str(tex.resourceId)
|
||
|
|
self._resource_id_cache[key] = tex.resourceId
|
||
|
|
self._texture_desc_cache[key] = tex
|
||
|
|
for buf in buffers:
|
||
|
|
key = str(buf.resourceId)
|
||
|
|
self._resource_id_cache[key] = buf.resourceId
|
||
|
|
for res in controller.GetResources():
|
||
|
|
key = str(res.resourceId)
|
||
|
|
if key not in self._resource_id_cache:
|
||
|
|
self._resource_id_cache[key] = res.resourceId
|
||
|
|
|
||
|
|
# Gather summary
|
||
|
|
root_actions = controller.GetRootActions()
|
||
|
|
|
||
|
|
return {
|
||
|
|
"filepath": filepath,
|
||
|
|
"api": cap.DriverName(),
|
||
|
|
"total_actions": len(self._action_map),
|
||
|
|
"root_actions": len(root_actions),
|
||
|
|
"textures": len(textures),
|
||
|
|
"buffers": len(buffers),
|
||
|
|
}
|
||
|
|
|
||
|
|
def _build_action_map(self, actions):
|
||
|
|
"""Recursively index all actions by event_id."""
|
||
|
|
for a in actions:
|
||
|
|
self._action_map[a.eventId] = a
|
||
|
|
if len(a.children) > 0:
|
||
|
|
self._build_action_map(a.children)
|
||
|
|
|
||
|
|
def resolve_resource_id(self, resource_id_str: str):
|
||
|
|
"""Resolve a resource ID string to a ResourceId object, or None."""
|
||
|
|
return self._resource_id_cache.get(resource_id_str)
|
||
|
|
|
||
|
|
def get_texture_desc(self, resource_id_str: str):
|
||
|
|
"""Get a TextureDescription by resource ID string, or None."""
|
||
|
|
return self._texture_desc_cache.get(resource_id_str)
|
||
|
|
|
||
|
|
def close(self) -> dict:
|
||
|
|
"""Close the current capture."""
|
||
|
|
if not self.is_open:
|
||
|
|
return {"status": "no capture was open"}
|
||
|
|
|
||
|
|
filepath = self._filepath
|
||
|
|
self._controller.Shutdown()
|
||
|
|
self._cap.Shutdown()
|
||
|
|
self._controller = None
|
||
|
|
self._cap = None
|
||
|
|
self._filepath = None
|
||
|
|
self._current_event = None
|
||
|
|
self._action_map = {}
|
||
|
|
self._structured_file = None
|
||
|
|
self._resource_id_cache = {}
|
||
|
|
self._texture_desc_cache = {}
|
||
|
|
return {"status": "closed", "filepath": filepath}
|
||
|
|
|
||
|
|
def set_event(self, event_id: int) -> Optional[dict]:
|
||
|
|
"""Navigate to a specific event. Returns error dict or None on success."""
|
||
|
|
if event_id not in self._action_map:
|
||
|
|
return make_error(f"Event ID {event_id} not found", "INVALID_EVENT_ID")
|
||
|
|
self._controller.SetFrameEvent(event_id, True)
|
||
|
|
self._current_event = event_id
|
||
|
|
return None
|
||
|
|
|
||
|
|
def ensure_event(self, event_id: Optional[int]) -> Optional[dict]:
|
||
|
|
"""If event_id is given, set it. If no event is current, return error.
|
||
|
|
Returns error dict or None on success."""
|
||
|
|
if event_id is not None:
|
||
|
|
return self.set_event(event_id)
|
||
|
|
if self._current_event is None:
|
||
|
|
return make_error(
|
||
|
|
"No event selected. Use set_event or pass event_id.", "INVALID_EVENT_ID"
|
||
|
|
)
|
||
|
|
return None
|
||
|
|
|
||
|
|
def get_action(self, event_id: int):
|
||
|
|
"""Get an ActionDescription by event_id, or None."""
|
||
|
|
return self._action_map.get(event_id)
|
||
|
|
|
||
|
|
def get_root_actions(self):
|
||
|
|
return self._controller.GetRootActions()
|
||
|
|
|
||
|
|
def shutdown(self):
|
||
|
|
"""Full shutdown - close capture and deinitialize replay."""
|
||
|
|
if self.is_open:
|
||
|
|
self.close()
|
||
|
|
if self._initialized:
|
||
|
|
rd.ShutdownReplay()
|
||
|
|
self._initialized = False
|
||
|
|
|
||
|
|
|
||
|
|
# Module-level singleton
|
||
|
|
_session: Optional[RenderDocSession] = None
|
||
|
|
|
||
|
|
|
||
|
|
def get_session() -> RenderDocSession:
|
||
|
|
"""Get or create the global RenderDocSession singleton."""
|
||
|
|
global _session
|
||
|
|
if _session is None:
|
||
|
|
_session = RenderDocSession()
|
||
|
|
return _session
|