import argparse import csv import json import random from collections import Counter, defaultdict from dataclasses import dataclass from pathlib import Path from typing import Dict, List, Tuple import matplotlib import matplotlib.pyplot as plt from matplotlib import font_manager def configure_fonts() -> None: candidates = ["Microsoft YaHei", "SimHei", "PingFang SC", "Noto Sans CJK SC", "Noto Sans CJK TC"] all_fonts = {f.name for f in font_manager.fontManager.ttflist} for font in candidates: if font in all_fonts: matplotlib.rcParams["font.sans-serif"] = [font] matplotlib.rcParams["axes.unicode_minus"] = False break configure_fonts() def safe_ratio(numerator: float, denominator: float) -> float: return numerator / denominator if denominator > 0 else 0.0 def clamp01(value: float) -> float: return max(0.0, min(1.0, value)) def lerp(a: float, b: float, t: float) -> float: return a + (b - a) * clamp01(t) def clamp_float(value: float, min_v: float, max_v: float) -> float: return max(min_v, min(max_v, value)) NETWORK_LABELS = ("wifi", "4g", "3g", "2g") NETWORK_COUNT = len(NETWORK_LABELS) def percentile_95(values: List[float]) -> float: if not values: return 0.0 sorted_values = sorted(values) idx = max(0, int(len(sorted_values) * 0.95) - 1) return float(sorted_values[idx]) @dataclass class Policy: ad_type: int scene: str base_probability: float preload_threshold: float cooldown_seconds: int min_samples_for_confidence: int decay_half_life_hours: float @dataclass class SceneProfile: ad_type: int scene: str true_request_probability: float fill_success_probability: float mean_load_ms: float per_session_enter_probability: float @dataclass class NetworkCondition: name: str request_factor: float fill_success_factor: float load_ms_factor: float @dataclass class RobotProfile: robot_id: str archetype: str enter_multipliers: Tuple[float, float, float] request_multipliers: Tuple[float, float, float] fill_affinity: float latency_affinity: float retention_affinity: float network_weights: Tuple[float, float, float, float] @dataclass class SceneState: ad_type: int scene: str policy: Policy enter_count: int = 0 play_request_count: int = 0 preload_request_count: int = 0 preload_success_count: int = 0 preload_failure_count: int = 0 show_request_count: int = 0 show_start_count: int = 0 show_failure_count: int = 0 last_updated_ts: float = 0.0 last_preload_ts: float = 0.0 preload_expire_ts: float = -1.0 preload_ready: bool = False preload_origin_network: int = -1 @dataclass class RunResult: mode: str retention: float threshold: float cooldown_seconds: int total_users: int total_sessions: int total_scene_entries: int total_show_requests: int total_preload_requests: int total_preload_success: int total_preload_failure: int total_show_success: int total_show_fail: int total_preload_waste: int immediate_play_count: int avg_wait_ms: float p95_wait_ms: float wasted_ratio: float preload_success_rate: float show_success_rate: float network_show_requests: Tuple[int, int, int, int] network_immediate_show: Tuple[int, int, int, int] network_preload_requests: Tuple[int, int, int, int] network_preload_success: Tuple[int, int, int, int] network_preload_waste: Tuple[int, int, int, int] network_avg_wait_ms: Tuple[float, float, float, float] network_p95_wait_ms: Tuple[float, float, float, float] def default_policy_by_ad_type() -> Dict[Tuple[int, str], Policy]: return { (0, "__default__"): Policy(0, "__default__", 0.08, 0.75, 120, 8, 72.0), # Reward (1, "__default__"): Policy(1, "__default__", 0.08, 0.75, 120, 8, 72.0), # Splash (2, "__default__"): Policy(2, "__default__", 0.08, 0.75, 120, 8, 72.0), # Interaction } def parse_policy_config(path: Path) -> Dict[Tuple[int, str], Policy]: if not path.exists(): return default_policy_by_ad_type() with path.open("r", encoding="utf-8") as f: payload = json.loads(f.read()) result: Dict[Tuple[int, str], Policy] = {} scene_items = payload.get("ScenePolicies", []) or [] for item in scene_items: ad_type = int(item.get("AdType", -1)) scene = str(item.get("Scene", "")).strip() if ad_type < 0 or not scene: continue result[(ad_type, scene)] = Policy( ad_type=ad_type, scene=scene, base_probability=float(item.get("BaseProbability", 0.08)), preload_threshold=float(item.get("PreloadThreshold", 0.75)), cooldown_seconds=int(item.get("CooldownSeconds", 120)), min_samples_for_confidence=max(1, int(item.get("MinSamplesForConfidence", 8))), decay_half_life_hours=max(1.0, float(item.get("DecayHalfLifeHours", 72.0))), ) default = payload.get("GlobalDefault", {}) if isinstance(payload, dict) else {} for ad_type in (0, 1, 2): result[(ad_type, "__default__")] = Policy( ad_type=ad_type, scene="__default__", base_probability=float(default.get("BaseProbability", 0.08)), preload_threshold=float(default.get("PreloadThreshold", 0.75)), cooldown_seconds=int(default.get("CooldownSeconds", 120)), min_samples_for_confidence=max(1, int(default.get("MinSamplesForConfidence", 8))), decay_half_life_hours=max(1.0, float(default.get("DecayHalfLifeHours", 72.0))), ) return result def estimate_probability(state: SceneState, now: float, policy: Policy) -> float: if state.enter_count <= 0: return policy.base_probability observed = safe_ratio(state.play_request_count, state.enter_count) trust = min(1.0, state.enter_count / max(1.0, policy.min_samples_for_confidence)) age_hours = max(0.0, (now - state.last_updated_ts) / 3600.0) decay = 0.5 ** (age_hours / max(1e-6, policy.decay_half_life_hours)) trust *= decay return lerp(policy.base_probability, observed, trust) def should_preload(state: SceneState, now: float, threshold: float, cooldown_seconds: int) -> bool: policy = state.policy if now - state.last_preload_ts < cooldown_seconds: return False if state.preload_ready and state.preload_expire_ts >= now: return False return estimate_probability(state, now, policy) >= threshold def apply_waste_decay(state: SceneState, now: float) -> Tuple[int, int]: if state.preload_ready and state.preload_expire_ts > 0 and now >= state.preload_expire_ts: state.preload_ready = False state.preload_expire_ts = -1 return 1, max(0, min(NETWORK_COUNT - 1, state.preload_origin_network)) return 0, -1 def policy_for_scene(policy_map: Dict[Tuple[int, str], Policy], ad_type: int, scene: str) -> Policy: return policy_map.get((ad_type, scene)) or policy_map.get((ad_type, "__default__")) or next( iter(default_policy_by_ad_type().values()) ) def pick_network( conditions: List[NetworkCondition], weights: Tuple[float, float, float, float], rng: random.Random, ) -> Tuple[int, NetworkCondition]: roll = rng.random() cum = 0.0 for idx, w in enumerate(weights): cum += max(0.0, w) if roll <= cum: i = min(idx, len(conditions) - 1) return i, conditions[i] return len(conditions) - 1, conditions[-1] def normalize_weights(values: Tuple[float, float, float, float], epsilon: float = 1e-6) -> Tuple[float, float, float, float]: total = sum(max(0.0, v) for v in values) if total <= epsilon: return (0.25, 0.25, 0.25, 0.25) return tuple(v / total for v in values) def generate_robot_profiles(users: int, seed: int) -> List[RobotProfile]: rng = random.Random(seed) # 4 类广告网络环境偏好:Wi-Fi, 4G, 3G, 2G archetypes = [ { "name": "reward_heavy", "weight": 0.25, "enter": (1.45, 0.82, 0.80), "request": (1.55, 0.76, 0.62), "fill_aff": 1.04, "latency_aff": 0.88, "retention_aff": 1.18, "network": (0.60, 0.28, 0.10, 0.02), }, { "name": "interstitial_focus", "weight": 0.22, "enter": (0.72, 1.48, 0.83), "request": (0.78, 1.57, 0.70), "fill_aff": 0.96, "latency_aff": 1.02, "retention_aff": 1.05, "network": (0.32, 0.42, 0.20, 0.06), }, { "name": "splash_driven", "weight": 0.22, "enter": (0.88, 0.92, 1.52), "request": (1.02, 1.03, 1.40), "fill_aff": 0.91, "latency_aff": 1.01, "retention_aff": 0.96, "network": (0.75, 0.18, 0.05, 0.02), }, { "name": "balanced", "weight": 0.20, "enter": (1.05, 1.12, 1.10), "request": (1.05, 1.00, 1.00), "fill_aff": 1.00, "latency_aff": 1.00, "retention_aff": 1.00, "network": (0.40, 0.32, 0.22, 0.06), }, { "name": "churn_sensitive", "weight": 0.11, "enter": (0.55, 0.64, 0.45), "request": (0.60, 0.55, 0.46), "fill_aff": 0.72, "latency_aff": 1.18, "retention_aff": 0.58, "network": (0.24, 0.26, 0.30, 0.20), }, { "name": "network_bound", "weight": 0.10, "enter": (0.85, 0.85, 0.80), "request": (0.88, 0.78, 0.72), "fill_aff": 0.68, "latency_aff": 1.22, "retention_aff": 0.82, "network": (0.12, 0.18, 0.30, 0.40), }, ] total_weight = sum(a["weight"] for a in archetypes) running = [] cur = 0.0 for a in archetypes: cur += a["weight"] / total_weight running.append((cur, a)) robots = [] for idx in range(users): roll = rng.random() picked = archetypes[-1] for threshold, arche in running: if roll <= threshold: picked = arche break def jitter(v: float, span: float) -> float: return clamp01(v + rng.uniform(-span, span)) enters = tuple(clamp_float(jitter(base, 0.24), 0.2, 1.9) for base in picked["enter"]) requests = tuple(clamp_float(jitter(base, 0.22), 0.15, 1.9) for base in picked["request"]) nw = [ max(0.01, jitter(weight, 0.15)) for weight in picked["network"] ] robots.append( RobotProfile( robot_id=f"bot_{idx:05d}", archetype=picked["name"], enter_multipliers=enters, request_multipliers=requests, fill_affinity=clamp_float(jitter(picked["fill_aff"], 0.16), 0.45, 1.55), latency_affinity=clamp_float(jitter(picked["latency_aff"], 0.2), 0.6, 1.8), retention_affinity=clamp_float(jitter(picked["retention_aff"], 0.12), 0.35, 1.45), network_weights=normalize_weights(tuple(nw)), ) ) return robots def simulate_once( profiles: List[SceneProfile], policy_map: Dict[Tuple[int, str], Policy], robots: List[RobotProfile], threshold: float, cooldown_seconds: int, retention_next_day: float, seed: int, smart_enabled: bool, mode: str, ) -> RunResult: behavior_rng = random.Random(seed) outcome_rng = random.Random(seed + 998244353) states: Dict[str, SceneState] = {} network_conditions = [ NetworkCondition("wifi", request_factor=1.0, fill_success_factor=1.0, load_ms_factor=0.85), NetworkCondition("4g", request_factor=0.96, fill_success_factor=0.98, load_ms_factor=1.08), NetworkCondition("3g", request_factor=0.90, fill_success_factor=0.86, load_ms_factor=1.45), NetworkCondition("2g", request_factor=0.78, fill_success_factor=0.73, load_ms_factor=2.00), ] def state_of(profile: SceneProfile, profile_index: int) -> SceneState: key = f"{profile.ad_type}|{profile.scene}" if key not in states: states[key] = SceneState( ad_type=profile.ad_type, scene=profile.scene, policy=policy_for_scene(policy_map, profile.ad_type, profile.scene), ) return states[key] total_wait_times: List[float] = [] network_wait_times: List[List[float]] = [[] for _ in range(NETWORK_COUNT)] total_preload_waste = 0 total_scene_entries = 0 total_show_requests = 0 total_show_success = 0 total_show_fail = 0 total_preload_requests = 0 total_preload_success = 0 total_preload_failure = 0 total_sessions = 0 immediate_play_count = 0 network_show_requests = [0, 0, 0, 0] network_immediate_show = [0, 0, 0, 0] network_preload_requests = [0, 0, 0, 0] network_preload_success = [0, 0, 0, 0] network_preload_waste = [0, 0, 0, 0] now = 0.0 for robot in robots: sessions = 1 retention_prob = clamp01(retention_next_day * robot.retention_affinity) while behavior_rng.random() < retention_prob: sessions += 1 total_sessions += sessions for session_index in range(sessions): now += 0.3 + behavior_rng.random() * 0.7 network_idx, network = pick_network(network_conditions, robot.network_weights, behavior_rng) for idx, profile in enumerate(profiles): state = state_of(profile, idx) now += behavior_rng.uniform(0.3, 2.0) total_scene_entries += 1 state.enter_count += 1 state.last_updated_ts = now waste, waste_network = apply_waste_decay(state, now) total_preload_waste += waste if waste > 0 and 0 <= waste_network < NETWORK_COUNT: network_preload_waste[waste_network] += waste enter_prob = clamp01( profile.per_session_enter_probability * robot.enter_multipliers[idx] * network.request_factor ) if behavior_rng.random() > enter_prob: continue if smart_enabled and should_preload(state, now, threshold, cooldown_seconds): total_preload_requests += 1 state.preload_request_count += 1 state.last_preload_ts = now state.last_updated_ts = now network_preload_requests[network_idx] += 1 fill_success = clamp01(profile.fill_success_probability * robot.fill_affinity * network.fill_success_factor) if outcome_rng.random() < fill_success: state.preload_success_count += 1 total_preload_success += 1 network_preload_success[network_idx] += 1 state.preload_ready = True state.preload_expire_ts = now + cooldown_seconds state.preload_origin_network = network_idx else: state.preload_failure_count += 1 total_preload_failure += 1 request_prob = clamp01(profile.true_request_probability * robot.request_multipliers[idx]) if behavior_rng.random() < request_prob: state.play_request_count += 1 state.show_request_count += 1 total_show_requests += 1 network_show_requests[network_idx] += 1 fill_success = clamp01(profile.fill_success_probability * robot.fill_affinity * network.fill_success_factor) if state.preload_ready and state.preload_expire_ts >= now: state.preload_ready = False state.show_start_count += 1 total_show_success += 1 immediate_play_count += 1 network_immediate_show[network_idx] += 1 total_wait_times.append(0.0) network_wait_times[network_idx].append(0.0) state.last_updated_ts = now else: load_ms = max( 250.0, outcome_rng.gauss( profile.mean_load_ms * robot.latency_affinity * network.load_ms_factor, 220.0, ), ) if outcome_rng.random() < fill_success: state.show_start_count += 1 total_show_success += 1 else: state.show_failure_count += 1 total_show_fail += 1 total_wait_times.append(load_ms) network_wait_times[network_idx].append(load_ms) state.last_updated_ts = now # 每个 session 间停留(含加载、浏览等) now += behavior_rng.uniform(8.0, 16.0) # 网络状态抖动:同一用户跨会话可变更 if session_index % 2 == 1 and outcome_rng.random() < 0.08: now += 0.0 avg_wait = safe_ratio(sum(total_wait_times), len(total_wait_times)) p95_wait = percentile_95(total_wait_times) total_requests = max(1, total_show_requests) total_preloads = max(1, total_preload_requests) network_avg_wait = [] network_p95_wait = [] for nw_idx in range(NETWORK_COUNT): network_avg_wait.append(safe_ratio(sum(network_wait_times[nw_idx]), max(1, len(network_wait_times[nw_idx])))) network_p95_wait.append(percentile_95(network_wait_times[nw_idx])) return RunResult( mode=mode, retention=retention_next_day, threshold=threshold, cooldown_seconds=cooldown_seconds, total_users=len(robots), total_sessions=total_sessions, total_scene_entries=total_scene_entries, total_show_requests=total_show_requests, total_preload_requests=total_preload_requests, total_preload_success=total_preload_success, total_preload_failure=total_preload_failure, total_show_success=total_show_success, total_show_fail=total_show_fail, total_preload_waste=total_preload_waste, immediate_play_count=immediate_play_count, avg_wait_ms=avg_wait, p95_wait_ms=p95_wait, wasted_ratio=safe_ratio(total_preload_waste, total_preloads), preload_success_rate=safe_ratio(total_preload_success, total_preloads), show_success_rate=safe_ratio(total_show_success, total_requests), network_show_requests=tuple(network_show_requests), network_immediate_show=tuple(network_immediate_show), network_preload_requests=tuple(network_preload_requests), network_preload_success=tuple(network_preload_success), network_preload_waste=tuple(network_preload_waste), network_avg_wait_ms=(network_avg_wait[0], network_avg_wait[1], network_avg_wait[2], network_avg_wait[3]), network_p95_wait_ms=(network_p95_wait[0], network_p95_wait[1], network_p95_wait[2], network_p95_wait[3]), ) def run_sensitivity( profiles: List[SceneProfile], policy_map: Dict[Tuple[int, str], Policy], robots: List[RobotProfile], threshold_grid: List[float], cooldown_grid: List[int], retention_rates: List[float], out_dir: Path, mode_manual_seed: int, ) -> Tuple[List[RunResult], Dict[float, RunResult]]: baseline_by_retention: Dict[float, RunResult] = {} smart_results: List[RunResult] = [] for ridx, retention in enumerate(retention_rates): baseline_by_retention[retention] = simulate_once( profiles=profiles, policy_map=policy_map, robots=robots, threshold=1.0, cooldown_seconds=max(cooldown_grid), retention_next_day=retention, seed=mode_manual_seed + ridx * 1000, smart_enabled=False, mode="manual", ) for ridx, retention in enumerate(retention_rates): for tidx, threshold in enumerate(threshold_grid): for cidx, cooldown in enumerate(cooldown_grid): smart_results.append( simulate_once( profiles=profiles, policy_map=policy_map, robots=robots, threshold=threshold, cooldown_seconds=cooldown, retention_next_day=retention, seed=mode_manual_seed + 8000 + ridx * 1000 + tidx * 37 + cidx, smart_enabled=True, mode="smart", ) ) out_dir.mkdir(parents=True, exist_ok=True) summary_csv = out_dir / "smartload_sensitivity_summary.csv" with summary_csv.open("w", encoding="utf-8", newline="") as f: w = csv.writer(f) w.writerow( [ "mode", "retention", "threshold", "cooldown", "total_show_requests", "total_show_success", "total_show_fail", "immediate_show", "avg_wait_ms", "p95_wait_ms", "total_preload_requests", "preload_success", "preload_fail", "preload_waste", "show_success_rate", "preload_success_rate", "wasted_ratio", "baseline_immediate_show", "baseline_avg_wait_ms", "delta_immediate_show", "delta_wait_ms", "delta_wait_pct", "delta_show_success", ] ) for retention in retention_rates: manual = baseline_by_retention[retention] m_immediate = safe_ratio(manual.immediate_play_count, manual.total_show_requests) w.writerow( [ "manual", f"{retention:.4f}", "1.00", max(cooldown_grid), manual.total_show_requests, manual.total_show_success, manual.total_show_fail, f"{m_immediate:.6f}", f"{manual.avg_wait_ms:.2f}", f"{manual.p95_wait_ms:.2f}", manual.total_preload_requests, manual.total_preload_success, manual.total_preload_failure, f"{manual.total_preload_waste}", f"{manual.show_success_rate:.6f}", f"{manual.preload_success_rate:.6f}", f"{manual.wasted_ratio:.6f}", f"{m_immediate:.6f}", f"{manual.avg_wait_ms:.2f}", f"{0:.6f}", f"{0:.2f}", f"{0:.6f}", f"{0:.6f}", ] ) for r in smart_results: b = baseline_by_retention[r.retention] immediate = safe_ratio(r.immediate_play_count, r.total_show_requests) baseline_immediate = safe_ratio(b.immediate_play_count, b.total_show_requests) delta_immediate = immediate - baseline_immediate delta_wait = r.avg_wait_ms - b.avg_wait_ms delta_wait_pct = safe_ratio(delta_wait, b.avg_wait_ms) if b.avg_wait_ms > 0 else 0.0 w.writerow( [ r.mode, f"{r.retention:.4f}", f"{r.threshold:.4f}", r.cooldown_seconds, r.total_show_requests, r.total_show_success, r.total_show_fail, f"{immediate:.6f}", f"{r.avg_wait_ms:.2f}", f"{r.p95_wait_ms:.2f}", r.total_preload_requests, r.total_preload_success, r.total_preload_failure, r.total_preload_waste, f"{r.show_success_rate:.6f}", f"{r.preload_success_rate:.6f}", f"{r.wasted_ratio:.6f}", f"{baseline_immediate:.6f}", f"{b.avg_wait_ms:.2f}", f"{delta_immediate:.6f}", f"{delta_wait:.2f}", f"{delta_wait_pct:.6f}", f"{(r.show_success_rate - b.show_success_rate):.6f}", ] ) rank_csv = write_rank_csv(smart_results, baseline_by_retention, out_dir) write_mode_comparison_csv(baseline_by_retention, smart_results, out_dir) write_robot_summary_csv(robots, out_dir) write_mode_lines(baseline_by_retention, smart_results, out_dir) write_network_mode_comparison_csv(baseline_by_retention, smart_results, out_dir) draw_network_mode_lines(baseline_by_retention, smart_results, out_dir) return smart_results, baseline_by_retention def write_rank_csv(results: List[RunResult], baseline_by_retention: Dict[float, RunResult], out_dir: Path) -> Path: rank_csv = out_dir / "smartload_retention_rank.csv" with rank_csv.open("w", encoding="utf-8", newline="") as f: w = csv.writer(f) w.writerow( [ "retention", "rank", "threshold", "cooldown", "immediate_show", "avg_wait_ms", "p95_wait_ms", "waste_ratio", "show_success_rate", "delta_immediate", "delta_wait_ms", "delta_wait_pct", "objective_score", ] ) for retention in sorted(baseline_by_retention.keys()): rows = [] for r in [x for x in results if x.retention == retention]: b = baseline_by_retention[retention] immediate = safe_ratio(r.immediate_play_count, r.total_show_requests) baseline_immediate = safe_ratio(b.immediate_play_count, b.total_show_requests) delta_immediate = immediate - baseline_immediate delta_wait = r.avg_wait_ms - b.avg_wait_ms score = 0.7 * delta_immediate + 0.2 * safe_ratio((1 - r.wasted_ratio), 1.0) - 0.001 * r.avg_wait_ms / 10.0 rows.append((score, r)) rows.sort(key=lambda x: x[0], reverse=True) for idx, (_, r) in enumerate(rows, start=1): immediate = safe_ratio(r.immediate_play_count, r.total_show_requests) b = baseline_by_retention[retention] delta_immediate = immediate - safe_ratio(b.immediate_play_count, b.total_show_requests) delta_wait = r.avg_wait_ms - b.avg_wait_ms delta_wait_pct = safe_ratio(delta_wait, b.avg_wait_ms) if b.avg_wait_ms > 0 else 0.0 score = 0.7 * delta_immediate + 0.2 * (1 - r.wasted_ratio) - 0.001 * r.avg_wait_ms / 10.0 w.writerow( [ f"{retention:.2f}", idx, f"{r.threshold:.2f}", r.cooldown_seconds, f"{immediate:.6f}", f"{r.avg_wait_ms:.2f}", f"{r.p95_wait_ms:.2f}", f"{r.wasted_ratio:.6f}", f"{r.show_success_rate:.6f}", f"{delta_immediate:.6f}", f"{delta_wait:.2f}", f"{delta_wait_pct:.6f}", f"{score:.6f}", ] ) return rank_csv def write_mode_comparison_csv( baseline_by_retention: Dict[float, RunResult], smart_results: List[RunResult], out_dir: Path, ) -> Path: by_retention = defaultdict(list) for result in smart_results: by_retention[result.retention].append(result) out = out_dir / "smartload_mode_comparison.csv" with out.open("w", encoding="utf-8", newline="") as f: w = csv.writer(f) w.writerow( [ "retention", "manual_immediate", "manual_avg_wait_ms", "manual_waste_ratio", "manual_show_success_rate", "smart_threshold", "smart_cooldown", "smart_immediate", "smart_avg_wait_ms", "smart_waste_ratio", "smart_show_success_rate", "delta_immediate", "delta_wait_ms", ] ) for retention, b in sorted(baseline_by_retention.items()): manual_immediate = safe_ratio(b.immediate_play_count, b.total_show_requests) candidates = by_retention[retention] if not candidates: continue best = max(candidates, key=lambda item: safe_ratio(item.immediate_play_count, item.total_show_requests)) smart_immediate = safe_ratio(best.immediate_play_count, best.total_show_requests) w.writerow( [ f"{retention:.2f}", f"{manual_immediate:.6f}", f"{b.avg_wait_ms:.2f}", f"{b.wasted_ratio:.6f}", f"{b.show_success_rate:.6f}", f"{best.threshold:.2f}", best.cooldown_seconds, f"{smart_immediate:.6f}", f"{best.avg_wait_ms:.2f}", f"{best.wasted_ratio:.6f}", f"{best.show_success_rate:.6f}", f"{smart_immediate - manual_immediate:.6f}", f"{best.avg_wait_ms - b.avg_wait_ms:.2f}", ] ) return out def write_network_mode_comparison_csv( baseline_by_retention: Dict[float, RunResult], smart_results: List[RunResult], out_dir: Path, ) -> Path: by_retention = defaultdict(list) for result in smart_results: by_retention[result.retention].append(result) out = out_dir / "smartload_network_mode_comparison.csv" with out.open("w", encoding="utf-8", newline="") as f: w = csv.writer(f) w.writerow( [ "retention", "network", "manual_show_requests", "manual_immediate", "manual_avg_wait_ms", "manual_p95_wait_ms", "manual_waste_ratio", "manual_preload_success", "smart_threshold", "smart_cooldown", "smart_show_requests", "smart_immediate", "smart_avg_wait_ms", "smart_p95_wait_ms", "smart_waste_ratio", "smart_preload_success", "delta_immediate", "delta_wait_ms", ] ) for retention, b in sorted(baseline_by_retention.items()): candidates = by_retention[retention] for idx, network_name in enumerate(NETWORK_LABELS): manual_immediate = safe_ratio(b.network_immediate_show[idx], b.network_show_requests[idx]) manual_wait = b.network_avg_wait_ms[idx] manual_p95 = b.network_p95_wait_ms[idx] manual_waste = safe_ratio(b.network_preload_waste[idx], max(1, b.network_preload_requests[idx])) manual_preload_success = safe_ratio(b.network_preload_success[idx], max(1, b.network_preload_requests[idx])) best = None if candidates: best = max(candidates, key=lambda item: safe_ratio(item.network_immediate_show[idx], item.network_show_requests[idx])) if best is None: w.writerow( [ f"{retention:.2f}", network_name, b.network_show_requests[idx], f"{manual_immediate:.6f}", f"{manual_wait:.2f}", f"{manual_p95:.2f}", f"{manual_waste:.6f}", f"{manual_preload_success:.6f}", "N/A", "N/A", 0, f"{0:.6f}", f"{0:.2f}", f"{0:.2f}", f"{0:.6f}", f"{0:.6f}", f"{0:.6f}", f"{0:.2f}", ] ) continue smart_immediate = safe_ratio(best.network_immediate_show[idx], best.network_show_requests[idx]) smart_wait = best.network_avg_wait_ms[idx] smart_p95 = best.network_p95_wait_ms[idx] smart_waste = safe_ratio(best.network_preload_waste[idx], max(1, best.network_preload_requests[idx])) smart_preload_success = safe_ratio(best.network_preload_success[idx], max(1, best.network_preload_requests[idx])) w.writerow( [ f"{retention:.2f}", network_name, b.network_show_requests[idx], f"{manual_immediate:.6f}", f"{manual_wait:.2f}", f"{manual_p95:.2f}", f"{manual_waste:.6f}", f"{manual_preload_success:.6f}", f"{best.threshold:.2f}", best.cooldown_seconds, best.network_show_requests[idx], f"{smart_immediate:.6f}", f"{smart_wait:.2f}", f"{smart_p95:.2f}", f"{smart_waste:.6f}", f"{smart_preload_success:.6f}", f"{smart_immediate - manual_immediate:.6f}", f"{smart_wait - manual_wait:.2f}", ] ) return out def write_robot_summary_csv(robots: List[RobotProfile], out_dir: Path) -> Path: summary_csv = out_dir / "smartload_robot_cohorts.csv" with summary_csv.open("w", encoding="utf-8", newline="") as f: w = csv.writer(f) w.writerow( [ "robot_id", "archetype", "enter_reward", "enter_interstitial", "enter_splash", "request_reward", "request_interstitial", "request_splash", "fill_affinity", "latency_affinity", "retention_affinity", "network_wifi", "network_4g", "network_3g", "network_2g", ] ) for robot in robots: w.writerow( [ robot.robot_id, robot.archetype, f"{robot.enter_multipliers[0]:.4f}", f"{robot.enter_multipliers[1]:.4f}", f"{robot.enter_multipliers[2]:.4f}", f"{robot.request_multipliers[0]:.4f}", f"{robot.request_multipliers[1]:.4f}", f"{robot.request_multipliers[2]:.4f}", f"{robot.fill_affinity:.4f}", f"{robot.latency_affinity:.4f}", f"{robot.retention_affinity:.4f}", f"{robot.network_weights[0]:.6f}", f"{robot.network_weights[1]:.6f}", f"{robot.network_weights[2]:.6f}", f"{robot.network_weights[3]:.6f}", ] ) # 一页式摘要也可用于报告快速读取。 summary_text = out_dir / "smartload_robot_cohort_summary.txt" counter = Counter(robot.archetype for robot in robots) with summary_text.open("w", encoding="utf-8") as out: out.write("Robot Cohort Summary\n") out.write("===================\n") for name, count in counter.most_common(): out.write(f"{name}: {count}\n") return summary_csv def draw_heatmaps(results: List[RunResult], retention: float, out_dir: Path): filtered = [r for r in results if abs(r.retention - retention) < 1e-6] if not filtered: return thresholds = sorted({r.threshold for r in filtered}) cooldowns = sorted({r.cooldown_seconds for r in filtered}) index = {(r.threshold, r.cooldown_seconds): r for r in filtered} def matrix(metric): data = [[0.0 for _ in cooldowns] for _ in thresholds] for ti, t in enumerate(thresholds): for ci, c in enumerate(cooldowns): r = index[(t, c)] if metric == "immediate": data[ti][ci] = safe_ratio(r.immediate_play_count, r.total_show_requests) elif metric == "wait": data[ti][ci] = r.avg_wait_ms elif metric == "waste": data[ti][ci] = r.wasted_ratio elif metric == "preload": data[ti][ci] = r.preload_success_rate return data def plot(title, data, fname, label): plt.figure(figsize=(8, 6)) im = plt.imshow(data, origin="lower", aspect="auto", cmap="viridis") plt.yticks(range(len(thresholds)), [f"{t:.2f}" for t in thresholds]) plt.xticks(range(len(cooldowns)), [str(c) for c in cooldowns]) plt.xlabel("冷却时间(秒)") plt.ylabel("预加载阈值") plt.title(title) cbar = plt.colorbar(im) cbar.set_label(label) plt.tight_layout() plt.savefig(out_dir / fname, dpi=150) plt.close() plot(f"智能预加载即时命中率(次留={retention:.0%})", matrix("immediate"), f"heatmap_immediate_r_{retention:.2f}.png", "即时命中率") plot(f"智能预加载平均等待(次留={retention:.0%})", matrix("wait"), f"heatmap_wait_ms_r_{retention:.2f}.png", "平均等待(ms)") plot(f"智能预加载浪费比例(次留={retention:.0%})", matrix("waste"), f"heatmap_waste_ratio_r_{retention:.2f}.png", "浪费比例") plot(f"智能预加载预加载成功率(次留={retention:.0%})", matrix("preload"), f"heatmap_preload_success_r_{retention:.2f}.png", "预加载成功率") def draw_retention_lines(results: List[RunResult], out_dir: Path): thresholds = sorted({r.threshold for r in results}) cooldowns = sorted({r.cooldown_seconds for r in results}) retentions = sorted({r.retention for r in results}) baseline_threshold = thresholds[len(thresholds) // 2] plt.figure(figsize=(10, 6)) for c in cooldowns: xs = [] ys = [] for r in sorted([x for x in results if x.cooldown_seconds == c and x.threshold == baseline_threshold], key=lambda x: x.retention): xs.append(r.retention) ys.append(safe_ratio(r.immediate_play_count, r.total_show_requests)) plt.plot(xs, ys, marker="o", label=f"cd={c}s") plt.title("敏感度:次留与即时命中率") plt.xlabel("次留(D2)") plt.ylabel("即时命中率") plt.grid(True) plt.legend(fontsize=8) plt.tight_layout() plt.savefig(out_dir / "line_immediate_vs_retention.png", dpi=150) plt.close() plt.figure(figsize=(10, 6)) for c in cooldowns: xs = [] ys = [] for r in sorted([x for x in results if x.cooldown_seconds == c and x.threshold == baseline_threshold], key=lambda x: x.retention): xs.append(r.retention) ys.append(r.avg_wait_ms) plt.plot(xs, ys, marker="s", label=f"cd={c}s") plt.title("敏感度:次留与平均等待") plt.xlabel("次留(D2)") plt.ylabel("平均等待(ms)") plt.grid(True) plt.legend(fontsize=8) plt.tight_layout() plt.savefig(out_dir / "line_wait_vs_retention.png", dpi=150) plt.close() for retention in retentions: same_r = sorted([x for x in results if abs(x.retention - retention) < 1e-6 and x.threshold == baseline_threshold], key=lambda x: x.cooldown_seconds) if not same_r: continue fig, ax1 = plt.subplots(figsize=(10, 6)) ax2 = ax1.twinx() cooldown_values = [x.cooldown_seconds for x in same_r] immediate = [safe_ratio(x.immediate_play_count, x.total_show_requests) for x in same_r] waste = [x.wasted_ratio for x in same_r] ax1.plot(cooldown_values, immediate, marker="o", label="即时命中率") ax1.set_xlabel("冷却时间(秒)") ax1.set_ylabel("即时命中率", color="#1f77b4") ax2.plot(cooldown_values, waste, marker="x", color="#F28D35", label="浪费比例") ax2.set_ylabel("浪费比例", color="#F28D35") ax1.set_title(f"次留={retention:.0%},阈值={baseline_threshold:.2f}") ax1.grid(True) ax1.set_xlim(left=min(cooldown_values), right=max(cooldown_values)) fig.tight_layout() fig.savefig(out_dir / f"line_threshold_{baseline_threshold:.2f}_retention_{retention:.2f}.png", dpi=150) plt.close(fig) def write_mode_lines( baseline_by_retention: Dict[float, RunResult], smart_results: List[RunResult], out_dir: Path, ) -> None: best_by_retention: Dict[float, RunResult] = {} for retention, baseline in baseline_by_retention.items(): candidates = [r for r in smart_results if r.retention == retention] if not candidates: continue best = max(candidates, key=lambda x: safe_ratio(x.immediate_play_count, x.total_show_requests)) best_by_retention[retention] = best retentions = sorted(baseline_by_retention.keys()) plt.figure(figsize=(10, 6)) xs = [r for r in retentions] ys_manual = [safe_ratio(baseline_by_retention[r].immediate_play_count, baseline_by_retention[r].total_show_requests) for r in retentions] ys_smart = [safe_ratio(best_by_retention[r].immediate_play_count, best_by_retention[r].total_show_requests) for r in retentions] plt.plot(xs, ys_manual, marker="o", label="手动(无智能)") plt.plot(xs, ys_smart, marker="o", label="智能(最优配置)") plt.title("模式对比:次留与即时播放率") plt.xlabel("次留(D2)") plt.ylabel("即时播放率") plt.grid(True) plt.legend() plt.tight_layout() plt.savefig(out_dir / "line_mode_immediate_vs_retention.png", dpi=150) plt.close() plt.figure(figsize=(10, 6)) ys_manual_wait = [baseline_by_retention[r].avg_wait_ms for r in retentions] ys_smart_wait = [best_by_retention[r].avg_wait_ms for r in retentions] plt.plot(xs, ys_manual_wait, marker="s", label="手动(无智能)") plt.plot(xs, ys_smart_wait, marker="s", label="智能(最优配置)") plt.title("模式对比:次留与平均等待") plt.xlabel("次留(D2)") plt.ylabel("平均等待(ms)") plt.grid(True) plt.legend() plt.tight_layout() plt.savefig(out_dir / "line_mode_wait_vs_retention.png", dpi=150) plt.close() def draw_network_mode_lines( baseline_by_retention: Dict[float, RunResult], smart_results: List[RunResult], out_dir: Path, ) -> None: by_retention = defaultdict(list) for result in smart_results: by_retention[result.retention].append(result) retentions = sorted(baseline_by_retention.keys()) plt.figure(figsize=(11, 7)) for idx, network_name in enumerate(NETWORK_LABELS): xs = [] ys_manual = [] ys_smart = [] for retention in retentions: b = baseline_by_retention[retention] xs.append(retention) ys_manual.append( safe_ratio( b.network_immediate_show[idx], b.network_show_requests[idx], ) ) candidates = by_retention[retention] if candidates: best = max( candidates, key=lambda item: safe_ratio(item.network_immediate_show[idx], item.network_show_requests[idx]), ) ys_smart.append( safe_ratio( best.network_immediate_show[idx], best.network_show_requests[idx], ) ) else: ys_smart.append(0.0) plt.plot(xs, ys_manual, marker="o", label=f"{network_name.upper()} - 手动") plt.plot(xs, ys_smart, marker="s", linestyle="--", label=f"{network_name.upper()} - 智能") plt.title("网络分层即时命中率:次留对比(D2)") plt.xlabel("次留(D2)") plt.ylabel("即时命中率") plt.grid(True) plt.legend(fontsize=8) plt.tight_layout() plt.savefig(out_dir / "line_network_immediate_vs_retention.png", dpi=150) plt.close() plt.figure(figsize=(11, 7)) for idx, network_name in enumerate(NETWORK_LABELS): xs = [] ys_manual = [] ys_smart = [] for retention in retentions: b = baseline_by_retention[retention] xs.append(retention) ys_manual.append(b.network_avg_wait_ms[idx]) candidates = by_retention[retention] if candidates: best = max( candidates, key=lambda item: safe_ratio(item.network_immediate_show[idx], item.network_show_requests[idx]), ) ys_smart.append(best.network_avg_wait_ms[idx]) else: ys_smart.append(0.0) plt.plot(xs, ys_manual, marker="o", label=f"{network_name.upper()} - 手动") plt.plot(xs, ys_smart, marker="s", linestyle="--", label=f"{network_name.upper()} - 智能") plt.title("网络分层平均等待:次留对比(D2)") plt.xlabel("次留(D2)") plt.ylabel("平均等待(ms)") plt.grid(True) plt.legend(fontsize=8) plt.tight_layout() plt.savefig(out_dir / "line_network_wait_vs_retention.png", dpi=150) plt.close() def build_report( results: List[RunResult], baseline_by_retention: Dict[float, RunResult], out_dir: Path, robot_count: int, baseline_retention_default: float = 0.35, ) -> Path: report = out_dir / "TapADN_智能预加载_敏感度验收报告.md" ranked: Dict[float, List[RunResult]] = defaultdict(list) for r in results: ranked[r.retention].append(r) ranked_sorted: Dict[float, List[RunResult]] = {} for retention, rows in ranked.items(): ranked_sorted[retention] = sorted(rows, key=lambda item: safe_ratio(item.immediate_play_count, item.total_show_requests), reverse=True) def pick_balanced(rows: List[RunResult], waste_cap: float = 0.12) -> RunResult | None: filtered = [r for r in rows if r.wasted_ratio <= waste_cap] if not filtered: return None return max(filtered, key=lambda item: safe_ratio(item.immediate_play_count, item.total_show_requests)) def format_table_rows(rows: List[RunResult], count: int = 8) -> List[str]: lines = [] for idx, r in enumerate(rows[:count], 1): immediate = safe_ratio(r.immediate_play_count, r.total_show_requests) lines.append( f"| {idx} | {r.threshold:.2f} | {r.cooldown_seconds} | {immediate:.2%} | {r.avg_wait_ms:.0f} | {r.p95_wait_ms:.0f} | {r.wasted_ratio:.2%} | {r.show_success_rate:.2%} |" ) return lines best_retention_map: Dict[float, RunResult] = {} for retention, rows in ranked_sorted.items(): best_retention_map[retention] = rows[0] default_baseline = baseline_by_retention[baseline_retention_default] default_rows = ranked_sorted[baseline_retention_default] best_default = default_rows[0] balanced_default = pick_balanced(default_rows, waste_cap=0.12) b_immediate = safe_ratio(default_baseline.immediate_play_count, default_baseline.total_show_requests) with report.open("w", encoding="utf-8") as f: f.write("# TapADN 智能预加载策略验收报告(含随机偏好机器人 + 网络环境)\n\n") f.write("## 仿真前提\n") f.write("- 模型定位:IAA 场景中 3 类广告位(激励/开屏/插屏)统一参与策略决策。\n") f.write("- 次留默认验收基线:35%\n") f.write(f"- 随机机器人数量:{robot_count}\n") f.write("- 场景进入->展示请求->预加载决策->展示时延为核心链路。\n") f.write("- 每次样本运行前先生成一批“偏好机器人”,再在其上分别运行:纯手动(无 smart)与智能预加载两种模式。\n") f.write("- 预加载触发后在 cooldown 内有效一次,不命中将视作普通 load。\n") f.write("- `Immediate`:请求时已命中可直接播放的占比(值越高越好)\n") f.write("- `Waste`:预加载后在 cooldown 内未被消费即失效的比例(值越低越好)\n") f.write("- 网络环境:Wi-Fi / 4G / 3G / 2G 按机器人偏好加权采样,逐回合独立变化。\n\n") f.write("## 机器人与网络设置\n") f.write("- 机器人类型:随机采样以下偏好族(reward_heavy、interstitial_focus、splash_driven、balanced、churn_sensitive、network_bound)。\n") f.write("- 每类机器人具有独立的场景进入偏好、请求偏好、fill 成功倍率、加载时延倍率和留存倍率。\n") f.write("- 网络环境以会话粒度采样,低网速会同步影响网络请求率、fill 成功率和加载耗时。\n\n") f.write("## 参数扫描范围\n") f.write("- Threshold: `0.20~0.90` 步长 0.05\n") f.write("- Cooldown: `30,60,90,120,180,240,300`\n") f.write("- 次留:`20%,25%,30%,35%,40%,50%,60%`\n\n") f.write("## 基线对照\n") f.write(f"- 次留 35% 基线(纯手动,无 smart)即时命中率:{b_immediate:.2%}\n") f.write(f"- 次留 35% 基线平均时延:{default_baseline.avg_wait_ms:.0f} ms(p95 {default_baseline.p95_wait_ms:.0f})\n") f.write(f"- 次留 35% 建议起点(当前模型):`threshold={best_default.threshold:.2f}, cooldown={best_default.cooldown_seconds}s`\n") f.write( f" - 对比基线即时命中提升:`{safe_ratio(best_default.immediate_play_count, best_default.total_show_requests)-b_immediate:.2%}`\n" ) f.write(f" - 对比基线时延变化:`{best_default.avg_wait_ms - default_baseline.avg_wait_ms:.0f} ms`\n") if balanced_default is not None: f.write("- 平衡候选(Waste<=12%):\n") f.write( f" - threshold={balanced_default.threshold:.2f}, cd={balanced_default.cooldown_seconds}s," f"Immediate {safe_ratio(balanced_default.immediate_play_count, balanced_default.total_show_requests):.2%}," f"Waste {balanced_default.wasted_ratio:.2%}\n" ) f.write("\n## 最优/最差(次留=35%)\n") f.write("| 排名 | Threshold | Cooldown | 即时命中率 | 平均等待 | P95等待 | 浪费率 | 播放成功率 |\n") f.write("|---|---:|---:|---:|---:|---:|---:|---:|\n") f.writelines([line + "\n" for line in format_table_rows(default_rows)]) f.write("\n### 最差 8 组\n") f.write("| 排名 | Threshold | Cooldown | 即时命中率 | 平均等待 | P95等待 | 浪费率 | 播放成功率 |\n") f.write("|---|---:|---:|---:|---:|---:|---:|---:|\n") f.writelines([line + "\n" for line in format_table_rows(list(reversed(default_rows)), count=8)]) f.write("\n## 与纯手动模式对比(次留35%)\n") best_35 = best_retention_map[0.35] f.write("- 纯手动:") f.write(f"Immediate `{safe_ratio(default_baseline.immediate_play_count, default_baseline.total_show_requests):.2%}`,") f.write(f"AvgWait `{default_baseline.avg_wait_ms:.0f}`ms,Waste `{default_baseline.wasted_ratio:.2%}`。\n") f.write("- smart 最优:") f.write(f"阈值 `{best_35.threshold:.2f}`,cd `{best_35.cooldown_seconds}`,") f.write(f"Immediate `{safe_ratio(best_35.immediate_play_count, best_35.total_show_requests):.2%}`,") f.write(f"AvgWait `{best_35.avg_wait_ms:.0f}`ms,Waste `{best_35.wasted_ratio:.2%}`。\n") f.write(f"- 增益:`{safe_ratio(best_35.immediate_play_count, best_35.total_show_requests)-b_immediate:.2%}`。 \n\n") f.write("### 网络分层对比(次留35%)\n") for idx, network_name in enumerate(NETWORK_LABELS): candidates = [r for r in results if r.retention == baseline_retention_default] best_net = max( candidates, key=lambda item: safe_ratio(item.network_immediate_show[idx], item.network_show_requests[idx]), ) manual_immediate = safe_ratio(default_baseline.network_immediate_show[idx], default_baseline.network_show_requests[idx]) manual_wait = default_baseline.network_avg_wait_ms[idx] smart_immediate = safe_ratio(best_net.network_immediate_show[idx], best_net.network_show_requests[idx]) smart_wait = best_net.network_avg_wait_ms[idx] f.write( f"- {network_name.upper()}:手动即时 `{manual_immediate:.2%}` / 等待 `{manual_wait:.0f}ms`;" ) f.write( f"智能(`threshold={best_net.threshold:.2f}, cd={best_net.cooldown_seconds}s`)即时 `{smart_immediate:.2%}` / 等待 `{smart_wait:.0f}ms`;" ) f.write(f"提升 `{smart_immediate-manual_immediate:.2%}`。\n") f.write("## 次留敏感度(35%基线)\n") for retention in sorted(ranked_sorted.keys()): best = ranked_sorted[retention][0] worst = ranked_sorted[retention][-1] b = baseline_by_retention[retention] f.write( f"- 次留 {retention:.0%}:手动基线时延 {b.avg_wait_ms:.0f}ms,智能最佳 `threshold={best.threshold:.2f}, cd={best.cooldown_seconds}s`," ) f.write( f"最佳 Immediate {safe_ratio(best.immediate_play_count, best.total_show_requests):.2%},Waste {best.wasted_ratio:.2%};" ) f.write( f"最差 `threshold={worst.threshold:.2f}, cd={worst.cooldown_seconds}s`," f"Immediate {safe_ratio(worst.immediate_play_count, worst.total_show_requests):.2%},Waste {worst.wasted_ratio:.2%}。\n" ) f.write("\n## 交叉参数观察\n") f.write("- 在同一保留率下,阈值下调能显著提高 `Immediate`,但通常也抬高 `Waste`。\n") f.write("- cooldown 拉长可降低 waste(减少重复预加载/空耗),但可能提高用户等待。\n") f.write("- 在弱网(尤其低分配机器人更多时)场景,建议提高 waste 上限约束后再考虑 lower threshold。\n\n") f.write("## 图表示例(输出文件)\n") for p in sorted(out_dir.glob("*.png")): f.write(f"- `{p.name}`\n") f.write(f" ![]({p.name})\n\n") f.write("## 验收结论\n") if balanced_default is not None: f.write( f"- 建议首轮灰度点:`threshold={balanced_default.threshold:.2f}`, `cooldown={balanced_default.cooldown_seconds}s`(兼顾即时性与 waste)。\n" ) else: f.write(f"- 建议首轮灰度点:`threshold={best_default.threshold:.2f}`, `cooldown={best_default.cooldown_seconds}s`(当前样本未出现满足 12% Waste 约束的点)。\n") f.write(f"- 次留 35% 下智能策略最佳立即命中来自 `threshold={best_default.threshold:.2f}, cd={best_default.cooldown_seconds}s`,即时率 `{safe_ratio(best_default.immediate_play_count, best_default.total_show_requests):.2%}`,Waste `{best_default.wasted_ratio:.2%}`。\n") f.write("- 建议将 `smart_preload` 配置作为实验变量:先以纯手动为对照,再按次留分层和网络监控逐步放量。\n") return report def main() -> None: parser = argparse.ArgumentParser(description="TapADN SmartLoad sensitivity simulation") parser.add_argument("--users", type=int, default=3000) parser.add_argument("--policy-json", default=str(Path("Assets/Tapadn_Adapter/Runtime/Resources/TapadnSmartLoadPolicy_Default.json"))) parser.add_argument("--out-dir", default="Tools/SmartLoadSensitivity/output") parser.add_argument("--robot-seed", type=int, default=20260604) parser.add_argument("--base-seed", type=int, default=1000) parser.add_argument("--include-line-mode", action="store_true") args = parser.parse_args() policy_path = Path(args.policy_json) out_dir = Path(args.out_dir) out_dir.mkdir(parents=True, exist_ok=True) profiles = [ SceneProfile( ad_type=0, scene="reward", true_request_probability=0.42, fill_success_probability=0.86, mean_load_ms=2100.0, per_session_enter_probability=0.55, ), SceneProfile( ad_type=2, scene="interstitial", true_request_probability=0.56, fill_success_probability=0.82, mean_load_ms=1750.0, per_session_enter_probability=0.72, ), SceneProfile( ad_type=1, scene="splash", true_request_probability=0.96, fill_success_probability=0.78, mean_load_ms=1350.0, per_session_enter_probability=1.0, ), ] robots = generate_robot_profiles(args.users, args.robot_seed) policy_map = parse_policy_config(policy_path) threshold_grid = [round(0.20 + i * 0.05, 2) for i in range(0, 15)] cooldown_grid = [30, 60, 90, 120, 180, 240, 300] retention_rates = [0.20, 0.25, 0.30, 0.35, 0.40, 0.50, 0.60] results, baseline = run_sensitivity( profiles=profiles, policy_map=policy_map, robots=robots, threshold_grid=threshold_grid, cooldown_grid=cooldown_grid, retention_rates=retention_rates, out_dir=out_dir, mode_manual_seed=args.base_seed, ) draw_heatmaps(results, 0.35, out_dir) draw_heatmaps(results, 0.20, out_dir) draw_heatmaps(results, 0.60, out_dir) draw_retention_lines(results, out_dir) report = build_report(results, baseline, out_dir, robot_count=len(robots)) print(f"[SmartLoadSimulation] generated outputs in: {out_dir}") print(f"[SmartLoadSimulation] report: {report}") if __name__ == "__main__": main()