import os, subprocess, shutil, json, re, orjson, requests, unicodedata from types import SimpleNamespace from rq import Queue from redis import Redis from pathlib import Path import math import difflib import time from faster_whisper import WhisperModel from xml.sax.saxutils import escape as xml_escape MEILI_URL = os.getenv("MEILI_URL", "http://meili:7700") MEILI_KEY = os.getenv("MEILI_KEY", "") LIB = Path(os.getenv("LIBRARY_ROOT", "/library")) TRN = Path(os.getenv("TRANSCRIPT_ROOT", "/transcripts")) TMP = Path(os.getenv("TMP_ROOT", "/tmpdl")) # --- Runtime pause switch for CPU-heavy work (no rebuild needed) --- PAUSE_TRANSCRIBE_FILE = Path(os.getenv("PAUSE_TRANSCRIBE_FILE", str(TRN / ".pause_transcribe"))) # Redis-backed pause flag (podx-tools compatible) PAUSE_TRANSCRIBE_REDIS_KEY = os.getenv("PAUSE_TRANSCRIBE_REDIS_KEY", "podx:transcribe:paused").strip() def _pause_flag_redis() -> bool: """Return True if a truthy pause flag is set in Redis under PAUSE_TRANSCRIBE_REDIS_KEY.""" try: from redis import Redis as _R val = _R.from_url(REDIS_URL).get(PAUSE_TRANSCRIBE_REDIS_KEY) if not val: return False v = val.decode("utf-8", "ignore").strip().lower() return v not in ("", "0", "false", "no", "(nil)") except Exception: return False def transcribe_paused() -> bool: """Return True if new transcription work should be paused (file flag or Redis flag).""" try: if PAUSE_TRANSCRIBE_FILE.exists(): return True except Exception: pass # Fall back to Redis-based switch used by podx-tools return _pause_flag_redis() def wait_if_paused(label: str = "transcribe", poll_sec: int = 10): """ If the pause file exists, block this worker in a low-CPU sleep loop until it is removed. This lets you 'pause' heavy work without killing workers or rebuilding. """ try: if transcribe_paused(): print(f"[pause] {label}: pause flag present at {PAUSE_TRANSCRIBE_FILE}; waiting…", flush=True) while transcribe_paused(): time.sleep(max(1, int(poll_sec))) except Exception: # If anything goes wrong reading the flag, don't block the pipeline. pass # --- Exception to abort transcription when pause is requested --- class PauseInterrupt(Exception): """Raised to cooperatively abort a running transcription when pause is requested.""" pass MODEL_NAME = os.getenv("WHISPER_MODEL","large-v3") COMPUTE = os.getenv("WHISPER_PRECISION","int8") WHISPER_LANGUAGE = os.getenv("WHISPER_LANGUAGE", "auto").strip() # Whisper device/config controls WHISPER_DEVICE = os.getenv("WHISPER_DEVICE", "auto").strip() WHISPER_DEVICE_INDEX = int(os.getenv("WHISPER_DEVICE_INDEX", "0")) WHISPER_CPU_THREADS = int(os.getenv("WHISPER_CPU_THREADS", "4")) # --- Host load guards / thread limits --- # Limit ffmpeg threads (helps keep CPU in check when multiple workers run) FFMPEG_THREADS = int(os.getenv("FFMPEG_THREADS", "1")) # Tame BLAS/threadpools that libraries may spin up implicitly import os as _os_threads _os_threads.environ.setdefault("OMP_NUM_THREADS", str(WHISPER_CPU_THREADS)) _os_threads.environ.setdefault("OPENBLAS_NUM_THREADS", "1") _os_threads.environ.setdefault("MKL_NUM_THREADS", "1") _os_threads.environ.setdefault("NUMEXPR_NUM_THREADS", "1") # Whisper logging & resume controls WHISPER_LOG_SEGMENTS = os.getenv("WHISPER_LOG_SEGMENTS", "1") not in ("0", "false", "False") WHISPER_RESUME = os.getenv("WHISPER_RESUME", "1") not in ("0", "false", "False") PARTIAL_SAVE_EVERY_SEGS = int(os.getenv("WHISPER_PARTIAL_SAVE_EVERY_SEGS", "20")) # RSS resolver config RSS_INDEX_PATH = Path(os.getenv("RSS_INDEX_PATH", "/transcripts/rss_index.json")) RSS_DURATION_TOLERANCE = int(os.getenv("RSS_DURATION_TOLERANCE", "150")) # seconds DEFAULT_TRANSCRIPT_LANG = os.getenv("DEFAULT_TRANSCRIPT_LANG", "en").strip() or "en" def _clean_extension(raw: str, fallback: str) -> str: raw = (raw or fallback or "").strip() if not raw: raw = fallback if not raw.startswith("."): raw = f".{raw}" return raw.lower() OWUI_URL = os.getenv("OPENWEBUI_URL", "").rstrip("/") OWUI_KEY = os.getenv("OPENWEBUI_API_KEY", "") OWUI_KB = os.getenv("OPENWEBUI_KB_NAME", "Homelab Library") OWUI_AUTO_FIX_METADATA = os.getenv("OPENWEBUI_AUTO_FIX_METADATA", "1").strip().lower() not in ("0", "false", "no") OWUI_METADATA_TEMPLATE_JSON = os.getenv("OPENWEBUI_METADATA_TEMPLATE_JSON", "").strip() _OWUI_TEMPLATE_PATCHED: set[str] = set() # Media normalisation options (transcoding for Plex-friendly formats) MEDIA_NORMALIZE = os.getenv("MEDIA_NORMALIZE", "1").strip().lower() not in ("0", "false", "no") MEDIA_NORMALIZE_KEEP_ORIGINAL = os.getenv("MEDIA_NORMALIZE_KEEP_ORIGINAL", "0").strip().lower() in ("1", "true", "yes") VIDEO_NORMALIZE_CODEC = os.getenv("VIDEO_NORMALIZE_CODEC", "hevc").strip().lower() VIDEO_NORMALIZE_EXTENSION = _clean_extension(os.getenv("VIDEO_NORMALIZE_EXTENSION", ".mp4"), ".mp4") VIDEO_NORMALIZE_CRF = os.getenv("VIDEO_NORMALIZE_CRF", "28").strip() VIDEO_NORMALIZE_PRESET = os.getenv("VIDEO_NORMALIZE_PRESET", "medium").strip() VIDEO_NORMALIZE_TUNE = os.getenv("VIDEO_NORMALIZE_TUNE", "").strip() VIDEO_NORMALIZE_AUDIO_CODEC = os.getenv("VIDEO_NORMALIZE_AUDIO_CODEC", "aac").strip().lower() VIDEO_NORMALIZE_AUDIO_BITRATE = os.getenv("VIDEO_NORMALIZE_AUDIO_BITRATE", "160k").strip() AUDIO_NORMALIZE_CODEC = os.getenv("AUDIO_NORMALIZE_CODEC", "libmp3lame").strip() AUDIO_NORMALIZE_EXTENSION = _clean_extension(os.getenv("AUDIO_NORMALIZE_EXTENSION", ".mp3"), ".mp3") AUDIO_NORMALIZE_BITRATE = os.getenv("AUDIO_NORMALIZE_BITRATE", "192k").strip() AUDIO_NORMALIZE_CHANNELS = os.getenv("AUDIO_NORMALIZE_CHANNELS", "2").strip() # Redis-backed job queue settings and offload toggle REDIS_URL = os.getenv("REDIS_URL", "redis://redis:6379/0").strip() OFFLOAD_TRANSCRIBE = os.getenv("OFFLOAD_TRANSCRIBE", "1").lower() not in ("0", "false", "no") # Worker role selection WORKER_MODE = os.getenv("WORKER_MODE", "all").strip().lower() # 'all' or 'transcribe' JOB_QUEUES = [q.strip() for q in os.getenv("JOB_QUEUES", "default").split(",") if q.strip()] # Remote transcription (OpenAI) configuration TRANSCRIBE_BACKEND = os.getenv("TRANSCRIBE_BACKEND", "local").strip().lower() OPENAI_API_KEY = (os.getenv("OPENAI_API_KEY", "") or "").strip() OPENAI_BASE_URL = (os.getenv("OPENAI_BASE_URL", "https://api.openai.com/v1") or "https://api.openai.com/v1").rstrip("/") OPENAI_TRANSCRIBE_MODEL = os.getenv("OPENAI_TRANSCRIBE_MODEL", "whisper-1").strip() OPENAI_TRANSCRIBE_TIMEOUT = int(os.getenv("OPENAI_TRANSCRIBE_TIMEOUT", "600")) def _mode_allows(task: str) -> bool: """Gate tasks by worker role. In 'transcribe' mode only allow transcription of local files (including indexing and OWUI publish). "task" is one of: 'download','web','local','transcribe'.""" if WORKER_MODE == "transcribe": return task in {"local", "transcribe"} return True TRN.mkdir(parents=True, exist_ok=True) LIB.mkdir(parents=True, exist_ok=True) TMP.mkdir(parents=True, exist_ok=True) # Lazy Whisper model loader so the worker can start even if model download/setup is slow _model = None def get_model(): global _model if _model is None: print(f"[whisper] loading model='{MODEL_NAME}' device='{WHISPER_DEVICE}' idx={WHISPER_DEVICE_INDEX} compute='{COMPUTE}' threads={WHISPER_CPU_THREADS}", flush=True) _model = WhisperModel( MODEL_NAME, device=WHISPER_DEVICE, device_index=WHISPER_DEVICE_INDEX, compute_type=COMPUTE, cpu_threads=WHISPER_CPU_THREADS, ) return _model # --- Helper: Reset model with new device and device_index --- def reset_model(device: str, device_index: int | None = None): """Reset the global _model to a new WhisperModel with the given device and device_index.""" global _model idx = device_index if device_index is not None else WHISPER_DEVICE_INDEX print(f"[whisper] resetting model='{MODEL_NAME}' device='{device}' idx={idx} compute='{COMPUTE}' threads={WHISPER_CPU_THREADS}", flush=True) _model = WhisperModel( MODEL_NAME, device=device, device_index=idx, compute_type=COMPUTE, cpu_threads=WHISPER_CPU_THREADS, ) # --- Helper: Run transcribe with fallback to CPU on GPU/oom errors --- def run_transcribe_with_fallback(wav_path: Path, lang): """ Try to transcribe with current model; on GPU/CUDA/HIP/ROCm/OOM errors, reset to CPU and retry once. Returns (segments, info) or raises exception. """ model = get_model() try: return model.transcribe(str(wav_path), vad_filter=True, language=lang) except Exception as e: msg = str(e) gpu_errs = [ "CUDA", "cublas", "out of memory", "HIP", "ROCm", "device-side assert", "CUDNN", "cudaError", "cuda runtime", "cudaMalloc" ] if any(err.lower() in msg.lower() for err in gpu_errs): print(f"[whisper] GPU error detected: '{msg}'. Retrying on CPU...", flush=True) reset_model("cpu", 0) try: model = get_model() return model.transcribe(str(wav_path), vad_filter=True, language=lang) except Exception as e2: print(f"[whisper] CPU fallback also failed: {e2}", flush=True) raise raise def run_transcribe_openai(wav_path: Path, lang_hint: str | None): """Transcribe audio via OpenAI's Whisper API, returning (segments, info, raw_payload).""" if not OPENAI_API_KEY: raise RuntimeError("OPENAI_API_KEY must be set when TRANSCRIBE_BACKEND is 'openai'") url = f"{OPENAI_BASE_URL}/audio/transcriptions" headers = {"Authorization": f"Bearer {OPENAI_API_KEY}"} data: dict[str, str] = { "model": OPENAI_TRANSCRIBE_MODEL or "whisper-1", "response_format": "verbose_json", } if lang_hint: data["language"] = lang_hint start = time.time() with open(wav_path, "rb") as fh: files = {"file": (wav_path.name, fh, "audio/wav")} resp = requests.post( url, headers=headers, data=data, files=files, timeout=OPENAI_TRANSCRIBE_TIMEOUT, ) elapsed = time.time() - start try: resp.raise_for_status() except requests.HTTPError as exc: print(f"[openai] transcription failed ({exc}); response={resp.text[:400]}", flush=True) raise payload = resp.json() segments_raw = payload.get("segments") or [] seg_objs: list[SimpleNamespace] = [] for seg in segments_raw: seg_objs.append( SimpleNamespace( start=float(seg.get("start") or 0.0), end=float(seg.get("end") or 0.0), text=str(seg.get("text") or ""), ) ) if not seg_objs and payload.get("text"): duration = float(payload.get("duration") or 0.0) seg_objs.append( SimpleNamespace( start=0.0, end=duration, text=str(payload.get("text") or ""), ) ) language = payload.get("language") or lang_hint or "" info = SimpleNamespace(language=language) print( f"[openai] transcribed {wav_path.name} via {OPENAI_TRANSCRIBE_MODEL or 'whisper-1'} " f"in {elapsed:.1f}s; segments={len(seg_objs)} lang={language or 'unknown'}", flush=True, ) return seg_objs, info, payload def log(feed): try: with open(TRN / "_feed.log", "a", encoding="utf-8") as f: f.write(orjson.dumps(feed).decode()+"\n") except Exception: pass def sanitize(name): return re.sub(r'[\\/:"*?<>|]+', ' ', name).strip() # ---------- RSS transcript resolver ---------- def _normalize_title(t: str) -> str: t = (t or "").lower() t = re.sub(r"\s+", " ", t) # remove punctuation-ish t = re.sub(r"[^a-z0-9 _-]+", "", t) return t.strip() def _stem_without_date(stem: str) -> str: # drop leading YYYYMMDD - from filenames created by yt-dlp template m = re.match(r"^\d{8}\s*-\s*(.*)$", stem) return m.group(1) if m else stem def _extract_date_from_stem(stem: str) -> str | None: m = re.search(r"\b(\d{8})\b", stem) return m.group(1) if m else None def _best_title_match(title: str, candidates: list[str]) -> tuple[str, float]: """Return (best_title, score 0..1) using difflib SequenceMatcher.""" if not candidates: return "", 0.0 norm_title = _normalize_title(title) best = ("", 0.0) for c in candidates: score = difflib.SequenceMatcher(None, norm_title, _normalize_title(c)).ratio() if score > best[1]: best = (c, score) return best def _load_rss_index() -> list[dict]: try: if RSS_INDEX_PATH.exists(): data = json.loads(RSS_INDEX_PATH.read_text(encoding="utf-8")) # supports {"episodes":[...]} or a flat list if isinstance(data, dict) and "episodes" in data: return data["episodes"] or [] if isinstance(data, list): return data except Exception as e: print(f"[resolver] failed to load RSS index: {e}", flush=True) return [] def match_media_to_rss(media_path: Path) -> dict | None: """Try to match a local media file to an RSS episode entry.""" episodes = _load_rss_index() if not episodes: return None stem = media_path.stem title_no_date = _stem_without_date(stem) file_date = _extract_date_from_stem(stem) # duration tolerance media_dur = media_duration_seconds(media_path) # Candidates: filter by date if present, else all if file_date: pool = [e for e in episodes if (str(e.get("date", "")) == file_date or str(e.get("pubdate", "")) == file_date)] if not pool: pool = episodes else: pool = episodes # Pick best by (title similarity, duration proximity) best_ep, best_score = None, -1.0 for ep in pool: ep_title = ep.get("title") or ep.get("itunes_title") or "" sim = _best_title_match(title_no_date, [ep_title])[1] dur = float(ep.get("duration_sec") or ep.get("duration") or 0.0) dur_ok = True if media_dur and dur: dur_ok = abs(media_dur - dur) <= RSS_DURATION_TOLERANCE score = sim + (0.1 if dur_ok else 0.0) if score > best_score: best_score, best_ep = score, ep if best_ep and best_score >= 0.5: print(f"[resolver] matched '{stem}' -> '{best_ep.get('title','')}' score={best_score:.2f}", flush=True) return best_ep return None def _choose_transcript_url(ep: dict) -> tuple[str, str] | tuple[None, None]: """Return (url, kind) preferring txt, vtt, then srt. 'kind' in {'txt','vtt','srt'}.""" # unified structure from rss_ingest.py: ep["transcripts"] = [{"url":..., "type": ...}, ...] items = ep.get("transcripts") or [] # some ingesters store separate keys if not items: for key, kind in [("transcript_txt","txt"), ("transcript_vtt","vtt"), ("transcript_srt","srt")]: if ep.get(key): items.append({"url": ep[key], "type": kind}) # preference order for kind in ["txt", "vtt", "srt"]: for it in items: t = (it.get("type") or "").lower() u = it.get("url") or "" if u and (kind in t or (kind == "txt" and t in ["text","plain","text/plain"]) or (kind in u.lower())): return u, kind return (None, None) def fetch_rss_transcript(ep: dict, dest_dir: Path) -> Path | None: """Download transcript to dest_dir and return local Path; convert VTT->SRT if needed.""" url, kind = _choose_transcript_url(ep) if not url: return None dest_dir.mkdir(parents=True, exist_ok=True) # filename from episode title safe = sanitize(ep.get("title") or ep.get("guid") or "episode") path = dest_dir / f"{safe}.{kind if kind!='txt' else 'txt'}" try: r = requests.get(url, timeout=30) r.raise_for_status() mode = "wb" if kind in ("vtt","srt") else "w" if mode == "wb": path.write_bytes(r.content) else: path.write_text(r.text, encoding="utf-8") print(f"[resolver] downloaded transcript ({kind}) from {url}", flush=True) return path except Exception as e: print(f"[resolver] failed to fetch transcript: {e}", flush=True) return None def use_rss_transcript(media_path: Path, ep: dict) -> Path | None: """Create standard transcript artifacts from an RSS transcript (txt/vtt/srt).""" # Prefer direct download; else if rss_ingest already saved a local file path, try that. sidecar = None local_hint = ep.get("transcript_local") if local_hint: p = Path(local_hint) if p.exists(): sidecar = p if sidecar is None: sidecar = fetch_rss_transcript(ep, TMP) if not sidecar or not sidecar.exists(): return None # Convert to plain text plain = transcript_text_from_file(sidecar) lang = (ep.get("language") or ep.get("lang") or DEFAULT_TRANSCRIPT_LANG).split("-")[0] base = write_plain_transcript(media_path, plain, language=lang) # Place an SRT next to video for Plex ensure_sidecar_next_to_media(sidecar, media_path, lang=lang) # Write provenance sidecar (base.with_suffix(".prov.json")).write_bytes(orjson.dumps({ "source": "rss", "feed": ep.get("feed_url"), "guid": ep.get("guid"), "episode_title": ep.get("title"), "transcript_kind": sidecar.suffix.lower().lstrip("."), "transcript_url": _choose_transcript_url(ep)[0] or "", })) # Write Kodi/Plex-compatible NFO try: # Gather metadata for NFO from RSS entry meta = { "title": ep.get("title"), "episode_title": ep.get("title"), "show": ep.get("podcast_title") or ep.get("feed_title") or ep.get("show"), "description": ep.get("description") or ep.get("content"), "pubdate": ep.get("pubdate"), "pubdate_iso": ep.get("date_iso"), "duration_sec": ep.get("duration_sec") or ep.get("duration"), "image": ep.get("image") or ep.get("image_url"), "guid": ep.get("guid"), } txt_path = base.with_suffix(".txt") transcript_text = txt_path.read_text(encoding="utf-8") if txt_path.exists() else None write_episode_nfo(media_path, meta, transcript_text) # Save local artwork for Plex/Kodi try: save_episode_artwork(meta.get("image"), media_path, meta.get("show")) except Exception: pass except Exception as e: print(f"[post] NFO write failed: {e}", flush=True) return base def find_sidecar_transcript(media_path: Path) -> Path | None: """Return a .txt/.srt/.vtt transcript file sitting next to media, if any. Tries common variants including language-suffixed SRT/VTT. """ candidates: list[Path] = [] # exact same stem in same folder for ext in [".txt", ".srt", ".vtt"]: p = media_path.parent / (media_path.stem + ext) if p.exists(): candidates.append(p) # language-suffixed near the media file (e.g., .en.srt) for ext in [".srt", ".vtt"]: p = media_path.with_suffix(f".en{ext}") if p.exists() and p not in candidates: candidates.append(p) return candidates[0] if candidates else None # ---------- Transcript repository reuse helpers ---------- def find_repo_transcript_for_media(media_path: Path) -> Path | None: """Search the transcript repository (/transcripts) for an existing transcript that likely belongs to this media file (match by YYYYMMDD in filename and/or fuzzy title similarity). Returns a path to a matching .json if found.""" try: stem = media_path.stem title_no_date = _stem_without_date(stem) file_date = _extract_date_from_stem(stem) best_json, best_score = None, 0.0 for j in TRN.glob("*.json"): try: data = json.loads(j.read_text(encoding="utf-8")) except Exception: continue other_file = Path(data.get("file", "")) other_stem = other_file.stem if other_file else j.stem other_date = _extract_date_from_stem(other_stem) # If both have dates and they differ a lot, skip if file_date and other_date and file_date != other_date: continue # Compare titles (without dates) sim = difflib.SequenceMatcher( None, _normalize_title(title_no_date), _normalize_title(_stem_without_date(other_stem)), ).ratio() # Nudge score when dates match if file_date and other_date and file_date == other_date: sim += 0.1 if sim > best_score: best_score, best_json = sim, j # Require a reasonable similarity return best_json if best_json and best_score >= 0.60 else None except Exception: return None def reuse_repo_transcript(media_path: Path, repo_json: Path) -> Path | None: """Copy/retarget an existing transcript JSON/TXT (and make SRT/VTT if possible) from the repository so that it belongs to the provided media_path. Returns the new base path in /transcripts or None.""" try: # load the source transcript data = json.loads(repo_json.read_text(encoding="utf-8")) src_base = TRN / Path(repo_json).stem src_txt = src_base.with_suffix(".txt") src_srt = src_base.with_suffix(".srt") src_vtt = src_base.with_suffix(".vtt") # write the retargeted artifacts new_title = media_path.stem new_base = TRN / new_title new_base.parent.mkdir(parents=True, exist_ok=True) # update file path data["file"] = str(media_path) (new_base.with_suffix(".json")).write_bytes(orjson.dumps(data)) # copy or synthesize TXT if src_txt.exists(): shutil.copy2(src_txt, new_base.with_suffix(".txt")) else: # fallback: concatenate segments txt = " ".join(s.get("text", "") for s in data.get("segments", [])) (new_base.with_suffix(".txt")).write_text(txt, encoding="utf-8") # copy SRT/VTT if present; otherwise synthesize SRT from segments if src_srt.exists(): shutil.copy2(src_srt, new_base.with_suffix(".srt")) else: # synthesize SRT def fmt_ts(t): h=int(t//3600); m=int((t%3600)//60); s=t-(h*3600+m*60) return f"{h:02}:{m:02}:{s:06.3f}".replace('.',',') with open(new_base.with_suffix(".srt"), "w", encoding="utf-8") as srt: for i, s in enumerate(data.get("segments", []), 1): srt.write(f"{i}\n{fmt_ts(s.get('start',0.0))} --> {fmt_ts(s.get('end',0.0))}\n{s.get('text','').strip()}\n\n") if src_vtt.exists(): shutil.copy2(src_vtt, new_base.with_suffix(".vtt")) else: # synthesize VTT from segments def fmt_ts_vtt(t): h=int(t//3600); m=int((t%3600)//60); s=t-(h*3600+m*60) return f"{h:02}:{m:02}:{s:06.3f}" with open(new_base.with_suffix(".vtt"), "w", encoding="utf-8") as vtt: vtt.write("WEBVTT\n\n") for s in data.get("segments", []): vtt.write(f"{fmt_ts_vtt(s.get('start',0.0))} --> {fmt_ts_vtt(s.get('end',0.0))} \n{s.get('text','').strip()}\n\n") # ensure sidecar next to media try: lang = (data.get("language") or DEFAULT_TRANSCRIPT_LANG).split("-")[0] ensure_sidecar_next_to_media(new_base.with_suffix(".srt"), media_path, lang=lang) except Exception: pass # Write Kodi/Plex-compatible NFO try: meta = { "title": data.get("title") or media_path.stem, "episode_title": data.get("title") or media_path.stem, "show": data.get("show") or media_path.parent.name, "description": data.get("description") or "", "pubdate": data.get("pubdate") or data.get("date"), "duration_sec": media_duration_seconds(media_path), "image": data.get("image"), "guid": data.get("guid") or data.get("id"), } txtp = new_base.with_suffix(".txt") ttxt = txtp.read_text(encoding="utf-8") if txtp.exists() else None write_episode_nfo(media_path, meta, ttxt) # Save local artwork for Plex/Kodi try: save_episode_artwork(meta.get("image"), media_path, meta.get("show")) except Exception: pass except Exception as e: print(f"[post] NFO write failed: {e}", flush=True) return new_base except Exception as e: print(f"[resolver] failed to reuse repo transcript: {e}", flush=True) return None def transcript_text_from_file(path: Path) -> str: """Extract plain text from .txt/.srt/.vtt by stripping timestamps and counters.""" try: raw = path.read_text(encoding="utf-8", errors="ignore") except Exception: raw = path.read_text(errors="ignore") if path.suffix.lower() == ".txt": return raw.strip() # For SRT/VTT, drop timestamp lines, cue numbers and headers lines: list[str] = [] for line in raw.splitlines(): ls = line.strip() if not ls: continue if "-->" in ls: # timestamp line continue if ls.upper().startswith("WEBVTT"): continue if re.match(r"^\d+$", ls): # cue index continue lines.append(ls) return " ".join(lines) def ensure_sidecar_next_to_media(sidecar: Path, media_path: Path, lang: str = "en") -> None: """Ensure an `.lang.srt` sits next to the media for Plex. Convert VTT→SRT if needed. If the sidecar is .txt, do nothing.""" try: if sidecar.suffix.lower() == ".txt": return if sidecar.suffix.lower() == ".srt": dst = media_path.with_suffix(f".{lang}.srt") shutil.copy2(sidecar, dst) elif sidecar.suffix.lower() == ".vtt": tmp_srt = sidecar.with_suffix(".srt") subprocess.run(["ffmpeg", "-nostdin", "-y", "-threads", str(FFMPEG_THREADS), "-i", str(sidecar), str(tmp_srt)], check=True) dst = media_path.with_suffix(f".{lang}.srt") shutil.move(str(tmp_srt), dst) except Exception as e: print(f"[post] sidecar copy/convert failed: {e}", flush=True) # --- small helpers for progress/ETA formatting --- def _fmt_eta(sec: float) -> str: try: sec = max(0, int(sec)) h, rem = divmod(sec, 3600) m, s = divmod(rem, 60) if h: return f"{h}h {m}m {s}s" if m: return f"{m}m {s}s" return f"{s}s" except Exception: return "" def save_episode_artwork(image_url: str | None, media_path: Path, show_title: str | None = None): """Download episode artwork from image_url and save next to the media as '.jpg'. Also drop a folder-level 'poster.jpg' for the show directory if not present. Best-effort; failures are logged but non-fatal. """ if not image_url: return try: resp = requests.get(image_url, timeout=30, stream=True) resp.raise_for_status() # Determine content-type and write a temporary file ctype = (resp.headers.get("Content-Type") or "").lower() tmp_file = media_path.with_suffix(".art.tmp") with open(tmp_file, "wb") as out: for chunk in resp.iter_content(chunk_size=8192): if chunk: out.write(chunk) # Always provide a .jpg next to the media for Plex episode_jpg = media_path.with_suffix(".jpg") if "image/jpeg" in ctype: # Already JPEG shutil.move(str(tmp_file), str(episode_jpg)) else: # Try converting to JPEG with ffmpeg; if it fails, keep bytes as-is try: subprocess.run( ["ffmpeg", "-nostdin", "-y", "-i", str(tmp_file), str(episode_jpg)], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=True ) try: tmp_file.unlink() except Exception: pass except Exception: shutil.move(str(tmp_file), str(episode_jpg)) # Also drop a folder poster once per show (helps Plex folder views) try: show_poster = media_path.parent / "poster.jpg" if not show_poster.exists(): shutil.copy2(episode_jpg, show_poster) except Exception: pass except Exception as e: print(f"[post] artwork download failed: {e}", flush=True) def find_companion_files(src: Path) -> dict: """Return likely yt-dlp companion files for a downloaded media file.""" out = {} # info.json can be either "..info.json" or ".info.json" cands_info = [ src.parent / f"{src.name}.info.json", src.parent / f"{src.stem}.info.json", ] out["info"] = next((p for p in cands_info if p.exists()), None) # thumbnails may be "..jpg" or ".jpg" (we convert to jpg) cand_thumbs = [ src.parent / f"{src.name}.jpg", src.parent / f"{src.stem}.jpg", src.parent / f"{src.stem}.jpeg", src.parent / f"{src.stem}.png", src.parent / f"{src.stem}.webp", ] out["thumb"] = next((p for p in cand_thumbs if p.exists()), None) # subtitles (keep multiple) subs = [] for s in src.parent.glob(f"{src.stem}*.srt"): subs.append(s) for s in src.parent.glob(f"{src.stem}*.vtt"): subs.append(s) out["subs"] = subs return out def load_info_json(path: Path) -> dict | None: try: return json.loads(path.read_text(encoding="utf-8")) except Exception: return None def _iso_from_yyyymmdd(s: str | None) -> str | None: if not s or not re.match(r"^\d{8}$", s): return None return f"{s[0:4]}-{s[4:6]}-{s[6:8]}" def build_meta_from_sources(media_path: Path, uploader: str, fallback_meta: dict, ep: dict | None = None) -> dict: """ Merge metadata from (priority): RSS episode `ep` -> yt-dlp info.json (if present) -> fallback. Returns a dict compatible with write_episode_nfo(). """ # Start with fallback meta = dict(fallback_meta) # Augment from info.json if present info = None for cand in [ media_path.parent / f"{media_path.name}.info.json", media_path.parent / f"{media_path.stem}.info.json", ]: if cand.exists(): info = load_info_json(cand) break if info: meta.setdefault("title", info.get("title")) meta.setdefault("episode_title", info.get("title")) meta.setdefault("description", info.get("description") or info.get("fulltitle")) # upload_date is YYYYMMDD iso = _iso_from_yyyymmdd(info.get("upload_date")) if iso: meta["pubdate_iso"] = iso # Prefer video duration if present if not meta.get("duration_sec") and info.get("duration"): meta["duration_sec"] = info.get("duration") # thumbnail URL if not meta.get("image"): meta["image"] = info.get("thumbnail") # show/uploader if not meta.get("show"): meta["show"] = info.get("uploader") or uploader # Finally, layer RSS data on top if available (most authoritative for podcasts) if ep: meta.update({ "title": ep.get("title") or meta.get("title"), "episode_title": ep.get("title") or meta.get("episode_title"), "show": ep.get("podcast_title") or ep.get("feed_title") or ep.get("show") or meta.get("show") or uploader, "description": ep.get("description") or ep.get("content") or meta.get("description", ""), "pubdate": ep.get("pubdate") or meta.get("pubdate", ""), "pubdate_iso": ep.get("date_iso") or meta.get("pubdate_iso", meta.get("pubdate")), "duration_sec": ep.get("duration_sec") or ep.get("duration") or meta.get("duration_sec"), "image": ep.get("image") or ep.get("image_url") or meta.get("image", ""), "guid": ep.get("guid") or meta.get("guid", ""), }) return meta # ---------- Kodi/Plex NFO writer ---------- from datetime import datetime def _first_nonempty(*vals): for v in vals: if v is None: continue if isinstance(v, str) and v.strip(): return v.strip() if v: return v return None def _coerce_aired(pubdate: str | None) -> str: """Convert RSS-style pubdate to YYYY-MM-DD if possible.""" if not pubdate: return "" # already ISO-like m = re.match(r"^(\d{4})[-/](\d{2})[-/](\d{2})", pubdate) if m: return f"{m.group(1)}-{m.group(2)}-{m.group(3)}" # RFC 2822 example: Tue, 21 Feb 2023 06:00:00 +0000 try: dt = datetime.strptime(pubdate[:31], "%a, %d %b %Y %H:%M:%S %z") return dt.strftime("%Y-%m-%d") except Exception: # try without tz try: dt = datetime.strptime(pubdate[:25], "%a, %d %b %Y %H:%M:%S") return dt.strftime("%Y-%m-%d") except Exception: return "" def write_episode_nfo(media_path: Path, meta: dict, transcript_text: str | None = None) -> Path: """Write a minimal Kodi/Plex-compatible NFO next to the media file. `meta` may include: title, show, plot, pubdate, duration_sec, thumb, guid. """ try: title = _first_nonempty(meta.get("episode_title"), meta.get("title"), media_path.stem) or media_path.stem show = _first_nonempty(meta.get("show"), meta.get("podcast_title"), meta.get("feed_title"), media_path.parent.name) or media_path.parent.name plot = _first_nonempty(meta.get("description"), meta.get("content"), meta.get("summary"), "") or "" # Optionally append transcript preview to plot if transcript_text: preview = transcript_text.strip() if preview: preview = (preview[:1800] + "…") if len(preview) > 1800 else preview plot = (plot + "\n\n" if plot else "") + preview aired = _coerce_aired(_first_nonempty(meta.get("pubdate_iso"), meta.get("pubdate"))) guid = _first_nonempty(meta.get("guid"), meta.get("id"), "") or "" thumb = _first_nonempty(meta.get("image"), meta.get("image_url"), meta.get("thumbnail"), "") or "" dur_s = meta.get("duration_sec") or meta.get("duration") or 0 try: dur_min = int(round(float(dur_s) / 60.0)) if dur_s else 0 except Exception: dur_min = 0 # Build XML xml = [""] xml.append(f" {xml_escape(title)}") xml.append(f" {xml_escape(show)}") if plot: xml.append(f" {xml_escape(plot)}") if aired: xml.append(f" {xml_escape(aired)}") if guid: xml.append(f" {xml_escape(guid)}") if dur_min: xml.append(f" {dur_min}") if thumb: xml.append(f" {xml_escape(thumb)}") xml.append("\n") nfo_path = media_path.with_suffix(".nfo") nfo_path.write_text("\n".join(xml), encoding="utf-8") return nfo_path except Exception: return media_path.with_suffix(".nfo") def write_plain_transcript(media_path: Path, text: str, language: str = "en") -> Path: """Write minimal transcript artifacts (.txt + .json) from plain text (no timestamps).""" title = media_path.stem base = TRN / title base.parent.mkdir(parents=True, exist_ok=True) (base.with_suffix(".txt")).write_text(text, encoding="utf-8") (base.with_suffix(".json")).write_bytes(orjson.dumps({ "file": str(media_path), "language": language, "segments": [{"start": 0.0, "end": 0.0, "text": text}] })) return base def yt_dlp(url, outdir): # 1) Normalize YouTube Music URLs to standard YouTube yurl = url if 'music.youtube.com' in yurl: yurl = yurl.replace('music.youtube.com', 'www.youtube.com') outtmpl = str(outdir / "%(uploader)s/%(upload_date)s - %(title)s.%(ext)s") base_cmd = [ "yt-dlp", "-o", outtmpl, "-f", "bv*+ba/best", "--write-info-json", "--write-thumbnail", "--convert-thumbnails", "jpg", "--write-subs", "--write-auto-subs", "--sub-langs", os.getenv("YTDLP_SUBS_LANGS", "en.*,en"), "--convert-subs", "srt", "--no-playlist", "--no-warnings", "--restrict-filenames", ] # 3) Optional cookies (set YTDLP_COOKIES=/path/to/cookies.txt in .env and mount it) cookies_path = os.getenv("YTDLP_COOKIES", "").strip() if cookies_path: base_cmd += ["--cookies", cookies_path] # Primary attempt try: subprocess.check_call(base_cmd + [yurl]) except subprocess.CalledProcessError: # 2) Retry with Android client + mobile UA retry_cmd = base_cmd + [ "--extractor-args", "youtube:player_client=android", "--user-agent", "Mozilla/5.0 (Linux; Android 10; SM-G973F) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0 Mobile Safari/537.36", yurl, ] subprocess.check_call(retry_cmd) media = ( list(outdir.rglob("*.[mM][pP]4")) + list(outdir.rglob("*.mkv")) + list(outdir.rglob("*.webm")) + list(outdir.rglob("*.m4a")) + list(outdir.rglob("*.mp3")) ) return sorted(media, key=lambda p: p.stat().st_mtime)[-1:] def extract_audio(src: Path, outdir: Path) -> Path: """Extract mono 16kHz WAV for robust transcription (handles odd containers/codecs).""" outdir.mkdir(parents=True, exist_ok=True) wav_path = outdir / (src.stem + ".wav") # Force audio-only, mono, 16kHz WAV cmd = [ "ffmpeg", "-nostdin", "-y", "-threads", str(FFMPEG_THREADS), "-i", str(src), "-vn", "-ac", "1", "-ar", "16000", "-f", "wav", str(wav_path), ] try: subprocess.check_output(cmd, stderr=subprocess.STDOUT) except subprocess.CalledProcessError as e: raise RuntimeError(f"ffmpeg extract failed: {e.output.decode(errors='ignore')}") return wav_path # --- WAV trimming helper --- def trim_wav(src_wav: Path, start_sec: float, outdir: Path) -> Path: """Return a trimmed 16k mono WAV starting at start_sec from src_wav.""" outdir.mkdir(parents=True, exist_ok=True) if not start_sec or start_sec <= 0.0: return src_wav dst = outdir / (src_wav.stem + f".from_{int(start_sec)}s.wav") try: subprocess.check_output([ "ffmpeg", "-nostdin", "-y", "-ss", str(max(0.0, float(start_sec))), "-i", str(src_wav), "-vn", "-ac", "1", "-ar", "16000", "-f", "wav", str(dst), ], stderr=subprocess.STDOUT) return dst except subprocess.CalledProcessError as e: # If trimming fails, fall back to full file print(f"[whisper] trim failed, using full WAV: {e.output.decode(errors='ignore')}", flush=True) return src_wav def media_duration_seconds(path: Path) -> float: """Return duration in seconds using ffprobe; fallback to 0.0 on error.""" try: out = subprocess.check_output([ "ffprobe", "-v", "error", "-show_entries", "format=duration", "-of", "default=nokey=1:noprint_wrappers=1", str(path) ], stderr=subprocess.STDOUT, text=True).strip() return float(out) if out else 0.0 except Exception: return 0.0 # --- Partial transcript helpers --- def _partial_paths(title: str) -> tuple[Path, Path]: base = TRN / title return base.with_suffix(".partial.json"), base.with_suffix(".partial.txt") def _save_partial(title: str, language: str, segs: list[dict]): pjson, ptxt = _partial_paths(title) try: # Save JSON pjson.write_bytes(orjson.dumps({"file": str((TRN / title).with_suffix('.wav')), "language": language, "segments": segs})) except Exception as e: print(f"[whisper] partial json save failed: {e}", flush=True) try: # Save TXT snapshot ptxt.write_text(" ".join(s.get("text","") for s in segs), encoding="utf-8") except Exception as e: print(f"[whisper] partial txt save failed: {e}", flush=True) def transcribe(media_path: Path): backend = TRANSCRIBE_BACKEND print(f"[transcribe] start backend={backend}: {media_path}", flush=True) # If paused, abort before any heavy work (no ffmpeg, no model load) if transcribe_paused(): print(f"[pause] transcribe: pause active before heavy work; aborting {media_path}", flush=True) raise PauseInterrupt("pause requested before start") # 1) Robustly extract audio to 16k mono WAV (fixes pyAV/webm edge cases) wav = extract_audio(media_path, TMP) # Check again after extraction to avoid loading the model if a pause was requested meanwhile if transcribe_paused(): try: if wav.exists(): wav.unlink() except Exception: pass print(f"[pause] transcribe: pause activated; stopping before model load for {media_path}", flush=True) raise PauseInterrupt("pause requested after extract") title = media_path.stem base = TRN / title resume_enabled = (backend != "openai") and WHISPER_RESUME # Resume support: if a partial checkpoint exists, load it and trim input resume_segments = [] resume_offset = 0.0 language_hint = None if resume_enabled: pjson, ptxt = _partial_paths(title) if pjson.exists(): try: pdata = json.loads(pjson.read_text(encoding="utf-8")) resume_segments = pdata.get("segments", []) or [] if resume_segments: resume_offset = float(resume_segments[-1].get("end", 0.0)) language_hint = pdata.get("language") print(f"[whisper] resuming from ~{resume_offset:.2f}s with {len(resume_segments)} segments", flush=True) except Exception as e: print(f"[whisper] failed to load partial: {e}", flush=True) # If resuming, trim WAV from last end time if resume_enabled and resume_offset > 0.0: wav_for_run = trim_wav(wav, resume_offset, TMP) else: wav_for_run = wav # 2) Language selection lang = None if WHISPER_LANGUAGE.lower() == "auto" else WHISPER_LANGUAGE if language_hint and WHISPER_LANGUAGE.lower() == "auto": # carry hint forward if available lang = language_hint # 3) Transcribe (local Whisper or OpenAI backend) payload = None if backend == "openai": segments, info, payload = run_transcribe_openai(wav_for_run, lang) else: segments, info = run_transcribe_with_fallback(wav_for_run, lang) # Determine duration for progress; use full WAV duration for consistent % regardless of resume dur = media_duration_seconds(wav) or 0.0 # Start wall clock timer for speed/ETA start_wall = time.time() if resume_enabled and resume_offset and dur and resume_offset >= dur: print(f"[whisper] resume offset {resume_offset:.2f}s >= duration {dur:.2f}s; resetting resume.", flush=True) resume_offset = 0.0 last_pct = -1 segs = list(resume_segments) # start with what we already have text_parts = [s.get("text","") for s in resume_segments] # Walk new segments; shift their timestamps by resume_offset if trimmed seg_count_since_save = 0 seg_index = len(resume_segments) for s in segments: seg_index += 1 start = (s.start or 0.0) + resume_offset end = (s.end or 0.0) + resume_offset seg = {"start": start, "end": end, "text": s.text} segs.append(seg) text_parts.append(s.text) # --- Cooperative pause: save checkpoint and abort as soon as pause is requested --- if resume_enabled and transcribe_paused(): try: pct = int(min(100, max(0, (end / dur) * 100))) if dur > 0 else 0 except Exception: pct = 0 _save_partial(title, info.language or (WHISPER_LANGUAGE if WHISPER_LANGUAGE.lower() != "auto" else "en"), segs) log({ "status": "paused", "path": str(media_path), "title": title, "progress": pct }) print(f"[pause] transcribe: pause requested mid-run; aborting at ~{end:.2f}s for {media_path}", flush=True) raise PauseInterrupt("pause requested") if WHISPER_LOG_SEGMENTS: print(f"[whisper] {start:8.2f}–{end:8.2f} {s.text.strip()}", flush=True) # progress logging every +5% if dur > 0 and end is not None: pct = int(min(100, max(0, (end / dur) * 100))) if pct >= last_pct + 5: log({ "status": "transcribing", "path": str(media_path), "title": title, "progress": pct }) last_pct = pct # compute realtime speed and ETA for console logs try: elapsed = max(0.001, time.time() - start_wall) processed = max(0.0, float(end)) speed = (processed / elapsed) if elapsed > 0 else 0.0 # seconds processed per second # represent as X real-time factor rtf = speed # 1.0 == real-time eta = ((dur - processed) / speed) if (speed > 0 and dur > 0) else 0 print(f"[whisper] progress {pct:3d}% seg={seg_index:5d} rtf={rtf:0.2f}x eta={_fmt_eta(eta)}", flush=True) # also mirror to feed log with speed/eta try: log({ "status": "transcribing", "path": str(media_path), "title": title, "progress": pct, "speed_rtf": round(rtf, 2), "eta_sec": int(max(0, eta)) }) except Exception: pass except Exception: pass # periodic partial save seg_count_since_save += 1 if resume_enabled and seg_count_since_save >= PARTIAL_SAVE_EVERY_SEGS: _save_partial(title, info.language or (WHISPER_LANGUAGE if WHISPER_LANGUAGE.lower() != "auto" else "en"), segs) seg_count_since_save = 0 # ensure we mark 100% on completion if last_pct < 100: log({"status": "transcribing", "path": str(media_path), "title": title, "progress": 100}) txt = " ".join(text_parts).strip() # Write final transcript artifacts (base.with_suffix(".json")).write_bytes(orjson.dumps({ "file": str(media_path), "language": info.language, "segments": segs })) (base.with_suffix(".txt")).write_text(txt, encoding="utf-8") def fmt_ts(t): h=int(t//3600); m=int((t%3600)//60); s=t-(h*3600+m*60) return f"{h:02}:{m:02}:{s:06.3f}".replace('.',',') with open(base.with_suffix(".srt"), "w", encoding="utf-8") as srt: for i,s in enumerate(segs,1): srt.write(f"{i}\n{fmt_ts(s['start'])} --> {fmt_ts(s['end'])}\n{s['text'].strip()}\n\n") with open(base.with_suffix(".vtt"), "w", encoding="utf-8") as vtt: vtt.write("WEBVTT\n\n") for s in segs: vtt.write(f"{fmt_ts(s['start']).replace(',', '.')} --> {fmt_ts(s['end']).replace(',', '.')} \n{s['text'].strip()}\n\n") # 4) Copy SRT next to media for Plex (language-suffixed) try: lang_code = (info.language or (WHISPER_LANGUAGE if WHISPER_LANGUAGE.lower() != 'auto' else 'en')).lower() srt_src = base.with_suffix(".srt") srt_dst = media_path.with_suffix(f".{lang_code}.srt") shutil.copy2(srt_src, srt_dst) except Exception as e: print(f"[post] could not copy srt -> {srt_dst}: {e}", flush=True) # Write Kodi/Plex-compatible NFO using enhanced metadata (same as before) try: fallback = { "title": title, "episode_title": title, "show": media_path.parent.name, "description": "", "pubdate": _extract_date_from_stem(title), "duration_sec": media_duration_seconds(media_path), "image": "", "guid": "", } meta = build_meta_from_sources(media_path, media_path.parent.name, fallback, ep=None) ttxt = (TRN / title).with_suffix(".txt").read_text(encoding="utf-8") write_episode_nfo(media_path, meta, ttxt) try: save_episode_artwork(meta.get("image"), media_path, meta.get("show")) except Exception: pass except Exception as e: print(f"[post] NFO write failed: {e}", flush=True) # Cleanup temp WAVs try: if wav_for_run != wav and wav_for_run.exists(): wav_for_run.unlink() if wav.exists(): wav.unlink() except Exception: pass # Remove partial checkpoints on success if resume_enabled: try: pjson, ptxt = _partial_paths(title) if pjson.exists(): pjson.unlink() if ptxt.exists(): ptxt.unlink() except Exception: pass # Final average speed over whole transcription try: total_elapsed = max(0.001, time.time() - start_wall) avg_rtf = (dur / total_elapsed) if total_elapsed > 0 else 0.0 print(f"[whisper] avg speed ~{avg_rtf:0.2f}x (audio_seconds / wall_seconds)", flush=True) except Exception: pass print( f"[transcribe] backend={backend} finished: {media_path} lang={info.language} segments={len(segs)} dur={dur:.2f}s", flush=True, ) return base # --- Meilisearch helpers --- def _safe_doc_id(s: str) -> str: """ Meilisearch document IDs must be [A-Za-z0-9_-]. Convert the title to a safe slug. If the result is empty, fall back to a short SHA1 hash. """ import hashlib slug = re.sub(r"\s+", "_", (s or "").strip()) slug = re.sub(r"[^A-Za-z0-9_-]", "", slug) if not slug: slug = hashlib.sha1((s or "").encode("utf-8", errors="ignore")).hexdigest()[:16] return slug def ensure_meili_index(): """Create index 'library' with primaryKey 'id' if it does not already exist.""" try: r = requests.get(f"{MEILI_URL}/indexes/library", headers={"Authorization": f"Bearer {MEILI_KEY}"}, timeout=10) if r.status_code == 200: return # Attempt to create it cr = requests.post( f"{MEILI_URL}/indexes", headers={"Authorization": f"Bearer {MEILI_KEY}", "Content-Type": "application/json"}, data=orjson.dumps({"uid": "library", "primaryKey": "id"}), timeout=10, ) # Ignore errors if another process created it first try: cr.raise_for_status() except Exception: pass except Exception: # Non-fatal; indexing will fail later if the index truly doesn't exist pass def index_meili(json_path: Path): # Make sure the index exists and is configured with a primary key ensure_meili_index() doc = json.loads(open(json_path, "r", encoding="utf-8").read()) file_field = doc.get("file", "") title = Path(file_field).stem if file_field else json_path.stem # Build a Meili-safe document ID doc_id = _safe_doc_id(title) # Extract a YYYYMMDD date if present m = re.search(r"\b(\d{8})\b", title) date = m.group(1) if m else "" payload = { "id": doc_id, "type": "podcast", "title": title, "date": date, "source": str(Path(LIB, Path(file_field or title).name)), "text": " ".join(s.get("text", "") for s in doc.get("segments", [])), "segments": doc.get("segments", []), "meta": {"language": doc.get("language", "")}, } for attempt in range(5): try: r = requests.post( f"{MEILI_URL}/indexes/library/documents", headers={ "Authorization": f"Bearer {MEILI_KEY}", "Content-Type": "application/json", }, data=orjson.dumps(payload), timeout=15, ) r.raise_for_status() break except Exception: if attempt == 4: raise time.sleep(2 * (attempt + 1)) import tldextract, trafilatura, requests as _requests def slugify(text): text = re.sub(r'[^A-Za-z0-9\-._ ]+', '', text).strip().replace(' ', '_') return text[:120] or 'page' def _norm(s: str | None) -> str: """Normalize strings for stable comparisons across Unicode lookalikes and stray whitespace.""" if s is None: return "" try: return unicodedata.normalize("NFKC", s).strip() except Exception: return (s or "").strip() def save_web_snapshot(url: str): r = _requests.get(url, timeout=30, headers={"User-Agent":"Mozilla/5.0"}) r.raise_for_status() html = r.text downloaded = trafilatura.load_html(html, url=url) text = trafilatura.extract(downloaded, include_comments=False, include_images=False, with_metadata=True) or "" meta = trafilatura.metadata.extract_metadata(downloaded) or None title = (meta.title if meta and getattr(meta, 'title', None) else None) or (re.search(r']*>(.*?)', html, re.I|re.S).group(1).strip() if re.search(r']*>(.*?)', html, re.I|re.S) else url) date = (meta.date if meta and getattr(meta, 'date', None) else "") parts = tldextract.extract(url) domain = ".".join([p for p in [parts.domain, parts.suffix] if p]) slug = slugify(title) outdir = LIB / "web" / domain outdir.mkdir(parents=True, exist_ok=True) base = outdir / slug open(base.with_suffix(".html"), "w", encoding="utf-8", errors="ignore").write(html) open(base.with_suffix(".txt"), "w", encoding="utf-8", errors="ignore").write(text) return base, title, domain, date, text def index_web(base: Path, title: str, domain: str, date: str, text: str, url: str): payload = { "id": f"web:{domain}:{base.stem}", "type": "web", "title": title, "date": re.sub(r'[^0-9]', '', date)[:8] if date else "", "source": f"file://{str(base.with_suffix('.html'))}", "text": text, "segments": [], "meta": {"url": url, "domain": domain} } r = requests.post(f"{MEILI_URL}/indexes/library/documents", headers={"Authorization": f"Bearer {MEILI_KEY}", "Content-Type":"application/json"}, data=orjson.dumps(payload)) r.raise_for_status() def is_media_url(url: str): lowered = url.lower() media_hosts = ["youtube.com","youtu.be","rumble.com","vimeo.com","soundcloud.com","spotify.com","podbean.com","buzzsprout.com"] return any(h in lowered for h in media_hosts) def owui_headers(): return {"Authorization": f"Bearer {OWUI_KEY}"} if OWUI_KEY else {} def _owui_metadata_template_payload(): """Return the metadata template payload to apply when auto-fix is enabled.""" if not OWUI_METADATA_TEMPLATE_JSON: return {} try: return json.loads(OWUI_METADATA_TEMPLATE_JSON) except Exception: # Treat value as a raw string template if parsing fails return OWUI_METADATA_TEMPLATE_JSON def owui_fix_metadata_template(kb_id: str, force: bool = False) -> bool: """Ensure the target knowledge base has a safe metadata template. Attempts PATCH/PUT with either a user-provided template or an empty object. Returns True if an update succeeded; False otherwise. """ if not OWUI_AUTO_FIX_METADATA or not OWUI_URL or not OWUI_KEY or not kb_id: return False if not force and kb_id in _OWUI_TEMPLATE_PATCHED: return False payload_variants: list[object] = [] template_payload = _owui_metadata_template_payload() payload_variants.append({"metadata_template": template_payload}) if template_payload not in ({}, "", None): payload_variants.append({"metadata_template": {}}) payload_variants.append({"metadata_template": None}) headers = {**owui_headers(), "Content-Type": "application/json"} url = f"{OWUI_URL}/api/v1/knowledge/{kb_id}" success_codes = {200, 201, 202, 204} for payload in payload_variants: try: body = orjson.dumps(payload) except Exception: body = json.dumps(payload).encode("utf-8") for method in ("PATCH", "PUT"): try: resp = requests.request(method, url, headers=headers, data=body, timeout=15) except Exception: continue if resp.status_code in success_codes: print(f"[owui] metadata template adjusted via {method} for KB {kb_id}", flush=True) _OWUI_TEMPLATE_PATCHED.add(kb_id) return True return False # ---------- Media normalisation helpers ---------- VIDEO_ENCODER_MAP = { "hevc": "libx265", "h265": "libx265", "h.265": "libx265", "h264": "libx264", "h.264": "libx264", "av1": "libaom-av1", } AUDIO_ENCODER_MAP = { "mp3": "libmp3lame", "libmp3lame": "libmp3lame", "aac": "aac", "libfdk_aac": "libfdk_aac", "opus": "libopus", "flac": "flac", } def _resolve_video_encoder(codec: str) -> str: key = (codec or "").lower() return VIDEO_ENCODER_MAP.get(key, codec or "libx265") def _resolve_audio_encoder(codec: str) -> str: key = (codec or "").lower() return AUDIO_ENCODER_MAP.get(key, codec or "libmp3lame") def _ffprobe_streams(path: Path) -> dict[str, str]: try: out = subprocess.check_output( ["ffprobe", "-v", "error", "-show_entries", "stream=codec_type,codec_name", "-of", "json", str(path)], text=True, ) data = json.loads(out) except Exception: return {} info: dict[str, str] = {"video": "", "audio": ""} for stream in data.get("streams", []) or []: ctype = (stream.get("codec_type") or "").lower() cname = (stream.get("codec_name") or "").lower() if ctype == "video" and not info["video"]: info["video"] = cname elif ctype == "audio" and not info["audio"]: info["audio"] = cname return info def _unique_backup_path(path: Path) -> Path: base = path.name candidate = path.parent / f"{base}.orig" if not candidate.exists(): return candidate counter = 1 while True: candidate = path.parent / f"{base}.orig{counter}" if not candidate.exists(): return candidate counter += 1 def _is_sidecar_name(name: str, base_stem: str, base_name: str) -> bool: exact_suffixes = [".info.json", ".nfo", ".jpg", ".jpeg", ".png", ".webp", ".prov.json"] for suf in exact_suffixes: if name == f"{base_name}{suf}" or name == f"{base_stem}{suf}": return True text_exts = {".srt", ".vtt", ".txt", ".json", ".md"} for ext in text_exts: if name == f"{base_stem}{ext}" or name == f"{base_name}{ext}": return True if name.startswith(f"{base_stem}.") and name.endswith(ext): return True if name.startswith(f"{base_name}.") and name.endswith(ext): return True return False def rename_media_sidecars(src: Path, dst: Path, skip: set[Path] | None = None) -> None: if src == dst: return skip = skip or set() parent = src.parent stem_src, stem_dst = src.stem, dst.stem name_src, name_dst = src.name, dst.name for f in list(parent.glob("*")): if not f.exists() or f == src or f == dst or f in skip: continue new_name = None fname = f.name if not _is_sidecar_name(fname, stem_src, name_src): continue if fname.startswith(name_src): new_name = name_dst + fname[len(name_src):] elif fname.startswith(stem_src): new_name = stem_dst + fname[len(stem_src):] if not new_name: continue target = parent / new_name if target.exists(): continue try: f.rename(target) except Exception: pass def _finalize_normalized_output(original: Path, final_path: Path, tmp_path: Path) -> Path: if final_path.exists(): try: final_path.unlink() except Exception: pass if MEDIA_NORMALIZE_KEEP_ORIGINAL: try: backup = _unique_backup_path(original) if original.exists(): original.rename(backup) except Exception as e: print(f"[normalize] could not preserve original for {original}: {e}", flush=True) try: if original.exists(): original.unlink() except Exception: pass else: try: if original.exists(): original.unlink() except Exception: pass os.replace(tmp_path, final_path) return final_path def _normalize_video_file(path: Path, info: dict[str, str]) -> Path: current_codec = (info.get("video") or "").lower() ext_match = path.suffix.lower() == VIDEO_NORMALIZE_EXTENSION if current_codec == VIDEO_NORMALIZE_CODEC and ext_match: return path encoder = _resolve_video_encoder(VIDEO_NORMALIZE_CODEC) final_path = path if ext_match else path.with_suffix(VIDEO_NORMALIZE_EXTENSION) tmp_path = final_path.parent / f"{final_path.stem}.tmp{VIDEO_NORMALIZE_EXTENSION}" if tmp_path.exists(): tmp_path.unlink() cmd = [ "ffmpeg", "-nostdin", "-y", "-i", str(path), "-map", "0", "-c:v", encoder, ] if VIDEO_NORMALIZE_PRESET: cmd.extend(["-preset", VIDEO_NORMALIZE_PRESET]) if VIDEO_NORMALIZE_TUNE: cmd.extend(["-tune", VIDEO_NORMALIZE_TUNE]) if VIDEO_NORMALIZE_CRF: cmd.extend(["-crf", VIDEO_NORMALIZE_CRF]) if info.get("audio"): if VIDEO_NORMALIZE_AUDIO_CODEC == "copy": cmd.extend(["-c:a", "copy"]) else: cmd.extend(["-c:a", _resolve_audio_encoder(VIDEO_NORMALIZE_AUDIO_CODEC)]) if VIDEO_NORMALIZE_AUDIO_BITRATE: cmd.extend(["-b:a", VIDEO_NORMALIZE_AUDIO_BITRATE]) else: cmd.append("-an") cmd.extend(["-c:s", "copy", str(tmp_path)]) print(f"[normalize] video -> {final_path.name} codec={VIDEO_NORMALIZE_CODEC}", flush=True) try: subprocess.check_call(cmd) except subprocess.CalledProcessError as e: if tmp_path.exists(): tmp_path.unlink() raise RuntimeError(f"ffmpeg video normalize failed: {e}") rename_media_sidecars(path, final_path, skip={tmp_path}) return _finalize_normalized_output(path, final_path, tmp_path) def _normalize_audio_file(path: Path, info: dict[str, str]) -> Path: current_codec = (info.get("audio") or "").lower() ext_match = path.suffix.lower() == AUDIO_NORMALIZE_EXTENSION target_encoder = _resolve_audio_encoder(AUDIO_NORMALIZE_CODEC) equivalent_codecs = {AUDIO_NORMALIZE_CODEC.lower(), target_encoder.lower()} if target_encoder.lower() == "libmp3lame": equivalent_codecs.add("mp3") if target_encoder.lower() in {"aac", "libfdk_aac"}: equivalent_codecs.update({"aac", "mp4a"}) if current_codec in equivalent_codecs and ext_match: return path final_path = path if ext_match else path.with_suffix(AUDIO_NORMALIZE_EXTENSION) tmp_path = final_path.parent / f"{final_path.stem}.tmp{AUDIO_NORMALIZE_EXTENSION}" if tmp_path.exists(): tmp_path.unlink() cmd = [ "ffmpeg", "-nostdin", "-y", "-i", str(path), "-vn", "-c:a", target_encoder, ] if AUDIO_NORMALIZE_BITRATE: cmd.extend(["-b:a", AUDIO_NORMALIZE_BITRATE]) if AUDIO_NORMALIZE_CHANNELS: cmd.extend(["-ac", AUDIO_NORMALIZE_CHANNELS]) cmd.append(str(tmp_path)) print(f"[normalize] audio -> {final_path.name} codec={AUDIO_NORMALIZE_CODEC}", flush=True) try: subprocess.check_call(cmd) except subprocess.CalledProcessError as e: if tmp_path.exists(): tmp_path.unlink() raise RuntimeError(f"ffmpeg audio normalize failed: {e}") rename_media_sidecars(path, final_path, skip={tmp_path}) return _finalize_normalized_output(path, final_path, tmp_path) def normalize_media_file(path: Path) -> Path: if not MEDIA_NORMALIZE or not path.exists() or not path.is_file(): return path try: info = _ffprobe_streams(path) except Exception as e: print(f"[normalize] ffprobe failed for {path}: {e}", flush=True) return path try: if info.get("video"): return _normalize_video_file(path, info) if info.get("audio"): return _normalize_audio_file(path, info) except Exception as e: print(f"[normalize] failed for {path}: {e}", flush=True) return path def owui_get_or_create_kb(): """Return a KB id for OWUI_KB without creating duplicates. Honors OPENWEBUI_KB_ID, and tolerates both list and {"data": ...} response shapes. """ if not OWUI_URL or not OWUI_KEY: return None # 1) If an explicit id is provided, trust it forced = os.getenv("OPENWEBUI_KB_ID", "").strip() if forced: return forced # 2) List and try to find an exact name match try: r = requests.get(f"{OWUI_URL}/api/v1/knowledge/list", headers=owui_headers(), timeout=15) r.raise_for_status() body = r.json() items = body if isinstance(body, list) else body.get("data", []) # Prefer exact normalized name match; if multiple, pick the most recently updated kb_target = _norm(OWUI_KB) matches = [kb for kb in items if _norm(kb.get("name")) == kb_target] if matches: try: matches.sort(key=lambda k: k.get("updated_at") or 0, reverse=True) except Exception: pass return matches[0].get("id") except Exception: pass # 3) Create only if not found cr = requests.post( f"{OWUI_URL}/api/v1/knowledge/create", headers={**owui_headers(), "Content-Type": "application/json"}, data=orjson.dumps({"name": OWUI_KB, "description": "All local content indexed by podx"}), timeout=15, ) cr.raise_for_status() created = cr.json() if isinstance(created, dict) and created.get("id"): return created["id"] if isinstance(created, dict) and created.get("data") and created["data"].get("id"): return created["data"]["id"] # Fallback: try to resolve again by name try: rr = requests.get(f"{OWUI_URL}/api/v1/knowledge/list", headers=owui_headers(), timeout=15) rr.raise_for_status() body = rr.json() items = body if isinstance(body, list) else body.get("data", []) kb_target = _norm(OWUI_KB) for kb in items: if _norm(kb.get("name")) == kb_target: return kb.get("id") except Exception: pass return None def owui_upload_and_attach(path: Path, kb_id: str): if OWUI_AUTO_FIX_METADATA: owui_fix_metadata_template(kb_id) with open(path, "rb") as f: r = requests.post(f"{OWUI_URL}/api/v1/files/", headers=owui_headers(), files={"file": (path.name, f)}, timeout=60*10) r.raise_for_status() up = r.json() file_id = (up.get("id") or (up.get("data") or {}).get("id")) if not file_id: raise RuntimeError(f"OWUI upload: could not get file id from response: {up}") payload = {"file_id": file_id} attach_headers = {**owui_headers(), "Content-Type": "application/json"} body = orjson.dumps(payload) r = requests.post( f"{OWUI_URL}/api/v1/knowledge/{kb_id}/file/add", headers=attach_headers, data=body, timeout=180, ) if r.status_code == 400 and OWUI_AUTO_FIX_METADATA: txt = "" try: txt = r.text.lower() except Exception: txt = str(r.content).lower() if "metadata" in txt and owui_fix_metadata_template(kb_id, force=True): r = requests.post( f"{OWUI_URL}/api/v1/knowledge/{kb_id}/file/add", headers=attach_headers, data=body, timeout=180, ) r.raise_for_status() try: time.sleep(0.5) except Exception: pass return True def publish_to_openwebui(paths): if not OWUI_URL or not OWUI_KEY: return try: kb_id = owui_get_or_create_kb() if not kb_id: print("[owui] KB resolve failed; skipping attach to avoid accidental duplicates", flush=True) return owui_fix_metadata_template(kb_id) for p in paths: p = Path(p) if not p.exists(): continue try: owui_upload_and_attach(p, kb_id) except Exception as e: log({"url": str(p), "status": "owui_error", "error": str(e)}) except Exception as e: log({"status": "owui_error", "error": str(e)}) # --------- Post-transcribe pipeline and job/queue helpers --------- def _postprocess_after_transcribe(media_path: Path, base: Path): """Common steps after we have a `base` transcript path: index, publish, NFO, artwork.""" try: index_meili(base.with_suffix(".json")) except Exception as e: print(f"[post] meili index failed: {e}", flush=True) try: publish_to_openwebui([base.with_suffix(".txt")]) except Exception as e: print(f"[post] owui publish failed: {e}", flush=True) # Build metadata using existing helper try: title = media_path.stem fallback = { "title": title, "episode_title": title, "show": media_path.parent.name, "description": "", "pubdate": _extract_date_from_stem(title), "duration_sec": media_duration_seconds(media_path), "image": "", "guid": "", } meta = build_meta_from_sources(media_path, media_path.parent.name, fallback, ep=None) ttxt = (TRN / title).with_suffix(".txt").read_text(encoding="utf-8") write_episode_nfo(media_path, meta, ttxt) try: save_episode_artwork(meta.get("image"), media_path, meta.get("show")) except Exception: pass except Exception as e: print(f"[post] NFO write failed: {e}", flush=True) def transcribe_job(path_str: str): """RQ job: heavy transcription only. Safe to import by dotted path 'worker.transcribe_job'.""" # Do NOT block when paused; exit quickly so CPU-heavy work stops ASAP. if transcribe_paused(): print(f"[pause] transcribe_job: pause is active; skipping start for {path_str}", flush=True) return "paused" p = Path(path_str) try: base = transcribe(p) except PauseInterrupt: print(f"[pause] transcribe_job: paused mid-run for {p}", flush=True) return "paused" _postprocess_after_transcribe(p, base) return str(base) def enqueue_transcribe(path: Path) -> bool: """Enqueue a transcription job to the 'transcribe' queue. Returns True on success.""" try: if transcribe_paused(): print(f"[queue] pause flag present; enqueuing job for {path} but workers will wait", flush=True) conn = Redis.from_url(REDIS_URL) q = Queue("transcribe", connection=conn, default_timeout=60*60*24) # Use dotted path so workers in other processes can import q.enqueue("worker.transcribe_job", str(path), job_timeout=60*60*24) print(f"[queue] enqueued transcribe job for {path}", flush=True) return True except Exception as e: print(f"[queue] enqueue failed, will transcribe inline: {e}", flush=True) return False def handle_local_file(path_str: str): """Transcribe & index a local media file that already exists in /library. If a sidecar .txt/.srt/.vtt exists, use it instead of running Whisper. Safe to call repeatedly; it skips if transcript JSON already exists. """ try: p = Path(path_str) if not p.exists(): log({"url": path_str, "status": "error", "error": "file_not_found"}) return normalized = normalize_media_file(p) if normalized != p: print(f"[normalize] local media: {p.name} -> {normalized.name}", flush=True) p = normalized path_str = str(p) if WORKER_MODE == "transcribe": print(f"[mode] transcribe-only worker handling local file: {p}", flush=True) title = p.stem base_json = TRN / f"{title}.json" if base_json.exists(): log({"url": path_str, "status": "skip", "reason": "already_transcribed"}) return info = {"url": path_str, "status": "transcribing", "title": title, "uploader": p.parent.name, "date": "", "path": str(p), "progress": 0} log(info) # 0) Try RSS resolver first: if episode with transcript exists, use it (skip Whisper) try: ep = match_media_to_rss(p) except Exception as _e: ep = None if ep: base = use_rss_transcript(p, ep) if base: index_meili(base.with_suffix(".json")) publish_to_openwebui([base.with_suffix(".txt")]) log({**info, **{"status": "done", "note": "used_rss_transcript"}}) return # 1) Prefer an existing transcript sidecar if present sidecar = find_sidecar_transcript(p) if sidecar: plain = transcript_text_from_file(sidecar) lang = os.getenv("DEFAULT_TRANSCRIPT_LANG", "en").strip() or "en" base = write_plain_transcript(p, plain, language=lang) ensure_sidecar_next_to_media(sidecar, p, lang=lang) index_meili(base.with_suffix(".json")) publish_to_openwebui([base.with_suffix(".txt")]) try: # Use info.json (if present) to enrich metadata fallback = { "title": title, "episode_title": title, "show": p.parent.name, "description": "", "pubdate": _extract_date_from_stem(title), "duration_sec": media_duration_seconds(p), "image": "", "guid": "", } meta = build_meta_from_sources(p, p.parent.name, fallback, ep=None) ttxt = base.with_suffix(".txt").read_text(encoding="utf-8") write_episode_nfo(p, meta, ttxt) # Try to fetch and save artwork locally try: save_episode_artwork(meta.get("image"), p, meta.get("show")) except Exception: pass except Exception as e: print(f"[post] NFO write failed: {e}", flush=True) log({**info, **{"status": "done", "note": "used_existing_transcript"}}) return # 1.5) Reuse a transcript that exists in the repository for a matching episode repo_json = find_repo_transcript_for_media(p) if repo_json: base = reuse_repo_transcript(p, repo_json) if base: index_meili(base.with_suffix(".json")) publish_to_openwebui([base.with_suffix(".txt")]) try: data = json.loads((base.with_suffix(".json")).read_text(encoding="utf-8")) # Start with repo metadata, then enrich from yt-dlp info.json if any meta_repo = { "title": data.get("title") or title, "episode_title": data.get("title") or title, "show": data.get("show") or p.parent.name, "description": data.get("description") or "", "pubdate": data.get("pubdate") or _extract_date_from_stem(title), "duration_sec": media_duration_seconds(p), "image": data.get("image"), "guid": data.get("guid") or data.get("id"), } meta = build_meta_from_sources(p, p.parent.name, meta_repo, ep=None) ttxt = base.with_suffix(".txt").read_text(encoding="utf-8") write_episode_nfo(p, meta, ttxt) try: save_episode_artwork(meta.get("image"), p, meta.get("show")) except Exception: pass except Exception as e: print(f"[post] NFO write failed: {e}", flush=True) log({**info, **{"status": "done", "note": "reused_repo_transcript"}}) return # 2) Otherwise, run transcription (offload to queue if enabled and not in transcribe-only worker) # If paused, do not block; either enqueue (so worker will pause) or skip now. if transcribe_paused(): if OFFLOAD_TRANSCRIBE and WORKER_MODE != "transcribe" and enqueue_transcribe(p): log({**info, **{"status": "queued_transcribe"}}) return log({**info, **{"status": "paused"}}) print(f"[pause] handle_local_file: pause active; not starting {p}", flush=True) return if OFFLOAD_TRANSCRIBE and WORKER_MODE != "transcribe" and enqueue_transcribe(p): log({**info, **{"status": "queued_transcribe"}}) return base = transcribe(p) _postprocess_after_transcribe(p, base) log({**info, **{"status": "done"}}) except Exception as e: log({"url": path_str, "status": "error", "error": str(e)}) raise # --- Refresh sidecar metadata and subtitles for an already-downloaded media file --- def refresh_media(path_str: str): """ Refresh sidecar metadata (info.json, thumbnail) and subtitles for an already-downloaded media file. Requires a companion .info.json next to the media (to supply the original URL). No media re-download. """ try: p = Path(path_str) if not p.exists() or not p.is_file(): log({"url": path_str, "status": "error", "error": "file_not_found"}) return normalized = normalize_media_file(p) if normalized != p: print(f"[normalize] refresh media: {p.name} -> {normalized.name}", flush=True) p = normalized path_str = str(p) # Locate existing info.json to get the original URL info_json = None for cand in [p.parent / f"{p.name}.info.json", p.parent / f"{p.stem}.info.json"]: if cand.exists(): info_json = cand break if not info_json: log({"path": str(p), "status": "refresh-skip", "reason": "no_info_json"}) print(f"[refresh] skip: no info.json next to {p}", flush=True) return info = load_info_json(info_json) or {} url = info.get("webpage_url") or info.get("original_url") or info.get("url") if not url: log({"path": str(p), "status": "refresh-skip", "reason": "no_url_in_info"}) print(f"[refresh] skip: no URL in {info_json}", flush=True) return # Prepare yt-dlp command to refresh sidecars only, writing files exactly next to the media outtmpl = str(p.with_suffix(".%(ext)s")) sub_langs = os.getenv("YTDLP_SUBS_LANGS", "en.*,en") cmd = [ "yt-dlp", "--skip-download", "--write-info-json", "--write-thumbnail", "--convert-thumbnails", "jpg", "--write-subs", "--write-auto-subs", "--sub-langs", sub_langs, "--convert-subs", "srt", "-o", outtmpl, url, ] print(f"[refresh] refreshing sidecars for {p} via yt-dlp", flush=True) try: subprocess.check_call(cmd) except subprocess.CalledProcessError as e: print(f"[refresh] yt-dlp failed: {e}", flush=True) raise # Ensure language-suffixed SRT exists (Plex-friendly) if any subs were fetched try: # Pick any .srt just fetched that matches base for s in p.parent.glob(f"{p.stem}*.srt"): # If it's already lang-suffixed, keep; also copy to .en.srt when only plain .srt exists if s.name == f"{p.stem}.srt": shutil.copy2(s, p.with_suffix(".en.srt")) except Exception: pass # Rebuild NFO using fresh info.json (and RSS if available) try: # Try RSS match to enrich metadata (non-fatal if not present) ep = None try: ep = match_media_to_rss(p) except Exception: ep = None fallback = { "title": p.stem, "episode_title": p.stem, "show": p.parent.name, "description": "", "pubdate": _extract_date_from_stem(p.stem), "duration_sec": media_duration_seconds(p), "image": "", "guid": "", } meta = build_meta_from_sources(p, p.parent.name, fallback, ep) # Save local artwork too try: save_episode_artwork(meta.get("image"), p, meta.get("show")) except Exception: pass # If a transcript already exists, include it in the NFO plot preview ttxt_path = (TRN / p.stem).with_suffix(".txt") ttxt = ttxt_path.read_text(encoding="utf-8") if ttxt_path.exists() else None write_episode_nfo(p, meta, ttxt) except Exception as e: print(f"[refresh] NFO/artwork update failed: {e}", flush=True) log({"path": str(p), "status": "refresh-done"}) print(f"[refresh] done for {p}", flush=True) except Exception as e: log({"path": path_str, "status": "error", "error": str(e)}) raise def handle_web(url: str): if not _mode_allows("web"): log({"url": url, "status": "skip", "reason": "mode_transcribe_only"}) print(f"[mode] transcribe-only: skipping web snapshot job: {url}", flush=True) return info = {"url": url, "status":"web-downloading", "title":"", "uploader":"", "date":"", "path":""} log(info) base, title, domain, date, text = save_web_snapshot(url) info.update({"title": title, "uploader": domain, "date": date, "path": str(base.with_suffix('.html'))}) log({**info, **{"status":"web-indexing"}}) index_web(base, title, domain, date, text, url) push = [p for p in [base.with_suffix('.txt'), base.with_suffix('.html')] if p.exists()] publish_to_openwebui(push) log({**info, **{"status":"done"}}) def handle_url(url: str): try: # In transcribe-only mode, refuse non-local/download jobs if not _mode_allows("download"): # Only permit local file paths in this mode if url.startswith("/") or url.startswith("file://"): return handle_local_file(url[7:] if url.startswith("file://") else url) log({"url": url, "status": "skip", "reason": "mode_transcribe_only"}) print(f"[mode] transcribe-only: skipping non-local job: {url}", flush=True) return # If a local file path (or file:// URL) is provided, process it directly if url.startswith("file://"): return handle_local_file(url[7:]) if url.startswith("/") and Path(url).exists(): return handle_local_file(url) if not is_media_url(url): handle_web(url) return info = {"url": url, "status":"queued", "title":"", "uploader":"", "date":"", "path":""} log({**info, **{"status":"downloading"}}) files = yt_dlp(url, TMP) for f in files: parts = f.relative_to(TMP).parts uploader = sanitize(parts[0]) if len(parts)>1 else "Unknown" dest_dir = LIB / uploader dest_dir.mkdir(parents=True, exist_ok=True) dest = dest_dir / sanitize(f.name) shutil.move(str(f), dest) # Move companion files produced by yt-dlp (info.json, thumbnail, subtitles) try: companions = find_companion_files(f) # info.json -> prefer ".info.json", fallback to ".info.json" if companions.get("info") and companions["info"].exists(): dest_info = dest.parent / f"{dest.name}.info.json" try: shutil.move(str(companions["info"]), dest_info) except Exception: # fallback naming without extension dest_info2 = dest.parent / f"{dest.stem}.info.json" try: shutil.move(str(companions['info']), dest_info2) except Exception: pass # thumbnail -> ".jpg" if companions.get("thumb") and companions["thumb"].exists(): try: shutil.move(str(companions["thumb"]), str(dest.with_suffix(".jpg"))) except Exception: pass # subtitles -> preserve language suffix: "" for s in companions.get("subs", []): if not s.exists(): continue suffix_tail = "" s_name = s.name f_stem = f.stem if s_name.startswith(f_stem): suffix_tail = s_name[len(f_stem):] # includes leading dot if present else: suffix_tail = s.suffix dest_sub = dest.parent / f"{dest.stem}{suffix_tail}" try: shutil.move(str(s), str(dest_sub)) except Exception: pass except Exception: pass normalized_dest = normalize_media_file(dest) if normalized_dest != dest: print(f"[normalize] download media: {dest.name} -> {normalized_dest.name}", flush=True) dest = normalized_dest info.update({"title": dest.stem, "uploader": uploader, "date": (re.findall(r"\b(\d{8})\b", dest.stem)[0] if re.findall(r"\b(\d{8})\b", dest.stem) else ""), "path": str(dest)}) log({**info, **{"status":"transcribing", "progress": 0}}) # Try RSS transcript resolver first ep = None try: ep = match_media_to_rss(dest) except Exception: ep = None if ep: base = use_rss_transcript(dest, ep) else: base = None # 1.5) If we didn't get an RSS transcript and there is a matching one already in the repo, reuse it if not base: repo_json = find_repo_transcript_for_media(dest) if repo_json: base = reuse_repo_transcript(dest, repo_json) if not base: # If paused, do not block; either enqueue (so worker will pause) or skip now. if transcribe_paused(): if OFFLOAD_TRANSCRIBE and WORKER_MODE != "transcribe" and enqueue_transcribe(dest): log({**info, **{"status": "queued_transcribe"}}) continue log({**info, **{"status": "paused"}}) print(f"[pause] handle_url: pause active; not starting {dest}", flush=True) continue if OFFLOAD_TRANSCRIBE and WORKER_MODE != "transcribe" and enqueue_transcribe(dest): log({**info, **{"status": "queued_transcribe"}}) continue base = transcribe(dest) _postprocess_after_transcribe(dest, base) log({**info, **{"status":"done"}}) except Exception as e: log({"url": url, "status":"error", "error": str(e)}) raise