"""Poll podcast RSS/Atom feeds and build a local episode index.

For every configured feed this script collects episode metadata, optionally
downloads transcripts (podcast:transcript / Atom transcript links) and audio
enclosures, copies transcript sidecars next to matching media files under
LIBRARY_ROOT, and writes the result to a JSON index. All behaviour is
controlled by the environment variables read below.
"""

import os, re, json, time, shutil, difflib
from pathlib import Path
from urllib.parse import urlparse

import requests
import xml.etree.ElementTree as ET

# ---- Config ----
TRN = Path(os.getenv("TRANSCRIPT_ROOT", "/transcripts"))
OUT_INDEX = Path(os.getenv("RSS_INDEX_PATH", str(TRN / "rss_index.json")))
FEEDS_FILE = Path(os.getenv("RSS_FEEDS_FILE", "/library/feeds.txt"))
FEEDS_ENV = os.getenv("RSS_FEEDS", "").strip()
TIMEOUT = int(os.getenv("RSS_HTTP_TIMEOUT", "30"))
DOWNLOAD_TRANSCRIPTS = os.getenv("RSS_DOWNLOAD_TRANSCRIPTS", "true").lower() in {"1", "true", "yes", "y"}
DEFAULT_LANG = os.getenv("DEFAULT_TRANSCRIPT_LANG", "en").strip() or "en"
RSS_SCAN_MINUTES = int(os.getenv("RSS_SCAN_MINUTES", "15"))
RSS_ONCE = os.getenv("RSS_ONCE", "0").lower() in {"1", "true", "yes", "y"}
AUDIO_MAX_MB = int(os.getenv("RSS_AUDIO_MAX_MB", "0"))  # 0 = unlimited

# Where media files live; used to sidecar RSS transcripts next to matching media
LIB = Path(os.getenv("LIBRARY_ROOT", "/library"))
MEDIA_EXTS = {
    ".mp3", ".m4a", ".flac", ".wav", ".ogg", ".opus",
    ".mp4", ".m4v", ".mkv", ".webm", ".mov", ".avi",
}

# Fuzzy title match threshold for media ↔ transcript pairing
TITLE_MATCH_THRESHOLD = float(os.getenv("RSS_TITLE_MATCH_THRESHOLD", "0.60"))

# Download podcast audio (enclosures) to a local library
PODCASTS_ROOT = Path(os.getenv("PODCASTS_ROOT", str(LIB / "Podcasts")))
PODCASTS_PER_SHOW = os.getenv("PODCASTS_PER_SHOW", "true").lower() in {"1", "true", "yes", "y"}
DOWNLOAD_AUDIO = os.getenv("RSS_DOWNLOAD_AUDIO", "true").lower() in {"1", "true", "yes", "y"}

# Namespace map (extend as needed)
NS = {
    "itunes": "http://www.itunes.com/dtds/podcast-1.0.dtd",
    "content": "http://purl.org/rss/1.0/modules/content/",
    "media": "http://search.yahoo.com/mrss/",
    "podcast": "https://podcastindex.org/namespace/1.0",
    "atom": "http://www.w3.org/2005/Atom",
}

TRN.mkdir(parents=True, exist_ok=True)
OUT_INDEX.parent.mkdir(parents=True, exist_ok=True)
PODCASTS_ROOT.mkdir(parents=True, exist_ok=True)


def _text(el):
    return (el.text or "").strip() if el is not None else ""


def _parse_duration(d):
    """Parse an itunes:duration value ("HH:MM:SS", "MM:SS", or plain seconds) into seconds."""
    if not d:
        return None
    s = str(d).strip()
    if s.isdigit():
        return int(s)
    parts = [p for p in s.split(":") if p != ""]
    try:
        if len(parts) == 3:
            h, m, sec = parts
            return int(h) * 3600 + int(m) * 60 + int(float(sec))
        if len(parts) == 2:
            m, sec = parts
            return int(m) * 60 + int(float(sec))
        return int(float(parts[0]))
    except Exception:
        return None


def _slug(text: str) -> str:
    """Turn an episode title into a filesystem-friendly name."""
    text = re.sub(r"\s+", " ", text).strip()
    text = re.sub(r"[^A-Za-z0-9\-._ ]+", "", text)
    return (text[:120] or "episode").replace(" ", "_")


def _yymmdd_from_pubdate(pubdate: str) -> str:
    """Extract a YYYYMMDD date from an RFC 2822 pubDate (with an ISO-like fallback)."""
    try:
        from email.utils import parsedate_to_datetime

        dt = parsedate_to_datetime(pubdate)
        if dt is not None:
            return dt.strftime("%Y%m%d")
    except Exception:
        pass
    m = re.search(r"(\d{4})[-/](\d{1,2})[-/](\d{1,2})", pubdate or "")
    if m:
        y, mo, d = m.groups()
        return f"{int(y):04d}{int(mo):02d}{int(d):02d}"
    return ""
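
# A few concrete helper behaviours (outputs follow from the functions above;
# the inputs themselves are only illustrative):
#   _parse_duration("1:02:03")  -> 3723
#   _parse_duration("90")       -> 90
#   _yymmdd_from_pubdate("Tue, 05 Mar 2024 10:00:00 +0000") -> "20240305"
#   _slug("Episode #12: The Return!") -> "Episode_12_The_Return"
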
def _iter_items(channel: ET.Element):
    for tag in ["item", "{http://www.w3.org/2005/Atom}entry"]:
        for it in channel.findall(tag):
            yield it


def _findall_ns(el, path):
    res = el.findall(path, NS)
    if not res and ":" in path:
        # Fall back to the un-prefixed local tag name; keeping the prefix here
        # would make ElementTree raise SyntaxError for an unknown prefix.
        last = path.split("/")[-1].split(":")[-1]
        res = el.findall(last)
    return res


def _find_ns(el, path):
    found = el.find(path, NS)
    if found is None and ":" in path:
        found = el.find(path.split("/")[-1].split(":")[-1])
    return found


def _download(url: str, dst: Path) -> Path | None:
    try:
        r = requests.get(url, timeout=TIMEOUT, headers={"User-Agent": "podx/1.0"})
        r.raise_for_status()
        dst.parent.mkdir(parents=True, exist_ok=True)
        with open(dst, "wb") as f:
            f.write(r.content)
        return dst
    except Exception:
        return None


def _guess_ext_from_type(mime: str) -> str:
    if not mime:
        return ".txt"
    mime = mime.lower()
    if "vtt" in mime:
        return ".vtt"
    if "srt" in mime or "subrip" in mime:
        return ".srt"
    if "json" in mime:
        return ".json"
    return ".txt"


def _guess_audio_ext(mime: str, url: str) -> str:
    # Prefer by MIME; fall back to URL suffix
    mime = (mime or "").lower()
    if "mp3" in mime:
        return ".mp3"
    if "aac" in mime or "mp4a" in mime:
        return ".m4a"
    if "m4a" in mime:
        return ".m4a"
    if "ogg" in mime:
        return ".ogg"
    if "opus" in mime:
        return ".opus"
    if "flac" in mime:
        return ".flac"
    if "wav" in mime:
        return ".wav"
    # fallback by URL
    suf = Path(urlparse(url).path).suffix.lower()
    if suf in {".mp3", ".m4a", ".aac", ".ogg", ".opus", ".flac", ".wav"}:
        return ".m4a" if suf == ".aac" else suf
    return ".mp3"


def _download_stream(url: str, dst: Path) -> Path | None:
    try:
        dst.parent.mkdir(parents=True, exist_ok=True)
        with requests.get(url, timeout=TIMEOUT, headers={"User-Agent": "podx/1.0"}, stream=True) as r:
            r.raise_for_status()
            max_bytes = AUDIO_MAX_MB * 1024 * 1024 if AUDIO_MAX_MB > 0 else None
            total = 0
            with open(dst, "wb") as f:
                for chunk in r.iter_content(chunk_size=1024 * 256):
                    if not chunk:
                        continue
                    f.write(chunk)
                    total += len(chunk)
                    if max_bytes and total > max_bytes:
                        # stop and remove partial
                        try:
                            f.close()
                        except Exception:
                            pass
                        try:
                            dst.unlink(missing_ok=True)
                        except Exception:
                            pass
                        return None
        return dst
    except Exception:
        return None


def _norm_text(s: str) -> str:
    s = (s or "").lower()
    s = re.sub(r"\s+", " ", s)
    s = re.sub(r"[^a-z0-9 _.-]+", "", s)
    return s.strip()


def _strip_leading_date(basename: str) -> str:
    m = re.match(r"^(\d{8})\s*-\s*(.+)$", basename)
    return m.group(2) if m else basename


def _find_matching_media(date: str, title: str) -> list[Path]:
    """Find media files in LIB that likely correspond to this episode.

    Strategy:
    1) If YYYYMMDD is present, prefer files starting with that date.
    2) Otherwise, fuzzy title match using difflib on stems (date stripped).
    """
    matches: list[Path] = []
    # 1) Date-based scan
    if date:
        for p in LIB.rglob(f"{date} - *"):
            if p.is_file() and p.suffix.lower() in MEDIA_EXTS:
                matches.append(p)
        if matches:
            return matches
    # 2) Fuzzy title scan (can be expensive on huge libraries)
    tkey = _norm_text(title)
    if not tkey:
        return matches
    for p in LIB.rglob("*"):
        if not (p.is_file() and p.suffix.lower() in MEDIA_EXTS):
            continue
        stem = _strip_leading_date(p.stem)
        fkey = _norm_text(stem)
        if not fkey:
            continue
        if difflib.SequenceMatcher(None, tkey, fkey).ratio() >= TITLE_MATCH_THRESHOLD:
            matches.append(p)
    return matches


def _sidecar_path_for(media_path: Path, lang: str, ext: str) -> Path:
    base = media_path.with_suffix("")
    lang = (lang or DEFAULT_LANG or "en").lower().replace("_", "-")
    # Prefer language-suffixed sidecars (e.g., episode.en.srt)
    return base.with_name(f"{base.name}.{lang}{ext}")
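
# Illustrative pairing with hypothetical library paths: for an episode dated
# "20240305" and titled "The Return", the date scan matches
# "/library/Show/20240305 - The Return.mp3", and
# _sidecar_path_for(that_path, "en", ".srt") yields
# "/library/Show/20240305 - The Return.en.srt".
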
def _propagate_transcript_to_media(local_tr_path: Path, lang: str, date: str, title: str) -> list[str]:
    """Copy a downloaded transcript next to any matching media under LIB.

    Returns a list of created sidecar file paths (as strings)."""
    created: list[str] = []
    if not local_tr_path.exists():
        return created
    ext = local_tr_path.suffix.lower()
    if ext not in {".srt", ".vtt", ".txt"}:
        # Unknown/unsupported transcript type for sidecar; skip silently
        return created
    for media in _find_matching_media(date, title):
        dst = _sidecar_path_for(media, lang, ext)
        try:
            dst.parent.mkdir(parents=True, exist_ok=True)
            if not dst.exists():
                shutil.copy2(local_tr_path, dst)
                created.append(str(dst))
        except Exception:
            # best-effort; continue
            pass
    return created


def _gather_transcripts(item: ET.Element):
    transcripts = []
    # podcast:transcript elements
    for t in _findall_ns(item, "podcast:transcript"):
        url = t.get("url") or t.get("href")
        ttype = t.get("type") or ""
        lang = t.get("language") or t.get("lang") or ""
        if url:
            transcripts.append({"url": url, "type": ttype, "language": lang})
    # Atom-style transcript link
    for link in _findall_ns(item, "atom:link"):
        if (link.get("rel") or "").lower() == "transcript" and link.get("href"):
            transcripts.append(
                {
                    "url": link.get("href"),
                    "type": link.get("type") or "",
                    "language": link.get("hreflang") or "",
                }
            )
    return transcripts
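
# The transcript discovery above looks for elements of this shape (a made-up
# snippet in the Podcasting 2.0 / Atom style; URLs and MIME types are illustrative):
#
#   <podcast:transcript url="https://example.com/ep1.vtt" type="text/vtt" language="en"/>
#   <atom:link rel="transcript" href="https://example.com/ep1.srt" type="application/x-subrip"/>
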
def parse_feed(feed_url: str):
    items = []
    try:
        print(f"[rss] fetching {feed_url}", flush=True)
        r = requests.get(feed_url, timeout=TIMEOUT, headers={"User-Agent": "podx/1.0"})
        r.raise_for_status()
        root = ET.fromstring(r.content)
        channel = root.find("channel")
        if channel is None:
            channel = root  # Atom feeds keep their entries directly on the root element
        show_title = _text(_find_ns(channel, "title")) or _text(_find_ns(root, "title"))
        if not show_title:
            show_title = ""
        for it in _iter_items(channel):
            title = _text(_find_ns(it, "title"))
            guid = _text(_find_ns(it, "guid")) or _text(_find_ns(it, "id"))
            link = _text(_find_ns(it, "link"))
            pub = _text(_find_ns(it, "pubDate")) or _text(_find_ns(it, "published"))
            date = _yymmdd_from_pubdate(pub)
            dur = _text(_find_ns(it, "itunes:duration"))
            duration_sec = _parse_duration(dur) or None
            enclosure = _find_ns(it, "enclosure")
            audio_url = enclosure.get("url") if enclosure is not None else ""
            audio_type = enclosure.get("type") if enclosure is not None else ""
            if not audio_url:
                for mc in _findall_ns(it, "media:content"):
                    if (mc.get("type") or "").startswith("audio") and mc.get("url"):
                        audio_url = mc.get("url")
                        break
            transcripts = _gather_transcripts(it)
            item_rec = {
                "show": show_title,
                "feed_url": feed_url,
                "title": title,
                "guid": guid,
                "link": link,
                "date": date,
                "duration_sec": duration_sec,
                "audio_url": audio_url,
                "audio_type": audio_type,
                "language": DEFAULT_LANG,
                "transcripts": transcripts,
            }

            # Optionally download transcripts locally
            if DOWNLOAD_TRANSCRIPTS and transcripts:
                base_name = f"{date or '00000000'} - {_slug(title)}"
                for t in item_rec["transcripts"]:
                    ext = _guess_ext_from_type(t.get("type", ""))
                    parsed = urlparse(t["url"])
                    url_ext = Path(parsed.path).suffix.lower()
                    if url_ext in {".vtt", ".srt", ".txt", ".json"}:
                        ext = url_ext
                    # Append the extension directly; with_suffix() would truncate titles that contain dots
                    local_file = TRN / f"{base_name}{ext}"
                    saved = _download(t["url"], local_file)
                    if saved:
                        t["local_path"] = str(saved)
                        # If we saved a readable sidecar type, try to place it next to matching media
                        if ext in {".vtt", ".srt", ".txt"}:
                            created = _propagate_transcript_to_media(saved, t.get("language") or DEFAULT_LANG, date, title)
                            if created:
                                t["sidecars"] = created

            # Optionally download podcast audio locally
            local_audio_path = None
            if DOWNLOAD_AUDIO and audio_url:
                show_dir = (PODCASTS_ROOT / _slug(show_title or "Podcast")) if PODCASTS_PER_SHOW else PODCASTS_ROOT
                base_name = f"{date or '00000000'} - {_slug(title or guid or 'episode')}"
                ext = _guess_audio_ext(audio_type, audio_url)
                target = show_dir / f"{base_name}{ext}"
                # Avoid re-download if already exists
                if not target.exists():
                    saved = _download_stream(audio_url, target)
                    if saved is None:
                        # Try a non-streaming fallback
                        saved = _download(audio_url, target)
                else:
                    saved = target
                if saved and saved.exists():
                    local_audio_path = saved
                    # If we previously downloaded transcript sidecars, try to place them next to this audio
                    for t in item_rec.get("transcripts", []) or []:
                        lp = t.get("local_path")
                        if lp:
                            try:
                                lp = Path(lp)
                                if lp.exists() and lp.suffix.lower() in {".srt", ".vtt", ".txt"}:
                                    sc = _sidecar_path_for(local_audio_path, t.get("language") or DEFAULT_LANG, lp.suffix.lower())
                                    if not sc.exists():
                                        sc.parent.mkdir(parents=True, exist_ok=True)
                                        shutil.copy2(lp, sc)
                                        t.setdefault("sidecars", []).append(str(sc))
                            except Exception:
                                pass
            if local_audio_path:
                item_rec["local_audio"] = str(local_audio_path)
            items.append(item_rec)
        print(f"[rss] parsed {len(items)} episode(s) from {show_title or feed_url}", flush=True)
        return {"feed_url": feed_url, "show": show_title, "episodes": items}
    except Exception as e:
        print(f"[rss] ERROR parsing {feed_url}: {e}", flush=True)
        return {"feed_url": feed_url, "error": str(e), "episodes": []}


def load_feeds_list():
    print(f"[rss] FEEDS_FILE={FEEDS_FILE} FEEDS_ENV={'set' if bool(FEEDS_ENV) else 'unset'}", flush=True)
    feeds = []
    if FEEDS_ENV:
        feeds.extend([u.strip() for u in FEEDS_ENV.split(",") if u.strip()])
    if FEEDS_FILE.exists():
        try:
            for line in FEEDS_FILE.read_text(encoding="utf-8").splitlines():
                line = line.strip()
                if not line or line.startswith("#"):
                    continue
                feeds.append(line)
        except Exception:
            pass
    else:
        print(f"[rss] feeds file not found: {FEEDS_FILE}", flush=True)
    # de-duplicate, then sort for a stable processing order
    feeds = sorted(dict.fromkeys(feeds))
    print(f"[rss] loaded {len(feeds)} feed URL(s)", flush=True)
    return feeds


def build_index():
    feeds = load_feeds_list()
    out = {"generated_at": int(time.time()), "feeds": feeds, "episodes": []}
    for url in feeds:
        data = parse_feed(url)
        out["episodes"].extend(data.get("episodes", []))
    OUT_INDEX.write_text(json.dumps(out, ensure_ascii=False, indent=2), encoding="utf-8")
    print(f"[rss] wrote index with {len(out['episodes'])} episode(s) -> {OUT_INDEX}", flush=True)
    return OUT_INDEX


if __name__ == "__main__":
    while True:
        try:
            build_index()
        except Exception as e:
            print(f"[rss] build error: {e}", flush=True)
        if RSS_ONCE:
            break
        time.sleep(max(1, RSS_SCAN_MINUTES) * 60)
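
# Example setup (a sketch; the module filename and the URLs below are
# illustrative assumptions, not defined by this script):
#
#   # /library/feeds.txt -- one feed URL per line; blank lines and '#' comments are skipped
#   https://example.com/shows/my-podcast/feed.xml
#
#   # One-off scan instead of the periodic loop:
#   RSS_ONCE=1 RSS_FEEDS="https://example.org/another/feed.xml" python rss_feed_indexer.py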