#!/usr/bin/env python3
from __future__ import annotations

import json
import mimetypes
import errno
import os
import hashlib
import subprocess
import sys
import threading
import time
import urllib.request
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from pathlib import Path
from urllib.parse import quote, unquote, urlparse, parse_qs

sys.path.insert(0, str(Path(__file__).resolve().parent))
from theobject_live_autopilot import kling_pause_status, loop as autopilot_loop

HOT_DISK_MOUNT = Path("/Volumes/Hot Disk")
ROOT = HOT_DISK_MOUNT / "The Object" / "produktion"
CAPTURES = ROOT / "captures"
JOBS = ROOT / "jobs"
PROMPT_FILE = Path("/Users/victorholland/Vibe Coding/The Camera/Input/The Object/Prompt.txt")
KAMERAMOTOR_API = "http://127.0.0.1:8089/api/kameramotor/job"
KLING_STATE = Path("/Users/victorholland/Vibe Coding/dispatcher/kling-kameramotor/state")
KLING_DONE = Path("/Users/victorholland/Vibe Coding/dispatcher/kling-kameramotor/done")
PUBLIC_HOST = (
    os.environ.get("THEOBJECT_PUBLIC_HOST")
    or subprocess.run(
        ["scutil", "--get", "LocalHostName"],
        capture_output=True,
        text=True,
    ).stdout.strip()
    or "Minimac.local"
)
PUBLIC_PORT = 8797
# PUBLISH_RECENT deaktiviert (Victor-Go 2026-06-12): Neue Galerie startet leer.
# Fallback auf alten iCloud-Publish-Pfad würde Altdaten einblenden — verboten.
# published_recent_statuses() liefert recent_statuses() direkt (kein Fallback).
ASSET_CACHE = Path("/tmp/theobject-asset-cache-8797")
ASSET_CACHE_LOCK = threading.Lock()
DIAGNOSTICS = ROOT / "diagnostics" / "app_events.jsonl"
DIAGNOSTICS_LOCK = threading.Lock()
PIPELINE_VERSION = "deterministic_6_images_8_films_v1"


def _check_hot_disk_mount() -> str | None:
    """Gibt None zurück wenn gemountet, sonst Fehlertext."""
    if not os.path.ismount(str(HOT_DISK_MOUNT)):
        return f"Hot Disk nicht gemountet: {HOT_DISK_MOUNT}"
    return None


def is_retryable_file_error(exc: OSError) -> bool:
    return exc.errno in {errno.EDEADLK, errno.EAGAIN, errno.EBUSY, errno.EPERM, 11}


def with_file_retry(fn, *, attempts: int = 18) -> None:
    for attempt in range(1, attempts + 1):
        try:
            fn()
            return
        except OSError as exc:
            if not is_retryable_file_error(exc) or attempt == attempts:
                raise
            time.sleep(min(0.08 * attempt, 0.5))


def now_ms() -> int:
    return int(time.time() * 1000)


def post_json(url: str, payload: dict) -> dict:
    req = urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        method="POST",
        headers={"content-type": "application/json"},
    )
    with urllib.request.urlopen(req, timeout=25) as response:
        return json.loads(response.read().decode("utf-8"))


def write_json(path: Path, payload: dict) -> None:
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps(payload, indent=2, ensure_ascii=False), encoding="utf-8")


def read_json(path: Path) -> dict | None:
    try:
        return json.loads(path.read_text(encoding="utf-8"))
    except Exception:
        return None


def request_host(headers=None) -> str:
    if headers:
        raw = headers.get("Host") or headers.get("host")
        if raw:
            return raw
    return f"{PUBLIC_HOST}:{PUBLIC_PORT}"


def asset_url(path: Path, host: str | None = None) -> str:
    effective_host = host or f"{PUBLIC_HOST}:{PUBLIC_PORT}"
    return f"http://{effective_host}/asset?path={quote(str(path), safe='')}"


def video_from_state(state: dict) -> Path | None:
    downloaded = state.get("downloaded") or {}
    if isinstance(downloaded, dict):
        p = Path(downloaded.get("outPath") or "")
        if p.exists() and p.suffix.lower() == ".mp4":
            return p
    for item in state.get("outputFiles") or []:
        p = Path(item)
        if p.exists() and p.suffix.lower() == ".mp4":
            return p
    return None


