Actual worker differentiation
app/worker.py
@@ -1,4 +1,6 @@
-import os, subprocess, shutil, json, re, orjson, requests
+import os, subprocess, shutil, json, re, orjson, requests, unicodedata
+from rq import Queue
+from redis import Redis
 from pathlib import Path
 import math
 import difflib
@@ -36,6 +38,10 @@ OWUI_URL = os.getenv("OPENWEBUI_URL", "").rstrip("/")
 OWUI_KEY = os.getenv("OPENWEBUI_API_KEY", "")
 OWUI_KB = os.getenv("OPENWEBUI_KB_NAME", "Homelab Library")
 
+# Redis-backed job queue settings and offload toggle
+REDIS_URL = os.getenv("REDIS_URL", "redis://redis:6379/0").strip()
+OFFLOAD_TRANSCRIBE = os.getenv("OFFLOAD_TRANSCRIBE", "1").lower() not in ("0", "false", "no")
+
 # Worker role selection
 WORKER_MODE = os.getenv("WORKER_MODE", "all").strip().lower()  # 'all' or 'transcribe'
 JOB_QUEUES = [q.strip() for q in os.getenv("JOB_QUEUES", "default").split(",") if q.strip()]
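
The OFFLOAD_TRANSCRIBE toggle treats any value other than "0", "false", or "no" (case-insensitive) as enabled, so leaving the variable unset keeps offloading on. A quick sketch of the parsing rule in isolation (the raw values are examples):

# Illustration of the OFFLOAD_TRANSCRIBE rule above; raw values are examples.
for raw in ("1", "yes", "TRUE", "0", "false", "No"):
    enabled = raw.lower() not in ("0", "false", "no")
    print(f"OFFLOAD_TRANSCRIBE={raw!r} -> {enabled}")  # first three True, last three False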
@@ -1122,6 +1128,15 @@ def slugify(text):
     text = re.sub(r'[^A-Za-z0-9\-._ ]+', '', text).strip().replace(' ', '_')
     return text[:120] or 'page'
 
+def _norm(s: str | None) -> str:
+    """Normalize strings for stable comparisons across Unicode lookalikes and stray whitespace."""
+    if s is None:
+        return ""
+    try:
+        return unicodedata.normalize("NFKC", s).strip()
+    except Exception:
+        return (s or "").strip()
+
 def save_web_snapshot(url: str):
     r = _requests.get(url, timeout=30, headers={"User-Agent":"Mozilla/5.0"})
     r.raise_for_status()
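
_norm() leans on NFKC, which folds compatibility characters (full-width letters, the ideographic space, and so on) into their plain forms. A small sketch of the effect, with assumed example strings:

import unicodedata

# Full-width "Ｈｏｍｅｌａｂ" plus an ideographic space (U+3000) normalize to
# plain ASCII, so _norm() treats both spellings of a KB name as the same key.
a = unicodedata.normalize("NFKC", "Ｈｏｍｅｌａｂ\u3000Library ").strip()
b = unicodedata.normalize("NFKC", "Homelab Library").strip()
print(a == b)  # True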
@@ -1183,8 +1198,9 @@ def owui_get_or_create_kb():
     r.raise_for_status()
     body = r.json()
     items = body if isinstance(body, list) else body.get("data", [])
-    # Prefer exact name match; if multiple, pick the most recently updated
-    matches = [kb for kb in items if (kb.get("name") or "") == OWUI_KB]
+    # Prefer exact normalized name match; if multiple, pick the most recently updated
+    kb_target = _norm(OWUI_KB)
+    matches = [kb for kb in items if _norm(kb.get("name")) == kb_target]
     if matches:
         try:
             matches.sort(key=lambda k: k.get("updated_at") or 0, reverse=True)
@@ -1213,8 +1229,9 @@ def owui_get_or_create_kb():
         rr.raise_for_status()
         body = rr.json()
         items = body if isinstance(body, list) else body.get("data", [])
+        kb_target = _norm(OWUI_KB)
         for kb in items:
-            if (kb.get("name") or "") == OWUI_KB:
+            if _norm(kb.get("name")) == kb_target:
                 return kb.get("id")
     except Exception:
         pass
@@ -1224,14 +1241,21 @@ def owui_upload_and_attach(path: Path, kb_id: str):
     with open(path, "rb") as f:
         r = requests.post(f"{OWUI_URL}/api/v1/files/", headers=owui_headers(), files={"file": (path.name, f)}, timeout=60*10)
     r.raise_for_status()
-    file_id = r.json()["data"]["id"]
+    up = r.json()
+    file_id = (up.get("id") or (up.get("data") or {}).get("id"))
+    if not file_id:
+        raise RuntimeError(f"OWUI upload: could not get file id from response: {up}")
     r = requests.post(
         f"{OWUI_URL}/api/v1/knowledge/{kb_id}/file/add",
         headers={**owui_headers(), "Content-Type": "application/json"},
         data=orjson.dumps({"file_id": file_id}),
-        timeout=60,
+        timeout=180,
     )
     r.raise_for_status()
+    try:
+        time.sleep(0.5)
+    except Exception:
+        pass
     return True
 
 def publish_to_openwebui(paths):
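
The tolerant file-id lookup covers two plausible upload-response shapes, a top-level id and one nested under data; anything else trips the RuntimeError. A sketch with illustrative payloads (not actual OWUI responses):

# Illustrative payloads only; the same expression handles both shapes.
for up in ({"id": "file-123"}, {"data": {"id": "file-456"}}, {"error": "boom"}):
    file_id = (up.get("id") or (up.get("data") or {}).get("id"))
    print(file_id)  # file-123, file-456, None (the None case raises above)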
@@ -1239,6 +1263,9 @@ def publish_to_openwebui(paths):
         return
     try:
         kb_id = owui_get_or_create_kb()
+        if not kb_id:
+            print("[owui] KB resolve failed; skipping attach to avoid accidental duplicates", flush=True)
+            return
         for p in paths:
             p = Path(p)
             if not p.exists():
@@ -1250,6 +1277,61 @@ def publish_to_openwebui(paths):
     except Exception as e:
         log({"status": "owui_error", "error": str(e)})
 
+# --------- Post-transcribe pipeline and job/queue helpers ---------
+
+def _postprocess_after_transcribe(media_path: Path, base: Path):
+    """Common steps after we have a `base` transcript path: index, publish, NFO, artwork."""
+    try:
+        index_meili(base.with_suffix(".json"))
+    except Exception as e:
+        print(f"[post] meili index failed: {e}", flush=True)
+    try:
+        publish_to_openwebui([base.with_suffix(".txt")])
+    except Exception as e:
+        print(f"[post] owui publish failed: {e}", flush=True)
+    # Build metadata using existing helper
+    try:
+        title = media_path.stem
+        fallback = {
+            "title": title,
+            "episode_title": title,
+            "show": media_path.parent.name,
+            "description": "",
+            "pubdate": _extract_date_from_stem(title),
+            "duration_sec": media_duration_seconds(media_path),
+            "image": "",
+            "guid": "",
+        }
+        meta = build_meta_from_sources(media_path, media_path.parent.name, fallback, ep=None)
+        ttxt = (TRN / title).with_suffix(".txt").read_text(encoding="utf-8")
+        write_episode_nfo(media_path, meta, ttxt)
+        try:
+            save_episode_artwork(meta.get("image"), media_path, meta.get("show"))
+        except Exception:
+            pass
+    except Exception as e:
+        print(f"[post] NFO write failed: {e}", flush=True)
+
+def transcribe_job(path_str: str):
+    """RQ job: heavy transcription only. Safe to import by dotted path 'worker.transcribe_job'."""
+    p = Path(path_str)
+    base = transcribe(p)
+    _postprocess_after_transcribe(p, base)
+    return str(base)
+
+def enqueue_transcribe(path: Path) -> bool:
+    """Enqueue a transcription job to the 'transcribe' queue. Returns True on success."""
+    try:
+        conn = Redis.from_url(REDIS_URL)
+        q = Queue("transcribe", connection=conn, default_timeout=60*60*24)
+        # Use dotted path so workers in other processes can import
+        q.enqueue("worker.transcribe_job", str(path), job_timeout=60*60*24)
+        print(f"[queue] enqueued transcribe job for {path}", flush=True)
+        return True
+    except Exception as e:
+        print(f"[queue] enqueue failed, will transcribe inline: {e}", flush=True)
+        return False
+
 def handle_local_file(path_str: str):
     """Transcribe & index a local media file that already exists in /library.
     If a sidecar .txt/.srt/.vtt exists, use it instead of running Whisper.
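
The consumer side of the queue is not part of this diff (the compose changes below add a podx-worker-transcribe service for it). A minimal sketch of what such a worker amounts to, assuming stock RQ and the REDIS_URL default above:

# Minimal sketch, not from this commit: drain the "transcribe" queue that
# enqueue_transcribe() feeds. The dotted path "worker.transcribe_job" resolves
# as long as worker.py is importable in this process.
from redis import Redis
from rq import Queue, Worker

conn = Redis.from_url("redis://redis:6379/0")  # matches the REDIS_URL default
Worker([Queue("transcribe", connection=conn)], connection=conn).work()

The CLI equivalent is: rq worker --url redis://redis:6379/0 transcribe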
@@ -1352,30 +1434,12 @@ def handle_local_file(path_str: str):
             log({**info, **{"status": "done", "note": "reused_repo_transcript"}})
             return
 
-        # 2) Otherwise, run transcription
+        # 2) Otherwise, run transcription (offload to queue if enabled and not in transcribe-only worker)
+        if OFFLOAD_TRANSCRIBE and WORKER_MODE != "transcribe" and enqueue_transcribe(p):
+            log({**info, **{"status": "queued_transcribe"}})
+            return
         base = transcribe(p)
-        index_meili(base.with_suffix(".json"))
-        publish_to_openwebui([base.with_suffix(".txt")])
-        try:
-            fallback = {
-                "title": title,
-                "episode_title": title,
-                "show": p.parent.name,
-                "description": "",
-                "pubdate": _extract_date_from_stem(title),
-                "duration_sec": media_duration_seconds(p),
-                "image": "",
-                "guid": "",
-            }
-            meta = build_meta_from_sources(p, p.parent.name, fallback, ep=None)
-            ttxt = (TRN / title).with_suffix(".txt").read_text(encoding="utf-8")
-            write_episode_nfo(p, meta, ttxt)
-            try:
-                save_episode_artwork(meta.get("image"), p, meta.get("show"))
-            except Exception:
-                pass
-        except Exception as e:
-            print(f"[post] NFO write failed: {e}", flush=True)
+        _postprocess_after_transcribe(p, base)
         log({**info, **{"status": "done"}})
     except Exception as e:
         log({"url": path_str, "status": "error", "error": str(e)})
@@ -1590,31 +1654,11 @@ def handle_url(url: str):
         if repo_json:
             base = reuse_repo_transcript(dest, repo_json)
         if not base:
+            if OFFLOAD_TRANSCRIBE and WORKER_MODE != "transcribe" and enqueue_transcribe(dest):
+                log({**info, **{"status": "queued_transcribe"}})
+                continue
             base = transcribe(dest)
-        index_meili(base.with_suffix(".json"))
-        publish_to_openwebui([base.with_suffix(".txt")])
-        try:
-            # Build metadata from RSS (if matched), yt-dlp info.json, and sensible fallbacks
-            fallback = {
-                "title": dest.stem,
-                "episode_title": dest.stem,
-                "show": uploader,
-                "description": "",
-                "pubdate": _extract_date_from_stem(dest.stem),
-                "duration_sec": media_duration_seconds(dest),
-                "image": "",
-                "guid": "",
-            }
-            meta = build_meta_from_sources(dest, uploader, fallback, ep if 'ep' in locals() else None)
-            ttxt = base.with_suffix(".txt").read_text(encoding="utf-8")
-            write_episode_nfo(dest, meta, ttxt)
-            # Save local artwork for Plex/Kodi from meta image url
-            try:
-                save_episode_artwork(meta.get("image"), dest, meta.get("show"))
-            except Exception:
-                pass
-        except Exception as e:
-            print(f"[post] NFO write failed: {e}", flush=True)
+        _postprocess_after_transcribe(dest, base)
         log({**info, **{"status":"done"}})
     except Exception as e:
         log({"url": url, "status":"error", "error": str(e)})
docker-compose.yml
@@ -32,6 +32,7 @@ services:
       timeout: 5s
       retries: 3
 
+  # Main worker: handles downloads, indexing, RSS, OWUI, etc. (no heavy Whisper)
   podx-worker:
     build: ./app
     container_name: podx-worker
@@ -71,6 +72,7 @@ services:
     extra_hosts:
       - host.docker.internal:host-gateway
 
+  # Transcribe-only worker: listens to the "transcribe" queue and runs Whisper jobs
   podx-worker-transcribe:
     build: ./app
     container_name: podx-worker-transcribe
@@ -150,6 +152,7 @@ services:
       # - /mnt/secure/cookies.txt:/config/cookies.txt:ro
     restart: unless-stopped
 
+  # Scanner: watches /library and enqueues jobs (heavy jobs go to "transcribe" queue)
   podx-scanner:
     build: ./app
     container_name: podx-scanner
@@ -160,6 +163,7 @@ services:
       REDIS_URL: redis://redis:6379/0
       LIBRARY_ROOT: /library
       TRANSCRIPT_ROOT: /transcripts
+      TRANSCRIBE_QUEUE: transcribe
       SCAN_INTERVAL: 30
       JOB_TIMEOUT: ${JOB_TIMEOUT:-14400}
      JOB_TTL: ${JOB_TTL:-86400}
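
TRANSCRIBE_QUEUE lets the scanner name the heavy queue without hardcoding it. The scanner's code is not in this commit; a hypothetical sketch of the enqueue side, mirroring enqueue_transcribe() in worker.py and using the env vars defined here:

# Hypothetical scanner-side enqueue; the media path is an example.
import os
from redis import Redis
from rq import Queue

conn = Redis.from_url(os.getenv("REDIS_URL", "redis://redis:6379/0"))
q = Queue(os.getenv("TRANSCRIBE_QUEUE", "transcribe"), connection=conn)
q.enqueue("worker.transcribe_job", "/library/Show/episode.mp3",
          job_timeout=int(os.getenv("JOB_TIMEOUT", "14400")))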