def _parse_ffmpeg_ts(ts: str) -> float:
    """Convert an ffmpeg timestamp string into seconds.

    Accepted shapes are ``HH:MM:SS[.frac]``, ``MM:SS[.frac]`` and a plain
    number of seconds, each optionally prefixed with a single ``+`` or ``-``.

    Args:
        ts: Raw timestamp text, e.g. the value of an ``out_time=`` line.

    Returns:
        The timestamp in seconds. ``0.0`` is returned for empty input and for
        anything that cannot be parsed (for example ``N/A``) or is not finite.
    """
    text = ts.strip()
    if not text:
        return 0.0

    # The sign belongs to the whole timestamp, not to the hours field:
    # int("-00") is 0, so parsing the fields naively would silently turn
    # "-00:00:01.5" into +1.5 seconds.
    sign = 1.0
    if text[0] in "+-":
        if text[0] == "-":
            sign = -1.0
        text = text[1:]
        if text[:1] in ("+", "-"):
            return 0.0

    fields = text.split(":")
    if len(fields) > 3:
        return 0.0

    try:
        seconds = float(fields[-1])
        # Walk the remaining fields right-to-left: minutes first, then hours.
        for scale, field in zip((60, 3600), reversed(fields[:-1])):
            seconds += int(field) * scale
    except ValueError:
        return 0.0

    if not math.isfinite(seconds):
        return 0.0
    return sign * seconds


def _ffmpeg_run_with_progress(cmd: list[str], duration: float, label: str) -> None:
    """Run an ffmpeg command and print coarse progress while it works.

    ``-progress pipe:1 -nostats`` is inserted immediately before the last
    element of ``cmd``, which is assumed to be the output path. stderr is
    merged into stdout, so progress records and diagnostics arrive on one
    stream. A progress line is printed whenever the completed percentage has
    advanced by at least five points since the previous print.

    Args:
        cmd: Full ffmpeg argument list; the final element is treated as the
            output path.
        duration: Expected media duration in seconds. When it is missing or
            not positive, no progress lines are printed.
        label: Short name shown in each progress line.

    Raises:
        ValueError: If ``cmd`` is empty.
        subprocess.CalledProcessError: If ffmpeg exits with a non-zero status;
            ``output`` holds the last 100 non-empty lines ffmpeg produced.
    """
    if not cmd:
        raise ValueError("ffmpeg command is empty")
    cmd_progress = [*cmd[:-1], "-progress", "pipe:1", "-nostats", cmd[-1]]

    # Treat a missing / non-positive / NaN duration as "unknown".
    total = float(duration) if duration and duration > 0 else 0.0
    # Monotonic clock: elapsed time must not jump if the wall clock is adjusted.
    started = time.monotonic()
    last_pct = -5
    captured: deque[str] = deque(maxlen=100)

    proc = subprocess.Popen(
        cmd_progress,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
        # Diagnostics may echo metadata that is not valid in the locale
        # encoding; a decode error must not abort the whole run.
        errors="replace",
        bufsize=1,
    )

    try:
        stream = proc.stdout
        if stream is None:
            raise RuntimeError("ffmpeg stdout pipe was not created")
        # Read until EOF rather than stopping at "progress=end": ffmpeg can
        # still emit diagnostics afterwards, and draining the pipe both keeps
        # them for the error report and prevents the child from blocking on a
        # full pipe while we wait for it.
        for raw_line in stream:
            line = raw_line.strip()
            if not line:
                continue
            captured.append(line)
            if total <= 0 or not line.startswith("out_time="):
                continue

            # Early records can carry a slightly negative out_time; clamp it.
            processed_sec = max(0.0, _parse_ffmpeg_ts(line.partition("=")[2]))
            pct = int(min(100.0, processed_sec / total * 100))
            if pct < last_pct + 5:
                continue

            elapsed = max(0.001, time.monotonic() - started)
            speed = processed_sec / elapsed
            remaining = max(0.0, total - processed_sec)
            eta = remaining / speed if speed > 0 else 0.0
            print(
                f"[normalize] progress {pct:3d}% {label} rtf={speed:0.2f}x eta={_fmt_eta(eta)}",
                flush=True,
            )
            last_pct = pct
    except BaseException:
        # Includes KeyboardInterrupt: stop ffmpeg instead of waiting for the
        # encode to run to completion in the ``finally`` clause below.
        proc.kill()
        raise
    finally:
        if proc.stdout is not None:
            proc.stdout.close()
        proc.wait()

    if proc.returncode != 0:
        raise subprocess.CalledProcessError(
            proc.returncode, cmd_progress, "\n".join(captured)
        )