# Source: podx/app/scanner.py (152 lines, 4.9 KiB, Python)

import os
from typing import Set
import time
import signal
import sys
from pathlib import Path
from redis import Redis
from rq import Queue
# Config via env (matches docker-compose). Everything below is read once,
# at import time.


def _env_flag(name: str, default: bool) -> bool:
    """Read a boolean switch from the environment.

    An unset variable yields *default*.  A set variable is switched off by
    "0", "false", "no", "off" or an empty/blank value, compared without
    regard to case or surrounding whitespace; any other value switches it on.
    """
    raw = os.getenv(name)
    if raw is None:
        return default
    return raw.strip().lower() not in {"0", "false", "no", "off", ""}


# Media library to scan, transcript output directory, and the Redis instance
# that backs the job queue.
LIB = Path(os.getenv("LIBRARY_ROOT", "/library"))
TRN = Path(os.getenv("TRANSCRIPT_ROOT", "/transcripts"))
REDIS_URL = os.getenv("REDIS_URL", "redis://redis:6379/0")
SCAN_INTERVAL = int(os.getenv("SCAN_INTERVAL", "30"))  # seconds

# RQ job configuration (seconds); passed to Queue.enqueue as job_timeout,
# ttl, result_ttl and failure_ttl respectively.
JOB_TIMEOUT = int(os.getenv("JOB_TIMEOUT", "14400"))  # 4 hours
JOB_TTL = int(os.getenv("JOB_TTL", "86400"))  # 24 hours
RESULT_TTL = int(os.getenv("RESULT_TTL", "86400"))  # 24 hours
FAILURE_TTL = int(os.getenv("FAILURE_TTL", "86400"))  # 24 hours

# Optional refresh of existing items to fetch metadata/subtitles/thumbnails.
# Enabled by default; accepts 0/false/no/off in any letter case to disable.
REFRESH_EXISTING = _env_flag("REFRESH_EXISTING", True)
REFRESH_TTL = int(os.getenv("REFRESH_TTL", "21600"))  # 6 hours
REFRESH_FAILURE_TTL = int(os.getenv("REFRESH_FAILURE_TTL", "21600"))  # 6 hours

# Media types to track (lower-case suffixes, including the leading dot)
MEDIA_EXT = {
    ".mp3", ".m4a", ".mp4", ".mkv", ".wav", ".flac", ".webm", ".ogg", ".opus",
}

# In-memory seen sets to avoid re-enqueueing during a single run; they are
# not persisted, so they start empty whenever the process starts.
_seen: Set[str] = set()
_seen_refresh: Set[str] = set()
def already_transcribed(p: Path) -> bool:
    """Report whether a transcript for *p* appears to exist already.

    Heuristic only: the file counts as done when ``<stem>.json`` is present
    directly under the transcript root ``TRN``.  Only the stem of *p* is
    used; the directory the media file lives in plays no part.
    """
    transcript_name = p.stem + ".json"
    return TRN.joinpath(transcript_name).exists()
# Helper to decide when to refresh sidecars
def needs_refresh(p: Path) -> bool:
    """Decide whether the sidecar files of media file *p* need refreshing.

    Sidecars are looked up next to the media file and are named after its
    stem (the file name minus its final extension).  A refresh is needed
    when any of the following is missing:

    - metadata: ``<stem>.info.json``
    - subtitles: ``<stem>.srt`` or any ``<stem>.<something>.srt``
      (for example ``<stem>.en.srt``)
    - thumbnail: ``<stem>.jpg`` or ``<stem>.png``

    Returns:
        True if at least one of the three sidecar kinds is absent.
    """
    import glob  # local import: only needed for escaping the glob pattern

    base = p.stem
    folder = p.parent

    def sidecar(ext: str) -> Path:
        # Append to the stem as plain text.  Path.with_suffix() would instead
        # replace whatever follows the last dot in the stem, which breaks
        # names such as "Episode 1.5 - Title".
        return folder / f"{base}{ext}"

    missing_info = not sidecar(".info.json").exists()
    missing_thumb = not (sidecar(".jpg").exists() or sidecar(".png").exists())

    # Escape the stem so characters like "[" or "*" in the file name are
    # matched literally instead of being read as glob syntax.  The pattern
    # also covers "<stem>.en.srt", so no separate check is required for it.
    tagged_srt = folder.glob(glob.escape(base) + ".*.srt")
    has_any_srt = sidecar(".srt").exists() or any(tagged_srt)

    return missing_info or not has_any_srt or missing_thumb
