# podx/app/worker.py
import os, subprocess, shutil, json, re, orjson, requests, unicodedata
from rq import Queue
from redis import Redis
from pathlib import Path
import math
import difflib
import time
from faster_whisper import WhisperModel
from xml.sax.saxutils import escape as xml_escape
MEILI_URL = os.getenv("MEILI_URL", "http://meili:7700")
MEILI_KEY = os.getenv("MEILI_KEY", "")
LIB = Path(os.getenv("LIBRARY_ROOT", "/library"))
TRN = Path(os.getenv("TRANSCRIPT_ROOT", "/transcripts"))
TMP = Path(os.getenv("TMP_ROOT", "/tmpdl"))
# --- Runtime pause switch for CPU-heavy work (no rebuild needed) ---
PAUSE_TRANSCRIBE_FILE = Path(os.getenv("PAUSE_TRANSCRIBE_FILE", str(TRN / ".pause_transcribe")))
# Redis-backed pause flag (podx-tools compatible)
PAUSE_TRANSCRIBE_REDIS_KEY = os.getenv("PAUSE_TRANSCRIBE_REDIS_KEY", "podx:transcribe:paused").strip()
def _pause_flag_redis() -> bool:
"""Return True if a truthy pause flag is set in Redis under PAUSE_TRANSCRIBE_REDIS_KEY."""
try:
from redis import Redis as _R
val = _R.from_url(REDIS_URL).get(PAUSE_TRANSCRIBE_REDIS_KEY)
if not val:
return False
v = val.decode("utf-8", "ignore").strip().lower()
return v not in ("", "0", "false", "no", "(nil)")
except Exception:
return False
def transcribe_paused() -> bool:
"""Return True if new transcription work should be paused (file flag or Redis flag)."""
try:
if PAUSE_TRANSCRIBE_FILE.exists():
return True
except Exception:
pass
# Fall back to Redis-based switch used by podx-tools
return _pause_flag_redis()
def wait_if_paused(label: str = "transcribe", poll_sec: int = 10):
"""
If a pause flag is set (pause file or Redis key), block this worker in a low-CPU sleep loop until the flag is cleared.
This lets you 'pause' heavy work without killing workers or rebuilding.
"""
try:
if transcribe_paused():
print(f"[pause] {label}: pause flag present at {PAUSE_TRANSCRIBE_FILE}; waiting…", flush=True)
while transcribe_paused():
time.sleep(max(1, int(poll_sec)))
except Exception:
# If anything goes wrong reading the flag, don't block the pipeline.
pass
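# Example (illustrative): the pause switch can be toggled from outside the workers without a
# restart, e.g. `touch /transcripts/.pause_transcribe` / `rm /transcripts/.pause_transcribe` for
# the file flag, or (assuming redis-cli is available) `redis-cli -u redis://redis:6379/0 set
# podx:transcribe:paused 1` to pause and `... set podx:transcribe:paused 0` to resume via the
# Redis-backed switch used by podx-tools.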
# --- Exception to abort transcription when pause is requested ---
class PauseInterrupt(Exception):
"""Raised to cooperatively abort a running transcription when pause is requested."""
pass
MODEL_NAME = os.getenv("WHISPER_MODEL","large-v3")
COMPUTE = os.getenv("WHISPER_PRECISION","int8")
WHISPER_LANGUAGE = os.getenv("WHISPER_LANGUAGE", "auto").strip()
# Whisper device/config controls
WHISPER_DEVICE = os.getenv("WHISPER_DEVICE", "auto").strip()
WHISPER_DEVICE_INDEX = int(os.getenv("WHISPER_DEVICE_INDEX", "0"))
WHISPER_CPU_THREADS = int(os.getenv("WHISPER_CPU_THREADS", "4"))
# --- Host load guards / thread limits ---
# Limit ffmpeg threads (helps keep CPU in check when multiple workers run)
FFMPEG_THREADS = int(os.getenv("FFMPEG_THREADS", "1"))
# Tame BLAS/threadpools that libraries may spin up implicitly
import os as _os_threads
_os_threads.environ.setdefault("OMP_NUM_THREADS", str(WHISPER_CPU_THREADS))
_os_threads.environ.setdefault("OPENBLAS_NUM_THREADS", "1")
_os_threads.environ.setdefault("MKL_NUM_THREADS", "1")
_os_threads.environ.setdefault("NUMEXPR_NUM_THREADS", "1")
# Whisper logging & resume controls
WHISPER_LOG_SEGMENTS = os.getenv("WHISPER_LOG_SEGMENTS", "1") not in ("0", "false", "False")
WHISPER_RESUME = os.getenv("WHISPER_RESUME", "1") not in ("0", "false", "False")
PARTIAL_SAVE_EVERY_SEGS = int(os.getenv("WHISPER_PARTIAL_SAVE_EVERY_SEGS", "20"))
# RSS resolver config
RSS_INDEX_PATH = Path(os.getenv("RSS_INDEX_PATH", "/transcripts/rss_index.json"))
RSS_DURATION_TOLERANCE = int(os.getenv("RSS_DURATION_TOLERANCE", "150")) # seconds
DEFAULT_TRANSCRIPT_LANG = os.getenv("DEFAULT_TRANSCRIPT_LANG", "en").strip() or "en"
OWUI_URL = os.getenv("OPENWEBUI_URL", "").rstrip("/")
OWUI_KEY = os.getenv("OPENWEBUI_API_KEY", "")
OWUI_KB = os.getenv("OPENWEBUI_KB_NAME", "Homelab Library")
# Redis-backed job queue settings and offload toggle
REDIS_URL = os.getenv("REDIS_URL", "redis://redis:6379/0").strip()
OFFLOAD_TRANSCRIBE = os.getenv("OFFLOAD_TRANSCRIBE", "1").lower() not in ("0", "false", "no")
# Worker role selection
WORKER_MODE = os.getenv("WORKER_MODE", "all").strip().lower() # 'all' or 'transcribe'
JOB_QUEUES = [q.strip() for q in os.getenv("JOB_QUEUES", "default").split(",") if q.strip()]
def _mode_allows(task: str) -> bool:
"""Gate tasks by worker role. In 'transcribe' mode only allow transcription of local files
(including indexing and OWUI publish). "task" is one of: 'download','web','local','transcribe'."""
if WORKER_MODE == "transcribe":
return task in {"local", "transcribe"}
return True
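# Example (illustrative) environment for a dedicated transcription-only worker; the variable
# names come from the settings read above, the values are assumptions for a typical CPU host:
#   WORKER_MODE=transcribe
#   JOB_QUEUES=transcribe
#   WHISPER_MODEL=large-v3
#   WHISPER_DEVICE=cpu
#   WHISPER_CPU_THREADS=4
#   FFMPEG_THREADS=1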
TRN.mkdir(parents=True, exist_ok=True)
LIB.mkdir(parents=True, exist_ok=True)
TMP.mkdir(parents=True, exist_ok=True)
# Lazy Whisper model loader so the worker can start even if model download/setup is slow
_model = None
def get_model():
global _model
if _model is None:
print(f"[whisper] loading model='{MODEL_NAME}' device='{WHISPER_DEVICE}' idx={WHISPER_DEVICE_INDEX} compute='{COMPUTE}' threads={WHISPER_CPU_THREADS}", flush=True)
_model = WhisperModel(
MODEL_NAME,
device=WHISPER_DEVICE,
device_index=WHISPER_DEVICE_INDEX,
compute_type=COMPUTE,
cpu_threads=WHISPER_CPU_THREADS,
)
return _model
# --- Helper: Reset model with new device and device_index ---
def reset_model(device: str, device_index: int | None = None):
"""Reset the global _model to a new WhisperModel with the given device and device_index."""
global _model
idx = device_index if device_index is not None else WHISPER_DEVICE_INDEX
print(f"[whisper] resetting model='{MODEL_NAME}' device='{device}' idx={idx} compute='{COMPUTE}' threads={WHISPER_CPU_THREADS}", flush=True)
_model = WhisperModel(
MODEL_NAME,
device=device,
device_index=idx,
compute_type=COMPUTE,
cpu_threads=WHISPER_CPU_THREADS,
)
# --- Helper: Run transcribe with fallback to CPU on GPU/oom errors ---
def run_transcribe_with_fallback(wav_path: Path, lang):
"""
Try to transcribe with current model; on GPU/CUDA/HIP/ROCm/OOM errors, reset to CPU and retry once.
Returns (segments, info) or raises exception.
"""
model = get_model()
try:
return model.transcribe(str(wav_path), vad_filter=True, language=lang)
except Exception as e:
msg = str(e)
gpu_errs = [
"CUDA", "cublas", "out of memory", "HIP", "ROCm", "device-side assert", "CUDNN", "cudaError", "cuda runtime", "cudaMalloc"
]
if any(err.lower() in msg.lower() for err in gpu_errs):
print(f"[whisper] GPU error detected: '{msg}'. Retrying on CPU...", flush=True)
reset_model("cpu", 0)
try:
model = get_model()
return model.transcribe(str(wav_path), vad_filter=True, language=lang)
except Exception as e2:
print(f"[whisper] CPU fallback also failed: {e2}", flush=True)
raise
raise
def log(feed):
try:
with open(TRN / "_feed.log", "a", encoding="utf-8") as f:
f.write(orjson.dumps(feed).decode()+"\n")
except Exception:
pass
def sanitize(name):
return re.sub(r'[\\/:"*?<>|]+', ' ', name).strip()
# ---------- RSS transcript resolver ----------
def _normalize_title(t: str) -> str:
t = (t or "").lower()
t = re.sub(r"\s+", " ", t)
# remove punctuation-ish
t = re.sub(r"[^a-z0-9 _-]+", "", t)
return t.strip()
def _stem_without_date(stem: str) -> str:
# drop leading YYYYMMDD - from filenames created by yt-dlp template
m = re.match(r"^\d{8}\s*-\s*(.*)$", stem)
return m.group(1) if m else stem
def _extract_date_from_stem(stem: str) -> str | None:
m = re.search(r"\b(\d{8})\b", stem)
return m.group(1) if m else None
def _best_title_match(title: str, candidates: list[str]) -> tuple[str, float]:
"""Return (best_title, score 0..1) using difflib SequenceMatcher."""
if not candidates:
return "", 0.0
norm_title = _normalize_title(title)
best = ("", 0.0)
for c in candidates:
score = difflib.SequenceMatcher(None, norm_title, _normalize_title(c)).ratio()
if score > best[1]:
best = (c, score)
return best
def _load_rss_index() -> list[dict]:
try:
if RSS_INDEX_PATH.exists():
data = json.loads(RSS_INDEX_PATH.read_text(encoding="utf-8"))
# supports {"episodes":[...]} or a flat list
if isinstance(data, dict) and "episodes" in data:
return data["episodes"] or []
if isinstance(data, list):
return data
except Exception as e:
print(f"[resolver] failed to load RSS index: {e}", flush=True)
return []
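# Illustrative shape of the RSS index consumed above (field names inferred from the lookups in
# this module; the exact schema produced by rss_ingest.py may differ):
# {
#   "episodes": [
#     {"title": "Episode 42", "date": "20230221", "duration_sec": 3725,
#      "transcripts": [{"url": "https://example.com/ep42.vtt", "type": "vtt"}],
#      "guid": "tag:example.com,2023:ep42", "feed_url": "https://example.com/feed.xml",
#      "image": "https://example.com/ep42.jpg", "language": "en"}
#   ]
# }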
def match_media_to_rss(media_path: Path) -> dict | None:
"""Try to match a local media file to an RSS episode entry."""
episodes = _load_rss_index()
if not episodes:
return None
stem = media_path.stem
title_no_date = _stem_without_date(stem)
file_date = _extract_date_from_stem(stem)
# duration tolerance
media_dur = media_duration_seconds(media_path)
# Candidates: filter by date if present, else all
if file_date:
pool = [e for e in episodes if (str(e.get("date", "")) == file_date or str(e.get("pubdate", "")) == file_date)]
if not pool:
pool = episodes
else:
pool = episodes
# Pick best by (title similarity, duration proximity)
best_ep, best_score = None, -1.0
for ep in pool:
ep_title = ep.get("title") or ep.get("itunes_title") or ""
sim = _best_title_match(title_no_date, [ep_title])[1]
dur = float(ep.get("duration_sec") or ep.get("duration") or 0.0)
dur_ok = True
if media_dur and dur:
dur_ok = abs(media_dur - dur) <= RSS_DURATION_TOLERANCE
score = sim + (0.1 if dur_ok else 0.0)
if score > best_score:
best_score, best_ep = score, ep
if best_ep and best_score >= 0.5:
print(f"[resolver] matched '{stem}' -> '{best_ep.get('title','')}' score={best_score:.2f}", flush=True)
return best_ep
return None
def _choose_transcript_url(ep: dict) -> tuple[str, str] | tuple[None, None]:
"""Return (url, kind) preferring txt, vtt, then srt. 'kind' in {'txt','vtt','srt'}."""
# unified structure from rss_ingest.py: ep["transcripts"] = [{"url":..., "type": ...}, ...]
items = ep.get("transcripts") or []
# some ingesters store separate keys
if not items:
for key, kind in [("transcript_txt","txt"), ("transcript_vtt","vtt"), ("transcript_srt","srt")]:
if ep.get(key):
items.append({"url": ep[key], "type": kind})
# preference order
for kind in ["txt", "vtt", "srt"]:
for it in items:
t = (it.get("type") or "").lower()
u = it.get("url") or ""
if u and (kind in t or (kind == "txt" and t in ["text","plain","text/plain"]) or (kind in u.lower())):
return u, kind
return (None, None)
def fetch_rss_transcript(ep: dict, dest_dir: Path) -> Path | None:
"""Download transcript to dest_dir and return local Path; convert VTT->SRT if needed."""
url, kind = _choose_transcript_url(ep)
if not url:
return None
dest_dir.mkdir(parents=True, exist_ok=True)
# filename from episode title
safe = sanitize(ep.get("title") or ep.get("guid") or "episode")
path = dest_dir / f"{safe}.{kind}"
try:
r = requests.get(url, timeout=30)
r.raise_for_status()
mode = "wb" if kind in ("vtt","srt") else "w"
if mode == "wb":
path.write_bytes(r.content)
else:
path.write_text(r.text, encoding="utf-8")
print(f"[resolver] downloaded transcript ({kind}) from {url}", flush=True)
return path
except Exception as e:
print(f"[resolver] failed to fetch transcript: {e}", flush=True)
return None
def use_rss_transcript(media_path: Path, ep: dict) -> Path | None:
"""Create standard transcript artifacts from an RSS transcript (txt/vtt/srt)."""
# Prefer direct download; else if rss_ingest already saved a local file path, try that.
sidecar = None
local_hint = ep.get("transcript_local")
if local_hint:
p = Path(local_hint)
if p.exists():
sidecar = p
if sidecar is None:
sidecar = fetch_rss_transcript(ep, TMP)
if not sidecar or not sidecar.exists():
return None
# Convert to plain text
plain = transcript_text_from_file(sidecar)
lang = (ep.get("language") or ep.get("lang") or DEFAULT_TRANSCRIPT_LANG).split("-")[0]
base = write_plain_transcript(media_path, plain, language=lang)
# Place an SRT next to video for Plex
ensure_sidecar_next_to_media(sidecar, media_path, lang=lang)
# Write provenance sidecar
(base.with_suffix(".prov.json")).write_bytes(orjson.dumps({
"source": "rss",
"feed": ep.get("feed_url"),
"guid": ep.get("guid"),
"episode_title": ep.get("title"),
"transcript_kind": sidecar.suffix.lower().lstrip("."),
"transcript_url": _choose_transcript_url(ep)[0] or "",
}))
# Write Kodi/Plex-compatible NFO
try:
# Gather metadata for NFO from RSS entry
meta = {
"title": ep.get("title"),
"episode_title": ep.get("title"),
"show": ep.get("podcast_title") or ep.get("feed_title") or ep.get("show"),
"description": ep.get("description") or ep.get("content"),
"pubdate": ep.get("pubdate"),
"pubdate_iso": ep.get("date_iso"),
"duration_sec": ep.get("duration_sec") or ep.get("duration"),
"image": ep.get("image") or ep.get("image_url"),
"guid": ep.get("guid"),
}
txt_path = base.with_suffix(".txt")
transcript_text = txt_path.read_text(encoding="utf-8") if txt_path.exists() else None
write_episode_nfo(media_path, meta, transcript_text)
# Save local artwork for Plex/Kodi
try:
save_episode_artwork(meta.get("image"), media_path, meta.get("show"))
except Exception:
pass
except Exception as e:
print(f"[post] NFO write failed: {e}", flush=True)
return base
def find_sidecar_transcript(media_path: Path) -> Path | None:
"""Return a .txt/.srt/.vtt transcript file sitting next to media, if any.
Tries common variants including language-suffixed SRT/VTT.
"""
candidates: list[Path] = []
# exact same stem in same folder
for ext in [".txt", ".srt", ".vtt"]:
p = media_path.parent / (media_path.stem + ext)
if p.exists():
candidates.append(p)
# language-suffixed near the media file (e.g., .en.srt)
for ext in [".srt", ".vtt"]:
p = media_path.with_suffix(f".en{ext}")
if p.exists() and p not in candidates:
candidates.append(p)
return candidates[0] if candidates else None
# ---------- Transcript repository reuse helpers ----------
def find_repo_transcript_for_media(media_path: Path) -> Path | None:
"""Search the transcript repository (/transcripts) for an existing transcript
that likely belongs to this media file (match by YYYYMMDD in filename and/or
fuzzy title similarity). Returns a path to a matching .json if found."""
try:
stem = media_path.stem
title_no_date = _stem_without_date(stem)
file_date = _extract_date_from_stem(stem)
best_json, best_score = None, 0.0
for j in TRN.glob("*.json"):
try:
data = json.loads(j.read_text(encoding="utf-8"))
except Exception:
continue
other_file = Path(data.get("file", ""))
other_stem = other_file.stem if other_file else j.stem
other_date = _extract_date_from_stem(other_stem)
# If both have dates and they differ a lot, skip
if file_date and other_date and file_date != other_date:
continue
# Compare titles (without dates)
sim = difflib.SequenceMatcher(
None,
_normalize_title(title_no_date),
_normalize_title(_stem_without_date(other_stem)),
).ratio()
# Nudge score when dates match
if file_date and other_date and file_date == other_date:
sim += 0.1
if sim > best_score:
best_score, best_json = sim, j
# Require a reasonable similarity
return best_json if best_json and best_score >= 0.60 else None
except Exception:
return None
def reuse_repo_transcript(media_path: Path, repo_json: Path) -> Path | None:
"""Copy/retarget an existing transcript JSON/TXT (and make SRT/VTT if possible)
from the repository so that it belongs to the provided media_path. Returns
the new base path in /transcripts or None."""
try:
# load the source transcript
data = json.loads(repo_json.read_text(encoding="utf-8"))
src_base = TRN / Path(repo_json).stem
src_txt = src_base.with_suffix(".txt")
src_srt = src_base.with_suffix(".srt")
src_vtt = src_base.with_suffix(".vtt")
# write the retargeted artifacts
new_title = media_path.stem
new_base = TRN / new_title
new_base.parent.mkdir(parents=True, exist_ok=True)
# update file path
data["file"] = str(media_path)
(new_base.with_suffix(".json")).write_bytes(orjson.dumps(data))
# copy or synthesize TXT
if src_txt.exists():
shutil.copy2(src_txt, new_base.with_suffix(".txt"))
else:
# fallback: concatenate segments
txt = " ".join(s.get("text", "") for s in data.get("segments", []))
(new_base.with_suffix(".txt")).write_text(txt, encoding="utf-8")
# copy SRT/VTT if present; otherwise synthesize SRT from segments
if src_srt.exists():
shutil.copy2(src_srt, new_base.with_suffix(".srt"))
else:
# synthesize SRT
def fmt_ts(t):
h=int(t//3600); m=int((t%3600)//60); s=t-(h*3600+m*60)
return f"{h:02}:{m:02}:{s:06.3f}".replace('.',',')
with open(new_base.with_suffix(".srt"), "w", encoding="utf-8") as srt:
for i, s in enumerate(data.get("segments", []), 1):
srt.write(f"{i}\n{fmt_ts(s.get('start',0.0))} --> {fmt_ts(s.get('end',0.0))}\n{s.get('text','').strip()}\n\n")
if src_vtt.exists():
shutil.copy2(src_vtt, new_base.with_suffix(".vtt"))
else:
# synthesize VTT from segments
def fmt_ts_vtt(t):
h=int(t//3600); m=int((t%3600)//60); s=t-(h*3600+m*60)
return f"{h:02}:{m:02}:{s:06.3f}"
with open(new_base.with_suffix(".vtt"), "w", encoding="utf-8") as vtt:
vtt.write("WEBVTT\n\n")
for s in data.get("segments", []):
vtt.write(f"{fmt_ts_vtt(s.get('start',0.0))} --> {fmt_ts_vtt(s.get('end',0.0))} \n{s.get('text','').strip()}\n\n")
# ensure sidecar next to media
try:
lang = (data.get("language") or DEFAULT_TRANSCRIPT_LANG).split("-")[0]
ensure_sidecar_next_to_media(new_base.with_suffix(".srt"), media_path, lang=lang)
except Exception:
pass
# Write Kodi/Plex-compatible NFO
try:
meta = {
"title": data.get("title") or media_path.stem,
"episode_title": data.get("title") or media_path.stem,
"show": data.get("show") or media_path.parent.name,
"description": data.get("description") or "",
"pubdate": data.get("pubdate") or data.get("date"),
"duration_sec": media_duration_seconds(media_path),
"image": data.get("image"),
"guid": data.get("guid") or data.get("id"),
}
txtp = new_base.with_suffix(".txt")
ttxt = txtp.read_text(encoding="utf-8") if txtp.exists() else None
write_episode_nfo(media_path, meta, ttxt)
# Save local artwork for Plex/Kodi
try:
save_episode_artwork(meta.get("image"), media_path, meta.get("show"))
except Exception:
pass
except Exception as e:
print(f"[post] NFO write failed: {e}", flush=True)
return new_base
except Exception as e:
print(f"[resolver] failed to reuse repo transcript: {e}", flush=True)
return None
def transcript_text_from_file(path: Path) -> str:
"""Extract plain text from .txt/.srt/.vtt by stripping timestamps and counters."""
try:
raw = path.read_text(encoding="utf-8", errors="ignore")
except Exception:
raw = path.read_text(errors="ignore")
if path.suffix.lower() == ".txt":
return raw.strip()
# For SRT/VTT, drop timestamp lines, cue numbers and headers
lines: list[str] = []
for line in raw.splitlines():
ls = line.strip()
if not ls:
continue
if "-->" in ls: # timestamp line
continue
if ls.upper().startswith("WEBVTT"):
continue
if re.match(r"^\d+$", ls): # cue index
continue
lines.append(ls)
return " ".join(lines)
def ensure_sidecar_next_to_media(sidecar: Path, media_path: Path, lang: str = "en") -> None:
"""Ensure an `.lang.srt` sits next to the media for Plex. Convert VTT→SRT if needed. If the sidecar is .txt, do nothing."""
try:
if sidecar.suffix.lower() == ".txt":
return
if sidecar.suffix.lower() == ".srt":
dst = media_path.with_suffix(f".{lang}.srt")
shutil.copy2(sidecar, dst)
elif sidecar.suffix.lower() == ".vtt":
tmp_srt = sidecar.with_suffix(".srt")
subprocess.run(["ffmpeg", "-nostdin", "-y", "-threads", str(FFMPEG_THREADS), "-i", str(sidecar), str(tmp_srt)], check=True)
dst = media_path.with_suffix(f".{lang}.srt")
shutil.move(str(tmp_srt), dst)
except Exception as e:
print(f"[post] sidecar copy/convert failed: {e}", flush=True)
# --- small helpers for progress/ETA formatting ---
def _fmt_eta(sec: float) -> str:
try:
sec = max(0, int(sec))
h, rem = divmod(sec, 3600)
m, s = divmod(rem, 60)
if h:
return f"{h}h {m}m {s}s"
if m:
return f"{m}m {s}s"
return f"{s}s"
except Exception:
return ""
def save_episode_artwork(image_url: str | None, media_path: Path, show_title: str | None = None):
"""Download episode artwork from image_url and save next to the media as '<basename>.jpg'.
Also drop a folder-level 'poster.jpg' for the show directory if not present.
Best-effort; failures are logged but non-fatal.
"""
if not image_url:
return
try:
resp = requests.get(image_url, timeout=30, stream=True)
resp.raise_for_status()
# Determine content-type and write a temporary file
ctype = (resp.headers.get("Content-Type") or "").lower()
tmp_file = media_path.with_suffix(".art.tmp")
with open(tmp_file, "wb") as out:
for chunk in resp.iter_content(chunk_size=8192):
if chunk:
out.write(chunk)
# Always provide a .jpg next to the media for Plex
episode_jpg = media_path.with_suffix(".jpg")
if "image/jpeg" in ctype:
# Already JPEG
shutil.move(str(tmp_file), str(episode_jpg))
else:
# Try converting to JPEG with ffmpeg; if it fails, keep bytes as-is
try:
subprocess.run(
["ffmpeg", "-nostdin", "-y", "-i", str(tmp_file), str(episode_jpg)],
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=True
)
try:
tmp_file.unlink()
except Exception:
pass
except Exception:
shutil.move(str(tmp_file), str(episode_jpg))
# Also drop a folder poster once per show (helps Plex folder views)
try:
show_poster = media_path.parent / "poster.jpg"
if not show_poster.exists():
shutil.copy2(episode_jpg, show_poster)
except Exception:
pass
except Exception as e:
print(f"[post] artwork download failed: {e}", flush=True)
def find_companion_files(src: Path) -> dict:
"""Return likely yt-dlp companion files for a downloaded media file."""
out = {}
# info.json can be either "<name>.<ext>.info.json" or "<name>.info.json"
cands_info = [
src.parent / f"{src.name}.info.json",
src.parent / f"{src.stem}.info.json",
]
out["info"] = next((p for p in cands_info if p.exists()), None)
# thumbnails may be "<name>.<ext>.jpg" or "<name>.jpg" (we convert to jpg)
cand_thumbs = [
src.parent / f"{src.name}.jpg",
src.parent / f"{src.stem}.jpg",
src.parent / f"{src.stem}.jpeg",
src.parent / f"{src.stem}.png",
src.parent / f"{src.stem}.webp",
]
out["thumb"] = next((p for p in cand_thumbs if p.exists()), None)
# subtitles (keep multiple)
subs = []
for s in src.parent.glob(f"{src.stem}*.srt"):
subs.append(s)
for s in src.parent.glob(f"{src.stem}*.vtt"):
subs.append(s)
out["subs"] = subs
return out
def load_info_json(path: Path) -> dict | None:
try:
return json.loads(path.read_text(encoding="utf-8"))
except Exception:
return None
def _iso_from_yyyymmdd(s: str | None) -> str | None:
if not s or not re.match(r"^\d{8}$", s):
return None
return f"{s[0:4]}-{s[4:6]}-{s[6:8]}"
def build_meta_from_sources(media_path: Path, uploader: str, fallback_meta: dict, ep: dict | None = None) -> dict:
"""
Merge metadata from (priority): RSS episode `ep` -> yt-dlp info.json (if present) -> fallback.
Returns a dict compatible with write_episode_nfo().
"""
# Start with fallback
meta = dict(fallback_meta)
# Augment from info.json if present
info = None
for cand in [
media_path.parent / f"{media_path.name}.info.json",
media_path.parent / f"{media_path.stem}.info.json",
]:
if cand.exists():
info = load_info_json(cand)
break
if info:
meta.setdefault("title", info.get("title"))
meta.setdefault("episode_title", info.get("title"))
meta.setdefault("description", info.get("description") or info.get("fulltitle"))
# upload_date is YYYYMMDD
iso = _iso_from_yyyymmdd(info.get("upload_date"))
if iso:
meta["pubdate_iso"] = iso
# Prefer video duration if present
if not meta.get("duration_sec") and info.get("duration"):
meta["duration_sec"] = info.get("duration")
# thumbnail URL
if not meta.get("image"):
meta["image"] = info.get("thumbnail")
# show/uploader
if not meta.get("show"):
meta["show"] = info.get("uploader") or uploader
# Finally, layer RSS data on top if available (most authoritative for podcasts)
if ep:
meta.update({
"title": ep.get("title") or meta.get("title"),
"episode_title": ep.get("title") or meta.get("episode_title"),
"show": ep.get("podcast_title") or ep.get("feed_title") or ep.get("show") or meta.get("show") or uploader,
"description": ep.get("description") or ep.get("content") or meta.get("description", ""),
"pubdate": ep.get("pubdate") or meta.get("pubdate", ""),
"pubdate_iso": ep.get("date_iso") or meta.get("pubdate_iso", meta.get("pubdate")),
"duration_sec": ep.get("duration_sec") or ep.get("duration") or meta.get("duration_sec"),
"image": ep.get("image") or ep.get("image_url") or meta.get("image", ""),
"guid": ep.get("guid") or meta.get("guid", ""),
})
return meta
# ---------- Kodi/Plex NFO writer ----------
from datetime import datetime
def _first_nonempty(*vals):
for v in vals:
if v is None:
continue
if isinstance(v, str) and v.strip():
return v.strip()
if v:
return v
return None
def _coerce_aired(pubdate: str | None) -> str:
"""Convert RSS-style pubdate to YYYY-MM-DD if possible."""
if not pubdate:
return ""
# already ISO-like
m = re.match(r"^(\d{4})[-/](\d{2})[-/](\d{2})", pubdate)
if m:
return f"{m.group(1)}-{m.group(2)}-{m.group(3)}"
# RFC 2822 example: Tue, 21 Feb 2023 06:00:00 +0000
try:
dt = datetime.strptime(pubdate[:31], "%a, %d %b %Y %H:%M:%S %z")
return dt.strftime("%Y-%m-%d")
except Exception:
# try without tz
try:
dt = datetime.strptime(pubdate[:25], "%a, %d %b %Y %H:%M:%S")
return dt.strftime("%Y-%m-%d")
except Exception:
return ""
def write_episode_nfo(media_path: Path, meta: dict, transcript_text: str | None = None) -> Path:
"""Write a minimal Kodi/Plex-compatible NFO next to the media file.
`meta` may include: title/episode_title, show, description, pubdate/pubdate_iso, duration_sec, image, guid.
"""
try:
title = _first_nonempty(meta.get("episode_title"), meta.get("title"), media_path.stem) or media_path.stem
show = _first_nonempty(meta.get("show"), meta.get("podcast_title"), meta.get("feed_title"), media_path.parent.name) or media_path.parent.name
plot = _first_nonempty(meta.get("description"), meta.get("content"), meta.get("summary"), "") or ""
# Optionally append transcript preview to plot
if transcript_text:
preview = transcript_text.strip()
if preview:
preview = (preview[:1800] + "…") if len(preview) > 1800 else preview
plot = (plot + "\n\n" if plot else "") + preview
aired = _coerce_aired(_first_nonempty(meta.get("pubdate_iso"), meta.get("pubdate")))
guid = _first_nonempty(meta.get("guid"), meta.get("id"), "") or ""
thumb = _first_nonempty(meta.get("image"), meta.get("image_url"), meta.get("thumbnail"), "") or ""
dur_s = meta.get("duration_sec") or meta.get("duration") or 0
try:
dur_min = int(round(float(dur_s) / 60.0)) if dur_s else 0
except Exception:
dur_min = 0
# Build XML
xml = ["<episodedetails>"]
xml.append(f" <title>{xml_escape(title)}</title>")
xml.append(f" <showtitle>{xml_escape(show)}</showtitle>")
if plot:
xml.append(f" <plot>{xml_escape(plot)}</plot>")
if aired:
xml.append(f" <aired>{xml_escape(aired)}</aired>")
if guid:
xml.append(f" <uniqueid type=\"guid\" default=\"true\">{xml_escape(guid)}</uniqueid>")
if dur_min:
xml.append(f" <runtime>{dur_min}</runtime>")
if thumb:
xml.append(f" <thumb>{xml_escape(thumb)}</thumb>")
xml.append("</episodedetails>\n")
nfo_path = media_path.with_suffix(".nfo")
nfo_path.write_text("\n".join(xml), encoding="utf-8")
return nfo_path
except Exception:
return media_path.with_suffix(".nfo")
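# Illustrative NFO produced by write_episode_nfo() (values are placeholders):
# <episodedetails>
#   <title>Episode 42</title>
#   <showtitle>Some Podcast</showtitle>
#   <plot>Episode description, optionally followed by a transcript preview…</plot>
#   <aired>2023-02-21</aired>
#   <uniqueid type="guid" default="true">tag:example.com,2023:ep42</uniqueid>
#   <runtime>62</runtime>
#   <thumb>https://example.com/ep42.jpg</thumb>
# </episodedetails>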
def write_plain_transcript(media_path: Path, text: str, language: str = "en") -> Path:
"""Write minimal transcript artifacts (.txt + .json) from plain text (no timestamps)."""
title = media_path.stem
base = TRN / title
base.parent.mkdir(parents=True, exist_ok=True)
(base.with_suffix(".txt")).write_text(text, encoding="utf-8")
(base.with_suffix(".json")).write_bytes(orjson.dumps({
"file": str(media_path),
"language": language,
"segments": [{"start": 0.0, "end": 0.0, "text": text}]
}))
return base
def yt_dlp(url, outdir):
# 1) Normalize YouTube Music URLs to standard YouTube
yurl = url
if 'music.youtube.com' in yurl:
yurl = yurl.replace('music.youtube.com', 'www.youtube.com')
outtmpl = str(outdir / "%(uploader)s/%(upload_date)s - %(title)s.%(ext)s")
base_cmd = [
"yt-dlp", "-o", outtmpl,
"-f", "bv*+ba/best",
"--write-info-json",
"--write-thumbnail",
"--convert-thumbnails", "jpg",
"--write-subs", "--write-auto-subs",
"--sub-langs", os.getenv("YTDLP_SUBS_LANGS", "en.*,en"),
"--convert-subs", "srt",
"--no-playlist", "--no-warnings", "--restrict-filenames",
]
# 2) Optional cookies (set YTDLP_COOKIES=/path/to/cookies.txt in .env and mount it)
cookies_path = os.getenv("YTDLP_COOKIES", "").strip()
if cookies_path:
base_cmd += ["--cookies", cookies_path]
# Primary attempt
try:
subprocess.check_call(base_cmd + [yurl])
except subprocess.CalledProcessError:
# 3) Retry with Android client + mobile UA
retry_cmd = base_cmd + [
"--extractor-args", "youtube:player_client=android",
"--user-agent", "Mozilla/5.0 (Linux; Android 10; SM-G973F) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0 Mobile Safari/537.36",
yurl,
]
subprocess.check_call(retry_cmd)
media = (
list(outdir.rglob("*.[mM][pP]4")) +
list(outdir.rglob("*.mkv")) +
list(outdir.rglob("*.webm")) +
list(outdir.rglob("*.m4a")) +
list(outdir.rglob("*.mp3"))
)
return sorted(media, key=lambda p: p.stat().st_mtime)[-1:]
def extract_audio(src: Path, outdir: Path) -> Path:
"""Extract mono 16kHz WAV for robust transcription (handles odd containers/codecs)."""
outdir.mkdir(parents=True, exist_ok=True)
wav_path = outdir / (src.stem + ".wav")
# Force audio-only, mono, 16kHz WAV
cmd = [
"ffmpeg", "-nostdin", "-y",
"-threads", str(FFMPEG_THREADS),
"-i", str(src),
"-vn", "-ac", "1", "-ar", "16000",
"-f", "wav", str(wav_path),
]
try:
subprocess.check_output(cmd, stderr=subprocess.STDOUT)
except subprocess.CalledProcessError as e:
raise RuntimeError(f"ffmpeg extract failed: {e.output.decode(errors='ignore')}")
return wav_path
# --- WAV trimming helper ---
def trim_wav(src_wav: Path, start_sec: float, outdir: Path) -> Path:
"""Return a trimmed 16k mono WAV starting at start_sec from src_wav."""
outdir.mkdir(parents=True, exist_ok=True)
if not start_sec or start_sec <= 0.0:
return src_wav
dst = outdir / (src_wav.stem + f".from_{int(start_sec)}s.wav")
try:
subprocess.check_output([
"ffmpeg", "-nostdin", "-y",
"-ss", str(max(0.0, float(start_sec))),
"-i", str(src_wav),
"-vn", "-ac", "1", "-ar", "16000",
"-f", "wav", str(dst),
], stderr=subprocess.STDOUT)
return dst
except subprocess.CalledProcessError as e:
# If trimming fails, fall back to full file
print(f"[whisper] trim failed, using full WAV: {e.output.decode(errors='ignore')}", flush=True)
return src_wav
def media_duration_seconds(path: Path) -> float:
"""Return duration in seconds using ffprobe; fallback to 0.0 on error."""
try:
out = subprocess.check_output([
"ffprobe", "-v", "error", "-show_entries", "format=duration",
"-of", "default=nokey=1:noprint_wrappers=1", str(path)
], stderr=subprocess.STDOUT, text=True).strip()
return float(out) if out else 0.0
except Exception:
return 0.0
# --- Partial transcript helpers ---
def _partial_paths(title: str) -> tuple[Path, Path]:
base = TRN / title
return base.with_suffix(".partial.json"), base.with_suffix(".partial.txt")
def _save_partial(title: str, language: str, segs: list[dict]):
pjson, ptxt = _partial_paths(title)
try:
# Save JSON
pjson.write_bytes(orjson.dumps({"file": str((TRN / title).with_suffix('.wav')), "language": language, "segments": segs}))
except Exception as e:
print(f"[whisper] partial json save failed: {e}", flush=True)
try:
# Save TXT snapshot
ptxt.write_text(" ".join(s.get("text","") for s in segs), encoding="utf-8")
except Exception as e:
print(f"[whisper] partial txt save failed: {e}", flush=True)
def transcribe(media_path: Path):
print(f"[whisper] start transcribe: {media_path}", flush=True)
# If paused, abort before any heavy work (no ffmpeg, no model load)
if transcribe_paused():
print(f"[pause] transcribe: pause active before heavy work; aborting {media_path}", flush=True)
raise PauseInterrupt("pause requested before start")
# 1) Robustly extract audio to 16k mono WAV (fixes pyAV/webm edge cases)
wav = extract_audio(media_path, TMP)
# Check again after extraction to avoid loading the model if a pause was requested meanwhile
if transcribe_paused():
try:
if wav.exists():
wav.unlink()
except Exception:
pass
print(f"[pause] transcribe: pause activated; stopping before model load for {media_path}", flush=True)
raise PauseInterrupt("pause requested after extract")
title = media_path.stem
base = TRN / title
# Resume support: if a partial checkpoint exists, load it and trim input
resume_segments = []
resume_offset = 0.0
language_hint = None
if WHISPER_RESUME:
pjson, ptxt = _partial_paths(title)
if pjson.exists():
try:
pdata = json.loads(pjson.read_text(encoding="utf-8"))
resume_segments = pdata.get("segments", []) or []
if resume_segments:
resume_offset = float(resume_segments[-1].get("end", 0.0))
language_hint = pdata.get("language")
print(f"[whisper] resuming from ~{resume_offset:.2f}s with {len(resume_segments)} segments", flush=True)
except Exception as e:
print(f"[whisper] failed to load partial: {e}", flush=True)
# If resuming, trim WAV from last end time
wav_for_run = trim_wav(wav, resume_offset, TMP)
# 2) Language selection
lang = None if WHISPER_LANGUAGE.lower() == "auto" else WHISPER_LANGUAGE
if language_hint and WHISPER_LANGUAGE.lower() == "auto":
# carry hint forward if available
lang = language_hint
# 3) Transcribe
segments, info = run_transcribe_with_fallback(wav_for_run, lang)
# Determine duration for progress; use full WAV duration for consistent % regardless of resume
dur = media_duration_seconds(wav) or 0.0
# Start wall clock timer for speed/ETA
start_wall = time.time()
if WHISPER_RESUME and resume_offset and dur and resume_offset >= dur:
print(f"[whisper] resume offset {resume_offset:.2f}s >= duration {dur:.2f}s; resetting resume.", flush=True)
resume_offset = 0.0
last_pct = -1
segs = list(resume_segments) # start with what we already have
text_parts = [s.get("text","") for s in resume_segments]
# Walk new segments; shift their timestamps by resume_offset if trimmed
seg_count_since_save = 0
seg_index = len(resume_segments)
for s in segments:
seg_index += 1
start = (s.start or 0.0) + resume_offset
end = (s.end or 0.0) + resume_offset
seg = {"start": start, "end": end, "text": s.text}
segs.append(seg)
text_parts.append(s.text)
# --- Cooperative pause: save checkpoint and abort as soon as pause is requested ---
if transcribe_paused():
try:
pct = int(min(100, max(0, (end / dur) * 100))) if dur > 0 else 0
except Exception:
pct = 0
_save_partial(title, info.language or (WHISPER_LANGUAGE if WHISPER_LANGUAGE.lower() != "auto" else "en"), segs)
log({
"status": "paused",
"path": str(media_path),
"title": title,
"progress": pct
})
print(f"[pause] transcribe: pause requested mid-run; aborting at ~{end:.2f}s for {media_path}", flush=True)
raise PauseInterrupt("pause requested")
if WHISPER_LOG_SEGMENTS:
print(f"[whisper] {start:8.2f}{end:8.2f} {s.text.strip()}", flush=True)
# progress logging every +5%
if dur > 0 and end is not None:
pct = int(min(100, max(0, (end / dur) * 100)))
if pct >= last_pct + 5:
log({
"status": "transcribing",
"path": str(media_path),
"title": title,
"progress": pct
})
last_pct = pct
# compute realtime speed and ETA for console logs
try:
elapsed = max(0.001, time.time() - start_wall)
processed = max(0.0, float(end) - resume_offset)  # audio seconds transcribed in this run
speed = (processed / elapsed) if elapsed > 0 else 0.0  # audio seconds per wall-clock second
# expressed as a real-time factor (1.0 == real-time)
rtf = speed
eta = ((dur - float(end)) / speed) if (speed > 0 and dur > 0) else 0
print(f"[whisper] progress {pct:3d}% seg={seg_index:5d} rtf={rtf:0.2f}x eta={_fmt_eta(eta)}", flush=True)
# also mirror to feed log with speed/eta
try:
log({
"status": "transcribing",
"path": str(media_path),
"title": title,
"progress": pct,
"speed_rtf": round(rtf, 2),
"eta_sec": int(max(0, eta))
})
except Exception:
pass
except Exception:
pass
# periodic partial save
seg_count_since_save += 1
if WHISPER_RESUME and seg_count_since_save >= PARTIAL_SAVE_EVERY_SEGS:
_save_partial(title, info.language or (WHISPER_LANGUAGE if WHISPER_LANGUAGE.lower() != "auto" else "en"), segs)
seg_count_since_save = 0
# ensure we mark 100% on completion
if last_pct < 100:
log({"status": "transcribing", "path": str(media_path), "title": title, "progress": 100})
txt = " ".join(text_parts).strip()
# Write final transcript artifacts
(base.with_suffix(".json")).write_bytes(orjson.dumps({
"file": str(media_path),
"language": info.language,
"segments": segs
}))
(base.with_suffix(".txt")).write_text(txt, encoding="utf-8")
def fmt_ts(t):
h=int(t//3600); m=int((t%3600)//60); s=t-(h*3600+m*60)
return f"{h:02}:{m:02}:{s:06.3f}".replace('.',',')
with open(base.with_suffix(".srt"), "w", encoding="utf-8") as srt:
for i,s in enumerate(segs,1):
srt.write(f"{i}\n{fmt_ts(s['start'])} --> {fmt_ts(s['end'])}\n{s['text'].strip()}\n\n")
with open(base.with_suffix(".vtt"), "w", encoding="utf-8") as vtt:
vtt.write("WEBVTT\n\n")
for s in segs:
vtt.write(f"{fmt_ts(s['start']).replace(',', '.')} --> {fmt_ts(s['end']).replace(',', '.')} \n{s['text'].strip()}\n\n")
# 4) Copy SRT next to media for Plex (language-suffixed)
try:
lang_code = (info.language or (WHISPER_LANGUAGE if WHISPER_LANGUAGE.lower() != 'auto' else 'en')).lower()
srt_src = base.with_suffix(".srt")
srt_dst = media_path.with_suffix(f".{lang_code}.srt")
shutil.copy2(srt_src, srt_dst)
except Exception as e:
print(f"[post] could not copy srt -> {srt_dst}: {e}", flush=True)
# Write Kodi/Plex-compatible NFO using enhanced metadata (same as before)
try:
fallback = {
"title": title,
"episode_title": title,
"show": media_path.parent.name,
"description": "",
"pubdate": _extract_date_from_stem(title),
"duration_sec": media_duration_seconds(media_path),
"image": "",
"guid": "",
}
meta = build_meta_from_sources(media_path, media_path.parent.name, fallback, ep=None)
ttxt = (TRN / title).with_suffix(".txt").read_text(encoding="utf-8")
write_episode_nfo(media_path, meta, ttxt)
try:
save_episode_artwork(meta.get("image"), media_path, meta.get("show"))
except Exception:
pass
except Exception as e:
print(f"[post] NFO write failed: {e}", flush=True)
# Cleanup temp WAVs
try:
if wav_for_run != wav and wav_for_run.exists():
wav_for_run.unlink()
if wav.exists():
wav.unlink()
except Exception:
pass
# Remove partial checkpoints on success
if WHISPER_RESUME:
try:
pjson, ptxt = _partial_paths(title)
if pjson.exists(): pjson.unlink()
if ptxt.exists(): ptxt.unlink()
except Exception:
pass
# Final average speed for this transcription run (excludes any portion restored from a resume checkpoint)
try:
total_elapsed = max(0.001, time.time() - start_wall)
avg_rtf = (max(0.0, dur - resume_offset) / total_elapsed) if total_elapsed > 0 else 0.0
print(f"[whisper] avg speed ~{avg_rtf:0.2f}x (audio_seconds / wall_seconds)", flush=True)
except Exception:
pass
print(f"[whisper] finished: {media_path} lang={info.language} segments={len(segs)} dur={dur:.2f}s", flush=True)
return base
# --- Meilisearch helpers ---
def _safe_doc_id(s: str) -> str:
"""
Meilisearch document IDs must be [A-Za-z0-9_-]. Convert the title to a safe slug.
If the result is empty, fall back to a short SHA1 hash.
"""
import hashlib
slug = re.sub(r"\s+", "_", (s or "").strip())
slug = re.sub(r"[^A-Za-z0-9_-]", "", slug)
if not slug:
slug = hashlib.sha1((s or "").encode("utf-8", errors="ignore")).hexdigest()[:16]
return slug
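# Example (illustrative): _safe_doc_id("20230221 - Some Episode!") -> "20230221_-_Some_Episode"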
def ensure_meili_index():
"""Create index 'library' with primaryKey 'id' if it does not already exist."""
try:
r = requests.get(f"{MEILI_URL}/indexes/library",
headers={"Authorization": f"Bearer {MEILI_KEY}"}, timeout=10)
if r.status_code == 200:
return
# Attempt to create it
cr = requests.post(
f"{MEILI_URL}/indexes",
headers={"Authorization": f"Bearer {MEILI_KEY}", "Content-Type": "application/json"},
data=orjson.dumps({"uid": "library", "primaryKey": "id"}),
timeout=10,
)
# Ignore errors if another process created it first
try:
cr.raise_for_status()
except Exception:
pass
except Exception:
# Non-fatal; indexing will fail later if the index truly doesn't exist
pass
def index_meili(json_path: Path):
# Make sure the index exists and is configured with a primary key
ensure_meili_index()
doc = json.loads(json_path.read_text(encoding="utf-8"))
file_field = doc.get("file", "")
title = Path(file_field).stem if file_field else json_path.stem
# Build a Meili-safe document ID
doc_id = _safe_doc_id(title)
# Extract a YYYYMMDD date if present
m = re.search(r"\b(\d{8})\b", title)
date = m.group(1) if m else ""
payload = {
"id": doc_id,
"type": "podcast",
"title": title,
"date": date,
"source": str(Path(LIB, Path(file_field or title).name)),
"text": " ".join(s.get("text", "") for s in doc.get("segments", [])),
"segments": doc.get("segments", []),
"meta": {"language": doc.get("language", "")},
}
for attempt in range(5):
try:
r = requests.post(
f"{MEILI_URL}/indexes/library/documents",
headers={
"Authorization": f"Bearer {MEILI_KEY}",
"Content-Type": "application/json",
},
data=orjson.dumps(payload),
timeout=15,
)
r.raise_for_status()
break
except Exception:
if attempt == 4:
raise
time.sleep(2 * (attempt + 1))
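# Illustrative query against the 'library' index populated above (standard Meilisearch search
# endpoint; $MEILI_URL/$MEILI_KEY are the env values defined at the top of this module):
#   curl -X POST "$MEILI_URL/indexes/library/search" \
#     -H "Authorization: Bearer $MEILI_KEY" -H "Content-Type: application/json" \
#     -d '{"q": "open source", "attributesToHighlight": ["text"]}'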
import tldextract, trafilatura, requests as _requests
def slugify(text):
text = re.sub(r'[^A-Za-z0-9\-._ ]+', '', text).strip().replace(' ', '_')
return text[:120] or 'page'
def _norm(s: str | None) -> str:
"""Normalize strings for stable comparisons across Unicode lookalikes and stray whitespace."""
if s is None:
return ""
try:
return unicodedata.normalize("NFKC", s).strip()
except Exception:
return (s or "").strip()
def save_web_snapshot(url: str):
r = _requests.get(url, timeout=30, headers={"User-Agent":"Mozilla/5.0"})
r.raise_for_status()
html = r.text
downloaded = trafilatura.load_html(html, url=url)
text = trafilatura.extract(downloaded, include_comments=False, include_images=False, with_metadata=True) or ""
meta = trafilatura.metadata.extract_metadata(downloaded) or None
title = (meta.title if meta and getattr(meta, 'title', None) else None) or (re.search(r'<title[^>]*>(.*?)</title>', html, re.I|re.S).group(1).strip() if re.search(r'<title[^>]*>(.*?)</title>', html, re.I|re.S) else url)
date = (meta.date if meta and getattr(meta, 'date', None) else "")
parts = tldextract.extract(url)
domain = ".".join([p for p in [parts.domain, parts.suffix] if p])
slug = slugify(title)
outdir = LIB / "web" / domain
outdir.mkdir(parents=True, exist_ok=True)
base = outdir / slug
open(base.with_suffix(".html"), "w", encoding="utf-8", errors="ignore").write(html)
open(base.with_suffix(".txt"), "w", encoding="utf-8", errors="ignore").write(text)
return base, title, domain, date, text
def index_web(base: Path, title: str, domain: str, date: str, text: str, url: str):
payload = {
"id": f"web:{domain}:{base.stem}",
"type": "web",
"title": title,
"date": re.sub(r'[^0-9]', '', date)[:8] if date else "",
"source": f"file://{str(base.with_suffix('.html'))}",
"text": text,
"segments": [],
"meta": {"url": url, "domain": domain}
}
r = requests.post(f"{MEILI_URL}/indexes/library/documents",
headers={"Authorization": f"Bearer {MEILI_KEY}", "Content-Type":"application/json"},
data=orjson.dumps(payload))
r.raise_for_status()
def is_media_url(url: str):
lowered = url.lower()
media_hosts = ["youtube.com","youtu.be","rumble.com","vimeo.com","soundcloud.com","spotify.com","podbean.com","buzzsprout.com"]
return any(h in lowered for h in media_hosts)
def owui_headers():
return {"Authorization": f"Bearer {OWUI_KEY}"} if OWUI_KEY else {}
def owui_get_or_create_kb():
"""Return a KB id for OWUI_KB without creating duplicates.
Honors OPENWEBUI_KB_ID, and tolerates both list and {"data": ...} response shapes.
"""
if not OWUI_URL or not OWUI_KEY:
return None
# 1) If an explicit id is provided, trust it
forced = os.getenv("OPENWEBUI_KB_ID", "").strip()
if forced:
return forced
# 2) List and try to find an exact name match
try:
r = requests.get(f"{OWUI_URL}/api/v1/knowledge/list", headers=owui_headers(), timeout=15)
r.raise_for_status()
body = r.json()
items = body if isinstance(body, list) else body.get("data", [])
# Prefer exact normalized name match; if multiple, pick the most recently updated
kb_target = _norm(OWUI_KB)
matches = [kb for kb in items if _norm(kb.get("name")) == kb_target]
if matches:
try:
matches.sort(key=lambda k: k.get("updated_at") or 0, reverse=True)
except Exception:
pass
return matches[0].get("id")
except Exception:
pass
# 3) Create only if not found
cr = requests.post(
f"{OWUI_URL}/api/v1/knowledge/create",
headers={**owui_headers(), "Content-Type": "application/json"},
data=orjson.dumps({"name": OWUI_KB, "description": "All local content indexed by podx"}),
timeout=15,
)
cr.raise_for_status()
created = cr.json()
if isinstance(created, dict) and created.get("id"):
return created["id"]
if isinstance(created, dict) and created.get("data") and created["data"].get("id"):
return created["data"]["id"]
# Fallback: try to resolve again by name
try:
rr = requests.get(f"{OWUI_URL}/api/v1/knowledge/list", headers=owui_headers(), timeout=15)
rr.raise_for_status()
body = rr.json()
items = body if isinstance(body, list) else body.get("data", [])
kb_target = _norm(OWUI_KB)
for kb in items:
if _norm(kb.get("name")) == kb_target:
return kb.get("id")
except Exception:
pass
return None
def owui_upload_and_attach(path: Path, kb_id: str):
with open(path, "rb") as f:
r = requests.post(f"{OWUI_URL}/api/v1/files/", headers=owui_headers(), files={"file": (path.name, f)}, timeout=60*10)
r.raise_for_status()
up = r.json()
file_id = (up.get("id") or (up.get("data") or {}).get("id"))
if not file_id:
raise RuntimeError(f"OWUI upload: could not get file id from response: {up}")
r = requests.post(
f"{OWUI_URL}/api/v1/knowledge/{kb_id}/file/add",
headers={**owui_headers(), "Content-Type": "application/json"},
data=orjson.dumps({"file_id": file_id}),
timeout=180,
)
r.raise_for_status()
try:
time.sleep(0.5)
except Exception:
pass
return True
def publish_to_openwebui(paths):
if not OWUI_URL or not OWUI_KEY:
return
try:
kb_id = owui_get_or_create_kb()
if not kb_id:
print("[owui] KB resolve failed; skipping attach to avoid accidental duplicates", flush=True)
return
for p in paths:
p = Path(p)
if not p.exists():
continue
try:
owui_upload_and_attach(p, kb_id)
except Exception as e:
log({"url": str(p), "status": "owui_error", "error": str(e)})
except Exception as e:
log({"status": "owui_error", "error": str(e)})
# --------- Post-transcribe pipeline and job/queue helpers ---------
def _postprocess_after_transcribe(media_path: Path, base: Path):
"""Common steps after we have a `base` transcript path: index, publish, NFO, artwork."""
try:
index_meili(base.with_suffix(".json"))
except Exception as e:
print(f"[post] meili index failed: {e}", flush=True)
try:
publish_to_openwebui([base.with_suffix(".txt")])
except Exception as e:
print(f"[post] owui publish failed: {e}", flush=True)
# Build metadata using existing helper
try:
title = media_path.stem
fallback = {
"title": title,
"episode_title": title,
"show": media_path.parent.name,
"description": "",
"pubdate": _extract_date_from_stem(title),
"duration_sec": media_duration_seconds(media_path),
"image": "",
"guid": "",
}
meta = build_meta_from_sources(media_path, media_path.parent.name, fallback, ep=None)
ttxt = (TRN / title).with_suffix(".txt").read_text(encoding="utf-8")
write_episode_nfo(media_path, meta, ttxt)
try:
save_episode_artwork(meta.get("image"), media_path, meta.get("show"))
except Exception:
pass
except Exception as e:
print(f"[post] NFO write failed: {e}", flush=True)
def transcribe_job(path_str: str):
"""RQ job: heavy transcription only. Safe to import by dotted path 'worker.transcribe_job'."""
# Do NOT block when paused; exit quickly so CPU-heavy work stops ASAP.
if transcribe_paused():
print(f"[pause] transcribe_job: pause is active; skipping start for {path_str}", flush=True)
return "paused"
p = Path(path_str)
try:
base = transcribe(p)
except PauseInterrupt:
print(f"[pause] transcribe_job: paused mid-run for {p}", flush=True)
return "paused"
_postprocess_after_transcribe(p, base)
return str(base)
def enqueue_transcribe(path: Path) -> bool:
"""Enqueue a transcription job to the 'transcribe' queue. Returns True on success."""
try:
if transcribe_paused():
print(f"[queue] pause flag present; enqueuing job for {path} but workers will wait", flush=True)
conn = Redis.from_url(REDIS_URL)
q = Queue("transcribe", connection=conn, default_timeout=60*60*24)
# Use dotted path so workers in other processes can import
q.enqueue("worker.transcribe_job", str(path), job_timeout=60*60*24)
print(f"[queue] enqueued transcribe job for {path}", flush=True)
return True
except Exception as e:
print(f"[queue] enqueue failed, will transcribe inline: {e}", flush=True)
return False
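# Jobs enqueued above are consumed by a separate RQ worker process listening on the
# 'transcribe' queue, e.g. (illustrative invocation; run where worker.py is importable so the
# dotted path 'worker.transcribe_job' resolves):
#   rq worker transcribe --url "$REDIS_URL"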
def handle_local_file(path_str: str):
"""Transcribe & index a local media file that already exists in /library.
If a sidecar .txt/.srt/.vtt exists, use it instead of running Whisper.
Safe to call repeatedly; it skips if transcript JSON already exists.
"""
try:
p = Path(path_str)
if not p.exists():
log({"url": path_str, "status": "error", "error": "file_not_found"})
return
if WORKER_MODE == "transcribe":
print(f"[mode] transcribe-only worker handling local file: {p}", flush=True)
title = p.stem
base_json = TRN / f"{title}.json"
if base_json.exists():
log({"url": path_str, "status": "skip", "reason": "already_transcribed"})
return
info = {"url": path_str, "status": "transcribing", "title": title,
"uploader": p.parent.name, "date": "", "path": str(p), "progress": 0}
log(info)
# 0) Try RSS resolver first: if episode with transcript exists, use it (skip Whisper)
try:
ep = match_media_to_rss(p)
except Exception as _e:
ep = None
if ep:
base = use_rss_transcript(p, ep)
if base:
index_meili(base.with_suffix(".json"))
publish_to_openwebui([base.with_suffix(".txt")])
log({**info, **{"status": "done", "note": "used_rss_transcript"}})
return
# 1) Prefer an existing transcript sidecar if present
sidecar = find_sidecar_transcript(p)
if sidecar:
plain = transcript_text_from_file(sidecar)
lang = DEFAULT_TRANSCRIPT_LANG
base = write_plain_transcript(p, plain, language=lang)
ensure_sidecar_next_to_media(sidecar, p, lang=lang)
index_meili(base.with_suffix(".json"))
publish_to_openwebui([base.with_suffix(".txt")])
try:
# Use info.json (if present) to enrich metadata
fallback = {
"title": title,
"episode_title": title,
"show": p.parent.name,
"description": "",
"pubdate": _extract_date_from_stem(title),
"duration_sec": media_duration_seconds(p),
"image": "",
"guid": "",
}
meta = build_meta_from_sources(p, p.parent.name, fallback, ep=None)
ttxt = base.with_suffix(".txt").read_text(encoding="utf-8")
write_episode_nfo(p, meta, ttxt)
# Try to fetch and save artwork locally
try:
save_episode_artwork(meta.get("image"), p, meta.get("show"))
except Exception:
pass
except Exception as e:
print(f"[post] NFO write failed: {e}", flush=True)
log({**info, **{"status": "done", "note": "used_existing_transcript"}})
return
# 1.5) Reuse a transcript that exists in the repository for a matching episode
repo_json = find_repo_transcript_for_media(p)
if repo_json:
base = reuse_repo_transcript(p, repo_json)
if base:
index_meili(base.with_suffix(".json"))
publish_to_openwebui([base.with_suffix(".txt")])
try:
data = json.loads((base.with_suffix(".json")).read_text(encoding="utf-8"))
# Start with repo metadata, then enrich from yt-dlp info.json if any
meta_repo = {
"title": data.get("title") or title,
"episode_title": data.get("title") or title,
"show": data.get("show") or p.parent.name,
"description": data.get("description") or "",
"pubdate": data.get("pubdate") or _extract_date_from_stem(title),
"duration_sec": media_duration_seconds(p),
"image": data.get("image"),
"guid": data.get("guid") or data.get("id"),
}
meta = build_meta_from_sources(p, p.parent.name, meta_repo, ep=None)
ttxt = base.with_suffix(".txt").read_text(encoding="utf-8")
write_episode_nfo(p, meta, ttxt)
try:
save_episode_artwork(meta.get("image"), p, meta.get("show"))
except Exception:
pass
except Exception as e:
print(f"[post] NFO write failed: {e}", flush=True)
log({**info, **{"status": "done", "note": "reused_repo_transcript"}})
return
# 2) Otherwise, run transcription (offload to queue if enabled and not in transcribe-only worker)
# If paused, do not block; either enqueue (so worker will pause) or skip now.
if transcribe_paused():
if OFFLOAD_TRANSCRIBE and WORKER_MODE != "transcribe" and enqueue_transcribe(p):
log({**info, **{"status": "queued_transcribe"}})
return
log({**info, **{"status": "paused"}})
print(f"[pause] handle_local_file: pause active; not starting {p}", flush=True)
return
if OFFLOAD_TRANSCRIBE and WORKER_MODE != "transcribe" and enqueue_transcribe(p):
log({**info, **{"status": "queued_transcribe"}})
return
base = transcribe(p)
_postprocess_after_transcribe(p, base)
log({**info, **{"status": "done"}})
except Exception as e:
log({"url": path_str, "status": "error", "error": str(e)})
raise
# --- Refresh sidecar metadata and subtitles for an already-downloaded media file ---
def refresh_media(path_str: str):
"""
Refresh sidecar metadata (info.json, thumbnail) and subtitles for an already-downloaded media file.
Requires a companion .info.json next to the media (to supply the original URL). No media re-download.
"""
try:
p = Path(path_str)
if not p.exists() or not p.is_file():
log({"url": path_str, "status": "error", "error": "file_not_found"})
return
# Locate existing info.json to get the original URL
info_json = None
for cand in [p.parent / f"{p.name}.info.json", p.parent / f"{p.stem}.info.json"]:
if cand.exists():
info_json = cand
break
if not info_json:
log({"path": str(p), "status": "refresh-skip", "reason": "no_info_json"})
print(f"[refresh] skip: no info.json next to {p}", flush=True)
return
info = load_info_json(info_json) or {}
url = info.get("webpage_url") or info.get("original_url") or info.get("url")
if not url:
log({"path": str(p), "status": "refresh-skip", "reason": "no_url_in_info"})
print(f"[refresh] skip: no URL in {info_json}", flush=True)
return
# Prepare yt-dlp command to refresh sidecars only, writing files exactly next to the media
outtmpl = str(p.with_suffix(".%(ext)s"))
sub_langs = os.getenv("YTDLP_SUBS_LANGS", "en.*,en")
cmd = [
"yt-dlp",
"--skip-download",
"--write-info-json",
"--write-thumbnail",
"--convert-thumbnails", "jpg",
"--write-subs", "--write-auto-subs",
"--sub-langs", sub_langs,
"--convert-subs", "srt",
"-o", outtmpl,
url,
]
print(f"[refresh] refreshing sidecars for {p} via yt-dlp", flush=True)
try:
subprocess.check_call(cmd)
except subprocess.CalledProcessError as e:
print(f"[refresh] yt-dlp failed: {e}", flush=True)
raise
# Ensure language-suffixed SRT exists (Plex-friendly) if any subs were fetched
try:
# Pick any .srt just fetched that matches base
for s in p.parent.glob(f"{p.stem}*.srt"):
# If it's already lang-suffixed, keep; also copy to .en.srt when only plain .srt exists
if s.name == f"{p.stem}.srt":
shutil.copy2(s, p.with_suffix(".en.srt"))
except Exception:
pass
# Rebuild NFO using fresh info.json (and RSS if available)
try:
# Try RSS match to enrich metadata (non-fatal if not present)
ep = None
try:
ep = match_media_to_rss(p)
except Exception:
ep = None
fallback = {
"title": p.stem,
"episode_title": p.stem,
"show": p.parent.name,
"description": "",
"pubdate": _extract_date_from_stem(p.stem),
"duration_sec": media_duration_seconds(p),
"image": "",
"guid": "",
}
meta = build_meta_from_sources(p, p.parent.name, fallback, ep)
# Save local artwork too
try:
save_episode_artwork(meta.get("image"), p, meta.get("show"))
except Exception:
pass
# If a transcript already exists, include it in the NFO plot preview
ttxt_path = (TRN / p.stem).with_suffix(".txt")
ttxt = ttxt_path.read_text(encoding="utf-8") if ttxt_path.exists() else None
write_episode_nfo(p, meta, ttxt)
except Exception as e:
print(f"[refresh] NFO/artwork update failed: {e}", flush=True)
log({"path": str(p), "status": "refresh-done"})
print(f"[refresh] done for {p}", flush=True)
except Exception as e:
log({"path": path_str, "status": "error", "error": str(e)})
raise
def handle_web(url: str):
if not _mode_allows("web"):
log({"url": url, "status": "skip", "reason": "mode_transcribe_only"})
print(f"[mode] transcribe-only: skipping web snapshot job: {url}", flush=True)
return
info = {"url": url, "status":"web-downloading", "title":"", "uploader":"", "date":"", "path":""}
log(info)
base, title, domain, date, text = save_web_snapshot(url)
info.update({"title": title, "uploader": domain, "date": date, "path": str(base.with_suffix('.html'))})
log({**info, **{"status":"web-indexing"}})
index_web(base, title, domain, date, text, url)
push = [p for p in [base.with_suffix('.txt'), base.with_suffix('.html')] if p.exists()]
publish_to_openwebui(push)
log({**info, **{"status":"done"}})
def handle_url(url: str):
try:
# In transcribe-only mode, refuse non-local/download jobs
if not _mode_allows("download"):
# Only permit local file paths in this mode
if url.startswith("/") or url.startswith("file://"):
return handle_local_file(url[7:] if url.startswith("file://") else url)
log({"url": url, "status": "skip", "reason": "mode_transcribe_only"})
print(f"[mode] transcribe-only: skipping non-local job: {url}", flush=True)
return
# If a local file path (or file:// URL) is provided, process it directly
if url.startswith("file://"):
return handle_local_file(url[7:])
if url.startswith("/") and Path(url).exists():
return handle_local_file(url)
if not is_media_url(url):
handle_web(url)
return
info = {"url": url, "status":"queued", "title":"", "uploader":"", "date":"", "path":""}
log({**info, **{"status":"downloading"}})
files = yt_dlp(url, TMP)
for f in files:
parts = f.relative_to(TMP).parts
uploader = sanitize(parts[0]) if len(parts)>1 else "Unknown"
dest_dir = LIB / uploader
dest_dir.mkdir(parents=True, exist_ok=True)
dest = dest_dir / sanitize(f.name)
shutil.move(str(f), dest)
# Move companion files produced by yt-dlp (info.json, thumbnail, subtitles)
try:
companions = find_companion_files(f)
# info.json -> prefer "<dest.name>.info.json", fallback to "<dest.stem>.info.json"
if companions.get("info") and companions["info"].exists():
dest_info = dest.parent / f"{dest.name}.info.json"
try:
shutil.move(str(companions["info"]), dest_info)
except Exception:
# fallback naming without extension
dest_info2 = dest.parent / f"{dest.stem}.info.json"
try:
shutil.move(str(companions['info']), dest_info2)
except Exception:
pass
# thumbnail -> "<dest>.jpg"
if companions.get("thumb") and companions["thumb"].exists():
try:
shutil.move(str(companions["thumb"]), str(dest.with_suffix(".jpg")))
except Exception:
pass
# subtitles -> preserve language suffix: "<dest.stem><suffix>"
for s in companions.get("subs", []):
if not s.exists():
continue
suffix_tail = ""
s_name = s.name
f_stem = f.stem
if s_name.startswith(f_stem):
suffix_tail = s_name[len(f_stem):] # includes leading dot if present
else:
suffix_tail = s.suffix
dest_sub = dest.parent / f"{dest.stem}{suffix_tail}"
try:
shutil.move(str(s), str(dest_sub))
except Exception:
pass
except Exception:
pass
info.update({"title": dest.stem, "uploader": uploader,
"date": (re.findall(r"\b(\d{8})\b", dest.stem)[0] if re.findall(r"\b(\d{8})\b", dest.stem) else ""),
"path": str(dest)})
log({**info, **{"status":"transcribing", "progress": 0}})
# Try RSS transcript resolver first
ep = None
try:
ep = match_media_to_rss(dest)
except Exception:
ep = None
if ep:
base = use_rss_transcript(dest, ep)
else:
base = None
# 1.5) If we didn't get an RSS transcript and there is a matching one already in the repo, reuse it
if not base:
repo_json = find_repo_transcript_for_media(dest)
if repo_json:
base = reuse_repo_transcript(dest, repo_json)
if not base:
# If paused, do not block; either enqueue (so worker will pause) or skip now.
if transcribe_paused():
if OFFLOAD_TRANSCRIBE and WORKER_MODE != "transcribe" and enqueue_transcribe(dest):
log({**info, **{"status": "queued_transcribe"}})
continue
log({**info, **{"status": "paused"}})
print(f"[pause] handle_url: pause active; not starting {dest}", flush=True)
continue
if OFFLOAD_TRANSCRIBE and WORKER_MODE != "transcribe" and enqueue_transcribe(dest):
log({**info, **{"status": "queued_transcribe"}})
continue
base = transcribe(dest)
_postprocess_after_transcribe(dest, base)
log({**info, **{"status":"done"}})
except Exception as e:
log({"url": url, "status":"error", "error": str(e)})
raise