# podx/app/worker.py
import os, subprocess, shutil, json, re, orjson, requests, unicodedata
from types import SimpleNamespace
from rq import Queue
from redis import Redis
from pathlib import Path
import math
import difflib
import time
from collections import deque
from faster_whisper import WhisperModel
from xml.sax.saxutils import escape as xml_escape
MEILI_URL = os.getenv("MEILI_URL", "http://meili:7700")
MEILI_KEY = os.getenv("MEILI_KEY", "")
LIB = Path(os.getenv("LIBRARY_ROOT", "/library"))
TRN = Path(os.getenv("TRANSCRIPT_ROOT", "/transcripts"))
TMP = Path(os.getenv("TMP_ROOT", "/tmpdl"))
# --- Runtime pause switch for CPU-heavy work (no rebuild needed) ---
PAUSE_TRANSCRIBE_FILE = Path(os.getenv("PAUSE_TRANSCRIBE_FILE", str(TRN / ".pause_transcribe")))
# Redis-backed pause flag (podx-tools compatible)
PAUSE_TRANSCRIBE_REDIS_KEY = os.getenv("PAUSE_TRANSCRIBE_REDIS_KEY", "podx:transcribe:paused").strip()
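# Illustrative ways to toggle the pause switch (assuming the default paths/keys above
# and that redis-cli is available on the host); either mechanism works:
#   touch /transcripts/.pause_transcribe                     # file-based pause
#   rm /transcripts/.pause_transcribe                        # resume
#   redis-cli -u "$REDIS_URL" set podx:transcribe:paused 1   # Redis-based pause
#   redis-cli -u "$REDIS_URL" del podx:transcribe:paused     # resume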
def _pause_flag_redis() -> bool:
"""Return True if a truthy pause flag is set in Redis under PAUSE_TRANSCRIBE_REDIS_KEY."""
try:
from redis import Redis as _R
val = _R.from_url(REDIS_URL).get(PAUSE_TRANSCRIBE_REDIS_KEY)
if not val:
return False
v = val.decode("utf-8", "ignore").strip().lower()
return v not in ("", "0", "false", "no", "(nil)")
except Exception:
return False
def transcribe_paused() -> bool:
"""Return True if new transcription work should be paused (file flag or Redis flag)."""
try:
if PAUSE_TRANSCRIBE_FILE.exists():
return True
except Exception:
pass
# Fall back to Redis-based switch used by podx-tools
return _pause_flag_redis()
def wait_if_paused(label: str = "transcribe", poll_sec: int = 10):
"""
If the pause file exists, block this worker in a low-CPU sleep loop until it is removed.
This lets you 'pause' heavy work without killing workers or rebuilding.
"""
try:
if transcribe_paused():
print(f"[pause] {label}: pause flag present at {PAUSE_TRANSCRIBE_FILE}; waiting…", flush=True)
while transcribe_paused():
time.sleep(max(1, int(poll_sec)))
except Exception:
# If anything goes wrong reading the flag, don't block the pipeline.
pass
# --- Exception to abort transcription when pause is requested ---
class PauseInterrupt(Exception):
"""Raised to cooperatively abort a running transcription when pause is requested."""
pass
MODEL_NAME = os.getenv("WHISPER_MODEL","large-v3")
COMPUTE = os.getenv("WHISPER_PRECISION","int8")
WHISPER_LANGUAGE = os.getenv("WHISPER_LANGUAGE", "auto").strip()
# Whisper device/config controls
WHISPER_DEVICE = os.getenv("WHISPER_DEVICE", "auto").strip()
WHISPER_DEVICE_INDEX = int(os.getenv("WHISPER_DEVICE_INDEX", "0"))
WHISPER_CPU_THREADS = int(os.getenv("WHISPER_CPU_THREADS", "4"))
# Decoding beam size (higher = more memory and quality). On GPU, 1 is typically fine.
try:
_beam_env = os.getenv("WHISPER_BEAM_SIZE", "")
WHISPER_BEAM_SIZE = int(_beam_env) if _beam_env.strip() else (1 if (os.getenv("WHISPER_DEVICE", "auto").strip().lower() == "cuda") else 2)
except Exception:
WHISPER_BEAM_SIZE = 1
# --- Host load guards / thread limits ---
# Limit ffmpeg threads (helps keep CPU in check when multiple workers run)
FFMPEG_THREADS = int(os.getenv("FFMPEG_THREADS", "1"))
# Tame BLAS/threadpools that libraries may spin up implicitly
import os as _os_threads
_os_threads.environ.setdefault("OMP_NUM_THREADS", str(WHISPER_CPU_THREADS))
_os_threads.environ.setdefault("OPENBLAS_NUM_THREADS", "1")
_os_threads.environ.setdefault("MKL_NUM_THREADS", "1")
_os_threads.environ.setdefault("NUMEXPR_NUM_THREADS", "1")
# Whisper logging & resume controls
WHISPER_LOG_SEGMENTS = os.getenv("WHISPER_LOG_SEGMENTS", "1") not in ("0", "false", "False")
WHISPER_RESUME = os.getenv("WHISPER_RESUME", "1") not in ("0", "false", "False")
PARTIAL_SAVE_EVERY_SEGS = int(os.getenv("WHISPER_PARTIAL_SAVE_EVERY_SEGS", "20"))
# RSS resolver config
RSS_INDEX_PATH = Path(os.getenv("RSS_INDEX_PATH", "/transcripts/rss_index.json"))
RSS_DURATION_TOLERANCE = int(os.getenv("RSS_DURATION_TOLERANCE", "150")) # seconds
DEFAULT_TRANSCRIPT_LANG = os.getenv("DEFAULT_TRANSCRIPT_LANG", "en").strip() or "en"
def _clean_extension(raw: str, fallback: str) -> str:
raw = (raw or fallback or "").strip()
if not raw:
raw = fallback
if not raw.startswith("."):
raw = f".{raw}"
return raw.lower()
OWUI_URL = os.getenv("OPENWEBUI_URL", "").rstrip("/")
OWUI_KEY = os.getenv("OPENWEBUI_API_KEY", "")
OWUI_KB = os.getenv("OPENWEBUI_KB_NAME", "Homelab Library")
OWUI_AUTO_FIX_METADATA = os.getenv("OPENWEBUI_AUTO_FIX_METADATA", "1").strip().lower() not in ("0", "false", "no")
OWUI_METADATA_TEMPLATE_JSON = os.getenv("OPENWEBUI_METADATA_TEMPLATE_JSON", "").strip()
_OWUI_TEMPLATE_PATCHED: set[str] = set()
# Media normalisation options (transcoding for Plex-friendly formats)
MEDIA_NORMALIZE = os.getenv("MEDIA_NORMALIZE", "1").strip().lower() not in ("0", "false", "no")
MEDIA_NORMALIZE_KEEP_ORIGINAL = os.getenv("MEDIA_NORMALIZE_KEEP_ORIGINAL", "0").strip().lower() in ("1", "true", "yes")
VIDEO_NORMALIZE_CODEC = os.getenv("VIDEO_NORMALIZE_CODEC", "hevc").strip().lower()
VIDEO_NORMALIZE_EXTENSION = _clean_extension(os.getenv("VIDEO_NORMALIZE_EXTENSION", ".mp4"), ".mp4")
VIDEO_NORMALIZE_CRF = os.getenv("VIDEO_NORMALIZE_CRF", "28").strip()
VIDEO_NORMALIZE_PRESET = os.getenv("VIDEO_NORMALIZE_PRESET", "medium").strip()
VIDEO_NORMALIZE_TUNE = os.getenv("VIDEO_NORMALIZE_TUNE", "").strip()
VIDEO_NORMALIZE_AUDIO_CODEC = os.getenv("VIDEO_NORMALIZE_AUDIO_CODEC", "aac").strip().lower()
VIDEO_NORMALIZE_AUDIO_BITRATE = os.getenv("VIDEO_NORMALIZE_AUDIO_BITRATE", "160k").strip()
AUDIO_NORMALIZE_CODEC = os.getenv("AUDIO_NORMALIZE_CODEC", "libmp3lame").strip()
AUDIO_NORMALIZE_EXTENSION = _clean_extension(os.getenv("AUDIO_NORMALIZE_EXTENSION", ".mp3"), ".mp3")
AUDIO_NORMALIZE_BITRATE = os.getenv("AUDIO_NORMALIZE_BITRATE", "192k").strip()
AUDIO_NORMALIZE_CHANNELS = os.getenv("AUDIO_NORMALIZE_CHANNELS", "2").strip()
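# Illustrative .env overrides for the normalisation step (example values only):
#   MEDIA_NORMALIZE=1
#   VIDEO_NORMALIZE_CODEC=hevc
#   VIDEO_NORMALIZE_CRF=28
#   AUDIO_NORMALIZE_CODEC=libmp3lame
#   AUDIO_NORMALIZE_BITRATE=192k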
# Redis-backed job queue settings and offload toggle
REDIS_URL = os.getenv("REDIS_URL", "redis://redis:6379/0").strip()
OFFLOAD_TRANSCRIBE = os.getenv("OFFLOAD_TRANSCRIBE", "1").lower() not in ("0", "false", "no")
# Worker role selection
WORKER_MODE = os.getenv("WORKER_MODE", "all").strip().lower() # 'all' or 'transcribe'
JOB_QUEUES = [q.strip() for q in os.getenv("JOB_QUEUES", "default").split(",") if q.strip()]
# Remote transcription (OpenAI) configuration
TRANSCRIBE_BACKEND = os.getenv("TRANSCRIBE_BACKEND", "local").strip().lower()
OPENAI_API_KEY = (os.getenv("OPENAI_API_KEY", "") or "").strip()
OPENAI_BASE_URL = (os.getenv("OPENAI_BASE_URL", "https://api.openai.com/v1") or "https://api.openai.com/v1").rstrip("/")
OPENAI_TRANSCRIBE_MODEL = os.getenv("OPENAI_TRANSCRIBE_MODEL", "whisper-1").strip()
OPENAI_TRANSCRIBE_TIMEOUT = int(os.getenv("OPENAI_TRANSCRIBE_TIMEOUT", "600"))
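# Illustrative .env snippet for offloading transcription to a dedicated worker and/or
# the OpenAI backend (example values, not requirements):
#   WORKER_MODE=transcribe
#   JOB_QUEUES=transcribe
#   TRANSCRIBE_BACKEND=openai
#   OPENAI_API_KEY=sk-...
#   OPENAI_TRANSCRIBE_MODEL=whisper-1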
def _mode_allows(task: str) -> bool:
"""Gate tasks by worker role. In 'transcribe' mode only allow transcription of local files
(including indexing and OWUI publish). "task" is one of: 'download','web','local','transcribe'."""
if WORKER_MODE == "transcribe":
return task in {"local", "transcribe"}
return True
TRN.mkdir(parents=True, exist_ok=True)
LIB.mkdir(parents=True, exist_ok=True)
TMP.mkdir(parents=True, exist_ok=True)
# Lazy Whisper model loader so the worker can start even if model download/setup is slow
_model = None
def get_model():
global _model
if _model is None:
print(f"[whisper] loading model='{MODEL_NAME}' device='{WHISPER_DEVICE}' idx={WHISPER_DEVICE_INDEX} compute='{COMPUTE}' threads={WHISPER_CPU_THREADS}", flush=True)
try:
_model = WhisperModel(
MODEL_NAME,
device=WHISPER_DEVICE,
device_index=WHISPER_DEVICE_INDEX,
compute_type=COMPUTE,
cpu_threads=WHISPER_CPU_THREADS,
)
except Exception as e:
# If GPU is selected/auto-selected but not available, some environments try to load
# CUDA/cuDNN and fail. Fall back to CPU automatically.
msg = str(e).lower()
gpu_markers = [
"cuda", "cublas", "cudnn", "hip", "rocm", "nvrtc", "gpu",
"unable to load any of {libcudnn", "cannot load symbol cudnncreatetensordescriptor",
]
if WHISPER_DEVICE.lower() != "cpu" and any(m in msg for m in gpu_markers):
print(f"[whisper] model init failed on device '{WHISPER_DEVICE}': {e}. Falling back to CPU…", flush=True)
_model = WhisperModel(
MODEL_NAME,
device="cpu",
device_index=0,
compute_type=COMPUTE,
cpu_threads=WHISPER_CPU_THREADS,
)
else:
raise
return _model
# --- Helper: Reset model with new device and device_index ---
def reset_model(device: str, device_index: int | None = None, compute_type: str | None = None):
"""Reset the global _model to a new WhisperModel with the given device and device_index."""
global _model
idx = device_index if device_index is not None else WHISPER_DEVICE_INDEX
ctype = compute_type or COMPUTE
print(f"[whisper] resetting model='{MODEL_NAME}' device='{device}' idx={idx} compute='{ctype}' threads={WHISPER_CPU_THREADS}", flush=True)
_model = WhisperModel(
MODEL_NAME,
device=device,
device_index=idx,
compute_type=ctype,
cpu_threads=WHISPER_CPU_THREADS,
)
# --- Helper: Run transcribe with fallback to CPU on GPU/oom errors ---
def run_transcribe_with_fallback(wav_path: Path, lang):
"""
Try to transcribe with current model; on GPU/CUDA/HIP/ROCm/OOM errors, reset to CPU and retry once.
Returns (segments, info) or raises exception.
"""
# First attempt with current settings
try:
model = get_model()
return model.transcribe(str(wav_path), vad_filter=True, language=lang, beam_size=WHISPER_BEAM_SIZE)
except Exception as e:
msg = str(e)
lower_msg = msg.lower()
oom_markers = ["out of memory", "cudaerrormemoryallocation", "cublas", "cuda", "cudnn"]
# If we encountered a GPU-related error, attempt progressive fallbacks on GPU before CPU
if any(m in lower_msg for m in oom_markers):
# Decide GPU compute fallback ladder
compute_chain = []
base = (COMPUTE or "float16").lower()
if base not in compute_chain:
compute_chain.append(base)
for c in ("int8_float16", "int8"):
if c not in compute_chain:
compute_chain.append(c)
for ctype in compute_chain[1:]: # skip the first (already tried via get_model())
try:
print(f"[whisper] GPU error '{msg}'. Retrying with compute_type='{ctype}' and beam_size=1...", flush=True)
reset_model("cuda", WHISPER_DEVICE_INDEX, compute_type=ctype)
model = get_model()
return model.transcribe(str(wav_path), vad_filter=True, language=lang, beam_size=1)
except Exception as e2:
emsg = str(e2).lower()
if not any(m in emsg for m in oom_markers):
# Non-GPU/oom error — rethrow
print(f"[whisper] GPU retry failed: {e2}", flush=True)
raise
# else continue to next fallback
# Fall back to CPU if all GPU fallbacks failed
print(f"[whisper] GPU attempts exhausted ('{msg}'); falling back to CPU int8", flush=True)
reset_model("cpu", 0, compute_type="int8")
try:
model = get_model()
return model.transcribe(str(wav_path), vad_filter=True, language=lang, beam_size=1)
except Exception as e3:
print(f"[whisper] CPU fallback also failed: {e3}", flush=True)
raise
# Non-GPU error — rethrow
raise
def run_transcribe_openai(wav_path: Path, lang_hint: str | None):
"""Transcribe audio via OpenAI's Whisper API, returning (segments, info, raw_payload)."""
if not OPENAI_API_KEY:
raise RuntimeError("OPENAI_API_KEY must be set when TRANSCRIBE_BACKEND is 'openai'")
url = f"{OPENAI_BASE_URL}/audio/transcriptions"
headers = {"Authorization": f"Bearer {OPENAI_API_KEY}"}
data: dict[str, str] = {
"model": OPENAI_TRANSCRIBE_MODEL or "whisper-1",
"response_format": "verbose_json",
}
if lang_hint:
data["language"] = lang_hint
start = time.time()
with open(wav_path, "rb") as fh:
files = {"file": (wav_path.name, fh, "audio/wav")}
resp = requests.post(
url,
headers=headers,
data=data,
files=files,
timeout=OPENAI_TRANSCRIBE_TIMEOUT,
)
elapsed = time.time() - start
try:
resp.raise_for_status()
except requests.HTTPError as exc:
print(f"[openai] transcription failed ({exc}); response={resp.text[:400]}", flush=True)
raise
payload = resp.json()
segments_raw = payload.get("segments") or []
seg_objs: list[SimpleNamespace] = []
for seg in segments_raw:
seg_objs.append(
SimpleNamespace(
start=float(seg.get("start") or 0.0),
end=float(seg.get("end") or 0.0),
text=str(seg.get("text") or ""),
)
)
if not seg_objs and payload.get("text"):
duration = float(payload.get("duration") or 0.0)
seg_objs.append(
SimpleNamespace(
start=0.0,
end=duration,
text=str(payload.get("text") or ""),
)
)
language = payload.get("language") or lang_hint or ""
info = SimpleNamespace(language=language)
print(
f"[openai] transcribed {wav_path.name} via {OPENAI_TRANSCRIBE_MODEL or 'whisper-1'} "
f"in {elapsed:.1f}s; segments={len(seg_objs)} lang={language or 'unknown'}",
flush=True,
)
return seg_objs, info, payload
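# The parsing above assumes the standard verbose_json response shape, roughly:
#   {"text": "...", "language": "en", "duration": 123.4,
#    "segments": [{"start": 0.0, "end": 4.2, "text": "..."}, ...]}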
def log(feed):
try:
with open(TRN / "_feed.log", "a", encoding="utf-8") as f:
f.write(orjson.dumps(feed).decode()+"\n")
except Exception:
pass
def sanitize(name):
return re.sub(r'[\\/:"*?<>|]+', ' ', name).strip()
# ---------- RSS transcript resolver ----------
def _normalize_title(t: str) -> str:
t = (t or "").lower()
t = re.sub(r"\s+", " ", t)
# remove punctuation-ish
t = re.sub(r"[^a-z0-9 _-]+", "", t)
return t.strip()
def _stem_without_date(stem: str) -> str:
# drop leading YYYYMMDD - from filenames created by yt-dlp template
m = re.match(r"^\d{8}\s*-\s*(.*)$", stem)
return m.group(1) if m else stem
def _extract_date_from_stem(stem: str) -> str | None:
m = re.search(r"\b(\d{8})\b", stem)
return m.group(1) if m else None
def _best_title_match(title: str, candidates: list[str]) -> tuple[str, float]:
"""Return (best_title, score 0..1) using difflib SequenceMatcher."""
if not candidates:
return "", 0.0
norm_title = _normalize_title(title)
best = ("", 0.0)
for c in candidates:
score = difflib.SequenceMatcher(None, norm_title, _normalize_title(c)).ratio()
if score > best[1]:
best = (c, score)
return best
def _load_rss_index() -> list[dict]:
try:
if RSS_INDEX_PATH.exists():
data = json.loads(RSS_INDEX_PATH.read_text(encoding="utf-8"))
# supports {"episodes":[...]} or a flat list
if isinstance(data, dict) and "episodes" in data:
return data["episodes"] or []
if isinstance(data, list):
return data
except Exception as e:
print(f"[resolver] failed to load RSS index: {e}", flush=True)
return []
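# Expected rss_index.json shape, inferred from the fields read below (either a flat
# list of episode dicts or {"episodes": [...]}), e.g.:
#   {"episodes": [{"title": "...", "pubdate": "...", "date": "YYYYMMDD",
#                  "duration_sec": 3600, "guid": "...", "feed_url": "...",
#                  "transcripts": [{"url": "https://...", "type": "vtt"}]}]}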
def match_media_to_rss(media_path: Path) -> dict | None:
"""Try to match a local media file to an RSS episode entry."""
episodes = _load_rss_index()
if not episodes:
return None
stem = media_path.stem
title_no_date = _stem_without_date(stem)
file_date = _extract_date_from_stem(stem)
# duration tolerance
media_dur = media_duration_seconds(media_path)
# Candidates: filter by date if present, else all
if file_date:
pool = [e for e in episodes if (str(e.get("date", "")) == file_date or str(e.get("pubdate", "")) == file_date)]
if not pool:
pool = episodes
else:
pool = episodes
# Pick best by (title similarity, duration proximity)
best_ep, best_score = None, -1.0
for ep in pool:
ep_title = ep.get("title") or ep.get("itunes_title") or ""
sim = _best_title_match(title_no_date, [ep_title])[1]
dur = float(ep.get("duration_sec") or ep.get("duration") or 0.0)
dur_ok = True
if media_dur and dur:
dur_ok = abs(media_dur - dur) <= RSS_DURATION_TOLERANCE
score = sim + (0.1 if dur_ok else 0.0)
if score > best_score:
best_score, best_ep = score, ep
if best_ep and best_score >= 0.5:
print(f"[resolver] matched '{stem}' -> '{best_ep.get('title','')}' score={best_score:.2f}", flush=True)
return best_ep
return None
def _choose_transcript_url(ep: dict) -> tuple[str, str] | tuple[None, None]:
"""Return (url, kind) preferring txt, vtt, then srt. 'kind' in {'txt','vtt','srt'}."""
# unified structure from rss_ingest.py: ep["transcripts"] = [{"url":..., "type": ...}, ...]
items = ep.get("transcripts") or []
# some ingesters store separate keys
if not items:
for key, kind in [("transcript_txt","txt"), ("transcript_vtt","vtt"), ("transcript_srt","srt")]:
if ep.get(key):
items.append({"url": ep[key], "type": kind})
# preference order
for kind in ["txt", "vtt", "srt"]:
for it in items:
t = (it.get("type") or "").lower()
u = it.get("url") or ""
if u and (kind in t or (kind == "txt" and t in ["text","plain","text/plain"]) or (kind in u.lower())):
return u, kind
return (None, None)
def fetch_rss_transcript(ep: dict, dest_dir: Path) -> Path | None:
"""Download transcript to dest_dir and return local Path; convert VTT->SRT if needed."""
url, kind = _choose_transcript_url(ep)
if not url:
return None
dest_dir.mkdir(parents=True, exist_ok=True)
# filename from episode title
safe = sanitize(ep.get("title") or ep.get("guid") or "episode")
path = dest_dir / f"{safe}.{kind if kind!='txt' else 'txt'}"
try:
r = requests.get(url, timeout=30)
r.raise_for_status()
mode = "wb" if kind in ("vtt","srt") else "w"
if mode == "wb":
path.write_bytes(r.content)
else:
path.write_text(r.text, encoding="utf-8")
print(f"[resolver] downloaded transcript ({kind}) from {url}", flush=True)
return path
except Exception as e:
print(f"[resolver] failed to fetch transcript: {e}", flush=True)
return None
def use_rss_transcript(media_path: Path, ep: dict) -> Path | None:
"""Create standard transcript artifacts from an RSS transcript (txt/vtt/srt)."""
# Prefer direct download; else if rss_ingest already saved a local file path, try that.
sidecar = None
local_hint = ep.get("transcript_local")
if local_hint:
p = Path(local_hint)
if p.exists():
sidecar = p
if sidecar is None:
sidecar = fetch_rss_transcript(ep, TMP)
if not sidecar or not sidecar.exists():
return None
# Convert to plain text
plain = transcript_text_from_file(sidecar)
lang = (ep.get("language") or ep.get("lang") or DEFAULT_TRANSCRIPT_LANG).split("-")[0]
base = write_plain_transcript(media_path, plain, language=lang)
# Place an SRT next to video for Plex
ensure_sidecar_next_to_media(sidecar, media_path, lang=lang)
# Write provenance sidecar
(base.with_suffix(".prov.json")).write_bytes(orjson.dumps({
"source": "rss",
"feed": ep.get("feed_url"),
"guid": ep.get("guid"),
"episode_title": ep.get("title"),
"transcript_kind": sidecar.suffix.lower().lstrip("."),
"transcript_url": _choose_transcript_url(ep)[0] or "",
}))
# Write Kodi/Plex-compatible NFO
try:
# Gather metadata for NFO from RSS entry
meta = {
"title": ep.get("title"),
"episode_title": ep.get("title"),
"show": ep.get("podcast_title") or ep.get("feed_title") or ep.get("show"),
"description": ep.get("description") or ep.get("content"),
"pubdate": ep.get("pubdate"),
"pubdate_iso": ep.get("date_iso"),
"duration_sec": ep.get("duration_sec") or ep.get("duration"),
"image": ep.get("image") or ep.get("image_url"),
"guid": ep.get("guid"),
}
txt_path = base.with_suffix(".txt")
transcript_text = txt_path.read_text(encoding="utf-8") if txt_path.exists() else None
write_episode_nfo(media_path, meta, transcript_text)
# Save local artwork for Plex/Kodi
try:
save_episode_artwork(meta.get("image"), media_path, meta.get("show"))
except Exception:
pass
except Exception as e:
print(f"[post] NFO write failed: {e}", flush=True)
return base
def find_sidecar_transcript(media_path: Path) -> Path | None:
"""Return a .txt/.srt/.vtt transcript file sitting next to media, if any.
Tries common variants including language-suffixed SRT/VTT.
"""
candidates: list[Path] = []
# exact same stem in same folder
for ext in [".txt", ".srt", ".vtt"]:
p = media_path.parent / (media_path.stem + ext)
if p.exists():
candidates.append(p)
# language-suffixed near the media file (e.g., .en.srt)
for ext in [".srt", ".vtt"]:
p = media_path.with_suffix(f".en{ext}")
if p.exists() and p not in candidates:
candidates.append(p)
return candidates[0] if candidates else None
# ---------- Transcript repository reuse helpers ----------
def find_repo_transcript_for_media(media_path: Path) -> Path | None:
"""Search the transcript repository (/transcripts) for an existing transcript
that likely belongs to this media file (match by YYYYMMDD in filename and/or
fuzzy title similarity). Returns a path to a matching .json if found."""
try:
stem = media_path.stem
title_no_date = _stem_without_date(stem)
file_date = _extract_date_from_stem(stem)
best_json, best_score = None, 0.0
for j in TRN.glob("*.json"):
try:
data = json.loads(j.read_text(encoding="utf-8"))
except Exception:
continue
other_file = Path(data.get("file", ""))
other_stem = other_file.stem if other_file else j.stem
other_date = _extract_date_from_stem(other_stem)
# If both have dates and they differ a lot, skip
if file_date and other_date and file_date != other_date:
continue
# Compare titles (without dates)
sim = difflib.SequenceMatcher(
None,
_normalize_title(title_no_date),
_normalize_title(_stem_without_date(other_stem)),
).ratio()
# Nudge score when dates match
if file_date and other_date and file_date == other_date:
sim += 0.1
if sim > best_score:
best_score, best_json = sim, j
# Require a reasonable similarity
return best_json if best_json and best_score >= 0.60 else None
except Exception:
return None
def reuse_repo_transcript(media_path: Path, repo_json: Path) -> Path | None:
"""Copy/retarget an existing transcript JSON/TXT (and make SRT/VTT if possible)
from the repository so that it belongs to the provided media_path. Returns
the new base path in /transcripts or None."""
try:
# load the source transcript
data = json.loads(repo_json.read_text(encoding="utf-8"))
src_base = TRN / Path(repo_json).stem
src_txt = src_base.with_suffix(".txt")
src_srt = src_base.with_suffix(".srt")
src_vtt = src_base.with_suffix(".vtt")
# write the retargeted artifacts
new_title = media_path.stem
new_base = TRN / new_title
new_base.parent.mkdir(parents=True, exist_ok=True)
# update file path
data["file"] = str(media_path)
(new_base.with_suffix(".json")).write_bytes(orjson.dumps(data))
# copy or synthesize TXT
if src_txt.exists():
_safe_copy(src_txt, new_base.with_suffix(".txt"))
else:
# fallback: concatenate segments
txt = " ".join(s.get("text", "") for s in data.get("segments", []))
(new_base.with_suffix(".txt")).write_text(txt, encoding="utf-8")
# copy SRT/VTT if present; otherwise synthesize SRT from segments
if src_srt.exists():
_safe_copy(src_srt, new_base.with_suffix(".srt"))
else:
# synthesize SRT
def fmt_ts(t):
h=int(t//3600); m=int((t%3600)//60); s=t-(h*3600+m*60)
return f"{h:02}:{m:02}:{s:06.3f}".replace('.',',')
with open(new_base.with_suffix(".srt"), "w", encoding="utf-8") as srt:
for i, s in enumerate(data.get("segments", []), 1):
srt.write(f"{i}\n{fmt_ts(s.get('start',0.0))} --> {fmt_ts(s.get('end',0.0))}\n{s.get('text','').strip()}\n\n")
if src_vtt.exists():
_safe_copy(src_vtt, new_base.with_suffix(".vtt"))
else:
# synthesize VTT from segments
def fmt_ts_vtt(t):
h=int(t//3600); m=int((t%3600)//60); s=t-(h*3600+m*60)
return f"{h:02}:{m:02}:{s:06.3f}"
with open(new_base.with_suffix(".vtt"), "w", encoding="utf-8") as vtt:
vtt.write("WEBVTT\n\n")
for s in data.get("segments", []):
vtt.write(f"{fmt_ts_vtt(s.get('start',0.0))} --> {fmt_ts_vtt(s.get('end',0.0))} \n{s.get('text','').strip()}\n\n")
# ensure sidecar next to media
try:
lang = (data.get("language") or DEFAULT_TRANSCRIPT_LANG).split("-")[0]
ensure_sidecar_next_to_media(new_base.with_suffix(".srt"), media_path, lang=lang)
except Exception:
pass
# Write Kodi/Plex-compatible NFO
try:
meta = {
"title": data.get("title") or media_path.stem,
"episode_title": data.get("title") or media_path.stem,
"show": data.get("show") or media_path.parent.name,
"description": data.get("description") or "",
"pubdate": data.get("pubdate") or data.get("date"),
"duration_sec": media_duration_seconds(media_path),
"image": data.get("image"),
"guid": data.get("guid") or data.get("id"),
}
txtp = new_base.with_suffix(".txt")
ttxt = txtp.read_text(encoding="utf-8") if txtp.exists() else None
write_episode_nfo(media_path, meta, ttxt)
# Save local artwork for Plex/Kodi
try:
save_episode_artwork(meta.get("image"), media_path, meta.get("show"))
except Exception:
pass
except Exception as e:
print(f"[post] NFO write failed: {e}", flush=True)
return new_base
except Exception as e:
print(f"[resolver] failed to reuse repo transcript: {e}", flush=True)
return None
def transcript_text_from_file(path: Path) -> str:
"""Extract plain text from .txt/.srt/.vtt by stripping timestamps and counters."""
try:
raw = path.read_text(encoding="utf-8", errors="ignore")
except Exception:
raw = path.read_text(errors="ignore")
if path.suffix.lower() == ".txt":
return raw.strip()
# For SRT/VTT, drop timestamp lines, cue numbers and headers
lines: list[str] = []
for line in raw.splitlines():
ls = line.strip()
if not ls:
continue
if "-->" in ls: # timestamp line
continue
if ls.upper().startswith("WEBVTT"):
continue
if re.match(r"^\d+$", ls): # cue index
continue
lines.append(ls)
return " ".join(lines)
def ensure_sidecar_next_to_media(sidecar: Path, media_path: Path, lang: str = "en") -> None:
"""Ensure an `.lang.srt` sits next to the media for Plex. Convert VTT→SRT if needed. If the sidecar is .txt, do nothing."""
try:
if sidecar.suffix.lower() == ".txt":
return
if sidecar.suffix.lower() == ".srt":
dst = media_path.with_suffix(f".{lang}.srt")
_safe_copy(sidecar, dst)
elif sidecar.suffix.lower() == ".vtt":
tmp_srt = sidecar.with_suffix(".srt")
subprocess.run(["ffmpeg", "-nostdin", "-y", "-threads", str(FFMPEG_THREADS), "-i", str(sidecar), str(tmp_srt)], check=True)
dst = media_path.with_suffix(f".{lang}.srt")
shutil.move(str(tmp_srt), dst)
except Exception as e:
print(f"[post] sidecar copy/convert failed: {e}", flush=True)
# --- small helpers for progress/ETA formatting ---
def _fmt_eta(sec: float) -> str:
try:
sec = max(0, int(sec))
h, rem = divmod(sec, 3600)
m, s = divmod(rem, 60)
if h:
return f"{h}h {m}m {s}s"
if m:
return f"{m}m {s}s"
return f"{s}s"
except Exception:
return ""
def save_episode_artwork(image_url: str | None, media_path: Path, show_title: str | None = None):
"""Download episode artwork from image_url and save next to the media as '<basename>.jpg'.
Also drop a folder-level 'poster.jpg' for the show directory if not present.
Best-effort; failures are logged but non-fatal.
"""
if not image_url:
return
try:
resp = requests.get(image_url, timeout=30, stream=True)
resp.raise_for_status()
# Determine content-type and write a temporary file
ctype = (resp.headers.get("Content-Type") or "").lower()
tmp_file = media_path.with_suffix(".art.tmp")
with open(tmp_file, "wb") as out:
for chunk in resp.iter_content(chunk_size=8192):
if chunk:
out.write(chunk)
# Always provide a .jpg next to the media for Plex
episode_jpg = media_path.with_suffix(".jpg")
if "image/jpeg" in ctype:
# Already JPEG
shutil.move(str(tmp_file), str(episode_jpg))
else:
# Try converting to JPEG with ffmpeg; if it fails, keep bytes as-is
try:
subprocess.run(
["ffmpeg", "-nostdin", "-y", "-i", str(tmp_file), str(episode_jpg)],
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=True
)
try:
tmp_file.unlink()
except Exception:
pass
except Exception:
shutil.move(str(tmp_file), str(episode_jpg))
# Also drop a folder poster once per show (helps Plex folder views)
try:
show_poster = media_path.parent / "poster.jpg"
if not show_poster.exists():
_safe_copy(episode_jpg, show_poster)
except Exception:
pass
except Exception as e:
print(f"[post] artwork download failed: {e}", flush=True)
def find_companion_files(src: Path) -> dict:
"""Return likely yt-dlp companion files for a downloaded media file."""
out = {}
# info.json can be either "<name>.<ext>.info.json" or "<name>.info.json"
cands_info = [
src.parent / f"{src.name}.info.json",
src.parent / f"{src.stem}.info.json",
]
out["info"] = next((p for p in cands_info if p.exists()), None)
# thumbnails may be "<name>.<ext>.jpg" or "<name>.jpg" (we convert to jpg)
cand_thumbs = [
src.parent / f"{src.name}.jpg",
src.parent / f"{src.stem}.jpg",
src.parent / f"{src.stem}.jpeg",
src.parent / f"{src.stem}.png",
src.parent / f"{src.stem}.webp",
]
out["thumb"] = next((p for p in cand_thumbs if p.exists()), None)
# subtitles (keep multiple)
subs = []
for s in src.parent.glob(f"{src.stem}*.srt"):
subs.append(s)
for s in src.parent.glob(f"{src.stem}*.vtt"):
subs.append(s)
out["subs"] = subs
return out
def load_info_json(path: Path) -> dict | None:
try:
return json.loads(path.read_text(encoding="utf-8"))
except Exception:
return None
def _iso_from_yyyymmdd(s: str | None) -> str | None:
if not s or not re.match(r"^\d{8}$", s):
return None
return f"{s[0:4]}-{s[4:6]}-{s[6:8]}"
def build_meta_from_sources(media_path: Path, uploader: str, fallback_meta: dict, ep: dict | None = None) -> dict:
"""
Merge metadata from (priority): RSS episode `ep` -> yt-dlp info.json (if present) -> fallback.
Returns a dict compatible with write_episode_nfo().
"""
# Start with fallback
meta = dict(fallback_meta)
# Augment from info.json if present
info = None
for cand in [
media_path.parent / f"{media_path.name}.info.json",
media_path.parent / f"{media_path.stem}.info.json",
]:
if cand.exists():
info = load_info_json(cand)
break
if info:
meta.setdefault("title", info.get("title"))
meta.setdefault("episode_title", info.get("title"))
meta.setdefault("description", info.get("description") or info.get("fulltitle"))
# upload_date is YYYYMMDD
iso = _iso_from_yyyymmdd(info.get("upload_date"))
if iso:
meta["pubdate_iso"] = iso
# Prefer video duration if present
if not meta.get("duration_sec") and info.get("duration"):
meta["duration_sec"] = info.get("duration")
# thumbnail URL
if not meta.get("image"):
meta["image"] = info.get("thumbnail")
# show/uploader
if not meta.get("show"):
meta["show"] = info.get("uploader") or uploader
# Finally, layer RSS data on top if available (most authoritative for podcasts)
if ep:
meta.update({
"title": ep.get("title") or meta.get("title"),
"episode_title": ep.get("title") or meta.get("episode_title"),
"show": ep.get("podcast_title") or ep.get("feed_title") or ep.get("show") or meta.get("show") or uploader,
"description": ep.get("description") or ep.get("content") or meta.get("description", ""),
"pubdate": ep.get("pubdate") or meta.get("pubdate", ""),
"pubdate_iso": ep.get("date_iso") or meta.get("pubdate_iso", meta.get("pubdate")),
"duration_sec": ep.get("duration_sec") or ep.get("duration") or meta.get("duration_sec"),
"image": ep.get("image") or ep.get("image_url") or meta.get("image", ""),
"guid": ep.get("guid") or meta.get("guid", ""),
})
return meta
# ---------- Kodi/Plex NFO writer ----------
from datetime import datetime
def _first_nonempty(*vals):
for v in vals:
if v is None:
continue
if isinstance(v, str) and v.strip():
return v.strip()
if v:
return v
return None
def _coerce_aired(pubdate: str | None) -> str:
"""Convert RSS-style pubdate to YYYY-MM-DD if possible."""
if not pubdate:
return ""
# already ISO-like
m = re.match(r"^(\d{4})[-/](\d{2})[-/](\d{2})", pubdate)
if m:
return f"{m.group(1)}-{m.group(2)}-{m.group(3)}"
# RFC 2822 example: Tue, 21 Feb 2023 06:00:00 +0000
try:
dt = datetime.strptime(pubdate[:31], "%a, %d %b %Y %H:%M:%S %z")
return dt.strftime("%Y-%m-%d")
except Exception:
# try without tz
try:
dt = datetime.strptime(pubdate[:25], "%a, %d %b %Y %H:%M:%S")
return dt.strftime("%Y-%m-%d")
except Exception:
return ""
def write_episode_nfo(media_path: Path, meta: dict, transcript_text: str | None = None) -> Path:
"""Write a minimal Kodi/Plex-compatible NFO next to the media file.
`meta` may include: title, show, plot, pubdate, duration_sec, thumb, guid.
"""
try:
title = _first_nonempty(meta.get("episode_title"), meta.get("title"), media_path.stem) or media_path.stem
show = _first_nonempty(meta.get("show"), meta.get("podcast_title"), meta.get("feed_title"), media_path.parent.name) or media_path.parent.name
plot = _first_nonempty(meta.get("description"), meta.get("content"), meta.get("summary"), "") or ""
# Optionally append transcript preview to plot
if transcript_text:
preview = transcript_text.strip()
if preview:
preview = (preview[:1800] + "") if len(preview) > 1800 else preview
plot = (plot + "\n\n" if plot else "") + preview
aired = _coerce_aired(_first_nonempty(meta.get("pubdate_iso"), meta.get("pubdate")))
guid = _first_nonempty(meta.get("guid"), meta.get("id"), "") or ""
thumb = _first_nonempty(meta.get("image"), meta.get("image_url"), meta.get("thumbnail"), "") or ""
dur_s = meta.get("duration_sec") or meta.get("duration") or 0
try:
dur_min = int(round(float(dur_s) / 60.0)) if dur_s else 0
except Exception:
dur_min = 0
# Build XML
xml = ["<episodedetails>"]
xml.append(f" <title>{xml_escape(title)}</title>")
xml.append(f" <showtitle>{xml_escape(show)}</showtitle>")
if plot:
xml.append(f" <plot>{xml_escape(plot)}</plot>")
if aired:
xml.append(f" <aired>{xml_escape(aired)}</aired>")
if guid:
xml.append(f" <uniqueid type=\"guid\" default=\"true\">{xml_escape(guid)}</uniqueid>")
if dur_min:
xml.append(f" <runtime>{dur_min}</runtime>")
if thumb:
xml.append(f" <thumb>{xml_escape(thumb)}</thumb>")
xml.append("</episodedetails>\n")
nfo_path = media_path.with_suffix(".nfo")
nfo_path.write_text("\n".join(xml), encoding="utf-8")
return nfo_path
except Exception:
return media_path.with_suffix(".nfo")
def write_plain_transcript(media_path: Path, text: str, language: str = "en") -> Path:
"""Write minimal transcript artifacts (.txt + .json) from plain text (no timestamps)."""
title = media_path.stem
base = TRN / title
base.parent.mkdir(parents=True, exist_ok=True)
(base.with_suffix(".txt")).write_text(text, encoding="utf-8")
(base.with_suffix(".json")).write_bytes(orjson.dumps({
"file": str(media_path),
"language": language,
"segments": [{"start": 0.0, "end": 0.0, "text": text}]
}))
return base
def yt_dlp(url, outdir):
# 1) Normalize YouTube Music URLs to standard YouTube
yurl = url
if 'music.youtube.com' in yurl:
yurl = yurl.replace('music.youtube.com', 'www.youtube.com')
outtmpl = str(outdir / "%(uploader)s/%(upload_date)s - %(title)s.%(ext)s")
base_cmd = [
"yt-dlp", "-o", outtmpl,
"-f", "bv*+ba/best",
"--write-info-json",
"--write-thumbnail",
"--convert-thumbnails", "jpg",
"--write-subs", "--write-auto-subs",
"--sub-langs", os.getenv("YTDLP_SUBS_LANGS", "en.*,en"),
"--convert-subs", "srt",
"--no-playlist", "--no-warnings", "--restrict-filenames",
]
# 2) Optional cookies (set YTDLP_COOKIES=/path/to/cookies.txt in .env and mount it)
cookies_path = os.getenv("YTDLP_COOKIES", "").strip()
if cookies_path:
base_cmd += ["--cookies", cookies_path]
# Primary attempt
try:
subprocess.check_call(base_cmd + [yurl])
except subprocess.CalledProcessError:
# 3) Retry with Android client + mobile UA
retry_cmd = base_cmd + [
"--extractor-args", "youtube:player_client=android",
"--user-agent", "Mozilla/5.0 (Linux; Android 10; SM-G973F) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0 Mobile Safari/537.36",
yurl,
]
subprocess.check_call(retry_cmd)
media = (
list(outdir.rglob("*.[mM][pP]4")) +
list(outdir.rglob("*.mkv")) +
list(outdir.rglob("*.webm")) +
list(outdir.rglob("*.m4a")) +
list(outdir.rglob("*.mp3"))
)
return sorted(media, key=lambda p: p.stat().st_mtime)[-1:]
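# With the output template above, downloads land as
#   <outdir>/<uploader>/<upload_date> - <title>.<ext>
# with the matching .info.json, .jpg thumbnail and .srt subtitles written alongside.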
def extract_audio(src: Path, outdir: Path) -> Path:
"""Extract mono 16kHz WAV for robust transcription (handles odd containers/codecs)."""
outdir.mkdir(parents=True, exist_ok=True)
wav_path = outdir / (src.stem + ".wav")
# Force audio-only, mono, 16kHz WAV
cmd = [
"ffmpeg", "-nostdin", "-y",
"-threads", str(FFMPEG_THREADS),
"-i", str(src),
"-vn", "-ac", "1", "-ar", "16000",
"-f", "wav", str(wav_path),
]
try:
subprocess.check_output(cmd, stderr=subprocess.STDOUT)
except subprocess.CalledProcessError as e:
raise RuntimeError(f"ffmpeg extract failed: {e.output.decode(errors='ignore')}")
return wav_path
# --- WAV trimming helper ---
def trim_wav(src_wav: Path, start_sec: float, outdir: Path) -> Path:
"""Return a trimmed 16k mono WAV starting at start_sec from src_wav."""
outdir.mkdir(parents=True, exist_ok=True)
if not start_sec or start_sec <= 0.0:
return src_wav
dst = outdir / (src_wav.stem + f".from_{int(start_sec)}s.wav")
try:
subprocess.check_output([
"ffmpeg", "-nostdin", "-y",
"-ss", str(max(0.0, float(start_sec))),
"-i", str(src_wav),
"-vn", "-ac", "1", "-ar", "16000",
"-f", "wav", str(dst),
], stderr=subprocess.STDOUT)
return dst
except subprocess.CalledProcessError as e:
# If trimming fails, fall back to full file
print(f"[whisper] trim failed, using full WAV: {e.output.decode(errors='ignore')}", flush=True)
return src_wav
def media_duration_seconds(path: Path) -> float:
"""Return duration in seconds using ffprobe; fallback to 0.0 on error."""
try:
out = subprocess.check_output([
"ffprobe", "-v", "error", "-show_entries", "format=duration",
"-of", "default=nokey=1:noprint_wrappers=1", str(path)
], stderr=subprocess.STDOUT, text=True).strip()
return float(out) if out else 0.0
except Exception:
return 0.0
# --- Partial transcript helpers ---
def _partial_paths(title: str) -> tuple[Path, Path]:
base = TRN / title
return base.with_suffix(".partial.json"), base.with_suffix(".partial.txt")
def _save_partial(title: str, language: str, segs: list[dict]):
pjson, ptxt = _partial_paths(title)
try:
# Save JSON
pjson.write_bytes(orjson.dumps({"file": str((TRN / title).with_suffix('.wav')), "language": language, "segments": segs}))
except Exception as e:
print(f"[whisper] partial json save failed: {e}", flush=True)
try:
# Save TXT snapshot
ptxt.write_text(" ".join(s.get("text","") for s in segs), encoding="utf-8")
except Exception as e:
print(f"[whisper] partial txt save failed: {e}", flush=True)
def transcribe(media_path: Path):
backend = TRANSCRIBE_BACKEND
print(f"[transcribe] start backend={backend}: {media_path}", flush=True)
# If paused, abort before any heavy work (no ffmpeg, no model load)
if transcribe_paused():
print(f"[pause] transcribe: pause active before heavy work; aborting {media_path}", flush=True)
raise PauseInterrupt("pause requested before start")
# 1) Robustly extract audio to 16k mono WAV (fixes pyAV/webm edge cases)
wav = extract_audio(media_path, TMP)
# Check again after extraction to avoid loading the model if a pause was requested meanwhile
if transcribe_paused():
try:
if wav.exists():
wav.unlink()
except Exception:
pass
print(f"[pause] transcribe: pause activated; stopping before model load for {media_path}", flush=True)
raise PauseInterrupt("pause requested after extract")
title = media_path.stem
base = TRN / title
resume_enabled = (backend != "openai") and WHISPER_RESUME
# Resume support: if a partial checkpoint exists, load it and trim input
resume_segments = []
resume_offset = 0.0
language_hint = None
if resume_enabled:
pjson, ptxt = _partial_paths(title)
if pjson.exists():
try:
pdata = json.loads(pjson.read_text(encoding="utf-8"))
resume_segments = pdata.get("segments", []) or []
if resume_segments:
resume_offset = float(resume_segments[-1].get("end", 0.0))
language_hint = pdata.get("language")
print(f"[whisper] resuming from ~{resume_offset:.2f}s with {len(resume_segments)} segments", flush=True)
except Exception as e:
print(f"[whisper] failed to load partial: {e}", flush=True)
# If resuming, trim WAV from last end time
if resume_enabled and resume_offset > 0.0:
wav_for_run = trim_wav(wav, resume_offset, TMP)
else:
wav_for_run = wav
# 2) Language selection
lang = None if WHISPER_LANGUAGE.lower() == "auto" else WHISPER_LANGUAGE
if language_hint and WHISPER_LANGUAGE.lower() == "auto":
# carry hint forward if available
lang = language_hint
# 3) Transcribe (local Whisper or OpenAI backend)
payload = None
if backend == "openai":
segments, info, payload = run_transcribe_openai(wav_for_run, lang)
else:
segments, info = run_transcribe_with_fallback(wav_for_run, lang)
# Determine duration for progress; use full WAV duration for consistent % regardless of resume
dur = media_duration_seconds(wav) or 0.0
# Start wall clock timer for speed/ETA
start_wall = time.time()
if resume_enabled and resume_offset and dur and resume_offset >= dur:
print(f"[whisper] resume offset {resume_offset:.2f}s >= duration {dur:.2f}s; resetting resume.", flush=True)
resume_offset = 0.0
last_pct = -1
segs = list(resume_segments) # start with what we already have
text_parts = [s.get("text","") for s in resume_segments]
# Walk new segments; shift their timestamps by resume_offset if trimmed
seg_count_since_save = 0
seg_index = len(resume_segments)
for s in segments:
seg_index += 1
start = (s.start or 0.0) + resume_offset
end = (s.end or 0.0) + resume_offset
seg = {"start": start, "end": end, "text": s.text}
segs.append(seg)
text_parts.append(s.text)
# --- Cooperative pause: save checkpoint and abort as soon as pause is requested ---
if resume_enabled and transcribe_paused():
try:
pct = int(min(100, max(0, (end / dur) * 100))) if dur > 0 else 0
except Exception:
pct = 0
_save_partial(title, info.language or (WHISPER_LANGUAGE if WHISPER_LANGUAGE.lower() != "auto" else "en"), segs)
log({
"status": "paused",
"path": str(media_path),
"title": title,
"progress": pct
})
print(f"[pause] transcribe: pause requested mid-run; aborting at ~{end:.2f}s for {media_path}", flush=True)
raise PauseInterrupt("pause requested")
if WHISPER_LOG_SEGMENTS:
print(f"[whisper] {start:8.2f}{end:8.2f} {s.text.strip()}", flush=True)
# progress logging every +5%
if dur > 0 and end is not None:
pct = int(min(100, max(0, (end / dur) * 100)))
if pct >= last_pct + 5:
log({
"status": "transcribing",
"path": str(media_path),
"title": title,
"progress": pct
})
last_pct = pct
# compute realtime speed and ETA for console logs
try:
elapsed = max(0.001, time.time() - start_wall)
processed = max(0.0, float(end))
speed = (processed / elapsed) if elapsed > 0 else 0.0 # seconds processed per second
# represent as X real-time factor
rtf = speed # 1.0 == real-time
eta = ((dur - processed) / speed) if (speed > 0 and dur > 0) else 0
print(f"[whisper] progress {pct:3d}% seg={seg_index:5d} rtf={rtf:0.2f}x eta={_fmt_eta(eta)}", flush=True)
# also mirror to feed log with speed/eta
try:
log({
"status": "transcribing",
"path": str(media_path),
"title": title,
"progress": pct,
"speed_rtf": round(rtf, 2),
"eta_sec": int(max(0, eta))
})
except Exception:
pass
except Exception:
pass
# periodic partial save
seg_count_since_save += 1
if resume_enabled and seg_count_since_save >= PARTIAL_SAVE_EVERY_SEGS:
_save_partial(title, info.language or (WHISPER_LANGUAGE if WHISPER_LANGUAGE.lower() != "auto" else "en"), segs)
seg_count_since_save = 0
# ensure we mark 100% on completion
if last_pct < 100:
log({"status": "transcribing", "path": str(media_path), "title": title, "progress": 100})
txt = " ".join(text_parts).strip()
# Write final transcript artifacts
(base.with_suffix(".json")).write_bytes(orjson.dumps({
"file": str(media_path),
"language": info.language,
"segments": segs
}))
(base.with_suffix(".txt")).write_text(txt, encoding="utf-8")
def fmt_ts(t):
h=int(t//3600); m=int((t%3600)//60); s=t-(h*3600+m*60)
return f"{h:02}:{m:02}:{s:06.3f}".replace('.',',')
with open(base.with_suffix(".srt"), "w", encoding="utf-8") as srt:
for i,s in enumerate(segs,1):
srt.write(f"{i}\n{fmt_ts(s['start'])} --> {fmt_ts(s['end'])}\n{s['text'].strip()}\n\n")
with open(base.with_suffix(".vtt"), "w", encoding="utf-8") as vtt:
vtt.write("WEBVTT\n\n")
for s in segs:
vtt.write(f"{fmt_ts(s['start']).replace(',', '.')} --> {fmt_ts(s['end']).replace(',', '.')} \n{s['text'].strip()}\n\n")
# 4) Copy SRT next to media for Plex (language-suffixed)
try:
lang_code = (info.language or (WHISPER_LANGUAGE if WHISPER_LANGUAGE.lower() != 'auto' else 'en')).lower()
srt_src = base.with_suffix(".srt")
srt_dst = media_path.with_suffix(f".{lang_code}.srt")
_safe_copy(srt_src, srt_dst)
except Exception as e:
print(f"[post] could not copy srt -> {srt_dst}: {e}", flush=True)
# Write Kodi/Plex-compatible NFO using enhanced metadata (same as before)
try:
fallback = {
"title": title,
"episode_title": title,
"show": media_path.parent.name,
"description": "",
"pubdate": _extract_date_from_stem(title),
"duration_sec": media_duration_seconds(media_path),
"image": "",
"guid": "",
}
meta = build_meta_from_sources(media_path, media_path.parent.name, fallback, ep=None)
ttxt = (TRN / title).with_suffix(".txt").read_text(encoding="utf-8")
write_episode_nfo(media_path, meta, ttxt)
try:
save_episode_artwork(meta.get("image"), media_path, meta.get("show"))
except Exception:
pass
except Exception as e:
print(f"[post] NFO write failed: {e}", flush=True)
# Cleanup temp WAVs
try:
if wav_for_run != wav and wav_for_run.exists():
wav_for_run.unlink()
if wav.exists():
wav.unlink()
except Exception:
pass
# Remove partial checkpoints on success
if resume_enabled:
try:
pjson, ptxt = _partial_paths(title)
if pjson.exists(): pjson.unlink()
if ptxt.exists(): ptxt.unlink()
except Exception:
pass
# Final average speed over whole transcription
try:
total_elapsed = max(0.001, time.time() - start_wall)
avg_rtf = (dur / total_elapsed) if total_elapsed > 0 else 0.0
print(f"[whisper] avg speed ~{avg_rtf:0.2f}x (audio_seconds / wall_seconds)", flush=True)
except Exception:
pass
print(
f"[transcribe] backend={backend} finished: {media_path} lang={info.language} segments={len(segs)} dur={dur:.2f}s",
flush=True,
)
return base
# --- Meilisearch helpers ---
def _safe_doc_id(s: str) -> str:
"""
Meilisearch document IDs must be [A-Za-z0-9_-]. Convert the title to a safe slug.
If the result is empty, fall back to a short SHA1 hash.
"""
import hashlib
slug = re.sub(r"\s+", "_", (s or "").strip())
slug = re.sub(r"[^A-Za-z0-9_-]", "", slug)
if not slug:
slug = hashlib.sha1((s or "").encode("utf-8", errors="ignore")).hexdigest()[:16]
return slug
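# e.g. _safe_doc_id("20230221 - Some Episode!") -> "20230221_-_Some_Episode"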
def ensure_meili_index():
"""Create index 'library' with primaryKey 'id' if it does not already exist."""
try:
r = requests.get(f"{MEILI_URL}/indexes/library",
headers={"Authorization": f"Bearer {MEILI_KEY}"}, timeout=10)
if r.status_code == 200:
return
# Attempt to create it
cr = requests.post(
f"{MEILI_URL}/indexes",
headers={"Authorization": f"Bearer {MEILI_KEY}", "Content-Type": "application/json"},
data=orjson.dumps({"uid": "library", "primaryKey": "id"}),
timeout=10,
)
# Ignore errors if another process created it first
try:
cr.raise_for_status()
except Exception:
pass
except Exception:
# Non-fatal; indexing will fail later if the index truly doesn't exist
pass
def index_meili(json_path: Path):
# Make sure the index exists and is configured with a primary key
ensure_meili_index()
doc = json.loads(open(json_path, "r", encoding="utf-8").read())
file_field = doc.get("file", "")
title = Path(file_field).stem if file_field else json_path.stem
# Build a Meili-safe document ID
doc_id = _safe_doc_id(title)
# Extract a YYYYMMDD date if present
m = re.search(r"\b(\d{8})\b", title)
date = m.group(1) if m else ""
payload = {
"id": doc_id,
"type": "podcast",
"title": title,
"date": date,
"source": str(Path(LIB, Path(file_field or title).name)),
"text": " ".join(s.get("text", "") for s in doc.get("segments", [])),
"segments": doc.get("segments", []),
"meta": {"language": doc.get("language", "")},
}
for attempt in range(5):
try:
r = requests.post(
f"{MEILI_URL}/indexes/library/documents",
headers={
"Authorization": f"Bearer {MEILI_KEY}",
"Content-Type": "application/json",
},
data=orjson.dumps(payload),
timeout=15,
)
r.raise_for_status()
break
except Exception:
if attempt == 4:
raise
time.sleep(2 * (attempt + 1))
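# Documents indexed above can be queried via the standard Meilisearch search endpoint,
# e.g. (illustrative):
#   curl -s -X POST "$MEILI_URL/indexes/library/search" \
#     -H "Authorization: Bearer $MEILI_KEY" -H "Content-Type: application/json" \
#     --data '{"q": "keyword"}'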
import tldextract, trafilatura, requests as _requests
def slugify(text):
text = re.sub(r'[^A-Za-z0-9\-._ ]+', '', text).strip().replace(' ', '_')
return text[:120] or 'page'
def _norm(s: str | None) -> str:
"""Normalize strings for stable comparisons across Unicode lookalikes and stray whitespace."""
if s is None:
return ""
try:
return unicodedata.normalize("NFKC", s).strip()
except Exception:
return (s or "").strip()
def save_web_snapshot(url: str):
r = _requests.get(url, timeout=30, headers={"User-Agent":"Mozilla/5.0"})
r.raise_for_status()
html = r.text
downloaded = trafilatura.load_html(html, url=url)
text = trafilatura.extract(downloaded, include_comments=False, include_images=False, with_metadata=True) or ""
meta = trafilatura.metadata.extract_metadata(downloaded) or None
m_title = re.search(r'<title[^>]*>(.*?)</title>', html, re.I | re.S)
title = (meta.title if meta and getattr(meta, 'title', None) else None) or (m_title.group(1).strip() if m_title else url)
date = (meta.date if meta and getattr(meta, 'date', None) else "")
parts = tldextract.extract(url)
domain = ".".join([p for p in [parts.domain, parts.suffix] if p])
slug = slugify(title)
outdir = LIB / "web" / domain
outdir.mkdir(parents=True, exist_ok=True)
base = outdir / slug
open(base.with_suffix(".html"), "w", encoding="utf-8", errors="ignore").write(html)
open(base.with_suffix(".txt"), "w", encoding="utf-8", errors="ignore").write(text)
return base, title, domain, date, text
def index_web(base: Path, title: str, domain: str, date: str, text: str, url: str):
payload = {
"id": f"web:{domain}:{base.stem}",
"type": "web",
"title": title,
"date": re.sub(r'[^0-9]', '', date)[:8] if date else "",
"source": f"file://{str(base.with_suffix('.html'))}",
"text": text,
"segments": [],
"meta": {"url": url, "domain": domain}
}
r = requests.post(f"{MEILI_URL}/indexes/library/documents",
headers={"Authorization": f"Bearer {MEILI_KEY}", "Content-Type":"application/json"},
data=orjson.dumps(payload))
r.raise_for_status()
def is_media_url(url: str):
lowered = url.lower()
media_hosts = ["youtube.com","youtu.be","rumble.com","vimeo.com","soundcloud.com","spotify.com","podbean.com","buzzsprout.com"]
return any(h in lowered for h in media_hosts)
def owui_headers():
return {"Authorization": f"Bearer {OWUI_KEY}"} if OWUI_KEY else {}
def _owui_metadata_template_payload():
"""Return the metadata template payload to apply when auto-fix is enabled."""
if not OWUI_METADATA_TEMPLATE_JSON:
return {}
try:
return json.loads(OWUI_METADATA_TEMPLATE_JSON)
except Exception:
# Treat value as a raw string template if parsing fails
return OWUI_METADATA_TEMPLATE_JSON
def owui_fix_metadata_template(kb_id: str, force: bool = False) -> bool:
"""Ensure the target knowledge base has a safe metadata template.
Attempts PATCH/PUT with either a user-provided template or an empty object.
Returns True if an update succeeded; False otherwise.
"""
if not OWUI_AUTO_FIX_METADATA or not OWUI_URL or not OWUI_KEY or not kb_id:
return False
if not force and kb_id in _OWUI_TEMPLATE_PATCHED:
return False
payload_variants: list[object] = []
template_payload = _owui_metadata_template_payload()
payload_variants.append({"metadata_template": template_payload})
if template_payload not in ({}, "", None):
payload_variants.append({"metadata_template": {}})
payload_variants.append({"metadata_template": None})
headers = {**owui_headers(), "Content-Type": "application/json"}
url = f"{OWUI_URL}/api/v1/knowledge/{kb_id}"
success_codes = {200, 201, 202, 204}
for payload in payload_variants:
try:
body = orjson.dumps(payload)
except Exception:
body = json.dumps(payload).encode("utf-8")
for method in ("PATCH", "PUT"):
try:
resp = requests.request(method, url, headers=headers, data=body, timeout=15)
except Exception:
continue
if resp.status_code in success_codes:
print(f"[owui] metadata template adjusted via {method} for KB {kb_id}", flush=True)
_OWUI_TEMPLATE_PATCHED.add(kb_id)
return True
return False
# ---------- Media normalisation helpers ----------
VIDEO_ENCODER_MAP = {
"hevc": "libx265",
"h265": "libx265",
"h.265": "libx265",
"h264": "libx264",
"h.264": "libx264",
"av1": "libaom-av1",
}
AUDIO_ENCODER_MAP = {
"mp3": "libmp3lame",
"libmp3lame": "libmp3lame",
"aac": "aac",
"libfdk_aac": "libfdk_aac",
"opus": "libopus",
"flac": "flac",
}
def _resolve_video_encoder(codec: str) -> str:
key = (codec or "").lower()
return VIDEO_ENCODER_MAP.get(key, codec or "libx265")
def _resolve_audio_encoder(codec: str) -> str:
key = (codec or "").lower()
return AUDIO_ENCODER_MAP.get(key, codec or "libmp3lame")
def _sanitize_video_preset(encoder: str) -> str | None:
preset = (VIDEO_NORMALIZE_PRESET or "").strip()
if not preset:
return None
enc = (encoder or "").lower()
pl = preset.lower()
if "nvenc" in enc:
allowed = {"p1", "p2", "p3", "p4", "p5", "p6", "p7"}
return pl if pl in allowed else "p5"
if enc in {"libx265", "libx264"}:
allowed = {"ultrafast", "superfast", "veryfast", "faster", "fast", "medium", "slow", "slower", "veryslow", "placebo"}
if pl in allowed:
return pl
if pl.startswith("p") and pl[1:].isdigit():
return "medium"
return "medium"
if enc == "libaom-av1":
allowed = {"good", "best", "realtime"}
if pl in allowed:
return pl
return "good"
return preset
def _parse_ffmpeg_ts(ts: str) -> float:
ts = ts.strip()
if not ts:
return 0.0
try:
parts = ts.split(":")
if len(parts) == 3:
h, m, s = parts
return int(h) * 3600 + int(m) * 60 + float(s)
if len(parts) == 2:
m, s = parts
return int(m) * 60 + float(s)
return float(ts)
except Exception:
return 0.0
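# e.g. _parse_ffmpeg_ts("00:01:23.45") -> 83.45, _parse_ffmpeg_ts("02:05.5") -> 125.5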
def _ffmpeg_run_with_progress(cmd: list[str], duration: float, label: str) -> None:
if not cmd:
raise ValueError("ffmpeg command is empty")
output_path = cmd[-1]
cmd_progress = cmd[:-1] + ["-progress", "pipe:1", "-nostats", output_path]
start = time.time()
last_pct = -5
processed_sec = 0.0
captured: deque[str] = deque(maxlen=100)
proc = subprocess.Popen(
cmd_progress,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
bufsize=1,
)
try:
assert proc.stdout is not None
for raw_line in proc.stdout:
line = raw_line.strip()
if not line:
continue
captured.append(line)
if line.startswith("out_time="):
processed_sec = _parse_ffmpeg_ts(line.split("=", 1)[1])
if duration > 0:
pct = int(min(100, max(0, (processed_sec / duration) * 100)))
if pct >= last_pct + 5:
elapsed = max(0.001, time.time() - start)
speed = processed_sec / elapsed if elapsed > 0 else 0.0
remaining = max(0.0, duration - processed_sec)
eta = remaining / speed if speed > 0 else 0.0
print(
f"[normalize] progress {pct:3d}% {label} rtf={speed:0.2f}x eta={_fmt_eta(eta)}",
flush=True,
)
last_pct = pct
elif line.startswith("progress=") and line.endswith("end"):
break
finally:
proc.wait()
if proc.returncode != 0:
output = "\n".join(captured)
raise subprocess.CalledProcessError(proc.returncode, cmd_progress, output)
def _ffprobe_streams(path: Path) -> dict[str, str]:
try:
out = subprocess.check_output(
["ffprobe", "-v", "error", "-show_entries", "stream=codec_type,codec_name", "-of", "json", str(path)],
text=True,
)
data = json.loads(out)
except Exception:
return {}
info: dict[str, str] = {"video": "", "audio": ""}
for stream in data.get("streams", []) or []:
ctype = (stream.get("codec_type") or "").lower()
cname = (stream.get("codec_name") or "").lower()
if ctype == "video" and not info["video"]:
info["video"] = cname
elif ctype == "audio" and not info["audio"]:
info["audio"] = cname
return info
def _unique_backup_path(path: Path) -> Path:
base = path.name
candidate = path.parent / f"{base}.orig"
if not candidate.exists():
return candidate
counter = 1
while True:
candidate = path.parent / f"{base}.orig{counter}"
if not candidate.exists():
return candidate
counter += 1
def _safe_copy(src: Path, dst: Path) -> None:
try:
if src.resolve(strict=False) == dst.resolve(strict=False):
return
except Exception:
if os.path.abspath(src) == os.path.abspath(dst):
return
dst.parent.mkdir(parents=True, exist_ok=True)
shutil.copy2(src, dst)
def _is_sidecar_name(name: str, base_stem: str, base_name: str) -> bool:
exact_suffixes = [".info.json", ".nfo", ".jpg", ".jpeg", ".png", ".webp", ".prov.json"]
for suf in exact_suffixes:
if name == f"{base_name}{suf}" or name == f"{base_stem}{suf}":
return True
text_exts = {".srt", ".vtt", ".txt", ".json", ".md"}
for ext in text_exts:
if name == f"{base_stem}{ext}" or name == f"{base_name}{ext}":
return True
if name.startswith(f"{base_stem}.") and name.endswith(ext):
return True
if name.startswith(f"{base_name}.") and name.endswith(ext):
return True
return False
def rename_media_sidecars(src: Path, dst: Path, skip: set[Path] | None = None) -> None:
if src == dst:
return
skip = skip or set()
parent = src.parent
stem_src, stem_dst = src.stem, dst.stem
name_src, name_dst = src.name, dst.name
for f in list(parent.glob("*")):
if not f.exists() or f == src or f == dst or f in skip:
continue
new_name = None
fname = f.name
if not _is_sidecar_name(fname, stem_src, name_src):
continue
if fname.startswith(name_src):
new_name = name_dst + fname[len(name_src):]
elif fname.startswith(stem_src):
new_name = stem_dst + fname[len(stem_src):]
if not new_name:
continue
target = parent / new_name
if target.exists():
continue
try:
f.rename(target)
except Exception:
pass
def _finalize_normalized_output(original: Path, final_path: Path, tmp_path: Path) -> Path:
if final_path.exists():
try:
final_path.unlink()
except Exception:
pass
if MEDIA_NORMALIZE_KEEP_ORIGINAL:
try:
backup = _unique_backup_path(original)
if original.exists():
original.rename(backup)
except Exception as e:
print(f"[normalize] could not preserve original for {original}: {e}", flush=True)
try:
if original.exists():
original.unlink()
except Exception:
pass
else:
try:
if original.exists():
original.unlink()
except Exception:
pass
os.replace(tmp_path, final_path)
return final_path
def _normalize_video_file(path: Path, info: dict[str, str]) -> Path:
current_codec = (info.get("video") or "").lower()
ext_match = path.suffix.lower() == VIDEO_NORMALIZE_EXTENSION
if current_codec == VIDEO_NORMALIZE_CODEC and ext_match:
return path
if current_codec.startswith("av1") or current_codec in {"libaom-av1", "av01"}:
print(f"[normalize] skip: {path.name} already AV1; leaving as-is", flush=True)
return path
encoder = _resolve_video_encoder(VIDEO_NORMALIZE_CODEC)
final_path = path if ext_match else path.with_suffix(VIDEO_NORMALIZE_EXTENSION)
tmp_path = final_path.parent / f"{final_path.stem}.tmp{VIDEO_NORMALIZE_EXTENSION}"
if tmp_path.exists():
tmp_path.unlink()
duration = media_duration_seconds(path)
def build_cmd(v_encoder: str) -> list[str]:
cmd = [
"ffmpeg", "-nostdin", "-y",
"-i", str(path),
"-map", "0",
"-c:v", v_encoder,
]
preset_val = _sanitize_video_preset(v_encoder)
if preset_val:
cmd.extend(["-preset", preset_val])
if VIDEO_NORMALIZE_TUNE:
cmd.extend(["-tune", VIDEO_NORMALIZE_TUNE])
if VIDEO_NORMALIZE_CRF:
cmd.extend(["-crf", VIDEO_NORMALIZE_CRF])
if info.get("audio"):
if VIDEO_NORMALIZE_AUDIO_CODEC == "copy":
cmd.extend(["-c:a", "copy"])
else:
cmd.extend(["-c:a", _resolve_audio_encoder(VIDEO_NORMALIZE_AUDIO_CODEC)])
if VIDEO_NORMALIZE_AUDIO_BITRATE:
cmd.extend(["-b:a", VIDEO_NORMALIZE_AUDIO_BITRATE])
else:
cmd.append("-an")
cmd.extend(["-c:s", "copy", str(tmp_path)])
return cmd
def fallback_encoder() -> str:
base = VIDEO_NORMALIZE_CODEC.lower()
if base.startswith("hevc") or base.startswith("h265"):
return "libx265"
if base.startswith("h264") or base.startswith("avc"):
return "libx264"
if base.startswith("av1"):
return "libaom-av1"
return "libx265"
attempted = []
encoders_to_try = [encoder]
lower_encoder = encoder.lower()
if any(token in lower_encoder for token in ("nvenc", "qsv", "cuda", "amf")):
cpu_encoder = fallback_encoder()
if cpu_encoder not in encoders_to_try:
encoders_to_try.append(cpu_encoder)
for idx, enc in enumerate(encoders_to_try):
try:
print(f"[normalize] video -> {final_path.name} codec={enc}", flush=True)
_ffmpeg_run_with_progress(build_cmd(enc), duration, final_path.name)
rename_media_sidecars(path, final_path, skip={tmp_path})
return _finalize_normalized_output(path, final_path, tmp_path)
except subprocess.CalledProcessError as e:
attempted.append((enc, e))
if tmp_path.exists():
tmp_path.unlink()
if idx == len(encoders_to_try) - 1:
details = ", ".join(f"{enc}: {err}" for enc, err in attempted)
raise RuntimeError(f"ffmpeg video normalize failed ({details})")
else:
print(f"[normalize] encoder {enc} failed ({e}); retrying with CPU fallback", flush=True)
return path
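# Hypothetical shape of the command build_cmd() assembles, assuming VIDEO_NORMALIZE_CODEC
# resolves to libx265, VIDEO_NORMALIZE_EXTENSION=.mkv and the audio codec set to copy
# (the real flags depend entirely on the VIDEO_NORMALIZE_* environment settings):
#
#   ffmpeg -nostdin -y -i "input.avi" -map 0 -c:v libx265 -preset medium -crf 23 \
#          -c:a copy -c:s copy "input.tmp.mkv"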
def _normalize_audio_file(path: Path, info: dict[str, str]) -> Path:
current_codec = (info.get("audio") or "").lower()
ext_match = path.suffix.lower() == AUDIO_NORMALIZE_EXTENSION
target_encoder = _resolve_audio_encoder(AUDIO_NORMALIZE_CODEC)
equivalent_codecs = {AUDIO_NORMALIZE_CODEC.lower(), target_encoder.lower()}
if target_encoder.lower() == "libmp3lame":
equivalent_codecs.add("mp3")
if target_encoder.lower() in {"aac", "libfdk_aac"}:
equivalent_codecs.update({"aac", "mp4a"})
if current_codec in equivalent_codecs and ext_match:
return path
final_path = path if ext_match else path.with_suffix(AUDIO_NORMALIZE_EXTENSION)
tmp_path = final_path.parent / f"{final_path.stem}.tmp{AUDIO_NORMALIZE_EXTENSION}"
if tmp_path.exists():
tmp_path.unlink()
duration = media_duration_seconds(path)
cmd = [
"ffmpeg", "-nostdin", "-y",
"-i", str(path),
"-vn",
"-c:a", target_encoder,
]
if AUDIO_NORMALIZE_BITRATE:
cmd.extend(["-b:a", AUDIO_NORMALIZE_BITRATE])
if AUDIO_NORMALIZE_CHANNELS:
cmd.extend(["-ac", AUDIO_NORMALIZE_CHANNELS])
cmd.append(str(tmp_path))
print(f"[normalize] audio -> {final_path.name} codec={AUDIO_NORMALIZE_CODEC}", flush=True)
try:
_ffmpeg_run_with_progress(cmd, duration, final_path.name)
except subprocess.CalledProcessError as e:
if tmp_path.exists():
tmp_path.unlink()
raise RuntimeError(f"ffmpeg audio normalize failed: {e}")
rename_media_sidecars(path, final_path, skip={tmp_path})
return _finalize_normalized_output(path, final_path, tmp_path)
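# Hypothetical example of the audio command above, assuming AUDIO_NORMALIZE_CODEC resolves
# to libmp3lame, AUDIO_NORMALIZE_BITRATE=128k, AUDIO_NORMALIZE_CHANNELS=2 and
# AUDIO_NORMALIZE_EXTENSION=.mp3 (the actual values come from the environment):
#
#   ffmpeg -nostdin -y -i "episode.m4a" -vn -c:a libmp3lame -b:a 128k -ac 2 "episode.tmp.mp3"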
def normalize_media_file(path: Path) -> Path:
if not MEDIA_NORMALIZE or not path.exists() or not path.is_file():
return path
try:
info = _ffprobe_streams(path)
except Exception as e:
print(f"[normalize] ffprobe failed for {path}: {e}", flush=True)
return path
try:
if info.get("video"):
return _normalize_video_file(path, info)
if info.get("audio"):
return _normalize_audio_file(path, info)
except Exception as e:
print(f"[normalize] failed for {path}: {e}", flush=True)
return path
def owui_get_or_create_kb():
"""Return a KB id for OWUI_KB without creating duplicates.
Honors OPENWEBUI_KB_ID, and tolerates both list and {"data": ...} response shapes.
"""
if not OWUI_URL or not OWUI_KEY:
return None
# 1) If an explicit id is provided, trust it
forced = os.getenv("OPENWEBUI_KB_ID", "").strip()
if forced:
return forced
# 2) List and try to find an exact name match
try:
r = requests.get(f"{OWUI_URL}/api/v1/knowledge/list", headers=owui_headers(), timeout=15)
r.raise_for_status()
body = r.json()
items = body if isinstance(body, list) else body.get("data", [])
# Prefer exact normalized name match; if multiple, pick the most recently updated
kb_target = _norm(OWUI_KB)
matches = [kb for kb in items if _norm(kb.get("name")) == kb_target]
if matches:
try:
matches.sort(key=lambda k: k.get("updated_at") or 0, reverse=True)
except Exception:
pass
return matches[0].get("id")
except Exception:
pass
# 3) Create only if not found
cr = requests.post(
f"{OWUI_URL}/api/v1/knowledge/create",
headers={**owui_headers(), "Content-Type": "application/json"},
data=orjson.dumps({"name": OWUI_KB, "description": "All local content indexed by podx"}),
timeout=15,
)
cr.raise_for_status()
created = cr.json()
if isinstance(created, dict) and created.get("id"):
return created["id"]
if isinstance(created, dict) and created.get("data") and created["data"].get("id"):
return created["data"]["id"]
# Fallback: try to resolve again by name
try:
rr = requests.get(f"{OWUI_URL}/api/v1/knowledge/list", headers=owui_headers(), timeout=15)
rr.raise_for_status()
body = rr.json()
items = body if isinstance(body, list) else body.get("data", [])
kb_target = _norm(OWUI_KB)
for kb in items:
if _norm(kb.get("name")) == kb_target:
return kb.get("id")
except Exception:
pass
return None
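# The docstring above notes that knowledge/list may answer with either a bare list or a
# {"data": [...]} wrapper; illustrative examples of the two forms the parsing accepts
# (ids and timestamps are placeholders):
#
#   [{"id": "abc123", "name": "podx", "updated_at": 1712345678}, ...]
#   {"data": [{"id": "abc123", "name": "podx", "updated_at": 1712345678}, ...]}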
def owui_upload_and_attach(path: Path, kb_id: str):
if OWUI_AUTO_FIX_METADATA:
owui_fix_metadata_template(kb_id)
with open(path, "rb") as f:
r = requests.post(f"{OWUI_URL}/api/v1/files/", headers=owui_headers(), files={"file": (path.name, f)}, timeout=60*10)
r.raise_for_status()
up = r.json()
file_id = (up.get("id") or (up.get("data") or {}).get("id"))
if not file_id:
raise RuntimeError(f"OWUI upload: could not get file id from response: {up}")
payload = {"file_id": file_id}
attach_headers = {**owui_headers(), "Content-Type": "application/json"}
body = orjson.dumps(payload)
r = requests.post(
f"{OWUI_URL}/api/v1/knowledge/{kb_id}/file/add",
headers=attach_headers,
data=body,
timeout=180,
)
if r.status_code == 400 and OWUI_AUTO_FIX_METADATA:
txt = ""
try:
txt = r.text.lower()
except Exception:
txt = str(r.content).lower()
if "metadata" in txt and owui_fix_metadata_template(kb_id, force=True):
r = requests.post(
f"{OWUI_URL}/api/v1/knowledge/{kb_id}/file/add",
headers=attach_headers,
data=body,
timeout=180,
)
r.raise_for_status()
try:
time.sleep(0.5)
except Exception:
pass
return True
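# Rough curl equivalent of the two calls above, assuming owui_headers() supplies the usual
# Authorization header (values are placeholders, not real ids):
#
#   curl -H "Authorization: Bearer $OWUI_KEY" -F "file=@transcript.txt" "$OWUI_URL/api/v1/files/"
#   curl -H "Authorization: Bearer $OWUI_KEY" -H "Content-Type: application/json" \
#        -d '{"file_id": "<id returned by the upload>"}' \
#        "$OWUI_URL/api/v1/knowledge/<kb_id>/file/add"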
def publish_to_openwebui(paths):
if not OWUI_URL or not OWUI_KEY:
return
try:
kb_id = owui_get_or_create_kb()
if not kb_id:
print("[owui] KB resolve failed; skipping attach to avoid accidental duplicates", flush=True)
return
owui_fix_metadata_template(kb_id)
for p in paths:
p = Path(p)
if not p.exists():
continue
try:
owui_upload_and_attach(p, kb_id)
except Exception as e:
log({"url": str(p), "status": "owui_error", "error": str(e)})
except Exception as e:
log({"status": "owui_error", "error": str(e)})
# --------- Post-transcribe pipeline and job/queue helpers ---------
def _postprocess_after_transcribe(media_path: Path, base: Path):
"""Common steps after we have a `base` transcript path: index, publish, NFO, artwork."""
try:
index_meili(base.with_suffix(".json"))
except Exception as e:
print(f"[post] meili index failed: {e}", flush=True)
try:
publish_to_openwebui([base.with_suffix(".txt")])
except Exception as e:
print(f"[post] owui publish failed: {e}", flush=True)
# Build metadata using existing helper
try:
title = media_path.stem
fallback = {
"title": title,
"episode_title": title,
"show": media_path.parent.name,
"description": "",
"pubdate": _extract_date_from_stem(title),
"duration_sec": media_duration_seconds(media_path),
"image": "",
"guid": "",
}
meta = build_meta_from_sources(media_path, media_path.parent.name, fallback, ep=None)
        # Read the transcript that belongs to `base` instead of re-deriving the path from the media stem.
        ttxt = base.with_suffix(".txt").read_text(encoding="utf-8")
write_episode_nfo(media_path, meta, ttxt)
try:
save_episode_artwork(meta.get("image"), media_path, meta.get("show"))
except Exception:
pass
except Exception as e:
print(f"[post] NFO write failed: {e}", flush=True)
def transcribe_job(path_str: str):
"""RQ job: heavy transcription only. Safe to import by dotted path 'worker.transcribe_job'."""
# Do NOT block when paused; exit quickly so CPU-heavy work stops ASAP.
if transcribe_paused():
print(f"[pause] transcribe_job: pause is active; skipping start for {path_str}", flush=True)
return "paused"
p = Path(path_str)
if not p.exists():
# Underlying file moved or deleted; skip gracefully instead of crashing ffmpeg.
try:
log({"path": path_str, "status": "skip", "reason": "file_not_found"})
except Exception:
pass
print(f"[transcribe] missing input; skipping job: {p}", flush=True)
return "missing"
try:
base = transcribe(p)
except PauseInterrupt:
print(f"[pause] transcribe_job: paused mid-run for {p}", flush=True)
return "paused"
_postprocess_after_transcribe(p, base)
return str(base)
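# A dedicated transcription worker drains this queue; an illustrative invocation (it assumes
# this module is importable as `worker` on the worker's PYTHONPATH, matching the dotted path
# used by enqueue_transcribe below):
#
#   rq worker --url "$REDIS_URL" transcribe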
def enqueue_transcribe(path: Path) -> bool:
"""Enqueue a transcription job to the 'transcribe' queue. Returns True on success."""
try:
if transcribe_paused():
print(f"[queue] pause flag present; enqueuing job for {path} but workers will wait", flush=True)
conn = Redis.from_url(REDIS_URL)
q = Queue("transcribe", connection=conn, default_timeout=60*60*24)
# Use dotted path so workers in other processes can import
q.enqueue("worker.transcribe_job", str(path), job_timeout=60*60*24)
print(f"[queue] enqueued transcribe job for {path}", flush=True)
return True
except Exception as e:
print(f"[queue] enqueue failed, will transcribe inline: {e}", flush=True)
return False
def handle_local_file(path_str: str):
"""Transcribe & index a local media file that already exists in /library.
If a sidecar .txt/.srt/.vtt exists, use it instead of running Whisper.
Safe to call repeatedly; it skips if transcript JSON already exists.
"""
try:
p = Path(path_str)
if not p.exists():
log({"url": path_str, "status": "error", "error": "file_not_found"})
return
normalized = normalize_media_file(p)
if normalized != p:
print(f"[normalize] local media: {p.name} -> {normalized.name}", flush=True)
p = normalized
path_str = str(p)
if WORKER_MODE == "transcribe":
print(f"[mode] transcribe-only worker handling local file: {p}", flush=True)
title = p.stem
base_json = TRN / f"{title}.json"
if base_json.exists():
log({"url": path_str, "status": "skip", "reason": "already_transcribed"})
return
info = {"url": path_str, "status": "transcribing", "title": title,
"uploader": p.parent.name, "date": "", "path": str(p), "progress": 0}
log(info)
        # 0) Try the RSS resolver first: if an episode with a transcript exists, use it (and skip Whisper)
try:
ep = match_media_to_rss(p)
except Exception as _e:
ep = None
if ep:
base = use_rss_transcript(p, ep)
if base:
index_meili(base.with_suffix(".json"))
publish_to_openwebui([base.with_suffix(".txt")])
log({**info, **{"status": "done", "note": "used_rss_transcript"}})
return
# 1) Prefer an existing transcript sidecar if present
sidecar = find_sidecar_transcript(p)
if sidecar:
plain = transcript_text_from_file(sidecar)
lang = os.getenv("DEFAULT_TRANSCRIPT_LANG", "en").strip() or "en"
base = write_plain_transcript(p, plain, language=lang)
ensure_sidecar_next_to_media(sidecar, p, lang=lang)
index_meili(base.with_suffix(".json"))
publish_to_openwebui([base.with_suffix(".txt")])
try:
# Use info.json (if present) to enrich metadata
fallback = {
"title": title,
"episode_title": title,
"show": p.parent.name,
"description": "",
"pubdate": _extract_date_from_stem(title),
"duration_sec": media_duration_seconds(p),
"image": "",
"guid": "",
}
meta = build_meta_from_sources(p, p.parent.name, fallback, ep=None)
ttxt = base.with_suffix(".txt").read_text(encoding="utf-8")
write_episode_nfo(p, meta, ttxt)
# Try to fetch and save artwork locally
try:
save_episode_artwork(meta.get("image"), p, meta.get("show"))
except Exception:
pass
except Exception as e:
print(f"[post] NFO write failed: {e}", flush=True)
log({**info, **{"status": "done", "note": "used_existing_transcript"}})
return
# 1.5) Reuse a transcript that exists in the repository for a matching episode
repo_json = find_repo_transcript_for_media(p)
if repo_json:
base = reuse_repo_transcript(p, repo_json)
if base:
index_meili(base.with_suffix(".json"))
publish_to_openwebui([base.with_suffix(".txt")])
try:
data = json.loads((base.with_suffix(".json")).read_text(encoding="utf-8"))
# Start with repo metadata, then enrich from yt-dlp info.json if any
meta_repo = {
"title": data.get("title") or title,
"episode_title": data.get("title") or title,
"show": data.get("show") or p.parent.name,
"description": data.get("description") or "",
"pubdate": data.get("pubdate") or _extract_date_from_stem(title),
"duration_sec": media_duration_seconds(p),
"image": data.get("image"),
"guid": data.get("guid") or data.get("id"),
}
meta = build_meta_from_sources(p, p.parent.name, meta_repo, ep=None)
ttxt = base.with_suffix(".txt").read_text(encoding="utf-8")
write_episode_nfo(p, meta, ttxt)
try:
save_episode_artwork(meta.get("image"), p, meta.get("show"))
except Exception:
pass
except Exception as e:
print(f"[post] NFO write failed: {e}", flush=True)
log({**info, **{"status": "done", "note": "reused_repo_transcript"}})
return
# 2) Otherwise, run transcription (offload to queue if enabled and not in transcribe-only worker)
# If paused, do not block; either enqueue (so worker will pause) or skip now.
if transcribe_paused():
if OFFLOAD_TRANSCRIBE and WORKER_MODE != "transcribe" and enqueue_transcribe(p):
log({**info, **{"status": "queued_transcribe"}})
return
log({**info, **{"status": "paused"}})
print(f"[pause] handle_local_file: pause active; not starting {p}", flush=True)
return
if OFFLOAD_TRANSCRIBE and WORKER_MODE != "transcribe" and enqueue_transcribe(p):
log({**info, **{"status": "queued_transcribe"}})
return
base = transcribe(p)
_postprocess_after_transcribe(p, base)
log({**info, **{"status": "done"}})
except Exception as e:
log({"url": path_str, "status": "error", "error": str(e)})
raise
# --- Refresh sidecar metadata and subtitles for an already-downloaded media file ---
def refresh_media(path_str: str):
"""
Refresh sidecar metadata (info.json, thumbnail) and subtitles for an already-downloaded media file.
Requires a companion .info.json next to the media (to supply the original URL). No media re-download.
"""
try:
p = Path(path_str)
if not p.exists() or not p.is_file():
log({"url": path_str, "status": "error", "error": "file_not_found"})
return
normalized = normalize_media_file(p)
if normalized != p:
print(f"[normalize] refresh media: {p.name} -> {normalized.name}", flush=True)
p = normalized
path_str = str(p)
# Locate existing info.json to get the original URL
info_json = None
for cand in [p.parent / f"{p.name}.info.json", p.parent / f"{p.stem}.info.json"]:
if cand.exists():
info_json = cand
break
if not info_json:
log({"path": str(p), "status": "refresh-skip", "reason": "no_info_json"})
print(f"[refresh] skip: no info.json next to {p}", flush=True)
return
info = load_info_json(info_json) or {}
url = info.get("webpage_url") or info.get("original_url") or info.get("url")
if not url:
log({"path": str(p), "status": "refresh-skip", "reason": "no_url_in_info"})
print(f"[refresh] skip: no URL in {info_json}", flush=True)
return
# Prepare yt-dlp command to refresh sidecars only, writing files exactly next to the media
outtmpl = str(p.with_suffix(".%(ext)s"))
sub_langs = os.getenv("YTDLP_SUBS_LANGS", "en.*,en")
cmd = [
"yt-dlp",
"--skip-download",
"--write-info-json",
"--write-thumbnail",
"--convert-thumbnails", "jpg",
"--write-subs", "--write-auto-subs",
"--sub-langs", sub_langs,
"--convert-subs", "srt",
"-o", outtmpl,
url,
]
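        # With the template above ("<media stem>.%(ext)s"), yt-dlp typically drops sidecars
        # like these next to the media (names illustrative; languages depend on
        # YTDLP_SUBS_LANGS and on what the site actually provides):
        #
        #   Episode Title.info.json
        #   Episode Title.jpg
        #   Episode Title.en.srt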
print(f"[refresh] refreshing sidecars for {p} via yt-dlp", flush=True)
try:
subprocess.check_call(cmd)
except subprocess.CalledProcessError as e:
print(f"[refresh] yt-dlp failed: {e}", flush=True)
raise
# Ensure language-suffixed SRT exists (Plex-friendly) if any subs were fetched
try:
# Pick any .srt just fetched that matches base
for s in p.parent.glob(f"{p.stem}*.srt"):
                # Plex prefers language-suffixed subs: when only a plain "<stem>.srt" was fetched,
                # copy it to "<stem>.en.srt"; files that already carry a language suffix are left as-is.
if s.name == f"{p.stem}.srt":
_safe_copy(s, p.with_suffix(".en.srt"))
except Exception:
pass
# Rebuild NFO using fresh info.json (and RSS if available)
try:
# Try RSS match to enrich metadata (non-fatal if not present)
ep = None
try:
ep = match_media_to_rss(p)
except Exception:
ep = None
fallback = {
"title": p.stem,
"episode_title": p.stem,
"show": p.parent.name,
"description": "",
"pubdate": _extract_date_from_stem(p.stem),
"duration_sec": media_duration_seconds(p),
"image": "",
"guid": "",
}
meta = build_meta_from_sources(p, p.parent.name, fallback, ep)
# Save local artwork too
try:
save_episode_artwork(meta.get("image"), p, meta.get("show"))
except Exception:
pass
# If a transcript already exists, include it in the NFO plot preview
ttxt_path = (TRN / p.stem).with_suffix(".txt")
ttxt = ttxt_path.read_text(encoding="utf-8") if ttxt_path.exists() else None
write_episode_nfo(p, meta, ttxt)
except Exception as e:
print(f"[refresh] NFO/artwork update failed: {e}", flush=True)
log({"path": str(p), "status": "refresh-done"})
print(f"[refresh] done for {p}", flush=True)
except Exception as e:
log({"path": path_str, "status": "error", "error": str(e)})
raise
def handle_web(url: str):
if not _mode_allows("web"):
log({"url": url, "status": "skip", "reason": "mode_transcribe_only"})
print(f"[mode] transcribe-only: skipping web snapshot job: {url}", flush=True)
return
info = {"url": url, "status":"web-downloading", "title":"", "uploader":"", "date":"", "path":""}
log(info)
base, title, domain, date, text = save_web_snapshot(url)
info.update({"title": title, "uploader": domain, "date": date, "path": str(base.with_suffix('.html'))})
log({**info, **{"status":"web-indexing"}})
index_web(base, title, domain, date, text, url)
push = [p for p in [base.with_suffix('.txt'), base.with_suffix('.html')] if p.exists()]
publish_to_openwebui(push)
log({**info, **{"status":"done"}})
def handle_url(url: str):
try:
# In transcribe-only mode, refuse non-local/download jobs
if not _mode_allows("download"):
# Only permit local file paths in this mode
if url.startswith("/") or url.startswith("file://"):
return handle_local_file(url[7:] if url.startswith("file://") else url)
log({"url": url, "status": "skip", "reason": "mode_transcribe_only"})
print(f"[mode] transcribe-only: skipping non-local job: {url}", flush=True)
return
# If a local file path (or file:// URL) is provided, process it directly
if url.startswith("file://"):
return handle_local_file(url[7:])
if url.startswith("/") and Path(url).exists():
return handle_local_file(url)
if not is_media_url(url):
handle_web(url)
return
info = {"url": url, "status":"queued", "title":"", "uploader":"", "date":"", "path":""}
log({**info, **{"status":"downloading"}})
files = yt_dlp(url, TMP)
for f in files:
parts = f.relative_to(TMP).parts
uploader = sanitize(parts[0]) if len(parts)>1 else "Unknown"
dest_dir = LIB / uploader
dest_dir.mkdir(parents=True, exist_ok=True)
dest = dest_dir / sanitize(f.name)
shutil.move(str(f), dest)
# Move companion files produced by yt-dlp (info.json, thumbnail, subtitles)
try:
companions = find_companion_files(f)
# info.json -> prefer "<dest.name>.info.json", fallback to "<dest.stem>.info.json"
if companions.get("info") and companions["info"].exists():
dest_info = dest.parent / f"{dest.name}.info.json"
try:
shutil.move(str(companions["info"]), dest_info)
except Exception:
# fallback naming without extension
dest_info2 = dest.parent / f"{dest.stem}.info.json"
try:
shutil.move(str(companions['info']), dest_info2)
except Exception:
pass
# thumbnail -> "<dest>.jpg"
if companions.get("thumb") and companions["thumb"].exists():
try:
shutil.move(str(companions["thumb"]), str(dest.with_suffix(".jpg")))
except Exception:
pass
# subtitles -> preserve language suffix: "<dest.stem><suffix>"
for s in companions.get("subs", []):
if not s.exists():
continue
suffix_tail = ""
s_name = s.name
f_stem = f.stem
if s_name.startswith(f_stem):
suffix_tail = s_name[len(f_stem):] # includes leading dot if present
else:
suffix_tail = s.suffix
dest_sub = dest.parent / f"{dest.stem}{suffix_tail}"
try:
shutil.move(str(s), str(dest_sub))
except Exception:
pass
except Exception:
pass
normalized_dest = normalize_media_file(dest)
if normalized_dest != dest:
print(f"[normalize] download media: {dest.name} -> {normalized_dest.name}", flush=True)
dest = normalized_dest
info.update({"title": dest.stem, "uploader": uploader,
"date": (re.findall(r"\b(\d{8})\b", dest.stem)[0] if re.findall(r"\b(\d{8})\b", dest.stem) else ""),
"path": str(dest)})
log({**info, **{"status":"transcribing", "progress": 0}})
# Try RSS transcript resolver first
ep = None
try:
ep = match_media_to_rss(dest)
except Exception:
ep = None
if ep:
base = use_rss_transcript(dest, ep)
else:
base = None
# 1.5) If we didn't get an RSS transcript and there is a matching one already in the repo, reuse it
if not base:
repo_json = find_repo_transcript_for_media(dest)
if repo_json:
base = reuse_repo_transcript(dest, repo_json)
if not base:
# If paused, do not block; either enqueue (so worker will pause) or skip now.
if transcribe_paused():
if OFFLOAD_TRANSCRIBE and WORKER_MODE != "transcribe" and enqueue_transcribe(dest):
log({**info, **{"status": "queued_transcribe"}})
continue
log({**info, **{"status": "paused"}})
print(f"[pause] handle_url: pause active; not starting {dest}", flush=True)
continue
if OFFLOAD_TRANSCRIBE and WORKER_MODE != "transcribe" and enqueue_transcribe(dest):
log({**info, **{"status": "queued_transcribe"}})
continue
base = transcribe(dest)
_postprocess_after_transcribe(dest, base)
log({**info, **{"status":"done"}})
except Exception as e:
log({"url": url, "status":"error", "error": str(e)})
raise