diff --git a/app/worker.py b/app/worker.py
index 76ad885..8d311c7 100644
--- a/app/worker.py
+++ b/app/worker.py
@@ -15,6 +15,46 @@ LIB = Path(os.getenv("LIBRARY_ROOT", "/library"))
 TRN = Path(os.getenv("TRANSCRIPT_ROOT", "/transcripts"))
 TMP = Path(os.getenv("TMP_ROOT", "/tmpdl"))
 
+# --- Runtime pause switch for CPU-heavy work (no rebuild needed) ---
+# Paused when EITHER the flag file exists OR the Redis key is truthy; the key is
+# the one `scripts/podx-tools.sh transcribe-pause` / `transcribe-resume` toggles.
+PAUSE_TRANSCRIBE_FILE = Path(os.getenv("PAUSE_TRANSCRIBE_FILE", str(TRN / ".pause_transcribe")))
+PAUSE_TRANSCRIBE_KEY = os.getenv("PAUSE_TRANSCRIBE_KEY", "podx:transcribe:paused")
+
+def _pause_key_set() -> bool:
+    """Return True if the Redis pause key holds a truthy value (not missing, empty or "0")."""
+    try:
+        val = Redis.from_url(REDIS_URL).get(PAUSE_TRANSCRIBE_KEY)
+    except Exception:
+        # Redis unreachable or misconfigured: fail open, like the flag-file check.
+        return False
+    # redis-py returns bytes unless decode_responses is enabled; accept both.
+    return val not in (None, b"", b"0", "", "0")
+
+def transcribe_paused() -> bool:
+    """Return True if new transcription work should be paused."""
+    try:
+        if PAUSE_TRANSCRIBE_FILE.exists():
+            return True
+    except Exception:
+        # Unreadable flag path: ignore the file switch and still consult Redis.
+        pass
+    return _pause_key_set()
+
+def wait_if_paused(label: str = "transcribe", poll_sec: int = 10):
+    """
+    If a pause switch is set, block this worker in a low-CPU sleep loop until it is cleared.
+    This lets you 'pause' heavy work without killing workers or rebuilding.
+    """
+    try:
+        if transcribe_paused():
+            print(f"[pause] {label}: pause switch set (file {PAUSE_TRANSCRIBE_FILE} or redis key {PAUSE_TRANSCRIBE_KEY}); waiting…", flush=True)
+            while transcribe_paused():
+                time.sleep(max(1, int(poll_sec)))
+    except Exception:
+        # If anything goes wrong reading the flag, don't block the pipeline.
+        pass
+
 MODEL_NAME = os.getenv("WHISPER_MODEL","large-v3")
 COMPUTE = os.getenv("WHISPER_PRECISION","int8")
 WHISPER_LANGUAGE = os.getenv("WHISPER_LANGUAGE", "auto").strip()
@@ -1326,6 +1366,7 @@ def _postprocess_after_transcribe(media_path: Path, base: Path):
 
 def transcribe_job(path_str: str):
     """RQ job: heavy transcription only. Safe to import by dotted path 'worker.transcribe_job'."""
+    wait_if_paused("transcribe_job")
     p = Path(path_str)
     base = transcribe(p)
     _postprocess_after_transcribe(p, base)
@@ -1334,6 +1375,8 @@ def transcribe_job(path_str: str):
 def enqueue_transcribe(path: Path) -> bool:
     """Enqueue a transcription job to the 'transcribe' queue. Returns True on success."""
     try:
+        if transcribe_paused():
+            print(f"[queue] pause flag present; enqueuing job for {path} but workers will wait", flush=True)
         conn = Redis.from_url(REDIS_URL)
         q = Queue("transcribe", connection=conn, default_timeout=60*60*24)
         # Use dotted path so workers in other processes can import
@@ -1447,6 +1490,8 @@ def handle_local_file(path_str: str):
         return
 
     # 2) Otherwise, run transcription (offload to queue if enabled and not in transcribe-only worker)
+    # Respect runtime pause switch before starting heavy work
+    wait_if_paused("handle_local_file")
     if OFFLOAD_TRANSCRIBE and WORKER_MODE != "transcribe" and enqueue_transcribe(p):
         log({**info, **{"status": "queued_transcribe"}})
         return