def _variant_suffix(capture_id: str) -> tuple[str, str | None]:
    """Return (bare_id, variant_or_None).  Handles --S / --K suffix case-insensitively."""
    upper = capture_id.upper()
    if upper.endswith("--S"):
        return capture_id[:-3], "S"
    if upper.endswith("--K"):
        return capture_id[:-3], "K"
    return capture_id, None


def _filter_status_to_variant(status: dict, variant: str) -> dict:
    """Restrict assets.videos and assets.videoVariants to a single variant (S or K).
    captureId is already set to the suffixed id by the caller."""
    assets = dict(status.get("assets") or {})
    video_variants: dict = dict(assets.get("videoVariants") or {})
    # Build filtered variants dict
    filtered_variants: dict[str, dict] = {}
    filtered_videos: dict[str, str] = {}
    for film_key, variants in video_variants.items():
        v_data = variants.get(variant)
        if v_data:
            filtered_variants[film_key] = {variant: v_data}
            filtered_videos[film_key] = v_data["url"]
            # Also expose slot key if present
            slot = v_data.get("slot")
            if slot:
                filtered_videos[slot] = v_data["url"]
                filtered_videos[f"{film_key}_{variant}"] = v_data["url"]
                filtered_videos[f"{slot}_{variant}"] = v_data["url"]

    assets = dict(assets)
    assets["videos"] = filtered_videos
    assets["videoVariants"] = filtered_variants
    result = dict(status)
    result["assets"] = assets
    result["variant"] = variant
    return result


def status_for_capture(capture_id: str, host: str | None = None) -> dict:
    bare_id, variant = _variant_suffix(capture_id)
    # Search using bare id (case-insensitive)
    for manifest_path in sorted(CAPTURES.glob("*/manifest.json"), reverse=True):
        manifest = read_json(manifest_path)
        if not manifest or str(manifest.get("capture_id", "")).lower() != bare_id.lower():
            continue
        status = status_from_manifest(manifest_path, manifest, capture_id, host=host)
        if variant:
            status = _filter_status_to_variant(status, variant)
        return status
    return {"ok": False, "captureId": capture_id, "status": "missing", "ready": False}


