"""Worker helpers: fetch media or web pages, transcribe audio and index the results.

Media URLs are downloaded with ``yt-dlp``, transcribed with faster-whisper and
indexed in Meilisearch; other URLs are snapshotted as HTML/text.  Transcript
text files can optionally be published to an Open WebUI knowledge base.
Progress is appended as JSON lines to ``_feed.log`` in the transcript root.
"""
import hashlib
import json
import math
import os
import re
import shutil
import subprocess
import time
from pathlib import Path
from urllib.parse import urlparse

import orjson
import requests
import requests as _requests  # alias kept because the original module exposed it
import tldextract
import trafilatura
from faster_whisper import WhisperModel

MEILI_URL = os.getenv("MEILI_URL", "http://meili:7700")
MEILI_KEY = os.getenv("MEILI_KEY", "")
LIB = Path(os.getenv("LIBRARY_ROOT", "/library"))
TRN = Path(os.getenv("TRANSCRIPT_ROOT", "/transcripts"))
TMP = Path(os.getenv("TMP_ROOT", "/tmpdl"))
MODEL_NAME = os.getenv("WHISPER_MODEL", "large-v3")
COMPUTE = os.getenv("WHISPER_PRECISION", "int8")
WHISPER_LANGUAGE = os.getenv("WHISPER_LANGUAGE", "auto").strip()

OWUI_URL = os.getenv("OPENWEBUI_URL", "").rstrip("/")
OWUI_KEY = os.getenv("OPENWEBUI_API_KEY", "")
OWUI_KB = os.getenv("OPENWEBUI_KB_NAME", "Homelab Library")

TRN.mkdir(parents=True, exist_ok=True)
LIB.mkdir(parents=True, exist_ok=True)
TMP.mkdir(parents=True, exist_ok=True)

# An 8-digit run delimited by anything that is not a letter or digit.  ``\b``
# is not usable here: "_" is a word character, so "20240101_-_Title" (what
# yt-dlp emits with --restrict-filenames) would never match.
_DATE_RE = re.compile(r"(?<![A-Za-z0-9])(\d{8})(?![A-Za-z0-9])")
# Characters that are not allowed in a Meilisearch document id.
_MEILI_ID_INVALID = re.compile(r"[^A-Za-z0-9_-]+")
_MEILI_ID_MAX_BYTES = 511
_TITLE_RE = re.compile(r"<title[^>]*>(.*?)</title>", re.I | re.S)
_MEDIA_HOSTS = (
    "youtube.com", "youtu.be", "rumble.com", "vimeo.com",
    "soundcloud.com", "spotify.com", "podbean.com", "buzzsprout.com",
)

# Lazy Whisper model loader so the worker can start even if model download/setup is slow
_model = None


def get_model():
    """Return the shared WhisperModel, creating it on first use."""
    global _model
    if _model is None:
        _model = WhisperModel(MODEL_NAME, compute_type=COMPUTE)
    return _model


def log(feed):
    """Append *feed* as one JSON line to ``TRN/_feed.log``.

    Logging is deliberately best-effort: any failure is swallowed so that a
    broken feed file can never abort a download or transcription.
    """
    try:
        with open(TRN / "_feed.log", "a", encoding="utf-8") as f:
            f.write(orjson.dumps(feed).decode() + "\n")
    except Exception:
        pass


def sanitize(name):
    """Replace runs of characters that are unsafe in file names with a space."""
    return re.sub(r'[\\/:"*?<>|]+', ' ', name).strip()


def _find_date(text):
    """Return the first stand-alone 8-digit run in *text*, or "" if none."""
    match = _DATE_RE.search(text)
    return match.group(1) if match else ""


def _suffix_safe(name):
    """Return *name* with dots replaced so ``Path.with_suffix`` only appends.

    ``with_suffix`` replaces everything after the last dot, so a name such as
    "Ep.1" would otherwise map to "Ep.json" and collide with "Ep.2".
    """
    return name.replace(".", "_")