@@ -1666,6 +1711,8 @@ def handle_url(url: str):
         if repo_json:
             base = reuse_repo_transcript(dest, repo_json)
         if not base:
+            # Respect runtime pause switch before starting heavy work
+            wait_if_paused("handle_url")
             if OFFLOAD_TRANSCRIBE and WORKER_MODE != "transcribe" and enqueue_transcribe(dest):
                 log({**info, **{"status": "queued_transcribe"}})
                 continue
diff --git a/scripts/podx-tools.sh b/scripts/podx-tools.sh
index c25ba8e..4dd1660 100755
--- a/scripts/podx-tools.sh
+++ b/scripts/podx-tools.sh
@@ -85,6 +85,9 @@ fi
 # Optional: explicit KB id to use for all KB operations (bypasses name resolution)
 : "${OPENWEBUI_KB_ID:=}"
 
+# Max seconds to wait for OWUI to finish extracting file content before attach
+: "${OPENWEBUI_WAIT_SECS:=180}"
+
 # Resolve a working OpenWebUI base URL (fallback from host.docker.internal -> localhost)
 _owui_url() {
   # If a host-only override is provided, prefer it unconditionally
@@ -225,6 +228,21 @@ EOF
   done
 }
 
+# --- Redis helpers (for pause/resume switches) -----------------------------
+_redis_cli() {
+  # Prefer dockerized redis if available; otherwise fall back to local redis-cli
+  if command -v docker >/dev/null 2>&1 && docker compose ps redis >/dev/null 2>&1; then
+    docker compose exec -T redis redis-cli "$@"
+  elif command -v redis-cli >/dev/null 2>&1; then
+    redis-cli -h "${REDIS_HOST:-127.0.0.1}" -p "${REDIS_PORT:-6379}" "$@"
+  else
+    echo "redis-cli not found and no docker compose redis service available." >&2
+    return 1
+  fi
+}
+
+_transcribe_key="podx:transcribe:paused"
+
 _help() {
   cat <" # attach all files matching glob
 
+Transcription control:
+  transcribe-status          # show whether transcription workers are paused
+  transcribe-pause           # pause CPU-heavy transcription jobs
+  transcribe-resume          # resume transcription jobs
+
 Examples:
   ./scripts/podx-tools.sh meili-health
   ./scripts/podx-tools.sh meili-search "grand canyon" 10
@@ -451,6 +474,7 @@ PY
     _require "OPENWEBUI_URL" "$OPENWEBUI_URL"
     _require "OPENWEBUI_API_KEY" "$OPENWEBUI_API_KEY"
 
+    TMP_EXTRACT=""
     # Decide how to send the file (force text/plain for .txt/.md; optionally extract JSON->text)
     upload_flag=("-F" "file=@$file")
     ext="${file##*.}"
@@ -475,6 +499,7 @@ PY
         stem="${base%.*}"
         upload_flag=("-F" "file=@$tmp_txt;type=text/plain;filename=${stem}.txt")
         echo "[owui] extracted text from JSON -> ${stem}.txt"
+        TMP_EXTRACT="$tmp_txt"
       else
         echo "[owui] WARNING: JSON had no extractable text, uploading raw JSON (may be rejected)" >&2
       fi
@@ -507,7 +532,7 @@ PY
     if [ -z "$FILE_ID" ]; then echo "Upload failed (no file id)"; exit 1; fi
 
     # Wait until OWUI finishes processing/extracting text for this file (prevents "content empty" 400)
-    if ! _owui_wait_file "$FILE_ID" 180; then
+    if ! _owui_wait_file "$FILE_ID" "$OPENWEBUI_WAIT_SECS"; then
       echo "[owui] WARNING: timed out waiting for file content; attach may fail if OWUI hasn't extracted text yet" >&2
     fi
 
@@ -537,6 +562,7 @@ PY
 
     echo "$RESP" | ppjson
     rm -f "$tmp_body" "$tmp_code" "$tmp_hdrs"
+    [ -n "${TMP_EXTRACT:-}" ] && rm -f "$TMP_EXTRACT"; unset TMP_EXTRACT
 
     if [ $curl_exit -ne 0 ]; then
       echo "Attach failed: curl exit $curl_exit" >&2; exit $curl_exit
@@ -550,6 +576,10 @@ PY
         : # success
         ;;
       *)