def iter_media_files(root: Path, extensions=None):
    """Yield every media file found anywhere below *root*.

    Args:
        root: Directory to walk recursively.
        extensions: Optional iterable of file suffixes (with the leading
            dot) to accept instead of the module default ``MEDIA_EXT``.
            Matching ignores letter case.

    Yields:
        Paths of regular files whose suffix is one of the accepted
        extensions.  Directories are never yielded, even when their name
        ends in a media suffix.
    """
    if extensions is None:
        wanted = MEDIA_EXT
    else:
        wanted = {ext.lower() for ext in extensions}

    for path in root.rglob("*"):
        # Compare the suffix first: it is a plain string operation, whereas
        # is_file() has to query the filesystem.  Entries that are not media
        # (sidecars, directories, ...) are thereby skipped without a stat.
        if path.suffix.lower() in wanted and path.is_file():
            yield path
def enqueue_new_files():
    """Scan the library once and enqueue work for files not handled yet.

    For every media file under ``LIB``:

    - files whose resolved path is already in ``_seen`` are skipped;
    - files that already have a transcript get, at most, one
      ``worker.refresh_media`` job (only when ``REFRESH_EXISTING`` is on and
      ``needs_refresh`` reports missing sidecars);
    - every other file gets a ``worker.handle_local_file`` job.

    A path is recorded in ``_seen`` only after its handling finished without
    raising, so a failed enqueue (for example Redis being unreachable) leaves
    the file eligible for the next pass instead of dropping it for the rest
    of the run.

    Returns:
        The number of ``worker.handle_local_file`` jobs enqueued during this
        pass.  Refresh jobs are not counted.
    """
    q = Queue(connection=Redis.from_url(REDIS_URL))

    # Ensure target dirs exist
    TRN.mkdir(parents=True, exist_ok=True)
    LIB.mkdir(parents=True, exist_ok=True)

    new_jobs = 0
    for p in iter_media_files(LIB):
        # The resolved absolute path is both the dedup key and the job argument.
        key = str(p.resolve())
        if key in _seen:
            continue

        if already_transcribed(p):
            if not (REFRESH_EXISTING and needs_refresh(p)):
                print(f"[scanner] Skip (already transcribed): {p}", flush=True)
            elif key in _seen_refresh:
                print(f"[scanner] Skip (already queued refresh): {p}", flush=True)
            else:
                # Ask worker to refresh metadata/subtitles/thumbnails without redownloading media
                q.enqueue(
                    "worker.refresh_media",
                    key,
                    job_timeout=JOB_TIMEOUT,
                    ttl=REFRESH_TTL,
                    result_ttl=RESULT_TTL,
                    failure_ttl=REFRESH_FAILURE_TTL,
                )
                _seen_refresh.add(key)
                print(f"[scanner] Refresh enqueued: {p}", flush=True)
            # Mark as seen last: if the enqueue above raised, this line is
            # never reached and the file is looked at again on the next pass.
            _seen.add(key)
            continue

        # Enqueue the worker to process this local file (with generous timeouts)
        q.enqueue(
            "worker.handle_local_file",
            key,
            job_timeout=JOB_TIMEOUT,
            ttl=JOB_TTL,
            result_ttl=RESULT_TTL,
            failure_ttl=FAILURE_TTL,
        )
        _seen.add(key)
        new_jobs += 1
        print(f"[scanner] Enqueued: {p}", flush=True)

    return new_jobs
# Stop flag for the scan loop in main(); flipped to True by _handle_sig.
_shutdown = False


def _handle_sig(sig, frame):
    """Signal handler that requests a stop of the scan loop.

    Only sets the module-level ``_shutdown`` flag; the loop in ``main``
    checks that flag and exits on its own.  Neither argument is used.
    """
    global _shutdown
    _shutdown = True
def main():
    """Run scan passes until SIGINT or SIGTERM is received.

    Installs ``_handle_sig`` for both signals, then alternates between one
    call to ``enqueue_new_files`` and a pause of ``SCAN_INTERVAL`` seconds.
    Any exception raised by a pass is reported on stderr and the loop carries
    on with the next pass.
    """
    signal.signal(signal.SIGINT, _handle_sig)
    signal.signal(signal.SIGTERM, _handle_sig)

    # A zero or negative SCAN_INTERVAL would make the pause loop below run
    # zero times, i.e. scan back-to-back without ever sleeping.  Always wait
    # at least one second between passes.
    interval = max(1, SCAN_INTERVAL)
    print(f"[scanner] Watching {LIB} → transcripts in {TRN}; interval={interval}s", flush=True)

    while not _shutdown:
        try:
            jobs = enqueue_new_files()
            if jobs:
                print(f"[scanner] Enqueued {jobs} new file(s)", flush=True)
        except Exception as e:
            # Top-level boundary: report and keep the scanner alive.
            print(f"[scanner] Error: {e}", file=sys.stderr, flush=True)

        # Sleep between passes in one-second slices so that a shutdown
        # request is noticed within about a second.
        for _ in range(interval):
            if _shutdown:
                break
            time.sleep(1)

    print("[scanner] Shutting down", flush=True)
# Script entry point: start the scan loop only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    main()