def _transcript_base(title):
    """Return the extension-less path under TRN used for *title*'s artifacts."""
    return TRN / _suffix_safe(title)


def _fmt_ts(seconds, sep=","):
    """Format *seconds* as ``HH:MM:SS<sep>mmm``.

    Works in whole milliseconds so that rounding can never yield a seconds
    field of 60 (e.g. 59.9996 becomes 00:01:00,000).
    """
    total_ms = max(0, int(round(float(seconds) * 1000)))
    hours, rest = divmod(total_ms, 3_600_000)
    minutes, rest = divmod(rest, 60_000)
    secs, millis = divmod(rest, 1000)
    return f"{hours:02}:{minutes:02}:{secs:02}{sep}{millis:03}"


def yt_dlp(url, outdir):
    """Download *url* into *outdir* with yt-dlp and return the newest media file.

    Returns a list holding at most one Path (the most recently modified
    mp4/mkv/m4a/mp3 found under *outdir*).  Raises
    ``subprocess.CalledProcessError`` if both download attempts fail.
    """
    # 1) Normalize YouTube Music URLs to standard YouTube
    yurl = url
    if 'music.youtube.com' in yurl:
        yurl = yurl.replace('music.youtube.com', 'www.youtube.com')

    outtmpl = str(outdir / "%(uploader)s/%(upload_date)s - %(title)s.%(ext)s")
    base_cmd = [
        "yt-dlp", "-o", outtmpl,
        "-f", "bv*+ba/best",
        "-x", "--audio-format", "m4a",
        "--write-thumbnail",
        "--no-playlist", "--no-warnings", "--restrict-filenames",
    ]

    # 3) Optional cookies (set YTDLP_COOKIES=/path/to/cookies.txt in .env and mount it)
    cookies_path = os.getenv("YTDLP_COOKIES", "").strip()
    if cookies_path:
        base_cmd += ["--cookies", cookies_path]

    # Primary attempt
    try:
        subprocess.check_call(base_cmd + [yurl])
    except subprocess.CalledProcessError:
        # 2) Retry with Android client + mobile UA
        retry_cmd = base_cmd + [
            "--extractor-args", "youtube:player_client=android",
            "--user-agent", "Mozilla/5.0 (Linux; Android 10; SM-G973F) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0 Mobile Safari/537.36",
            yurl,
        ]
        subprocess.check_call(retry_cmd)

    media = (list(outdir.rglob("*.[mM][pP]4")) + list(outdir.rglob("*.mkv"))
             + list(outdir.rglob("*.m4a")) + list(outdir.rglob("*.mp3")))
    return sorted(media, key=lambda p: p.stat().st_mtime)[-1:]


def extract_audio(src: Path, outdir: Path) -> Path:
    """Extract mono 16kHz WAV for robust transcription (handles odd containers/codecs).

    Returns the path of the WAV written to *outdir*.  Raises RuntimeError
    carrying ffmpeg's output when the conversion fails.
    """
    outdir.mkdir(parents=True, exist_ok=True)
    wav_path = outdir / (src.stem + ".wav")
    # Force audio-only, mono, 16kHz WAV
    cmd = [
        "ffmpeg", "-nostdin", "-y",
        "-i", str(src),
        "-vn", "-ac", "1", "-ar", "16000",
        "-f", "wav", str(wav_path),
    ]
    try:
        subprocess.check_output(cmd, stderr=subprocess.STDOUT)
    except subprocess.CalledProcessError as e:
        raise RuntimeError(f"ffmpeg extract failed: {e.output.decode(errors='ignore')}") from e
    return wav_path


def media_duration_seconds(path: Path) -> float:
    """Return duration in seconds using ffprobe; fallback to 0.0 on error."""
    try:
        out = subprocess.check_output([
            "ffprobe", "-v", "error", "-show_entries", "format=duration",
            "-of", "default=nokey=1:noprint_wrappers=1", str(path)
        ], stderr=subprocess.STDOUT, text=True).strip()
        return float(out) if out else 0.0
    except Exception:
        # Duration only drives progress reporting, so never fail because of it.
        return 0.0


