"""Download media with yt-dlp, transcribe it with faster-whisper, and index
the transcript in Meilisearch.

Importing this module creates the library/transcript/temp directories and
loads the Whisper model.
"""
import json
import os
import re
import shutil
import subprocess
from pathlib import Path

import orjson
import requests
from faster_whisper import WhisperModel

MEILI_URL = os.getenv("MEILI_URL", "http://meili:7700")
MEILI_KEY = os.getenv("MEILI_KEY", "")
LIB = Path(os.getenv("LIBRARY_ROOT", "/library"))
TRN = Path(os.getenv("TRANSCRIPT_ROOT", "/transcripts"))
TMP = Path(os.getenv("TMP_ROOT", "/tmpdl"))
MODEL_NAME = os.getenv("WHISPER_MODEL", "large-v3")
COMPUTE = os.getenv("WHISPER_PRECISION", "int8")

TRN.mkdir(parents=True, exist_ok=True)
LIB.mkdir(parents=True, exist_ok=True)
TMP.mkdir(parents=True, exist_ok=True)

model = WhisperModel(MODEL_NAME, compute_type=COMPUTE)

# An 8-digit run delimited by word boundaries (e.g. a YYYYMMDD upload date).
_DATE_RE = re.compile(r"\b(\d{8})\b")
# Meilisearch document ids may only contain a-z, A-Z, 0-9, '-' and '_'.
_MEILI_ID_INVALID_RE = re.compile(r"[^A-Za-z0-9_-]")
# Seconds to wait for Meilisearch before giving up instead of hanging forever.
_MEILI_TIMEOUT_S = 60


def _extract_date(text):
    """Return the first standalone 8-digit run in *text*, or "" if none."""
    found = _DATE_RE.search(text)
    return found.group(1) if found else ""


def _fmt_ts(t, sep=","):
    """Format *t* seconds as ``HH:MM:SS<sep>mmm``.

    Works in integer milliseconds so that rounding can never produce a
    seconds field of 60 (e.g. 59.9996 -> 00:01:00,000, not 00:00:60,000).
    """
    total_ms = int(round(t * 1000))
    hours, rest = divmod(total_ms, 3_600_000)
    minutes, rest = divmod(rest, 60_000)
    seconds, millis = divmod(rest, 1000)
    return f"{hours:02}:{minutes:02}:{seconds:02}{sep}{millis:03}"


def log(feed):
    """Append *feed* as one JSON line to ``_feed.log`` in the transcript root."""
    with open(TRN / "_feed.log", "a", encoding="utf-8") as f:
        f.write(orjson.dumps(feed).decode() + "\n")


def sanitize(name):
    """Replace runs of ``\\ / : " * ? < > |`` in *name* with a space and strip."""
    return re.sub(r'[\\/:"*?<>|]+', ' ', name).strip()


def yt_dlp(url, outdir):
    """Download *url* into *outdir* with yt-dlp.

    Files are laid out as ``<uploader>/<upload_date> - <title>.<ext>``.

    Returns:
        A list holding the most recently modified mp4/mkv/m4a/mp3 file found
        anywhere under *outdir*, or an empty list when there is none.

    Raises:
        subprocess.CalledProcessError: if yt-dlp exits with a non-zero status.
    """
    outtmpl = str(outdir / "%(uploader)s/%(upload_date)s - %(title)s.%(ext)s")
    cmd = [
        "yt-dlp",
        "-o", outtmpl,
        "-f", "bv*+ba/best",
        "-x",
        "--audio-format", "m4a",
        "--write-thumbnail",
        "--no-playlist",
        "--no-warnings",
        "--restrict-filenames",
        # End of options: a URL starting with "-" must not be parsed as a flag.
        "--",
        url,
    ]
    subprocess.check_call(cmd)
    media = []
    for pattern in ("*.[mM][pP]4", "*.mkv", "*.m4a", "*.mp3"):
        media.extend(outdir.rglob(pattern))
    media.sort(key=lambda p: p.stat().st_mtime)
    return media[-1:]