def status_from_manifest(manifest_path: Path, manifest: dict, capture_id: str | None = None, host: str | None = None) -> dict:
        capture_dir = manifest_path.parent
        if manifest.get("ok") is False and manifest.get("error"):
            payload = manifest.get("payload") or {}
            return {
                "ok": True,
                "captureId": capture_id or manifest.get("capture_id"),
                "status": "failed",
                "ready": False,
                "doneVideos": 0,
                "neededVideos": 8,
                "assets": {
                    "originalImage": asset_url(Path(manifest["image"]), host=host) if Path(manifest.get("image", "")).exists() else None,
                    "heroImage": None,
                    "realExplodedImage": None,
                    "unrealExplodedImage": None,
                    "neatifyImage": None,
                    "secretDetailImage": None,
                    "videos": {},
                },
                "orientation": "portrait" if payload.get("ratio") == "9:16" else "landscape",
                "statusNote": str(manifest.get("error") or "failed"),
            }
        fullrun = capture_dir / "fullrun"
        ledger = read_json(fullrun / "RUN_LEDGER.json") or {}
        image_manifest = read_json(fullrun / "manifest_images_cdef.json") or {}
        video_manifest = read_json(fullrun / "manifest_videos_gn.json") or {}
        pause = video_manifest.get("pause") or manifest.get("autopilot_pause")
        anchors = {}
        for source in [ledger, image_manifest]:
            for key in ["A", "B", "C", "D", "E", "F"]:
                value = source.get(key) or (source.get("anchors") or {}).get(key)
                if isinstance(value, str) and Path(value).exists():
                    anchors[key] = value
        images_dir = fullrun / "images"
        fallback_images = {
            "A": images_dir / "A_original_from_iphone.jpg",
            "B": images_dir / "B_hero.png",
            "C": images_dir / "C_exploded_real.png",
            "D": images_dir / "D_exploded_secret.png",
            "E": images_dir / "E_neatify.png",
            "F": images_dir / "F_secret_detail.png",
        }
        for key, path in fallback_images.items():
            if key not in anchors and path.exists():
                anchors[key] = str(path)

        app_ready_videos = ledger.get("app_ready_videos") or video_manifest.get("app_ready_videos") or []
        canonical_video_keys = {
            "G": "film_01_original_to_hero",
            "H": "film_02_hero_orbit_360",
            "I": "film_03_hero_to_exploded_real",
            "J": "film_04_exploded_real_orbit_360",
            "K": "film_05_exploded_real_to_neatify",
            "L": "film_06_hero_to_exploded_secret",
            "M": "film_07_exploded_secret_orbit_360",
            "N": "film_08_exploded_secret_to_secret_detail",
        }
        legacy_video_keys = {
            "G": "01_a_to_b_reality_to_hero",
            "H": "04a_hero_360_clockwork_asmr",
            "I": "02_b_to_z_real_explosion",
            "K": "04b_exploded_real_360_clockwork_asmr",
            "L": "03_b_to_z_unreal_explosion",
            "N": "04c_exploded_unreal_360_clockwork_asmr",
        }
        deterministic_videos: dict[str, str] = {}
        video_variants: dict[str, dict] = {}
        for item in app_ready_videos:
            slot = item.get("slot")
            path = Path(item.get("path") or "")
            if slot in canonical_video_keys and path.exists():
                url = asset_url(path, host=host)
                canonical_key = canonical_video_keys[slot]
                variant = str(item.get("variant") or item.get("badge") or "S").upper()
                badge = str(item.get("badge") or variant)
                provider_name = item.get("provider_name") or ("Seedance 1.5 Pro" if variant == "S" else "Kling 2.5" if variant == "K" else variant)
                video_variants.setdefault(canonical_key, {})[variant] = {
                    "url": url,
                    "badge": badge,
                    "provider": provider_name,
                    "slot": slot,
                }
                deterministic_videos[f"{canonical_key}_{variant}"] = url
                deterministic_videos[f"{slot}_{variant}"] = url
                if canonical_key not in deterministic_videos or variant == "S":
                    deterministic_videos[canonical_key] = url
                    deterministic_videos[slot] = url
                    if slot in legacy_video_keys:
                        deterministic_videos[legacy_video_keys[slot]] = url
                elif slot not in deterministic_videos:
                    deterministic_videos[slot] = url

        # Victor-Grundsatz 2026-06-11: Seedance-only. Nur noch 8 S-Clips nötig (nicht 16).
        # Captures die BEIDE Varianten (S+K) komplett haben bleiben ready — mehr als nötig.
        required_variant_count = len(canonical_video_keys)  # = 8, nicht mehr 16
        done_variant_count = sum(len(variants) for variants in video_variants.values())
        # ready wenn alle 8 Slots mindestens die S-Variante haben
        s_done_count = sum(1 for variants in video_variants.values() if "S" in variants)
        deterministic_ready = (
            all(key in anchors for key in ["A", "B", "C", "D", "E", "F"])
            and s_done_count >= len(canonical_video_keys)
        )
        partial_ready = ledger.get("status") == "partial_ready" and (anchors or deterministic_videos)
        if anchors or deterministic_videos:
            assets = {
                "originalImage": asset_url(Path(anchors["A"]), host=host) if "A" in anchors else (asset_url(Path(manifest["image"]), host=host) if Path(manifest.get("image", "")).exists() else None),
                "heroImage": asset_url(Path(anchors["B"]), host=host) if "B" in anchors else None,
                "realExplodedImage": asset_url(Path(anchors["C"]), host=host) if "C" in anchors else None,
                "unrealExplodedImage": asset_url(Path(anchors["D"]), host=host) if "D" in anchors else None,
                "neatifyImage": asset_url(Path(anchors["E"]), host=host) if "E" in anchors else None,
                "secretDetailImage": asset_url(Path(anchors["F"]), host=host) if "F" in anchors else None,
                "videos": deterministic_videos,
                "videoVariants": video_variants,
            }
            status_value = "ready" if (deterministic_ready or partial_ready) else ("needs_victor" if pause else "running")
            return {
                "ok": True,
                "captureId": capture_id or manifest.get("capture_id"),
                "status": status_value,
                "ready": bool(deterministic_ready or partial_ready),
                "doneVideos": done_variant_count,
                "neededVideos": required_variant_count if not partial_ready else max(1, done_variant_count),
                "assets": assets,
                "orientation": "portrait" if (manifest.get("payload") or {}).get("ratio") == "9:16" else "landscape",
                "statusNote": pause.get("reason") if pause and status_value == "needs_victor" else None,
                "blockedBy": pause.get("blocked_by") if pause and status_value == "needs_victor" else None,
            }

        phase1 = read_json(fullrun / "manifest_phase1.json") or {}
        phase2 = read_json(fullrun / "manifest_phase2_exploded_360.json") or {}
        jobs = list(phase1.get("jobs") or []) + list(phase2.get("jobs") or [])
        videos: dict[str, str] = {}
        done_count = 0
        for job in jobs:
            jid = job.get("id")
            key = job.get("filter")
            if not jid or not key or not (KLING_DONE / f"{jid}.json").exists():
                continue
            state = read_json(KLING_STATE / f"{jid}.json") or {}
            video = video_from_state(state)
            if video:
                videos[key] = asset_url(video, host=host)
                done_count += 1
        anchors = phase1.get("anchors") or {}
        slug = Path(anchors.get("B", capture_dir.name)).name.split("__B_")[0]
        hero_app_poster = fullrun / "app_posters" / f"{slug}__B_from_a_to_b_final_frame.png"
        z_real = fullrun / "anchors_z_from_kling" / f"{Path(anchors.get('B', capture_dir.name)).name.split('__B_')[0]}__Z_real_from_kling_final_frame.png"
        z_unreal = fullrun / "anchors_z_from_kling" / f"{Path(anchors.get('B', capture_dir.name)).name.split('__B_')[0]}__Z_unreal_from_kling_final_frame.png"
        assets = {
            "originalImage": asset_url(Path(manifest["image"]), host=host) if Path(manifest.get("image", "")).exists() else None,
            "heroImage": asset_url(hero_app_poster, host=host) if hero_app_poster.exists() else (asset_url(Path(anchors["B"]), host=host) if anchors.get("B") and Path(anchors["B"]).exists() else (asset_url(Path(manifest["hero_image"]), host=host) if manifest.get("hero_image") and Path(manifest["hero_image"]).exists() else None)),
            "realExplodedImage": asset_url(z_real, host=host) if z_real.exists() else None,
            "unrealExplodedImage": asset_url(z_unreal, host=host) if z_unreal.exists() else None,
            "videos": videos,
        }
        required = [
            "01_a_to_b_reality_to_hero",
            "02_b_to_z_real_explosion",
            "03_b_to_z_unreal_explosion",
            "04a_hero_360_clockwork_asmr",
            "04b_exploded_real_360_clockwork_asmr",
            "04c_exploded_unreal_360_clockwork_asmr",
        ]
        ready = all(key in videos for key in required) and assets["heroImage"] and assets["realExplodedImage"] and assets["unrealExplodedImage"]
        status_value = "ready" if ready else ("needs_victor" if pause else "running")
        return {
            "ok": True,
            "captureId": capture_id or manifest.get("capture_id"),
            "status": status_value,
            "ready": bool(ready),
            "doneVideos": done_count,
            "neededVideos": 6,
            "assets": assets,
            "statusNote": pause.get("reason") if pause and status_value == "needs_victor" else None,
            "blockedBy": pause.get("blocked_by") if pause and status_value == "needs_victor" else None,
        }