def _collect_segments(segments, dur, media_path, title):
    """Drain the segment iterator into dicts, logging progress every +5%."""
    segs = []
    last_pct = -1
    for s in segments:
        segs.append({"start": s.start, "end": s.end, "text": s.text})
        if dur > 0 and s.end is not None:
            pct = int(min(100, max(0, (s.end / dur) * 100)))
            if pct >= last_pct + 5:
                log({
                    "status": "transcribing",
                    "path": str(media_path),
                    "title": title,
                    "progress": pct
                })
                last_pct = pct
    # ensure we mark 100% on completion
    if last_pct < 100:
        log({"status": "transcribing", "path": str(media_path), "title": title, "progress": 100})
    return segs


def _write_transcript_files(base, media_path, language, segs):
    """Write the .json, .txt, .srt and .vtt artifacts next to *base*."""
    txt = " ".join(s["text"] for s in segs).strip()
    base.with_suffix(".json").write_bytes(orjson.dumps({
        "file": str(media_path),
        "language": language,
        "segments": segs
    }))
    base.with_suffix(".txt").write_text(txt, encoding="utf-8")

    with open(base.with_suffix(".srt"), "w", encoding="utf-8") as srt:
        for i, s in enumerate(segs, 1):
            srt.write(f"{i}\n{_fmt_ts(s['start'])} --> {_fmt_ts(s['end'])}\n{s['text'].strip()}\n\n")

    with open(base.with_suffix(".vtt"), "w", encoding="utf-8") as vtt:
        vtt.write("WEBVTT\n\n")
        for s in segs:
            vtt.write(f"{_fmt_ts(s['start'], '.')} --> {_fmt_ts(s['end'], '.')} \n{s['text'].strip()}\n\n")


def _copy_srt_next_to_media(base, media_path, language):
    """Best-effort copy of the SRT beside the media file (language-suffixed, for Plex)."""
    # Bound before the try so the error message can always name a destination.
    srt_dst = media_path.with_suffix(".srt")
    try:
        lang_code = (language or (WHISPER_LANGUAGE if WHISPER_LANGUAGE.lower() != 'auto' else 'en')).lower()
        srt_dst = media_path.with_suffix(f".{lang_code}.srt")
        shutil.copy2(base.with_suffix(".srt"), srt_dst)
    except Exception as e:
        print(f"[post] could not copy srt -> {srt_dst}: {e}", flush=True)


def transcribe(media_path: Path):
    """Transcribe *media_path* and write transcript artifacts under TRN.

    Returns the extension-less base path; callers derive the artifacts with
    ``base.with_suffix(".json")`` / ``".txt"`` / ``".srt"`` / ``".vtt"``.
    Dots in the media title are replaced with "_" in the artifact name so
    that ``with_suffix`` cannot truncate it.
    """
    model = get_model()

    # 1) Robustly extract audio to 16k mono WAV (fixes pyAV/webm edge cases)
    wav = extract_audio(media_path, TMP)
    try:
        # 2) Language
        lang = None if WHISPER_LANGUAGE.lower() == "auto" else WHISPER_LANGUAGE

        # 3) Transcribe
        segments, info = model.transcribe(str(wav), vad_filter=True, language=lang)

        title = media_path.stem
        base = _transcript_base(title)

        # Determine duration for progress; use extracted WAV (accurate for transcription input)
        dur = media_duration_seconds(wav) or 0.0
        # The WAV is still needed while the segments are being consumed, so
        # the iterator is drained before the cleanup in ``finally`` runs.
        segs = _collect_segments(segments, dur, media_path, title)
    finally:
        # Remove the temporary WAV even when transcription fails.
        try:
            if wav.exists():
                wav.unlink()
        except OSError:
            pass

    _write_transcript_files(base, media_path, info.language, segs)

    # 4) Copy SRT next to media for Plex (language-suffixed)
    _copy_srt_next_to_media(base, media_path, info.language)
    return base


