From 688d0406c095c3ac9e1561a970935a19bb8af271 Mon Sep 17 00:00:00 2001
From: Tomas Kracmar
Date: Mon, 8 Sep 2025 16:02:42 +0200
Subject: [PATCH] Actual worker differentiation

---
 app/worker.py      | 150 +++++++++++++++++++++++++++++----------------
 docker-compose.yml |   4 ++
 2 files changed, 101 insertions(+), 53 deletions(-)

diff --git a/app/worker.py b/app/worker.py
index 981a80c..9529193 100644
--- a/app/worker.py
+++ b/app/worker.py
@@ -1,4 +1,6 @@
-import os, subprocess, shutil, json, re, orjson, requests
+import os, subprocess, shutil, json, re, orjson, requests, unicodedata
+from rq import Queue
+from redis import Redis
 from pathlib import Path
 import math
 import difflib
@@ -36,6 +38,10 @@
 OWUI_URL = os.getenv("OPENWEBUI_URL", "").rstrip("/")
 OWUI_KEY = os.getenv("OPENWEBUI_API_KEY", "")
 OWUI_KB = os.getenv("OPENWEBUI_KB_NAME", "Homelab Library")
+# Redis-backed job queue settings and offload toggle
+REDIS_URL = os.getenv("REDIS_URL", "redis://redis:6379/0").strip()
+OFFLOAD_TRANSCRIBE = os.getenv("OFFLOAD_TRANSCRIBE", "1").lower() not in ("0", "false", "no")
+
 # Worker role selection
 WORKER_MODE = os.getenv("WORKER_MODE", "all").strip().lower()  # 'all' or 'transcribe'
 JOB_QUEUES = [q.strip() for q in os.getenv("JOB_QUEUES", "default").split(",") if q.strip()]
@@ -1122,6 +1128,15 @@ def slugify(text):
     text = re.sub(r'[^A-Za-z0-9\-._ ]+', '', text).strip().replace(' ', '_')
     return text[:120] or 'page'
 
+def _norm(s: str | None) -> str:
+    """Normalize strings for stable comparisons across Unicode lookalikes and stray whitespace."""
+    if s is None:
+        return ""
+    try:
+        return unicodedata.normalize("NFKC", s).strip()
+    except Exception:
+        return (s or "").strip()
+
 def save_web_snapshot(url: str):
     r = _requests.get(url, timeout=30, headers={"User-Agent":"Mozilla/5.0"})
     r.raise_for_status()
@@ -1183,8 +1198,9 @@ def owui_get_or_create_kb():
     r.raise_for_status()
     body = r.json()
     items = body if isinstance(body, list) else body.get("data", [])
-    # Prefer exact name match; if multiple, pick the most recently updated
-    matches = [kb for kb in items if (kb.get("name") or "") == OWUI_KB]
+    # Prefer exact normalized name match; if multiple, pick the most recently updated
+    kb_target = _norm(OWUI_KB)
+    matches = [kb for kb in items if _norm(kb.get("name")) == kb_target]
     if matches:
         try:
             matches.sort(key=lambda k: k.get("updated_at") or 0, reverse=True)
@@ -1213,8 +1229,9 @@
         rr.raise_for_status()
         body = rr.json()
         items = body if isinstance(body, list) else body.get("data", [])
+        kb_target = _norm(OWUI_KB)
         for kb in items:
-            if (kb.get("name") or "") == OWUI_KB:
+            if _norm(kb.get("name")) == kb_target:
                 return kb.get("id")
     except Exception:
         pass
@@ -1224,14 +1241,21 @@ def owui_upload_and_attach(path: Path, kb_id: str):
     with open(path, "rb") as f:
         r = requests.post(f"{OWUI_URL}/api/v1/files/", headers=owui_headers(), files={"file": (path.name, f)}, timeout=60*10)
     r.raise_for_status()
-    file_id = r.json()["data"]["id"]
+    up = r.json()
+    file_id = (up.get("id") or (up.get("data") or {}).get("id"))
+    if not file_id:
+        raise RuntimeError(f"OWUI upload: could not get file id from response: {up}")
     r = requests.post(
         f"{OWUI_URL}/api/v1/knowledge/{kb_id}/file/add",
         headers={**owui_headers(), "Content-Type": "application/json"},
         data=orjson.dumps({"file_id": file_id}),
-        timeout=60,
+        timeout=180,
     )
     r.raise_for_status()
+    try:
+        time.sleep(0.5)
+    except Exception:
+        pass
     return True
 
 def publish_to_openwebui(paths):
@@ -1239,6 +1263,9 @@
     if not (OWUI_URL and OWUI_KEY and OWUI_KB):
         return
     try:
         kb_id = owui_get_or_create_kb()
+        if not kb_id:
+            print("[owui] KB resolve failed; skipping attach to avoid accidental duplicates", flush=True)
+            return
         for p in paths:
             p = Path(p)
             if not p.exists():
@@ -1250,6 +1277,61 @@
     except Exception as e:
         log({"status": "owui_error", "error": str(e)})
 
+# --------- Post-transcribe pipeline and job/queue helpers ---------
+
+def _postprocess_after_transcribe(media_path: Path, base: Path):
+    """Common steps after we have a `base` transcript path: index, publish, NFO, artwork."""
+    try:
+        index_meili(base.with_suffix(".json"))
+    except Exception as e:
+        print(f"[post] meili index failed: {e}", flush=True)
+    try:
+        publish_to_openwebui([base.with_suffix(".txt")])
+    except Exception as e:
+        print(f"[post] owui publish failed: {e}", flush=True)
+    # Build metadata using existing helper
+    try:
+        title = media_path.stem
+        fallback = {
+            "title": title,
+            "episode_title": title,
+            "show": media_path.parent.name,
+            "description": "",
+            "pubdate": _extract_date_from_stem(title),
+            "duration_sec": media_duration_seconds(media_path),
+            "image": "",
+            "guid": "",
+        }
+        meta = build_meta_from_sources(media_path, media_path.parent.name, fallback, ep=None)
+        ttxt = base.with_suffix(".txt").read_text(encoding="utf-8")
+        write_episode_nfo(media_path, meta, ttxt)
+        try:
+            save_episode_artwork(meta.get("image"), media_path, meta.get("show"))
+        except Exception:
+            pass
+    except Exception as e:
+        print(f"[post] NFO write failed: {e}", flush=True)
+
+def transcribe_job(path_str: str):
+    """RQ job: heavy transcription only. Safe to import by dotted path 'worker.transcribe_job'."""
+    p = Path(path_str)
+    base = transcribe(p)
+    _postprocess_after_transcribe(p, base)
+    return str(base)
+
+def enqueue_transcribe(path: Path) -> bool:
+    """Enqueue a transcription job to the 'transcribe' queue. Returns True on success."""
+    try:
+        conn = Redis.from_url(REDIS_URL)
+        q = Queue("transcribe", connection=conn, default_timeout=60*60*24)
+        # Use a dotted path so workers in other processes can import the job
+        q.enqueue("worker.transcribe_job", str(path), job_timeout=60*60*24)
+        print(f"[queue] enqueued transcribe job for {path}", flush=True)
+        return True
+    except Exception as e:
+        print(f"[queue] enqueue failed, will transcribe inline: {e}", flush=True)
+        return False
+
 def handle_local_file(path_str: str):
     """Transcribe & index a local media file that already exists in /library.
     If a sidecar .txt/.srt/.vtt exists, use it instead of running Whisper.
@@ -1352,30 +1434,12 @@ def handle_local_file(path_str: str):
             log({**info, **{"status": "done", "note": "reused_repo_transcript"}})
             return
 
-        # 2) Otherwise, run transcription
+        # 2) Otherwise, run transcription (offload to queue if enabled and not in transcribe-only worker)
+        if OFFLOAD_TRANSCRIBE and WORKER_MODE != "transcribe" and enqueue_transcribe(p):
+            log({**info, **{"status": "queued_transcribe"}})
+            return
         base = transcribe(p)
-        index_meili(base.with_suffix(".json"))
-        publish_to_openwebui([base.with_suffix(".txt")])
-        try:
-            fallback = {
-                "title": title,
-                "episode_title": title,
-                "show": p.parent.name,
-                "description": "",
-                "pubdate": _extract_date_from_stem(title),
-                "duration_sec": media_duration_seconds(p),
-                "image": "",
-                "guid": "",
-            }
-            meta = build_meta_from_sources(p, p.parent.name, fallback, ep=None)
-            ttxt = (TRN / title).with_suffix(".txt").read_text(encoding="utf-8")
-            write_episode_nfo(p, meta, ttxt)
-            try:
-                save_episode_artwork(meta.get("image"), p, meta.get("show"))
-            except Exception:
-                pass
-        except Exception as e:
-            print(f"[post] NFO write failed: {e}", flush=True)
+        _postprocess_after_transcribe(p, base)
         log({**info, **{"status": "done"}})
     except Exception as e:
         log({"url": path_str, "status": "error", "error": str(e)})
@@ -1590,31 +1654,11 @@ def handle_url(url: str):
             if repo_json:
                 base = reuse_repo_transcript(dest, repo_json)
             if not base:
+                if OFFLOAD_TRANSCRIBE and WORKER_MODE != "transcribe" and enqueue_transcribe(dest):
+                    log({**info, **{"status": "queued_transcribe"}})
+                    continue
                 base = transcribe(dest)
-            index_meili(base.with_suffix(".json"))
-            publish_to_openwebui([base.with_suffix(".txt")])
-            try:
-                # Build metadata from RSS (if matched), yt-dlp info.json, and sensible fallbacks
-                fallback = {
-                    "title": dest.stem,
-                    "episode_title": dest.stem,
-                    "show": uploader,
-                    "description": "",
-                    "pubdate": _extract_date_from_stem(dest.stem),
-                    "duration_sec": media_duration_seconds(dest),
-                    "image": "",
-                    "guid": "",
-                }
-                meta = build_meta_from_sources(dest, uploader, fallback, ep if 'ep' in locals() else None)
-                ttxt = base.with_suffix(".txt").read_text(encoding="utf-8")
-                write_episode_nfo(dest, meta, ttxt)
-                # Save local artwork for Plex/Kodi from meta image url
-                try:
-                    save_episode_artwork(meta.get("image"), dest, meta.get("show"))
-                except Exception:
-                    pass
-            except Exception as e:
-                print(f"[post] NFO write failed: {e}", flush=True)
+            _postprocess_after_transcribe(dest, base)
             log({**info, **{"status":"done"}})
         except Exception as e:
             log({"url": url, "status":"error", "error": str(e)})
diff --git a/docker-compose.yml b/docker-compose.yml
index 33103c4..26c410e 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -32,6 +32,7 @@ services:
       timeout: 5s
       retries: 3
 
+  # Main worker: handles downloads, indexing, RSS, OWUI, etc. (no heavy Whisper)
   podx-worker:
     build: ./app
     container_name: podx-worker
@@ -71,6 +72,7 @@
     extra_hosts:
       - host.docker.internal:host-gateway
 
+  # Transcribe-only worker: listens to the "transcribe" queue and runs Whisper jobs
   podx-worker-transcribe:
     build: ./app
     container_name: podx-worker-transcribe
@@ -150,6 +152,7 @@
       # - /mnt/secure/cookies.txt:/config/cookies.txt:ro
     restart: unless-stopped
 
+  # Scanner: watches /library and enqueues jobs (heavy jobs go to "transcribe" queue)
   podx-scanner:
     build: ./app
     container_name: podx-scanner
@@ -160,6 +163,7 @@
       REDIS_URL: redis://redis:6379/0
       LIBRARY_ROOT: /library
       TRANSCRIPT_ROOT: /transcripts
+      TRANSCRIBE_QUEUE: transcribe
      SCAN_INTERVAL: 30
       JOB_TIMEOUT: ${JOB_TIMEOUT:-14400}
       JOB_TTL: ${JOB_TTL:-86400}
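
Not shown in the hunks above: how a worker process maps WORKER_MODE / JOB_QUEUES onto actual RQ listeners, or the environment block that points podx-worker-transcribe at the "transcribe" queue (presumably WORKER_MODE=transcribe and JOB_QUEUES=transcribe). A minimal sketch of the bootstrap such a worker would run; apart from REDIS_URL, WORKER_MODE, JOB_QUEUES, the "transcribe" queue name, and worker.transcribe_job, every name below is illustrative, not part of this patch:

    # hypothetical entrypoint sketch, not part of this patch
    import os

    from redis import Redis
    from rq import Queue, Worker

    redis_url = os.getenv("REDIS_URL", "redis://redis:6379/0")
    mode = os.getenv("WORKER_MODE", "all").strip().lower()
    queue_names = [q.strip() for q in os.getenv("JOB_QUEUES", "default").split(",") if q.strip()]

    # Transcribe-only workers listen solely on the "transcribe" queue;
    # any other mode keeps whatever JOB_QUEUES names (usually just "default").
    if mode == "transcribe":
        queue_names = ["transcribe"]

    conn = Redis.from_url(redis_url)
    # enqueue_transcribe() submits jobs by dotted path ("worker.transcribe_job"),
    # so this process only needs worker.py importable on its PYTHONPATH and
    # resolves the function at execution time.
    Worker([Queue(n, connection=conn) for n in queue_names], connection=conn).work()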
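Enqueuing by the dotted string "worker.transcribe_job" rather than by function object means the enqueuing process does not have to import the target function at all; each RQ worker resolves the path itself when it picks up the job, which is what lets the scanner hand off Whisper work it cannot run. A quick way to confirm offloaded jobs land on the right queue during review (queue name and job path come from this patch; the rest is illustrative):

    # illustrative check, not part of this patch
    from redis import Redis
    from rq import Queue

    conn = Redis.from_url("redis://redis:6379/0")
    q = Queue("transcribe", connection=conn)
    print(f"{q.count} job(s) waiting on 'transcribe'")
    for job in q.jobs[:5]:
        # func_name should read "worker.transcribe_job" for offloaded work
        print(job.id, job.func_name, job.args)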