def transcribe(media_path: Path):
    """Transcribe *media_path* and write .json, .txt, .srt and .vtt transcripts.

    Outputs are written into the transcript root, named after the media
    file's stem.

    Returns:
        The extension-less base path; callers derive each output file with
        ``base.with_suffix(...)``.
    """
    # language=None makes faster-whisper detect the language; the string
    # "auto" is not a valid language code and is rejected with ValueError.
    segments, info = model.transcribe(str(media_path), vad_filter=True, language=None)

    # NOTE: Path.with_suffix replaces everything after the last dot of the
    # name, so a stem that itself contains a dot yields truncated filenames.
    # Kept as is because callers recompute the paths the same way.
    base = TRN / media_path.stem

    # `segments` is consumed exactly once here.
    segs = [{"start": s.start, "end": s.end, "text": s.text} for s in segments]
    txt = " ".join(seg["text"] for seg in segs).strip()

    with open(base.with_suffix(".json"), "wb") as f:
        f.write(orjson.dumps({"file": str(media_path), "language": info.language, "segments": segs}))

    with open(base.with_suffix(".txt"), "w", encoding="utf-8") as f:
        f.write(txt)

    with open(base.with_suffix(".srt"), "w", encoding="utf-8") as srt:
        for i, s in enumerate(segs, 1):
            srt.write(f"{i}\n{_fmt_ts(s['start'])} --> {_fmt_ts(s['end'])}\n{s['text'].strip()}\n\n")

    with open(base.with_suffix(".vtt"), "w", encoding="utf-8") as vtt:
        vtt.write("WEBVTT\n\n")
        for s in segs:
            vtt.write(f"{_fmt_ts(s['start'], '.')} --> {_fmt_ts(s['end'], '.')} \n{s['text'].strip()}\n\n")

    return base


def index_meili(json_path: Path):
    """Post the transcript stored at *json_path* to the ``library`` index.

    Raises:
        requests.HTTPError: if Meilisearch answers with an error status.
        requests.Timeout: if Meilisearch does not answer in time.
    """
    with open(json_path, "r", encoding="utf-8") as f:
        doc = json.loads(f.read())

    media = Path(doc["file"])
    title = media.stem
    segments = doc.get("segments", [])
    payload = {
        # Titles usually contain spaces or dots, which Meilisearch rejects in
        # a document id; map every disallowed character to "_".
        "id": _MEILI_ID_INVALID_RE.sub("_", title),
        "type": "podcast",
        "title": title,
        "date": _extract_date(title),
        # NOTE(review): this drops any sub-directory (e.g. the uploader
        # folder) from the recorded media path — confirm that is intended.
        "source": str(Path(LIB, media.name)),
        "text": " ".join(s["text"] for s in segments),
        "segments": segments,
        "meta": {"language": doc.get("language", "")},
    }
    r = requests.post(
        f"{MEILI_URL}/indexes/library/documents",
        headers={"Authorization": f"Bearer {MEILI_KEY}", "Content-Type": "application/json"},
        data=orjson.dumps(payload),
        timeout=_MEILI_TIMEOUT_S,
    )
    # Only the HTTP response is checked here.
    r.raise_for_status()


def handle_url(url: str):
    """Download *url*, move the media into the library, transcribe and index it.

    A status record is appended to the feed log at each stage. On any
    failure an "error" record is logged and the exception is re-raised.
    """
    try:
        info = {"url": url, "status": "queued", "title": "", "uploader": "", "date": "", "path": ""}
        log({**info, "status": "downloading"})
        files = yt_dlp(url, TMP)
        for f in files:
            # The first path component under TMP is the uploader directory
            # created by the yt-dlp output template.
            parts = f.relative_to(TMP).parts
            uploader = sanitize(parts[0]) if len(parts) > 1 else "Unknown"
            dest_dir = LIB / uploader
            dest_dir.mkdir(parents=True, exist_ok=True)
            dest = dest_dir / sanitize(f.name)
            shutil.move(str(f), str(dest))
            info.update({
                "title": dest.stem,
                "uploader": uploader,
                "date": _extract_date(dest.stem),
                "path": str(dest),
            })
            log({**info, "status": "transcribing"})
            base = transcribe(dest)
            index_meili(base.with_suffix(".json"))
            log({**info, "status": "done"})
    except Exception as e:
        log({"url": url, "status": "error", "error": str(e)})
        raise