#!/usr/bin/env python3
"""
RSS ingester for PodX
- Reads feeds from env var RSS_FEEDS (comma-separated) *and*/or from a file (FEEDS_FILE, default /library/feeds.txt)
- Fetches RSS with ETag/Last-Modified caching to avoid re-downloading unchanged feeds
- Saves audio to LIBRARY_ROOT/<podcast>/<YYYYMMDD - title>.<ext>
- Saves transcript sidecars when `<podcast:transcript>` links are present (prefers TextWithTimestamps → WebVTT → SRT → TXT)
- Enqueues `worker.handle_local_file` for indexing/transcription (worker will skip Whisper if a transcript sidecar exists)
- Keeps a small state JSON with per-feed ETag/Last-Modified and per-item processed GUIDs to avoid duplicate work
Environment variables (with sane defaults):
    MEILI_URL            (unused directly here, but kept for parity)
    REDIS_URL            redis://redis:6379/0
    LIBRARY_ROOT         /library
    TRANSCRIPT_ROOT      /transcripts
    RSS_FEEDS            "" (comma-separated list)
    FEEDS_FILE           /library/feeds.txt
    RSS_SCAN_MINUTES     120
    RSS_ONCE             0 ("1" to run once and exit)
    USER_AGENT           podx-rss/1.0 (+local-archive)
    RSS_STATE_FILE       /library/.rss_state.json
    RSS_CONNECT_TIMEOUT  15 (seconds)
    RSS_READ_TIMEOUT     60 (seconds)
    AUDIO_MAX_MB         4096 (skip download if HEAD reports a size above the limit; 0 = unlimited)
"""
import os
import re
import sys
import json
import time
import logging
from datetime import datetime
from email.utils import parsedate_to_datetime
from pathlib import Path
from typing import Dict, List, Tuple, Optional
from urllib.parse import urlparse
import requests
import xml.etree.ElementTree as ET
import redis
from rq import Queue
logging.basicConfig(level=logging.INFO, format='[rss] %(message)s')
log = logging.getLogger("rss")
# Config
MEILI_URL = os.getenv("MEILI_URL", "http://meili:7700")
REDIS_URL = os.getenv("REDIS_URL", "redis://redis:6379/0")
LIBRARY_ROOT = Path(os.getenv("LIBRARY_ROOT", "/library"))
TRANSCRIPT_ROOT = Path(os.getenv("TRANSCRIPT_ROOT", "/transcripts"))
RSS_FEEDS_ENV = [s.strip() for s in os.getenv("RSS_FEEDS", "").split(",") if s.strip()]
FEEDS_FILE = Path(os.getenv("FEEDS_FILE", str(LIBRARY_ROOT / "feeds.txt")))
RSS_SCAN_MINUTES = int(os.getenv("RSS_SCAN_MINUTES", "120"))
RSS_ONCE = os.getenv("RSS_ONCE", "0") == "1"
USER_AGENT = os.getenv("USER_AGENT", "podx-rss/1.0 (+local-archive)")
STATE_FILE = Path(os.getenv("RSS_STATE_FILE", str(LIBRARY_ROOT / ".rss_state.json")))
CONNECT_TIMEOUT = float(os.getenv("RSS_CONNECT_TIMEOUT", "15"))
READ_TIMEOUT = float(os.getenv("RSS_READ_TIMEOUT", "60"))
AUDIO_MAX_MB = int(os.getenv("AUDIO_MAX_MB", "4096"))
# Redis queue
r = redis.from_url(REDIS_URL)
q = Queue("default", connection=r)
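# Jobs are enqueued by dotted path ("worker.handle_local_file"), so the RQ
# worker process must be able to import a `worker` module exposing that
# function and must listen on the "default" queue.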
# HTTP session
SESSION = requests.Session()
SESSION.headers.update({"User-Agent": USER_AGENT})
# Namespaces commonly used in podcast RSS
NS = {
    "podcast": "https://podcastindex.org/namespace/1.0",
    "itunes": "http://www.itunes.com/dtds/podcast-1.0.dtd",
    "media": "http://search.yahoo.com/mrss/",
    "content": "http://purl.org/rss/1.0/modules/content/",
    "dc": "http://purl.org/dc/elements/1.1/",
}
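# Only the "podcast", "media" and "dc" prefixes are referenced by lookups
# below; the rest are unused in this file.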
# ----------------- helpers -----------------
def safe(s: str) -> str:
    s = re.sub(r"[\\/:*?\"<>|]", "_", s)
    # collapse whitespace and trim
    s = re.sub(r"\s+", " ", s).strip()
    # guard against very long filenames
    return s[:200] if len(s) > 200 else s

def load_state() -> Dict:
    if STATE_FILE.exists():
        try:
            return json.loads(STATE_FILE.read_text("utf-8"))
        except Exception:
            log.warning("State file unreadable, starting fresh")
    return {"feeds": {}, "items": {}}  # items keyed by GUID / enclosure URL

def save_state(state: Dict) -> None:
    STATE_FILE.parent.mkdir(parents=True, exist_ok=True)
    tmp = STATE_FILE.with_suffix(STATE_FILE.suffix + ".tmp")
    tmp.write_text(json.dumps(state, ensure_ascii=False, indent=2))
    tmp.replace(STATE_FILE)

def load_feeds() -> List[str]:
    feeds = list(RSS_FEEDS_ENV)
    if FEEDS_FILE.exists():
        try:
            for line in FEEDS_FILE.read_text("utf-8").splitlines():
                s = line.strip()
                if not s or s.startswith("#"):  # allow comments
                    continue
                feeds.append(s)
        except Exception as e:
            log.warning(f"Failed to read {FEEDS_FILE}: {e}")
    # de-dup preserving order
    seen = set()
    uniq = []
    for f in feeds:
        if f not in seen:
            uniq.append(f)
            seen.add(f)
    return uniq

def fetch(url: str, *, etag: Optional[str] = None, modified: Optional[str] = None, as_text=False):
    headers = {}
    if etag:
        headers["If-None-Match"] = etag
    if modified:
        headers["If-Modified-Since"] = modified
    resp = SESSION.get(url, headers=headers, timeout=(CONNECT_TIMEOUT, READ_TIMEOUT))
    if resp.status_code == 304:
        return None, 304, None, None
    resp.raise_for_status()
    new_etag = resp.headers.get("ETag")
    new_mod = resp.headers.get("Last-Modified")
    return (resp.text if as_text else resp.content), resp.status_code, new_etag, new_mod

def head_size(url: str) -> Optional[int]:
    # Best-effort: returns None if the server rejects HEAD or omits Content-Length,
    # in which case the AUDIO_MAX_MB guard is simply skipped.
    try:
        h = SESSION.head(url, allow_redirects=True, timeout=(CONNECT_TIMEOUT, READ_TIMEOUT))
        if h.ok:
            cl = h.headers.get("Content-Length")
            return int(cl) if cl and cl.isdigit() else None
    except Exception:
        return None
    return None

def best_transcript_links(item) -> List[str]:
    links: List[Tuple[int, str, str]] = []
    # Try explicit QName first
    for tag in item.findall(".//{https://podcastindex.org/namespace/1.0}transcript"):
        t = (tag.attrib.get("type") or "").lower()
        url = tag.attrib.get("url")
        if url:
            links.append((0, t, url))
    # Namespace-prefixed lookup (resolves to the same namespace URI, so it can
    # return the same elements again; duplicates are dropped below)
    for tag in item.findall(".//podcast:transcript", NS):
        t = (tag.attrib.get("type") or "").lower()
        url = tag.attrib.get("url")
        if url:
            links.append((0, t, url))
    order = [
        "text/plain",  # often used for TextWithTimestamps
        "application/json",
        "text/vtt",
        "application/srt",
        "application/x-subrip",
        "application/text",
        "text/plain; charset=utf-8",
    ]
    key = {v: i for i, v in enumerate(order)}
    ranked = []
    seen = set()
    for _, t, url in links:
        if url in seen:
            continue
        seen.add(url)
        ranked.append((key.get(t, 999), t, url))
    ranked.sort()
    return [u for _, _, u in ranked]

def get_enclosure(item) -> Optional[str]:
    enc = item.find("enclosure")
    if enc is not None and enc.attrib.get("url"):
        return enc.attrib["url"]
    mc = item.find("media:content", NS)
    if mc is not None and mc.attrib.get("url"):
        return mc.attrib["url"]
    return None

def parse_pubdate(item) -> datetime:
    # Try common fields
    candidates = [
        item.findtext("pubDate"),
        item.findtext("dc:date", namespaces=NS),
        item.findtext("{http://purl.org/dc/elements/1.1/}date"),
    ]
    for pd in filter(None, candidates):
        s = pd.strip()
        # RFC 2822 dates ("Wed, 02 Oct 2002 13:00:00 GMT") are the RSS norm;
        # strptime's %z does not accept "GMT", so try the stdlib parser first.
        try:
            return parsedate_to_datetime(s)
        except Exception:
            pass
        # Fall back to a few common ISO-ish formats
        for fmt in [
            "%a, %d %b %Y %H:%M:%S %z",
            "%a, %d %b %Y %H:%M:%S",
            "%Y-%m-%dT%H:%M:%S%z",
            "%Y-%m-%dT%H:%M:%S",
        ]:
            try:
                return datetime.strptime(s, fmt)
            except Exception:
                pass
    return datetime.utcnow()

def save_bytes(path: Path, data: bytes) -> None:
    # Write to a ".part" sibling first, then rename, so a partial download
    # never appears at the final path.
    path.parent.mkdir(parents=True, exist_ok=True)
    tmp = path.with_suffix(path.suffix + ".part")
    tmp.write_bytes(data)
    tmp.replace(path)

def decide_audio_ext(audio_url: str) -> str:
    p = urlparse(audio_url)
    low = p.path.lower()
    if low.endswith(".m4a"):
        return ".m4a"
    if low.endswith(".mp3"):
        return ".mp3"
    if low.endswith(".ogg") or low.endswith(".oga"):
        return ".ogg"
    if low.endswith(".aac"):
        return ".aac"
    if low.endswith(".wav"):
        return ".wav"
    return ".mp3"

def item_key(item) -> str:
    # Prefer GUID value, else enclosure URL, else title+date
    guid = item.findtext("guid")
    if guid:
        return guid.strip()
    enc = get_enclosure(item)
    if enc:
        return enc
    title = item.findtext("title") or "Episode"
    pub = parse_pubdate(item).strftime("%Y%m%d")
    return f"{pub}:{title}"

# ----------------- core ingest -----------------
def ingest_feed(url: str, state: Dict) -> int:
    fstate = state.setdefault("feeds", {}).setdefault(url, {})
    etag = fstate.get("etag")
    mod = fstate.get("modified")
    log.info(f"Fetch RSS: {url}")
    try:
        data, status, new_etag, new_mod = fetch(url, etag=etag, modified=mod, as_text=True)
    except Exception as e:
        log.error(f"Fetch failed: {e}")
        return 0
    if status == 304:
        log.info("Not modified")
        return 0
    if new_etag:
        fstate["etag"] = new_etag
    if new_mod:
        fstate["modified"] = new_mod
    try:
        root = ET.fromstring(data)
    except Exception as e:
        log.error(f"XML parse error: {e}")
        return 0
    channel_title = safe((root.findtext("channel/title") or "Podcast"))
    new_items = 0
    for item in root.findall("channel/item"):
        key = item_key(item)
        already = state.setdefault("items", {})
        if already.get(key):
            continue
        title = safe(item.findtext("title") or "Episode")
        pub = parse_pubdate(item)
        date_prefix = pub.strftime("%Y%m%d")
        base = f"{date_prefix} - {title}"
        audio_url = get_enclosure(item)
        if not audio_url:
            log.info(f"Skip (no enclosure): {title}")
            already[key] = {"skipped": "no_enclosure"}
            continue
        # HEAD size guard (optional)
        if AUDIO_MAX_MB > 0:
            size = head_size(audio_url)
            if size and size > AUDIO_MAX_MB * 1024 * 1024:
                log.info(f"Skip (size>{AUDIO_MAX_MB}MB): {title}")
                already[key] = {"skipped": "too_large", "size": size}
                continue
        path_ext = decide_audio_ext(audio_url)
        audio_out = LIBRARY_ROOT / channel_title / f"{base}{path_ext}"
        transcript_links = best_transcript_links(item)
        # If audio exists and a transcript sidecar exists → just enqueue index
        sidecars = list((TRANSCRIPT_ROOT / channel_title).glob(f"{base}.*"))
        have_transcript = len(sidecars) > 0
        if audio_out.exists() and have_transcript:
            log.info(f"Skip download, enqueue index (have audio+transcript): {audio_out.name}")
            try:
                q.enqueue("worker.handle_local_file", str(audio_out), job_timeout=4*3600, result_ttl=86400, failure_ttl=86400)
            except Exception as e:
                log.warning(f"Enqueue failed: {e}")
            already[key] = {"done": True, "audio": str(audio_out)}
            new_items += 1
            continue
        # Download audio
        try:
            log.info(f"Downloading audio → {audio_out}")
            content, _, _, _ = fetch(audio_url, as_text=False)
            save_bytes(audio_out, content)
        except Exception as e:
            log.warning(f"Audio failed: {e}")
            already[key] = {"error": f"audio:{e}"}
            continue
        # Download transcript if present (take first best)
        transcript_out = None
        for turl in transcript_links:
            try:
                ext = ".vtt" if "vtt" in turl.lower() else ".srt" if "srt" in turl.lower() else ".txt"
                tout = TRANSCRIPT_ROOT / channel_title / f"{base}{ext}"
                log.info(f"Downloading transcript → {tout} ({turl})")
                tdata, _, _, _ = fetch(turl, as_text=False)
                save_bytes(tout, tdata)
                transcript_out = tout
                break
            except Exception as e:
                log.warning(f"Transcript fetch failed ({turl}): {e}")
                continue
        # Enqueue for indexing/transcription
        try:
            q.enqueue("worker.handle_local_file", str(audio_out), job_timeout=4*3600, result_ttl=86400, failure_ttl=86400)
        except Exception as e:
            log.warning(f"Enqueue failed: {e}")
        already[key] = {"done": True, "audio": str(audio_out), "transcript": str(transcript_out) if transcript_out else None}
        new_items += 1
    return new_items

# ----------------- main loop -----------------
def main():
    while True:
        feeds = load_feeds()
        if not feeds:
            log.error("No RSS feeds configured. Set RSS_FEEDS or create feeds.txt.")
            sys.exit(1)
        state = load_state()
        total_new = 0
        for url in feeds:
            try:
                added = ingest_feed(url, state)
                total_new += added
                save_state(state)
            except Exception as e:
                log.error(f"Feed error: {url} -> {e}")
        log.info(f"Cycle complete. New items: {total_new}")
        if RSS_ONCE:
            break
        time.sleep(RSS_SCAN_MINUTES * 60)

if __name__ == "__main__":
    main()
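
# Usage notes (a sketch; the script filename here is an assumption):
#   python rss_ingest.py                 # loop: scan all feeds every RSS_SCAN_MINUTES
#   RSS_ONCE=1 python rss_ingest.py      # single scan, then exit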