From e6582e9a6b710cd6375ca86748fbd25a435dce5e Mon Sep 17 00:00:00 2001
From: Tomas Kracmar
Date: Sun, 7 Sep 2025 15:30:22 +0200
Subject: [PATCH] Add RSS feed downloading

---
 .gitignore         |   3 +
 app/rss_ingest.py  | 388 +++++++++++++++++++++++++++++++++++++++++++++
 app/worker.py      |  92 ++++++++++-
 docker-compose.yml |  35 +++-
 feeds.txt.example  |   6 +
 5 files changed, 522 insertions(+), 2 deletions(-)
 create mode 100644 app/rss_ingest.py
 create mode 100644 feeds.txt.example

diff --git a/.gitignore b/.gitignore
index 4dec629..c878265 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,6 +1,9 @@
 # Local env and secrets
 .env
 
+# RSS Feeds
+feeds.txt
+
 # Runtime data
 data/
 models/
diff --git a/app/rss_ingest.py b/app/rss_ingest.py
new file mode 100644
index 0000000..20bcc1a
--- /dev/null
+++ b/app/rss_ingest.py
@@ -0,0 +1,388 @@
+#!/usr/bin/env python3
+"""
+RSS ingester for PodX
+- Reads feeds from env var RSS_FEEDS (comma-separated) and/or from a file (FEEDS_FILE, default /library/feeds.txt)
+- Fetches RSS with ETag/Last-Modified caching to avoid re-downloading unchanged feeds
+- Saves audio to LIBRARY_ROOT/<channel title>/<YYYYMMDD - episode title>.<ext>
+- Saves transcript sidecars when <podcast:transcript> links are present (prefers TextWithTimestamps → WebVTT → SRT → TXT)
+- Enqueues `worker.handle_local_file` for indexing/transcription (the worker skips Whisper if a transcript sidecar exists)
+- Keeps a small state JSON with per-feed ETag/Last-Modified and per-item processed GUIDs to avoid duplicate work
+
+Environment variables (with sane defaults):
+  MEILI_URL            (unused directly here, kept for parity)
+  REDIS_URL            redis://redis:6379/0
+  LIBRARY_ROOT         /library
+  TRANSCRIPT_ROOT      /transcripts
+  RSS_FEEDS            "" (comma-separated list)
+  FEEDS_FILE           /library/feeds.txt
+  RSS_SCAN_MINUTES     120
+  RSS_ONCE             0 ("1" to run once and exit)
+  USER_AGENT           podx-rss/1.0 (+local-archive)
+  RSS_STATE_FILE       /library/.rss_state.json
+  RSS_CONNECT_TIMEOUT  15 (seconds)
+  RSS_READ_TIMEOUT     60 (seconds)
+  AUDIO_MAX_MB         4096 (skip the download if a HEAD request reveals size > max; 0 = unlimited)
+"""
+import os
+import re
+import sys
+import json
+import time
+import logging
+from datetime import datetime
+from email.utils import parsedate_to_datetime
+from pathlib import Path
+from typing import Dict, List, Tuple, Optional
+from urllib.parse import urlparse
+
+import requests
+import xml.etree.ElementTree as ET
+import redis
+from rq import Queue
+
+logging.basicConfig(level=logging.INFO, format='[rss] %(message)s')
+log = logging.getLogger("rss")
+
+# Config
+MEILI_URL = os.getenv("MEILI_URL", "http://meili:7700")
+REDIS_URL = os.getenv("REDIS_URL", "redis://redis:6379/0")
+LIBRARY_ROOT = Path(os.getenv("LIBRARY_ROOT", "/library"))
+TRANSCRIPT_ROOT = Path(os.getenv("TRANSCRIPT_ROOT", "/transcripts"))
+RSS_FEEDS_ENV = [s.strip() for s in os.getenv("RSS_FEEDS", "").split(",") if s.strip()]
+FEEDS_FILE = Path(os.getenv("FEEDS_FILE", str(LIBRARY_ROOT / "feeds.txt")))
+RSS_SCAN_MINUTES = int(os.getenv("RSS_SCAN_MINUTES", "120"))
+RSS_ONCE = os.getenv("RSS_ONCE", "0") == "1"
+USER_AGENT = os.getenv("USER_AGENT", "podx-rss/1.0 (+local-archive)")
+STATE_FILE = Path(os.getenv("RSS_STATE_FILE", str(LIBRARY_ROOT / ".rss_state.json")))
+CONNECT_TIMEOUT = float(os.getenv("RSS_CONNECT_TIMEOUT", "15"))
+READ_TIMEOUT = float(os.getenv("RSS_READ_TIMEOUT", "60"))
+AUDIO_MAX_MB = int(os.getenv("AUDIO_MAX_MB", "4096"))
+
+# Redis queue
+r = redis.from_url(REDIS_URL)
+q = Queue("default", connection=r)
+
+# HTTP session
+SESSION = requests.Session()
+SESSION.headers.update({"User-Agent": USER_AGENT})
+
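+# Example settings (illustrative values only):
+#   RSS_FEEDS="https://example.com/a.rss,https://example.com/b.rss"
+#   FEEDS_FILE=/library/feeds.txt   # one URL per line; '#' starts a comment
+#   RSS_ONCE=1                      # scan once and exit instead of looping
+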
"podcast": "https://podcastindex.org/namespace/1.0", + "itunes": "http://www.itunes.com/dtds/podcast-1.0.dtd", + "media": "http://search.yahoo.com/mrss/", + "content": "http://purl.org/rss/1.0/modules/content/", + "dc": "http://purl.org/dc/elements/1.1/", +} + +# ----------------- helpers ----------------- + +def safe(s: str) -> str: + s = re.sub(r"[\\/:*?\"<>|]", "_", s) + # collapse whitespace and trim + s = re.sub(r"\s+", " ", s).strip() + # guard against very long filenames + return s[:200] if len(s) > 200 else s + + +def load_state() -> Dict: + if STATE_FILE.exists(): + try: + return json.loads(STATE_FILE.read_text("utf-8")) + except Exception: + log.warning("State file unreadable, starting fresh") + return {"feeds": {}, "items": {}} # items keyed by GUID / enclosure URL + + +def save_state(state: Dict) -> None: + STATE_FILE.parent.mkdir(parents=True, exist_ok=True) + tmp = STATE_FILE.with_suffix(STATE_FILE.suffix + ".tmp") + tmp.write_text(json.dumps(state, ensure_ascii=False, indent=2)) + tmp.replace(STATE_FILE) + + +def load_feeds() -> List[str]: + feeds = list(RSS_FEEDS_ENV) + if FEEDS_FILE.exists(): + try: + for line in FEEDS_FILE.read_text("utf-8").splitlines(): + s = line.strip() + if not s or s.startswith("#"): # allow comments + continue + feeds.append(s) + except Exception as e: + log.warning(f"Failed to read {FEEDS_FILE}: {e}") + # de-dup preserving order + seen = set() + uniq = [] + for f in feeds: + if f not in seen: + uniq.append(f) + seen.add(f) + return uniq + + +def fetch(url: str, *, etag: Optional[str]=None, modified: Optional[str]=None, as_text=False): + headers = {} + if etag: + headers["If-None-Match"] = etag + if modified: + headers["If-Modified-Since"] = modified + resp = SESSION.get(url, headers=headers, timeout=(CONNECT_TIMEOUT, READ_TIMEOUT)) + if resp.status_code == 304: + return None, 304, None, None + resp.raise_for_status() + new_etag = resp.headers.get("ETag") + new_mod = resp.headers.get("Last-Modified") + return (resp.text if as_text else resp.content), resp.status_code, new_etag, new_mod + + +def head_size(url: str) -> Optional[int]: + try: + h = SESSION.head(url, allow_redirects=True, timeout=(CONNECT_TIMEOUT, READ_TIMEOUT)) + if h.ok: + cl = h.headers.get("Content-Length") + return int(cl) if cl and cl.isdigit() else None + except Exception: + return None + return None + + +def best_transcript_links(item) -> List[str]: + links: List[Tuple[int, str, str]] = [] + # Try explicit QName first + for tag in item.findall(".//{https://podcastindex.org/namespace/1.0}transcript"): + t = (tag.attrib.get("type") or "").lower() + url = tag.attrib.get("url") + if url: + links.append((0, t, url)) + # Namespace-prefixed fallback + for tag in item.findall(".//podcast:transcript", NS): + t = (tag.attrib.get("type") or "").lower() + url = tag.attrib.get("url") + if url: + links.append((0, t, url)) + + order = [ + "text/plain", # often used for TextWithTimestamps + "application/json", + "text/vtt", + "application/srt", + "application/x-subrip", + "application/text", + "text/plain; charset=utf-8", + ] + key = {v: i for i, v in enumerate(order)} + ranked = [] + for _, t, url in links: + ranked.append((key.get(t, 999), t, url)) + ranked.sort() + return [u for _, _, u in ranked] + + +def get_enclosure(item) -> Optional[str]: + enc = item.find("enclosure") + if enc is not None and enc.attrib.get("url"): + return enc.attrib["url"] + mc = item.find("media:content", NS) + if mc is not None and mc.attrib.get("url"): + return mc.attrib["url"] + return None + + +def 
+def head_size(url: str) -> Optional[int]:
+    """Return Content-Length from a HEAD request, if the server reports one."""
+    try:
+        h = SESSION.head(url, allow_redirects=True, timeout=(CONNECT_TIMEOUT, READ_TIMEOUT))
+        if h.ok:
+            cl = h.headers.get("Content-Length")
+            return int(cl) if cl and cl.isdigit() else None
+    except Exception:
+        return None
+    return None
+
+
+def best_transcript_links(item) -> List[str]:
+    """Collect <podcast:transcript> URLs, ranked by preferred transcript type."""
+    links: List[Tuple[int, str, str]] = []
+    seen = set()
+    # The explicit QName and the namespace-prefixed form resolve to the same
+    # elements, so dedupe by URL while collecting from both.
+    for tag in item.findall(".//{https://podcastindex.org/namespace/1.0}transcript"):
+        t = (tag.attrib.get("type") or "").lower()
+        url = tag.attrib.get("url")
+        if url and url not in seen:
+            seen.add(url)
+            links.append((0, t, url))
+    for tag in item.findall(".//podcast:transcript", NS):
+        t = (tag.attrib.get("type") or "").lower()
+        url = tag.attrib.get("url")
+        if url and url not in seen:
+            seen.add(url)
+            links.append((0, t, url))
+
+    # Preference order (earlier = better); unknown types sort last
+    order = [
+        "text/plain",            # often used for TextWithTimestamps
+        "application/json",
+        "text/vtt",
+        "application/srt",
+        "application/x-subrip",
+        "application/text",
+        "text/plain; charset=utf-8",
+    ]
+    key = {v: i for i, v in enumerate(order)}
+    ranked = []
+    for _, t, url in links:
+        ranked.append((key.get(t, 999), t, url))
+    ranked.sort()
+    return [u for _, _, u in ranked]
+
+
+def get_enclosure(item) -> Optional[str]:
+    enc = item.find("enclosure")
+    if enc is not None and enc.attrib.get("url"):
+        return enc.attrib["url"]
+    mc = item.find("media:content", NS)
+    if mc is not None and mc.attrib.get("url"):
+        return mc.attrib["url"]
+    return None
+
+
+def parse_pubdate(item) -> datetime:
+    # Try common date fields
+    candidates = [
+        item.findtext("pubDate"),
+        item.findtext("dc:date", namespaces=NS),
+        item.findtext("{http://purl.org/dc/elements/1.1/}date"),
+    ]
+    for pd in filter(None, candidates):
+        s = pd.strip()
+        # RFC 2822 dates ("Sun, 07 Sep 2025 13:30:22 GMT") are the common case
+        try:
+            return parsedate_to_datetime(s)
+        except Exception:
+            pass
+        # Fall back to a few ISO-ish formats
+        for fmt in [
+            "%a, %d %b %Y %H:%M:%S %z",
+            "%a, %d %b %Y %H:%M:%S",
+            "%Y-%m-%dT%H:%M:%S%z",
+            "%Y-%m-%dT%H:%M:%S",
+        ]:
+            try:
+                return datetime.strptime(s, fmt)
+            except Exception:
+                pass
+    return datetime.utcnow()
+
+
+def save_bytes(path: Path, data: bytes) -> None:
+    path.parent.mkdir(parents=True, exist_ok=True)
+    # write-then-rename so partial downloads never masquerade as complete files
+    tmp = path.with_suffix(path.suffix + ".part")
+    tmp.write_bytes(data)
+    tmp.replace(path)
+
+
+def decide_audio_ext(audio_url: str) -> str:
+    p = urlparse(audio_url)
+    low = p.path.lower()
+    if low.endswith(".m4a"):
+        return ".m4a"
+    if low.endswith(".mp3"):
+        return ".mp3"
+    if low.endswith(".ogg") or low.endswith(".oga"):
+        return ".ogg"
+    if low.endswith(".aac"):
+        return ".aac"
+    if low.endswith(".wav"):
+        return ".wav"
+    return ".mp3"
+
+
+def item_key(item) -> str:
+    # Prefer GUID value, else enclosure URL, else title+date
+    guid = item.findtext("guid")
+    if guid:
+        return guid.strip()
+    enc = get_enclosure(item)
+    if enc:
+        return enc
+    title = item.findtext("title") or "Episode"
+    pub = parse_pubdate(item).strftime("%Y%m%d")
+    return f"{pub}:{title}"
+
+
+# ----------------- core ingest -----------------
+
+def ingest_feed(url: str, state: Dict) -> int:
+    fstate = state.setdefault("feeds", {}).setdefault(url, {})
+    etag = fstate.get("etag")
+    mod = fstate.get("modified")
+
+    log.info(f"Fetch RSS: {url}")
+    try:
+        data, status, new_etag, new_mod = fetch(url, etag=etag, modified=mod, as_text=True)
+    except Exception as e:
+        log.error(f"Fetch failed: {e}")
+        return 0
+
+    if status == 304:
+        log.info("Not modified")
+        return 0
+
+    if new_etag:
+        fstate["etag"] = new_etag
+    if new_mod:
+        fstate["modified"] = new_mod
+
+    try:
+        root = ET.fromstring(data)
+    except Exception as e:
+        log.error(f"XML parse error: {e}")
+        return 0
+
+    channel_title = safe(root.findtext("channel/title") or "Podcast")
+
+    new_items = 0
+    for item in root.findall("channel/item"):
+        key = item_key(item)
+        already = state.setdefault("items", {})
+        if already.get(key):
+            continue
+
+        title = safe(item.findtext("title") or "Episode")
+        pub = parse_pubdate(item)
+        date_prefix = pub.strftime("%Y%m%d")
+        base = f"{date_prefix} - {title}"
+
+        audio_url = get_enclosure(item)
+        if not audio_url:
+            log.info(f"Skip (no enclosure): {title}")
+            already[key] = {"skipped": "no_enclosure"}
+            continue
+
+        # HEAD size guard (optional)
+        if AUDIO_MAX_MB > 0:
+            size = head_size(audio_url)
+            if size and size > AUDIO_MAX_MB * 1024 * 1024:
+                log.info(f"Skip (size>{AUDIO_MAX_MB}MB): {title}")
+                already[key] = {"skipped": "too_large", "size": size}
+                continue
+
+        path_ext = decide_audio_ext(audio_url)
+        audio_out = LIBRARY_ROOT / channel_title / f"{base}{path_ext}"
+        transcript_links = best_transcript_links(item)
+
+        # If audio and a transcript sidecar both exist → just enqueue indexing
+        sidecars = list((TRANSCRIPT_ROOT / channel_title).glob(f"{base}.*"))
+        have_transcript = len(sidecars) > 0
+        if audio_out.exists() and have_transcript:
+            log.info(f"Skip download, enqueue index (have audio+transcript): {audio_out.name}")
+            try:
+                q.enqueue("worker.handle_local_file", str(audio_out), job_timeout=4*3600, result_ttl=86400, failure_ttl=86400)
+            except Exception as e:
+                log.warning(f"Enqueue failed: {e}")
+            already[key] = {"done": True, "audio": str(audio_out)}
+            new_items += 1
+            continue
+
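+        # Note (illustrative alternative, not wired in): fetch() buffers the
+        # whole enclosure in memory before save_bytes() writes it out; for
+        # multi-GB episodes a streamed download would keep memory flat:
+        #   with SESSION.get(audio_url, stream=True,
+        #                    timeout=(CONNECT_TIMEOUT, READ_TIMEOUT)) as resp:
+        #       resp.raise_for_status()
+        #       with open(audio_out.with_suffix(audio_out.suffix + ".part"), "wb") as fh:
+        #           for chunk in resp.iter_content(chunk_size=1 << 20):
+        #               fh.write(chunk)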
True, "audio": str(audio_out)} + new_items += 1 + continue + + # Download audio + try: + log.info(f"Downloading audio → {audio_out}") + content, _, _, _ = fetch(audio_url, as_text=False) + save_bytes(audio_out, content) + except Exception as e: + log.warning(f"Audio failed: {e}") + already[key] = {"error": f"audio:{e}"} + continue + + # Download transcript if present (take first best) + transcript_out = None + for turl in transcript_links: + try: + ext = ".vtt" if "vtt" in turl.lower() else ".srt" if "srt" in turl.lower() else ".txt" + tout = TRANSCRIPT_ROOT / channel_title / f"{base}{ext}" + log.info(f"Downloading transcript → {tout} ({turl})") + tdata, _, _, _ = fetch(turl, as_text=False) + save_bytes(tout, tdata) + transcript_out = tout + break + except Exception as e: + log.warning(f"Transcript fetch failed ({turl}): {e}") + continue + + # Enqueue for indexing/transcription + try: + q.enqueue("worker.handle_local_file", str(audio_out), job_timeout=4*3600, result_ttl=86400, failure_ttl=86400) + except Exception as e: + log.warning(f"Enqueue failed: {e}") + + already[key] = {"done": True, "audio": str(audio_out), "transcript": str(transcript_out) if transcript_out else None} + new_items += 1 + + return new_items + + +# ----------------- main loop ----------------- + +def main(): + while True: + feeds = load_feeds() + if not feeds: + log.error("No RSS feeds configured. Set RSS_FEEDS or create feeds.txt.") + sys.exit(1) + + state = load_state() + total_new = 0 + for url in feeds: + try: + added = ingest_feed(url, state) + total_new += added + save_state(state) + except Exception as e: + log.error(f"Feed error: {url} -> {e}") + log.info(f"Cycle complete. New items: {total_new}") + if RSS_ONCE: + break + time.sleep(RSS_SCAN_MINUTES * 60) + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/app/worker.py b/app/worker.py index 488d4df..eecf0a5 100644 --- a/app/worker.py +++ b/app/worker.py @@ -39,6 +39,78 @@ def log(feed): def sanitize(name): return re.sub(r'[\\/:"*?<>|]+', ' ', name).strip() +def find_sidecar_transcript(media_path: Path) -> Path | None: + """Return a .txt/.srt/.vtt transcript file sitting next to media, if any. + Tries common variants including language-suffixed SRT/VTT. + """ + candidates: list[Path] = [] + # exact same stem in same folder + for ext in [".txt", ".srt", ".vtt"]: + p = media_path.parent / (media_path.stem + ext) + if p.exists(): + candidates.append(p) + # language-suffixed near the media file (e.g., .en.srt) + for ext in [".srt", ".vtt"]: + p = media_path.with_suffix(f".en{ext}") + if p.exists() and p not in candidates: + candidates.append(p) + return candidates[0] if candidates else None + + +def transcript_text_from_file(path: Path) -> str: + """Extract plain text from .txt/.srt/.vtt by stripping timestamps and counters.""" + try: + raw = path.read_text(encoding="utf-8", errors="ignore") + except Exception: + raw = path.read_text(errors="ignore") + + if path.suffix.lower() == ".txt": + return raw.strip() + + # For SRT/VTT, drop timestamp lines, cue numbers and headers + lines: list[str] = [] + for line in raw.splitlines(): + ls = line.strip() + if not ls: + continue + if "-->" in ls: # timestamp line + continue + if ls.upper().startswith("WEBVTT"): + continue + if re.match(r"^\d+$", ls): # cue index + continue + lines.append(ls) + return " ".join(lines) + + +def ensure_sidecar_next_to_media(sidecar: Path, media_path: Path, lang: str = "en") -> None: + """Ensure an `.lang.srt` sits next to the media for Plex. 
+
+def ensure_sidecar_next_to_media(sidecar: Path, media_path: Path, lang: str = "en") -> None:
+    """Ensure a `.<lang>.srt` sits next to the media for Plex. Convert VTT→SRT if needed."""
+    try:
+        if sidecar.suffix.lower() == ".srt":
+            dst = media_path.with_suffix(f".{lang}.srt")
+            shutil.copy2(sidecar, dst)
+        elif sidecar.suffix.lower() == ".vtt":
+            tmp_srt = sidecar.with_suffix(".srt")
+            subprocess.run(["ffmpeg", "-nostdin", "-y", "-i", str(sidecar), str(tmp_srt)], check=True)
+            dst = media_path.with_suffix(f".{lang}.srt")
+            shutil.move(str(tmp_srt), dst)
+    except Exception as e:
+        print(f"[post] sidecar copy/convert failed: {e}", flush=True)
+
+
+def write_plain_transcript(media_path: Path, text: str, language: str = "en") -> Path:
+    """Write minimal transcript artifacts (.txt + .json) from plain text (no timestamps)."""
+    title = media_path.stem
+    base = TRN / title
+    base.parent.mkdir(parents=True, exist_ok=True)
+    (base.with_suffix(".txt")).write_text(text, encoding="utf-8")
+    (base.with_suffix(".json")).write_bytes(orjson.dumps({
+        "file": str(media_path),
+        "language": language,
+        "segments": [{"start": 0.0, "end": 0.0, "text": text}]
+    }))
+    return base
+
 def yt_dlp(url, outdir):
     # 1) Normalize YouTube Music URLs to standard YouTube
     yurl = url
@@ -316,6 +388,7 @@ def publish_to_openwebui(paths):
 
 def handle_local_file(path_str: str):
     """Transcribe & index a local media file that already exists in /library.
+    If a sidecar .txt/.srt/.vtt exists, use it instead of running Whisper.
     Safe to call repeatedly; it skips if transcript JSON already exists.
     """
     try:
@@ -323,13 +396,30 @@ def handle_local_file(path_str: str):
         p = Path(path_str)
         if not p.exists():
             log({"url": path_str, "status": "error", "error": "file_not_found"})
             return
+
         title = p.stem
         base_json = TRN / f"{title}.json"
         if base_json.exists():
             log({"url": path_str, "status": "skip", "reason": "already_transcribed"})
             return
-        info = {"url": path_str, "status": "transcribing", "title": title, "uploader": p.parent.name, "date": "", "path": str(p), "progress": 0}
+
+        info = {"url": path_str, "status": "transcribing", "title": title,
+                "uploader": p.parent.name, "date": "", "path": str(p), "progress": 0}
         log(info)
+
+        # 1) Prefer an existing transcript sidecar if present
+        sidecar = find_sidecar_transcript(p)
+        if sidecar:
+            plain = transcript_text_from_file(sidecar)
+            lang = os.getenv("DEFAULT_TRANSCRIPT_LANG", "en").strip() or "en"
+            base = write_plain_transcript(p, plain, language=lang)
+            ensure_sidecar_next_to_media(sidecar, p, lang=lang)
+            index_meili(base.with_suffix(".json"))
+            publish_to_openwebui([base.with_suffix(".txt")])
+            log({**info, **{"status": "done", "note": "used_existing_transcript"}})
+            return
+
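+        # Example (illustrative): for "/library/Show/20250907 - Ep.mp3" next to
+        # "20250907 - Ep.en.srt", the branch above writes the .txt/.json pair
+        # under /transcripts, copies the SRT beside the media for Plex, indexes
+        # it, and Whisper never runs.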
+        # 2) Otherwise, run transcription
         base = transcribe(p)
         index_meili(base.with_suffix(".json"))
         publish_to_openwebui([base.with_suffix(".txt")])
diff --git a/docker-compose.yml b/docker-compose.yml
index 402df5c..d017fee 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -41,6 +41,10 @@ services:
       WHISPER_MODEL: large-v3
       WHISPER_PRECISION: int8
       PYTHONPATH: /app
+      JOB_TIMEOUT: ${JOB_TIMEOUT:-14400}
+      JOB_TTL: ${JOB_TTL:-86400}
+      RESULT_TTL: ${RESULT_TTL:-86400}
+      FAILURE_TTL: ${FAILURE_TTL:-86400}
     volumes:
       - ${LIBRARY_HOST_DIR:-./library}:/library
       - ${TRANSCRIPTS_HOST_DIR:-./transcripts}:/transcripts
@@ -86,7 +90,7 @@ services:
       # - COOKIE_FILE=/config/cookies.txt
       # Optional: yt-dlp options (JSON). Example enables Android client fallback
       # - YTDL_OPTIONS={"extractor_args":{"youtube":{"player_client":"android"}}}
-      - YTDL_OPTIONS={"extract_flat":"in_playlist","concurrent_fragment_downloads":1}
+      - YTDL_OPTIONS={"extractor_args":{"youtube":{"player_client":"android"}},"extract_flat":"in_playlist","concurrent_fragment_downloads":1}
     volumes:
       - ${LIBRARY_HOST_DIR:-./library}:/downloads
       # Optional cookies file on host → /config/cookies.txt inside container
@@ -115,3 +119,32 @@ services:
     healthcheck:
       test: ["CMD-SHELL", "exit 0"]
     restart: unless-stopped
+
+  podx-rss:
+    build: ./app
+    container_name: podx-rss
+    command: ["python", "rss_ingest.py"]
+    env_file: [.env]
+    environment:
+      MEILI_URL: http://meili:7700
+      REDIS_URL: redis://redis:6379/0
+      LIBRARY_ROOT: /library
+      TRANSCRIPT_ROOT: /transcripts
+      FEEDS_FILE: /library/feeds.txt
+      RSS_STATE_FILE: /library/.rss_state.json
+      RSS_SCAN_MINUTES: ${RSS_SCAN_MINUTES:-120}
+      RSS_CONNECT_TIMEOUT: ${RSS_CONNECT_TIMEOUT:-15}
+      RSS_READ_TIMEOUT: ${RSS_READ_TIMEOUT:-60}
+      AUDIO_MAX_MB: ${AUDIO_MAX_MB:-4096}
+      USER_AGENT: ${USER_AGENT:-podx-rss/1.0 (+local-archive)}
+      RSS_ONCE: ${RSS_ONCE:-0}
+    volumes:
+      - ${LIBRARY_HOST_DIR:-./library}:/library
+      - ${TRANSCRIPTS_HOST_DIR:-./transcripts}:/transcripts
+    depends_on: [redis]
+    healthcheck:
+      # fail when the feeds file is missing so the problem surfaces in `docker ps`
+      test: ["CMD-SHELL", "python -c \"import os,sys; p=os.getenv('FEEDS_FILE',''); sys.exit(0 if (p and os.path.exists(p)) else 1)\""]
+      interval: 60s
+      timeout: 5s
+      retries: 3
+    restart: unless-stopped
diff --git a/feeds.txt.example b/feeds.txt.example
new file mode 100644
index 0000000..6b187e8
--- /dev/null
+++ b/feeds.txt.example
@@ -0,0 +1,6 @@
+# feeds.txt: one feed URL per line; lines starting with '#' are ignored
+#
+# Apple Podcasts / Omny show
+https://www.omnycontent.com/d/playlist/....../podcast.rss
+# Another RSS
+https://example.com/feed.xml
\ No newline at end of file
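
# Usage sketch (illustrative; assumes the compose service added above):
#   cp feeds.txt.example library/feeds.txt        # then edit in real feed URLs
#   RSS_ONCE=1 docker compose run --rm podx-rss   # single scan, then exit
#   docker compose up -d podx-rss                 # rescan every RSS_SCAN_MINUTES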