105 lines
3.5 KiB
Python
105 lines
3.5 KiB
Python
# resolver.py
|
|
from __future__ import annotations
|
|
import json, os, re, subprocess
|
|
from pathlib import Path
|
|
from datetime import datetime, timezone
|
|
from typing import Optional, Dict, Any, Tuple, List
|
|
|
|
try:
|
|
from rapidfuzz import fuzz, process
|
|
except Exception:
|
|
fuzz = None
|
|
process = None
|
|
|
|
def _norm(s: str) -> str:
|
|
s = s.lower()
|
|
s = re.sub(r"[\[\]\(\)\{\}|_]+", " ", s)
|
|
s = re.sub(r"[^0-9a-zá-žà-ÿ\u00C0-\u024F\s]+", " ", s) # keep latin accents, cz/diacritics
|
|
s = re.sub(r"\s+", " ", s).strip()
|
|
return s
|
|
|
|
def _title_from_filename(p: Path) -> str:
|
|
name = p.stem # drop extension
|
|
# common yt-dlp patterns like "YYYYMMDD - Title"
|
|
name = re.sub(r"^\d{8}\s*-\s*", "", name)
|
|
return name
|
|
|
|
def _ffprobe_duration_seconds(p: Path) -> Optional[int]:
|
|
try:
|
|
out = subprocess.check_output([
|
|
"ffprobe", "-v", "error", "-show_entries", "format=duration",
|
|
"-of", "default=nw=1:nk=1", str(p)
|
|
], stderr=subprocess.STDOUT, text=True).strip()
|
|
return int(float(out))
|
|
except Exception:
|
|
return None
|
|
|
|
def load_index(index_path: Path) -> List[Dict[str, Any]]:
    """Load the episode index from a UTF-8 JSON file.

    Args:
        index_path: Location of the JSON index.

    Returns:
        The list of index entries. An empty list is returned when the file
        does not exist or when the top-level JSON value is not a list.
        Entries that are not JSON objects are dropped so callers can rely
        on every item being a dict.

    Raises:
        json.JSONDecodeError: If the file exists but is not valid JSON;
            a corrupt index is surfaced rather than silently ignored.
    """
    try:
        # Open directly instead of checking exists() first: the file may
        # disappear between the check and the open.
        with index_path.open("r", encoding="utf-8") as f:
            data = json.load(f)
    except (FileNotFoundError, NotADirectoryError):
        return []

    # expected per-item keys:
    #   title, pubdate_ts (int), duration_s (int or null),
    #   transcript_urls: {"srt": str|None, "vtt": str|None, "txt": str|None},
    #   audio_url, guid, feed_url
    if not isinstance(data, list):
        return []
    return [item for item in data if isinstance(item, dict)]
|
|
|
|
def _int_or_none(value: Any) -> Optional[int]:
    """Coerce *value* to int, returning None when it is missing or malformed."""
    try:
        return int(value)
    except (TypeError, ValueError, OverflowError):
        return None


def _date_from_stem(stem: str) -> Optional[datetime]:
    """Return the first standalone YYYYMMDD run in *stem* that is a real date (UTC)."""
    # The look-arounds reject 8 digits cut out of a longer number (e.g. a
    # numeric ID); runs that are not valid calendar dates are skipped.
    for m in re.finditer(r"(?<!\d)\d{8}(?!\d)", stem):
        try:
            return datetime.strptime(m.group(0), "%Y%m%d").replace(tzinfo=timezone.utc)
        except ValueError:
            continue
    return None


def match_episode(
    media_path: Path,
    index_items: List[Dict[str, Any]],
    duration_tolerance_s: int = 120,
    min_ratio: int = 82,
    date_window_days: int = 14,
) -> Optional[Dict[str, Any]]:
    """Return the index item that best matches *media_path*, or None.

    The title guessed from the filename is compared against each item's
    ``title`` after normalization. When rapidfuzz is available the score is
    ``fuzz.token_sort_ratio``; otherwise only an exact normalized match
    scores (100). Items scoring below ``min_ratio`` are discarded, as are
    items whose duration or publication date disagree with the media file.
    Each of those two filters is applied only when both sides are known.

    Args:
        media_path: Local media file to resolve.
        index_items: Index entries (dicts with ``title`` and optionally
            ``duration_s`` / ``pubdate_ts``).
        duration_tolerance_s: Maximum allowed absolute difference, in
            seconds, between the probed and indexed durations.
        min_ratio: Minimum title similarity score (0-100).
        date_window_days: Maximum allowed distance, in whole days, between
            the date found in the filename and the item's ``pubdate_ts``.

    Returns:
        The highest-scoring surviving item (the earliest one in index order
        on ties), or ``None`` when no item qualifies or the filename yields
        an empty title.
    """
    tnorm = _norm(_title_from_filename(media_path))
    if not tnorm:
        return None

    media_secs = _ffprobe_duration_seconds(media_path)
    media_date = _date_from_stem(media_path.stem)

    best_item: Optional[Dict[str, Any]] = None
    best_ratio = None
    for item in index_items:
        if not isinstance(item, dict):
            continue
        raw_title = item.get("title")
        if not isinstance(raw_title, str):
            # missing or JSON-null title: nothing to compare against
            continue
        item_title = _norm(raw_title)
        if not item_title:
            continue

        if fuzz:
            ratio = fuzz.token_sort_ratio(tnorm, item_title)
        else:
            ratio = 100 if tnorm == item_title else 0
        if ratio < min_ratio:
            continue
        # Only a strictly better score can replace the current best, which
        # keeps the earliest item on ties and skips needless filter work.
        if best_ratio is not None and ratio <= best_ratio:
            continue

        # duration filter (if both known); malformed values count as unknown
        item_secs = _int_or_none(item.get("duration_s"))
        if media_secs and item_secs:
            if abs(media_secs - item_secs) > duration_tolerance_s:
                continue

        # date window (if both known)
        item_ts = _int_or_none(item.get("pubdate_ts"))
        if media_date and item_ts:
            try:
                dt_item = datetime.fromtimestamp(item_ts, tz=timezone.utc)
            except (OverflowError, OSError, ValueError):
                dt_item = None  # timestamp outside the platform's range
            # abs() is taken on the timedelta itself: timedelta.days floors
            # negative values, so abs(delta.days) would treat "item newer"
            # and "item older" asymmetrically.
            if dt_item is not None and abs(media_date - dt_item).days > date_window_days:
                continue

        best_item, best_ratio = item, ratio

    return best_item
|
|
|
|
def choose_transcript_url(item: Dict[str, Any]) -> Optional[Tuple[str, str]]:
    """Pick the preferred transcript URL of an index item.

    Looks at ``item["transcript_urls"]`` and returns ``(url, format)`` for
    the first format that has a non-empty URL, preferring plain text, then
    VTT, then SRT. Returns ``None`` when no transcript URL is available.
    """
    available = item.get("transcript_urls") or {}
    for fmt in ("txt", "vtt", "srt"):
        url = available.get(fmt)
        if url:
            return (url, fmt)
    return None