def _has_dual_variants(status: dict) -> bool:
    """Return True if every film key in videoVariants has BOTH S and K entries."""
    video_variants = (status.get("assets") or {}).get("videoVariants") or {}
    if not video_variants:
        return False
    return all("S" in variants and "K" in variants for variants in video_variants.values())


def recent_statuses(limit: int | None = None, host: str | None = None) -> dict:
    items = []
    manifest_paths = sorted(
        CAPTURES.glob("*/manifest.json"),
        key=lambda p: ((read_json(p) or {}).get("created_at") or p.parent.name),
        reverse=True,
    )
    for manifest_path in manifest_paths:
        manifest = read_json(manifest_path)
        if not manifest:
            continue
        status = status_from_manifest(manifest_path, manifest, host=host)
        created_at = manifest.get("created_at")
        updated_at = time.strftime("%Y-%m-%dT%H:%M:%S%z", time.localtime(manifest_path.stat().st_mtime))
        glow = not bool(status.get("ready")) or (time.time() - manifest_path.stat().st_mtime < 24 * 60 * 60)
        payload = manifest.get("payload") or {}
        orientation = "portrait" if payload.get("ratio") == "9:16" else "landscape"

        # Victor-Grundsatz 2026-06-11: kein Doppel-Listing mehr.
        # Jedes Capture erscheint als EINE Kachel (S-Variante bevorzugt).
        # _filter_status_to_variant / --S/--K in status_for_capture() bleiben
        # als tote Rückwärtskompatibilität für gecachte App-IDs.
        status["createdAt"] = created_at
        status["updatedAt"] = updated_at
        status["glow"] = glow
        status["orientation"] = orientation
        items.append(status)

        if limit is not None and len(items) >= limit:
            break
    return {
        "ok": True,
        "source": "theobject_central_capture_list_v1",
        "captures": items,
    }


