podx/app/rss_ingest.py

import os, re, json, time, shutil, difflib
from pathlib import Path
from urllib.parse import urlparse
import requests
import xml.etree.ElementTree as ET
# ---- Config ----
TRN = Path(os.getenv("TRANSCRIPT_ROOT", "/transcripts"))
OUT_INDEX = Path(os.getenv("RSS_INDEX_PATH", str(TRN / "rss_index.json")))
FEEDS_FILE = Path(os.getenv("RSS_FEEDS_FILE", "/app/config/feeds.txt"))
FEEDS_ENV = os.getenv("RSS_FEEDS", "").strip()
TIMEOUT = int(os.getenv("RSS_HTTP_TIMEOUT", "30"))
DOWNLOAD_TRANSCRIPTS = os.getenv("RSS_DOWNLOAD_TRANSCRIPTS", "true").lower() in {"1", "true", "yes", "y"}
DEFAULT_LANG = os.getenv("DEFAULT_TRANSCRIPT_LANG", "en").strip() or "en"
# Where media files live; used to sidecar RSS transcripts next to matching media
LIB = Path(os.getenv("LIBRARY_ROOT", "/library"))
MEDIA_EXTS = {".mp3", ".m4a", ".flac", ".wav", ".ogg", ".opus", ".mp4", ".m4v", ".mkv", ".webm", ".mov", ".avi"}
# Fuzzy title match threshold for media ↔ transcript pairing
TITLE_MATCH_THRESHOLD = float(os.getenv("RSS_TITLE_MATCH_THRESHOLD", "0.60"))
# Download podcast audio (enclosures) to a local library
PODCASTS_ROOT = Path(os.getenv("PODCASTS_ROOT", str(LIB / "Podcasts")))
PODCASTS_PER_SHOW = os.getenv("PODCASTS_PER_SHOW", "true").lower() in {"1","true","yes","y"}
DOWNLOAD_AUDIO = os.getenv("RSS_DOWNLOAD_AUDIO", "true").lower() in {"1","true","yes","y"}
# Namespace map (extend as needed)
NS = {
    "itunes": "http://www.itunes.com/dtds/podcast-1.0.dtd",
    "content": "http://purl.org/rss/1.0/modules/content/",
    "media": "http://search.yahoo.com/mrss/",
    "podcast": "https://podcastindex.org/namespace/1.0",
    "atom": "http://www.w3.org/2005/Atom",
}

TRN.mkdir(parents=True, exist_ok=True)
OUT_INDEX.parent.mkdir(parents=True, exist_ok=True)
PODCASTS_ROOT.mkdir(parents=True, exist_ok=True)
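
# Illustrative environment for a containerized deployment. These values are examples
# only (the real defaults are the os.getenv() fallbacks above), shown to document the
# expected formats, e.g. RSS_FEEDS as a comma-separated list:
#   TRANSCRIPT_ROOT=/transcripts
#   LIBRARY_ROOT=/library
#   RSS_FEEDS=https://example.com/feed.xml,https://example.org/show/rss
#   RSS_DOWNLOAD_TRANSCRIPTS=true
#   RSS_DOWNLOAD_AUDIO=true
#   DEFAULT_TRANSCRIPT_LANG=en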

def _text(el):
    return (el.text or "").strip() if el is not None else ""

def _parse_duration(d):
    if not d:
        return None
    s = str(d).strip()
    if s.isdigit():
        return int(s)
    parts = [p for p in s.split(":") if p != ""]
    try:
        if len(parts) == 3:
            h, m, sec = parts
            return int(h) * 3600 + int(m) * 60 + int(float(sec))
        if len(parts) == 2:
            m, sec = parts
            return int(m) * 60 + int(float(sec))
        return int(float(parts[0]))
    except Exception:
        return None

def _slug(text: str) -> str:
    text = re.sub(r"\s+", " ", text).strip()
    text = re.sub(r"[^A-Za-z0-9\-._ ]+", "", text)
    return (text[:120] or "episode").replace(" ", "_")

def _yymmdd_from_pubdate(pubdate: str) -> str:
    try:
        from email.utils import parsedate_to_datetime

        dt = parsedate_to_datetime(pubdate)
        if dt is not None:
            return dt.strftime("%Y%m%d")
    except Exception:
        pass
    m = re.search(r"(\d{4})[-/](\d{1,2})[-/](\d{1,2})", pubdate or "")
    if m:
        y, mo, d = m.groups()
        return f"{int(y):04d}{int(mo):02d}{int(d):02d}"
    return ""

def _iter_items(channel: ET.Element):
    for tag in ["item", "{http://www.w3.org/2005/Atom}entry"]:
        for it in channel.findall(tag):
            yield it

def _findall_ns(el, path):
    res = el.findall(path, NS)
    if not res and ":" in path:
        # Fall back to the bare local name for feeds that emit the element without a
        # namespace prefix (e.g. "podcast:transcript" -> "transcript").
        last = path.split(":")[-1]
        res = el.findall(last)
    return res

def _find_ns(el, path):
    found = el.find(path, NS)
    if found is None and ":" in path:
        found = el.find(path.split(":")[-1])
    return found

def _download(url: str, dst: Path) -> Path | None:
    try:
        r = requests.get(url, timeout=TIMEOUT, headers={"User-Agent": "podx/1.0"})
        r.raise_for_status()
        dst.parent.mkdir(parents=True, exist_ok=True)
        with open(dst, "wb") as f:
            f.write(r.content)
        return dst
    except Exception:
        return None

def _guess_ext_from_type(mime: str) -> str:
    if not mime:
        return ".txt"
    mime = mime.lower()
    if "vtt" in mime:
        return ".vtt"
    if "srt" in mime or "subrip" in mime:
        return ".srt"
    if "json" in mime:
        return ".json"
    return ".txt"

def _norm_text(s: str) -> str:
    s = (s or "").lower()
    s = re.sub(r"\s+", " ", s)
    s = re.sub(r"[^a-z0-9 _.-]+", "", s)
    return s.strip()

def _strip_leading_date(basename: str) -> str:
    m = re.match(r"^(\d{8})\s*-\s*(.+)$", basename)
    return m.group(2) if m else basename

def _find_matching_media(date: str, title: str) -> list[Path]:
    """Find media files in LIB that likely correspond to this episode.

    Strategy:
    1) If YYYYMMDD is present, prefer files starting with that date.
    2) Otherwise, fuzzy title match using difflib on stems (date stripped).
    """
    matches: list[Path] = []
    # 1) Date-based scan
    if date:
        for p in LIB.rglob(f"{date} - *"):
            if p.is_file() and p.suffix.lower() in MEDIA_EXTS:
                matches.append(p)
        if matches:
            return matches
    # 2) Fuzzy title scan (can be expensive on huge libraries)
    tkey = _norm_text(title)
    if not tkey:
        return matches
    for p in LIB.rglob("*"):
        if not (p.is_file() and p.suffix.lower() in MEDIA_EXTS):
            continue
        stem = _strip_leading_date(p.stem)
        fkey = _norm_text(stem)
        if not fkey:
            continue
        if difflib.SequenceMatcher(None, tkey, fkey).ratio() >= TITLE_MATCH_THRESHOLD:
            matches.append(p)
    return matches

def _sidecar_path_for(media_path: Path, lang: str, ext: str) -> Path:
    base = media_path.with_suffix("")
    lang = (lang or DEFAULT_LANG or "en").lower().replace("_", "-")
    # Prefer language-suffixed sidecars (e.g., episode.en.srt)
    return base.with_name(f"{base.name}.{lang}{ext}")

def _propagate_transcript_to_media(local_tr_path: Path, lang: str, date: str, title: str) -> list[str]:
    """Copy a downloaded transcript next to any matching media under LIB.

    Returns a list of created sidecar file paths (as strings)."""
    created: list[str] = []
    if not local_tr_path.exists():
        return created
    ext = local_tr_path.suffix.lower()
    if ext not in {".srt", ".vtt", ".txt"}:
        # Unknown/unsupported transcript type for sidecar; skip silently
        return created
    for media in _find_matching_media(date, title):
        dst = _sidecar_path_for(media, lang, ext)
        try:
            dst.parent.mkdir(parents=True, exist_ok=True)
            if not dst.exists():
                shutil.copy2(local_tr_path, dst)
                created.append(str(dst))
        except Exception:
            # best-effort; continue
            pass
    return created

def _gather_transcripts(item: ET.Element):
    transcripts = []
    # podcast:transcript elements
    for t in _findall_ns(item, "podcast:transcript"):
        url = t.get("url") or t.get("href")
        ttype = t.get("type") or ""
        lang = t.get("language") or t.get("lang") or ""
        if url:
            transcripts.append({"url": url, "type": ttype, "language": lang})
    # Atom-style transcript link
    for link in _findall_ns(item, "atom:link"):
        if (link.get("rel") or "").lower() == "transcript" and link.get("href"):
            transcripts.append(
                {
                    "url": link.get("href"),
                    "type": link.get("type") or "",
                    "language": link.get("hreflang") or "",
                }
            )
    return transcripts
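
# NOTE: parse_feed() below calls _guess_audio_ext() and _download_stream(), which do not
# appear in this listing. The two definitions that follow are a minimal sketch inferred
# from their call sites (signatures, return types, and the MIME heuristics are
# assumptions, not the project's confirmed implementation).

def _guess_audio_ext(mime: str, url: str) -> str:
    """Pick a file extension for enclosure audio from its MIME type or URL (sketch)."""
    url_ext = Path(urlparse(url).path).suffix.lower()
    if url_ext in MEDIA_EXTS:
        return url_ext
    mime = (mime or "").lower()
    if "mpeg" in mime or "mp3" in mime:
        return ".mp3"
    if "mp4" in mime or "m4a" in mime or "aac" in mime:
        return ".m4a"
    if "ogg" in mime or "opus" in mime:
        return ".ogg"
    if "flac" in mime:
        return ".flac"
    if "wav" in mime:
        return ".wav"
    return ".mp3"

def _download_stream(url: str, dst: Path) -> Path | None:
    """Stream a (potentially large) file to disk in chunks; return dst, or None on error (sketch)."""
    try:
        with requests.get(url, timeout=TIMEOUT, headers={"User-Agent": "podx/1.0"}, stream=True) as r:
            r.raise_for_status()
            dst.parent.mkdir(parents=True, exist_ok=True)
            with open(dst, "wb") as f:
                for chunk in r.iter_content(chunk_size=1 << 20):
                    if chunk:
                        f.write(chunk)
        return dst
    except Exception:
        return None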

def parse_feed(feed_url: str):
    items = []
    try:
        r = requests.get(feed_url, timeout=TIMEOUT, headers={"User-Agent": "podx/1.0"})
        r.raise_for_status()
        root = ET.fromstring(r.content)
        # Atom feeds have no <channel>; fall back to the root element.
        # (Explicit None check: truth-testing Elements is deprecated in ElementTree.)
        channel = root.find("channel")
        if channel is None:
            channel = root
        show_title = _text(_find_ns(channel, "title")) or _text(_find_ns(root, "title"))
        for it in _iter_items(channel):
            title = _text(_find_ns(it, "title"))
            guid = _text(_find_ns(it, "guid")) or _text(_find_ns(it, "id"))
            link = _text(_find_ns(it, "link"))
            pub = _text(_find_ns(it, "pubDate")) or _text(_find_ns(it, "published"))
            date = _yymmdd_from_pubdate(pub)
            dur = _text(_find_ns(it, "itunes:duration"))
            duration_sec = _parse_duration(dur) or None
            enclosure = _find_ns(it, "enclosure")
            audio_url = enclosure.get("url") if enclosure is not None else ""
            audio_type = enclosure.get("type") if enclosure is not None else ""
            if not audio_url:
                for mc in _findall_ns(it, "media:content"):
                    if (mc.get("type") or "").startswith("audio") and mc.get("url"):
                        audio_url = mc.get("url")
                        break
            transcripts = _gather_transcripts(it)
            item_rec = {
                "show": show_title,
                "feed_url": feed_url,
                "title": title,
                "guid": guid,
                "link": link,
                "date": date,
                "duration_sec": duration_sec,
                "audio_url": audio_url,
                "audio_type": audio_type,
                "language": DEFAULT_LANG,
                "transcripts": transcripts,
            }
            # Optionally download transcripts locally
            if DOWNLOAD_TRANSCRIPTS and transcripts:
                base_name = f"{date or '00000000'} - {_slug(title)}"
                for t in item_rec["transcripts"]:
                    ext = _guess_ext_from_type(t.get("type", ""))
                    parsed = urlparse(t["url"])
                    url_ext = Path(parsed.path).suffix.lower()
                    if url_ext in {".vtt", ".srt", ".txt", ".json"}:
                        ext = url_ext
                    # Append the extension directly; with_suffix() would clobber anything
                    # after a dot in the slugged title.
                    local_file = TRN / f"{base_name}{ext}"
                    saved = _download(t["url"], local_file)
                    if saved:
                        t["local_path"] = str(saved)
                        # If we saved a readable sidecar type, try to place it next to matching media
                        if ext in {".vtt", ".srt", ".txt"}:
                            created = _propagate_transcript_to_media(saved, t.get("language") or DEFAULT_LANG, date, title)
                            if created:
                                t["sidecars"] = created
            # Optionally download podcast audio locally
            local_audio_path = None
            if DOWNLOAD_AUDIO and audio_url:
                show_dir = PODCASTS_ROOT / _slug(show_title or "Podcast") if PODCASTS_PER_SHOW else PODCASTS_ROOT
                base_name = f"{(date or '00000000')} - {_slug(title or guid or 'episode')}"
                ext = _guess_audio_ext(audio_type, audio_url)
                target = show_dir / f"{base_name}{ext}"
                # Avoid re-download if already exists
                if not target.exists():
                    saved = _download_stream(audio_url, target)
                    if saved is None:
                        # Try a non-streaming fallback
                        saved = _download(audio_url, target)
                else:
                    saved = target
                if saved and saved.exists():
                    local_audio_path = saved
                    # If we previously downloaded transcript sidecars, try to place them next to this audio
                    for t in item_rec.get("transcripts", []) or []:
                        lp = t.get("local_path")
                        if lp:
                            try:
                                lp = Path(lp)
                                if lp.exists() and lp.suffix.lower() in {".srt", ".vtt", ".txt"}:
                                    sc = _sidecar_path_for(local_audio_path, t.get("language") or DEFAULT_LANG, lp.suffix.lower())
                                    if not sc.exists():
                                        sc.parent.mkdir(parents=True, exist_ok=True)
                                        shutil.copy2(lp, sc)
                                        t.setdefault("sidecars", []).append(str(sc))
                            except Exception:
                                pass
            if local_audio_path:
                item_rec["local_audio"] = str(local_audio_path)
            items.append(item_rec)
        return {"feed_url": feed_url, "show": show_title, "episodes": items}
    except Exception as e:
        return {"feed_url": feed_url, "error": str(e), "episodes": []}

def load_feeds_list():
    feeds = []
    if FEEDS_ENV:
        feeds.extend([u.strip() for u in FEEDS_ENV.split(",") if u.strip()])
    if FEEDS_FILE.exists():
        try:
            for line in FEEDS_FILE.read_text(encoding="utf-8").splitlines():
                line = line.strip()
                if not line or line.startswith("#"):
                    continue
                feeds.append(line)
        except Exception:
            pass
    # De-duplicate, then sort for a stable ordering
    return sorted(dict.fromkeys(feeds))
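
# Example feeds file in the format load_feeds_list() expects (one URL per line;
# blank lines and lines starting with "#" are skipped). The URLs are placeholders:
#
#   # news shows
#   https://example.com/podcast/feed.xml
#   https://example.org/another-show/rss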

def build_index():
    feeds = load_feeds_list()
    out = {"generated_at": int(time.time()), "feeds": feeds, "episodes": []}
    for url in feeds:
        data = parse_feed(url)
        out["episodes"].extend(data.get("episodes", []))
    # Write UTF-8 explicitly: ensure_ascii=False can emit non-ASCII characters
    OUT_INDEX.write_text(json.dumps(out, ensure_ascii=False, indent=2), encoding="utf-8")
    print(f"[rss] wrote index with {len(out['episodes'])} episode(s) -> {OUT_INDEX}")
    return OUT_INDEX
if __name__ == "__main__":
build_index()
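
# Downstream consumers can read the generated index like this (sketch only; where and
# how podx actually consumes rss_index.json is not shown in this file):
#
#   import json, os
#   from pathlib import Path
#
#   index_path = Path(os.getenv("RSS_INDEX_PATH", "/transcripts/rss_index.json"))
#   index = json.loads(index_path.read_text(encoding="utf-8"))
#   for ep in index["episodes"]:
#       print(ep["date"], ep["show"], ep["title"], ep.get("local_audio", ""))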