Danke @holli73!!!
Neue Version, ein paar Bugfixes und inkl. Teletext:
#!/usr/bin/env python3
"""
orf_wrapper.py – on-demand ffmpeg proxy für regionale ORF2-Streams via TVHeadend HTTP
Umgeht CA-Change-Bugs im HTSP-Protokoll durch kontinuierlichen ffmpeg-Decode-Pass.
Änderungen gegenüber v6.0:
- Stall-Timer wird bei ffmpeg-Restart korrekt zurückgesetzt
- Restart-Event entkoppelt stderr-Thread von _run-Loop
- _broadcast hält Lock nicht mehr während sink.write() läuft (Snapshot-Pattern)
- Graceful Shutdown via SIGINT/SIGTERM mit server.shutdown()
- Log-Level differenziert (INFO für normale Events, ERROR nur für echte Fehler)
- Teletext-Streams werden via -map 0:d? mitgenommen
Änderungen v7.1:
- h264-Fehlerburst-Erkennung: Restart wenn PPS-Fehler anhaltend ohne Recovery
(greift auch bei [h264 @ 0x…], nicht nur bei [NULL @ …])
- remove_client() idempotent per Guard → kein Doppel-Log mehr
"""
import os
import sys
import time
import signal
import logging
import threading
import subprocess
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from typing import Dict, Optional, List
# ---------------------------------------------------------------------------
# Konfiguration
# ---------------------------------------------------------------------------
TVH_BASE = "http://127.0.0.1:9981/stream/service"
PROFILE = "pass"
LOG_PATH = "/var/log/orf_wrapper.log"
BIND_ADDR = os.environ.get("ORF2_BIND", "0.0.0.0")
BIND_PORT = int(os.environ.get("ORF2_PORT", "8800"))
RESTART_BACKOFF_SEC = 1.0
STALL_TIMEOUT_SEC = 6.0
CHUNK_SIZE = 188 * 50 # ~9,4 kB – ein sauberes TS-Vielfaches
# Wie viele aufeinanderfolgende h264-Fehler lösen einen Restart aus.
# Bei ORF2-CA-Changes kommen diese in Bursts von 20-100 Zeilen;
# ein normaler Sync-Fehler am Stream-Anfang bleibt weit darunter.
PPS_ERROR_BURST_LIMIT = 30
SERVICE_MAP: Dict[str, str] = {
"/orf2k.ts": "62b2d62afef48b374003b91d03341959",
"/orf2st.ts": "1c991f2fcf9ec1e6f25de43ec84c91bc",
"/orf2b.ts": "0e86eecf524585e6fe8bb27d9bbe05ec",
"/orf2o.ts": "a02a4f4168239be22b4e2070c4027cc3",
"/orf2s.ts": "c8187e84cfc1af8bcd94a2f5913d61b7",
"/orf2t.ts": "be6e1693dc15631ed4194a3ff76e4e3f",
"/orf2v.ts": "e1d798361dee619fc2a7a28d0946a8cb",
"/orf2n.ts": "920b04e52ac34b3b0d99775420d9446e",
"/orf2w.ts": "10ef17c5bcc0c9e44a22cd673d1facd4",
}
# ---------------------------------------------------------------------------
# Logging
# ---------------------------------------------------------------------------
def setup_logging() -> logging.Logger:
os.makedirs(os.path.dirname(LOG_PATH), exist_ok=True)
logger = logging.getLogger("orf_wrapper")
logger.setLevel(logging.DEBUG)
if not logger.handlers:
fh = logging.FileHandler(LOG_PATH)
sh = logging.StreamHandler(sys.stdout)
fmt = logging.Formatter("%(asctime)s %(levelname)-8s %(message)s")
fh.setFormatter(fmt)
sh.setFormatter(fmt)
# Datei: alles ab INFO; stdout: nur WARNING+
fh.setLevel(logging.INFO)
sh.setLevel(logging.WARNING)
logger.addHandler(fh)
logger.addHandler(sh)
return logger
LOGGER = setup_logging()
# ---------------------------------------------------------------------------
# ffmpeg-Kommando
# ---------------------------------------------------------------------------
def build_ffmpeg_cmd(service_uuid: str) -> List[str]:
input_url = f"{TVH_BASE}/{service_uuid}?profile={PROFILE}"
return [
"ffmpeg",
"-hide_banner",
"-loglevel", "error",
"-fflags", "+genpts+igndts+discardcorrupt",
"-reconnect", "1",
"-reconnect_streamed", "1",
"-reconnect_delay_max", "5",
"-probesize", "2M",
"-analyzeduration", "2000000",
"-i", input_url,
# Video + Audio + Subtitle (Teletext als dvb_teletext = codec_type subtitle)
# + Data (EPG, sonstige Data-Streams)
"-map", "0:v?",
"-map", "0:a?",
"-map", "0:s?", # dvb_teletext liegt als subtitle vor
"-map", "0:d?", # EPG / sonstige data-Streams
"-c", "copy",
"-mpegts_flags", "+resend_headers",
"-pat_period", "0.2",
"-f", "mpegts",
"pipe:1",
]
# ---------------------------------------------------------------------------
# ClientSink – threadsicher, markiert sich selbst als tot
# ---------------------------------------------------------------------------
class ClientSink:
def __init__(self, handler: BaseHTTPRequestHandler):
self._handler = handler
self._dead = False
self._lock = threading.Lock()
def write(self, data: bytes) -> bool:
"""Gibt False zurück wenn der Client getrennt wurde."""
if self._dead:
return False
with self._lock:
if self._dead:
return False
try:
self._handler.wfile.write(data)
self._handler.wfile.flush()
return True
except OSError:
self._dead = True
return False
# ---------------------------------------------------------------------------
# EndpointStreamer – ein ffmpeg-Prozess pro Endpoint, n Clients
# ---------------------------------------------------------------------------
class EndpointStreamer:
def __init__(self, path: str, service_uuid: str):
self.path = path
self.service_uuid = service_uuid
self._lock = threading.Lock()
self._clients: Dict[int, ClientSink] = {}
self._next_client_id = 1
self._worker_thread: Optional[threading.Thread] = None
self._stop_event = threading.Event()
# --- öffentliche API ---------------------------------------------------
def add_client(self, sink: ClientSink) -> int:
with self._lock:
cid = self._next_client_id
self._next_client_id += 1
self._clients[cid] = sink
LOGGER.info(f"{self.path}: client {cid} connected (total: {len(self._clients)})")
if self._worker_thread is None or not self._worker_thread.is_alive():
self._stop_event.clear()
self._worker_thread = threading.Thread(
target=self._run, name=f"streamer{self.path}", daemon=True
)
self._worker_thread.start()
return cid
def remove_client(self, client_id: int) -> None:
with self._lock:
if client_id not in self._clients:
return # bereits entfernt (z.B. durch _broadcast)
del self._clients[client_id]
remaining = len(self._clients)
LOGGER.info(f"{self.path}: client {client_id} disconnected (total: {remaining})")
if not self._clients:
self._stop_event.set()
# --- interne Hilfsmethoden ---------------------------------------------
def _snapshot_clients(self) -> List[tuple]:
"""Gibt eine Kopie der aktuellen Client-Liste zurück (Lock-frei nutzbar)."""
with self._lock:
return list(self._clients.items())
def _broadcast(self, data: bytes) -> None:
"""
Sendet data an alle Clients.
Lock wird nur für den Snapshot gehalten, NICHT während write() läuft.
Tote Clients werden danach sauber entfernt.
"""
dead = []
for cid, sink in self._snapshot_clients():
if not sink.write(data):
dead.append(cid)
for cid in dead:
self.remove_client(cid)
# --- Worker-Thread -----------------------------------------------------
def _run(self):
LOGGER.info(f"{self.path}: worker started")
while not self._stop_event.is_set():
# Stall-Timer bei jedem (Neu-)Start zurücksetzen
last_valid_ts = time.monotonic()
restart_needed = threading.Event()
cmd = build_ffmpeg_cmd(self.service_uuid)
proc = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
bufsize=0,
text=False,
)
LOGGER.info(f"{self.path}: ffmpeg started (pid={proc.pid})")
stop_stderr = threading.Event()
stderr_t = threading.Thread(
target=self._stderr_reader,
args=(proc, stop_stderr, restart_needed),
daemon=True,
)
stderr_t.start()
try:
while not self._stop_event.is_set() and not restart_needed.is_set():
chunk = proc.stdout.read(CHUNK_SIZE)
if chunk:
# Nur bei sauberen TS-Paketen Timer aktualisieren
if len(chunk) % 188 == 0:
last_valid_ts = time.monotonic()
self._broadcast(chunk)
else:
# ffmpeg beendet?
if proc.poll() is not None:
LOGGER.warning(f"{self.path}: ffmpeg exited (rc={proc.returncode})")
break
# Stall: zu lange keine validen TS-Pakete
if time.monotonic() - last_valid_ts > STALL_TIMEOUT_SEC:
LOGGER.error(f"{self.path}: TS stall >{STALL_TIMEOUT_SEC}s, restarting ffmpeg")
break
time.sleep(0.05)
finally:
# ffmpeg sauber beenden
self._terminate_proc(proc)
stop_stderr.set()
stderr_t.join(timeout=2)
# Kein Client mehr? Worker beenden.
with self._lock:
if not self._clients:
break
LOGGER.info(f"{self.path}: restarting in {RESTART_BACKOFF_SEC}s …")
time.sleep(RESTART_BACKOFF_SEC)
with self._lock:
self._worker_thread = None
LOGGER.info(f"{self.path}: worker stopped")
def _terminate_proc(self, proc: subprocess.Popen) -> None:
try:
proc.send_signal(signal.SIGTERM)
except OSError:
pass
try:
proc.wait(timeout=2)
except subprocess.TimeoutExpired:
LOGGER.warning(f"{self.path}: ffmpeg (pid={proc.pid}) didn't exit, killing")
try:
proc.kill()
except OSError:
pass
def _stderr_reader(
self,
proc: subprocess.Popen,
stop_event: threading.Event,
restart_needed: threading.Event,
) -> None:
"""
Liest ffmpeg stderr und erkennt zwei Restart-Auslöser:
1. [NULL @ …] non-existing PPS → kein Decoder-Context, sofortiger Restart.
2. Anhaltender h264-Fehlerburst NACH der Startup-Toleranzzeit:
Beim Stream-Start sind PPS-Fehler normal (CA-Change-Übergang) und
dauern typisch < 2s. Erst danach werden Fehler gezählt.
Überschreitet der Zähler PPS_ERROR_BURST_LIMIT → Restart.
Jede fehlerfreie Zeile setzt den Zähler zurück.
"""
startup_grace_end = time.monotonic() + 3.0 # 3s Toleranz nach ffmpeg-Start
pps_errors = 0
try:
for raw_line in proc.stderr:
if stop_event.is_set():
break
line = raw_line.decode("utf-8", errors="replace").rstrip()
if not line:
continue
LOGGER.info(f"{self.path} ffmpeg[{proc.pid}]: {line}")
# --- Fall 1: NULL-Context, sofortiger Restart ---
if "[NULL @" in line and "non-existing PPS" in line:
LOGGER.error(f"{self.path} ffmpeg[{proc.pid}] CRITICAL NULL-decoder, restarting")
restart_needed.set()
try:
proc.kill()
except OSError:
pass
return
# --- Fall 2: h264-Fehlerburst nach Startup-Toleranzzeit ---
if time.monotonic() < startup_grace_end:
continue # Startup-Phase: Fehler ignorieren
is_pps_error = (
"non-existing PPS" in line
or "decode_slice_header error" in line
or "no frame!" in line
or "Last message repeated" in line
)
if is_pps_error:
pps_errors += 1
if pps_errors >= PPS_ERROR_BURST_LIMIT:
LOGGER.error(
f"{self.path} ffmpeg[{proc.pid}]: "
f"h264 error burst ({pps_errors} lines), restarting"
)
restart_needed.set()
try:
proc.kill()
except OSError:
pass
return
else:
pps_errors = 0 # Recovery: Zähler zurücksetzen
except Exception as exc:
LOGGER.error(f"{self.path}: stderr reader exception: {exc}")
# ---------------------------------------------------------------------------
# Streamer-Tabelle
# ---------------------------------------------------------------------------
STREAMERS: Dict[str, EndpointStreamer] = {
path: EndpointStreamer(path, uuid) for path, uuid in SERVICE_MAP.items()
}
# ---------------------------------------------------------------------------
# HTTP-Handler
# ---------------------------------------------------------------------------
class OrfHandler(BaseHTTPRequestHandler):
server_version = "orf-wrapper/7.1"
protocol_version = "HTTP/1.1" # Keep-Alive-fähig (hilft bei manchen Clients)
def do_GET(self):
path = self.path.split("?")[0] # Query-String ignorieren
if path in ("/", "/index.html"):
self._handle_index()
return
if path not in STREAMERS:
self.send_response(404)
self.send_header("Content-Length", "0")
self.end_headers()
return
# Stream-Antwort – kein Content-Length (unbegrenzt)
self.send_response(200)
self.send_header("Content-Type", "video/MP2T")
self.send_header("Cache-Control", "no-cache, no-store")
self.send_header("Connection", "close")
self.end_headers()
streamer = STREAMERS[path]
sink = ClientSink(self)
cid = streamer.add_client(sink)
try:
# Warten bis Client sich trennt (sink._dead) oder Server stoppt
while not sink._dead:
time.sleep(0.5)
finally:
streamer.remove_client(cid)
def _handle_index(self):
body = "Available endpoints:\n" + "".join(
f" http://{self.headers.get('Host', BIND_ADDR + ':' + str(BIND_PORT))}{p}\n"
for p in sorted(STREAMERS)
)
encoded = body.encode("utf-8")
self.send_response(200)
self.send_header("Content-Type", "text/plain; charset=utf-8")
self.send_header("Content-Length", str(len(encoded)))
self.end_headers()
self.wfile.write(encoded)
def log_message(self, fmt, *args):
# HTTP-Access-Log unterdrücken (alles läuft über LOGGER)
pass
# ---------------------------------------------------------------------------
# Graceful Shutdown
# ---------------------------------------------------------------------------
_SERVER: Optional[ThreadingHTTPServer] = None
def _handle_signal(signum, frame):
sig_name = signal.Signals(signum).name
LOGGER.warning(f"Received {sig_name}, shutting down …")
if _SERVER:
t = threading.Thread(target=_SERVER.shutdown, daemon=True)
t.start()
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main():
global _SERVER
signal.signal(signal.SIGTERM, _handle_signal)
signal.signal(signal.SIGINT, _handle_signal)
LOGGER.warning(f"ORF wrapper v7.0 starting on {BIND_ADDR}:{BIND_PORT}")
LOGGER.info(f"Endpoints: {', '.join(sorted(STREAMERS))}")
_SERVER = ThreadingHTTPServer((BIND_ADDR, BIND_PORT), OrfHandler)
_SERVER.serve_forever()
LOGGER.warning("ORF wrapper stopped.")
if __name__ == "__main__":
main()