def _meili_id(raw):
    """Return a Meilisearch-safe document id derived from *raw*.

    Ids that are already valid are returned unchanged.  Otherwise invalid
    runs are replaced with "_" and a short hash of the original is appended
    so that distinct inputs do not collapse onto the same id.
    """
    if raw and len(raw) <= _MEILI_ID_MAX_BYTES and not _MEILI_ID_INVALID.search(raw):
        return raw
    digest = hashlib.sha1(raw.encode("utf-8")).hexdigest()[:12]
    cleaned = _MEILI_ID_INVALID.sub("_", raw).strip("_")[:200]
    return f"{cleaned}-{digest}" if cleaned else digest


def _post_meili(payload, attempts=5):
    """POST *payload* to the ``library`` index, retrying with linear backoff.

    Re-raises the last ``requests`` exception once *attempts* are exhausted.
    """
    body = orjson.dumps(payload)
    for attempt in range(attempts):
        try:
            r = requests.post(
                f"{MEILI_URL}/indexes/library/documents",
                headers={"Authorization": f"Bearer {MEILI_KEY}", "Content-Type": "application/json"},
                data=body,
                timeout=15,
            )
            r.raise_for_status()
            return
        except requests.RequestException:
            if attempt == attempts - 1:
                raise
            time.sleep(2 * (attempt + 1))


def index_meili(json_path: Path):
    """Index the transcript stored at *json_path* as a ``podcast`` document."""
    with open(json_path, "r", encoding="utf-8") as fh:
        doc = json.load(fh)
    title = Path(doc["file"]).stem
    segments = doc.get("segments", [])
    payload = {
        "id": _meili_id(title),
        "type": "podcast",
        "title": title,
        "date": _find_date(title),
        # NOTE(review): this keeps only the file name, so the uploader
        # sub-directory that handle_url creates is not part of "source" —
        # confirm that is intended.
        "source": str(Path(LIB, Path(doc["file"]).name)),
        "text": " ".join(s["text"] for s in segments),
        "segments": segments,
        "meta": {"language": doc.get("language", "")}
    }
    _post_meili(payload)


def slugify(text):
    """Reduce *text* to at most 120 file-name-safe characters ('page' if empty)."""
    text = re.sub(r'[^A-Za-z0-9\-._ ]+', '', text).strip().replace(' ', '_')
    return text[:120] or 'page'


def save_web_snapshot(url: str):
    """Fetch *url* and store its HTML and extracted text under ``LIB/web/<domain>``.

    Returns ``(base, title, domain, date, text)`` where *base* is the
    extension-less path of the written ``.html`` / ``.txt`` pair.
    Raises a ``requests`` exception when the page cannot be fetched.
    """
    r = _requests.get(url, timeout=30, headers={"User-Agent": "Mozilla/5.0"})
    r.raise_for_status()
    html = r.text

    # The URL hint is given to extract(); it is not passed to load_html(),
    # which is called with the document alone.
    downloaded = trafilatura.load_html(html)
    text = trafilatura.extract(downloaded, url=url, include_comments=False,
                               include_images=False, with_metadata=True) or ""
    meta = trafilatura.metadata.extract_metadata(downloaded) or None

    title = getattr(meta, 'title', None) if meta else None
    if not title:
        # Fall back to the raw <title> element, then to the URL itself.
        match = _TITLE_RE.search(html)
        title = match.group(1).strip() if match else ""
    if not title:
        title = url
    date = (meta.date if meta and getattr(meta, 'date', None) else "")

    parts = tldextract.extract(url)
    domain = ".".join([p for p in [parts.domain, parts.suffix] if p])
    # Dots are removed from the slug so with_suffix() below only appends.
    slug = _suffix_safe(slugify(title))
    outdir = LIB / "web" / domain
    outdir.mkdir(parents=True, exist_ok=True)
    base = outdir / slug
    base.with_suffix(".html").write_text(html, encoding="utf-8", errors="ignore")
    base.with_suffix(".txt").write_text(text, encoding="utf-8", errors="ignore")
    return base, title, domain, date, text


