"""Advanced tools: pixel_history, get_post_vs_data, diff_draw_calls, analyze_render_passes, sample_pixel_region, debug_shader_at_pixel.""" import math import struct from typing import Optional from ..session import get_session from ..util import ( rd, make_error, flags_to_list, MESH_DATA_STAGE_MAP, serialize_shader_variable, SHADER_STAGE_MAP, ) def sample_pixel_region( event_id: Optional[int] = None, resource_id: Optional[str] = None, region: Optional[dict] = None, sample_count: int = 256, anomaly_threshold: float = 10.0, ) -> dict: """Batch-sample pixels from a render target and auto-detect anomalies. Scans for NaN / Inf / negative values and extreme-bright pixels. Best tool for locating anomalous color values and IBL / HDR issues. Args: event_id: Sample at this event's render target state. Uses current event if omitted. resource_id: Specific render target resource ID. If omitted, uses the first color output of the current event. region: Optional sampling region {"x":0,"y":0,"width":W,"height":H}. If omitted, samples the full texture uniformly. sample_count: Number of sample points (default 256, max 1024). anomaly_threshold: Pixels with any channel exceeding this value are flagged as extreme-bright (useful for HDR targets). Default 10.0. """ session = get_session() err = session.require_open() if err: return err err = session.ensure_event(event_id) if err: return err if resource_id is not None: tex_id = session.resolve_resource_id(resource_id) if tex_id is None: return make_error( f"Resource '{resource_id}' not found", "INVALID_RESOURCE_ID" ) tex_desc = session.get_texture_desc(resource_id) else: state = session.controller.GetPipelineState() try: outputs = state.GetOutputTargets() color_target = next((o for o in outputs if int(o.resource) != 0), None) except Exception: color_target = None if color_target is None: return make_error("No color render target at current event", "API_ERROR") rid_str = str(color_target.resource) tex_id = session.resolve_resource_id(rid_str) tex_desc = session.get_texture_desc(rid_str) resource_id = rid_str if tex_desc is None: return make_error("Could not get texture description", "API_ERROR") tex_w, tex_h = tex_desc.width, tex_desc.height if region: rx = max(0, region.get("x", 0)) ry = max(0, region.get("y", 0)) rw = min(region.get("width", tex_w), tex_w - rx) rh = min(region.get("height", tex_h), tex_h - ry) else: rx, ry, rw, rh = 0, 0, tex_w, tex_h sample_count = min(sample_count, 1024) import math as _math cols = max(1, int(_math.sqrt(sample_count * rw / max(rh, 1)))) rows = max(1, sample_count // cols) step_x = max(1, rw // cols) step_y = max(1, rh // rows) sample_points: list = [] for r in range(rows): for c in range(cols): px = rx + c * step_x + step_x // 2 py = ry + r * step_y + step_y // 2 if px < rx + rw and py < ry + rh: sample_points.append((px, py)) nan_count = inf_count = neg_count = bright_count = 0 hotspots: list = [] values_r: list = [] values_g: list = [] values_b: list = [] for px, py in sample_points: try: val = session.controller.PickPixel( tex_id, px, py, rd.Subresource(0, 0, 0), rd.CompType.Typeless ) r, g, b = val.floatValue[0], val.floatValue[1], val.floatValue[2] except Exception: continue pixel = [round(r, 6), round(g, 6), round(b, 6)] anomaly_type: Optional[str] = None if any(_math.isnan(v) for v in [r, g, b]): nan_count += 1 anomaly_type = "NaN" elif any(_math.isinf(v) for v in [r, g, b]): inf_count += 1 anomaly_type = "Inf" pixel = [ "+Inf" if _math.isinf(v) and v > 0 else ("-Inf" if _math.isinf(v) else v) for v in pixel ] elif any(v < 0 for v in [r, g, b]): neg_count += 1 anomaly_type = "negative" elif max(r, g, b) > anomaly_threshold: bright_count += 1 anomaly_type = "extreme_bright" if anomaly_type: hotspots.append({"pixel": [px, py], "value": pixel, "type": anomaly_type}) if not any(_math.isnan(v) or _math.isinf(v) for v in [r, g, b]): values_r.append(r) values_g.append(g) values_b.append(b) total = len(sample_points) anomaly_total = nan_count + inf_count + neg_count + bright_count stats: dict = {} for ch, vals in [("r", values_r), ("g", values_g), ("b", values_b)]: if vals: stats[ch] = { "min": round(min(vals), 6), "max": round(max(vals), 6), "mean": round(sum(vals) / len(vals), 6), } MAX_HOTSPOTS = 20 hotspots_out = hotspots[:MAX_HOTSPOTS] hints: list = [] if inf_count > 0: hints.append( f"Inf 像素 {inf_count} 个——建议对这些像素执行 pixel_history 追踪来源" ) if neg_count > 0: hints.append( f"负值像素 {neg_count} 个——可能来自 IBL SH 采样或 HDR 溢出,建议检查写入该 RT 的 draw call" ) if nan_count > 0: hints.append(f"NaN 像素 {nan_count} 个——通常由除以零或 0/0 运算引起") if bright_count > 0: hints.append( f"极亮像素 {bright_count} 个(>{anomaly_threshold})——可能溢出或曝光异常" ) result = { "resource_id": resource_id, "render_target": f"{getattr(tex_desc, 'name', None) or resource_id} ({tex_desc.format.Name()}) {tex_w}x{tex_h}", "total_samples": total, "anomalies": { "nan_count": nan_count, "inf_count": inf_count, "negative_count": neg_count, "extreme_bright_count": bright_count, "anomaly_total": anomaly_total, "anomaly_rate": f"{anomaly_total / max(total, 1) * 100:.1f}%", }, "hotspots": hotspots_out, "statistics": stats, } if hints: result["diagnosis_hint"] = " | ".join(hints) return result def debug_shader_at_pixel( event_id: int, pixel_x: int, pixel_y: int, stage: str = "pixel", watch_variables: Optional[list] = None, ) -> dict: """Debug the shader at a specific pixel using RenderDoc's shader debugger. Executes the shader for the given pixel and returns intermediate variable values at each step. Useful for tracing negative values, IBL computation errors, TAA artifacts, and other precision issues. Note: Shader debugging may not be supported for all API/GPU combinations. If unsupported, falls back to reporting pixel value and bound shader info. Args: event_id: The event ID of the draw call. pixel_x: X coordinate (render target space, origin top-left). pixel_y: Y coordinate. stage: Shader stage to debug: "pixel" (default) or "vertex". watch_variables: Optional list of variable names to focus on (e.g. ["color", "iblDiffuse", "exposure"]). If omitted, all variables are returned (may be large). """ session = get_session() err = session.require_open() if err: return err err = session.set_event(event_id) if err: return err stage_enum = SHADER_STAGE_MAP.get(stage.lower()) if stage_enum is None: return make_error(f"Unknown shader stage: {stage}", "API_ERROR") state = session.controller.GetPipelineState() refl = state.GetShaderReflection(stage_enum) if refl is None: return make_error(f"No shader bound at stage '{stage}'", "API_ERROR") pixel_val_info: dict = {} try: outputs = state.GetOutputTargets() if outputs: first_rt = next((o for o in outputs if int(o.resource) != 0), None) if first_rt: rt_id = first_rt.resource pv = session.controller.PickPixel( rt_id, pixel_x, pixel_y, rd.Subresource(0, 0, 0), rd.CompType.Typeless, ) pixel_val_info = { "current_pixel_rgba": [ round(pv.floatValue[0], 6), round(pv.floatValue[1], 6), round(pv.floatValue[2], 6), round(pv.floatValue[3], 6), ], } r, g, b = pv.floatValue[0], pv.floatValue[1], pv.floatValue[2] anomalies = [] if any(math.isnan(v) for v in [r, g, b]): anomalies.append("NaN detected in output pixel") if any(math.isinf(v) for v in [r, g, b]): anomalies.append("Inf detected in output pixel") if any(v < 0 for v in [r, g, b]): anomalies.append( f"Negative value in output: [{r:.4f}, {g:.4f}, {b:.4f}]" ) if anomalies: pixel_val_info["pixel_anomalies"] = anomalies except Exception: pass trace_result: Optional[dict] = None debug_error: Optional[str] = None try: if stage_enum == rd.ShaderStage.Pixel: shader_debug = session.controller.DebugPixel( pixel_x, pixel_y, rd.DebugPixelInputs(), ) else: shader_debug = None if ( shader_debug is not None and hasattr(shader_debug, "states") and shader_debug.states ): states = shader_debug.states steps: list = [] for step_state in states: step: dict = {} try: changed = [] if hasattr(step_state, "locals"): for var in step_state.locals: vname = var.name if watch_variables and not any( w.lower() in vname.lower() for w in watch_variables ): continue sv = serialize_shader_variable(var) val = sv.get("value", []) if isinstance(val, list): flat = [v for v in val if isinstance(v, (int, float))] if any(math.isnan(v) for v in flat): sv["warning"] = f"⚠️ {vname} 包含 NaN" elif any(math.isinf(v) for v in flat): sv["warning"] = f"⚠️ {vname} 包含 Inf" elif any(v < 0 for v in flat): sv["warning"] = f"⚠️ {vname} 包含负值: {flat}" changed.append(sv) if changed: step["variables"] = changed except Exception: pass if step: steps.append(step) trace_result = { "steps_with_changes": len(steps), "trace": steps[:200], } else: debug_error = "DebugPixel returned no trace data (may not be supported for this API/GPU)" except Exception as e: debug_error = f"Shader debugging failed: {type(e).__name__}: {e}" result: dict = { "event_id": event_id, "pixel": [pixel_x, pixel_y], "stage": stage, "shader_resource_id": str(refl.resourceId), "entry_point": refl.entryPoint, } result.update(pixel_val_info) if trace_result is not None: result["debug_trace"] = trace_result else: result["debug_note"] = debug_error or "Shader trace unavailable" result["fallback_info"] = { "constant_blocks": [cb.name for cb in refl.constantBlocks], "read_only_resources": [r.name for r in refl.readOnlyResources], "suggestion": "Use get_cbuffer_contents and pixel_history for manual investigation", } return result def pixel_history( resource_id: str, x: int, y: int, event_id: Optional[int] = None, ) -> dict: """Get the full modification history of a pixel across all events in the frame. Shows every event that wrote to this pixel, with before/after values and pass/fail status (depth test, stencil test, etc.). Args: resource_id: The texture resource ID (must be a render target). x: X coordinate of the pixel. y: Y coordinate of the pixel. event_id: Optional event ID to navigate to first. """ session = get_session() err = session.require_open() if err: return err err = session.ensure_event(event_id) if err: return err tex_id = session.resolve_resource_id(resource_id) if tex_id is None: return make_error( f"Texture resource '{resource_id}' not found", "INVALID_RESOURCE_ID" ) history = session.controller.PixelHistory( tex_id, x, y, rd.Subresource(0, 0, 0), rd.CompType.Typeless, ) results = [] for mod in history: passed = mod.Passed() entry: dict = { "event_id": mod.eventId, "passed": passed, } if not passed: failure_reasons = [] try: if mod.backfaceCulled: failure_reasons.append("backface_culled") if mod.depthTestFailed: failure_reasons.append("depth_test_failed") if mod.stencilTestFailed: failure_reasons.append("stencil_test_failed") if mod.scissorClipped: failure_reasons.append("scissor_clipped") if mod.shaderDiscarded: failure_reasons.append("shader_discarded") if mod.depthClipped: failure_reasons.append("depth_clipped") except Exception: pass if failure_reasons: entry["failure_reasons"] = failure_reasons pre = mod.preMod entry["pre_value"] = { "r": pre.col.floatValue[0], "g": pre.col.floatValue[1], "b": pre.col.floatValue[2], "a": pre.col.floatValue[3], "depth": pre.depth, "stencil": pre.stencil, } post = mod.postMod entry["post_value"] = { "r": post.col.floatValue[0], "g": post.col.floatValue[1], "b": post.col.floatValue[2], "a": post.col.floatValue[3], "depth": post.depth, "stencil": post.stencil, } entry["pixel_changed"] = ( pre.col.floatValue[0] != post.col.floatValue[0] or pre.col.floatValue[1] != post.col.floatValue[1] or pre.col.floatValue[2] != post.col.floatValue[2] or pre.col.floatValue[3] != post.col.floatValue[3] ) results.append(entry) return { "resource_id": resource_id, "x": x, "y": y, "modifications": results, "count": len(results), } def get_post_vs_data( stage: str = "vsout", max_vertices: int = 100, event_id: Optional[int] = None, ) -> dict: """Get post-vertex-shader transformed vertex data for the current draw call. Args: stage: Data stage: "vsin" (vertex input), "vsout" (after vertex shader), "gsout" (after geometry shader). Default: "vsout". max_vertices: Maximum number of vertices to return (default 100). event_id: Optional event ID to navigate to first. """ session = get_session() err = session.require_open() if err: return err err = session.ensure_event(event_id) if err: return err mesh_stage = MESH_DATA_STAGE_MAP.get(stage.lower()) if mesh_stage is None: return make_error( f"Unknown mesh stage: {stage}. Valid: {list(MESH_DATA_STAGE_MAP.keys())}", "API_ERROR", ) postvs = session.controller.GetPostVSData(0, 0, mesh_stage) if postvs.vertexResourceId == rd.ResourceId.Null(): return make_error("No post-VS data available for current event", "API_ERROR") state = session.controller.GetPipelineState() if stage.lower() == "vsin": attrs = state.GetVertexInputs() attr_info = [{"name": a.name, "format": str(a.format.Name())} for a in attrs] else: refl_stage = rd.ShaderStage.Vertex if stage.lower() == "gsout": gs_refl = state.GetShaderReflection(rd.ShaderStage.Geometry) if gs_refl is not None: refl_stage = rd.ShaderStage.Geometry vs_refl = state.GetShaderReflection(refl_stage) if vs_refl is None: return make_error("No shader bound for requested stage", "API_ERROR") attr_info = [] for sig in vs_refl.outputSignature: name = sig.semanticIdxName if sig.varName == "" else sig.varName attr_info.append( { "name": name, "var_type": str(sig.varType), "comp_count": sig.compCount, "system_value": str(sig.systemValue), } ) num_verts = min(postvs.numIndices, max_vertices) data = session.controller.GetBufferData( postvs.vertexResourceId, postvs.vertexByteOffset, num_verts * postvs.vertexByteStride, ) vertices = [] floats_per_vertex = postvs.vertexByteStride // 4 if floats_per_vertex == 0: return make_error( f"Invalid vertex stride ({postvs.vertexByteStride} bytes), cannot parse vertex data", "API_ERROR", ) for i in range(num_verts): offset = i * postvs.vertexByteStride if offset + postvs.vertexByteStride > len(data): break vertex_floats = list(struct.unpack_from(f"{floats_per_vertex}f", data, offset)) vertices.append([round(f, 6) for f in vertex_floats]) return { "stage": stage, "event_id": session.current_event, "attributes": attr_info, "vertex_stride": postvs.vertexByteStride, "total_vertices": postvs.numIndices, "returned_vertices": len(vertices), "vertices": vertices, } def diff_draw_calls(eid1: int, eid2: int) -> dict: """Compare two draw calls and return their state differences. Useful for understanding what changed between two similar draw calls. Args: eid1: Event ID of the first draw call. eid2: Event ID of the second draw call. """ from ..tools.pipeline_tools import _get_draw_state_dict session = get_session() err = session.require_open() if err: return err state1 = _get_draw_state_dict(session, eid1) if "error" in state1: return state1 state2 = _get_draw_state_dict(session, eid2) if "error" in state2: return state2 raw_diff = _diff_dicts(state1, state2) differences = _add_implications(raw_diff) return { "eid1": eid1, "eid2": eid2, "differences": differences, "identical": len(differences) == 0, "summary": ( f"发现 {len(differences)} 处差异" if differences else "两个 draw call 的 pipeline state 完全相同" ), } def analyze_render_passes() -> dict: """Auto-detect render pass boundaries and summarize each pass. Detects passes by Clear actions and output target changes. Returns a list of render passes with draw count, RT info, and event range. """ session = get_session() err = session.require_open() if err: return err sf = session.structured_file passes: list = [] current_pass: Optional[dict] = None last_outputs: Optional[tuple] = None for eid in sorted(session.action_map.keys()): action = session.action_map[eid] is_clear = bool(action.flags & rd.ActionFlags.Clear) is_draw = bool(action.flags & rd.ActionFlags.Drawcall) if not is_clear and not is_draw: continue outputs = tuple(str(o) for o in action.outputs if int(o) != 0) new_pass = False if is_clear: new_pass = True elif outputs and outputs != last_outputs: new_pass = True if new_pass: if current_pass is not None: passes.append(current_pass) current_pass = { "pass_index": len(passes), "start_event": eid, "end_event": eid, "start_action": action.GetName(sf), "draw_count": 0, "clear_count": 0, "render_targets": list(outputs) if outputs else [], } if current_pass is None: current_pass = { "pass_index": 0, "start_event": eid, "end_event": eid, "start_action": action.GetName(sf), "draw_count": 0, "clear_count": 0, "render_targets": list(outputs) if outputs else [], } current_pass["end_event"] = eid if is_draw: current_pass["draw_count"] += 1 if is_clear: current_pass["clear_count"] += 1 if outputs: last_outputs = outputs for o in outputs: if o not in current_pass["render_targets"]: current_pass["render_targets"].append(o) if current_pass is not None: passes.append(current_pass) for p in passes: rt_info = [] for rid_str in p["render_targets"]: entry: dict = {"resource_id": rid_str} tex_desc = session.get_texture_desc(rid_str) if tex_desc is not None: entry["size"] = f"{tex_desc.width}x{tex_desc.height}" entry["format"] = str(tex_desc.format.Name()) rt_info.append(entry) p["render_target_info"] = rt_info return { "passes": passes, "total_passes": len(passes), } def _diff_dicts(d1: dict, d2: dict, path: str = "") -> dict: diff: dict = {} all_keys = set(d1.keys()) | set(d2.keys()) for key in all_keys: key_path = f"{path}.{key}" if path else key v1 = d1.get(key) v2 = d2.get(key) if v1 == v2: continue if isinstance(v1, dict) and isinstance(v2, dict): sub = _diff_dicts(v1, v2, key_path) if sub: diff[key] = sub elif isinstance(v1, list) and isinstance(v2, list): if v1 != v2: diff[key] = {"eid1": v1, "eid2": v2} else: diff[key] = {"eid1": v1, "eid2": v2} return diff def _implication_for(suffix: str, v1, v2) -> Optional[str]: s1, s2 = str(v1), str(v2) if suffix == "blend.enabled": detail = ( "透明度混合激活,颜色会与背景混合叠加" if v2 else "关闭 blend,输出将直接覆盖目标像素(不透明模式)" ) return f"Blend toggle: {s1}→{s2}。{detail}" if suffix == "blend.color_src": return f"颜色源混合因子: {s1}→{s2}。可能导致输出颜色亮度/透明度变化" if suffix == "blend.color_dst": return f"颜色目标混合因子: {s1}→{s2}。影响背景色在最终结果中的权重" if suffix == "blend.color_op": return f"颜色混合运算: {s1}→{s2}。运算方式改变可能导致颜色整体偏移" if suffix == "blend.alpha_src": return f"Alpha 源混合因子: {s1}→{s2}" if suffix == "depth.test": detail = ( "关闭后物体可能穿透其他几何体" if not v2 else "开启后需要确保深度 buffer 正确初始化" ) return f"深度测试: {s1}→{s2}。{detail}" if suffix == "depth.write": detail = ( "关闭后该 draw call 不更新深度 buffer,适用于透明物体" if not v2 else "开启后会更新深度值" ) return f"深度写入: {s1}→{s2}。{detail}" if suffix == "depth.func": return f"深度比较函数: {s1}→{s2}。可能导致物体遮挡关系或消失问题" if suffix == "stencil.enabled": return f"模板测试: {s1}→{s2}" if suffix == "rasterizer.cull": return ( f"剔除模式: {s1}→{s2}。可能影响背面/正面可见性,倒影 pass 通常需要反转 cull" ) if suffix == "rasterizer.front_ccw": return f"正面朝向: {s1}→{s2}。CCW/CW 切换后背面剔除方向翻转" if suffix == "topology": return f"图元拓扑: {s1}→{s2}" return None _IMPLICATION_SUFFIXES = [ "blend.enabled", "blend.color_src", "blend.color_dst", "blend.color_op", "blend.alpha_src", "depth.test", "depth.write", "depth.func", "stencil.enabled", "rasterizer.cull", "rasterizer.front_ccw", "topology", ] def _add_implications(diff: dict) -> list: results: list = [] def _flatten(d: dict, path: str): for key, val in d.items(): key_path = f"{path}.{key}" if path else key if isinstance(val, dict): if "eid1" in val or "eid2" in val: v1 = val.get("eid1") v2 = val.get("eid2") entry: dict = {"field": key_path, "eid1": v1, "eid2": v2} for suffix in _IMPLICATION_SUFFIXES: if key_path.endswith(suffix): imp = _implication_for(suffix, v1, v2) if imp: entry["implication"] = imp break results.append(entry) else: _flatten(val, key_path) _flatten(diff, "") return results