def published_recent_statuses(host: str | None = None) -> tuple[dict, int]:
    # Fallback auf alten Publish-Pfad deaktiviert (Victor-Go 2026-06-12).
    # Neue Galerie startet leer — kein Altbestand darf erscheinen.
    # recent_statuses() ist die einzige Wahrheit.
    return recent_statuses(limit=None, host=host), 200


def record_diagnostic(payload: dict, client: str) -> dict:
    event = {
        "receivedAt": time.strftime("%Y-%m-%dT%H:%M:%S%z"),
        "client": client,
        "payload": payload,
    }
    DIAGNOSTICS.parent.mkdir(parents=True, exist_ok=True)
    with DIAGNOSTICS_LOCK:
        line = json.dumps(event, ensure_ascii=False) + "\n"

        def append_event() -> None:
            with DIAGNOSTICS.open("a", encoding="utf-8") as handle:
                handle.write(line)

        with_file_retry(append_event)
    return event


def recent_diagnostics(limit: int = 80) -> dict:
    if not DIAGNOSTICS.exists():
        return {"ok": True, "events": []}
    lines = DIAGNOSTICS.read_text(encoding="utf-8").splitlines()[-limit:]
    events = []
    for line in lines:
        try:
            events.append(json.loads(line))
        except json.JSONDecodeError:
            continue
    return {"ok": True, "events": events}


def cached_asset_path(path: Path) -> Path:
    source = path.resolve()
    stat = source.stat()
    digest = hashlib.sha256(str(source).encode("utf-8")).hexdigest()[:24]
    cached = ASSET_CACHE / f"{digest}-{stat.st_size}-{int(stat.st_mtime)}{source.suffix.lower()}"
    if cached.exists() and cached.stat().st_size == stat.st_size:
        return cached

    ASSET_CACHE.mkdir(parents=True, exist_ok=True)
    temp = cached.with_suffix(cached.suffix + ".tmp")
    with ASSET_CACHE_LOCK:
        if cached.exists() and cached.stat().st_size == stat.st_size:
            return cached
        try:
            with source.open("rb") as src, temp.open("wb") as dst:
                while True:
                    for attempt in range(8):
                        try:
                            chunk = src.read(256 * 1024)
                            break
                        except OSError as exc:
                            if exc.errno != errno.EDEADLK or attempt == 7:
                                raise
                            time.sleep(0.08 * (attempt + 1))
                    if not chunk:
                        break
                    dst.write(chunk)
        except OSError as exc:
            if exc.errno != errno.EDEADLK:
                raise
            try:
                temp.unlink()
            except FileNotFoundError:
                pass
            try:
                subprocess.run(["/bin/cp", str(source), str(temp)], check=True)
            except subprocess.CalledProcessError:
                subprocess.run(
                    ["/usr/bin/ditto", "--norsrc", "--noextattr", "--noqtn", "--noacl", "--noclone", str(source), str(temp)],
                    check=True,
                )
        temp.replace(cached)
    return cached


