import os, subprocess, shutil, json, re, orjson, requests, unicodedata
from types import SimpleNamespace
from rq import Queue
from redis import Redis
from pathlib import Path
import math
import difflib
import time
from collections import deque
from faster_whisper import WhisperModel

from xml.sax.saxutils import escape as xml_escape

MEILI_URL = os.getenv("MEILI_URL", "http://meili:7700")
MEILI_KEY = os.getenv("MEILI_KEY", "")
LIB = Path(os.getenv("LIBRARY_ROOT", "/library"))
TRN = Path(os.getenv("TRANSCRIPT_ROOT", "/transcripts"))
TMP = Path(os.getenv("TMP_ROOT", "/tmpdl"))


# --- Runtime pause switch for CPU-heavy work (no rebuild needed) ---
PAUSE_TRANSCRIBE_FILE = Path(os.getenv("PAUSE_TRANSCRIBE_FILE", str(TRN / ".pause_transcribe")))

# Redis-backed pause flag (podx-tools compatible)
PAUSE_TRANSCRIBE_REDIS_KEY = os.getenv("PAUSE_TRANSCRIBE_REDIS_KEY", "podx:transcribe:paused").strip()

def _pause_flag_redis() -> bool:
|
||
"""Return True if a truthy pause flag is set in Redis under PAUSE_TRANSCRIBE_REDIS_KEY."""
|
||
try:
|
||
from redis import Redis as _R
|
||
val = _R.from_url(REDIS_URL).get(PAUSE_TRANSCRIBE_REDIS_KEY)
|
||
if not val:
|
||
return False
|
||
v = val.decode("utf-8", "ignore").strip().lower()
|
||
return v not in ("", "0", "false", "no", "(nil)")
|
||
except Exception:
|
||
return False
|
||
|
||
def transcribe_paused() -> bool:
|
||
"""Return True if new transcription work should be paused (file flag or Redis flag)."""
|
||
try:
|
||
if PAUSE_TRANSCRIBE_FILE.exists():
|
||
return True
|
||
except Exception:
|
||
pass
|
||
# Fall back to Redis-based switch used by podx-tools
|
||
return _pause_flag_redis()
|
||
|
||
def wait_if_paused(label: str = "transcribe", poll_sec: int = 10):
    """
    If a pause flag is set (pause file or Redis key), block this worker in a low-CPU
    sleep loop until the flag is cleared. This lets you 'pause' heavy work without
    killing workers or rebuilding.
    """
    try:
        if transcribe_paused():
            print(f"[pause] {label}: pause flag set (file {PAUSE_TRANSCRIBE_FILE} or Redis key {PAUSE_TRANSCRIBE_REDIS_KEY}); waiting…", flush=True)
            while transcribe_paused():
                time.sleep(max(1, int(poll_sec)))
    except Exception:
        # If anything goes wrong reading the flag, don't block the pipeline.
        pass
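# Example (illustrative): toggling the pause switch from outside the worker. Exact
# hostnames and mount points depend on your compose setup; with the defaults above the
# file flag lives at /transcripts/.pause_transcribe and the Redis key is
# "podx:transcribe:paused".
#
#   touch /transcripts/.pause_transcribe                       # pause via file flag
#   redis-cli -u "$REDIS_URL" set podx:transcribe:paused 1     # or via the podx-tools Redis flag
#
#   rm /transcripts/.pause_transcribe                          # resume
#   redis-cli -u "$REDIS_URL" del podx:transcribe:paused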

# --- Exception to abort transcription when pause is requested ---
class PauseInterrupt(Exception):
    """Raised to cooperatively abort a running transcription when pause is requested."""
    pass

MODEL_NAME = os.getenv("WHISPER_MODEL", "large-v3")
COMPUTE = os.getenv("WHISPER_PRECISION", "int8")
WHISPER_LANGUAGE = os.getenv("WHISPER_LANGUAGE", "auto").strip()

# Whisper device/config controls
WHISPER_DEVICE = os.getenv("WHISPER_DEVICE", "auto").strip()
WHISPER_DEVICE_INDEX = int(os.getenv("WHISPER_DEVICE_INDEX", "0"))
WHISPER_CPU_THREADS = int(os.getenv("WHISPER_CPU_THREADS", "4"))
# Decoding beam size (higher = more memory and quality). On GPU, 1 is typically fine.
try:
    _beam_env = os.getenv("WHISPER_BEAM_SIZE", "")
    WHISPER_BEAM_SIZE = int(_beam_env) if _beam_env.strip() else (1 if (os.getenv("WHISPER_DEVICE", "auto").strip().lower() == "cuda") else 2)
except Exception:
    WHISPER_BEAM_SIZE = 1
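# Example .env fragment for the Whisper settings above (values are illustrative, not
# necessarily the defaults baked into this file):
#
#   WHISPER_MODEL=large-v3
#   WHISPER_DEVICE=cuda            # or "cpu" / "auto"
#   WHISPER_DEVICE_INDEX=0
#   WHISPER_PRECISION=float16      # compute type handed to faster-whisper
#   WHISPER_CPU_THREADS=4
#   WHISPER_BEAM_SIZE=1
#   WHISPER_LANGUAGE=auto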
|
||
|
||
# --- Host load guards / thread limits ---
|
||
# Limit ffmpeg threads (helps keep CPU in check when multiple workers run)
|
||
FFMPEG_THREADS = int(os.getenv("FFMPEG_THREADS", "1"))
|
||
|
||
# Tame BLAS/threadpools that libraries may spin up implicitly
|
||
import os as _os_threads
|
||
_os_threads.environ.setdefault("OMP_NUM_THREADS", str(WHISPER_CPU_THREADS))
|
||
_os_threads.environ.setdefault("OPENBLAS_NUM_THREADS", "1")
|
||
_os_threads.environ.setdefault("MKL_NUM_THREADS", "1")
|
||
_os_threads.environ.setdefault("NUMEXPR_NUM_THREADS", "1")
|
||
|
||
# Whisper logging & resume controls
|
||
WHISPER_LOG_SEGMENTS = os.getenv("WHISPER_LOG_SEGMENTS", "1") not in ("0", "false", "False")
|
||
WHISPER_RESUME = os.getenv("WHISPER_RESUME", "1") not in ("0", "false", "False")
|
||
PARTIAL_SAVE_EVERY_SEGS = int(os.getenv("WHISPER_PARTIAL_SAVE_EVERY_SEGS", "20"))
|
||
|
||
# RSS resolver config
|
||
RSS_INDEX_PATH = Path(os.getenv("RSS_INDEX_PATH", "/transcripts/rss_index.json"))
|
||
RSS_DURATION_TOLERANCE = int(os.getenv("RSS_DURATION_TOLERANCE", "150")) # seconds
|
||
DEFAULT_TRANSCRIPT_LANG = os.getenv("DEFAULT_TRANSCRIPT_LANG", "en").strip() or "en"
|
||
|
||
def _clean_extension(raw: str, fallback: str) -> str:
|
||
raw = (raw or fallback or "").strip()
|
||
if not raw:
|
||
raw = fallback
|
||
if not raw.startswith("."):
|
||
raw = f".{raw}"
|
||
return raw.lower()
|
||
|
||
|
||
OWUI_URL = os.getenv("OPENWEBUI_URL", "").rstrip("/")
|
||
OWUI_KEY = os.getenv("OPENWEBUI_API_KEY", "")
|
||
OWUI_KB = os.getenv("OPENWEBUI_KB_NAME", "Homelab Library")
|
||
OWUI_AUTO_FIX_METADATA = os.getenv("OPENWEBUI_AUTO_FIX_METADATA", "1").strip().lower() not in ("0", "false", "no")
|
||
OWUI_METADATA_TEMPLATE_JSON = os.getenv("OPENWEBUI_METADATA_TEMPLATE_JSON", "").strip()
|
||
|
||
_OWUI_TEMPLATE_PATCHED: set[str] = set()
|
||
|
||
# Media normalisation options (transcoding for Plex-friendly formats)
|
||
MEDIA_NORMALIZE = os.getenv("MEDIA_NORMALIZE", "1").strip().lower() not in ("0", "false", "no")
|
||
MEDIA_NORMALIZE_KEEP_ORIGINAL = os.getenv("MEDIA_NORMALIZE_KEEP_ORIGINAL", "0").strip().lower() in ("1", "true", "yes")
|
||
|
||
VIDEO_NORMALIZE_CODEC = os.getenv("VIDEO_NORMALIZE_CODEC", "hevc").strip().lower()
|
||
VIDEO_NORMALIZE_EXTENSION = _clean_extension(os.getenv("VIDEO_NORMALIZE_EXTENSION", ".mp4"), ".mp4")
|
||
VIDEO_NORMALIZE_CRF = os.getenv("VIDEO_NORMALIZE_CRF", "28").strip()
|
||
VIDEO_NORMALIZE_PRESET = os.getenv("VIDEO_NORMALIZE_PRESET", "medium").strip()
|
||
VIDEO_NORMALIZE_TUNE = os.getenv("VIDEO_NORMALIZE_TUNE", "").strip()
|
||
VIDEO_NORMALIZE_AUDIO_CODEC = os.getenv("VIDEO_NORMALIZE_AUDIO_CODEC", "aac").strip().lower()
|
||
VIDEO_NORMALIZE_AUDIO_BITRATE = os.getenv("VIDEO_NORMALIZE_AUDIO_BITRATE", "160k").strip()
|
||
|
||
AUDIO_NORMALIZE_CODEC = os.getenv("AUDIO_NORMALIZE_CODEC", "libmp3lame").strip()
|
||
AUDIO_NORMALIZE_EXTENSION = _clean_extension(os.getenv("AUDIO_NORMALIZE_EXTENSION", ".mp3"), ".mp3")
|
||
AUDIO_NORMALIZE_BITRATE = os.getenv("AUDIO_NORMALIZE_BITRATE", "192k").strip()
|
||
AUDIO_NORMALIZE_CHANNELS = os.getenv("AUDIO_NORMALIZE_CHANNELS", "2").strip()
|
||
|
||
# Redis-backed job queue settings and offload toggle
|
||
REDIS_URL = os.getenv("REDIS_URL", "redis://redis:6379/0").strip()
|
||
OFFLOAD_TRANSCRIBE = os.getenv("OFFLOAD_TRANSCRIBE", "1").lower() not in ("0", "false", "no")
|
||
|
||
# Worker role selection
|
||
WORKER_MODE = os.getenv("WORKER_MODE", "all").strip().lower() # 'all' or 'transcribe'
|
||
JOB_QUEUES = [q.strip() for q in os.getenv("JOB_QUEUES", "default").split(",") if q.strip()]
|
||
|
||
# Remote transcription (OpenAI) configuration
|
||
TRANSCRIBE_BACKEND = os.getenv("TRANSCRIBE_BACKEND", "local").strip().lower()
|
||
OPENAI_API_KEY = (os.getenv("OPENAI_API_KEY", "") or "").strip()
|
||
OPENAI_BASE_URL = (os.getenv("OPENAI_BASE_URL", "https://api.openai.com/v1") or "https://api.openai.com/v1").rstrip("/")
|
||
OPENAI_TRANSCRIBE_MODEL = os.getenv("OPENAI_TRANSCRIBE_MODEL", "whisper-1").strip()
|
||
OPENAI_TRANSCRIBE_TIMEOUT = int(os.getenv("OPENAI_TRANSCRIBE_TIMEOUT", "600"))
|
||
|
||
def _mode_allows(task: str) -> bool:
|
||
"""Gate tasks by worker role. In 'transcribe' mode only allow transcription of local files
|
||
(including indexing and OWUI publish). "task" is one of: 'download','web','local','transcribe'."""
|
||
if WORKER_MODE == "transcribe":
|
||
return task in {"local", "transcribe"}
|
||
return True
|
||
|
||
TRN.mkdir(parents=True, exist_ok=True)
|
||
LIB.mkdir(parents=True, exist_ok=True)
|
||
TMP.mkdir(parents=True, exist_ok=True)
|
||
|
||
# Lazy Whisper model loader so the worker can start even if model download/setup is slow
|
||
_model = None
|
||
|
||
def get_model():
|
||
global _model
|
||
if _model is None:
|
||
print(f"[whisper] loading model='{MODEL_NAME}' device='{WHISPER_DEVICE}' idx={WHISPER_DEVICE_INDEX} compute='{COMPUTE}' threads={WHISPER_CPU_THREADS}", flush=True)
|
||
try:
|
||
_model = WhisperModel(
|
||
MODEL_NAME,
|
||
device=WHISPER_DEVICE,
|
||
device_index=WHISPER_DEVICE_INDEX,
|
||
compute_type=COMPUTE,
|
||
cpu_threads=WHISPER_CPU_THREADS,
|
||
)
|
||
except Exception as e:
|
||
# If GPU is selected/auto-selected but not available, some environments try to load
|
||
# CUDA/cuDNN and fail. Fall back to CPU automatically.
|
||
msg = str(e).lower()
|
||
gpu_markers = [
|
||
"cuda", "cublas", "cudnn", "hip", "rocm", "nvrtc", "gpu",
|
||
"unable to load any of {libcudnn", "cannot load symbol cudnncreatetensordescriptor",
|
||
]
|
||
if WHISPER_DEVICE.lower() != "cpu" and any(m in msg for m in gpu_markers):
|
||
print(f"[whisper] model init failed on device '{WHISPER_DEVICE}': {e}. Falling back to CPU…", flush=True)
|
||
_model = WhisperModel(
|
||
MODEL_NAME,
|
||
device="cpu",
|
||
device_index=0,
|
||
compute_type=COMPUTE,
|
||
cpu_threads=WHISPER_CPU_THREADS,
|
||
)
|
||
else:
|
||
raise
|
||
return _model
|
||
|
||
# --- Helper: Reset model with new device and device_index ---
|
||
def reset_model(device: str, device_index: int | None = None, compute_type: str | None = None):
|
||
"""Reset the global _model to a new WhisperModel with the given device and device_index."""
|
||
global _model
|
||
idx = device_index if device_index is not None else WHISPER_DEVICE_INDEX
|
||
ctype = compute_type or COMPUTE
|
||
print(f"[whisper] resetting model='{MODEL_NAME}' device='{device}' idx={idx} compute='{ctype}' threads={WHISPER_CPU_THREADS}", flush=True)
|
||
_model = WhisperModel(
|
||
MODEL_NAME,
|
||
device=device,
|
||
device_index=idx,
|
||
compute_type=ctype,
|
||
cpu_threads=WHISPER_CPU_THREADS,
|
||
)
|
||
|
||
# --- Helper: Run transcribe with fallback to CPU on GPU/oom errors ---
|
||
def run_transcribe_with_fallback(wav_path: Path, lang):
|
||
"""
|
||
Try to transcribe with current model; on GPU/CUDA/HIP/ROCm/OOM errors, reset to CPU and retry once.
|
||
Returns (segments, info) or raises exception.
|
||
"""
|
||
# First attempt with current settings
|
||
try:
|
||
model = get_model()
|
||
return model.transcribe(str(wav_path), vad_filter=True, language=lang, beam_size=WHISPER_BEAM_SIZE)
|
||
except Exception as e:
|
||
msg = str(e)
|
||
lower_msg = msg.lower()
|
||
oom_markers = ["out of memory", "cudaerrormemoryallocation", "cublas", "cuda", "cudnn"]
|
||
# If we encountered a GPU-related error, attempt progressive fallbacks on GPU before CPU
|
||
if any(m in lower_msg for m in oom_markers):
|
||
# Decide GPU compute fallback ladder
|
||
compute_chain = []
|
||
base = (COMPUTE or "float16").lower()
|
||
if base not in compute_chain:
|
||
compute_chain.append(base)
|
||
for c in ("int8_float16", "int8"):
|
||
if c not in compute_chain:
|
||
compute_chain.append(c)
|
||
|
||
for ctype in compute_chain[1:]: # skip the first (already tried via get_model())
|
||
try:
|
||
print(f"[whisper] GPU error '{msg}'. Retrying with compute_type='{ctype}' and beam_size=1...", flush=True)
|
||
reset_model("cuda", WHISPER_DEVICE_INDEX, compute_type=ctype)
|
||
model = get_model()
|
||
return model.transcribe(str(wav_path), vad_filter=True, language=lang, beam_size=1)
|
||
except Exception as e2:
|
||
emsg = str(e2).lower()
|
||
if not any(m in emsg for m in oom_markers):
|
||
# Non-GPU/oom error — rethrow
|
||
print(f"[whisper] GPU retry failed: {e2}", flush=True)
|
||
raise
|
||
# else continue to next fallback
|
||
# Fall back to CPU if all GPU fallbacks failed
|
||
print(f"[whisper] GPU attempts exhausted ('{msg}'); falling back to CPU int8", flush=True)
|
||
reset_model("cpu", 0, compute_type="int8")
|
||
try:
|
||
model = get_model()
|
||
return model.transcribe(str(wav_path), vad_filter=True, language=lang, beam_size=1)
|
||
except Exception as e3:
|
||
print(f"[whisper] CPU fallback also failed: {e3}", flush=True)
|
||
raise
|
||
# Non-GPU error — rethrow
|
||
raise
|
||
|
||
|
||
def run_transcribe_openai(wav_path: Path, lang_hint: str | None):
|
||
"""Transcribe audio via OpenAI's Whisper API, returning (segments, info, raw_payload)."""
|
||
if not OPENAI_API_KEY:
|
||
raise RuntimeError("OPENAI_API_KEY must be set when TRANSCRIBE_BACKEND is 'openai'")
|
||
|
||
url = f"{OPENAI_BASE_URL}/audio/transcriptions"
|
||
headers = {"Authorization": f"Bearer {OPENAI_API_KEY}"}
|
||
data: dict[str, str] = {
|
||
"model": OPENAI_TRANSCRIBE_MODEL or "whisper-1",
|
||
"response_format": "verbose_json",
|
||
}
|
||
if lang_hint:
|
||
data["language"] = lang_hint
|
||
|
||
start = time.time()
|
||
with open(wav_path, "rb") as fh:
|
||
files = {"file": (wav_path.name, fh, "audio/wav")}
|
||
resp = requests.post(
|
||
url,
|
||
headers=headers,
|
||
data=data,
|
||
files=files,
|
||
timeout=OPENAI_TRANSCRIBE_TIMEOUT,
|
||
)
|
||
elapsed = time.time() - start
|
||
|
||
try:
|
||
resp.raise_for_status()
|
||
except requests.HTTPError as exc:
|
||
print(f"[openai] transcription failed ({exc}); response={resp.text[:400]}", flush=True)
|
||
raise
|
||
|
||
payload = resp.json()
|
||
segments_raw = payload.get("segments") or []
|
||
|
||
seg_objs: list[SimpleNamespace] = []
|
||
for seg in segments_raw:
|
||
seg_objs.append(
|
||
SimpleNamespace(
|
||
start=float(seg.get("start") or 0.0),
|
||
end=float(seg.get("end") or 0.0),
|
||
text=str(seg.get("text") or ""),
|
||
)
|
||
)
|
||
|
||
if not seg_objs and payload.get("text"):
|
||
duration = float(payload.get("duration") or 0.0)
|
||
seg_objs.append(
|
||
SimpleNamespace(
|
||
start=0.0,
|
||
end=duration,
|
||
text=str(payload.get("text") or ""),
|
||
)
|
||
)
|
||
|
||
language = payload.get("language") or lang_hint or ""
|
||
info = SimpleNamespace(language=language)
|
||
|
||
print(
|
||
f"[openai] transcribed {wav_path.name} via {OPENAI_TRANSCRIBE_MODEL or 'whisper-1'} "
|
||
f"in {elapsed:.1f}s; segments={len(seg_objs)} lang={language or 'unknown'}",
|
||
flush=True,
|
||
)
|
||
|
||
return seg_objs, info, payload
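# For reference, a trimmed "verbose_json" response has roughly this shape (the exact
# field set varies by model/API version; treat it as an illustration, not a contract):
#
#   {
#     "language": "english",
#     "duration": 1325.4,
#     "text": "Full transcript ...",
#     "segments": [
#       {"id": 0, "start": 0.0, "end": 6.2, "text": " Welcome back to the show...", ...},
#       ...
#     ]
#   }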
|
||
|
||
def log(feed):
|
||
try:
|
||
with open(TRN / "_feed.log", "a", encoding="utf-8") as f:
|
||
f.write(orjson.dumps(feed).decode()+"\n")
|
||
except Exception:
|
||
pass
|
||
|
||
def sanitize(name):
|
||
return re.sub(r'[\\/:"*?<>|]+', ' ', name).strip()
|
||
|
||
# ---------- RSS transcript resolver ----------
|
||
|
||
def _normalize_title(t: str) -> str:
|
||
t = (t or "").lower()
|
||
t = re.sub(r"\s+", " ", t)
|
||
# remove punctuation-ish
|
||
t = re.sub(r"[^a-z0-9 _-]+", "", t)
|
||
return t.strip()
|
||
|
||
def _stem_without_date(stem: str) -> str:
|
||
# drop leading YYYYMMDD - from filenames created by yt-dlp template
|
||
m = re.match(r"^\d{8}\s*-\s*(.*)$", stem)
|
||
return m.group(1) if m else stem
|
||
|
||
def _extract_date_from_stem(stem: str) -> str | None:
|
||
m = re.search(r"\b(\d{8})\b", stem)
|
||
return m.group(1) if m else None
|
||
|
||
def _best_title_match(title: str, candidates: list[str]) -> tuple[str, float]:
|
||
"""Return (best_title, score 0..1) using difflib SequenceMatcher."""
|
||
if not candidates:
|
||
return "", 0.0
|
||
norm_title = _normalize_title(title)
|
||
best = ("", 0.0)
|
||
for c in candidates:
|
||
score = difflib.SequenceMatcher(None, norm_title, _normalize_title(c)).ratio()
|
||
if score > best[1]:
|
||
best = (c, score)
|
||
return best
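# Example (titles are normalised before comparison, so punctuation and case don't matter):
#   _best_title_match("Some Episode", ["Some Episode!", "A Different Show"])
#   -> ("Some Episode!", 1.0)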
|
||
|
||
def _load_rss_index() -> list[dict]:
|
||
try:
|
||
if RSS_INDEX_PATH.exists():
|
||
data = json.loads(RSS_INDEX_PATH.read_text(encoding="utf-8"))
|
||
# supports {"episodes":[...]} or a flat list
|
||
if isinstance(data, dict) and "episodes" in data:
|
||
return data["episodes"] or []
|
||
if isinstance(data, list):
|
||
return data
|
||
except Exception as e:
|
||
print(f"[resolver] failed to load RSS index: {e}", flush=True)
|
||
return []
|
||
|
||
def match_media_to_rss(media_path: Path) -> dict | None:
|
||
"""Try to match a local media file to an RSS episode entry."""
|
||
episodes = _load_rss_index()
|
||
if not episodes:
|
||
return None
|
||
|
||
stem = media_path.stem
|
||
title_no_date = _stem_without_date(stem)
|
||
file_date = _extract_date_from_stem(stem)
|
||
# duration tolerance
|
||
media_dur = media_duration_seconds(media_path)
|
||
|
||
# Candidates: filter by date if present, else all
|
||
if file_date:
|
||
pool = [e for e in episodes if (str(e.get("date", "")) == file_date or str(e.get("pubdate", "")) == file_date)]
|
||
if not pool:
|
||
pool = episodes
|
||
else:
|
||
pool = episodes
|
||
|
||
# Pick best by (title similarity, duration proximity)
|
||
best_ep, best_score = None, -1.0
|
||
for ep in pool:
|
||
ep_title = ep.get("title") or ep.get("itunes_title") or ""
|
||
sim = _best_title_match(title_no_date, [ep_title])[1]
|
||
dur = float(ep.get("duration_sec") or ep.get("duration") or 0.0)
|
||
dur_ok = True
|
||
if media_dur and dur:
|
||
dur_ok = abs(media_dur - dur) <= RSS_DURATION_TOLERANCE
|
||
score = sim + (0.1 if dur_ok else 0.0)
|
||
if score > best_score:
|
||
best_score, best_ep = score, ep
|
||
|
||
if best_ep and best_score >= 0.5:
|
||
print(f"[resolver] matched '{stem}' -> '{best_ep.get('title','')}' score={best_score:.2f}", flush=True)
|
||
return best_ep
|
||
return None
|
||
|
||
def _choose_transcript_url(ep: dict) -> tuple[str, str] | tuple[None, None]:
|
||
"""Return (url, kind) preferring txt, vtt, then srt. 'kind' in {'txt','vtt','srt'}."""
|
||
# unified structure from rss_ingest.py: ep["transcripts"] = [{"url":..., "type": ...}, ...]
|
||
items = ep.get("transcripts") or []
|
||
# some ingesters store separate keys
|
||
if not items:
|
||
for key, kind in [("transcript_txt","txt"), ("transcript_vtt","vtt"), ("transcript_srt","srt")]:
|
||
if ep.get(key):
|
||
items.append({"url": ep[key], "type": kind})
|
||
# preference order
|
||
for kind in ["txt", "vtt", "srt"]:
|
||
for it in items:
|
||
t = (it.get("type") or "").lower()
|
||
u = it.get("url") or ""
|
||
if u and (kind in t or (kind == "txt" and t in ["text","plain","text/plain"]) or (kind in u.lower())):
|
||
return u, kind
|
||
return (None, None)
|
||
|
||
def fetch_rss_transcript(ep: dict, dest_dir: Path) -> Path | None:
|
||
"""Download transcript to dest_dir and return local Path; convert VTT->SRT if needed."""
|
||
url, kind = _choose_transcript_url(ep)
|
||
if not url:
|
||
return None
|
||
dest_dir.mkdir(parents=True, exist_ok=True)
|
||
# filename from episode title
|
||
safe = sanitize(ep.get("title") or ep.get("guid") or "episode")
|
||
    path = dest_dir / f"{safe}.{kind}"
|
||
try:
|
||
r = requests.get(url, timeout=30)
|
||
r.raise_for_status()
|
||
mode = "wb" if kind in ("vtt","srt") else "w"
|
||
if mode == "wb":
|
||
path.write_bytes(r.content)
|
||
else:
|
||
path.write_text(r.text, encoding="utf-8")
|
||
print(f"[resolver] downloaded transcript ({kind}) from {url}", flush=True)
|
||
return path
|
||
except Exception as e:
|
||
print(f"[resolver] failed to fetch transcript: {e}", flush=True)
|
||
return None
|
||
|
||
def use_rss_transcript(media_path: Path, ep: dict) -> Path | None:
|
||
"""Create standard transcript artifacts from an RSS transcript (txt/vtt/srt)."""
|
||
# Prefer direct download; else if rss_ingest already saved a local file path, try that.
|
||
sidecar = None
|
||
local_hint = ep.get("transcript_local")
|
||
if local_hint:
|
||
p = Path(local_hint)
|
||
if p.exists():
|
||
sidecar = p
|
||
if sidecar is None:
|
||
sidecar = fetch_rss_transcript(ep, TMP)
|
||
|
||
if not sidecar or not sidecar.exists():
|
||
return None
|
||
|
||
# Convert to plain text
|
||
plain = transcript_text_from_file(sidecar)
|
||
lang = (ep.get("language") or ep.get("lang") or DEFAULT_TRANSCRIPT_LANG).split("-")[0]
|
||
base = write_plain_transcript(media_path, plain, language=lang)
|
||
# Place an SRT next to video for Plex
|
||
ensure_sidecar_next_to_media(sidecar, media_path, lang=lang)
|
||
# Write provenance sidecar
|
||
(base.with_suffix(".prov.json")).write_bytes(orjson.dumps({
|
||
"source": "rss",
|
||
"feed": ep.get("feed_url"),
|
||
"guid": ep.get("guid"),
|
||
"episode_title": ep.get("title"),
|
||
"transcript_kind": sidecar.suffix.lower().lstrip("."),
|
||
"transcript_url": _choose_transcript_url(ep)[0] or "",
|
||
}))
|
||
# Write Kodi/Plex-compatible NFO
|
||
try:
|
||
# Gather metadata for NFO from RSS entry
|
||
meta = {
|
||
"title": ep.get("title"),
|
||
"episode_title": ep.get("title"),
|
||
"show": ep.get("podcast_title") or ep.get("feed_title") or ep.get("show"),
|
||
"description": ep.get("description") or ep.get("content"),
|
||
"pubdate": ep.get("pubdate"),
|
||
"pubdate_iso": ep.get("date_iso"),
|
||
"duration_sec": ep.get("duration_sec") or ep.get("duration"),
|
||
"image": ep.get("image") or ep.get("image_url"),
|
||
"guid": ep.get("guid"),
|
||
}
|
||
txt_path = base.with_suffix(".txt")
|
||
transcript_text = txt_path.read_text(encoding="utf-8") if txt_path.exists() else None
|
||
write_episode_nfo(media_path, meta, transcript_text)
|
||
# Save local artwork for Plex/Kodi
|
||
try:
|
||
save_episode_artwork(meta.get("image"), media_path, meta.get("show"))
|
||
except Exception:
|
||
pass
|
||
except Exception as e:
|
||
print(f"[post] NFO write failed: {e}", flush=True)
|
||
return base
|
||
|
||
def find_sidecar_transcript(media_path: Path) -> Path | None:
|
||
"""Return a .txt/.srt/.vtt transcript file sitting next to media, if any.
|
||
Tries common variants including language-suffixed SRT/VTT.
|
||
"""
|
||
candidates: list[Path] = []
|
||
# exact same stem in same folder
|
||
for ext in [".txt", ".srt", ".vtt"]:
|
||
p = media_path.parent / (media_path.stem + ext)
|
||
if p.exists():
|
||
candidates.append(p)
|
||
# language-suffixed near the media file (e.g., .en.srt)
|
||
for ext in [".srt", ".vtt"]:
|
||
p = media_path.with_suffix(f".en{ext}")
|
||
if p.exists() and p not in candidates:
|
||
candidates.append(p)
|
||
return candidates[0] if candidates else None
|
||
|
||
|
||
# ---------- Transcript repository reuse helpers ----------
|
||
|
||
def find_repo_transcript_for_media(media_path: Path) -> Path | None:
|
||
"""Search the transcript repository (/transcripts) for an existing transcript
|
||
that likely belongs to this media file (match by YYYYMMDD in filename and/or
|
||
fuzzy title similarity). Returns a path to a matching .json if found."""
|
||
try:
|
||
stem = media_path.stem
|
||
title_no_date = _stem_without_date(stem)
|
||
file_date = _extract_date_from_stem(stem)
|
||
best_json, best_score = None, 0.0
|
||
for j in TRN.glob("*.json"):
|
||
try:
|
||
data = json.loads(j.read_text(encoding="utf-8"))
|
||
except Exception:
|
||
continue
|
||
other_file = Path(data.get("file", ""))
|
||
other_stem = other_file.stem if other_file else j.stem
|
||
other_date = _extract_date_from_stem(other_stem)
|
||
# If both have dates and they differ a lot, skip
|
||
if file_date and other_date and file_date != other_date:
|
||
continue
|
||
# Compare titles (without dates)
|
||
sim = difflib.SequenceMatcher(
|
||
None,
|
||
_normalize_title(title_no_date),
|
||
_normalize_title(_stem_without_date(other_stem)),
|
||
).ratio()
|
||
# Nudge score when dates match
|
||
if file_date and other_date and file_date == other_date:
|
||
sim += 0.1
|
||
if sim > best_score:
|
||
best_score, best_json = sim, j
|
||
# Require a reasonable similarity
|
||
return best_json if best_json and best_score >= 0.60 else None
|
||
except Exception:
|
||
return None
|
||
|
||
|
||
def reuse_repo_transcript(media_path: Path, repo_json: Path) -> Path | None:
|
||
"""Copy/retarget an existing transcript JSON/TXT (and make SRT/VTT if possible)
|
||
from the repository so that it belongs to the provided media_path. Returns
|
||
the new base path in /transcripts or None."""
|
||
try:
|
||
# load the source transcript
|
||
data = json.loads(repo_json.read_text(encoding="utf-8"))
|
||
src_base = TRN / Path(repo_json).stem
|
||
src_txt = src_base.with_suffix(".txt")
|
||
src_srt = src_base.with_suffix(".srt")
|
||
src_vtt = src_base.with_suffix(".vtt")
|
||
|
||
# write the retargeted artifacts
|
||
new_title = media_path.stem
|
||
new_base = TRN / new_title
|
||
new_base.parent.mkdir(parents=True, exist_ok=True)
|
||
|
||
# update file path
|
||
data["file"] = str(media_path)
|
||
(new_base.with_suffix(".json")).write_bytes(orjson.dumps(data))
|
||
|
||
# copy or synthesize TXT
|
||
if src_txt.exists():
|
||
_safe_copy(src_txt, new_base.with_suffix(".txt"))
|
||
else:
|
||
# fallback: concatenate segments
|
||
txt = " ".join(s.get("text", "") for s in data.get("segments", []))
|
||
(new_base.with_suffix(".txt")).write_text(txt, encoding="utf-8")
|
||
|
||
# copy SRT/VTT if present; otherwise synthesize SRT from segments
|
||
if src_srt.exists():
|
||
_safe_copy(src_srt, new_base.with_suffix(".srt"))
|
||
else:
|
||
# synthesize SRT
|
||
def fmt_ts(t):
|
||
h=int(t//3600); m=int((t%3600)//60); s=t-(h*3600+m*60)
|
||
return f"{h:02}:{m:02}:{s:06.3f}".replace('.',',')
|
||
with open(new_base.with_suffix(".srt"), "w", encoding="utf-8") as srt:
|
||
for i, s in enumerate(data.get("segments", []), 1):
|
||
srt.write(f"{i}\n{fmt_ts(s.get('start',0.0))} --> {fmt_ts(s.get('end',0.0))}\n{s.get('text','').strip()}\n\n")
|
||
if src_vtt.exists():
|
||
_safe_copy(src_vtt, new_base.with_suffix(".vtt"))
|
||
else:
|
||
# synthesize VTT from segments
|
||
def fmt_ts_vtt(t):
|
||
h=int(t//3600); m=int((t%3600)//60); s=t-(h*3600+m*60)
|
||
return f"{h:02}:{m:02}:{s:06.3f}"
|
||
with open(new_base.with_suffix(".vtt"), "w", encoding="utf-8") as vtt:
|
||
vtt.write("WEBVTT\n\n")
|
||
for s in data.get("segments", []):
|
||
vtt.write(f"{fmt_ts_vtt(s.get('start',0.0))} --> {fmt_ts_vtt(s.get('end',0.0))} \n{s.get('text','').strip()}\n\n")
|
||
|
||
# ensure sidecar next to media
|
||
try:
|
||
lang = (data.get("language") or DEFAULT_TRANSCRIPT_LANG).split("-")[0]
|
||
ensure_sidecar_next_to_media(new_base.with_suffix(".srt"), media_path, lang=lang)
|
||
except Exception:
|
||
pass
|
||
|
||
# Write Kodi/Plex-compatible NFO
|
||
try:
|
||
meta = {
|
||
"title": data.get("title") or media_path.stem,
|
||
"episode_title": data.get("title") or media_path.stem,
|
||
"show": data.get("show") or media_path.parent.name,
|
||
"description": data.get("description") or "",
|
||
"pubdate": data.get("pubdate") or data.get("date"),
|
||
"duration_sec": media_duration_seconds(media_path),
|
||
"image": data.get("image"),
|
||
"guid": data.get("guid") or data.get("id"),
|
||
}
|
||
txtp = new_base.with_suffix(".txt")
|
||
ttxt = txtp.read_text(encoding="utf-8") if txtp.exists() else None
|
||
write_episode_nfo(media_path, meta, ttxt)
|
||
# Save local artwork for Plex/Kodi
|
||
try:
|
||
save_episode_artwork(meta.get("image"), media_path, meta.get("show"))
|
||
except Exception:
|
||
pass
|
||
except Exception as e:
|
||
print(f"[post] NFO write failed: {e}", flush=True)
|
||
|
||
return new_base
|
||
except Exception as e:
|
||
print(f"[resolver] failed to reuse repo transcript: {e}", flush=True)
|
||
return None
|
||
|
||
|
||
def transcript_text_from_file(path: Path) -> str:
|
||
"""Extract plain text from .txt/.srt/.vtt by stripping timestamps and counters."""
|
||
try:
|
||
raw = path.read_text(encoding="utf-8", errors="ignore")
|
||
except Exception:
|
||
raw = path.read_text(errors="ignore")
|
||
|
||
if path.suffix.lower() == ".txt":
|
||
return raw.strip()
|
||
|
||
# For SRT/VTT, drop timestamp lines, cue numbers and headers
|
||
lines: list[str] = []
|
||
for line in raw.splitlines():
|
||
ls = line.strip()
|
||
if not ls:
|
||
continue
|
||
if "-->" in ls: # timestamp line
|
||
continue
|
||
if ls.upper().startswith("WEBVTT"):
|
||
continue
|
||
if re.match(r"^\d+$", ls): # cue index
|
||
continue
|
||
lines.append(ls)
|
||
return " ".join(lines)
|
||
|
||
|
||
def ensure_sidecar_next_to_media(sidecar: Path, media_path: Path, lang: str = "en") -> None:
|
||
"""Ensure an `.lang.srt` sits next to the media for Plex. Convert VTT→SRT if needed. If the sidecar is .txt, do nothing."""
|
||
try:
|
||
if sidecar.suffix.lower() == ".txt":
|
||
return
|
||
if sidecar.suffix.lower() == ".srt":
|
||
dst = media_path.with_suffix(f".{lang}.srt")
|
||
_safe_copy(sidecar, dst)
|
||
elif sidecar.suffix.lower() == ".vtt":
|
||
tmp_srt = sidecar.with_suffix(".srt")
|
||
subprocess.run(["ffmpeg", "-nostdin", "-y", "-threads", str(FFMPEG_THREADS), "-i", str(sidecar), str(tmp_srt)], check=True)
|
||
dst = media_path.with_suffix(f".{lang}.srt")
|
||
shutil.move(str(tmp_srt), dst)
|
||
except Exception as e:
|
||
print(f"[post] sidecar copy/convert failed: {e}", flush=True)
|
||
|
||
|
||
# --- small helpers for progress/ETA formatting ---
|
||
def _fmt_eta(sec: float) -> str:
|
||
try:
|
||
sec = max(0, int(sec))
|
||
h, rem = divmod(sec, 3600)
|
||
m, s = divmod(rem, 60)
|
||
if h:
|
||
return f"{h}h {m}m {s}s"
|
||
if m:
|
||
return f"{m}m {s}s"
|
||
return f"{s}s"
|
||
except Exception:
|
||
return ""
|
||
|
||
|
||
def save_episode_artwork(image_url: str | None, media_path: Path, show_title: str | None = None):
|
||
"""Download episode artwork from image_url and save next to the media as '<basename>.jpg'.
|
||
Also drop a folder-level 'poster.jpg' for the show directory if not present.
|
||
Best-effort; failures are logged but non-fatal.
|
||
"""
|
||
if not image_url:
|
||
return
|
||
try:
|
||
resp = requests.get(image_url, timeout=30, stream=True)
|
||
resp.raise_for_status()
|
||
# Determine content-type and write a temporary file
|
||
ctype = (resp.headers.get("Content-Type") or "").lower()
|
||
tmp_file = media_path.with_suffix(".art.tmp")
|
||
with open(tmp_file, "wb") as out:
|
||
for chunk in resp.iter_content(chunk_size=8192):
|
||
if chunk:
|
||
out.write(chunk)
|
||
|
||
# Always provide a .jpg next to the media for Plex
|
||
episode_jpg = media_path.with_suffix(".jpg")
|
||
if "image/jpeg" in ctype:
|
||
# Already JPEG
|
||
shutil.move(str(tmp_file), str(episode_jpg))
|
||
else:
|
||
# Try converting to JPEG with ffmpeg; if it fails, keep bytes as-is
|
||
try:
|
||
subprocess.run(
|
||
["ffmpeg", "-nostdin", "-y", "-i", str(tmp_file), str(episode_jpg)],
|
||
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=True
|
||
)
|
||
try:
|
||
tmp_file.unlink()
|
||
except Exception:
|
||
pass
|
||
except Exception:
|
||
shutil.move(str(tmp_file), str(episode_jpg))
|
||
|
||
# Also drop a folder poster once per show (helps Plex folder views)
|
||
try:
|
||
show_poster = media_path.parent / "poster.jpg"
|
||
if not show_poster.exists():
|
||
_safe_copy(episode_jpg, show_poster)
|
||
except Exception:
|
||
pass
|
||
|
||
except Exception as e:
|
||
print(f"[post] artwork download failed: {e}", flush=True)
|
||
|
||
|
||
|
||
def find_companion_files(src: Path) -> dict:
|
||
"""Return likely yt-dlp companion files for a downloaded media file."""
|
||
out = {}
|
||
# info.json can be either "<name>.<ext>.info.json" or "<name>.info.json"
|
||
cands_info = [
|
||
src.parent / f"{src.name}.info.json",
|
||
src.parent / f"{src.stem}.info.json",
|
||
]
|
||
out["info"] = next((p for p in cands_info if p.exists()), None)
|
||
|
||
# thumbnails may be "<name>.<ext>.jpg" or "<name>.jpg" (we convert to jpg)
|
||
cand_thumbs = [
|
||
src.parent / f"{src.name}.jpg",
|
||
src.parent / f"{src.stem}.jpg",
|
||
src.parent / f"{src.stem}.jpeg",
|
||
src.parent / f"{src.stem}.png",
|
||
src.parent / f"{src.stem}.webp",
|
||
]
|
||
out["thumb"] = next((p for p in cand_thumbs if p.exists()), None)
|
||
|
||
# subtitles (keep multiple)
|
||
subs = []
|
||
for s in src.parent.glob(f"{src.stem}*.srt"):
|
||
subs.append(s)
|
||
for s in src.parent.glob(f"{src.stem}*.vtt"):
|
||
subs.append(s)
|
||
out["subs"] = subs
|
||
return out
|
||
|
||
def load_info_json(path: Path) -> dict | None:
|
||
try:
|
||
return json.loads(path.read_text(encoding="utf-8"))
|
||
except Exception:
|
||
return None
|
||
|
||
def _iso_from_yyyymmdd(s: str | None) -> str | None:
|
||
if not s or not re.match(r"^\d{8}$", s):
|
||
return None
|
||
return f"{s[0:4]}-{s[4:6]}-{s[6:8]}"
|
||
|
||
def build_meta_from_sources(media_path: Path, uploader: str, fallback_meta: dict, ep: dict | None = None) -> dict:
|
||
"""
|
||
Merge metadata from (priority): RSS episode `ep` -> yt-dlp info.json (if present) -> fallback.
|
||
Returns a dict compatible with write_episode_nfo().
|
||
"""
|
||
# Start with fallback
|
||
meta = dict(fallback_meta)
|
||
|
||
# Augment from info.json if present
|
||
info = None
|
||
for cand in [
|
||
media_path.parent / f"{media_path.name}.info.json",
|
||
media_path.parent / f"{media_path.stem}.info.json",
|
||
]:
|
||
if cand.exists():
|
||
info = load_info_json(cand)
|
||
break
|
||
if info:
|
||
meta.setdefault("title", info.get("title"))
|
||
meta.setdefault("episode_title", info.get("title"))
|
||
meta.setdefault("description", info.get("description") or info.get("fulltitle"))
|
||
# upload_date is YYYYMMDD
|
||
iso = _iso_from_yyyymmdd(info.get("upload_date"))
|
||
if iso:
|
||
meta["pubdate_iso"] = iso
|
||
# Prefer video duration if present
|
||
if not meta.get("duration_sec") and info.get("duration"):
|
||
meta["duration_sec"] = info.get("duration")
|
||
# thumbnail URL
|
||
if not meta.get("image"):
|
||
meta["image"] = info.get("thumbnail")
|
||
# show/uploader
|
||
if not meta.get("show"):
|
||
meta["show"] = info.get("uploader") or uploader
|
||
|
||
# Finally, layer RSS data on top if available (most authoritative for podcasts)
|
||
if ep:
|
||
meta.update({
|
||
"title": ep.get("title") or meta.get("title"),
|
||
"episode_title": ep.get("title") or meta.get("episode_title"),
|
||
"show": ep.get("podcast_title") or ep.get("feed_title") or ep.get("show") or meta.get("show") or uploader,
|
||
"description": ep.get("description") or ep.get("content") or meta.get("description", ""),
|
||
"pubdate": ep.get("pubdate") or meta.get("pubdate", ""),
|
||
"pubdate_iso": ep.get("date_iso") or meta.get("pubdate_iso", meta.get("pubdate")),
|
||
"duration_sec": ep.get("duration_sec") or ep.get("duration") or meta.get("duration_sec"),
|
||
"image": ep.get("image") or ep.get("image_url") or meta.get("image", ""),
|
||
"guid": ep.get("guid") or meta.get("guid", ""),
|
||
})
|
||
|
||
return meta
|
||
|
||
# ---------- Kodi/Plex NFO writer ----------
|
||
from datetime import datetime
|
||
|
||
def _first_nonempty(*vals):
|
||
for v in vals:
|
||
if v is None:
|
||
continue
|
||
if isinstance(v, str) and v.strip():
|
||
return v.strip()
|
||
if v:
|
||
return v
|
||
return None
|
||
|
||
def _coerce_aired(pubdate: str | None) -> str:
|
||
"""Convert RSS-style pubdate to YYYY-MM-DD if possible."""
|
||
if not pubdate:
|
||
return ""
|
||
# already ISO-like
|
||
m = re.match(r"^(\d{4})[-/](\d{2})[-/](\d{2})", pubdate)
|
||
if m:
|
||
return f"{m.group(1)}-{m.group(2)}-{m.group(3)}"
|
||
# RFC 2822 example: Tue, 21 Feb 2023 06:00:00 +0000
|
||
try:
|
||
dt = datetime.strptime(pubdate[:31], "%a, %d %b %Y %H:%M:%S %z")
|
||
return dt.strftime("%Y-%m-%d")
|
||
except Exception:
|
||
# try without tz
|
||
try:
|
||
dt = datetime.strptime(pubdate[:25], "%a, %d %b %Y %H:%M:%S")
|
||
return dt.strftime("%Y-%m-%d")
|
||
except Exception:
|
||
return ""
|
||
|
||
def write_episode_nfo(media_path: Path, meta: dict, transcript_text: str | None = None) -> Path:
    """Write a minimal Kodi/Plex-compatible NFO next to the media file.
    `meta` may include: title/episode_title, show, description, pubdate/pubdate_iso, duration_sec, image, guid.
    """
|
||
try:
|
||
title = _first_nonempty(meta.get("episode_title"), meta.get("title"), media_path.stem) or media_path.stem
|
||
show = _first_nonempty(meta.get("show"), meta.get("podcast_title"), meta.get("feed_title"), media_path.parent.name) or media_path.parent.name
|
||
plot = _first_nonempty(meta.get("description"), meta.get("content"), meta.get("summary"), "") or ""
|
||
# Optionally append transcript preview to plot
|
||
if transcript_text:
|
||
preview = transcript_text.strip()
|
||
if preview:
|
||
preview = (preview[:1800] + "…") if len(preview) > 1800 else preview
|
||
plot = (plot + "\n\n" if plot else "") + preview
|
||
aired = _coerce_aired(_first_nonempty(meta.get("pubdate_iso"), meta.get("pubdate")))
|
||
guid = _first_nonempty(meta.get("guid"), meta.get("id"), "") or ""
|
||
thumb = _first_nonempty(meta.get("image"), meta.get("image_url"), meta.get("thumbnail"), "") or ""
|
||
dur_s = meta.get("duration_sec") or meta.get("duration") or 0
|
||
try:
|
||
dur_min = int(round(float(dur_s) / 60.0)) if dur_s else 0
|
||
except Exception:
|
||
dur_min = 0
|
||
|
||
# Build XML
|
||
xml = ["<episodedetails>"]
|
||
xml.append(f" <title>{xml_escape(title)}</title>")
|
||
xml.append(f" <showtitle>{xml_escape(show)}</showtitle>")
|
||
if plot:
|
||
xml.append(f" <plot>{xml_escape(plot)}</plot>")
|
||
if aired:
|
||
xml.append(f" <aired>{xml_escape(aired)}</aired>")
|
||
if guid:
|
||
xml.append(f" <uniqueid type=\"guid\" default=\"true\">{xml_escape(guid)}</uniqueid>")
|
||
if dur_min:
|
||
xml.append(f" <runtime>{dur_min}</runtime>")
|
||
if thumb:
|
||
xml.append(f" <thumb>{xml_escape(thumb)}</thumb>")
|
||
xml.append("</episodedetails>\n")
|
||
nfo_path = media_path.with_suffix(".nfo")
|
||
nfo_path.write_text("\n".join(xml), encoding="utf-8")
|
||
return nfo_path
|
||
except Exception:
|
||
return media_path.with_suffix(".nfo")
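# A generated .nfo looks roughly like this (values are illustrative):
#
#   <episodedetails>
#     <title>20230221 - Some Episode</title>
#     <showtitle>Some Podcast</showtitle>
#     <plot>Episode description ... transcript preview ...</plot>
#     <aired>2023-02-21</aired>
#     <uniqueid type="guid" default="true">tag:example.com,2023:ep42</uniqueid>
#     <runtime>63</runtime>
#     <thumb>https://example.com/art.jpg</thumb>
#   </episodedetails>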
|
||
|
||
def write_plain_transcript(media_path: Path, text: str, language: str = "en") -> Path:
|
||
"""Write minimal transcript artifacts (.txt + .json) from plain text (no timestamps)."""
|
||
title = media_path.stem
|
||
base = TRN / title
|
||
base.parent.mkdir(parents=True, exist_ok=True)
|
||
(base.with_suffix(".txt")).write_text(text, encoding="utf-8")
|
||
(base.with_suffix(".json")).write_bytes(orjson.dumps({
|
||
"file": str(media_path),
|
||
"language": language,
|
||
"segments": [{"start": 0.0, "end": 0.0, "text": text}]
|
||
}))
|
||
return base
|
||
|
||
def yt_dlp(url, outdir):
|
||
# 1) Normalize YouTube Music URLs to standard YouTube
|
||
yurl = url
|
||
if 'music.youtube.com' in yurl:
|
||
yurl = yurl.replace('music.youtube.com', 'www.youtube.com')
|
||
|
||
outtmpl = str(outdir / "%(uploader)s/%(upload_date)s - %(title)s.%(ext)s")
|
||
|
||
base_cmd = [
|
||
"yt-dlp", "-o", outtmpl,
|
||
"-f", "bv*+ba/best",
|
||
"--write-info-json",
|
||
"--write-thumbnail",
|
||
"--convert-thumbnails", "jpg",
|
||
"--write-subs", "--write-auto-subs",
|
||
"--sub-langs", os.getenv("YTDLP_SUBS_LANGS", "en.*,en"),
|
||
"--convert-subs", "srt",
|
||
"--no-playlist", "--no-warnings", "--restrict-filenames",
|
||
]
|
||
|
||
    # 2) Optional cookies (set YTDLP_COOKIES=/path/to/cookies.txt in .env and mount it)
|
||
cookies_path = os.getenv("YTDLP_COOKIES", "").strip()
|
||
if cookies_path:
|
||
base_cmd += ["--cookies", cookies_path]
|
||
|
||
# Primary attempt
|
||
try:
|
||
subprocess.check_call(base_cmd + [yurl])
|
||
except subprocess.CalledProcessError:
|
||
        # 3) Retry with Android client + mobile UA
|
||
retry_cmd = base_cmd + [
|
||
"--extractor-args", "youtube:player_client=android",
|
||
"--user-agent", "Mozilla/5.0 (Linux; Android 10; SM-G973F) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0 Mobile Safari/537.36",
|
||
yurl,
|
||
]
|
||
subprocess.check_call(retry_cmd)
|
||
|
||
media = (
|
||
list(outdir.rglob("*.[mM][pP]4")) +
|
||
list(outdir.rglob("*.mkv")) +
|
||
list(outdir.rglob("*.webm")) +
|
||
list(outdir.rglob("*.m4a")) +
|
||
list(outdir.rglob("*.mp3"))
|
||
)
|
||
return sorted(media, key=lambda p: p.stat().st_mtime)[-1:]
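# With the output template above, downloads land under the target dir roughly as
#   <outdir>/<uploader>/<upload_date> - <title>.<ext>
# (with --restrict-filenames applied, so spaces and special characters are sanitised),
# alongside the companion files yt-dlp writes: .info.json, a .jpg thumbnail and .srt subs.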
|
||
|
||
def extract_audio(src: Path, outdir: Path) -> Path:
|
||
"""Extract mono 16kHz WAV for robust transcription (handles odd containers/codecs)."""
|
||
outdir.mkdir(parents=True, exist_ok=True)
|
||
wav_path = outdir / (src.stem + ".wav")
|
||
# Force audio-only, mono, 16kHz WAV
|
||
cmd = [
|
||
"ffmpeg", "-nostdin", "-y",
|
||
"-threads", str(FFMPEG_THREADS),
|
||
"-i", str(src),
|
||
"-vn", "-ac", "1", "-ar", "16000",
|
||
"-f", "wav", str(wav_path),
|
||
]
|
||
try:
|
||
subprocess.check_output(cmd, stderr=subprocess.STDOUT)
|
||
except subprocess.CalledProcessError as e:
|
||
raise RuntimeError(f"ffmpeg extract failed: {e.output.decode(errors='ignore')}")
|
||
return wav_path
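# Equivalent command line, handy for debugging a problematic file by hand
# (input/output names are placeholders; FFMPEG_THREADS defaults to 1):
#   ffmpeg -nostdin -y -threads 1 -i input.mkv -vn -ac 1 -ar 16000 -f wav /tmpdl/input.wav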
|
||
|
||
# --- WAV trimming helper ---
|
||
def trim_wav(src_wav: Path, start_sec: float, outdir: Path) -> Path:
|
||
"""Return a trimmed 16k mono WAV starting at start_sec from src_wav."""
|
||
outdir.mkdir(parents=True, exist_ok=True)
|
||
if not start_sec or start_sec <= 0.0:
|
||
return src_wav
|
||
dst = outdir / (src_wav.stem + f".from_{int(start_sec)}s.wav")
|
||
try:
|
||
subprocess.check_output([
|
||
"ffmpeg", "-nostdin", "-y",
|
||
"-ss", str(max(0.0, float(start_sec))),
|
||
"-i", str(src_wav),
|
||
"-vn", "-ac", "1", "-ar", "16000",
|
||
"-f", "wav", str(dst),
|
||
], stderr=subprocess.STDOUT)
|
||
return dst
|
||
except subprocess.CalledProcessError as e:
|
||
# If trimming fails, fall back to full file
|
||
print(f"[whisper] trim failed, using full WAV: {e.output.decode(errors='ignore')}", flush=True)
|
||
return src_wav
|
||
|
||
def media_duration_seconds(path: Path) -> float:
|
||
"""Return duration in seconds using ffprobe; fallback to 0.0 on error."""
|
||
try:
|
||
out = subprocess.check_output([
|
||
"ffprobe", "-v", "error", "-show_entries", "format=duration",
|
||
"-of", "default=nokey=1:noprint_wrappers=1", str(path)
|
||
], stderr=subprocess.STDOUT, text=True).strip()
|
||
return float(out) if out else 0.0
|
||
except Exception:
|
||
return 0.0
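# Equivalent probe by hand (prints only the duration in seconds):
#   ffprobe -v error -show_entries format=duration -of default=nokey=1:noprint_wrappers=1 file.mp3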
|
||
|
||
# --- Partial transcript helpers ---
|
||
def _partial_paths(title: str) -> tuple[Path, Path]:
|
||
base = TRN / title
|
||
return base.with_suffix(".partial.json"), base.with_suffix(".partial.txt")
|
||
|
||
def _save_partial(title: str, language: str, segs: list[dict]):
|
||
pjson, ptxt = _partial_paths(title)
|
||
try:
|
||
# Save JSON
|
||
pjson.write_bytes(orjson.dumps({"file": str((TRN / title).with_suffix('.wav')), "language": language, "segments": segs}))
|
||
except Exception as e:
|
||
print(f"[whisper] partial json save failed: {e}", flush=True)
|
||
try:
|
||
# Save TXT snapshot
|
||
ptxt.write_text(" ".join(s.get("text","") for s in segs), encoding="utf-8")
|
||
except Exception as e:
|
||
print(f"[whisper] partial txt save failed: {e}", flush=True)
|
||
|
||
def transcribe(media_path: Path):
|
||
backend = TRANSCRIBE_BACKEND
|
||
print(f"[transcribe] start backend={backend}: {media_path}", flush=True)
|
||
# If paused, abort before any heavy work (no ffmpeg, no model load)
|
||
if transcribe_paused():
|
||
print(f"[pause] transcribe: pause active before heavy work; aborting {media_path}", flush=True)
|
||
raise PauseInterrupt("pause requested before start")
|
||
# 1) Robustly extract audio to 16k mono WAV (fixes pyAV/webm edge cases)
|
||
wav = extract_audio(media_path, TMP)
|
||
# Check again after extraction to avoid loading the model if a pause was requested meanwhile
|
||
if transcribe_paused():
|
||
try:
|
||
if wav.exists():
|
||
wav.unlink()
|
||
except Exception:
|
||
pass
|
||
print(f"[pause] transcribe: pause activated; stopping before model load for {media_path}", flush=True)
|
||
raise PauseInterrupt("pause requested after extract")
|
||
|
||
title = media_path.stem
|
||
base = TRN / title
|
||
resume_enabled = (backend != "openai") and WHISPER_RESUME
|
||
|
||
# Resume support: if a partial checkpoint exists, load it and trim input
|
||
resume_segments = []
|
||
resume_offset = 0.0
|
||
language_hint = None
|
||
if resume_enabled:
|
||
pjson, ptxt = _partial_paths(title)
|
||
if pjson.exists():
|
||
try:
|
||
pdata = json.loads(pjson.read_text(encoding="utf-8"))
|
||
resume_segments = pdata.get("segments", []) or []
|
||
if resume_segments:
|
||
resume_offset = float(resume_segments[-1].get("end", 0.0))
|
||
language_hint = pdata.get("language")
|
||
print(f"[whisper] resuming from ~{resume_offset:.2f}s with {len(resume_segments)} segments", flush=True)
|
||
except Exception as e:
|
||
print(f"[whisper] failed to load partial: {e}", flush=True)
|
||
|
||
# If resuming, trim WAV from last end time
|
||
if resume_enabled and resume_offset > 0.0:
|
||
wav_for_run = trim_wav(wav, resume_offset, TMP)
|
||
else:
|
||
wav_for_run = wav
|
||
|
||
# 2) Language selection
|
||
lang = None if WHISPER_LANGUAGE.lower() == "auto" else WHISPER_LANGUAGE
|
||
if language_hint and WHISPER_LANGUAGE.lower() == "auto":
|
||
# carry hint forward if available
|
||
lang = language_hint
|
||
|
||
# 3) Transcribe (local Whisper or OpenAI backend)
|
||
payload = None
|
||
if backend == "openai":
|
||
segments, info, payload = run_transcribe_openai(wav_for_run, lang)
|
||
else:
|
||
segments, info = run_transcribe_with_fallback(wav_for_run, lang)
|
||
|
||
# Determine duration for progress; use full WAV duration for consistent % regardless of resume
|
||
dur = media_duration_seconds(wav) or 0.0
|
||
# Start wall clock timer for speed/ETA
|
||
start_wall = time.time()
|
||
if resume_enabled and resume_offset and dur and resume_offset >= dur:
|
||
print(f"[whisper] resume offset {resume_offset:.2f}s >= duration {dur:.2f}s; resetting resume.", flush=True)
|
||
resume_offset = 0.0
|
||
last_pct = -1
|
||
|
||
segs = list(resume_segments) # start with what we already have
|
||
text_parts = [s.get("text","") for s in resume_segments]
|
||
|
||
# Walk new segments; shift their timestamps by resume_offset if trimmed
|
||
seg_count_since_save = 0
|
||
seg_index = len(resume_segments)
|
||
for s in segments:
|
||
seg_index += 1
|
||
start = (s.start or 0.0) + resume_offset
|
||
end = (s.end or 0.0) + resume_offset
|
||
seg = {"start": start, "end": end, "text": s.text}
|
||
segs.append(seg)
|
||
text_parts.append(s.text)
|
||
|
||
# --- Cooperative pause: save checkpoint and abort as soon as pause is requested ---
|
||
if resume_enabled and transcribe_paused():
|
||
try:
|
||
pct = int(min(100, max(0, (end / dur) * 100))) if dur > 0 else 0
|
||
except Exception:
|
||
pct = 0
|
||
_save_partial(title, info.language or (WHISPER_LANGUAGE if WHISPER_LANGUAGE.lower() != "auto" else "en"), segs)
|
||
log({
|
||
"status": "paused",
|
||
"path": str(media_path),
|
||
"title": title,
|
||
"progress": pct
|
||
})
|
||
print(f"[pause] transcribe: pause requested mid-run; aborting at ~{end:.2f}s for {media_path}", flush=True)
|
||
raise PauseInterrupt("pause requested")
|
||
|
||
if WHISPER_LOG_SEGMENTS:
|
||
print(f"[whisper] {start:8.2f}–{end:8.2f} {s.text.strip()}", flush=True)
|
||
|
||
# progress logging every +5%
|
||
if dur > 0 and end is not None:
|
||
pct = int(min(100, max(0, (end / dur) * 100)))
|
||
if pct >= last_pct + 5:
|
||
log({
|
||
"status": "transcribing",
|
||
"path": str(media_path),
|
||
"title": title,
|
||
"progress": pct
|
||
})
|
||
last_pct = pct
|
||
|
||
# compute realtime speed and ETA for console logs
|
||
try:
|
||
elapsed = max(0.001, time.time() - start_wall)
|
||
processed = max(0.0, float(end))
|
||
speed = (processed / elapsed) if elapsed > 0 else 0.0 # seconds processed per second
|
||
# represent as X real-time factor
|
||
rtf = speed # 1.0 == real-time
|
||
eta = ((dur - processed) / speed) if (speed > 0 and dur > 0) else 0
|
||
print(f"[whisper] progress {pct:3d}% seg={seg_index:5d} rtf={rtf:0.2f}x eta={_fmt_eta(eta)}", flush=True)
|
||
# also mirror to feed log with speed/eta
|
||
try:
|
||
log({
|
||
"status": "transcribing",
|
||
"path": str(media_path),
|
||
"title": title,
|
||
"progress": pct,
|
||
"speed_rtf": round(rtf, 2),
|
||
"eta_sec": int(max(0, eta))
|
||
})
|
||
except Exception:
|
||
pass
|
||
except Exception:
|
||
pass
|
||
|
||
# periodic partial save
|
||
seg_count_since_save += 1
|
||
if resume_enabled and seg_count_since_save >= PARTIAL_SAVE_EVERY_SEGS:
|
||
_save_partial(title, info.language or (WHISPER_LANGUAGE if WHISPER_LANGUAGE.lower() != "auto" else "en"), segs)
|
||
seg_count_since_save = 0
|
||
|
||
# ensure we mark 100% on completion
|
||
if last_pct < 100:
|
||
log({"status": "transcribing", "path": str(media_path), "title": title, "progress": 100})
|
||
|
||
txt = " ".join(text_parts).strip()
|
||
|
||
# Write final transcript artifacts
|
||
(base.with_suffix(".json")).write_bytes(orjson.dumps({
|
||
"file": str(media_path),
|
||
"language": info.language,
|
||
"segments": segs
|
||
}))
|
||
(base.with_suffix(".txt")).write_text(txt, encoding="utf-8")
|
||
|
||
def fmt_ts(t):
|
||
h=int(t//3600); m=int((t%3600)//60); s=t-(h*3600+m*60)
|
||
return f"{h:02}:{m:02}:{s:06.3f}".replace('.',',')
|
||
|
||
with open(base.with_suffix(".srt"), "w", encoding="utf-8") as srt:
|
||
for i,s in enumerate(segs,1):
|
||
srt.write(f"{i}\n{fmt_ts(s['start'])} --> {fmt_ts(s['end'])}\n{s['text'].strip()}\n\n")
|
||
|
||
with open(base.with_suffix(".vtt"), "w", encoding="utf-8") as vtt:
|
||
vtt.write("WEBVTT\n\n")
|
||
for s in segs:
|
||
vtt.write(f"{fmt_ts(s['start']).replace(',', '.')} --> {fmt_ts(s['end']).replace(',', '.')} \n{s['text'].strip()}\n\n")
|
||
|
||
    # 4) Copy SRT next to media for Plex (language-suffixed)
    try:
        lang_code = (info.language or (WHISPER_LANGUAGE if WHISPER_LANGUAGE.lower() != 'auto' else 'en')).lower()
        srt_src = base.with_suffix(".srt")
        srt_dst = media_path.with_suffix(f".{lang_code}.srt")
        _safe_copy(srt_src, srt_dst)
    except Exception as e:
        # srt_dst may be unbound if the failure happened before it was assigned, so log the media path instead.
        print(f"[post] could not copy srt next to {media_path}: {e}", flush=True)
|
||
|
||
# Write Kodi/Plex-compatible NFO using enhanced metadata (same as before)
|
||
try:
|
||
fallback = {
|
||
"title": title,
|
||
"episode_title": title,
|
||
"show": media_path.parent.name,
|
||
"description": "",
|
||
"pubdate": _extract_date_from_stem(title),
|
||
"duration_sec": media_duration_seconds(media_path),
|
||
"image": "",
|
||
"guid": "",
|
||
}
|
||
meta = build_meta_from_sources(media_path, media_path.parent.name, fallback, ep=None)
|
||
ttxt = (TRN / title).with_suffix(".txt").read_text(encoding="utf-8")
|
||
write_episode_nfo(media_path, meta, ttxt)
|
||
try:
|
||
save_episode_artwork(meta.get("image"), media_path, meta.get("show"))
|
||
except Exception:
|
||
pass
|
||
except Exception as e:
|
||
print(f"[post] NFO write failed: {e}", flush=True)
|
||
|
||
# Cleanup temp WAVs
|
||
try:
|
||
if wav_for_run != wav and wav_for_run.exists():
|
||
wav_for_run.unlink()
|
||
if wav.exists():
|
||
wav.unlink()
|
||
except Exception:
|
||
pass
|
||
|
||
# Remove partial checkpoints on success
|
||
if resume_enabled:
|
||
try:
|
||
pjson, ptxt = _partial_paths(title)
|
||
if pjson.exists(): pjson.unlink()
|
||
if ptxt.exists(): ptxt.unlink()
|
||
except Exception:
|
||
pass
|
||
|
||
# Final average speed over whole transcription
|
||
try:
|
||
total_elapsed = max(0.001, time.time() - start_wall)
|
||
avg_rtf = (dur / total_elapsed) if total_elapsed > 0 else 0.0
|
||
print(f"[whisper] avg speed ~{avg_rtf:0.2f}x (audio_seconds / wall_seconds)", flush=True)
|
||
except Exception:
|
||
pass
|
||
print(
|
||
f"[transcribe] backend={backend} finished: {media_path} lang={info.language} segments={len(segs)} dur={dur:.2f}s",
|
||
flush=True,
|
||
)
|
||
return base
|
||
|
||
|
||
# --- Meilisearch helpers ---
|
||
def _safe_doc_id(s: str) -> str:
|
||
"""
|
||
Meilisearch document IDs must be [A-Za-z0-9_-]. Convert the title to a safe slug.
|
||
If the result is empty, fall back to a short SHA1 hash.
|
||
"""
|
||
import hashlib
|
||
slug = re.sub(r"\s+", "_", (s or "").strip())
|
||
slug = re.sub(r"[^A-Za-z0-9_-]", "", slug)
|
||
if not slug:
|
||
slug = hashlib.sha1((s or "").encode("utf-8", errors="ignore")).hexdigest()[:16]
|
||
return slug
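# Examples:
#   _safe_doc_id("20230221 - Some Episode!") -> "20230221_-_Some_Episode"
#   _safe_doc_id("???")                      -> 16-char sha1 fallback (nothing slug-safe survives)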
|
||
|
||
|
||
def ensure_meili_index():
|
||
"""Create index 'library' with primaryKey 'id' if it does not already exist."""
|
||
try:
|
||
r = requests.get(f"{MEILI_URL}/indexes/library",
|
||
headers={"Authorization": f"Bearer {MEILI_KEY}"}, timeout=10)
|
||
if r.status_code == 200:
|
||
return
|
||
# Attempt to create it
|
||
cr = requests.post(
|
||
f"{MEILI_URL}/indexes",
|
||
headers={"Authorization": f"Bearer {MEILI_KEY}", "Content-Type": "application/json"},
|
||
data=orjson.dumps({"uid": "library", "primaryKey": "id"}),
|
||
timeout=10,
|
||
)
|
||
# Ignore errors if another process created it first
|
||
try:
|
||
cr.raise_for_status()
|
||
except Exception:
|
||
pass
|
||
except Exception:
|
||
# Non-fatal; indexing will fail later if the index truly doesn't exist
|
||
pass
|
||
|
||
|
||
def index_meili(json_path: Path):
|
||
# Make sure the index exists and is configured with a primary key
|
||
ensure_meili_index()
|
||
|
||
    doc = json.loads(json_path.read_text(encoding="utf-8"))
|
||
file_field = doc.get("file", "")
|
||
title = Path(file_field).stem if file_field else json_path.stem
|
||
|
||
# Build a Meili-safe document ID
|
||
doc_id = _safe_doc_id(title)
|
||
|
||
# Extract a YYYYMMDD date if present
|
||
m = re.search(r"\b(\d{8})\b", title)
|
||
date = m.group(1) if m else ""
|
||
|
||
payload = {
|
||
"id": doc_id,
|
||
"type": "podcast",
|
||
"title": title,
|
||
"date": date,
|
||
"source": str(Path(LIB, Path(file_field or title).name)),
|
||
"text": " ".join(s.get("text", "") for s in doc.get("segments", [])),
|
||
"segments": doc.get("segments", []),
|
||
"meta": {"language": doc.get("language", "")},
|
||
}
|
||
|
||
for attempt in range(5):
|
||
try:
|
||
r = requests.post(
|
||
f"{MEILI_URL}/indexes/library/documents",
|
||
headers={
|
||
"Authorization": f"Bearer {MEILI_KEY}",
|
||
"Content-Type": "application/json",
|
||
},
|
||
data=orjson.dumps(payload),
|
||
timeout=15,
|
||
)
|
||
r.raise_for_status()
|
||
break
|
||
except Exception:
|
||
if attempt == 4:
|
||
raise
|
||
time.sleep(2 * (attempt + 1))
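# Once documents are indexed they can be queried through Meilisearch's standard search
# API, e.g. (illustrative; host/key come from MEILI_URL / MEILI_KEY):
#   curl -s "$MEILI_URL/indexes/library/search" \
#     -H "Authorization: Bearer $MEILI_KEY" -H "Content-Type: application/json" \
#     -d '{"q": "bitcoin", "limit": 5, "attributesToRetrieve": ["id", "title", "date"]}'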
|
||
|
||
import tldextract, trafilatura, requests as _requests
|
||
|
||
def slugify(text):
|
||
text = re.sub(r'[^A-Za-z0-9\-._ ]+', '', text).strip().replace(' ', '_')
|
||
return text[:120] or 'page'
|
||
|
||
def _norm(s: str | None) -> str:
|
||
"""Normalize strings for stable comparisons across Unicode lookalikes and stray whitespace."""
|
||
if s is None:
|
||
return ""
|
||
try:
|
||
return unicodedata.normalize("NFKC", s).strip()
|
||
except Exception:
|
||
return (s or "").strip()
|
||
|
||
def save_web_snapshot(url: str):
|
||
r = _requests.get(url, timeout=30, headers={"User-Agent":"Mozilla/5.0"})
|
||
r.raise_for_status()
|
||
html = r.text
|
||
downloaded = trafilatura.load_html(html, url=url)
|
||
text = trafilatura.extract(downloaded, include_comments=False, include_images=False, with_metadata=True) or ""
|
||
meta = trafilatura.metadata.extract_metadata(downloaded) or None
|
||
    title_match = re.search(r'<title[^>]*>(.*?)</title>', html, re.I | re.S)
    title = (meta.title if meta and getattr(meta, 'title', None) else None) or (title_match.group(1).strip() if title_match else url)
|
||
date = (meta.date if meta and getattr(meta, 'date', None) else "")
|
||
parts = tldextract.extract(url)
|
||
domain = ".".join([p for p in [parts.domain, parts.suffix] if p])
|
||
slug = slugify(title)
|
||
outdir = LIB / "web" / domain
|
||
outdir.mkdir(parents=True, exist_ok=True)
|
||
base = outdir / slug
|
||
open(base.with_suffix(".html"), "w", encoding="utf-8", errors="ignore").write(html)
|
||
open(base.with_suffix(".txt"), "w", encoding="utf-8", errors="ignore").write(text)
|
||
return base, title, domain, date, text
|
||
|
||
def index_web(base: Path, title: str, domain: str, date: str, text: str, url: str):
|
||
payload = {
|
||
"id": f"web:{domain}:{base.stem}",
|
||
"type": "web",
|
||
"title": title,
|
||
"date": re.sub(r'[^0-9]', '', date)[:8] if date else "",
|
||
"source": f"file://{str(base.with_suffix('.html'))}",
|
||
"text": text,
|
||
"segments": [],
|
||
"meta": {"url": url, "domain": domain}
|
||
}
|
||
r = requests.post(f"{MEILI_URL}/indexes/library/documents",
|
||
headers={"Authorization": f"Bearer {MEILI_KEY}", "Content-Type":"application/json"},
|
||
data=orjson.dumps(payload))
|
||
r.raise_for_status()
|
||
|
||
def is_media_url(url: str):
|
||
lowered = url.lower()
|
||
media_hosts = ["youtube.com","youtu.be","rumble.com","vimeo.com","soundcloud.com","spotify.com","podbean.com","buzzsprout.com"]
|
||
return any(h in lowered for h in media_hosts)
|
||
|
||
def owui_headers():
|
||
return {"Authorization": f"Bearer {OWUI_KEY}"} if OWUI_KEY else {}
|
||
|
||
|
||
def _owui_metadata_template_payload():
|
||
"""Return the metadata template payload to apply when auto-fix is enabled."""
|
||
if not OWUI_METADATA_TEMPLATE_JSON:
|
||
return {}
|
||
try:
|
||
return json.loads(OWUI_METADATA_TEMPLATE_JSON)
|
||
except Exception:
|
||
# Treat value as a raw string template if parsing fails
|
||
return OWUI_METADATA_TEMPLATE_JSON
|
||


def owui_fix_metadata_template(kb_id: str, force: bool = False) -> bool:
    """Ensure the target knowledge base has a safe metadata template.

    Attempts PATCH/PUT with either a user-provided template or an empty object.
    Returns True if an update succeeded; False otherwise.
    """
    if not OWUI_AUTO_FIX_METADATA or not OWUI_URL or not OWUI_KEY or not kb_id:
        return False
    if not force and kb_id in _OWUI_TEMPLATE_PATCHED:
        return False

    payload_variants: list[object] = []
    template_payload = _owui_metadata_template_payload()
    payload_variants.append({"metadata_template": template_payload})
    if template_payload not in ({}, "", None):
        payload_variants.append({"metadata_template": {}})
    payload_variants.append({"metadata_template": None})

    headers = {**owui_headers(), "Content-Type": "application/json"}
    url = f"{OWUI_URL}/api/v1/knowledge/{kb_id}"
    success_codes = {200, 201, 202, 204}

    for payload in payload_variants:
        try:
            body = orjson.dumps(payload)
        except Exception:
            body = json.dumps(payload).encode("utf-8")
        for method in ("PATCH", "PUT"):
            try:
                resp = requests.request(method, url, headers=headers, data=body, timeout=15)
            except Exception:
                continue
            if resp.status_code in success_codes:
                print(f"[owui] metadata template adjusted via {method} for KB {kb_id}", flush=True)
                _OWUI_TEMPLATE_PATCHED.add(kb_id)
                return True
    return False


# ---------- Media normalisation helpers ----------

VIDEO_ENCODER_MAP = {
    "hevc": "libx265",
    "h265": "libx265",
    "h.265": "libx265",
    "h264": "libx264",
    "h.264": "libx264",
    "av1": "libaom-av1",
}

AUDIO_ENCODER_MAP = {
    "mp3": "libmp3lame",
    "libmp3lame": "libmp3lame",
    "aac": "aac",
    "libfdk_aac": "libfdk_aac",
    "opus": "libopus",
    "flac": "flac",
}


def _resolve_video_encoder(codec: str) -> str:
    key = (codec or "").lower()
    return VIDEO_ENCODER_MAP.get(key, codec or "libx265")


def _resolve_audio_encoder(codec: str) -> str:
    key = (codec or "").lower()
    return AUDIO_ENCODER_MAP.get(key, codec or "libmp3lame")
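
# Examples (illustrative):
#   _resolve_video_encoder("hevc")       -> "libx265"
#   _resolve_video_encoder("hevc_nvenc") -> "hevc_nvenc"   (unknown keys pass through untouched)
#   _resolve_audio_encoder("mp3")        -> "libmp3lame"
#   _resolve_audio_encoder("")           -> "libmp3lame"   (empty codec falls back to the default)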


def _sanitize_video_preset(encoder: str) -> str | None:
    preset = (VIDEO_NORMALIZE_PRESET or "").strip()
    if not preset:
        return None
    enc = (encoder or "").lower()
    pl = preset.lower()

    if "nvenc" in enc:
        allowed = {"p1", "p2", "p3", "p4", "p5", "p6", "p7"}
        return pl if pl in allowed else "p5"

    if enc in {"libx265", "libx264"}:
        allowed = {"ultrafast", "superfast", "veryfast", "faster", "fast", "medium", "slow", "slower", "veryslow", "placebo"}
        if pl in allowed:
            return pl
        if pl.startswith("p") and pl[1:].isdigit():
            return "medium"
        return "medium"

    if enc == "libaom-av1":
        allowed = {"good", "best", "realtime"}
        if pl in allowed:
            return pl
        return "good"

    return preset
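
# Example (illustrative): with VIDEO_NORMALIZE_PRESET="p5" the preset is kept for NVENC
# encoders ("p5"), translated to "medium" for libx265/libx264 (which only accept the
# x264-style names), and falls back to "good" for libaom-av1; with "slow" an NVENC
# encoder gets "p5" instead, since "slow" is not one of its p1-p7 presets.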


def _parse_ffmpeg_ts(ts: str) -> float:
    ts = ts.strip()
    if not ts:
        return 0.0
    try:
        parts = ts.split(":")
        if len(parts) == 3:
            h, m, s = parts
            return int(h) * 3600 + int(m) * 60 + float(s)
        if len(parts) == 2:
            m, s = parts
            return int(m) * 60 + float(s)
        return float(ts)
    except Exception:
        return 0.0
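
# Examples (illustrative):
#   _parse_ffmpeg_ts("01:02:03.5") -> 3723.5
#   _parse_ffmpeg_ts("02:03.5")    -> 123.5
#   _parse_ffmpeg_ts("7.25")       -> 7.25
#   _parse_ffmpeg_ts("N/A")        -> 0.0   (unparseable values degrade to zero)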


def _ffmpeg_run_with_progress(cmd: list[str], duration: float, label: str) -> None:
    if not cmd:
        raise ValueError("ffmpeg command is empty")
    output_path = cmd[-1]
    cmd_progress = cmd[:-1] + ["-progress", "pipe:1", "-nostats", output_path]

    start = time.time()
    last_pct = -5
    processed_sec = 0.0
    captured: deque[str] = deque(maxlen=100)

    proc = subprocess.Popen(
        cmd_progress,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
        bufsize=1,
    )

    try:
        assert proc.stdout is not None
        for raw_line in proc.stdout:
            line = raw_line.strip()
            if not line:
                continue
            captured.append(line)
            if line.startswith("out_time="):
                processed_sec = _parse_ffmpeg_ts(line.split("=", 1)[1])
                if duration > 0:
                    pct = int(min(100, max(0, (processed_sec / duration) * 100)))
                    if pct >= last_pct + 5:
                        elapsed = max(0.001, time.time() - start)
                        speed = processed_sec / elapsed if elapsed > 0 else 0.0
                        remaining = max(0.0, duration - processed_sec)
                        eta = remaining / speed if speed > 0 else 0.0
                        print(
                            f"[normalize] progress {pct:3d}% {label} rtf={speed:0.2f}x eta={_fmt_eta(eta)}",
                            flush=True,
                        )
                        last_pct = pct
            elif line.startswith("progress=") and line.endswith("end"):
                break
    finally:
        proc.wait()

    if proc.returncode != 0:
        output = "\n".join(captured)
        raise subprocess.CalledProcessError(proc.returncode, cmd_progress, output)
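
# For reference (hedged; exact keys vary by ffmpeg build), "-progress pipe:1" emits
# key=value blocks on stdout roughly like:
#   out_time=00:01:23.456000
#   speed=3.1x
#   progress=continue
# with a final block ending in "progress=end", which is what terminates the read loop above.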


def _ffprobe_streams(path: Path) -> dict[str, str]:
    try:
        out = subprocess.check_output(
            ["ffprobe", "-v", "error", "-show_entries", "stream=codec_type,codec_name", "-of", "json", str(path)],
            text=True,
        )
        data = json.loads(out)
    except Exception:
        return {}
    info: dict[str, str] = {"video": "", "audio": ""}
    for stream in data.get("streams", []) or []:
        ctype = (stream.get("codec_type") or "").lower()
        cname = (stream.get("codec_name") or "").lower()
        if ctype == "video" and not info["video"]:
            info["video"] = cname
        elif ctype == "audio" and not info["audio"]:
            info["audio"] = cname
    return info
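
# Example (illustrative): for a typical MP4, ffprobe's JSON looks roughly like
#   {"streams": [{"codec_type": "video", "codec_name": "h264"},
#                {"codec_type": "audio", "codec_name": "aac"}]}
# which this helper collapses to {"video": "h264", "audio": "aac"} (first stream of each type wins).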


def _unique_backup_path(path: Path) -> Path:
    base = path.name
    candidate = path.parent / f"{base}.orig"
    if not candidate.exists():
        return candidate
    counter = 1
    while True:
        candidate = path.parent / f"{base}.orig{counter}"
        if not candidate.exists():
            return candidate
        counter += 1
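
# Example (illustrative): backing up "episode.mp4" yields "episode.mp4.orig", then
# "episode.mp4.orig1", "episode.mp4.orig2", ... if earlier backups already exist.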


def _safe_copy(src: Path, dst: Path) -> None:
    try:
        if src.resolve(strict=False) == dst.resolve(strict=False):
            return
    except Exception:
        if os.path.abspath(src) == os.path.abspath(dst):
            return
    dst.parent.mkdir(parents=True, exist_ok=True)
    shutil.copy2(src, dst)


def _is_sidecar_name(name: str, base_stem: str, base_name: str) -> bool:
    exact_suffixes = [".info.json", ".nfo", ".jpg", ".jpeg", ".png", ".webp", ".prov.json"]
    for suf in exact_suffixes:
        if name == f"{base_name}{suf}" or name == f"{base_stem}{suf}":
            return True
    text_exts = {".srt", ".vtt", ".txt", ".json", ".md"}
    for ext in text_exts:
        if name == f"{base_stem}{ext}" or name == f"{base_name}{ext}":
            return True
        if name.startswith(f"{base_stem}.") and name.endswith(ext):
            return True
        if name.startswith(f"{base_name}.") and name.endswith(ext):
            return True
    return False


def rename_media_sidecars(src: Path, dst: Path, skip: set[Path] | None = None) -> None:
    if src == dst:
        return
    skip = skip or set()
    parent = src.parent
    stem_src, stem_dst = src.stem, dst.stem
    name_src, name_dst = src.name, dst.name
    for f in list(parent.glob("*")):
        if not f.exists() or f == src or f == dst or f in skip:
            continue
        new_name = None
        fname = f.name
        if not _is_sidecar_name(fname, stem_src, name_src):
            continue
        if fname.startswith(name_src):
            new_name = name_dst + fname[len(name_src):]
        elif fname.startswith(stem_src):
            new_name = stem_dst + fname[len(stem_src):]
        if not new_name:
            continue
        target = parent / new_name
        if target.exists():
            continue
        try:
            f.rename(target)
        except Exception:
            pass
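
# Example (illustrative): renaming "Show - Ep1.webm" to "Show - Ep1.mkv" also moves
#   "Show - Ep1.webm.info.json" -> "Show - Ep1.mkv.info.json"
# while "Show - Ep1.en.srt" and "Show - Ep1.jpg" stay put (same stem, target already
# exists); only sidecars whose names embed the old full file name actually change.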


def _finalize_normalized_output(original: Path, final_path: Path, tmp_path: Path) -> Path:
    if final_path.exists():
        try:
            final_path.unlink()
        except Exception:
            pass
    if MEDIA_NORMALIZE_KEEP_ORIGINAL:
        try:
            backup = _unique_backup_path(original)
            if original.exists():
                original.rename(backup)
        except Exception as e:
            print(f"[normalize] could not preserve original for {original}: {e}", flush=True)
            try:
                if original.exists():
                    original.unlink()
            except Exception:
                pass
    else:
        try:
            if original.exists():
                original.unlink()
        except Exception:
            pass
    os.replace(tmp_path, final_path)
    return final_path


def _normalize_video_file(path: Path, info: dict[str, str]) -> Path:
    current_codec = (info.get("video") or "").lower()
    ext_match = path.suffix.lower() == VIDEO_NORMALIZE_EXTENSION
    if current_codec == VIDEO_NORMALIZE_CODEC and ext_match:
        return path

    if current_codec.startswith("av1") or current_codec in {"libaom-av1", "av01"}:
        print(f"[normalize] skip: {path.name} already AV1; leaving as-is", flush=True)
        return path

    encoder = _resolve_video_encoder(VIDEO_NORMALIZE_CODEC)
    final_path = path if ext_match else path.with_suffix(VIDEO_NORMALIZE_EXTENSION)
    tmp_path = final_path.parent / f"{final_path.stem}.tmp{VIDEO_NORMALIZE_EXTENSION}"
    if tmp_path.exists():
        tmp_path.unlink()

    duration = media_duration_seconds(path)

    def build_cmd(v_encoder: str) -> list[str]:
        cmd = [
            "ffmpeg", "-nostdin", "-y",
            "-i", str(path),
            "-map", "0",
            "-c:v", v_encoder,
        ]
        preset_val = _sanitize_video_preset(v_encoder)
        if preset_val:
            cmd.extend(["-preset", preset_val])
        if VIDEO_NORMALIZE_TUNE:
            cmd.extend(["-tune", VIDEO_NORMALIZE_TUNE])
        if VIDEO_NORMALIZE_CRF:
            cmd.extend(["-crf", VIDEO_NORMALIZE_CRF])

        if info.get("audio"):
            if VIDEO_NORMALIZE_AUDIO_CODEC == "copy":
                cmd.extend(["-c:a", "copy"])
            else:
                cmd.extend(["-c:a", _resolve_audio_encoder(VIDEO_NORMALIZE_AUDIO_CODEC)])
                if VIDEO_NORMALIZE_AUDIO_BITRATE:
                    cmd.extend(["-b:a", VIDEO_NORMALIZE_AUDIO_BITRATE])
        else:
            cmd.append("-an")

        cmd.extend(["-c:s", "copy", str(tmp_path)])
        return cmd

    def fallback_encoder() -> str:
        base = VIDEO_NORMALIZE_CODEC.lower()
        if base.startswith("hevc") or base.startswith("h265"):
            return "libx265"
        if base.startswith("h264") or base.startswith("avc"):
            return "libx264"
        if base.startswith("av1"):
            return "libaom-av1"
        return "libx265"

    attempted = []
    encoders_to_try = [encoder]
    lower_encoder = encoder.lower()
    if any(token in lower_encoder for token in ("nvenc", "qsv", "cuda", "amf")):
        cpu_encoder = fallback_encoder()
        if cpu_encoder not in encoders_to_try:
            encoders_to_try.append(cpu_encoder)

    for idx, enc in enumerate(encoders_to_try):
        try:
            print(f"[normalize] video -> {final_path.name} codec={enc}", flush=True)
            _ffmpeg_run_with_progress(build_cmd(enc), duration, final_path.name)
            rename_media_sidecars(path, final_path, skip={tmp_path})
            return _finalize_normalized_output(path, final_path, tmp_path)
        except subprocess.CalledProcessError as e:
            attempted.append((enc, e))
            if tmp_path.exists():
                tmp_path.unlink()
            if idx == len(encoders_to_try) - 1:
                details = ", ".join(f"{enc}: {err}" for enc, err in attempted)
                raise RuntimeError(f"ffmpeg video normalize failed ({details})")
            else:
                print(f"[normalize] encoder {enc} failed ({e}); retrying with CPU fallback", flush=True)

    return path
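
# Example (illustrative) of the command build_cmd() produces, assuming
# VIDEO_NORMALIZE_CODEC=hevc, VIDEO_NORMALIZE_CRF=28, VIDEO_NORMALIZE_AUDIO_CODEC=copy,
# no preset/tune configured, and a ".mkv" target extension:
#   ffmpeg -nostdin -y -i in.mp4 -map 0 -c:v libx265 -crf 28 -c:a copy -c:s copy in.tmp.mkv
# (the "-progress pipe:1 -nostats" pair is spliced in later by _ffmpeg_run_with_progress).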


def _normalize_audio_file(path: Path, info: dict[str, str]) -> Path:
    current_codec = (info.get("audio") or "").lower()
    ext_match = path.suffix.lower() == AUDIO_NORMALIZE_EXTENSION
    target_encoder = _resolve_audio_encoder(AUDIO_NORMALIZE_CODEC)
    equivalent_codecs = {AUDIO_NORMALIZE_CODEC.lower(), target_encoder.lower()}
    if target_encoder.lower() == "libmp3lame":
        equivalent_codecs.add("mp3")
    if target_encoder.lower() in {"aac", "libfdk_aac"}:
        equivalent_codecs.update({"aac", "mp4a"})
    if current_codec in equivalent_codecs and ext_match:
        return path

    final_path = path if ext_match else path.with_suffix(AUDIO_NORMALIZE_EXTENSION)
    tmp_path = final_path.parent / f"{final_path.stem}.tmp{AUDIO_NORMALIZE_EXTENSION}"
    if tmp_path.exists():
        tmp_path.unlink()

    duration = media_duration_seconds(path)

    cmd = [
        "ffmpeg", "-nostdin", "-y",
        "-i", str(path),
        "-vn",
        "-c:a", target_encoder,
    ]
    if AUDIO_NORMALIZE_BITRATE:
        cmd.extend(["-b:a", AUDIO_NORMALIZE_BITRATE])
    if AUDIO_NORMALIZE_CHANNELS:
        cmd.extend(["-ac", AUDIO_NORMALIZE_CHANNELS])
    cmd.append(str(tmp_path))

    print(f"[normalize] audio -> {final_path.name} codec={AUDIO_NORMALIZE_CODEC}", flush=True)
    try:
        _ffmpeg_run_with_progress(cmd, duration, final_path.name)
    except subprocess.CalledProcessError as e:
        if tmp_path.exists():
            tmp_path.unlink()
        raise RuntimeError(f"ffmpeg audio normalize failed: {e}")

    rename_media_sidecars(path, final_path, skip={tmp_path})
    return _finalize_normalized_output(path, final_path, tmp_path)


def normalize_media_file(path: Path) -> Path:
    if not MEDIA_NORMALIZE or not path.exists() or not path.is_file():
        return path
    try:
        info = _ffprobe_streams(path)
    except Exception as e:
        print(f"[normalize] ffprobe failed for {path}: {e}", flush=True)
        return path
    try:
        if info.get("video"):
            return _normalize_video_file(path, info)
        if info.get("audio"):
            return _normalize_audio_file(path, info)
    except Exception as e:
        print(f"[normalize] failed for {path}: {e}", flush=True)
    return path

def owui_get_or_create_kb():
    """Return a KB id for OWUI_KB without creating duplicates.
    Honors OPENWEBUI_KB_ID, and tolerates both list and {"data": ...} response shapes.
    """
    if not OWUI_URL or not OWUI_KEY:
        return None

    # 1) If an explicit id is provided, trust it
    forced = os.getenv("OPENWEBUI_KB_ID", "").strip()
    if forced:
        return forced

    # 2) List and try to find an exact name match
    try:
        r = requests.get(f"{OWUI_URL}/api/v1/knowledge/list", headers=owui_headers(), timeout=15)
        r.raise_for_status()
        body = r.json()
        items = body if isinstance(body, list) else body.get("data", [])
        # Prefer exact normalized name match; if multiple, pick the most recently updated
        kb_target = _norm(OWUI_KB)
        matches = [kb for kb in items if _norm(kb.get("name")) == kb_target]
        if matches:
            try:
                matches.sort(key=lambda k: k.get("updated_at") or 0, reverse=True)
            except Exception:
                pass
            return matches[0].get("id")
    except Exception:
        pass

    # 3) Create only if not found
    cr = requests.post(
        f"{OWUI_URL}/api/v1/knowledge/create",
        headers={**owui_headers(), "Content-Type": "application/json"},
        data=orjson.dumps({"name": OWUI_KB, "description": "All local content indexed by podx"}),
        timeout=15,
    )
    cr.raise_for_status()
    created = cr.json()
    if isinstance(created, dict) and created.get("id"):
        return created["id"]
    if isinstance(created, dict) and created.get("data") and created["data"].get("id"):
        return created["data"]["id"]
    # Fallback: try to resolve again by name
    try:
        rr = requests.get(f"{OWUI_URL}/api/v1/knowledge/list", headers=owui_headers(), timeout=15)
        rr.raise_for_status()
        body = rr.json()
        items = body if isinstance(body, list) else body.get("data", [])
        kb_target = _norm(OWUI_KB)
        for kb in items:
            if _norm(kb.get("name")) == kb_target:
                return kb.get("id")
    except Exception:
        pass
    return None
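
# Observed response shapes this helper tolerates (illustrative; Open WebUI versions differ):
#   GET /api/v1/knowledge/list -> [{"id": "...", "name": "...", "updated_at": 1700000000}, ...]
#   GET /api/v1/knowledge/list -> {"data": [{"id": "...", "name": "..."}, ...]}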

def owui_upload_and_attach(path: Path, kb_id: str):
    if OWUI_AUTO_FIX_METADATA:
        owui_fix_metadata_template(kb_id)
    with open(path, "rb") as f:
        r = requests.post(f"{OWUI_URL}/api/v1/files/", headers=owui_headers(), files={"file": (path.name, f)}, timeout=60*10)
    r.raise_for_status()
    up = r.json()
    file_id = (up.get("id") or (up.get("data") or {}).get("id"))
    if not file_id:
        raise RuntimeError(f"OWUI upload: could not get file id from response: {up}")
    payload = {"file_id": file_id}
    attach_headers = {**owui_headers(), "Content-Type": "application/json"}
    body = orjson.dumps(payload)
    r = requests.post(
        f"{OWUI_URL}/api/v1/knowledge/{kb_id}/file/add",
        headers=attach_headers,
        data=body,
        timeout=180,
    )
    if r.status_code == 400 and OWUI_AUTO_FIX_METADATA:
        txt = ""
        try:
            txt = r.text.lower()
        except Exception:
            txt = str(r.content).lower()
        if "metadata" in txt and owui_fix_metadata_template(kb_id, force=True):
            r = requests.post(
                f"{OWUI_URL}/api/v1/knowledge/{kb_id}/file/add",
                headers=attach_headers,
                data=body,
                timeout=180,
            )
    r.raise_for_status()
    try:
        time.sleep(0.5)
    except Exception:
        pass
    return True

def publish_to_openwebui(paths):
    if not OWUI_URL or not OWUI_KEY:
        return
    try:
        kb_id = owui_get_or_create_kb()
        if not kb_id:
            print("[owui] KB resolve failed; skipping attach to avoid accidental duplicates", flush=True)
            return
        owui_fix_metadata_template(kb_id)
        for p in paths:
            p = Path(p)
            if not p.exists():
                continue
            try:
                owui_upload_and_attach(p, kb_id)
            except Exception as e:
                log({"url": str(p), "status": "owui_error", "error": str(e)})
    except Exception as e:
        log({"status": "owui_error", "error": str(e)})

# --------- Post-transcribe pipeline and job/queue helpers ---------

def _postprocess_after_transcribe(media_path: Path, base: Path):
    """Common steps after we have a `base` transcript path: index, publish, NFO, artwork."""
    try:
        index_meili(base.with_suffix(".json"))
    except Exception as e:
        print(f"[post] meili index failed: {e}", flush=True)
    try:
        publish_to_openwebui([base.with_suffix(".txt")])
    except Exception as e:
        print(f"[post] owui publish failed: {e}", flush=True)
    # Build metadata using existing helper
    try:
        title = media_path.stem
        fallback = {
            "title": title,
            "episode_title": title,
            "show": media_path.parent.name,
            "description": "",
            "pubdate": _extract_date_from_stem(title),
            "duration_sec": media_duration_seconds(media_path),
            "image": "",
            "guid": "",
        }
        meta = build_meta_from_sources(media_path, media_path.parent.name, fallback, ep=None)
        ttxt = (TRN / title).with_suffix(".txt").read_text(encoding="utf-8")
        write_episode_nfo(media_path, meta, ttxt)
        try:
            save_episode_artwork(meta.get("image"), media_path, meta.get("show"))
        except Exception:
            pass
    except Exception as e:
        print(f"[post] NFO write failed: {e}", flush=True)

def transcribe_job(path_str: str):
    """RQ job: heavy transcription only. Safe to import by dotted path 'worker.transcribe_job'."""
    # Do NOT block when paused; exit quickly so CPU-heavy work stops ASAP.
    if transcribe_paused():
        print(f"[pause] transcribe_job: pause is active; skipping start for {path_str}", flush=True)
        return "paused"
    p = Path(path_str)
    if not p.exists():
        # Underlying file moved or deleted; skip gracefully instead of crashing ffmpeg.
        try:
            log({"path": path_str, "status": "skip", "reason": "file_not_found"})
        except Exception:
            pass
        print(f"[transcribe] missing input; skipping job: {p}", flush=True)
        return "missing"
    try:
        base = transcribe(p)
    except PauseInterrupt:
        print(f"[pause] transcribe_job: paused mid-run for {p}", flush=True)
        return "paused"
    _postprocess_after_transcribe(p, base)
    return str(base)

def enqueue_transcribe(path: Path) -> bool:
    """Enqueue a transcription job to the 'transcribe' queue. Returns True on success."""
    try:
        if transcribe_paused():
            print(f"[queue] pause flag present; enqueuing job for {path} but workers will wait", flush=True)
        conn = Redis.from_url(REDIS_URL)
        q = Queue("transcribe", connection=conn, default_timeout=60*60*24)
        # Use dotted path so workers in other processes can import
        q.enqueue("worker.transcribe_job", str(path), job_timeout=60*60*24)
        print(f"[queue] enqueued transcribe job for {path}", flush=True)
        return True
    except Exception as e:
        print(f"[queue] enqueue failed, will transcribe inline: {e}", flush=True)
        return False
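
# A dedicated worker process consumes this queue; a minimal sketch (assuming the same
# REDIS_URL is visible to that process) would be:
#   rq worker transcribe --url "$REDIS_URL"
# which picks up jobs enqueued above by the dotted path "worker.transcribe_job".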

def handle_local_file(path_str: str):
    """Transcribe & index a local media file that already exists in /library.
    If a sidecar .txt/.srt/.vtt exists, use it instead of running Whisper.
    Safe to call repeatedly; it skips if transcript JSON already exists.
    """
    try:
        p = Path(path_str)
        if not p.exists():
            log({"url": path_str, "status": "error", "error": "file_not_found"})
            return
        normalized = normalize_media_file(p)
        if normalized != p:
            print(f"[normalize] local media: {p.name} -> {normalized.name}", flush=True)
            p = normalized
            path_str = str(p)
        if WORKER_MODE == "transcribe":
            print(f"[mode] transcribe-only worker handling local file: {p}", flush=True)

        title = p.stem
        base_json = TRN / f"{title}.json"
        if base_json.exists():
            log({"url": path_str, "status": "skip", "reason": "already_transcribed"})
            return

        info = {"url": path_str, "status": "transcribing", "title": title,
                "uploader": p.parent.name, "date": "", "path": str(p), "progress": 0}
        log(info)

        # 0) Try RSS resolver first: if episode with transcript exists, use it (skip Whisper)
        try:
            ep = match_media_to_rss(p)
        except Exception as _e:
            ep = None
        if ep:
            base = use_rss_transcript(p, ep)
            if base:
                index_meili(base.with_suffix(".json"))
                publish_to_openwebui([base.with_suffix(".txt")])
                log({**info, **{"status": "done", "note": "used_rss_transcript"}})
                return

        # 1) Prefer an existing transcript sidecar if present
        sidecar = find_sidecar_transcript(p)
        if sidecar:
            plain = transcript_text_from_file(sidecar)
            lang = os.getenv("DEFAULT_TRANSCRIPT_LANG", "en").strip() or "en"
            base = write_plain_transcript(p, plain, language=lang)
            ensure_sidecar_next_to_media(sidecar, p, lang=lang)
            index_meili(base.with_suffix(".json"))
            publish_to_openwebui([base.with_suffix(".txt")])
            try:
                # Use info.json (if present) to enrich metadata
                fallback = {
                    "title": title,
                    "episode_title": title,
                    "show": p.parent.name,
                    "description": "",
                    "pubdate": _extract_date_from_stem(title),
                    "duration_sec": media_duration_seconds(p),
                    "image": "",
                    "guid": "",
                }
                meta = build_meta_from_sources(p, p.parent.name, fallback, ep=None)
                ttxt = base.with_suffix(".txt").read_text(encoding="utf-8")
                write_episode_nfo(p, meta, ttxt)
                # Try to fetch and save artwork locally
                try:
                    save_episode_artwork(meta.get("image"), p, meta.get("show"))
                except Exception:
                    pass
            except Exception as e:
                print(f"[post] NFO write failed: {e}", flush=True)
            log({**info, **{"status": "done", "note": "used_existing_transcript"}})
            return

        # 1.5) Reuse a transcript that exists in the repository for a matching episode
        repo_json = find_repo_transcript_for_media(p)
        if repo_json:
            base = reuse_repo_transcript(p, repo_json)
            if base:
                index_meili(base.with_suffix(".json"))
                publish_to_openwebui([base.with_suffix(".txt")])
                try:
                    data = json.loads((base.with_suffix(".json")).read_text(encoding="utf-8"))
                    # Start with repo metadata, then enrich from yt-dlp info.json if any
                    meta_repo = {
                        "title": data.get("title") or title,
                        "episode_title": data.get("title") or title,
                        "show": data.get("show") or p.parent.name,
                        "description": data.get("description") or "",
                        "pubdate": data.get("pubdate") or _extract_date_from_stem(title),
                        "duration_sec": media_duration_seconds(p),
                        "image": data.get("image"),
                        "guid": data.get("guid") or data.get("id"),
                    }
                    meta = build_meta_from_sources(p, p.parent.name, meta_repo, ep=None)
                    ttxt = base.with_suffix(".txt").read_text(encoding="utf-8")
                    write_episode_nfo(p, meta, ttxt)
                    try:
                        save_episode_artwork(meta.get("image"), p, meta.get("show"))
                    except Exception:
                        pass
                except Exception as e:
                    print(f"[post] NFO write failed: {e}", flush=True)
                log({**info, **{"status": "done", "note": "reused_repo_transcript"}})
                return

        # 2) Otherwise, run transcription (offload to queue if enabled and not in transcribe-only worker)
        # If paused, do not block; either enqueue (so worker will pause) or skip now.
        if transcribe_paused():
            if OFFLOAD_TRANSCRIBE and WORKER_MODE != "transcribe" and enqueue_transcribe(p):
                log({**info, **{"status": "queued_transcribe"}})
                return
            log({**info, **{"status": "paused"}})
            print(f"[pause] handle_local_file: pause active; not starting {p}", flush=True)
            return
        if OFFLOAD_TRANSCRIBE and WORKER_MODE != "transcribe" and enqueue_transcribe(p):
            log({**info, **{"status": "queued_transcribe"}})
            return
        base = transcribe(p)
        _postprocess_after_transcribe(p, base)
        log({**info, **{"status": "done"}})
    except Exception as e:
        log({"url": path_str, "status": "error", "error": str(e)})
        raise


# --- Refresh sidecar metadata and subtitles for an already-downloaded media file ---
def refresh_media(path_str: str):
    """
    Refresh sidecar metadata (info.json, thumbnail) and subtitles for an already-downloaded media file.
    Requires a companion .info.json next to the media (to supply the original URL). No media re-download.
    """
    try:
        p = Path(path_str)
        if not p.exists() or not p.is_file():
            log({"url": path_str, "status": "error", "error": "file_not_found"})
            return
        normalized = normalize_media_file(p)
        if normalized != p:
            print(f"[normalize] refresh media: {p.name} -> {normalized.name}", flush=True)
            p = normalized
            path_str = str(p)

        # Locate existing info.json to get the original URL
        info_json = None
        for cand in [p.parent / f"{p.name}.info.json", p.parent / f"{p.stem}.info.json"]:
            if cand.exists():
                info_json = cand
                break

        if not info_json:
            log({"path": str(p), "status": "refresh-skip", "reason": "no_info_json"})
            print(f"[refresh] skip: no info.json next to {p}", flush=True)
            return

        info = load_info_json(info_json) or {}
        url = info.get("webpage_url") or info.get("original_url") or info.get("url")
        if not url:
            log({"path": str(p), "status": "refresh-skip", "reason": "no_url_in_info"})
            print(f"[refresh] skip: no URL in {info_json}", flush=True)
            return

        # Prepare yt-dlp command to refresh sidecars only, writing files exactly next to the media
        outtmpl = str(p.with_suffix(".%(ext)s"))
        sub_langs = os.getenv("YTDLP_SUBS_LANGS", "en.*,en")

        cmd = [
            "yt-dlp",
            "--skip-download",
            "--write-info-json",
            "--write-thumbnail",
            "--convert-thumbnails", "jpg",
            "--write-subs", "--write-auto-subs",
            "--sub-langs", sub_langs,
            "--convert-subs", "srt",
            "-o", outtmpl,
            url,
        ]

        print(f"[refresh] refreshing sidecars for {p} via yt-dlp", flush=True)
        try:
            subprocess.check_call(cmd)
        except subprocess.CalledProcessError as e:
            print(f"[refresh] yt-dlp failed: {e}", flush=True)
            raise

        # Ensure language-suffixed SRT exists (Plex-friendly) if any subs were fetched
        try:
            # Pick any .srt just fetched that matches base
            for s in p.parent.glob(f"{p.stem}*.srt"):
                # If it's already lang-suffixed, keep; also copy to .en.srt when only plain .srt exists
                if s.name == f"{p.stem}.srt":
                    _safe_copy(s, p.with_suffix(".en.srt"))
        except Exception:
            pass

        # Rebuild NFO using fresh info.json (and RSS if available)
        try:
            # Try RSS match to enrich metadata (non-fatal if not present)
            ep = None
            try:
                ep = match_media_to_rss(p)
            except Exception:
                ep = None

            fallback = {
                "title": p.stem,
                "episode_title": p.stem,
                "show": p.parent.name,
                "description": "",
                "pubdate": _extract_date_from_stem(p.stem),
                "duration_sec": media_duration_seconds(p),
                "image": "",
                "guid": "",
            }
            meta = build_meta_from_sources(p, p.parent.name, fallback, ep)
            # Save local artwork too
            try:
                save_episode_artwork(meta.get("image"), p, meta.get("show"))
            except Exception:
                pass

            # If a transcript already exists, include it in the NFO plot preview
            ttxt_path = (TRN / p.stem).with_suffix(".txt")
            ttxt = ttxt_path.read_text(encoding="utf-8") if ttxt_path.exists() else None
            write_episode_nfo(p, meta, ttxt)
        except Exception as e:
            print(f"[refresh] NFO/artwork update failed: {e}", flush=True)

        log({"path": str(p), "status": "refresh-done"})
        print(f"[refresh] done for {p}", flush=True)

    except Exception as e:
        log({"path": path_str, "status": "error", "error": str(e)})
        raise
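
# Roughly equivalent one-off shell invocation (illustrative; $URL comes from the stored
# info.json and the -o template matches the existing media path):
#   yt-dlp --skip-download --write-info-json --write-thumbnail --convert-thumbnails jpg \
#          --write-subs --write-auto-subs --sub-langs 'en.*,en' --convert-subs srt \
#          -o '/library/Show/Episode.%(ext)s' "$URL"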

def handle_web(url: str):
    if not _mode_allows("web"):
        log({"url": url, "status": "skip", "reason": "mode_transcribe_only"})
        print(f"[mode] transcribe-only: skipping web snapshot job: {url}", flush=True)
        return
    info = {"url": url, "status":"web-downloading", "title":"", "uploader":"", "date":"", "path":""}
    log(info)
    base, title, domain, date, text = save_web_snapshot(url)
    info.update({"title": title, "uploader": domain, "date": date, "path": str(base.with_suffix('.html'))})
    log({**info, **{"status":"web-indexing"}})
    index_web(base, title, domain, date, text, url)
    push = [p for p in [base.with_suffix('.txt'), base.with_suffix('.html')] if p.exists()]
    publish_to_openwebui(push)
    log({**info, **{"status":"done"}})

def handle_url(url: str):
    try:
        # In transcribe-only mode, refuse non-local/download jobs
        if not _mode_allows("download"):
            # Only permit local file paths in this mode
            if url.startswith("/") or url.startswith("file://"):
                return handle_local_file(url[7:] if url.startswith("file://") else url)
            log({"url": url, "status": "skip", "reason": "mode_transcribe_only"})
            print(f"[mode] transcribe-only: skipping non-local job: {url}", flush=True)
            return
        # If a local file path (or file:// URL) is provided, process it directly
        if url.startswith("file://"):
            return handle_local_file(url[7:])
        if url.startswith("/") and Path(url).exists():
            return handle_local_file(url)

        if not is_media_url(url):
            handle_web(url)
            return
        info = {"url": url, "status":"queued", "title":"", "uploader":"", "date":"", "path":""}
        log({**info, **{"status":"downloading"}})
        files = yt_dlp(url, TMP)
        for f in files:
            parts = f.relative_to(TMP).parts
            uploader = sanitize(parts[0]) if len(parts)>1 else "Unknown"
            dest_dir = LIB / uploader
            dest_dir.mkdir(parents=True, exist_ok=True)
            dest = dest_dir / sanitize(f.name)
            shutil.move(str(f), dest)
            # Move companion files produced by yt-dlp (info.json, thumbnail, subtitles)
            try:
                companions = find_companion_files(f)
                # info.json -> prefer "<dest.name>.info.json", fallback to "<dest.stem>.info.json"
                if companions.get("info") and companions["info"].exists():
                    dest_info = dest.parent / f"{dest.name}.info.json"
                    try:
                        shutil.move(str(companions["info"]), dest_info)
                    except Exception:
                        # fallback naming without extension
                        dest_info2 = dest.parent / f"{dest.stem}.info.json"
                        try:
                            shutil.move(str(companions['info']), dest_info2)
                        except Exception:
                            pass
                # thumbnail -> "<dest>.jpg"
                if companions.get("thumb") and companions["thumb"].exists():
                    try:
                        shutil.move(str(companions["thumb"]), str(dest.with_suffix(".jpg")))
                    except Exception:
                        pass
                # subtitles -> preserve language suffix: "<dest.stem><suffix>"
                for s in companions.get("subs", []):
                    if not s.exists():
                        continue
                    suffix_tail = ""
                    s_name = s.name
                    f_stem = f.stem
                    if s_name.startswith(f_stem):
                        suffix_tail = s_name[len(f_stem):]  # includes leading dot if present
                    else:
                        suffix_tail = s.suffix
                    dest_sub = dest.parent / f"{dest.stem}{suffix_tail}"
                    try:
                        shutil.move(str(s), str(dest_sub))
                    except Exception:
                        pass
            except Exception:
                pass
            normalized_dest = normalize_media_file(dest)
            if normalized_dest != dest:
                print(f"[normalize] download media: {dest.name} -> {normalized_dest.name}", flush=True)
                dest = normalized_dest
            info.update({"title": dest.stem, "uploader": uploader,
                         "date": (re.findall(r"\b(\d{8})\b", dest.stem)[0] if re.findall(r"\b(\d{8})\b", dest.stem) else ""),
                         "path": str(dest)})
            log({**info, **{"status":"transcribing", "progress": 0}})
            # Try RSS transcript resolver first
            ep = None
            try:
                ep = match_media_to_rss(dest)
            except Exception:
                ep = None
            if ep:
                base = use_rss_transcript(dest, ep)
            else:
                base = None
            # 1.5) If we didn't get an RSS transcript and there is a matching one already in the repo, reuse it
            if not base:
                repo_json = find_repo_transcript_for_media(dest)
                if repo_json:
                    base = reuse_repo_transcript(dest, repo_json)
            if not base:
                # If paused, do not block; either enqueue (so worker will pause) or skip now.
                if transcribe_paused():
                    if OFFLOAD_TRANSCRIBE and WORKER_MODE != "transcribe" and enqueue_transcribe(dest):
                        log({**info, **{"status": "queued_transcribe"}})
                        continue
                    log({**info, **{"status": "paused"}})
                    print(f"[pause] handle_url: pause active; not starting {dest}", flush=True)
                    continue
                if OFFLOAD_TRANSCRIBE and WORKER_MODE != "transcribe" and enqueue_transcribe(dest):
                    log({**info, **{"status": "queued_transcribe"}})
                    continue
                base = transcribe(dest)
            _postprocess_after_transcribe(dest, base)
            log({**info, **{"status":"done"}})
    except Exception as e:
        log({"url": url, "status":"error", "error": str(e)})
        raise