podx/app/rss_ingest.py

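"""RSS ingester for podx.

Scans the configured podcast feeds on a schedule, optionally downloads episode
audio and published transcripts, copies transcripts as sidecar files next to
matching media in the library, and writes a JSON index of all episodes.
"""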
import os, re, json, time, shutil, difflib
from pathlib import Path
from urllib.parse import urlparse
import requests
import xml.etree.ElementTree as ET
# ---- Config ----
TRN = Path(os.getenv("TRANSCRIPT_ROOT", "/transcripts"))
OUT_INDEX = Path(os.getenv("RSS_INDEX_PATH", str(TRN / "rss_index.json")))
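# feeds.txt format: one feed URL per line; blank lines and lines starting with "#"
# are ignored (see load_feeds_list). Apple Podcasts show URLs are also accepted
# and are resolved to their RSS feeds.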
FEEDS_FILE = Path(os.getenv("RSS_FEEDS_FILE", "/library/feeds.txt"))
FEEDS_ENV = os.getenv("RSS_FEEDS", "").strip()
TIMEOUT = int(os.getenv("RSS_HTTP_TIMEOUT", "30"))
DOWNLOAD_TRANSCRIPTS = os.getenv("RSS_DOWNLOAD_TRANSCRIPTS", "true").lower() in {"1", "true", "yes", "y"}
DEFAULT_LANG = os.getenv("DEFAULT_TRANSCRIPT_LANG", "en").strip() or "en"
RSS_SCAN_MINUTES = int(os.getenv("RSS_SCAN_MINUTES", "15"))
RSS_ONCE = os.getenv("RSS_ONCE", "0").lower() in {"1", "true", "yes", "y"}
AUDIO_MAX_MB = int(os.getenv("RSS_AUDIO_MAX_MB", "0")) # 0 = unlimited
# Where media files live; used to sidecar RSS transcripts next to matching media
LIB = Path(os.getenv("LIBRARY_ROOT", "/library"))
MEDIA_EXTS = {".mp3", ".m4a", ".flac", ".wav", ".ogg", ".opus", ".mp4", ".m4v", ".mkv", ".webm", ".mov", ".avi"}
# Fuzzy title match threshold for media ↔ transcript pairing
TITLE_MATCH_THRESHOLD = float(os.getenv("RSS_TITLE_MATCH_THRESHOLD", "0.60"))
# Download podcast audio (enclosures) to a local library
# Default to saving directly under LIB (no extra "Podcasts" subfolder)
PODCASTS_ROOT = Path(os.getenv("PODCASTS_ROOT", str(LIB)))
PODCASTS_PER_SHOW = os.getenv("PODCASTS_PER_SHOW", "true").lower() in {"1","true","yes","y"}
DOWNLOAD_AUDIO = os.getenv("RSS_DOWNLOAD_AUDIO", "true").lower() in {"1","true","yes","y"}
# Namespace map (extend as needed)
NS = {
    "itunes": "http://www.itunes.com/dtds/podcast-1.0.dtd",
    "content": "http://purl.org/rss/1.0/modules/content/",
    "media": "http://search.yahoo.com/mrss/",
    "podcast": "https://podcastindex.org/namespace/1.0",
    "atom": "http://www.w3.org/2005/Atom",
}
TRN.mkdir(parents=True, exist_ok=True)
OUT_INDEX.parent.mkdir(parents=True, exist_ok=True)
PODCASTS_ROOT.mkdir(parents=True, exist_ok=True)
# --- Helper to resolve Apple Podcasts URLs to direct RSS feeds ---
def _resolve_feed_url(u: str) -> str:
    """
    Accepts a URL that may be an Apple Podcasts show/episode page and tries to resolve it
    into a direct RSS feed URL using the public iTunes Lookup API.
    For unknown hosts or failures, returns the original URL.
    """
    try:
        parsed = urlparse(u)
        host = (parsed.netloc or "").lower()
        if "podcasts.apple.com" in host:
            # Apple Podcasts URLs typically end with .../id<digits>
            m = re.search(r"id(\d+)", parsed.path)
            if m:
                pid = m.group(1)
                lookup = f"https://itunes.apple.com/lookup?id={pid}"
                r = requests.get(lookup, timeout=TIMEOUT, headers={"User-Agent": "podx/1.0"})
                if r.ok:
                    data = r.json()
                    for res in data.get("results", []) or []:
                        feed = res.get("feedUrl")
                        if feed:
                            return feed.strip()
        # otherwise return unchanged
        return u
    except Exception:
        return u
def _text(el):
    return (el.text or "").strip() if el is not None else ""
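# Parse itunes:duration values given as plain seconds, MM:SS, or HH:MM:SS.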
def _parse_duration(d):
    if not d:
        return None
    s = str(d).strip()
    if s.isdigit():
        return int(s)
    parts = [p for p in s.split(":") if p != ""]
    try:
        if len(parts) == 3:
            h, m, sec = parts
            return int(h) * 3600 + int(m) * 60 + int(float(sec))
        if len(parts) == 2:
            m, sec = parts
            return int(m) * 60 + int(float(sec))
        return int(float(parts[0]))
    except Exception:
        return None
def _slug(text: str) -> str:
    text = re.sub(r"\s+", " ", text).strip()
    text = re.sub(r"[^A-Za-z0-9\-._ ]+", "", text)
    return (text[:120] or "episode").replace(" ", "_")
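# Convert an RSS pubDate (RFC 2822) or ISO-like date string to YYYYMMDD; "" if unparseable.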
def _yymmdd_from_pubdate(pubdate: str) -> str:
    try:
        from email.utils import parsedate_to_datetime
        dt = parsedate_to_datetime(pubdate)
        if dt is not None:
            return dt.strftime("%Y%m%d")
    except Exception:
        pass
    m = re.search(r"(\d{4})[-/](\d{1,2})[-/](\d{1,2})", pubdate or "")
    if m:
        y, mo, d = m.groups()
        return f"{int(y):04d}{int(mo):02d}{int(d):02d}"
    return ""
def _iter_items(channel: ET.Element):
    for tag in ["item", "{http://www.w3.org/2005/Atom}entry"]:
        for it in channel.findall(tag):
            yield it
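# Namespace-aware find helpers: try the prefixed path first, then fall back to
# the bare tag name for feeds that omit the namespace declaration.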
def _findall_ns(el, path):
    res = el.findall(path, NS)
    if not res and ":" in path:
        last = path.split("/")[-1]
        res = el.findall(last)
    return res
def _find_ns(el, path):
    found = el.find(path, NS)
    if found is None and ":" in path:
        found = el.find(path.split("/")[-1])
    return found
def _download(url: str, dst: Path) -> Path | None:
    try:
        r = requests.get(url, timeout=TIMEOUT, headers={"User-Agent": "podx/1.0"})
        r.raise_for_status()
        dst.parent.mkdir(parents=True, exist_ok=True)
        with open(dst, "wb") as f:
            f.write(r.content)
        return dst
    except Exception:
        return None
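# Map a transcript MIME type to a file extension (defaults to .txt).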
def _guess_ext_from_type(mime: str) -> str:
    if not mime:
        return ".txt"
    mime = mime.lower()
    if "vtt" in mime:
        return ".vtt"
    if "srt" in mime or "subrip" in mime:
        return ".srt"
    if "json" in mime:
        return ".json"
    return ".txt"
def _guess_audio_ext(mime: str, url: str) -> str:
    # Prefer by MIME; fall back to URL suffix
    mime = (mime or "").lower()
    if "mp3" in mime:
        return ".mp3"
    if "aac" in mime or "mp4a" in mime:
        return ".m4a"
    if "m4a" in mime:
        return ".m4a"
    if "ogg" in mime:
        return ".ogg"
    if "opus" in mime:
        return ".opus"
    if "flac" in mime:
        return ".flac"
    if "wav" in mime:
        return ".wav"
    # fallback by URL
    suf = Path(urlparse(url).path).suffix.lower()
    if suf in {".mp3", ".m4a", ".aac", ".ogg", ".opus", ".flac", ".wav"}:
        return ".m4a" if suf == ".aac" else suf
    return ".mp3"
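# Stream an enclosure to disk in 256 KiB chunks, aborting (and deleting the
# partial file) if RSS_AUDIO_MAX_MB is set and exceeded.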
def _download_stream(url: str, dst: Path) -> Path | None:
    try:
        dst.parent.mkdir(parents=True, exist_ok=True)
        with requests.get(url, timeout=TIMEOUT, headers={"User-Agent": "podx/1.0"}, stream=True) as r:
            r.raise_for_status()
            max_bytes = AUDIO_MAX_MB * 1024 * 1024 if AUDIO_MAX_MB > 0 else None
            total = 0
            with open(dst, "wb") as f:
                for chunk in r.iter_content(chunk_size=1024 * 256):
                    if not chunk:
                        continue
                    f.write(chunk)
                    total += len(chunk)
                    if max_bytes and total > max_bytes:
                        # stop and remove partial
                        try:
                            f.close()
                        except Exception:
                            pass
                        try:
                            dst.unlink(missing_ok=True)
                        except Exception:
                            pass
                        return None
        return dst
    except Exception:
        return None
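# Normalization helpers used for fuzzy media ↔ transcript title matching.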
def _norm_text(s: str) -> str:
    s = (s or "").lower()
    s = re.sub(r"\s+", " ", s)
    s = re.sub(r"[^a-z0-9 _.-]+", "", s)
    return s.strip()
def _strip_leading_date(basename: str) -> str:
    m = re.match(r"^(\d{8})\s*-\s*(.+)$", basename)
    return m.group(2) if m else basename
def _find_matching_media(date: str, title: str) -> list[Path]:
    """Find media files in LIB that likely correspond to this episode.
    Strategy:
    1) If YYYYMMDD is present, prefer files starting with that date.
    2) Otherwise, fuzzy title match using difflib on stems (date stripped).
    """
    matches: list[Path] = []
    # 1) Date-based scan
    if date:
        for p in LIB.rglob(f"{date} - *"):
            if p.is_file() and p.suffix.lower() in MEDIA_EXTS:
                matches.append(p)
        if matches:
            return matches
    # 2) Fuzzy title scan (can be expensive on huge libraries)
    tkey = _norm_text(title)
    if not tkey:
        return matches
    for p in LIB.rglob("*"):
        if not (p.is_file() and p.suffix.lower() in MEDIA_EXTS):
            continue
        stem = _strip_leading_date(p.stem)
        fkey = _norm_text(stem)
        if not fkey:
            continue
        if difflib.SequenceMatcher(None, tkey, fkey).ratio() >= TITLE_MATCH_THRESHOLD:
            matches.append(p)
    return matches
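# Build the path of a language-suffixed sidecar transcript next to a media file.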
def _sidecar_path_for(media_path: Path, lang: str, ext: str) -> Path:
    base = media_path.with_suffix("")
    lang = (lang or DEFAULT_LANG or "en").lower().replace("_", "-")
    # Prefer language-suffixed sidecars (e.g., episode.en.srt)
    return base.with_name(f"{base.name}.{lang}{ext}")
def _propagate_transcript_to_media(local_tr_path: Path, lang: str, date: str, title: str) -> list[str]:
    """Copy a downloaded transcript next to any matching media under LIB.
    Returns a list of created sidecar file paths (as strings)."""
    created: list[str] = []
    if not local_tr_path.exists():
        return created
    ext = local_tr_path.suffix.lower()
    if ext not in {".srt", ".vtt", ".txt"}:
        # Unknown/unsupported transcript type for sidecar; skip silently
        return created
    for media in _find_matching_media(date, title):
        dst = _sidecar_path_for(media, lang, ext)
        try:
            dst.parent.mkdir(parents=True, exist_ok=True)
            if not dst.exists():
                shutil.copy2(local_tr_path, dst)
                created.append(str(dst))
        except Exception:
            # best-effort; continue
            pass
    return created
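# Collect transcript references from podcast:transcript tags and from Atom links
# with rel="transcript".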
def _gather_transcripts(item: ET.Element):
    transcripts = []
    # podcast:transcript elements
    for t in _findall_ns(item, "podcast:transcript"):
        url = t.get("url") or t.get("href")
        ttype = t.get("type") or ""
        lang = t.get("language") or t.get("lang") or ""
        if url:
            transcripts.append({"url": url, "type": ttype, "language": lang})
    # Atom-style transcript link
    for link in _findall_ns(item, "atom:link"):
        if (link.get("rel") or "").lower() == "transcript" and link.get("href"):
            transcripts.append(
                {
                    "url": link.get("href"),
                    "type": link.get("type") or "",
                    "language": link.get("hreflang") or "",
                }
            )
    return transcripts
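# Fetch one feed, parse its episodes, and (optionally) download transcripts and
# audio as a side effect. Returns a dict with the show title and episode records.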
def parse_feed(feed_url: str):
    items = []
    try:
        print(f"[rss] fetching {feed_url}", flush=True)
        r = requests.get(feed_url, timeout=TIMEOUT, headers={"User-Agent": "podx/1.0"})
        r.raise_for_status()
        root = ET.fromstring(r.content)
        # Compare against None explicitly: truth-testing an Element is unreliable
        # (an element with no children is falsy).
        channel = root.find("channel")
        if channel is None:
            channel = root
        show_title = _text(_find_ns(channel, "title")) or _text(_find_ns(root, "title"))
        if not show_title:
            show_title = ""
        for it in _iter_items(channel):
            title = _text(_find_ns(it, "title"))
            guid = _text(_find_ns(it, "guid")) or _text(_find_ns(it, "id"))
            link = _text(_find_ns(it, "link"))
            pub = _text(_find_ns(it, "pubDate")) or _text(_find_ns(it, "published"))
            date = _yymmdd_from_pubdate(pub)
            dur = _text(_find_ns(it, "itunes:duration"))
            duration_sec = _parse_duration(dur) or None
            enclosure = _find_ns(it, "enclosure")
            audio_url = enclosure.get("url") if enclosure is not None else ""
            audio_type = enclosure.get("type") if enclosure is not None else ""
            if not audio_url:
                for mc in _findall_ns(it, "media:content"):
                    if (mc.get("type") or "").startswith("audio") and mc.get("url"):
                        audio_url = mc.get("url")
                        break
            transcripts = _gather_transcripts(it)
            item_rec = {
                "show": show_title,
                "feed_url": feed_url,
                "title": title,
                "guid": guid,
                "link": link,
                "date": date,
                "duration_sec": duration_sec,
                "audio_url": audio_url,
                "audio_type": audio_type,
                "language": DEFAULT_LANG,
                "transcripts": transcripts,
            }
            # Optionally download transcripts locally
            if DOWNLOAD_TRANSCRIPTS and transcripts:
                base_name = f"{date or '00000000'} - {_slug(title)}"
                for t in item_rec["transcripts"]:
                    ext = _guess_ext_from_type(t.get("type", ""))
                    parsed = urlparse(t["url"])
                    url_ext = Path(parsed.path).suffix.lower()
                    if url_ext in {".vtt", ".srt", ".txt", ".json"}:
                        ext = url_ext
                    local_file = (TRN / base_name).with_suffix(ext)
                    saved = _download(t["url"], local_file)
                    if saved:
                        t["local_path"] = str(saved)
                        # If we saved a readable sidecar type, try to place it next to matching media
                        if ext in {".vtt", ".srt", ".txt"}:
                            created = _propagate_transcript_to_media(saved, t.get("language") or DEFAULT_LANG, date, title)
                            if created:
                                t["sidecars"] = created
            # Optionally download podcast audio locally
            local_audio_path = None
            if DOWNLOAD_AUDIO and audio_url:
                show_dir = PODCASTS_ROOT / _slug(show_title or "Podcast") if PODCASTS_PER_SHOW else PODCASTS_ROOT
                base_name = f"{(date or '00000000')} - {_slug(title or guid or 'episode')}"
                ext = _guess_audio_ext(audio_type, audio_url)
                target = (show_dir / base_name).with_suffix(ext)
                # Avoid re-download if already exists
                if not target.exists():
                    saved = _download_stream(audio_url, target)
                    if saved is None:
                        # Try a non-streaming fallback
                        saved = _download(audio_url, target)
                else:
                    saved = target
                if saved and saved.exists():
                    local_audio_path = saved
                    # If we previously downloaded transcript sidecars, try to place them next to this audio
                    for t in item_rec.get("transcripts", []) or []:
                        lp = t.get("local_path")
                        if lp:
                            try:
                                lp = Path(lp)
                                if lp.exists() and lp.suffix.lower() in {'.srt', '.vtt', '.txt'}:
                                    sc = _sidecar_path_for(local_audio_path, t.get('language') or DEFAULT_LANG, lp.suffix.lower())
                                    if not sc.exists():
                                        sc.parent.mkdir(parents=True, exist_ok=True)
                                        shutil.copy2(lp, sc)
                                        t.setdefault("sidecars", []).append(str(sc))
                            except Exception:
                                pass
            if local_audio_path:
                item_rec["local_audio"] = str(local_audio_path)
            items.append(item_rec)
        print(f"[rss] parsed {len(items)} episode(s) from {show_title or feed_url}", flush=True)
        return {"feed_url": feed_url, "show": show_title, "episodes": items}
    except Exception as e:
        print(f"[rss] ERROR parsing {feed_url}: {e}", flush=True)
        return {"feed_url": feed_url, "error": str(e), "episodes": []}
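# Assemble the list of feed URLs from RSS_FEEDS and the feeds file, de-duplicated
# and normalized (Apple Podcasts URLs are resolved to their RSS feeds).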
def load_feeds_list():
    print(f"[rss] FEEDS_FILE={FEEDS_FILE} FEEDS_ENV={'set' if bool(FEEDS_ENV) else 'unset'}", flush=True)
    feeds = []
    if FEEDS_ENV:
        feeds.extend([u.strip() for u in FEEDS_ENV.split(",") if u.strip()])
    if FEEDS_FILE.exists():
        try:
            for line in FEEDS_FILE.read_text(encoding="utf-8").splitlines():
                line = line.strip()
                if not line or line.startswith("#"):
                    continue
                feeds.append(line)
        except Exception:
            pass
    else:
        print(f"[rss] feeds file not found: {FEEDS_FILE}", flush=True)
    # de-duplicate and sort for a stable order
    feeds = sorted(dict.fromkeys(feeds))
    feeds = [_resolve_feed_url(u) for u in feeds]
    print(f"[rss] resolved {len(feeds)} feed URL(s) (after normalization)", flush=True)
    return feeds
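# Parse every configured feed and write the combined episode index to OUT_INDEX.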
def build_index():
    feeds = load_feeds_list()
    out = {"generated_at": int(time.time()), "feeds": feeds, "episodes": []}
    for url in feeds:
        data = parse_feed(url)
        out["episodes"].extend(data.get("episodes", []))
    OUT_INDEX.write_text(json.dumps(out, ensure_ascii=False, indent=2))
    print(f"[rss] wrote index with {len(out['episodes'])} episode(s) -> {OUT_INDEX}")
    return OUT_INDEX
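# Entry point: rebuild the index on a RSS_SCAN_MINUTES interval, or just once
# when RSS_ONCE is set.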
if __name__ == "__main__":
    while True:
        try:
            build_index()
        except Exception as e:
            print(f"[rss] build error: {e}", flush=True)
        if RSS_ONCE:
            break
        time.sleep(max(1, RSS_SCAN_MINUTES) * 60)