class Handler(BaseHTTPRequestHandler):
    server_version = "TheObjectIngest/1.0"

    def do_GET(self) -> None:
        parsed = urlparse(self.path)
        if parsed.path == "/health":
            mount_err = _check_hot_disk_mount()
            if mount_err:
                self.send_json({"ok": False, "service": "theobject-ingest", "root": str(ROOT), "error": mount_err}, status=503)
                return
            self.send_json({"ok": True, "service": "theobject-ingest", "root": str(ROOT)})
            return
        if parsed.path == "/status":
            capture_id = (parse_qs(parsed.query).get("captureId") or [""])[0]
            self.send_json(status_for_capture(capture_id, host=request_host(self.headers)))
            return
        if parsed.path == "/recent":
            self.send_json(recent_statuses(host=request_host(self.headers)))
            return
        if parsed.path == "/objects/recent":
            payload, status = published_recent_statuses(host=request_host(self.headers))
            self.send_json(payload, status=status)
            return
        if parsed.path == "/diagnostics/recent":
            self.send_json(recent_diagnostics())
            return
        if parsed.path == "/asset":
            path = Path(unquote((parse_qs(parsed.query).get("path") or [""])[0]))
            if not path.exists() or ROOT not in path.resolve().parents:
                self.send_error(404)
                return
            self.send_asset(path)
            return
        if parsed.path != "/health":
            self.send_error(404)
            return

    def send_asset(self, path: Path) -> None:
        source_name = path.name
        try:
            path = cached_asset_path(path)
        except (OSError, subprocess.CalledProcessError):
            self.send_error(503, "asset temporarily unavailable")
            return
        content_type = mimetypes.guess_type(source_name)[0] or ("video/mp4" if path.suffix.lower() == ".mp4" else "application/octet-stream")
        size = path.stat().st_size
        range_header = self.headers.get("Range")
        if range_header and range_header.startswith("bytes="):
            start_s, _, end_s = range_header.removeprefix("bytes=").partition("-")
            try:
                start = int(start_s) if start_s else 0
                end = int(end_s) if end_s else size - 1
                start = max(0, min(start, size - 1))
                end = max(start, min(end, size - 1))
            except ValueError:
                self.send_error(416)
                return
            length = end - start + 1
            self.send_response(206)
            self.send_header("content-type", content_type)
            self.send_header("accept-ranges", "bytes")
            self.send_header("content-range", f"bytes {start}-{end}/{size}")
            self.send_header("content-length", str(length))
            self.end_headers()
            with path.open("rb") as f:
                f.seek(start)
                self.wfile.write(f.read(length))
            return
        self.send_response(200)
        self.send_header("content-type", content_type)
        self.send_header("accept-ranges", "bytes")
        self.send_header("content-length", str(size))
        self.end_headers()
        with path.open("rb") as f:
            chunk_size = 64 * 1024
            while True:
                for attempt in range(5):
                    try:
                        chunk = f.read(chunk_size)
                        break
                    except OSError as exc:
                        if exc.errno != errno.EDEADLK or attempt == 4:
                            raise
                        time.sleep(0.05 * (attempt + 1))
                if not chunk:
                    break
                self.wfile.write(chunk)

    def do_POST(self) -> None:
        path = urlparse(self.path).path
        if path == "/diagnostics":
            self.receive_diagnostic()
            return
        if path != "/upload":
            self.send_error(404)
            return

        mount_err = _check_hot_disk_mount()
        if mount_err:
            self.send_json({"ok": False, "error": mount_err, "retry": True}, status=503)
            return

        length = int(self.headers.get("content-length", "0"))
        if length <= 0 or length > 40_000_000:
            self.send_error(400, "bad image size")
            return

        capture_id = self.headers.get("x-theobject-capture-id") or f"capture-{now_ms()}"
        orientation = self.headers.get("x-theobject-orientation") or "unknown"
        requested_ratio = self.headers.get("x-theobject-ratio") or ""
        safe_id = "".join(ch for ch in capture_id.lower() if ch.isalnum() or ch in "-_")[:80]
        batch = time.strftime("live-%Y%m%d-%H%M%S")
        capture_dir = CAPTURES / f"{batch}-{safe_id}"
        capture_dir.mkdir(parents=True, exist_ok=True)

        image_path = capture_dir / "original_from_iphone.jpg"
        image_path.write_bytes(self.rfile.read(length))

        ratio = requested_ratio if requested_ratio in {"16:9", "9:16"} else ("9:16" if orientation == "portrait" else "16:9")
        output_dir = capture_dir / "hero_magnific"
        payload = {
            "image": str(image_path),
            "prompt_file": str(PROMPT_FILE),
            "ratio": ratio,
            "pipeline_version": PIPELINE_VERSION,
            "num_images": 1,
            "resolution": "4k",
            "thinking_level": "high",
            "generator": "nano",
            "provider": "magnific",
            "mode": "imagen-nano-banana-2-flash",
            "stem": f"The Object live photo {batch}",
            "filter": "theobject-hero-360-production",
            "source": "The Object iPhone live ingest",
            "output_dir": str(output_dir),
            "group_id": f"theobject-live:{batch}:{safe_id}:hero",
        }

        manifest = {
            "ok": False,
            "pipeline_version": PIPELINE_VERSION,
            "capture_id": capture_id,
            "batch": batch,
            "created_at": time.strftime("%Y-%m-%dT%H:%M:%S%z"),
            "image": str(image_path),
            "prompt_file": str(PROMPT_FILE),
            "payload": payload,
        }

        try:
            response = post_json(KAMERAMOTOR_API, payload)
            manifest["ok"] = True
            manifest["kameramotor_response"] = response
            job_id = response.get("job_id")
            write_json(capture_dir / "manifest.json", manifest)
            write_json(JOBS / f"{batch}-{safe_id}.json", manifest)
            self.send_json({"ok": True, "captureId": capture_id, "jobId": job_id})
        except Exception as exc:
            manifest["error"] = str(exc)
            write_json(capture_dir / "manifest.json", manifest)
            self.send_json({"ok": False, "captureId": capture_id, "error": str(exc)}, status=502)

    def receive_diagnostic(self) -> None:
        length = int(self.headers.get("content-length", "0"))
        if length <= 0 or length > 64_000:
            self.send_json({"ok": False, "error": "bad diagnostic size"}, status=400)
            return
        try:
            payload = json.loads(self.rfile.read(length).decode("utf-8"))
        except Exception:
            self.send_json({"ok": False, "error": "bad diagnostic json"}, status=400)
            return
        event = record_diagnostic(payload if isinstance(payload, dict) else {"value": payload}, self.client_address[0])
        self.send_json({"ok": True, "receivedAt": event["receivedAt"]})

    def send_json(self, payload: dict, status: int = 200) -> None:
        data = json.dumps(payload, ensure_ascii=False).encode("utf-8")
        self.send_response(status)
        self.send_header("content-type", "application/json; charset=utf-8")
        self.send_header("content-length", str(len(data)))
        self.end_headers()
        self.wfile.write(data)

    def log_message(self, format: str, *args) -> None:
        return


def main() -> int:
    mount_err = _check_hot_disk_mount()
    if mount_err:
        raise SystemExit(f"FATAL: {mount_err}")
    ROOT.mkdir(parents=True, exist_ok=True)
    if not PROMPT_FILE.exists():
        raise SystemExit(f"Prompt fehlt: {PROMPT_FILE}")
    threading.Thread(target=autopilot_loop, name="theobject-live-autopilot", daemon=True).start()
    server = ThreadingHTTPServer(("0.0.0.0", 8797), Handler)
    print(json.dumps({"ok": True, "url": "http://0.0.0.0:8797", "root": str(ROOT)}, ensure_ascii=False))
    server.serve_forever()
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
