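"""RSS feed indexer for podcast transcripts and audio.

Periodically scans a list of RSS/Atom feeds, downloads podcast:transcript /
Atom transcript attachments and (optionally) episode audio enclosures, pairs
downloaded transcripts with matching media files in the library as
language-suffixed sidecars, and writes a JSON index of all episodes.

Configuration is entirely via environment variables (see the Config block
below), e.g. TRANSCRIPT_ROOT, RSS_FEEDS / RSS_FEEDS_FILE, RSS_SCAN_MINUTES,
RSS_ONCE, RSS_DOWNLOAD_TRANSCRIPTS, RSS_DOWNLOAD_AUDIO.
"""
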
import os, re, json, time, shutil, difflib
from pathlib import Path
from urllib.parse import urlparse

import requests
import xml.etree.ElementTree as ET


# ---- Config ----
TRN = Path(os.getenv("TRANSCRIPT_ROOT", "/transcripts"))
OUT_INDEX = Path(os.getenv("RSS_INDEX_PATH", str(TRN / "rss_index.json")))
FEEDS_FILE = Path(os.getenv("RSS_FEEDS_FILE", "/library/feeds.txt"))
FEEDS_ENV = os.getenv("RSS_FEEDS", "").strip()
TIMEOUT = int(os.getenv("RSS_HTTP_TIMEOUT", "30"))
DOWNLOAD_TRANSCRIPTS = os.getenv("RSS_DOWNLOAD_TRANSCRIPTS", "true").lower() in {"1", "true", "yes", "y"}
DEFAULT_LANG = os.getenv("DEFAULT_TRANSCRIPT_LANG", "en").strip() or "en"
RSS_SCAN_MINUTES = int(os.getenv("RSS_SCAN_MINUTES", "15"))
RSS_ONCE = os.getenv("RSS_ONCE", "0").lower() in {"1", "true", "yes", "y"}
AUDIO_MAX_MB = int(os.getenv("RSS_AUDIO_MAX_MB", "0"))  # 0 = unlimited

# Where media files live; used to sidecar RSS transcripts next to matching media
LIB = Path(os.getenv("LIBRARY_ROOT", "/library"))
MEDIA_EXTS = {".mp3", ".m4a", ".flac", ".wav", ".ogg", ".opus", ".mp4", ".m4v", ".mkv", ".webm", ".mov", ".avi"}

# Fuzzy title match threshold for media ↔ transcript pairing
TITLE_MATCH_THRESHOLD = float(os.getenv("RSS_TITLE_MATCH_THRESHOLD", "0.60"))

# Download podcast audio (enclosures) to a local library.
# Default to LIB; compose maps the rss container's /library to the host audio root.
PODCASTS_ROOT = Path(os.getenv("PODCASTS_ROOT", str(LIB)))
PODCASTS_PER_SHOW = os.getenv("PODCASTS_PER_SHOW", "true").lower() in {"1", "true", "yes", "y"}
DOWNLOAD_AUDIO = os.getenv("RSS_DOWNLOAD_AUDIO", "true").lower() in {"1", "true", "yes", "y"}

# Namespace map (extend as needed)
NS = {
    "itunes": "http://www.itunes.com/dtds/podcast-1.0.dtd",
    "content": "http://purl.org/rss/1.0/modules/content/",
    "media": "http://search.yahoo.com/mrss/",
    "podcast": "https://podcastindex.org/namespace/1.0",
    "atom": "http://www.w3.org/2005/Atom",
}

TRN.mkdir(parents=True, exist_ok=True)
OUT_INDEX.parent.mkdir(parents=True, exist_ok=True)
PODCASTS_ROOT.mkdir(parents=True, exist_ok=True)


# --- Helper to resolve Apple Podcasts URLs to direct RSS feeds ---
def _resolve_feed_url(u: str) -> str:
    """
    Accepts a URL that may be an Apple Podcasts show/episode page and tries to resolve it
    into a direct RSS feed URL using the public iTunes Lookup API.
    For unknown hosts or failures, returns the original URL.
    """
    try:
        parsed = urlparse(u)
        host = (parsed.netloc or "").lower()
        if "podcasts.apple.com" in host:
            # Apple Podcasts URLs typically end with .../id<digits>
            m = re.search(r"id(\d+)", parsed.path)
            if m:
                pid = m.group(1)
                lookup = f"https://itunes.apple.com/lookup?id={pid}"
                r = requests.get(lookup, timeout=TIMEOUT, headers={"User-Agent": "podx/1.0"})
                if r.ok:
                    data = r.json()
                    for res in data.get("results", []) or []:
                        feed = res.get("feedUrl")
                        if feed:
                            return feed.strip()
        # otherwise return unchanged
        return u
    except Exception:
        return u


def _text(el):
    """Return stripped element text, or "" if the element is missing."""
    return (el.text or "").strip() if el is not None else ""


def _parse_duration(d):
    """Parse an itunes:duration value ("SS", "MM:SS", or "HH:MM:SS") into seconds."""
    if not d:
        return None
    s = str(d).strip()
    if s.isdigit():
        return int(s)
    parts = [p for p in s.split(":") if p != ""]
    try:
        if len(parts) == 3:
            h, m, sec = parts
            return int(h) * 3600 + int(m) * 60 + int(float(sec))
        if len(parts) == 2:
            m, sec = parts
            return int(m) * 60 + int(float(sec))
        return int(float(parts[0]))
    except Exception:
        return None


def _slug(text: str) -> str:
    """Make a filesystem-safe slug from a title (max 120 chars, spaces -> underscores)."""
    text = re.sub(r"\s+", " ", text).strip()
    text = re.sub(r"[^A-Za-z0-9\-._ ]+", "", text)
    return (text[:120] or "episode").replace(" ", "_")


def _yymmdd_from_pubdate(pubdate: str) -> str:
    """Extract a YYYYMMDD date string from an RFC 2822 pubDate (or an ISO-like fallback)."""
    try:
        from email.utils import parsedate_to_datetime

        dt = parsedate_to_datetime(pubdate)
        if dt is not None:
            return dt.strftime("%Y%m%d")
    except Exception:
        pass
    m = re.search(r"(\d{4})[-/](\d{1,2})[-/](\d{1,2})", pubdate or "")
    if m:
        y, mo, d = m.groups()
        return f"{int(y):04d}{int(mo):02d}{int(d):02d}"
    return ""


def _iter_items(channel: ET.Element):
    """Yield RSS <item> and Atom <entry> elements from a channel/feed element."""
    for tag in ["item", "{http://www.w3.org/2005/Atom}entry"]:
        for it in channel.findall(tag):
            yield it


def _findall_ns(el, path):
    """findall() with the NS map, falling back to the bare tag name for feeds without namespaces."""
    res = el.findall(path, NS)
    if not res and ":" in path:
        last = path.split("/")[-1]
        res = el.findall(last)
    return res


def _find_ns(el, path):
    """find() with the NS map, falling back to the bare tag name for feeds without namespaces."""
    found = el.find(path, NS)
    if found is None and ":" in path:
        found = el.find(path.split("/")[-1])
    return found


def _download(url: str, dst: Path) -> Path | None:
    """Download a (small) file in a single request; return dst on success, None on failure."""
    try:
        r = requests.get(url, timeout=TIMEOUT, headers={"User-Agent": "podx/1.0"})
        r.raise_for_status()
        dst.parent.mkdir(parents=True, exist_ok=True)
        with open(dst, "wb") as f:
            f.write(r.content)
        return dst
    except Exception:
        return None


def _guess_ext_from_type(mime: str) -> str:
    """Map a transcript MIME type to a file extension (default .txt)."""
    if not mime:
        return ".txt"
    mime = mime.lower()
    if "vtt" in mime:
        return ".vtt"
    if "srt" in mime or "subrip" in mime:
        return ".srt"
    if "json" in mime:
        return ".json"
    return ".txt"


def _guess_audio_ext(mime: str, url: str) -> str:
    # Prefer by MIME; fall back to URL suffix
    mime = (mime or "").lower()
    if "mp3" in mime:
        return ".mp3"
    if "aac" in mime or "mp4a" in mime or "m4a" in mime:
        return ".m4a"
    if "ogg" in mime:
        return ".ogg"
    if "opus" in mime:
        return ".opus"
    if "flac" in mime:
        return ".flac"
    if "wav" in mime:
        return ".wav"
    # fallback by URL
    suf = Path(urlparse(url).path).suffix.lower()
    if suf in {".mp3", ".m4a", ".aac", ".ogg", ".opus", ".flac", ".wav"}:
        return ".m4a" if suf == ".aac" else suf
    return ".mp3"


def _download_stream(url: str, dst: Path) -> Path | None:
    """Stream a (potentially large) file to dst, enforcing AUDIO_MAX_MB; return dst or None."""
    try:
        dst.parent.mkdir(parents=True, exist_ok=True)
        with requests.get(url, timeout=TIMEOUT, headers={"User-Agent": "podx/1.0"}, stream=True) as r:
            r.raise_for_status()
            max_bytes = AUDIO_MAX_MB * 1024 * 1024 if AUDIO_MAX_MB > 0 else None
            total = 0
            with open(dst, "wb") as f:
                for chunk in r.iter_content(chunk_size=1024 * 256):
                    if not chunk:
                        continue
                    f.write(chunk)
                    total += len(chunk)
                    if max_bytes and total > max_bytes:
                        # Size cap exceeded: stop and remove the partial file
                        try:
                            f.close()
                        except Exception:
                            pass
                        try:
                            dst.unlink(missing_ok=True)
                        except Exception:
                            pass
                        return None
        return dst
    except Exception:
        return None


def _norm_text(s: str) -> str:
    """Lowercase, collapse whitespace, and strip punctuation for fuzzy comparisons."""
    s = (s or "").lower()
    s = re.sub(r"\s+", " ", s)
    s = re.sub(r"[^a-z0-9 _.-]+", "", s)
    return s.strip()


def _strip_leading_date(basename: str) -> str:
    """Strip a leading "YYYYMMDD - " prefix from a file stem, if present."""
    m = re.match(r"^(\d{8})\s*-\s*(.+)$", basename)
    return m.group(2) if m else basename


def _find_matching_media(date: str, title: str) -> list[Path]:
    """Find media files in LIB that likely correspond to this episode.

    Strategy:
    1) If YYYYMMDD is present, prefer files starting with that date.
    2) Otherwise, fuzzy title match using difflib on stems (date stripped).
    """
    matches: list[Path] = []
    # 1) Date-based scan
    if date:
        for p in LIB.rglob(f"{date} - *"):
            if p.is_file() and p.suffix.lower() in MEDIA_EXTS:
                matches.append(p)
        if matches:
            return matches

    # 2) Fuzzy title scan (can be expensive on huge libraries)
    tkey = _norm_text(title)
    if not tkey:
        return matches
    for p in LIB.rglob("*"):
        if not (p.is_file() and p.suffix.lower() in MEDIA_EXTS):
            continue
        stem = _strip_leading_date(p.stem)
        fkey = _norm_text(stem)
        if not fkey:
            continue
        if difflib.SequenceMatcher(None, tkey, fkey).ratio() >= TITLE_MATCH_THRESHOLD:
            matches.append(p)
    return matches


def _sidecar_path_for(media_path: Path, lang: str, ext: str) -> Path:
    """Build a sidecar path for a media file, e.g. "Show/Episode.mp3" -> "Show/Episode.en.srt"."""
    base = media_path.with_suffix("")
    lang = (lang or DEFAULT_LANG or "en").lower().replace("_", "-")
    # Prefer language-suffixed sidecars (e.g., episode.en.srt)
    return base.with_name(f"{base.name}.{lang}{ext}")


def _propagate_transcript_to_media(local_tr_path: Path, lang: str, date: str, title: str) -> list[str]:
    """Copy a downloaded transcript next to any matching media under LIB.

    Returns a list of created sidecar file paths (as strings)."""
    created: list[str] = []
    if not local_tr_path.exists():
        return created
    ext = local_tr_path.suffix.lower()
    if ext not in {".srt", ".vtt", ".txt"}:
        # Unknown/unsupported transcript type for sidecar; skip silently
        return created
    for media in _find_matching_media(date, title):
        dst = _sidecar_path_for(media, lang, ext)
        try:
            dst.parent.mkdir(parents=True, exist_ok=True)
            if not dst.exists():
                shutil.copy2(local_tr_path, dst)
                created.append(str(dst))
        except Exception:
            # best-effort; continue
            pass
    return created


def _gather_transcripts(item: ET.Element):
    """Collect transcript attachments from podcast:transcript elements and Atom rel="transcript" links."""
    transcripts = []
    # podcast:transcript elements
    for t in _findall_ns(item, "podcast:transcript"):
        url = t.get("url") or t.get("href")
        ttype = t.get("type") or ""
        lang = t.get("language") or t.get("lang") or ""
        if url:
            transcripts.append({"url": url, "type": ttype, "language": lang})
    # Atom-style transcript link
    for link in _findall_ns(item, "atom:link"):
        if (link.get("rel") or "").lower() == "transcript" and link.get("href"):
            transcripts.append(
                {
                    "url": link.get("href"),
                    "type": link.get("type") or "",
                    "language": link.get("hreflang") or "",
                }
            )
    return transcripts


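# Each episode record produced by parse_feed() has this shape (values are
# illustrative, not real data):
# {
#   "show": "...", "feed_url": "...", "title": "...", "guid": "...", "link": "...",
#   "date": "YYYYMMDD", "duration_sec": 1234, "audio_url": "...", "audio_type": "audio/mpeg",
#   "language": DEFAULT_LANG,
#   "transcripts": [{"url": "...", "type": "...", "language": "...",
#                    "local_path": "...", "sidecars": ["..."]}],  # local_path/sidecars only when downloaded
#   "local_audio": "...",  # only when audio was downloaded
# }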
def parse_feed(feed_url: str):
    """Fetch and parse one feed; return {"feed_url", "show", "episodes"} (with an "error" key on failure)."""
    items = []
    try:
        print(f"[rss] fetching {feed_url}", flush=True)
        r = requests.get(feed_url, timeout=TIMEOUT, headers={"User-Agent": "podx/1.0"})
        r.raise_for_status()
        root = ET.fromstring(r.content)

        channel = root.find("channel")
        if channel is None:  # Atom feeds have no <channel>; fall back to the root element
            channel = root
        show_title = _text(_find_ns(channel, "title")) or _text(_find_ns(root, "title"))

        for it in _iter_items(channel):
            title = _text(_find_ns(it, "title"))
            guid = _text(_find_ns(it, "guid")) or _text(_find_ns(it, "id"))
            link = _text(_find_ns(it, "link"))
            pub = _text(_find_ns(it, "pubDate")) or _text(_find_ns(it, "published"))
            date = _yymmdd_from_pubdate(pub)
            dur = _text(_find_ns(it, "itunes:duration"))
            duration_sec = _parse_duration(dur) or None
            enclosure = _find_ns(it, "enclosure")
            audio_url = enclosure.get("url") if enclosure is not None else ""
            audio_type = enclosure.get("type") if enclosure is not None else ""

            if not audio_url:
                for mc in _findall_ns(it, "media:content"):
                    if (mc.get("type") or "").startswith("audio") and mc.get("url"):
                        audio_url = mc.get("url")
                        break

            transcripts = _gather_transcripts(it)

            item_rec = {
                "show": show_title,
                "feed_url": feed_url,
                "title": title,
                "guid": guid,
                "link": link,
                "date": date,
                "duration_sec": duration_sec,
                "audio_url": audio_url,
                "audio_type": audio_type,
                "language": DEFAULT_LANG,
                "transcripts": transcripts,
            }

            # Optionally download transcripts locally
            if DOWNLOAD_TRANSCRIPTS and transcripts:
                base_name = f"{date or '00000000'} - {_slug(title)}"
                for t in item_rec["transcripts"]:
                    ext = _guess_ext_from_type(t.get("type", ""))
                    parsed = urlparse(t["url"])
                    url_ext = Path(parsed.path).suffix.lower()
                    if url_ext in {".vtt", ".srt", ".txt", ".json"}:
                        ext = url_ext
                    # Append the extension rather than with_suffix(): slugs may contain dots
                    local_file = TRN / f"{base_name}{ext}"
                    saved = _download(t["url"], local_file)
                    if saved:
                        t["local_path"] = str(saved)
                        # If we saved a readable sidecar type, try to place it next to matching media
                        if ext in {".vtt", ".srt", ".txt"}:
                            created = _propagate_transcript_to_media(saved, t.get("language") or DEFAULT_LANG, date, title)
                            if created:
                                t["sidecars"] = created

            # Optionally download podcast audio locally
            local_audio_path = None
            if DOWNLOAD_AUDIO and audio_url:
                show_dir = PODCASTS_ROOT / _slug(show_title or "Podcast") if PODCASTS_PER_SHOW else PODCASTS_ROOT
                base_name = f"{(date or '00000000')} - {_slug(title or guid or 'episode')}"
                ext = _guess_audio_ext(audio_type, audio_url)
                target = show_dir / f"{base_name}{ext}"  # append ext; slug may contain dots
                # Avoid re-download if already exists
                if not target.exists():
                    saved = _download_stream(audio_url, target)
                    if saved is None:
                        # Try a non-streaming fallback
                        saved = _download(audio_url, target)
                else:
                    saved = target
                if saved and saved.exists():
                    local_audio_path = saved
                    # If we previously downloaded transcript sidecars, try to place them next to this audio
                    for t in item_rec.get("transcripts", []) or []:
                        lp = t.get("local_path")
                        if lp:
                            try:
                                lp = Path(lp)
                                if lp.exists() and lp.suffix.lower() in {".srt", ".vtt", ".txt"}:
                                    sc = _sidecar_path_for(local_audio_path, t.get("language") or DEFAULT_LANG, lp.suffix.lower())
                                    if not sc.exists():
                                        sc.parent.mkdir(parents=True, exist_ok=True)
                                        shutil.copy2(lp, sc)
                                        t.setdefault("sidecars", []).append(str(sc))
                            except Exception:
                                pass
            if local_audio_path:
                item_rec["local_audio"] = str(local_audio_path)

            items.append(item_rec)

        print(f"[rss] parsed {len(items)} episode(s) from {show_title or feed_url}", flush=True)
        return {"feed_url": feed_url, "show": show_title, "episodes": items}
    except Exception as e:
        print(f"[rss] ERROR parsing {feed_url}: {e}", flush=True)
        return {"feed_url": feed_url, "error": str(e), "episodes": []}


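# Feeds come from the RSS_FEEDS env var (comma-separated URLs) and/or from
# FEEDS_FILE, one URL per line; blank lines and lines starting with "#" are
# ignored, and Apple Podcasts page URLs are resolved to their RSS feeds.
# A hypothetical feeds.txt:
#
#   # my shows
#   https://example.com/podcast/feed.xml
#   https://podcasts.apple.com/us/podcast/some-show/id123456789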
def load_feeds_list():
    """Collect feed URLs from RSS_FEEDS and FEEDS_FILE, de-duplicate, and resolve Apple Podcasts URLs."""
    print(f"[rss] FEEDS_FILE={FEEDS_FILE} FEEDS_ENV={'set' if bool(FEEDS_ENV) else 'unset'}", flush=True)
    feeds = []
    if FEEDS_ENV:
        feeds.extend([u.strip() for u in FEEDS_ENV.split(",") if u.strip()])
    if FEEDS_FILE.exists():
        try:
            for line in FEEDS_FILE.read_text(encoding="utf-8").splitlines():
                line = line.strip()
                if not line or line.startswith("#"):
                    continue
                feeds.append(line)
        except Exception:
            pass
    else:
        print(f"[rss] feeds file not found: {FEEDS_FILE}", flush=True)
    # unique, keep order
    feeds = list(dict.fromkeys(feeds))
    feeds = [_resolve_feed_url(u) for u in feeds]
    print(f"[rss] resolved {len(feeds)} feed URL(s) (after normalization)", flush=True)
    return feeds


def build_index():
    """Parse all feeds and write the combined episode index to OUT_INDEX."""
    feeds = load_feeds_list()
    out = {"generated_at": int(time.time()), "feeds": feeds, "episodes": []}
    for url in feeds:
        data = parse_feed(url)
        out["episodes"].extend(data.get("episodes", []))
    OUT_INDEX.write_text(json.dumps(out, ensure_ascii=False, indent=2))
    print(f"[rss] wrote index with {len(out['episodes'])} episode(s) -> {OUT_INDEX}", flush=True)
    return OUT_INDEX


if __name__ == "__main__":
    while True:
        try:
            build_index()
        except Exception as e:
            print(f"[rss] build error: {e}", flush=True)
        if RSS_ONCE:
            break
        time.sleep(max(1, RSS_SCAN_MINUTES) * 60)