def index_web(base: Path, title: str, domain: str, date: str, text: str, url: str):
    """Index a saved web snapshot as a ``web`` document in Meilisearch."""
    payload = {
        "id": _meili_id(f"web:{domain}:{base.stem}"),
        "type": "web",
        "title": title,
        "date": re.sub(r'[^0-9]', '', date)[:8] if date else "",
        "source": f"file://{str(base.with_suffix('.html'))}",
        "text": text,
        "segments": [],
        "meta": {"url": url, "domain": domain}
    }
    _post_meili(payload)


def is_media_url(url: str):
    """Return True when *url*'s host is (a subdomain of) a known media site.

    Matching is done on the hostname rather than the whole URL so that a
    media host appearing only in the path or query string does not count.
    """
    lowered = url.strip().lower()
    try:
        # Scheme-less input ("youtu.be/abc") needs "//" to be parsed as a host.
        host = urlparse(lowered if "://" in lowered else f"//{lowered}").hostname or ""
    except ValueError:
        return False
    return any(host == h or host.endswith(f".{h}") for h in _MEDIA_HOSTS)


def owui_headers():
    """Return the Authorization header for Open WebUI, or {} when no key is set."""
    return {"Authorization": f"Bearer {OWUI_KEY}"} if OWUI_KEY else {}


def _owui_items(body):
    """Return the record list from a response that is bare or wrapped in "data"."""
    if isinstance(body, list):
        return body
    if isinstance(body, dict) and isinstance(body.get("data"), list):
        return body["data"]
    return []


def _owui_id(body):
    """Return the record id from a response that is bare or wrapped in "data".

    Raises KeyError when no id can be found.
    """
    if isinstance(body, dict):
        if body.get("id"):
            return body["id"]
        data = body.get("data")
        if isinstance(data, dict) and data.get("id"):
            return data["id"]
    raise KeyError("id")


def owui_get_or_create_kb():
    """Return the id of the knowledge base named OWUI_KB, creating it if absent.

    Returns None when Open WebUI is not configured.  A failed lookup is
    logged and then treated like "not found", so creation is still attempted.
    """
    if not OWUI_URL or not OWUI_KEY:
        return None
    try:
        r = requests.get(f"{OWUI_URL}/api/v1/knowledge/list", headers=owui_headers(), timeout=15)
        r.raise_for_status()
        for kb in _owui_items(r.json()):
            if kb.get("name") == OWUI_KB:
                return kb["id"]
    except Exception as e:
        log({"status": "owui_error", "error": f"knowledge list failed: {e}"})
    r = requests.post(
        f"{OWUI_URL}/api/v1/knowledge/create",
        headers={**owui_headers(), "Content-Type": "application/json"},
        data=orjson.dumps({"name": OWUI_KB, "description": "All local content indexed by podx"}),
        timeout=15,
    )
    r.raise_for_status()
    return _owui_id(r.json())


def owui_upload_and_attach(path: Path, kb_id: str):
    """Upload *path* to Open WebUI and attach it to knowledge base *kb_id*.

    Returns True on success; raises on any HTTP or parsing failure.
    """
    with open(path, "rb") as f:
        r = requests.post(f"{OWUI_URL}/api/v1/files/", headers=owui_headers(),
                          files={"file": (path.name, f)}, timeout=60 * 10)
    r.raise_for_status()
    file_id = _owui_id(r.json())
    r = requests.post(
        f"{OWUI_URL}/api/v1/knowledge/{kb_id}/file/add",
        headers={**owui_headers(), "Content-Type": "application/json"},
        data=orjson.dumps({"file_id": file_id}),
        timeout=60,
    )
    r.raise_for_status()
    return True


