From 12d9ff52004aae4f34a3978ce56ac9bc69194690 Mon Sep 17 00:00:00 2001 From: Tomas Kracmar Date: Thu, 11 Sep 2025 16:02:39 +0200 Subject: [PATCH] Adding instant pause --- app/worker.py | 52 +++++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 46 insertions(+), 6 deletions(-) diff --git a/app/worker.py b/app/worker.py index 8d311c7..50ffec9 100644 --- a/app/worker.py +++ b/app/worker.py @@ -39,6 +39,11 @@ def wait_if_paused(label: str = "transcribe", poll_sec: int = 10): # If anything goes wrong reading the flag, don't block the pipeline. pass +# --- Exception to abort transcription when pause is requested --- +class PauseInterrupt(Exception): + """Raised to cooperatively abort a running transcription when pause is requested.""" + pass + MODEL_NAME = os.getenv("WHISPER_MODEL","large-v3") COMPUTE = os.getenv("WHISPER_PRECISION","int8") WHISPER_LANGUAGE = os.getenv("WHISPER_LANGUAGE", "auto").strip() @@ -944,6 +949,22 @@ def transcribe(media_path: Path): segs.append(seg) text_parts.append(s.text) + # --- Cooperative pause: save checkpoint and abort as soon as pause is requested --- + if transcribe_paused(): + try: + pct = int(min(100, max(0, (end / dur) * 100))) if dur > 0 else 0 + except Exception: + pct = 0 + _save_partial(title, info.language or (WHISPER_LANGUAGE if WHISPER_LANGUAGE.lower() != "auto" else "en"), segs) + log({ + "status": "paused", + "path": str(media_path), + "title": title, + "progress": pct + }) + print(f"[pause] transcribe: pause requested mid-run; aborting at ~{end:.2f}s for {media_path}", flush=True) + raise PauseInterrupt("pause requested") + if WHISPER_LOG_SEGMENTS: print(f"[whisper] {start:8.2f}–{end:8.2f} {s.text.strip()}", flush=True) @@ -1350,9 +1371,16 @@ def _postprocess_after_transcribe(media_path: Path, base: Path): def transcribe_job(path_str: str): """RQ job: heavy transcription only. Safe to import by dotted path 'worker.transcribe_job'.""" - wait_if_paused("transcribe_job") + # Do NOT block when paused; exit quickly so CPU-heavy work stops ASAP. + if transcribe_paused(): + print(f"[pause] transcribe_job: pause is active; skipping start for {path_str}", flush=True) + return "paused" p = Path(path_str) - base = transcribe(p) + try: + base = transcribe(p) + except PauseInterrupt: + print(f"[pause] transcribe_job: paused mid-run for {p}", flush=True) + return "paused" _postprocess_after_transcribe(p, base) return str(base) @@ -1474,8 +1502,14 @@ def handle_local_file(path_str: str): return # 2) Otherwise, run transcription (offload to queue if enabled and not in transcribe-only worker) - # Respect runtime pause switch before starting heavy work - wait_if_paused("handle_local_file") + # If paused, do not block; either enqueue (so worker will pause) or skip now. + if transcribe_paused(): + if OFFLOAD_TRANSCRIBE and WORKER_MODE != "transcribe" and enqueue_transcribe(p): + log({**info, **{"status": "queued_transcribe"}}) + return + log({**info, **{"status": "paused"}}) + print(f"[pause] handle_local_file: pause active; not starting {p}", flush=True) + return if OFFLOAD_TRANSCRIBE and WORKER_MODE != "transcribe" and enqueue_transcribe(p): log({**info, **{"status": "queued_transcribe"}}) return @@ -1695,8 +1729,14 @@ def handle_url(url: str): if repo_json: base = reuse_repo_transcript(dest, repo_json) if not base: - # Respect runtime pause switch before starting heavy work - wait_if_paused("handle_url") + # If paused, do not block; either enqueue (so worker will pause) or skip now. + if transcribe_paused(): + if OFFLOAD_TRANSCRIBE and WORKER_MODE != "transcribe" and enqueue_transcribe(dest): + log({**info, **{"status": "queued_transcribe"}}) + continue + log({**info, **{"status": "paused"}}) + print(f"[pause] handle_url: pause active; not starting {dest}", flush=True) + continue if OFFLOAD_TRANSCRIBE and WORKER_MODE != "transcribe" and enqueue_transcribe(dest): log({**info, **{"status": "queued_transcribe"}}) continue