152 lines
4.9 KiB
Python
152 lines
4.9 KiB
Python
|
|
|
|
import os
|
|
from typing import Set
|
|
import time
|
|
import signal
|
|
import sys
|
|
from pathlib import Path
|
|
from redis import Redis
|
|
from rq import Queue
|
|
|
|
# Config via env (matches docker-compose)


def _env_int(name: str, default: str) -> int:
    """Read an integer environment variable.

    Args:
        name: Environment variable to read.
        default: Value (as a string) used when the variable is unset.

    Returns:
        The parsed integer (surrounding whitespace is accepted by ``int``).

    Raises:
        ValueError: if the value is not an integer; the message names ``name``
            so a misconfigured deployment is easy to diagnose.
    """
    raw = os.getenv(name, default)
    try:
        return int(raw)
    except ValueError:
        raise ValueError(f"{name} must be an integer, got {raw!r}") from None


def _env_flag(name: str, default: str) -> bool:
    """Read a boolean environment variable.

    The value is stripped and lower-cased; "", "0", "false", "no" and "off"
    mean False, anything else means True.
    """
    return os.getenv(name, default).strip().lower() not in ("0", "false", "no", "off", "")


LIB = Path(os.getenv("LIBRARY_ROOT", "/library"))
TRN = Path(os.getenv("TRANSCRIPT_ROOT", "/transcripts"))
REDIS_URL = os.getenv("REDIS_URL", "redis://redis:6379/0")
SCAN_INTERVAL = _env_int("SCAN_INTERVAL", "30")  # seconds between library scans

# RQ job configuration (values in seconds)
JOB_TIMEOUT = _env_int("JOB_TIMEOUT", "14400")  # 4 hours
JOB_TTL = _env_int("JOB_TTL", "86400")  # 24 hours
RESULT_TTL = _env_int("RESULT_TTL", "86400")  # 24 hours
FAILURE_TTL = _env_int("FAILURE_TTL", "86400")  # 24 hours

# Optional refresh of existing items to fetch metadata/subtitles/thumbnails
REFRESH_EXISTING = _env_flag("REFRESH_EXISTING", "1")
REFRESH_TTL = _env_int("REFRESH_TTL", "21600")  # 6 hours
REFRESH_FAILURE_TTL = _env_int("REFRESH_FAILURE_TTL", "21600")  # 6 hours

# Media types to track (compared against the lower-cased file suffix)
MEDIA_EXT = {
    ".mp3", ".m4a", ".mp4", ".mkv", ".wav", ".flac", ".webm", ".ogg", ".opus"
}

# In-memory sets of media paths already handled by this process. They are not
# persisted, so after a restart every file is examined again.
_seen: Set[str] = set()
_seen_refresh: Set[str] = set()
|
|
|
|
|
|
def already_transcribed(p: Path) -> bool:
    """Return True when a transcript JSON named after the media file's stem exists.

    This is a heuristic. Only the existence of ``TRN/<stem>.json`` is checked;
    the file's contents are not inspected. Two media files that share a stem in
    different library folders therefore map to the same transcript path.
    """
    transcript = TRN.joinpath(p.stem + ".json")
    return transcript.exists()
|
|
|
|
|
|
# Helper to decide when to refresh sidecars
|
|
def needs_refresh(p: Path) -> bool:
    """
    Decide whether to refresh sidecars for a media file.

    A refresh is needed when any of these is missing next to ``p``:

    - metadata: ``<stem>.info.json``
    - subtitles: ``<stem>.srt``, ``<stem>.en.srt`` or any ``<stem>.<tag>.srt``
    - thumbnail: ``<stem>.jpg`` or ``<stem>.png``

    Sidecar names are built by appending to ``p.stem``. They are not built by
    calling ``with_suffix`` on the stem, because media names often contain
    extra dots (``Show.S01E01.mp4``), and ``with_suffix`` would replace the last
    of them (yielding ``Show.info.json``).

    Args:
        p: Path to a media file.

    Returns:
        True if at least one sidecar category is missing, otherwise False.
    """
    import glob  # local import: only needed to escape the stem for pattern matching

    folder = p.parent
    base = p.stem

    def sidecar(ext: str) -> Path:
        # Same directory, same stem, different extension.
        return folder / f"{base}{ext}"

    if not sidecar(".info.json").exists():
        return True

    has_any_srt = (
        sidecar(".srt").exists()
        or sidecar(".en.srt").exists()
        # Accept any language-suffixed SRT as well. glob.escape matters because
        # names like "Title [id]" contain glob metacharacters ("[" opens a class).
        or any(folder.glob(glob.escape(base) + ".*.srt"))
    )
    if not has_any_srt:
        return True

    return not (sidecar(".jpg").exists() or sidecar(".png").exists())
|
|
|
|
|
|
def iter_media_files(root: Path):
    """Lazily yield every file below ``root`` whose extension is a tracked media type.

    The walk is recursive via ``rglob``. Non-files (e.g. directories) are
    skipped. The final suffix is lower-cased before it is looked up in
    MEDIA_EXT, so ``.MP3`` also matches.
    """
    for candidate in root.rglob("*"):
        if candidate.is_file() and candidate.suffix.lower() in MEDIA_EXT:
            yield candidate
