From e66414570a1e0b6fa426ea5722b6ec2d3dde3710 Mon Sep 17 00:00:00 2001 From: Tomas Kracmar Date: Sun, 7 Sep 2025 16:01:59 +0200 Subject: [PATCH] Podcast sync --- app/resolver.py | 105 ++++++++ app/rss_ingest.py | 627 ++++++++++++++++++++-------------------------- app/worker.py | 309 ++++++++++++++++++++++- 3 files changed, 689 insertions(+), 352 deletions(-) create mode 100644 app/resolver.py diff --git a/app/resolver.py b/app/resolver.py new file mode 100644 index 0000000..0bbb8ed --- /dev/null +++ b/app/resolver.py @@ -0,0 +1,105 @@ +# resolver.py +from __future__ import annotations +import json, os, re, subprocess +from pathlib import Path +from datetime import datetime, timezone +from typing import Optional, Dict, Any, Tuple, List + +try: + from rapidfuzz import fuzz, process +except Exception: + fuzz = None + process = None + +def _norm(s: str) -> str: + s = s.lower() + s = re.sub(r"[\[\]\(\)\{\}|_]+", " ", s) + s = re.sub(r"[^0-9a-zá-žà-ÿ\u00C0-\u024F\s]+", " ", s) # keep latin accents, cz/diacritics + s = re.sub(r"\s+", " ", s).strip() + return s + +def _title_from_filename(p: Path) -> str: + name = p.stem # drop extension + # common yt-dlp patterns like "YYYYMMDD - Title" + name = re.sub(r"^\d{8}\s*-\s*", "", name) + return name + +def _ffprobe_duration_seconds(p: Path) -> Optional[int]: + try: + out = subprocess.check_output([ + "ffprobe", "-v", "error", "-show_entries", "format=duration", + "-of", "default=nw=1:nk=1", str(p) + ], stderr=subprocess.STDOUT, text=True).strip() + return int(float(out)) + except Exception: + return None + +def load_index(index_path: Path) -> List[Dict[str, Any]]: + if not index_path.exists(): + return [] + with index_path.open("r", encoding="utf-8") as f: + data = json.load(f) + # expected per-item keys: + # title, pubdate_ts (int), duration_s (int or null), + # transcript_urls: {"srt": str|None, "vtt": str|None, "txt": str|None}, + # audio_url, guid, feed_url + return data if isinstance(data, list) else [] + +def match_episode( + media_path: Path, + index_items: List[Dict[str, Any]], + duration_tolerance_s: int = 120, + min_ratio: int = 82, + date_window_days: int = 14, +) -> Optional[Dict[str, Any]]: + title_guess = _title_from_filename(media_path) + tnorm = _norm(title_guess) + if not tnorm: + return None + + media_secs = _ffprobe_duration_seconds(media_path) + media_date = None + # try to parse upload date prefix in filename if present + m = re.search(r"(\d{8})", media_path.stem) + if m: + try: + media_date = datetime.strptime(m.group(1), "%Y%m%d").replace(tzinfo=timezone.utc) + except Exception: + media_date = None + + candidates = [] + for item in index_items: + item_title = _norm(item.get("title", "")) + if not item_title: + continue + ratio = (fuzz.token_sort_ratio(tnorm, item_title) if fuzz else (100 if tnorm == item_title else 0)) + if ratio < min_ratio: + continue + + # duration filter (if both known) + ok_duration = True + if media_secs and item.get("duration_s"): + ok_duration = abs(media_secs - int(item["duration_s"])) <= duration_tolerance_s + + # date window (if both known) + ok_date = True + if media_date and item.get("pubdate_ts"): + dt_item = datetime.fromtimestamp(int(item["pubdate_ts"]), tz=timezone.utc) + delta_days = abs((media_date - dt_item).days) + ok_date = delta_days <= date_window_days + + if ok_duration and ok_date: + candidates.append((ratio, item)) + + if not candidates: + return None + candidates.sort(key=lambda x: x[0], reverse=True) + return candidates[0][1] + +def choose_transcript_url(item: Dict[str, Any]) -> 
Optional[Tuple[str, str]]: + urls = item.get("transcript_urls") or {} + # prefer text/plain, then VTT, then SRT: + if urls.get("txt"): return (urls["txt"], "txt") + if urls.get("vtt"): return (urls["vtt"], "vtt") + if urls.get("srt"): return (urls["srt"], "srt") + return None \ No newline at end of file diff --git a/app/rss_ingest.py b/app/rss_ingest.py index 20bcc1a..fc1d46b 100644 --- a/app/rss_ingest.py +++ b/app/rss_ingest.py @@ -1,388 +1,315 @@ -#!/usr/bin/env python3 -""" -RSS ingester for PodX -- Reads feeds from env var RSS_FEEDS (comma-separated) *and*/or from a file (FEEDS_FILE, default /library/feeds.txt) -- Fetches RSS with ETag/Last-Modified caching to avoid re-downloading unchanged feeds -- Saves audio to LIBRARY_ROOT//. -- Saves transcript sidecars when `` links are present (prefers TextWithTimestamps → WebVTT → SRT → TXT) -- Enqueues `worker.handle_local_file` for indexing/transcription (worker will skip Whisper if a transcript sidecar exists) -- Keeps a small state JSON with per-feed ETag/Last-Modified and per-item processed GUIDs to avoid duplicate work - -Environment variables (with sane defaults): - MEILI_URL (unused directly here, but kept for parity) - REDIS_URL redis://redis:6379/0 - LIBRARY_ROOT /library - TRANSCRIPT_ROOT /transcripts - RSS_FEEDS "" (comma-separated list) - FEEDS_FILE /library/feeds.txt - RSS_SCAN_MINUTES 120 - RSS_ONCE 0 ("1" to run once and exit) - USER_AGENT podx-rss/1.0 (+local-archive) - RSS_STATE_FILE /library/.rss_state.json - RSS_CONNECT_TIMEOUT 15 (seconds) - RSS_READ_TIMEOUT 60 (seconds) - AUDIO_MAX_MB 4096 (skip larger-than if HEAD reveals size > max, 0 = unlimited) -""" -import os -import re -import sys -import json -import time -import logging -import itertools -from datetime import datetime +import os, re, json, time, shutil, difflib from pathlib import Path -from typing import Dict, List, Tuple, Optional from urllib.parse import urlparse - import requests import xml.etree.ElementTree as ET -import redis -from rq import Queue -logging.basicConfig(level=logging.INFO, format='[rss] %(message)s') -log = logging.getLogger("rss") +# ---- Config ---- +TRN = Path(os.getenv("TRANSCRIPT_ROOT", "/transcripts")) +OUT_INDEX = Path(os.getenv("RSS_INDEX_PATH", str(TRN / "rss_index.json"))) +FEEDS_FILE = Path(os.getenv("RSS_FEEDS_FILE", "/app/config/feeds.txt")) +FEEDS_ENV = os.getenv("RSS_FEEDS", "").strip() +TIMEOUT = int(os.getenv("RSS_HTTP_TIMEOUT", "30")) +DOWNLOAD_TRANSCRIPTS = os.getenv("RSS_DOWNLOAD_TRANSCRIPTS", "true").lower() in {"1", "true", "yes", "y"} +DEFAULT_LANG = os.getenv("DEFAULT_TRANSCRIPT_LANG", "en").strip() or "en" -# Config -MEILI_URL = os.getenv("MEILI_URL", "http://meili:7700") -REDIS_URL = os.getenv("REDIS_URL", "redis://redis:6379/0") -LIBRARY_ROOT = Path(os.getenv("LIBRARY_ROOT", "/library")) -TRANSCRIPT_ROOT = Path(os.getenv("TRANSCRIPT_ROOT", "/transcripts")) -RSS_FEEDS_ENV = [s.strip() for s in os.getenv("RSS_FEEDS", "").split(",") if s.strip()] -FEEDS_FILE = Path(os.getenv("FEEDS_FILE", str(LIBRARY_ROOT / "feeds.txt"))) -RSS_SCAN_MINUTES = int(os.getenv("RSS_SCAN_MINUTES", "120")) -RSS_ONCE = os.getenv("RSS_ONCE", "0") == "1" -USER_AGENT = os.getenv("USER_AGENT", "podx-rss/1.0 (+local-archive)") -STATE_FILE = Path(os.getenv("RSS_STATE_FILE", str(LIBRARY_ROOT / ".rss_state.json"))) -CONNECT_TIMEOUT = float(os.getenv("RSS_CONNECT_TIMEOUT", "15")) -READ_TIMEOUT = float(os.getenv("RSS_READ_TIMEOUT", "60")) -AUDIO_MAX_MB = int(os.getenv("AUDIO_MAX_MB", "4096")) +# Where media files live; used to sidecar RSS transcripts 
next to matching media +LIB = Path(os.getenv("LIBRARY_ROOT", "/library")) +MEDIA_EXTS = {".mp3", ".m4a", ".flac", ".wav", ".ogg", ".opus", ".mp4", ".m4v", ".mkv", ".webm", ".mov", ".avi"} -# Redis queue -r = redis.from_url(REDIS_URL) -q = Queue("default", connection=r) +# Fuzzy title match threshold for media ↔ transcript pairing +TITLE_MATCH_THRESHOLD = float(os.getenv("RSS_TITLE_MATCH_THRESHOLD", "0.60")) -# HTTP session -SESSION = requests.Session() -SESSION.headers.update({"User-Agent": USER_AGENT}) - -# Namespaces commonly used in podcast RSS +# Namespace map (extend as needed) NS = { - "podcast": "https://podcastindex.org/namespace/1.0", "itunes": "http://www.itunes.com/dtds/podcast-1.0.dtd", - "media": "http://search.yahoo.com/mrss/", "content": "http://purl.org/rss/1.0/modules/content/", - "dc": "http://purl.org/dc/elements/1.1/", + "media": "http://search.yahoo.com/mrss/", + "podcast": "https://podcastindex.org/namespace/1.0", + "atom": "http://www.w3.org/2005/Atom", } -# ----------------- helpers ----------------- - -def safe(s: str) -> str: - s = re.sub(r"[\\/:*?\"<>|]", "_", s) - # collapse whitespace and trim - s = re.sub(r"\s+", " ", s).strip() - # guard against very long filenames - return s[:200] if len(s) > 200 else s +TRN.mkdir(parents=True, exist_ok=True) +OUT_INDEX.parent.mkdir(parents=True, exist_ok=True) -def load_state() -> Dict: - if STATE_FILE.exists(): - try: - return json.loads(STATE_FILE.read_text("utf-8")) - except Exception: - log.warning("State file unreadable, starting fresh") - return {"feeds": {}, "items": {}} # items keyed by GUID / enclosure URL +def _text(el): + return (el.text or "").strip() if el is not None else "" -def save_state(state: Dict) -> None: - STATE_FILE.parent.mkdir(parents=True, exist_ok=True) - tmp = STATE_FILE.with_suffix(STATE_FILE.suffix + ".tmp") - tmp.write_text(json.dumps(state, ensure_ascii=False, indent=2)) - tmp.replace(STATE_FILE) - - -def load_feeds() -> List[str]: - feeds = list(RSS_FEEDS_ENV) - if FEEDS_FILE.exists(): - try: - for line in FEEDS_FILE.read_text("utf-8").splitlines(): - s = line.strip() - if not s or s.startswith("#"): # allow comments - continue - feeds.append(s) - except Exception as e: - log.warning(f"Failed to read {FEEDS_FILE}: {e}") - # de-dup preserving order - seen = set() - uniq = [] - for f in feeds: - if f not in seen: - uniq.append(f) - seen.add(f) - return uniq - - -def fetch(url: str, *, etag: Optional[str]=None, modified: Optional[str]=None, as_text=False): - headers = {} - if etag: - headers["If-None-Match"] = etag - if modified: - headers["If-Modified-Since"] = modified - resp = SESSION.get(url, headers=headers, timeout=(CONNECT_TIMEOUT, READ_TIMEOUT)) - if resp.status_code == 304: - return None, 304, None, None - resp.raise_for_status() - new_etag = resp.headers.get("ETag") - new_mod = resp.headers.get("Last-Modified") - return (resp.text if as_text else resp.content), resp.status_code, new_etag, new_mod - - -def head_size(url: str) -> Optional[int]: +def _parse_duration(d): + if not d: + return None + s = str(d).strip() + if s.isdigit(): + return int(s) + parts = [p for p in s.split(":") if p != ""] try: - h = SESSION.head(url, allow_redirects=True, timeout=(CONNECT_TIMEOUT, READ_TIMEOUT)) - if h.ok: - cl = h.headers.get("Content-Length") - return int(cl) if cl and cl.isdigit() else None + if len(parts) == 3: + h, m, sec = parts + return int(h) * 3600 + int(m) * 60 + int(float(sec)) + if len(parts) == 2: + m, sec = parts + return int(m) * 60 + int(float(sec)) + return int(float(parts[0])) 
except Exception: return None - return None -def best_transcript_links(item) -> List[str]: - links: List[Tuple[int, str, str]] = [] - # Try explicit QName first - for tag in item.findall(".//{https://podcastindex.org/namespace/1.0}transcript"): - t = (tag.attrib.get("type") or "").lower() - url = tag.attrib.get("url") - if url: - links.append((0, t, url)) - # Namespace-prefixed fallback - for tag in item.findall(".//podcast:transcript", NS): - t = (tag.attrib.get("type") or "").lower() - url = tag.attrib.get("url") - if url: - links.append((0, t, url)) - - order = [ - "text/plain", # often used for TextWithTimestamps - "application/json", - "text/vtt", - "application/srt", - "application/x-subrip", - "application/text", - "text/plain; charset=utf-8", - ] - key = {v: i for i, v in enumerate(order)} - ranked = [] - for _, t, url in links: - ranked.append((key.get(t, 999), t, url)) - ranked.sort() - return [u for _, _, u in ranked] +def _slug(text: str) -> str: + text = re.sub(r"\s+", " ", text).strip() + text = re.sub(r"[^A-Za-z0-9\-._ ]+", "", text) + return (text[:120] or "episode").replace(" ", "_") -def get_enclosure(item) -> Optional[str]: - enc = item.find("enclosure") - if enc is not None and enc.attrib.get("url"): - return enc.attrib["url"] - mc = item.find("media:content", NS) - if mc is not None and mc.attrib.get("url"): - return mc.attrib["url"] - return None - - -def parse_pubdate(item) -> datetime: - # Try common fields - candidates = [ - item.findtext("pubDate"), - item.findtext("dc:date", namespaces=NS), - item.findtext("{http://purl.org/dc/elements/1.1/}date"), - ] - for pd in filter(None, candidates): - s = pd.strip() - # Try several common formats - for fmt in [ - "%a, %d %b %Y %H:%M:%S %z", - "%a, %d %b %Y %H:%M:%S", - "%Y-%m-%dT%H:%M:%S%z", - "%Y-%m-%dT%H:%M:%S", - ]: - try: - return datetime.strptime(s, fmt) - except Exception: - pass - return datetime.utcnow() - - -def save_bytes(path: Path, data: bytes) -> None: - path.parent.mkdir(parents=True, exist_ok=True) - tmp = path.with_suffix(path.suffix + ".part") - tmp.write_bytes(data) - tmp.replace(path) - - -def decide_audio_ext(audio_url: str) -> str: - p = urlparse(audio_url) - low = p.path.lower() - if low.endswith(".m4a"): - return ".m4a" - if low.endswith(".mp3"): - return ".mp3" - if low.endswith(".ogg") or low.endswith(".oga"): - return ".ogg" - if low.endswith(".aac"): - return ".aac" - if low.endswith(".wav"): - return ".wav" - return ".mp3" - - -def item_key(item) -> str: - # Prefer GUID value, else enclosure URL, else title+date - guid = item.findtext("guid") - if guid: - return guid.strip() - enc = get_enclosure(item) - if enc: - return enc - title = item.findtext("title") or "Episode" - pub = parse_pubdate(item).strftime("%Y%m%d") - return f"{pub}:{title}" - - -# ----------------- core ingest ----------------- - -def ingest_feed(url: str, state: Dict) -> int: - fstate = state.setdefault("feeds", {}).setdefault(url, {}) - etag = fstate.get("etag") - mod = fstate.get("modified") - - log.info(f"Fetch RSS: {url}") +def _yymmdd_from_pubdate(pubdate: str) -> str: try: - data, status, new_etag, new_mod = fetch(url, etag=etag, modified=mod, as_text=True) - except Exception as e: - log.error(f"Fetch failed: {e}") - return 0 + from email.utils import parsedate_to_datetime - if status == 304: - log.info("Not modified") - return 0 + dt = parsedate_to_datetime(pubdate) + if dt is not None: + return dt.strftime("%Y%m%d") + except Exception: + pass + m = re.search(r"(\d{4})[-/](\d{1,2})[-/](\d{1,2})", pubdate or "") + if m: 
+ y, mo, d = m.groups() + return f"{int(y):04d}{int(mo):02d}{int(d):02d}" + return "" - if new_etag: - fstate["etag"] = new_etag - if new_mod: - fstate["modified"] = new_mod +def _iter_items(channel: ET.Element): + for tag in ["item", "{http://www.w3.org/2005/Atom}entry"]: + for it in channel.findall(tag): + yield it + + +def _findall_ns(el, path): + res = el.findall(path, NS) + if not res and ":" in path: + last = path.split("/")[-1] + res = el.findall(last) + return res + + +def _find_ns(el, path): + found = el.find(path, NS) + if found is None and ":" in path: + found = el.find(path.split("/")[-1]) + return found + + +def _download(url: str, dst: Path) -> Path | None: try: - root = ET.fromstring(data) + r = requests.get(url, timeout=TIMEOUT, headers={"User-Agent": "podx/1.0"}) + r.raise_for_status() + dst.parent.mkdir(parents=True, exist_ok=True) + with open(dst, "wb") as f: + f.write(r.content) + return dst + except Exception: + return None + + +def _guess_ext_from_type(mime: str) -> str: + if not mime: + return ".txt" + mime = mime.lower() + if "vtt" in mime: + return ".vtt" + if "srt" in mime or "subrip" in mime: + return ".srt" + if "json" in mime: + return ".json" + return ".txt" + + +def _norm_text(s: str) -> str: + s = (s or "").lower() + s = re.sub(r"\s+", " ", s) + s = re.sub(r"[^a-z0-9 _.-]+", "", s) + return s.strip() + +def _strip_leading_date(basename: str) -> str: + m = re.match(r"^(\d{8})\s*-\s*(.+)$", basename) + return m.group(2) if m else basename + +def _find_matching_media(date: str, title: str) -> list[Path]: + """Find media files in LIB that likely correspond to this episode. + Strategy: + 1) If YYYYMMDD is present, prefer files starting with that date. + 2) Otherwise, fuzzy title match using difflib on stems (date stripped). + """ + matches: list[Path] = [] + # 1) Date-based scan + if date: + for p in LIB.rglob(f"{date} - *"): + if p.is_file() and p.suffix.lower() in MEDIA_EXTS: + matches.append(p) + if matches: + return matches + + # 2) Fuzzy title scan (can be expensive on huge libraries) + tkey = _norm_text(title) + if not tkey: + return matches + for p in LIB.rglob("*"): + if not (p.is_file() and p.suffix.lower() in MEDIA_EXTS): + continue + stem = _strip_leading_date(p.stem) + fkey = _norm_text(stem) + if not fkey: + continue + if difflib.SequenceMatcher(None, tkey, fkey).ratio() >= TITLE_MATCH_THRESHOLD: + matches.append(p) + return matches + +def _sidecar_path_for(media_path: Path, lang: str, ext: str) -> Path: + base = media_path.with_suffix("") + lang = (lang or DEFAULT_LANG or "en").lower().replace("_", "-") + # Prefer language-suffixed sidecars (e.g., episode.en.srt) + return base.with_name(f"{base.name}.{lang}{ext}") + +def _propagate_transcript_to_media(local_tr_path: Path, lang: str, date: str, title: str) -> list[str]: + """Copy a downloaded transcript next to any matching media under LIB. 
+ Returns a list of created sidecar file paths (as strings).""" + created: list[str] = [] + if not local_tr_path.exists(): + return created + ext = local_tr_path.suffix.lower() + if ext not in {".srt", ".vtt", ".txt"}: + # Unknown/unsupported transcript type for sidecar; skip silently + return created + for media in _find_matching_media(date, title): + dst = _sidecar_path_for(media, lang, ext) + try: + dst.parent.mkdir(parents=True, exist_ok=True) + if not dst.exists(): + shutil.copy2(local_tr_path, dst) + created.append(str(dst)) + except Exception: + # best-effort; continue + pass + return created + + +def _gather_transcripts(item: ET.Element): + transcripts = [] + # podcast:transcript elements + for t in _findall_ns(item, "podcast:transcript"): + url = t.get("url") or t.get("href") + ttype = t.get("type") or "" + lang = t.get("language") or t.get("lang") or "" + if url: + transcripts.append({"url": url, "type": ttype, "language": lang}) + # Atom-style transcript link + for link in _findall_ns(item, "atom:link"): + if (link.get("rel") or "").lower() == "transcript" and link.get("href"): + transcripts.append( + { + "url": link.get("href"), + "type": link.get("type") or "", + "language": link.get("hreflang") or "", + } + ) + return transcripts + + +def parse_feed(feed_url: str): + items = [] + try: + r = requests.get(feed_url, timeout=TIMEOUT, headers={"User-Agent": "podx/1.0"}) + r.raise_for_status() + root = ET.fromstring(r.content) + + channel = root.find("channel") or root + show_title = _text(_find_ns(channel, "title")) or _text(_find_ns(root, "title")) + + for it in _iter_items(channel): + title = _text(_find_ns(it, "title")) + guid = _text(_find_ns(it, "guid")) or _text(_find_ns(it, "id")) + link = _text(_find_ns(it, "link")) + pub = _text(_find_ns(it, "pubDate")) or _text(_find_ns(it, "published")) + date = _yymmdd_from_pubdate(pub) + dur = _text(_find_ns(it, "itunes:duration")) + duration_sec = _parse_duration(dur) or None + enclosure = _find_ns(it, "enclosure") + audio_url = enclosure.get("url") if enclosure is not None else "" + + if not audio_url: + for mc in _findall_ns(it, "media:content"): + if (mc.get("type") or "").startswith("audio") and mc.get("url"): + audio_url = mc.get("url") + break + + transcripts = _gather_transcripts(it) + + item_rec = { + "show": show_title, + "feed_url": feed_url, + "title": title, + "guid": guid, + "link": link, + "date": date, + "duration_sec": duration_sec, + "audio_url": audio_url, + "language": DEFAULT_LANG, + "transcripts": transcripts, + } + + # Optionally download transcripts locally + if DOWNLOAD_TRANSCRIPTS and transcripts: + base_name = f"{date or '00000000'} - {_slug(title)}" + for t in item_rec["transcripts"]: + ext = _guess_ext_from_type(t.get("type", "")) + parsed = urlparse(t["url"]) + url_ext = Path(parsed.path).suffix.lower() + if url_ext in {".vtt", ".srt", ".txt", ".json"}: + ext = url_ext + local_file = (TRN / base_name).with_suffix(ext) + saved = _download(t["url"], local_file) + if saved: + t["local_path"] = str(saved) + # If we saved a readable sidecar type, try to place it next to matching media + if ext in {".vtt", ".srt", ".txt"}: + created = _propagate_transcript_to_media(saved, t.get("language") or DEFAULT_LANG, date, title) + if created: + t["sidecars"] = created + + items.append(item_rec) + + return {"feed_url": feed_url, "show": show_title, "episodes": items} except Exception as e: - log.error(f"XML parse error: {e}") - return 0 + return {"feed_url": feed_url, "error": str(e), "episodes": []} - channel_title = 
safe((root.findtext("channel/title") or "Podcast")) - new_items = 0 - for item in root.findall("channel/item"): - key = item_key(item) - already = state.setdefault("items", {}) - if already.get(key): - continue - - title = safe(item.findtext("title") or "Episode") - pub = parse_pubdate(item) - date_prefix = pub.strftime("%Y%m%d") - base = f"{date_prefix} - {title}" - - audio_url = get_enclosure(item) - if not audio_url: - log.info(f"Skip (no enclosure): {title}") - already[key] = {"skipped": "no_enclosure"} - continue - - # HEAD size guard (optional) - if AUDIO_MAX_MB > 0: - size = head_size(audio_url) - if size and size > AUDIO_MAX_MB * 1024 * 1024: - log.info(f"Skip (size>{AUDIO_MAX_MB}MB): {title}") - already[key] = {"skipped": "too_large", "size": size} - continue - - path_ext = decide_audio_ext(audio_url) - audio_out = LIBRARY_ROOT / channel_title / f"{base}{path_ext}" - transcript_links = best_transcript_links(item) - - # If audio exists and a transcript sidecar exists → just enqueue index - sidecars = list((TRANSCRIPT_ROOT / channel_title).glob(f"{base}.*")) - have_transcript = len(sidecars) > 0 - if audio_out.exists() and have_transcript: - log.info(f"Skip download, enqueue index (have audio+transcript): {audio_out.name}") - try: - q.enqueue("worker.handle_local_file", str(audio_out), job_timeout=4*3600, result_ttl=86400, failure_ttl=86400) - except Exception as e: - log.warning(f"Enqueue failed: {e}") - already[key] = {"done": True, "audio": str(audio_out)} - new_items += 1 - continue - - # Download audio +def load_feeds_list(): + feeds = [] + if FEEDS_ENV: + feeds.extend([u.strip() for u in FEEDS_ENV.split(",") if u.strip()]) + if FEEDS_FILE.exists(): try: - log.info(f"Downloading audio → {audio_out}") - content, _, _, _ = fetch(audio_url, as_text=False) - save_bytes(audio_out, content) - except Exception as e: - log.warning(f"Audio failed: {e}") - already[key] = {"error": f"audio:{e}"} - continue - - # Download transcript if present (take first best) - transcript_out = None - for turl in transcript_links: - try: - ext = ".vtt" if "vtt" in turl.lower() else ".srt" if "srt" in turl.lower() else ".txt" - tout = TRANSCRIPT_ROOT / channel_title / f"{base}{ext}" - log.info(f"Downloading transcript → {tout} ({turl})") - tdata, _, _, _ = fetch(turl, as_text=False) - save_bytes(tout, tdata) - transcript_out = tout - break - except Exception as e: - log.warning(f"Transcript fetch failed ({turl}): {e}") - continue - - # Enqueue for indexing/transcription - try: - q.enqueue("worker.handle_local_file", str(audio_out), job_timeout=4*3600, result_ttl=86400, failure_ttl=86400) - except Exception as e: - log.warning(f"Enqueue failed: {e}") - - already[key] = {"done": True, "audio": str(audio_out), "transcript": str(transcript_out) if transcript_out else None} - new_items += 1 - - return new_items + for line in FEEDS_FILE.read_text(encoding="utf-8").splitlines(): + line = line.strip() + if not line or line.startswith("#"): + continue + feeds.append(line) + except Exception: + pass + # unique, keep order + return sorted(list(dict.fromkeys(feeds))) -# ----------------- main loop ----------------- - -def main(): - while True: - feeds = load_feeds() - if not feeds: - log.error("No RSS feeds configured. Set RSS_FEEDS or create feeds.txt.") - sys.exit(1) - - state = load_state() - total_new = 0 - for url in feeds: - try: - added = ingest_feed(url, state) - total_new += added - save_state(state) - except Exception as e: - log.error(f"Feed error: {url} -> {e}") - log.info(f"Cycle complete. 
New items: {total_new}") - if RSS_ONCE: - break - time.sleep(RSS_SCAN_MINUTES * 60) +def build_index(): + feeds = load_feeds_list() + out = {"generated_at": int(time.time()), "feeds": feeds, "episodes": []} + for url in feeds: + data = parse_feed(url) + out["episodes"].extend(data.get("episodes", [])) + OUT_INDEX.write_text(json.dumps(out, ensure_ascii=False, indent=2)) + print(f"[rss] wrote index with {len(out['episodes'])} episode(s) -> {OUT_INDEX}") + return OUT_INDEX if __name__ == "__main__": - main() \ No newline at end of file + build_index() \ No newline at end of file diff --git a/app/worker.py b/app/worker.py index eecf0a5..4af503e 100644 --- a/app/worker.py +++ b/app/worker.py @@ -1,6 +1,7 @@ import os, subprocess, shutil, json, re, orjson, requests from pathlib import Path import math +import difflib from faster_whisper import WhisperModel MEILI_URL = os.getenv("MEILI_URL", "http://meili:7700") @@ -12,6 +13,11 @@ MODEL_NAME = os.getenv("WHISPER_MODEL","large-v3") COMPUTE = os.getenv("WHISPER_PRECISION","int8") WHISPER_LANGUAGE = os.getenv("WHISPER_LANGUAGE", "auto").strip() +# RSS resolver config +RSS_INDEX_PATH = Path(os.getenv("RSS_INDEX_PATH", "/transcripts/rss_index.json")) +RSS_DURATION_TOLERANCE = int(os.getenv("RSS_DURATION_TOLERANCE", "150")) # seconds +DEFAULT_TRANSCRIPT_LANG = os.getenv("DEFAULT_TRANSCRIPT_LANG", "en").strip() or "en" + OWUI_URL = os.getenv("OPENWEBUI_URL", "").rstrip("/") OWUI_KEY = os.getenv("OPENWEBUI_API_KEY", "") OWUI_KB = os.getenv("OPENWEBUI_KB_NAME", "Homelab Library") @@ -39,6 +45,160 @@ def log(feed): def sanitize(name): return re.sub(r'[\\/:"*?<>|]+', ' ', name).strip() +# ---------- RSS transcript resolver ---------- + +def _normalize_title(t: str) -> str: + t = (t or "").lower() + t = re.sub(r"\s+", " ", t) + # remove punctuation-ish + t = re.sub(r"[^a-z0-9 _-]+", "", t) + return t.strip() + +def _stem_without_date(stem: str) -> str: + # drop leading YYYYMMDD - from filenames created by yt-dlp template + m = re.match(r"^\d{8}\s*-\s*(.*)$", stem) + return m.group(1) if m else stem + +def _extract_date_from_stem(stem: str) -> str | None: + m = re.search(r"\b(\d{8})\b", stem) + return m.group(1) if m else None + +def _best_title_match(title: str, candidates: list[str]) -> tuple[str, float]: + """Return (best_title, score 0..1) using difflib SequenceMatcher.""" + if not candidates: + return "", 0.0 + norm_title = _normalize_title(title) + best = ("", 0.0) + for c in candidates: + score = difflib.SequenceMatcher(None, norm_title, _normalize_title(c)).ratio() + if score > best[1]: + best = (c, score) + return best + +def _load_rss_index() -> list[dict]: + try: + if RSS_INDEX_PATH.exists(): + data = json.loads(RSS_INDEX_PATH.read_text(encoding="utf-8")) + # supports {"episodes":[...]} or a flat list + if isinstance(data, dict) and "episodes" in data: + return data["episodes"] or [] + if isinstance(data, list): + return data + except Exception as e: + print(f"[resolver] failed to load RSS index: {e}", flush=True) + return [] + +def match_media_to_rss(media_path: Path) -> dict | None: + """Try to match a local media file to an RSS episode entry.""" + episodes = _load_rss_index() + if not episodes: + return None + + stem = media_path.stem + title_no_date = _stem_without_date(stem) + file_date = _extract_date_from_stem(stem) + # duration tolerance + media_dur = media_duration_seconds(media_path) + + # Candidates: filter by date if present, else all + if file_date: + pool = [e for e in episodes if (str(e.get("date", "")) == file_date or 
str(e.get("pubdate", "")) == file_date)] + if not pool: + pool = episodes + else: + pool = episodes + + # Pick best by (title similarity, duration proximity) + best_ep, best_score = None, -1.0 + for ep in pool: + ep_title = ep.get("title") or ep.get("itunes_title") or "" + sim = _best_title_match(title_no_date, [ep_title])[1] + dur = float(ep.get("duration_sec") or ep.get("duration") or 0.0) + dur_ok = True + if media_dur and dur: + dur_ok = abs(media_dur - dur) <= RSS_DURATION_TOLERANCE + score = sim + (0.1 if dur_ok else 0.0) + if score > best_score: + best_score, best_ep = score, ep + + if best_ep and best_score >= 0.5: + print(f"[resolver] matched '{stem}' -> '{best_ep.get('title','')}' score={best_score:.2f}", flush=True) + return best_ep + return None + +def _choose_transcript_url(ep: dict) -> tuple[str, str] | tuple[None, None]: + """Return (url, kind) preferring txt, vtt, then srt. 'kind' in {'txt','vtt','srt'}.""" + # unified structure from rss_ingest.py: ep["transcripts"] = [{"url":..., "type": ...}, ...] + items = ep.get("transcripts") or [] + # some ingesters store separate keys + if not items: + for key, kind in [("transcript_txt","txt"), ("transcript_vtt","vtt"), ("transcript_srt","srt")]: + if ep.get(key): + items.append({"url": ep[key], "type": kind}) + # preference order + for kind in ["txt", "vtt", "srt"]: + for it in items: + t = (it.get("type") or "").lower() + u = it.get("url") or "" + if u and (kind in t or (kind == "txt" and t in ["text","plain","text/plain"]) or (kind in u.lower())): + return u, kind + return (None, None) + +def fetch_rss_transcript(ep: dict, dest_dir: Path) -> Path | None: + """Download transcript to dest_dir and return local Path; convert VTT->SRT if needed.""" + url, kind = _choose_transcript_url(ep) + if not url: + return None + dest_dir.mkdir(parents=True, exist_ok=True) + # filename from episode title + safe = sanitize(ep.get("title") or ep.get("guid") or "episode") + path = dest_dir / f"{safe}.{kind if kind!='txt' else 'txt'}" + try: + r = requests.get(url, timeout=30) + r.raise_for_status() + mode = "wb" if kind in ("vtt","srt") else "w" + if mode == "wb": + path.write_bytes(r.content) + else: + path.write_text(r.text, encoding="utf-8") + print(f"[resolver] downloaded transcript ({kind}) from {url}", flush=True) + return path + except Exception as e: + print(f"[resolver] failed to fetch transcript: {e}", flush=True) + return None + +def use_rss_transcript(media_path: Path, ep: dict) -> Path | None: + """Create standard transcript artifacts from an RSS transcript (txt/vtt/srt).""" + # Prefer direct download; else if rss_ingest already saved a local file path, try that. 
+ sidecar = None + local_hint = ep.get("transcript_local") + if local_hint: + p = Path(local_hint) + if p.exists(): + sidecar = p + if sidecar is None: + sidecar = fetch_rss_transcript(ep, TMP) + + if not sidecar or not sidecar.exists(): + return None + + # Convert to plain text + plain = transcript_text_from_file(sidecar) + lang = (ep.get("language") or ep.get("lang") or DEFAULT_TRANSCRIPT_LANG).split("-")[0] + base = write_plain_transcript(media_path, plain, language=lang) + # Place an SRT next to video for Plex + ensure_sidecar_next_to_media(sidecar, media_path, lang=lang) + # Write provenance sidecar + (base.with_suffix(".prov.json")).write_bytes(orjson.dumps({ + "source": "rss", + "feed": ep.get("feed_url"), + "guid": ep.get("guid"), + "episode_title": ep.get("title"), + "transcript_kind": sidecar.suffix.lower().lstrip("."), + "transcript_url": _choose_transcript_url(ep)[0] or "", + })) + return base + def find_sidecar_transcript(media_path: Path) -> Path | None: """Return a .txt/.srt/.vtt transcript file sitting next to media, if any. Tries common variants including language-suffixed SRT/VTT. @@ -57,6 +217,110 @@ def find_sidecar_transcript(media_path: Path) -> Path | None: return candidates[0] if candidates else None +# ---------- Transcript repository reuse helpers ---------- + +def find_repo_transcript_for_media(media_path: Path) -> Path | None: + """Search the transcript repository (/transcripts) for an existing transcript + that likely belongs to this media file (match by YYYYMMDD in filename and/or + fuzzy title similarity). Returns a path to a matching .json if found.""" + try: + stem = media_path.stem + title_no_date = _stem_without_date(stem) + file_date = _extract_date_from_stem(stem) + best_json, best_score = None, 0.0 + for j in TRN.glob("*.json"): + try: + data = json.loads(j.read_text(encoding="utf-8")) + except Exception: + continue + other_file = Path(data.get("file", "")) + other_stem = other_file.stem if other_file else j.stem + other_date = _extract_date_from_stem(other_stem) + # If both have dates and they differ a lot, skip + if file_date and other_date and file_date != other_date: + continue + # Compare titles (without dates) + sim = difflib.SequenceMatcher( + None, + _normalize_title(title_no_date), + _normalize_title(_stem_without_date(other_stem)), + ).ratio() + # Nudge score when dates match + if file_date and other_date and file_date == other_date: + sim += 0.1 + if sim > best_score: + best_score, best_json = sim, j + # Require a reasonable similarity + return best_json if best_json and best_score >= 0.60 else None + except Exception: + return None + + +def reuse_repo_transcript(media_path: Path, repo_json: Path) -> Path | None: + """Copy/retarget an existing transcript JSON/TXT (and make SRT/VTT if possible) + from the repository so that it belongs to the provided media_path. 
Returns + the new base path in /transcripts or None.""" + try: + # load the source transcript + data = json.loads(repo_json.read_text(encoding="utf-8")) + src_base = TRN / Path(repo_json).stem + src_txt = src_base.with_suffix(".txt") + src_srt = src_base.with_suffix(".srt") + src_vtt = src_base.with_suffix(".vtt") + + # write the retargeted artifacts + new_title = media_path.stem + new_base = TRN / new_title + new_base.parent.mkdir(parents=True, exist_ok=True) + + # update file path + data["file"] = str(media_path) + (new_base.with_suffix(".json")).write_bytes(orjson.dumps(data)) + + # copy or synthesize TXT + if src_txt.exists(): + shutil.copy2(src_txt, new_base.with_suffix(".txt")) + else: + # fallback: concatenate segments + txt = " ".join(s.get("text", "") for s in data.get("segments", [])) + (new_base.with_suffix(".txt")).write_text(txt, encoding="utf-8") + + # copy SRT/VTT if present; otherwise synthesize SRT from segments + if src_srt.exists(): + shutil.copy2(src_srt, new_base.with_suffix(".srt")) + else: + # synthesize SRT + def fmt_ts(t): + h=int(t//3600); m=int((t%3600)//60); s=t-(h*3600+m*60) + return f"{h:02}:{m:02}:{s:06.3f}".replace('.',',') + with open(new_base.with_suffix(".srt"), "w", encoding="utf-8") as srt: + for i, s in enumerate(data.get("segments", []), 1): + srt.write(f"{i}\n{fmt_ts(s.get('start',0.0))} --> {fmt_ts(s.get('end',0.0))}\n{s.get('text','').strip()}\n\n") + if src_vtt.exists(): + shutil.copy2(src_vtt, new_base.with_suffix(".vtt")) + else: + # synthesize VTT from segments + def fmt_ts_vtt(t): + h=int(t//3600); m=int((t%3600)//60); s=t-(h*3600+m*60) + return f"{h:02}:{m:02}:{s:06.3f}" + with open(new_base.with_suffix(".vtt"), "w", encoding="utf-8") as vtt: + vtt.write("WEBVTT\n\n") + for s in data.get("segments", []): + vtt.write(f"{fmt_ts_vtt(s.get('start',0.0))} --> {fmt_ts_vtt(s.get('end',0.0))} \n{s.get('text','').strip()}\n\n") + + # ensure sidecar next to media + try: + lang = (data.get("language") or DEFAULT_TRANSCRIPT_LANG).split("-")[0] + ensure_sidecar_next_to_media(new_base.with_suffix(".srt"), media_path, lang=lang) + except Exception: + pass + + return new_base + except Exception as e: + print(f"[resolver] failed to reuse repo transcript: {e}", flush=True) + return None + + def transcript_text_from_file(path: Path) -> str: """Extract plain text from .txt/.srt/.vtt by stripping timestamps and counters.""" try: @@ -84,8 +348,10 @@ def transcript_text_from_file(path: Path) -> str: def ensure_sidecar_next_to_media(sidecar: Path, media_path: Path, lang: str = "en") -> None: - """Ensure an `.lang.srt` sits next to the media for Plex. Convert VTT→SRT if needed.""" + """Ensure an `.lang.srt` sits next to the media for Plex. Convert VTT→SRT if needed. 
If the sidecar is .txt, do nothing.""" try: + if sidecar.suffix.lower() == ".txt": + return if sidecar.suffix.lower() == ".srt": dst = media_path.with_suffix(f".{lang}.srt") shutil.copy2(sidecar, dst) @@ -407,6 +673,19 @@ def handle_local_file(path_str: str): "uploader": p.parent.name, "date": "", "path": str(p), "progress": 0} log(info) + # 0) Try RSS resolver first: if episode with transcript exists, use it (skip Whisper) + try: + ep = match_media_to_rss(p) + except Exception as _e: + ep = None + if ep: + base = use_rss_transcript(p, ep) + if base: + index_meili(base.with_suffix(".json")) + publish_to_openwebui([base.with_suffix(".txt")]) + log({**info, **{"status": "done", "note": "used_rss_transcript"}}) + return + # 1) Prefer an existing transcript sidecar if present sidecar = find_sidecar_transcript(p) if sidecar: @@ -419,6 +698,16 @@ def handle_local_file(path_str: str): log({**info, **{"status": "done", "note": "used_existing_transcript"}}) return + # 1.5) Reuse a transcript that exists in the repository for a matching episode + repo_json = find_repo_transcript_for_media(p) + if repo_json: + base = reuse_repo_transcript(p, repo_json) + if base: + index_meili(base.with_suffix(".json")) + publish_to_openwebui([base.with_suffix(".txt")]) + log({**info, **{"status": "done", "note": "reused_repo_transcript"}}) + return + # 2) Otherwise, run transcription base = transcribe(p) index_meili(base.with_suffix(".json")) @@ -464,7 +753,23 @@ def handle_url(url: str): "date": (re.findall(r"\b(\d{8})\b", dest.stem)[0] if re.findall(r"\b(\d{8})\b", dest.stem) else ""), "path": str(dest)}) log({**info, **{"status":"transcribing", "progress": 0}}) - base = transcribe(dest) + # Try RSS transcript resolver first + ep = None + try: + ep = match_media_to_rss(dest) + except Exception: + ep = None + if ep: + base = use_rss_transcript(dest, ep) + else: + base = None + # 1.5) If we didn't get an RSS transcript and there is a matching one already in the repo, reuse it + if not base: + repo_json = find_repo_transcript_for_media(dest) + if repo_json: + base = reuse_repo_transcript(dest, repo_json) + if not base: + base = transcribe(dest) index_meili(base.with_suffix(".json")) publish_to_openwebui([base.with_suffix(".txt")]) log({**info, **{"status":"done"}})
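A minimal sketch of driving the new resolver module end to end, assuming app/ is on the import path and an index JSON that is a flat list of items carrying the keys load_index documents (title, pubdate_ts, duration_s, transcript_urls); the paths below are illustrative only.

from pathlib import Path
from resolver import load_index, match_episode, choose_transcript_url

index_items = load_index(Path("/transcripts/resolver_index.json"))    # assumed location, for illustration
media = Path("/library/Some Show/20250907 - Some Episode.mp3")        # example file

episode = match_episode(media, index_items)
if episode is None:
    print("no confident match; caller would fall back to Whisper")
else:
    chosen = choose_transcript_url(episode)
    if chosen:
        url, kind = chosen
        print(f"use published transcript ({kind}): {url}")
    else:
        print("matched episode, but the feed exposes no transcript links")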
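A self-contained sketch of how a Podcasting 2.0 <podcast:transcript> tag is read out of a feed item, mirroring _gather_transcripts in rss_ingest.py; the XML snippet is invented for illustration.

import xml.etree.ElementTree as ET

NS = {"podcast": "https://podcastindex.org/namespace/1.0"}
item_xml = """
<item xmlns:podcast="https://podcastindex.org/namespace/1.0">
  <title>Example episode</title>
  <podcast:transcript url="https://example.com/ep1.vtt" type="text/vtt" language="en"/>
  <podcast:transcript url="https://example.com/ep1.srt" type="application/srt"/>
</item>
"""

item = ET.fromstring(item_xml)
for t in item.findall("podcast:transcript", NS):
    # the same attributes _gather_transcripts records: url, type, language
    print(t.get("url"), t.get("type"), t.get("language") or "")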
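itunes:duration arrives in several shapes; this standalone helper mirrors _parse_duration and shows the forms it normalises to whole seconds (example values only).

def parse_duration(d: str) -> int:
    s = str(d).strip()
    if s.isdigit():                      # already plain seconds
        return int(s)
    parts = [p for p in s.split(":") if p != ""]
    if len(parts) == 3:                  # HH:MM:SS
        h, m, sec = parts
        return int(h) * 3600 + int(m) * 60 + int(float(sec))
    if len(parts) == 2:                  # MM:SS
        m, sec = parts
        return int(m) * 60 + int(float(sec))
    return int(float(parts[0]))          # bare number of seconds

for d in ["3725", "1:02:05", "62:05", "47.0"]:
    print(d, "->", parse_duration(d))    # 3725, 3725, 3725, 47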
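Both rss_ingest._find_matching_media and the worker's find_repo_transcript_for_media pair files by normalised-title similarity; a small standalone illustration of that difflib-based scoring, using made-up titles.

import difflib, re

def norm(s: str) -> str:
    s = s.lower()
    s = re.sub(r"\s+", " ", s)
    return re.sub(r"[^a-z0-9 _.-]+", "", s).strip()

feed_title = "Episode 42: Building a Home Lab (Part 2)"
file_stem  = "20250907 - Episode 42 Building a Home Lab Part 2"
stem_no_date = re.sub(r"^\d{8}\s*-\s*", "", file_stem)   # drop the yt-dlp style date prefix

score = difflib.SequenceMatcher(None, norm(feed_title), norm(stem_no_date)).ratio()
print(round(score, 2))   # 1.0 here, comfortably above the 0.60 default threshold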
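The language-suffixed sidecar convention used when transcripts are copied next to media (so Plex picks up episode.en.srt); pure path arithmetic mirroring _sidecar_path_for, nothing is written to disk, and the input path is an example.

from pathlib import Path

def sidecar_path_for(media_path: Path, lang: str, ext: str) -> Path:
    base = media_path.with_suffix("")                      # drop the media extension
    lang = (lang or "en").lower().replace("_", "-")        # "en_US" -> "en-us"
    return base.with_name(f"{base.name}.{lang}{ext}")

print(sidecar_path_for(Path("/library/Show/20250907 - Episode.mkv"), "en_US", ".srt"))
# /library/Show/20250907 - Episode.en-us.srt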
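When reuse_repo_transcript finds only a segments JSON, it synthesises an SRT; the same timestamp formatting shown standalone with example segment data.

def fmt_ts(t: float) -> str:
    h = int(t // 3600); m = int((t % 3600) // 60); s = t - (h * 3600 + m * 60)
    return f"{h:02}:{m:02}:{s:06.3f}".replace(".", ",")   # SRT uses a comma before milliseconds

segments = [
    {"start": 0.0, "end": 4.2, "text": "Welcome back to the show."},
    {"start": 4.2, "end": 9.8, "text": "Today: transcripts straight from the RSS feed."},
]
for i, s in enumerate(segments, 1):
    print(f"{i}\n{fmt_ts(s['start'])} --> {fmt_ts(s['end'])}\n{s['text'].strip()}\n")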