Files
XCEngine/engine/tools/renderdoc_parser/tools/performance_tools.py
ssdfasd fb01beb959 Add renderdoc_parser: direct-call Python interface for RenderDoc capture analysis
- Convert from MCP protocol layer to direct Python function calls
- 42 functions across 9 modules: session, event, pipeline, resource, data, shader, advanced, performance, diagnostic
- Requires Python 3.6 (renderdoc.pyd is compiled for Python 3.6)
- Fix renderdoc API calls: GetColorBlends, GetStencilFaces, GetViewport(i), GetScissor(i)
- Remove Python 3.10+ type annotations for Python 3.6 compatibility
- Add README.md with full API documentation
- Includes test.py for basic smoke testing
2026-03-23 18:46:20 +08:00

578 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Performance analysis tools: get_pass_timing, analyze_overdraw, analyze_bandwidth, analyze_state_changes."""
from typing import Optional
from ..session import get_session
from ..util import (
rd,
make_error,
flags_to_list,
SHADER_STAGE_MAP,
BLEND_FACTOR_MAP,
COMPARE_FUNC_MAP,
enum_str,
)
def _timing_fetch_gpu_durations_ms(controller) -> dict:
    """Return ``{event_id: duration_in_ms}`` from a GPU timing counter.

    Picks the first counter whose name contains "time" or "duration" and whose
    unit is seconds, fetches it, and converts each sample to milliseconds.
    Returns an empty dict when no such counter exists or the counter API
    raises, so the caller can fall back to heuristic costs.
    """
    try:
        chosen = None
        for counter in controller.EnumerateCounters():
            desc = controller.DescribeCounter(counter)
            label = desc.name.lower()
            if "time" not in label and "duration" not in label:
                continue
            # Only a counter measured in seconds can honestly be reported as
            # milliseconds; cycle/absolute counters that merely mention "time"
            # are skipped instead of being mislabelled.
            if desc.unit != rd.CounterUnit.Seconds:
                continue
            chosen = (counter, desc)
            break
        if chosen is None:
            return {}
        counter, desc = chosen
        durations = {}
        for result in controller.FetchCounters([counter]):
            # Floating-point counters are stored as double (8 bytes) or float.
            if desc.resultByteWidth == 8:
                seconds = result.value.d
            else:
                seconds = result.value.f
            durations[result.eventId] = seconds * 1000.0
        return durations
    except Exception:
        # Deliberate best effort: captures/drivers without counter support
        # must still produce a heuristic report rather than fail.
        return {}


def _timing_new_pass(index: int, eid: int, name: str, outputs: tuple) -> dict:
    """Build an empty accumulator record for a pass that starts at ``eid``."""
    return {
        "pass_index": index,
        "start_event": eid,
        "end_event": eid,
        "name": name,
        "draw_count": 0,
        "total_vertices": 0,
        "estimated_cost": 0.0,
        "render_targets": list(outputs),
    }


def get_pass_timing(
    granularity: str = "pass",
    top_n: int = 20,
) -> dict:
    """Get per-render-pass or per-draw-call GPU cost estimates.

    When a GPU timing counter (unit: seconds) is available, costs are real
    durations converted to milliseconds; draws the counter did not report get
    a cost of 0.0 so that units are never mixed. Otherwise the cost of a draw
    is the heuristic ``numIndices / 3`` (triangle count).

    Passes are inferred, not read from the API: a new pass starts at every
    clear action and whenever a draw's non-null output set differs from the
    previous non-empty output set.

    Args:
        granularity: "draw_call" for one entry per draw; any other value
            (default "pass") groups draws into inferred passes.
        top_n: Return only the top N most expensive entries (default 20).

    Returns:
        The error dict from ``session.require_open()`` if no capture is open,
        otherwise a dict with ``has_real_timing``, the echoed ``top_n``, the
        unit/note strings, and either ``entries`` + ``total_draw_calls``
        (draw-call mode) or ``passes`` + ``total_passes`` (pass mode), sorted
        by descending ``estimated_cost``.
    """
    session = get_session()
    err = session.require_open()
    if err:
        return err

    durations_ms = _timing_fetch_gpu_durations_ms(session.controller)
    has_real_timing = bool(durations_ms)
    timing_unit = "ms" if has_real_timing else "triangles (heuristic)"
    sf = session.structured_file

    def draw_cost(eid, action):
        # Never fall back to triangle counts once real timings exist:
        # mixing milliseconds with triangles would corrupt sums and ranking.
        if has_real_timing:
            return durations_ms.get(eid, 0.0)
        return action.numIndices / 3

    if granularity == "draw_call":
        entries = []
        for eid in sorted(session.action_map.keys()):
            action = session.action_map[eid]
            if not (action.flags & rd.ActionFlags.Drawcall):
                continue
            entries.append(
                {
                    "event_id": eid,
                    "name": action.GetName(sf),
                    "vertex_count": action.numIndices,
                    "estimated_cost": round(draw_cost(eid, action), 4),
                    "timing_unit": timing_unit,
                }
            )
        entries.sort(key=lambda e: -e["estimated_cost"])
        return {
            "granularity": "draw_call",
            "has_real_timing": has_real_timing,
            "top_n": top_n,
            "entries": entries[:top_n],
            "total_draw_calls": len(entries),
            "note": ""
            if has_real_timing
            else "GPU timing counters unavailable — showing triangle counts as proxy",
        }

    passes = []
    current = None
    last_outputs = None
    for eid in sorted(session.action_map.keys()):
        action = session.action_map[eid]
        is_clear = bool(action.flags & rd.ActionFlags.Clear)
        is_draw = bool(action.flags & rd.ActionFlags.Drawcall)
        if not is_clear and not is_draw:
            continue

        # Null resource ids (int value 0) are unbound output slots.
        outputs = tuple(str(o) for o in action.outputs if int(o) != 0)
        starts_pass = (
            current is None
            or is_clear
            or (bool(outputs) and outputs != last_outputs)
        )
        if starts_pass:
            if current is not None:
                passes.append(current)
            current = _timing_new_pass(
                len(passes), eid, action.GetName(sf), outputs
            )

        current["end_event"] = eid
        if is_draw:
            current["draw_count"] += 1
            current["total_vertices"] += action.numIndices
            current["estimated_cost"] += draw_cost(eid, action)
        if outputs:
            last_outputs = outputs
    if current is not None:
        passes.append(current)

    for p in passes:
        rt_infos = []
        for rid_str in p["render_targets"]:
            td = session.get_texture_desc(rid_str)
            if td:
                rt_infos.append(f"{td.width}x{td.height} {td.format.Name()}")
        p["rt_summary"] = ", ".join(rt_infos) if rt_infos else "unknown"
        p["estimated_cost"] = round(p["estimated_cost"], 4)

    passes.sort(key=lambda p: -p["estimated_cost"])
    return {
        "granularity": "pass",
        "has_real_timing": has_real_timing,
        "top_n": top_n,
        "passes": passes[:top_n],
        "total_passes": len(passes),
        "timing_unit": timing_unit,
        "note": ""
        if has_real_timing
        else "GPU timing counters unavailable — pass cost estimated from triangle counts",
    }
def analyze_overdraw(
    pass_name: Optional[str] = None,
    region: Optional[dict] = None,
    sample_count: int = 64,
) -> dict:
    """Estimate overdraw across the frame or within a filtered set of draws.

    The estimate is coarse: draw calls are grouped by their set of bound
    (non-null) outputs, and the reported average is
    ``total draws / number of distinct output sets``. No pixels are sampled.

    Args:
        pass_name: Optional case-insensitive substring filter. A draw is kept
            when its own name or its parent action's name contains it.
        region: Accepted for interface compatibility; the draw-count estimator
            does not use it.
        sample_count: Accepted for interface compatibility; the draw-count
            estimator does not use it.

    Returns:
        The error dict from ``session.require_open()`` if no capture is open,
        an ``API_ERROR`` dict when no draw call matches, otherwise a summary
        with the average estimate, a severity ("low" / "medium" when > 3 /
        "high" when > 5), and up to ten per-render-target-set rows.
    """
    session = get_session()
    err = session.require_open()
    if err:
        return err
    sf = session.structured_file
    needle = pass_name.lower() if pass_name else None

    draw_eids = []
    for eid in sorted(session.action_map.keys()):
        action = session.action_map[eid]
        if not (action.flags & rd.ActionFlags.Drawcall):
            continue
        if needle and needle not in action.GetName(sf).lower():
            parent = action.parent
            # A draw without a parent cannot match through its parent, so it
            # must be filtered out just like one whose parent does not match.
            if not parent or needle not in parent.GetName(sf).lower():
                continue
        draw_eids.append(eid)
    if not draw_eids:
        return make_error("No draw calls found matching filter", "API_ERROR")

    # One sweep collects both the largest render target seen (reported as the
    # main resolution) and the draws bound to each distinct output set.
    main_w, main_h = 1, 1
    seen_rts = set()
    rt_draw_map = {}
    for eid in draw_eids:
        action = session.action_map[eid]
        bound = []
        for o in action.outputs:
            if int(o) == 0:  # null resource id: unbound output slot
                continue
            rid_str = str(o)
            bound.append(rid_str)
            if rid_str in seen_rts:
                continue
            seen_rts.add(rid_str)
            td = session.get_texture_desc(rid_str)
            if td and td.width * td.height > main_w * main_h:
                main_w, main_h = td.width, td.height
        rt_draw_map.setdefault(tuple(bound), []).append(eid)

    total_draws = len(draw_eids)
    overdraw_data = []
    for rt_key, eids in rt_draw_map.items():
        rt_name_parts = []
        for rid_str in rt_key:
            td = session.get_texture_desc(rid_str)
            if td:
                rt_name_parts.append(f"{td.width}x{td.height} {td.format.Name()}")
        overdraw_data.append(
            {
                "render_targets": list(rt_key),
                "rt_summary": ", ".join(rt_name_parts) or "unknown",
                "draw_count": len(eids),
            }
        )
    overdraw_data.sort(key=lambda d: -d["draw_count"])

    avg_overdraw = total_draws / max(len(rt_draw_map), 1)
    if avg_overdraw > 5:
        severity = "high"
    elif avg_overdraw > 3:
        severity = "medium"
    else:
        severity = "low"

    hint = ""
    if severity == "high":
        hint = f"平均 overdraw {avg_overdraw:.1f}x 偏高。建议检查半透明物体排序、减少粒子层数、启用 early-Z 裁剪。"
    elif severity == "medium":
        hint = f"平均 overdraw {avg_overdraw:.1f}x 适中。移动端 fill rate 有限,可考虑减少不必要的全屏 pass。"

    return {
        "total_draws_analyzed": total_draws,
        "pass_filter": pass_name,
        "main_resolution": f"{main_w}x{main_h}",
        "estimated_avg_overdraw": round(avg_overdraw, 2),
        "severity": severity,
        "per_rt_breakdown": overdraw_data[:10],
        "hint": hint,
        "note": "Overdraw estimated from draw call counts per render target (not pixel-level measurement). Use pixel_history for exact per-pixel analysis.",
    }
def analyze_bandwidth(
    breakdown_by: str = "pass",
) -> dict:
    """Estimate GPU memory bandwidth consumption for the frame.

    This is an upper-bound heuristic, not a measurement. For every render
    target bound by a draw or clear action:

    * write bytes = pixels * bytes-per-pixel * (draw count + clear count)
    * read bytes  = pixels * bytes-per-pixel * max(1, clear count)

    Every texture returned by ``GetTextures()`` additionally contributes one
    full read of ``width * height * bytes-per-pixel``. Render targets cleared
    more than once produce a tile load/store warning.

    Args:
        breakdown_by: "resource_type" splits totals into render targets vs.
            textures. Any other value (including "pass" and "operation",
            which have no dedicated implementation) returns the ten render
            targets with the highest estimated traffic.

    Returns:
        The error dict from ``session.require_open()`` if no capture is open,
        otherwise a dict with ``estimated_total_bandwidth_mb``, ``breakdown``,
        ``note`` and, when applicable, ``tile_bandwidth_warnings``.
    """
    session = get_session()
    err = session.require_open()
    if err:
        return err

    mib = 1024 * 1024

    # Matching is by substring, so rows are ordered most-specific first:
    # "R32G32B32A32" before "R32G32B32" before "R32G32" before "R32", and
    # "R16G16B16A16" before "R16G16". Block-compressed families come last and
    # are approximated as one byte per pixel.
    size_table = (
        (("R32G32B32A32",), 16),
        (("R32G32B32",), 12),
        (("R16G16B16A16", "R32G32"), 8),
        (
            (
                "R11G11B10",
                "R10G10B10",
                "R8G8B8A8",
                "B8G8R8A8",
                "D24",
                "D32",
                "R16G16",
                "R32",
            ),
            4,
        ),
        (("BC", "ETC", "ASTC"), 1),
    )

    def _bytes_per_pixel(fmt_name: str) -> int:
        """Approximate bytes per pixel from a format name; 4 when unknown."""
        upper = fmt_name.upper()
        for tokens, size in size_table:
            if any(token in upper for token in tokens):
                return size
        return 4

    # Per-render-target usage counters, keyed by stringified resource id.
    rt_stats = {}
    for eid in sorted(session.action_map.keys()):
        action = session.action_map[eid]
        is_draw = bool(action.flags & rd.ActionFlags.Drawcall)
        is_clear = bool(action.flags & rd.ActionFlags.Clear)
        if not is_draw and not is_clear:
            continue
        for o in action.outputs:
            if int(o) == 0:  # null resource id: unbound output slot
                continue
            rid_str = str(o)
            if rid_str not in rt_stats:
                td = session.get_texture_desc(rid_str)
                if td:
                    fmt_name = str(td.format.Name())
                    rt_stats[rid_str] = {
                        "name": getattr(td, "name", None) or rid_str,
                        "size": f"{td.width}x{td.height}",
                        "format": fmt_name,
                        "bytes_per_pixel": _bytes_per_pixel(fmt_name),
                        "pixel_count": td.width * td.height,
                        "draw_count": 0,
                        "clear_count": 0,
                    }
            # Outputs without a texture description are skipped entirely.
            stats = rt_stats.get(rid_str)
            if stats is None:
                continue
            if is_draw:
                stats["draw_count"] += 1
            if is_clear:
                stats["clear_count"] += 1

    total_write_bytes = 0
    total_read_bytes = 0
    rt_bw_list = []
    for rid_str, st in rt_stats.items():
        surface_bytes = st["pixel_count"] * st["bytes_per_pixel"]
        write_b = surface_bytes * (st["draw_count"] + st["clear_count"])
        read_b = surface_bytes * max(1, st["clear_count"])
        total_write_bytes += write_b
        total_read_bytes += read_b
        rt_bw_list.append(
            {
                "resource_id": rid_str,
                "name": st["name"],
                "size": st["size"],
                "format": st["format"],
                "draw_count": st["draw_count"],
                "estimated_write_mb": round(write_b / mib, 2),
                "estimated_read_mb": round(read_b / mib, 2),
                "estimated_total_mb": round((write_b + read_b) / mib, 2),
            }
        )
    rt_bw_list.sort(key=lambda e: -e["estimated_total_mb"])

    tex_read_bytes = 0
    for tex in session.controller.GetTextures():
        bpp = _bytes_per_pixel(str(tex.format.Name()))
        tex_read_bytes += tex.width * tex.height * bpp

    total_mb = (total_write_bytes + total_read_bytes + tex_read_bytes) / mib

    tile_warnings = []
    for st in rt_stats.values():
        if st["clear_count"] > 1:
            tile_warnings.append(
                f"{st['name']}: {st['clear_count']} 次 clear每次 clear 在 tile-based GPU 上触发 tile store+load建议合并为单次 clear"
            )

    if breakdown_by == "resource_type":
        breakdown = {
            "render_targets": {
                "write_mb": round(total_write_bytes / mib, 2),
                "read_mb": round(total_read_bytes / mib, 2),
            },
            "textures": {
                "read_mb": round(tex_read_bytes / mib, 2),
            },
        }
    else:
        breakdown = {"top_render_targets": rt_bw_list[:10]}

    result = {
        "estimated_total_bandwidth_mb": round(total_mb, 2),
        "breakdown": breakdown,
        "note": "Bandwidth estimates assume full render target read/write per draw (upper bound). Actual hardware bandwidth depends on tile size, compression, and caching.",
    }
    if tile_warnings:
        result["tile_bandwidth_warnings"] = tile_warnings
    return result
def _state_changes_snapshot(state, action, track) -> dict:
    """Capture the tracked state aspects of one draw as comparable values.

    "shader" is the pixel shader's resource id, "blend" is the first color
    target's (enabled, source factor, destination factor), and
    "render_target" is the tuple of bound non-null outputs. "depth" and
    "cull" are placeholders that are always ``None``.
    """
    snapshot = {}
    if "shader" in track:
        try:
            ps_refl = state.GetShaderReflection(rd.ShaderStage.Pixel)
            snapshot["shader"] = str(ps_refl.resourceId) if ps_refl else None
        except Exception:
            snapshot["shader"] = None
    if "blend" in track:
        try:
            blends = state.GetColorBlends()
            first = blends[0] if blends else None
            if first:
                snapshot["blend"] = (
                    first.enabled,
                    int(first.colorBlend.source),
                    int(first.colorBlend.destination),
                )
            else:
                snapshot["blend"] = None
        except Exception:
            snapshot["blend"] = None
    # Not implemented: a constant value means these never register a change.
    if "depth" in track:
        snapshot["depth"] = None
    if "cull" in track:
        snapshot["cull"] = None
    if "render_target" in track:
        snapshot["render_target"] = tuple(
            str(o) for o in action.outputs if int(o) != 0
        )
    return snapshot


def analyze_state_changes(
    pass_name: Optional[str] = None,
    change_types: Optional[list] = None,
) -> dict:
    """Analyze pipeline state changes between consecutive draw calls.

    Replays up to the first 200 matching draw calls, snapshots the tracked
    state at each one, counts how often each aspect differs from the previous
    draw, and records runs of two or more consecutive draws whose tracked
    state is identical. The session's current event is restored afterwards,
    even if the analysis raises.

    Args:
        pass_name: Optional case-insensitive substring filter. A draw is kept
            when its own name or its parent action's name contains it.
        change_types: State aspects to track; ``None`` or empty tracks all.
            Valid: "shader", "blend", "depth", "cull", "render_target".
            "depth" and "cull" are accepted but not implemented, so they
            always report zero changes.

    Returns:
        The error dict from ``session.require_open()`` if no capture is open,
        an ``API_ERROR`` dict for unknown change types or when no draw call
        matches, otherwise a dict with per-aspect ``change_counts``, the five
        longest ``batching_opportunities``, ``suggestions`` and a ``note``
        when the draw list was truncated.
    """
    session = get_session()
    err = session.require_open()
    if err:
        return err
    sf = session.structured_file

    ordered_types = ("shader", "blend", "depth", "cull", "render_target")
    valid_types = set(ordered_types)
    if change_types:
        requested = set(change_types)
        invalid = requested - valid_types
        if invalid:
            return make_error(
                f"Unknown change_types: {invalid}. Valid: {valid_types}", "API_ERROR"
            )
    else:
        requested = valid_types
    # A fixed order keeps the key order of change_counts stable between runs.
    track = [t for t in ordered_types if t in requested]

    saved_event = session.current_event
    needle = pass_name.lower() if pass_name else None
    draw_eids = []
    for eid in sorted(session.action_map.keys()):
        action = session.action_map[eid]
        if not (action.flags & rd.ActionFlags.Drawcall):
            continue
        if needle and needle not in action.GetName(sf).lower():
            parent = action.parent
            if not parent or needle not in parent.GetName(sf).lower():
                continue
        draw_eids.append(eid)
    if not draw_eids:
        return make_error("No draw calls found", "API_ERROR")

    change_counts = {t: 0 for t in track}
    prev_state = {}
    batching_runs = []
    current_run = None
    MAX_DRAWS_FOR_STATE = 200  # cap on replayed draws per call

    try:
        for eid in draw_eids[:MAX_DRAWS_FOR_STATE]:
            try:
                session.set_event(eid)
                state = session.controller.GetPipelineState()
            except Exception:
                # Best effort: skip draws whose state cannot be replayed.
                continue

            cur_state = _state_changes_snapshot(
                state, session.action_map[eid], track
            )

            changed_fields = []
            if prev_state:  # the first analysed draw has nothing to compare to
                for field in track:
                    if cur_state.get(field) != prev_state.get(field):
                        change_counts[field] += 1
                        changed_fields.append(field)

            if changed_fields:
                # State changed: close the current run (kept only if it
                # spans at least two draws) and start a new one here.
                if current_run and current_run["count"] >= 2:
                    batching_runs.append(current_run)
                current_run = {"start_event": eid, "end_event": eid, "count": 1}
            elif current_run is None:
                current_run = {"start_event": eid, "end_event": eid, "count": 1}
            else:
                current_run["end_event"] = eid
                current_run["count"] += 1
            prev_state = cur_state
    finally:
        # Always put the replay cursor back where the caller left it.
        if saved_event is not None:
            session.set_event(saved_event)

    if current_run and current_run["count"] >= 2:
        batching_runs.append(current_run)
    batching_runs.sort(key=lambda r: -r["count"])

    analyzed = min(len(draw_eids), MAX_DRAWS_FOR_STATE)
    total = len(draw_eids)

    suggestions = []
    if change_counts.get("shader", 0) > analyzed // 3:
        suggestions.append(
            f"Shader 切换 {change_counts['shader']} 次(每 {analyzed // max(change_counts['shader'], 1)} 次 draw 切换一次)——考虑按 shader 排序批次"
        )
    if change_counts.get("render_target", 0) > 5:
        suggestions.append(
            f"Render target 切换 {change_counts['render_target']} 次——每次切换在 tile-based GPU 上触发 tile store/load"
        )
    if batching_runs and batching_runs[0]["count"] > 3:
        best = batching_runs[0]
        # Separators keep the two event ids and the count from running
        # together into a single unreadable number.
        suggestions.append(
            f"事件 {best['start_event']}-{best['end_event']}: {best['count']} 个连续 draw call 状态完全相同,可合并为 instanced draw"
        )

    return {
        "analyzed_draws": analyzed,
        "total_draws": total,
        "pass_filter": pass_name,
        "change_counts": change_counts,
        "batching_opportunities": batching_runs[:5],
        "suggestions": suggestions,
        "note": f"Analyzed first {analyzed} of {total} draw calls"
        if analyzed < total
        else "",
    }