Adding ability to pause transcription
This commit is contained in:
@@ -15,6 +15,30 @@ LIB = Path(os.getenv("LIBRARY_ROOT", "/library"))
|
||||
TRN = Path(os.getenv("TRANSCRIPT_ROOT", "/transcripts"))
|
||||
TMP = Path(os.getenv("TMP_ROOT", "/tmpdl"))
|
||||
|
||||
# --- Runtime pause switch for CPU-heavy work (no rebuild needed) ---
|
||||
PAUSE_TRANSCRIBE_FILE = Path(os.getenv("PAUSE_TRANSCRIBE_FILE", str(TRN / ".pause_transcribe")))
|
||||
|
||||
def transcribe_paused() -> bool:
|
||||
"""Return True if new transcription work should be paused."""
|
||||
try:
|
||||
return PAUSE_TRANSCRIBE_FILE.exists()
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
def wait_if_paused(label: str = "transcribe", poll_sec: int = 10):
|
||||
"""
|
||||
If the pause file exists, block this worker in a low-CPU sleep loop until it is removed.
|
||||
This lets you 'pause' heavy work without killing workers or rebuilding.
|
||||
"""
|
||||
try:
|
||||
if transcribe_paused():
|
||||
print(f"[pause] {label}: pause flag present at {PAUSE_TRANSCRIBE_FILE}; waiting…", flush=True)
|
||||
while transcribe_paused():
|
||||
time.sleep(max(1, int(poll_sec)))
|
||||
except Exception:
|
||||
# If anything goes wrong reading the flag, don't block the pipeline.
|
||||
pass
|
||||
|
||||
MODEL_NAME = os.getenv("WHISPER_MODEL","large-v3")
|
||||
COMPUTE = os.getenv("WHISPER_PRECISION","int8")
|
||||
WHISPER_LANGUAGE = os.getenv("WHISPER_LANGUAGE", "auto").strip()
|
||||
@@ -1326,6 +1350,7 @@ def _postprocess_after_transcribe(media_path: Path, base: Path):
|
||||
|
||||
def transcribe_job(path_str: str):
|
||||
"""RQ job: heavy transcription only. Safe to import by dotted path 'worker.transcribe_job'."""
|
||||
wait_if_paused("transcribe_job")
|
||||
p = Path(path_str)
|
||||
base = transcribe(p)
|
||||
_postprocess_after_transcribe(p, base)
|
||||
@@ -1334,6 +1359,8 @@ def transcribe_job(path_str: str):
|
||||
def enqueue_transcribe(path: Path) -> bool:
|
||||
"""Enqueue a transcription job to the 'transcribe' queue. Returns True on success."""
|
||||
try:
|
||||
if transcribe_paused():
|
||||
print(f"[queue] pause flag present; enqueuing job for {path} but workers will wait", flush=True)
|
||||
conn = Redis.from_url(REDIS_URL)
|
||||
q = Queue("transcribe", connection=conn, default_timeout=60*60*24)
|
||||
# Use dotted path so workers in other processes can import
|
||||
@@ -1447,6 +1474,8 @@ def handle_local_file(path_str: str):
|
||||
return
|
||||
|
||||
# 2) Otherwise, run transcription (offload to queue if enabled and not in transcribe-only worker)
|
||||
# Respect runtime pause switch before starting heavy work
|
||||
wait_if_paused("handle_local_file")
|
||||
if OFFLOAD_TRANSCRIBE and WORKER_MODE != "transcribe" and enqueue_transcribe(p):
|
||||
log({**info, **{"status": "queued_transcribe"}})
|
||||
return
|
||||
@@ -1666,6 +1695,8 @@ def handle_url(url: str):
|
||||
if repo_json:
|
||||
base = reuse_repo_transcript(dest, repo_json)
|
||||
if not base:
|
||||
# Respect runtime pause switch before starting heavy work
|
||||
wait_if_paused("handle_url")
|
||||
if OFFLOAD_TRANSCRIBE and WORKER_MODE != "transcribe" and enqueue_transcribe(dest):
|
||||
log({**info, **{"status": "queued_transcribe"}})
|
||||
continue
|
||||
|
Reference in New Issue
Block a user