import os, subprocess, shutil, json, re, orjson, requests from pathlib import Path import math import difflib from faster_whisper import WhisperModel MEILI_URL = os.getenv("MEILI_URL", "http://meili:7700") MEILI_KEY = os.getenv("MEILI_KEY", "") LIB = Path(os.getenv("LIBRARY_ROOT", "/library")) TRN = Path(os.getenv("TRANSCRIPT_ROOT", "/transcripts")) TMP = Path(os.getenv("TMP_ROOT", "/tmpdl")) MODEL_NAME = os.getenv("WHISPER_MODEL","large-v3") COMPUTE = os.getenv("WHISPER_PRECISION","int8") WHISPER_LANGUAGE = os.getenv("WHISPER_LANGUAGE", "auto").strip() # RSS resolver config RSS_INDEX_PATH = Path(os.getenv("RSS_INDEX_PATH", "/transcripts/rss_index.json")) RSS_DURATION_TOLERANCE = int(os.getenv("RSS_DURATION_TOLERANCE", "150")) # seconds DEFAULT_TRANSCRIPT_LANG = os.getenv("DEFAULT_TRANSCRIPT_LANG", "en").strip() or "en" OWUI_URL = os.getenv("OPENWEBUI_URL", "").rstrip("/") OWUI_KEY = os.getenv("OPENWEBUI_API_KEY", "") OWUI_KB = os.getenv("OPENWEBUI_KB_NAME", "Homelab Library") TRN.mkdir(parents=True, exist_ok=True) LIB.mkdir(parents=True, exist_ok=True) TMP.mkdir(parents=True, exist_ok=True) # Lazy Whisper model loader so the worker can start even if model download/setup is slow _model = None def get_model(): global _model if _model is None: _model = WhisperModel(MODEL_NAME, compute_type=COMPUTE) return _model def log(feed): try: with open(TRN / "_feed.log", "a", encoding="utf-8") as f: f.write(orjson.dumps(feed).decode()+"\n") except Exception: pass def sanitize(name): return re.sub(r'[\\/:"*?<>|]+', ' ', name).strip() # ---------- RSS transcript resolver ---------- def _normalize_title(t: str) -> str: t = (t or "").lower() t = re.sub(r"\s+", " ", t) # remove punctuation-ish t = re.sub(r"[^a-z0-9 _-]+", "", t) return t.strip() def _stem_without_date(stem: str) -> str: # drop leading YYYYMMDD - from filenames created by yt-dlp template m = re.match(r"^\d{8}\s*-\s*(.*)$", stem) return m.group(1) if m else stem def _extract_date_from_stem(stem: str) -> str | None: m = re.search(r"\b(\d{8})\b", stem) return m.group(1) if m else None def _best_title_match(title: str, candidates: list[str]) -> tuple[str, float]: """Return (best_title, score 0..1) using difflib SequenceMatcher.""" if not candidates: return "", 0.0 norm_title = _normalize_title(title) best = ("", 0.0) for c in candidates: score = difflib.SequenceMatcher(None, norm_title, _normalize_title(c)).ratio() if score > best[1]: best = (c, score) return best def _load_rss_index() -> list[dict]: try: if RSS_INDEX_PATH.exists(): data = json.loads(RSS_INDEX_PATH.read_text(encoding="utf-8")) # supports {"episodes":[...]} or a flat list if isinstance(data, dict) and "episodes" in data: return data["episodes"] or [] if isinstance(data, list): return data except Exception as e: print(f"[resolver] failed to load RSS index: {e}", flush=True) return [] def match_media_to_rss(media_path: Path) -> dict | None: """Try to match a local media file to an RSS episode entry.""" episodes = _load_rss_index() if not episodes: return None stem = media_path.stem title_no_date = _stem_without_date(stem) file_date = _extract_date_from_stem(stem) # duration tolerance media_dur = media_duration_seconds(media_path) # Candidates: filter by date if present, else all if file_date: pool = [e for e in episodes if (str(e.get("date", "")) == file_date or str(e.get("pubdate", "")) == file_date)] if not pool: pool = episodes else: pool = episodes # Pick best by (title similarity, duration proximity) best_ep, best_score = None, -1.0 for ep in pool: ep_title = ep.get("title") or ep.get("itunes_title") or "" sim = _best_title_match(title_no_date, [ep_title])[1] dur = float(ep.get("duration_sec") or ep.get("duration") or 0.0) dur_ok = True if media_dur and dur: dur_ok = abs(media_dur - dur) <= RSS_DURATION_TOLERANCE score = sim + (0.1 if dur_ok else 0.0) if score > best_score: best_score, best_ep = score, ep if best_ep and best_score >= 0.5: print(f"[resolver] matched '{stem}' -> '{best_ep.get('title','')}' score={best_score:.2f}", flush=True) return best_ep return None def _choose_transcript_url(ep: dict) -> tuple[str, str] | tuple[None, None]: """Return (url, kind) preferring txt, vtt, then srt. 'kind' in {'txt','vtt','srt'}.""" # unified structure from rss_ingest.py: ep["transcripts"] = [{"url":..., "type": ...}, ...] items = ep.get("transcripts") or [] # some ingesters store separate keys if not items: for key, kind in [("transcript_txt","txt"), ("transcript_vtt","vtt"), ("transcript_srt","srt")]: if ep.get(key): items.append({"url": ep[key], "type": kind}) # preference order for kind in ["txt", "vtt", "srt"]: for it in items: t = (it.get("type") or "").lower() u = it.get("url") or "" if u and (kind in t or (kind == "txt" and t in ["text","plain","text/plain"]) or (kind in u.lower())): return u, kind return (None, None) def fetch_rss_transcript(ep: dict, dest_dir: Path) -> Path | None: """Download transcript to dest_dir and return local Path; convert VTT->SRT if needed.""" url, kind = _choose_transcript_url(ep) if not url: return None dest_dir.mkdir(parents=True, exist_ok=True) # filename from episode title safe = sanitize(ep.get("title") or ep.get("guid") or "episode") path = dest_dir / f"{safe}.{kind if kind!='txt' else 'txt'}" try: r = requests.get(url, timeout=30) r.raise_for_status() mode = "wb" if kind in ("vtt","srt") else "w" if mode == "wb": path.write_bytes(r.content) else: path.write_text(r.text, encoding="utf-8") print(f"[resolver] downloaded transcript ({kind}) from {url}", flush=True) return path except Exception as e: print(f"[resolver] failed to fetch transcript: {e}", flush=True) return None def use_rss_transcript(media_path: Path, ep: dict) -> Path | None: """Create standard transcript artifacts from an RSS transcript (txt/vtt/srt).""" # Prefer direct download; else if rss_ingest already saved a local file path, try that. sidecar = None local_hint = ep.get("transcript_local") if local_hint: p = Path(local_hint) if p.exists(): sidecar = p if sidecar is None: sidecar = fetch_rss_transcript(ep, TMP) if not sidecar or not sidecar.exists(): return None # Convert to plain text plain = transcript_text_from_file(sidecar) lang = (ep.get("language") or ep.get("lang") or DEFAULT_TRANSCRIPT_LANG).split("-")[0] base = write_plain_transcript(media_path, plain, language=lang) # Place an SRT next to video for Plex ensure_sidecar_next_to_media(sidecar, media_path, lang=lang) # Write provenance sidecar (base.with_suffix(".prov.json")).write_bytes(orjson.dumps({ "source": "rss", "feed": ep.get("feed_url"), "guid": ep.get("guid"), "episode_title": ep.get("title"), "transcript_kind": sidecar.suffix.lower().lstrip("."), "transcript_url": _choose_transcript_url(ep)[0] or "", })) return base def find_sidecar_transcript(media_path: Path) -> Path | None: """Return a .txt/.srt/.vtt transcript file sitting next to media, if any. Tries common variants including language-suffixed SRT/VTT. """ candidates: list[Path] = [] # exact same stem in same folder for ext in [".txt", ".srt", ".vtt"]: p = media_path.parent / (media_path.stem + ext) if p.exists(): candidates.append(p) # language-suffixed near the media file (e.g., .en.srt) for ext in [".srt", ".vtt"]: p = media_path.with_suffix(f".en{ext}") if p.exists() and p not in candidates: candidates.append(p) return candidates[0] if candidates else None # ---------- Transcript repository reuse helpers ---------- def find_repo_transcript_for_media(media_path: Path) -> Path | None: """Search the transcript repository (/transcripts) for an existing transcript that likely belongs to this media file (match by YYYYMMDD in filename and/or fuzzy title similarity). Returns a path to a matching .json if found.""" try: stem = media_path.stem title_no_date = _stem_without_date(stem) file_date = _extract_date_from_stem(stem) best_json, best_score = None, 0.0 for j in TRN.glob("*.json"): try: data = json.loads(j.read_text(encoding="utf-8")) except Exception: continue other_file = Path(data.get("file", "")) other_stem = other_file.stem if other_file else j.stem other_date = _extract_date_from_stem(other_stem) # If both have dates and they differ a lot, skip if file_date and other_date and file_date != other_date: continue # Compare titles (without dates) sim = difflib.SequenceMatcher( None, _normalize_title(title_no_date), _normalize_title(_stem_without_date(other_stem)), ).ratio() # Nudge score when dates match if file_date and other_date and file_date == other_date: sim += 0.1 if sim > best_score: best_score, best_json = sim, j # Require a reasonable similarity return best_json if best_json and best_score >= 0.60 else None except Exception: return None def reuse_repo_transcript(media_path: Path, repo_json: Path) -> Path | None: """Copy/retarget an existing transcript JSON/TXT (and make SRT/VTT if possible) from the repository so that it belongs to the provided media_path. Returns the new base path in /transcripts or None.""" try: # load the source transcript data = json.loads(repo_json.read_text(encoding="utf-8")) src_base = TRN / Path(repo_json).stem src_txt = src_base.with_suffix(".txt") src_srt = src_base.with_suffix(".srt") src_vtt = src_base.with_suffix(".vtt") # write the retargeted artifacts new_title = media_path.stem new_base = TRN / new_title new_base.parent.mkdir(parents=True, exist_ok=True) # update file path data["file"] = str(media_path) (new_base.with_suffix(".json")).write_bytes(orjson.dumps(data)) # copy or synthesize TXT if src_txt.exists(): shutil.copy2(src_txt, new_base.with_suffix(".txt")) else: # fallback: concatenate segments txt = " ".join(s.get("text", "") for s in data.get("segments", [])) (new_base.with_suffix(".txt")).write_text(txt, encoding="utf-8") # copy SRT/VTT if present; otherwise synthesize SRT from segments if src_srt.exists(): shutil.copy2(src_srt, new_base.with_suffix(".srt")) else: # synthesize SRT def fmt_ts(t): h=int(t//3600); m=int((t%3600)//60); s=t-(h*3600+m*60) return f"{h:02}:{m:02}:{s:06.3f}".replace('.',',') with open(new_base.with_suffix(".srt"), "w", encoding="utf-8") as srt: for i, s in enumerate(data.get("segments", []), 1): srt.write(f"{i}\n{fmt_ts(s.get('start',0.0))} --> {fmt_ts(s.get('end',0.0))}\n{s.get('text','').strip()}\n\n") if src_vtt.exists(): shutil.copy2(src_vtt, new_base.with_suffix(".vtt")) else: # synthesize VTT from segments def fmt_ts_vtt(t): h=int(t//3600); m=int((t%3600)//60); s=t-(h*3600+m*60) return f"{h:02}:{m:02}:{s:06.3f}" with open(new_base.with_suffix(".vtt"), "w", encoding="utf-8") as vtt: vtt.write("WEBVTT\n\n") for s in data.get("segments", []): vtt.write(f"{fmt_ts_vtt(s.get('start',0.0))} --> {fmt_ts_vtt(s.get('end',0.0))} \n{s.get('text','').strip()}\n\n") # ensure sidecar next to media try: lang = (data.get("language") or DEFAULT_TRANSCRIPT_LANG).split("-")[0] ensure_sidecar_next_to_media(new_base.with_suffix(".srt"), media_path, lang=lang) except Exception: pass return new_base except Exception as e: print(f"[resolver] failed to reuse repo transcript: {e}", flush=True) return None def transcript_text_from_file(path: Path) -> str: """Extract plain text from .txt/.srt/.vtt by stripping timestamps and counters.""" try: raw = path.read_text(encoding="utf-8", errors="ignore") except Exception: raw = path.read_text(errors="ignore") if path.suffix.lower() == ".txt": return raw.strip() # For SRT/VTT, drop timestamp lines, cue numbers and headers lines: list[str] = [] for line in raw.splitlines(): ls = line.strip() if not ls: continue if "-->" in ls: # timestamp line continue if ls.upper().startswith("WEBVTT"): continue if re.match(r"^\d+$", ls): # cue index continue lines.append(ls) return " ".join(lines) def ensure_sidecar_next_to_media(sidecar: Path, media_path: Path, lang: str = "en") -> None: """Ensure an `.lang.srt` sits next to the media for Plex. Convert VTT→SRT if needed. If the sidecar is .txt, do nothing.""" try: if sidecar.suffix.lower() == ".txt": return if sidecar.suffix.lower() == ".srt": dst = media_path.with_suffix(f".{lang}.srt") shutil.copy2(sidecar, dst) elif sidecar.suffix.lower() == ".vtt": tmp_srt = sidecar.with_suffix(".srt") subprocess.run(["ffmpeg", "-nostdin", "-y", "-i", str(sidecar), str(tmp_srt)], check=True) dst = media_path.with_suffix(f".{lang}.srt") shutil.move(str(tmp_srt), dst) except Exception as e: print(f"[post] sidecar copy/convert failed: {e}", flush=True) def write_plain_transcript(media_path: Path, text: str, language: str = "en") -> Path: """Write minimal transcript artifacts (.txt + .json) from plain text (no timestamps).""" title = media_path.stem base = TRN / title base.parent.mkdir(parents=True, exist_ok=True) (base.with_suffix(".txt")).write_text(text, encoding="utf-8") (base.with_suffix(".json")).write_bytes(orjson.dumps({ "file": str(media_path), "language": language, "segments": [{"start": 0.0, "end": 0.0, "text": text}] })) return base def yt_dlp(url, outdir): # 1) Normalize YouTube Music URLs to standard YouTube yurl = url if 'music.youtube.com' in yurl: yurl = yurl.replace('music.youtube.com', 'www.youtube.com') outtmpl = str(outdir / "%(uploader)s/%(upload_date)s - %(title)s.%(ext)s") base_cmd = [ "yt-dlp", "-o", outtmpl, "-f", "bv*+ba/best", "-x", "--audio-format", "m4a", "--write-thumbnail", "--no-playlist", "--no-warnings", "--restrict-filenames", ] # 3) Optional cookies (set YTDLP_COOKIES=/path/to/cookies.txt in .env and mount it) cookies_path = os.getenv("YTDLP_COOKIES", "").strip() if cookies_path: base_cmd += ["--cookies", cookies_path] # Primary attempt try: subprocess.check_call(base_cmd + [yurl]) except subprocess.CalledProcessError: # 2) Retry with Android client + mobile UA retry_cmd = base_cmd + [ "--extractor-args", "youtube:player_client=android", "--user-agent", "Mozilla/5.0 (Linux; Android 10; SM-G973F) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0 Mobile Safari/537.36", yurl, ] subprocess.check_call(retry_cmd) media = (list(outdir.rglob("*.[mM][pP]4")) + list(outdir.rglob("*.mkv")) + list(outdir.rglob("*.m4a")) + list(outdir.rglob("*.mp3"))) return sorted(media, key=lambda p: p.stat().st_mtime)[-1:] def extract_audio(src: Path, outdir: Path) -> Path: """Extract mono 16kHz WAV for robust transcription (handles odd containers/codecs).""" outdir.mkdir(parents=True, exist_ok=True) wav_path = outdir / (src.stem + ".wav") # Force audio-only, mono, 16kHz WAV cmd = [ "ffmpeg", "-nostdin", "-y", "-i", str(src), "-vn", "-ac", "1", "-ar", "16000", "-f", "wav", str(wav_path), ] try: subprocess.check_output(cmd, stderr=subprocess.STDOUT) except subprocess.CalledProcessError as e: raise RuntimeError(f"ffmpeg extract failed: {e.output.decode(errors='ignore')}") return wav_path def media_duration_seconds(path: Path) -> float: """Return duration in seconds using ffprobe; fallback to 0.0 on error.""" try: out = subprocess.check_output([ "ffprobe", "-v", "error", "-show_entries", "format=duration", "-of", "default=nokey=1:noprint_wrappers=1", str(path) ], stderr=subprocess.STDOUT, text=True).strip() return float(out) if out else 0.0 except Exception: return 0.0 def transcribe(media_path: Path): model = get_model() # 1) Robustly extract audio to 16k mono WAV (fixes pyAV/webm edge cases) wav = extract_audio(media_path, TMP) # 2) Language lang = None if WHISPER_LANGUAGE.lower() == "auto" else WHISPER_LANGUAGE # 3) Transcribe segments, info = model.transcribe(str(wav), vad_filter=True, language=lang) title = media_path.stem base = TRN / title # Determine duration for progress; use extracted WAV (accurate for transcription input) dur = media_duration_seconds(wav) or 0.0 last_pct = -1 segs, text_parts = [], [] for s in segments: seg = {"start": s.start, "end": s.end, "text": s.text} segs.append(seg) text_parts.append(s.text) # progress logging every +5% if dur > 0 and s.end is not None: pct = int(min(100, max(0, (s.end / dur) * 100))) if pct >= last_pct + 5: log({ "status": "transcribing", "path": str(media_path), "title": title, "progress": pct }) last_pct = pct # ensure we mark 100% on completion if last_pct < 100: log({"status": "transcribing", "path": str(media_path), "title": title, "progress": 100}) txt = " ".join(text_parts).strip() # Write transcript artifacts open(base.with_suffix(".json"), "wb").write(orjson.dumps({ "file": str(media_path), "language": info.language, "segments": segs })) open(base.with_suffix(".txt"), "w", encoding="utf-8").write(txt) def fmt_ts(t): h=int(t//3600); m=int((t%3600)//60); s=t-(h*3600+m*60) return f"{h:02}:{m:02}:{s:06.3f}".replace('.',',') with open(base.with_suffix(".srt"), "w", encoding="utf-8") as srt: for i,s in enumerate(segs,1): srt.write(f"{i}\n{fmt_ts(s['start'])} --> {fmt_ts(s['end'])}\n{s['text'].strip()}\n\n") with open(base.with_suffix(".vtt"), "w", encoding="utf-8") as vtt: vtt.write("WEBVTT\n\n") for s in segs: vtt.write(f"{fmt_ts(s['start']).replace(',', '.')} --> {fmt_ts(s['end']).replace(',', '.')} \n{s['text'].strip()}\n\n") # 4) Copy SRT next to media for Plex (language-suffixed) try: lang_code = (info.language or (WHISPER_LANGUAGE if WHISPER_LANGUAGE.lower() != 'auto' else 'en')).lower() srt_src = base.with_suffix(".srt") srt_dst = media_path.with_suffix(f".{lang_code}.srt") shutil.copy2(srt_src, srt_dst) except Exception as e: print(f"[post] could not copy srt -> {srt_dst}: {e}", flush=True) # Optional: cleanup temporary WAV try: if wav.exists(): wav.unlink() except Exception: pass return base def index_meili(json_path: Path): doc = json.loads(open(json_path, "r", encoding="utf-8").read()) title = Path(doc["file"]).stem date = re.findall(r"\b(\d{8})\b", title) payload = { "id": title, "type": "podcast", "title": title, "date": date[0] if date else "", "source": str(Path(LIB, Path(doc["file"]).name)), "text": " ".join(s["text"] for s in doc.get("segments", [])), "segments": doc.get("segments", []), "meta": {"language": doc.get("language", "")} } import time for attempt in range(5): try: r = requests.post( f"{MEILI_URL}/indexes/library/documents", headers={"Authorization": f"Bearer {MEILI_KEY}", "Content-Type":"application/json"}, data=orjson.dumps(payload), timeout=15, ) r.raise_for_status() break except Exception: if attempt == 4: raise time.sleep(2 * (attempt + 1)) import tldextract, trafilatura, requests as _requests def slugify(text): text = re.sub(r'[^A-Za-z0-9\-._ ]+', '', text).strip().replace(' ', '_') return text[:120] or 'page' def save_web_snapshot(url: str): r = _requests.get(url, timeout=30, headers={"User-Agent":"Mozilla/5.0"}) r.raise_for_status() html = r.text downloaded = trafilatura.load_html(html, url=url) text = trafilatura.extract(downloaded, include_comments=False, include_images=False, with_metadata=True) or "" meta = trafilatura.metadata.extract_metadata(downloaded) or None title = (meta.title if meta and getattr(meta, 'title', None) else None) or (re.search(r'