|
|
|
|
|
|
# Lazily created rq Queue. It is reused across scans so that each pass does not
# build a fresh Redis client and connection pool.
_queue = None


def _get_queue() -> Queue:
    """Return the shared rq Queue, creating it on first use."""
    global _queue
    if _queue is None:
        _queue = Queue(connection=Redis.from_url(REDIS_URL))
    return _queue


def _enqueue_refresh(q: Queue, p: Path, key: str) -> None:
    """Enqueue a sidecar refresh for ``key`` unless this process already queued one."""
    if key in _seen_refresh:
        print(f"[scanner] Skip (already queued refresh): {p}", flush=True)
        return
    # Ask worker to refresh metadata/subtitles/thumbnails without redownloading media
    q.enqueue(
        "worker.refresh_media",
        key,
        job_timeout=JOB_TIMEOUT,
        ttl=REFRESH_TTL,
        result_ttl=RESULT_TTL,
        failure_ttl=REFRESH_FAILURE_TTL,
    )
    _seen_refresh.add(key)
    print(f"[scanner] Refresh enqueued: {p}", flush=True)


def _enqueue_transcription(q: Queue, p: Path, key: str) -> None:
    """Enqueue the worker job that processes a not-yet-transcribed local file."""
    # Generous timeouts: a single file may take hours to process.
    q.enqueue(
        "worker.handle_local_file",
        key,
        job_timeout=JOB_TIMEOUT,
        ttl=JOB_TTL,
        result_ttl=RESULT_TTL,
        failure_ttl=FAILURE_TTL,
    )
    _seen.add(key)
    print(f"[scanner] Enqueued: {p}", flush=True)


def enqueue_new_files():
    """Scan the library once and enqueue work for files not yet handled by this process.

    For each media file under LIB, keyed by its resolved path:

    - Already seen in this process: skipped silently.
    - Already transcribed: marked as seen. A ``worker.refresh_media`` job is
      enqueued if REFRESH_EXISTING is set and sidecars are missing.
    - Otherwise: a ``worker.handle_local_file`` job is enqueued.

    A filesystem error on one file is logged and that file is skipped, so the
    rest of the pass still runs. The file is not marked as seen, so it is
    retried on the next pass. Errors from the queue itself still propagate to
    the caller.

    Returns:
        Number of new transcription jobs enqueued (refresh jobs are not counted).
    """
    q = _get_queue()

    # Ensure target dirs exist
    TRN.mkdir(parents=True, exist_ok=True)
    LIB.mkdir(parents=True, exist_ok=True)

    new_jobs = 0
    for p in iter_media_files(LIB):
        try:
            key = str(p.resolve())
            if key in _seen:
                continue
            transcribed = already_transcribed(p)
            wants_refresh = transcribed and REFRESH_EXISTING and needs_refresh(p)
        except (OSError, RuntimeError) as exc:
            # RuntimeError: Path.resolve() raises it on symlink loops before Python 3.13.
            print(f"[scanner] Skip (filesystem error): {p}: {exc}", file=sys.stderr, flush=True)
            continue

        if transcribed:
            _seen.add(key)
            if wants_refresh:
                _enqueue_refresh(q, p, key)
            else:
                print(f"[scanner] Skip (already transcribed): {p}", flush=True)
            continue

        _enqueue_transcription(q, p, key)
        new_jobs += 1

    return new_jobs
|
|
|
|
|
|
# Shutdown request flag. The signal handler below sets it, and main() checks it
# between scans and while sleeping.
_shutdown = False


def _handle_sig(sig, frame):
    """Signal handler that asks the scan loop to stop gracefully.

    It only flips the module-level ``_shutdown`` flag. The signal number and
    frame arguments, which the ``signal`` module always passes, are unused.
    """
    global _shutdown
    _shutdown = True
|
|
|
|
|
|
def main():
    """Run the scanner loop until SIGINT or SIGTERM is received.

    Each iteration runs one enqueue_new_files() pass and logs how many new
    transcription jobs were enqueued. It then sleeps in one-second steps so a
    shutdown request is noticed within about a second.
    """
    import traceback  # local import: only used for error reporting here

    signal.signal(signal.SIGINT, _handle_sig)
    signal.signal(signal.SIGTERM, _handle_sig)

    # A non-positive SCAN_INTERVAL would skip the sleep loop entirely and
    # re-scan the whole library back-to-back; always wait at least one second.
    interval = max(SCAN_INTERVAL, 1)

    print(f"[scanner] Watching {LIB} → transcripts in {TRN}; interval={interval}s", flush=True)

    while not _shutdown:
        try:
            jobs = enqueue_new_files()
        except Exception as e:
            # Top-level boundary: log (with traceback) and keep scanning, so one
            # failing pass does not terminate the scanner.
            print(f"[scanner] Error: {e}", file=sys.stderr, flush=True)
            traceback.print_exc(file=sys.stderr)
        else:
            if jobs:
                print(f"[scanner] Enqueued {jobs} new file(s)", flush=True)

        # Sleep between passes in 1 s steps so SIGINT/SIGTERM is honored promptly.
        for _ in range(interval):
            if _shutdown:
                break
            time.sleep(1)

    print("[scanner] Shutting down", flush=True)
|
|
|
|
|
|
# Script entry point: start the scanner loop only when executed directly.
if __name__ == "__main__":
    main()