def publish_to_openwebui(paths):
    """Upload every existing file in *paths* to the Open WebUI knowledge base.

    A no-op when Open WebUI is not configured.  Failures are written to the
    feed log and never raised, so publishing cannot fail the pipeline.
    """
    if not OWUI_URL or not OWUI_KEY:
        return
    try:
        kb_id = owui_get_or_create_kb()
        for p in paths:
            p = Path(p)
            if not p.exists():
                continue
            try:
                owui_upload_and_attach(p, kb_id)
            except Exception as e:
                log({"url": str(p), "status": "owui_error", "error": str(e)})
    except Exception as e:
        log({"status": "owui_error", "error": str(e)})


def handle_local_file(path_str: str):
    """Transcribe & index a local media file that already exists in /library.
    Safe to call repeatedly; it skips if transcript JSON already exists.
    """
    try:
        p = Path(path_str)
        if not p.exists():
            log({"url": path_str, "status": "error", "error": "file_not_found"})
            return
        title = p.stem
        # Must be the same path transcribe() writes, or the skip never fires.
        base_json = _transcript_base(title).with_suffix(".json")
        if base_json.exists():
            log({"url": path_str, "status": "skip", "reason": "already_transcribed"})
            return
        info = {"url": path_str, "status": "transcribing",
                "title": title, "uploader": p.parent.name, "date": "", "path": str(p), "progress": 0}
        log(info)
        base = transcribe(p)
        index_meili(base.with_suffix(".json"))
        publish_to_openwebui([base.with_suffix(".txt")])
        log({**info, "status": "done"})
    except Exception as e:
        log({"url": path_str, "status": "error", "error": str(e)})
        raise


def handle_web(url: str):
    """Snapshot, index and publish a non-media web page, logging each stage."""
    info = {"url": url, "status": "web-downloading", "title": "", "uploader": "", "date": "", "path": ""}
    log(info)
    base, title, domain, date, text = save_web_snapshot(url)
    info.update({"title": title, "uploader": domain, "date": date, "path": str(base.with_suffix('.html'))})
    log({**info, "status": "web-indexing"})
    index_web(base, title, domain, date, text, url)
    push = [p for p in [base.with_suffix('.txt'), base.with_suffix('.html')] if p.exists()]
    publish_to_openwebui(push)
    log({**info, "status": "done"})


def handle_url(url: str):
    """Process one queue item: a local path, a ``file://`` URL, a media URL or a web page.

    Any exception is written to the feed log and then re-raised.
    """
    try:
        # If a local file path (or file:// URL) is provided, process it directly
        if url.startswith("file://"):
            return handle_local_file(url[7:])
        if url.startswith("/") and Path(url).exists():
            return handle_local_file(url)

        if not is_media_url(url):
            handle_web(url)
            return

        info = {"url": url, "status": "queued", "title": "", "uploader": "", "date": "", "path": ""}
        log({**info, "status": "downloading"})
        files = yt_dlp(url, TMP)
        if not files:
            # Without this the feed would stay at "downloading" forever.
            log({"url": url, "status": "error", "error": "no_media_downloaded"})
            return
        for f in files:
            parts = f.relative_to(TMP).parts
            uploader = sanitize(parts[0]) if len(parts) > 1 else "Unknown"
            dest_dir = LIB / uploader
            dest_dir.mkdir(parents=True, exist_ok=True)
            dest = dest_dir / sanitize(f.name)
            shutil.move(str(f), dest)
            info.update({"title": dest.stem, "uploader": uploader,
                         "date": _find_date(dest.stem),
                         "path": str(dest)})
            log({**info, "status": "transcribing", "progress": 0})
            base = transcribe(dest)
            index_meili(base.with_suffix(".json"))
            publish_to_openwebui([base.with_suffix(".txt")])
            log({**info, "status": "done"})
    except Exception as e:
        log({"url": url, "status": "error", "error": str(e)})
        raise