+        if printf '%s' "$RESP" | grep -qi "Duplicate content"; then
+          echo "[owui] duplicate content — already indexed. Treating as success."
+          exit 0
+        fi
         echo "Attach failed (HTTP $http_code)" >&2; exit 1
         ;;
     esac
@@ -564,6 +594,7 @@ PY
     _require "OPENWEBUI_URL" "$OPENWEBUI_URL"
     _require "OPENWEBUI_API_KEY" "$OPENWEBUI_API_KEY"
 
+    TMP_EXTRACT=""
     # Decide how to send the file (force text/plain for .txt/.md; optionally extract JSON->text)
     upload_flag=("-F" "file=@$file")
     ext="${file##*.}"
@@ -588,6 +619,7 @@ PY
         stem="${base%.*}"
         upload_flag=("-F" "file=@$tmp_txt;type=text/plain;filename=${stem}.txt")
         echo "[owui] extracted text from JSON -> ${stem}.txt"
+        TMP_EXTRACT="$tmp_txt"
       else
         echo "[owui] WARNING: JSON had no extractable text, uploading raw JSON (may be rejected)" >&2
       fi
@@ -620,7 +652,7 @@ PY
     if [ -z "$FILE_ID" ]; then echo "Upload failed (no file id)"; exit 1; fi
 
     # Wait until OWUI finishes processing/extracting text for this file (prevents "content empty" 400)
-    if ! _owui_wait_file "$FILE_ID" 180; then
+    if ! _owui_wait_file "$FILE_ID" "$OPENWEBUI_WAIT_SECS"; then
       echo "[owui] WARNING: timed out waiting for file content; attach may fail if OWUI hasn't extracted text yet" >&2
     fi
 
@@ -640,6 +672,7 @@ PY
 
     echo "$RESP" | ppjson
     rm -f "$tmp_body" "$tmp_code" "$tmp_hdrs"
+    [ -n "${TMP_EXTRACT:-}" ] && rm -f "$TMP_EXTRACT"; unset TMP_EXTRACT
 
     if [ $curl_exit -ne 0 ]; then
       echo "Attach failed: curl exit $curl_exit" >&2; exit $curl_exit
@@ -649,7 +682,13 @@ PY
     fi
     case "$http_code" in
       200|201|204) : ;;
-      *) echo "Attach failed (HTTP $http_code)" >&2; exit 1 ;;
+      *)
+        if printf '%s' "$RESP" | grep -qi "Duplicate content"; then
+          echo "[owui] duplicate content — already indexed. Treating as success."
+          exit 0
+        fi
+        echo "Attach failed (HTTP $http_code)" >&2; exit 1
+        ;;
     esac
     ;;
 
@@ -773,6 +812,35 @@ PY
     done
     ;;
 
+  transcribe-status)
+    # Prints 'paused' if the switch key is set to a truthy value, else 'running'.
+    # A redis failure is reported as an error instead of being mistaken for 'running'.
+    if ! val="$(_redis_cli GET "$_transcribe_key" 2>/dev/null)"; then
+      echo "Failed to talk to redis to read pause switch." >&2
+      exit 1
+    fi
+    if [ -n "${val:-}" ] && [ "${val}" != "(nil)" ] && [ "${val}" != "0" ]; then
+      echo "paused"
+      exit 0
+    fi
+    echo "running"
+    ;;
+  transcribe-pause)
+    if _redis_cli SET "$_transcribe_key" 1 >/dev/null; then
+      echo "Transcription: paused"
+    else
+      echo "Failed to talk to redis to set pause switch." >&2
+      exit 1
+    fi
+    ;;
+  transcribe-resume)
+    if _redis_cli DEL "$_transcribe_key" >/dev/null; then
+      echo "Transcription: resumed"
+    else
+      echo "Failed to talk to redis to clear pause switch." >&2
+      exit 1
+    fi
+    ;;
   *)
     echo "Unknown command: $cmd" >&2
     _help