384 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			384 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import os, subprocess, shutil, json, re, orjson, requests
 | |
| from pathlib import Path
 | |
| import math
 | |
| from faster_whisper import WhisperModel
 | |
| 
 | |
# --- Search backend and storage roots (each overridable via environment) ---
MEILI_URL = os.environ.get("MEILI_URL", "http://meili:7700")
MEILI_KEY = os.environ.get("MEILI_KEY", "")
LIB = Path(os.environ.get("LIBRARY_ROOT", "/library"))
TRN = Path(os.environ.get("TRANSCRIPT_ROOT", "/transcripts"))
TMP = Path(os.environ.get("TMP_ROOT", "/tmpdl"))

# --- Whisper settings; a WHISPER_LANGUAGE of "auto" is mapped to None by transcribe() ---
MODEL_NAME = os.environ.get("WHISPER_MODEL", "large-v3")
COMPUTE = os.environ.get("WHISPER_PRECISION", "int8")
WHISPER_LANGUAGE = os.environ.get("WHISPER_LANGUAGE", "auto").strip()

# --- Optional Open WebUI publishing; skipped when URL or API key is empty ---
OWUI_URL = os.environ.get("OPENWEBUI_URL", "").rstrip("/")
OWUI_KEY = os.environ.get("OPENWEBUI_API_KEY", "")
OWUI_KB = os.environ.get("OPENWEBUI_KB_NAME", "Homelab Library")

# Create the working directories at import time so later code can write to them.
TRN.mkdir(parents=True, exist_ok=True)
LIB.mkdir(parents=True, exist_ok=True)
TMP.mkdir(parents=True, exist_ok=True)
 | |
| 
 | |
# The Whisper model is built lazily so the worker can start even if model
# download/setup is slow.
_model = None


def get_model():
    """Return the shared WhisperModel, constructing it on the first call.

    The instance is created from MODEL_NAME with compute_type=COMPUTE and kept
    in the module-level ``_model`` so subsequent calls reuse it.
    """
    global _model
    if _model is not None:
        return _model
    _model = WhisperModel(MODEL_NAME, compute_type=COMPUTE)
    return _model
 | |
| 
 | |
def log(feed):
    """Append ``feed`` as a single JSON line to ``_feed.log`` under TRN.

    Best-effort by design: any error while opening, serialising or writing is
    ignored so that status logging can never interrupt the job being logged.
    """
    try:
        feed_path = TRN / "_feed.log"
        with feed_path.open("a", encoding="utf-8") as sink:
            record = orjson.dumps(feed).decode()
            sink.write(record + "\n")
    except Exception:
        return
 | |
| 
 | |
def sanitize(name):
    """Make ``name`` usable as a file or directory name.

    Every run of backslash, slash, colon, double quote, asterisk, question
    mark, angle bracket or pipe characters is replaced by one space, and the
    result is stripped of leading/trailing whitespace.
    """
    forbidden_run = re.compile(r'[\\/:"*?<>|]+')
    cleaned = forbidden_run.sub(' ', name)
    return cleaned.strip()
 | |
| 
 | |
def yt_dlp(url, outdir):
    """Download ``url`` with yt-dlp into ``outdir`` and return the newest media file.

    The download is tried once with the default client and, if yt-dlp exits
    non-zero, once more with the Android player client and a mobile user agent.

    Args:
        url: Media URL; ``music.youtube.com`` is rewritten to ``www.youtube.com``.
        outdir: Path under which ``<uploader>/<upload_date> - <title>.<ext>`` is written.

    Returns:
        A list holding at most one Path: the most recently modified
        .mp4/.mkv/.m4a/.mp3 file found anywhere under ``outdir`` (empty if none).

    Raises:
        subprocess.CalledProcessError: if the retry attempt also fails.
    """
    # 1) Normalize YouTube Music URLs to standard YouTube
    yurl = url.replace('music.youtube.com', 'www.youtube.com')

    outtmpl = str(outdir / "%(uploader)s/%(upload_date)s - %(title)s.%(ext)s")

    base_cmd = [
        "yt-dlp", "-o", outtmpl,
        "-f", "bv*+ba/best",
        "-x", "--audio-format", "m4a",
        "--write-thumbnail",
        "--no-playlist", "--no-warnings", "--restrict-filenames",
    ]

    # 2) Optional cookies (set YTDLP_COOKIES=/path/to/cookies.txt in .env and mount it)
    cookies_path = os.getenv("YTDLP_COOKIES", "").strip()
    if cookies_path:
        base_cmd += ["--cookies", cookies_path]

    # "--" ends option parsing, so a URL that begins with "-" is always treated
    # as a positional argument and can never be interpreted as a yt-dlp flag.
    target = ["--", yurl]

    # Primary attempt
    try:
        subprocess.check_call(base_cmd + target)
    except subprocess.CalledProcessError:
        # 3) Retry with Android client + mobile UA
        retry_cmd = base_cmd + [
            "--extractor-args", "youtube:player_client=android",
            "--user-agent", "Mozilla/5.0 (Linux; Android 10; SM-G973F) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0 Mobile Safari/537.36",
        ] + target
        subprocess.check_call(retry_cmd)

    # Collect candidates in a fixed pattern order, then keep only the newest one.
    patterns = ("*.[mM][pP]4", "*.mkv", "*.m4a", "*.mp3")
    media = [found for pattern in patterns for found in outdir.rglob(pattern)]
    return sorted(media, key=lambda p: p.stat().st_mtime)[-1:]
 | |
| 
 | |
def extract_audio(src: Path, outdir: Path) -> Path:
    """Convert ``src`` into a mono 16 kHz WAV inside ``outdir`` using ffmpeg.

    ``outdir`` is created if needed and the output is named ``<src stem>.wav``
    (an existing file of that name is overwritten because of ``-y``).

    Returns:
        Path of the written WAV file.

    Raises:
        RuntimeError: if ffmpeg exits non-zero; the message carries ffmpeg's
            combined stdout/stderr output.
    """
    outdir.mkdir(parents=True, exist_ok=True)
    target = outdir / (src.stem + ".wav")

    ffmpeg_args = ["ffmpeg", "-nostdin", "-y", "-i", str(src)]
    ffmpeg_args += ["-vn", "-ac", "1", "-ar", "16000"]  # no video, one channel, 16 kHz
    ffmpeg_args += ["-f", "wav", str(target)]

    try:
        subprocess.check_output(ffmpeg_args, stderr=subprocess.STDOUT)
    except subprocess.CalledProcessError as e:
        raise RuntimeError(f"ffmpeg extract failed: {e.output.decode(errors='ignore')}")
    return target
 | |
| 
 | |
def media_duration_seconds(path: Path) -> float:
    """Ask ffprobe for the container duration of ``path`` in seconds.

    Any failure (ffprobe missing, non-zero exit, unparsable output) and an
    empty ffprobe answer both yield 0.0.
    """
    try:
        probe_cmd = [
            "ffprobe", "-v", "error",
            "-show_entries", "format=duration",
            "-of", "default=nokey=1:noprint_wrappers=1",
            str(path),
        ]
        raw = subprocess.check_output(probe_cmd, stderr=subprocess.STDOUT, text=True)
        raw = raw.strip()
        if not raw:
            return 0.0
        return float(raw)
    except Exception:
        return 0.0
 | |
| 
 | |
def transcribe(media_path: Path):
    """Transcribe ``media_path`` and write transcript artifacts under TRN.

    Steps:
      1. Extract a mono 16 kHz WAV into TMP.
      2. Run the Whisper model (language forced unless WHISPER_LANGUAGE is "auto").
      3. Log progress to the feed whenever it advances by at least 5 points,
         and always log 100 at the end.
      4. Write ``.json``, ``.txt``, ``.srt`` and ``.vtt`` files, then copy the
         SRT next to the media file with a language suffix (best-effort).

    The temporary WAV is removed whether or not transcription succeeds.

    Returns:
        The artifact base path ``TRN / media_path.stem``; callers derive the
        individual files with ``base.with_suffix(...)``.
    """
    model = get_model()
    # 1) Robustly extract audio to 16k mono WAV (fixes pyAV/webm edge cases)
    wav = extract_audio(media_path, TMP)
    try:
        # 2) Language: None asks the model to detect it
        lang = None if WHISPER_LANGUAGE.lower() == "auto" else WHISPER_LANGUAGE

        # 3) Transcribe
        segments, info = model.transcribe(str(wav), vad_filter=True, language=lang)

        title = media_path.stem
        # NOTE(review): with_suffix() replaces everything after the last "." of
        # the stem, so a title like "Ep. 5" produces "Ep.json". Kept as-is
        # because callers derive the same paths from the returned base.
        base = TRN / title

        # Duration of the extracted WAV drives the progress percentage.
        dur = media_duration_seconds(wav) or 0.0
        last_pct = -1

        segs, text_parts = [], []
        for s in segments:
            segs.append({"start": s.start, "end": s.end, "text": s.text})
            text_parts.append(s.text)
            # progress logging every +5%
            if dur > 0 and s.end is not None:
                pct = int(min(100, max(0, (s.end / dur) * 100)))
                if pct >= last_pct + 5:
                    log({
                        "status": "transcribing",
                        "path": str(media_path),
                        "title": title,
                        "progress": pct,
                    })
                    last_pct = pct
        # ensure we mark 100% on completion
        if last_pct < 100:
            log({"status": "transcribing", "path": str(media_path), "title": title, "progress": 100})

        txt = " ".join(text_parts).strip()

        # Write transcript artifacts; context managers guarantee the files are
        # flushed and closed before anything else reads them.
        with open(base.with_suffix(".json"), "wb") as fh:
            fh.write(orjson.dumps({
                "file": str(media_path),
                "language": info.language,
                "segments": segs,
            }))
        with open(base.with_suffix(".txt"), "w", encoding="utf-8") as fh:
            fh.write(txt)

        srt_src = base.with_suffix(".srt")
        with open(srt_src, "w", encoding="utf-8") as srt:
            for i, seg in enumerate(segs, 1):
                srt.write(f"{i}\n{_format_timestamp(seg['start'])} --> {_format_timestamp(seg['end'])}\n{seg['text'].strip()}\n\n")

        with open(base.with_suffix(".vtt"), "w", encoding="utf-8") as vtt:
            vtt.write("WEBVTT\n\n")
            for seg in segs:
                vtt.write(f"{_format_timestamp(seg['start'], '.')} --> {_format_timestamp(seg['end'], '.')} \n{seg['text'].strip()}\n\n")

        # 4) Copy SRT next to media for Plex (language-suffixed); failure here
        #    must not fail the whole transcription.
        srt_dst = None  # pre-bound so the error message below can always print it
        try:
            lang_code = (info.language or (WHISPER_LANGUAGE if WHISPER_LANGUAGE.lower() != 'auto' else 'en')).lower()
            srt_dst = media_path.with_suffix(f".{lang_code}.srt")
            shutil.copy2(srt_src, srt_dst)
        except Exception as e:
            print(f"[post] could not copy srt -> {srt_dst}: {e}", flush=True)

        return base
    finally:
        # Remove the temporary WAV even when transcription or writing failed.
        try:
            wav.unlink()
        except OSError:
            pass


def _format_timestamp(t, sep=","):
    """Format ``t`` seconds as ``HH:MM:SS<sep>mmm`` (SRT uses ",", WebVTT ".").

    Works in whole milliseconds so rounding carries into the next unit instead
    of ever printing a seconds field of 60.
    """
    total_ms = int(round(max(t, 0.0) * 1000))
    hours, rest = divmod(total_ms, 3_600_000)
    minutes, rest = divmod(rest, 60_000)
    seconds, millis = divmod(rest, 1000)
    return f"{hours:02}:{minutes:02}:{seconds:02}{sep}{millis:03}"
 | |
| 
 | |
def index_meili(json_path: Path):
    """Send one transcript JSON file to the Meilisearch ``library`` index.

    The file is expected to contain ``file`` plus optional ``segments`` and
    ``language`` keys (the shape written by ``transcribe``). The POST is tried
    up to five times, sleeping 2s, 4s, 6s, 8s between attempts.

    Raises:
        requests.RequestException: if the final attempt still fails.
    """
    import time

    with open(json_path, "r", encoding="utf-8") as fh:
        doc = json.load(fh)

    title = Path(doc["file"]).stem
    segments = doc.get("segments", [])

    # Look-arounds instead of \b: "_" is a word character, so \b never matches
    # around the date in underscore-separated names such as "20240101_-_Title".
    stamp = re.search(r"(?<!\d)(\d{8})(?!\d)", title)

    payload = {
        # Meilisearch document ids may only contain A-Z, a-z, 0-9, "-" and "_"
        # (max 511 bytes); already-valid titles pass through unchanged.
        "id": re.sub(r"[^A-Za-z0-9_-]", "_", title)[:511],
        "type": "podcast",
        "title": title,
        "date": stamp.group(1) if stamp else "",
        "source": str(Path(LIB, Path(doc["file"]).name)),
        "text": " ".join(s["text"] for s in segments),
        "segments": segments,
        "meta": {"language": doc.get("language", "")},
    }
    body = orjson.dumps(payload)  # serialise once, reuse for every attempt

    attempts = 5
    for attempt in range(attempts):
        try:
            r = requests.post(
                f"{MEILI_URL}/indexes/library/documents",
                headers={"Authorization": f"Bearer {MEILI_KEY}", "Content-Type": "application/json"},
                data=body,
                timeout=15,
            )
            r.raise_for_status()
            return
        except requests.RequestException:
            if attempt == attempts - 1:
                raise
            # linear back-off before the next attempt
            time.sleep(2 * (attempt + 1))
 | |
| 
 | |
| import tldextract, trafilatura, requests as _requests
 | |
| 
 | |
def slugify(text):
    """Turn ``text`` into a file-name slug of at most 120 characters.

    Characters other than ASCII letters, digits, ``-``, ``.``, ``_`` and space
    are dropped, surrounding whitespace is trimmed and remaining spaces become
    underscores. An empty result is replaced by ``'page'``.
    """
    kept = re.sub(r'[^A-Za-z0-9\-._ ]+', '', text)
    slug = kept.strip().replace(' ', '_')[:120]
    if not slug:
        return 'page'
    return slug
 | |
| 
 | |
def save_web_snapshot(url: str):
    """Fetch ``url`` and store its HTML and extracted text under ``LIB/web/<domain>/``.

    Returns:
        Tuple ``(base, title, domain, date, text)`` where ``base`` is the output
        path without suffix (``.html`` and ``.txt`` are written next to it),
        ``date`` is the metadata date or ``""``, and ``text`` is the extracted
        text or ``""``.

    Raises:
        requests.RequestException: on network errors or a non-2xx response.
    """
    r = _requests.get(url, timeout=30, headers={"User-Agent": "Mozilla/5.0"})
    r.raise_for_status()
    html = r.text

    # NOTE(review): load_html is documented as taking only the HTML object, so
    # the page URL is handed to extract() instead - confirm against the
    # installed trafilatura version.
    downloaded = trafilatura.load_html(html)
    text = trafilatura.extract(
        downloaded,
        url=url,
        include_comments=False,
        include_images=False,
        with_metadata=True,
    ) or ""
    meta = trafilatura.metadata.extract_metadata(downloaded) or None

    # Title preference: extracted metadata, then the <title> tag, then the URL.
    title = getattr(meta, 'title', None) if meta else None
    if not title:
        tag = re.search(r'<title[^>]*>(.*?)</title>', html, re.I | re.S)
        # an empty <title></title> also falls back to the URL
        title = (tag.group(1).strip() if tag else "") or url
    date = meta.date if meta and getattr(meta, 'date', None) else ""

    parts = tldextract.extract(url)
    domain = ".".join(p for p in (parts.domain, parts.suffix) if p)

    outdir = LIB / "web" / domain
    outdir.mkdir(parents=True, exist_ok=True)
    base = outdir / slugify(title)

    # Context managers make sure both files are closed before callers read them.
    with open(base.with_suffix(".html"), "w", encoding="utf-8", errors="ignore") as fh:
        fh.write(html)
    with open(base.with_suffix(".txt"), "w", encoding="utf-8", errors="ignore") as fh:
        fh.write(text)
    return base, title, domain, date, text
 | |
| 
 | |
def index_web(base: Path, title: str, domain: str, date: str, text: str, url: str):
    """Index a saved web snapshot in the Meilisearch ``library`` index.

    Args:
        base: Snapshot path without suffix; its stem forms part of the id and
            ``base.with_suffix('.html')`` is stored as a ``file://`` source.
        title: Page title.
        domain: Registered domain of the page.
        date: Free-form date string; its first eight digits are stored.
        text: Extracted page text.
        url: Original page URL, stored in ``meta``.

    Raises:
        requests.RequestException: on network errors, timeout, or a non-2xx response.
    """
    # Meilisearch document ids may only contain A-Z, a-z, 0-9, "-" and "_"
    # (max 511 bytes), so the ":" separators and dots of the domain are mapped
    # to "_".
    doc_id = re.sub(r"[^A-Za-z0-9_-]", "_", f"web:{domain}:{base.stem}")[:511]
    payload = {
        "id": doc_id,
        "type": "web",
        "title": title,
        "date": re.sub(r'[^0-9]', '', date)[:8] if date else "",
        "source": f"file://{str(base.with_suffix('.html'))}",
        "text": text,
        "segments": [],
        "meta": {"url": url, "domain": domain},
    }
    r = requests.post(
        f"{MEILI_URL}/indexes/library/documents",
        headers={"Authorization": f"Bearer {MEILI_KEY}", "Content-Type": "application/json"},
        data=orjson.dumps(payload),
        # same limit as index_meili; without it a stalled server blocks the worker forever
        timeout=15,
    )
    r.raise_for_status()
 | |
| 
 | |
def is_media_url(url: str):
    """Return True when the lower-cased ``url`` contains a known media host name.

    This is a plain substring test against the whole URL, not a hostname parse.
    """
    haystack = url.lower()
    known_hosts = (
        "youtube.com", "youtu.be", "rumble.com", "vimeo.com",
        "soundcloud.com", "spotify.com", "podbean.com", "buzzsprout.com",
    )
    for host in known_hosts:
        if host in haystack:
            return True
    return False
 | |
| 
 | |
def owui_headers():
    """Build the Authorization header dict for Open WebUI; empty when no API key is set."""
    if not OWUI_KEY:
        return {}
    return {"Authorization": f"Bearer {OWUI_KEY}"}
 | |
| 
 | |
def owui_get_or_create_kb():
    """Return the id of the Open WebUI knowledge base named OWUI_KB, creating it if needed.

    Returns None when OWUI_URL or OWUI_KEY is not configured. Any error while
    listing existing knowledge bases is ignored and creation is attempted
    instead; errors from the create request propagate to the caller.
    """
    if not (OWUI_URL and OWUI_KEY):
        return None

    # Lookup phase: best-effort, every failure falls through to creation.
    try:
        listing = requests.get(
            f"{OWUI_URL}/api/v1/knowledge/list",
            headers=owui_headers(),
            timeout=15,
        )
        listing.raise_for_status()
        for entry in listing.json().get("data", []):
            if entry.get("name") == OWUI_KB:
                return entry["id"]
    except Exception:
        pass

    # Creation phase.
    created = requests.post(
        f"{OWUI_URL}/api/v1/knowledge/create",
        headers={**owui_headers(), "Content-Type": "application/json"},
        data=orjson.dumps({"name": OWUI_KB, "description": "All local content indexed by podx"}),
        timeout=15,
    )
    created.raise_for_status()
    new_kb = created.json()["data"]
    return new_kb["id"]
 | |
| 
 | |
def owui_upload_and_attach(path: Path, kb_id: str):
    """Upload ``path`` to Open WebUI and attach the resulting file to knowledge base ``kb_id``.

    Returns True on success; HTTP errors from either request are raised.
    """
    upload_url = f"{OWUI_URL}/api/v1/files/"
    with open(path, "rb") as stream:
        uploaded = requests.post(
            upload_url,
            headers=owui_headers(),
            files={"file": (path.name, stream)},
            timeout=60*10,  # uploads get ten minutes
        )
    uploaded.raise_for_status()
    file_id = uploaded.json()["data"]["id"]

    attached = requests.post(
        f"{OWUI_URL}/api/v1/knowledge/{kb_id}/file/add",
        headers={**owui_headers(), "Content-Type": "application/json"},
        data=orjson.dumps({"file_id": file_id}),
        timeout=60,
    )
    attached.raise_for_status()
    return True
 | |
| 
 | |
def publish_to_openwebui(paths):
    """Upload every existing file in ``paths`` to the configured Open WebUI knowledge base.

    Does nothing when OWUI_URL or OWUI_KEY is unset. Failures are never raised:
    a per-file failure is logged with the file path, any other failure is
    logged without one, both with status ``owui_error``.
    """
    if not (OWUI_URL and OWUI_KEY):
        return
    try:
        kb_id = owui_get_or_create_kb()
        for entry in paths:
            candidate = Path(entry)
            if candidate.exists():
                try:
                    owui_upload_and_attach(candidate, kb_id)
                except Exception as e:
                    log({"url": str(candidate), "status": "owui_error", "error": str(e)})
    except Exception as e:
        log({"status": "owui_error", "error": str(e)})
 | |
| 
 | |
def handle_local_file(path_str: str):
    """Transcribe & index a local media file that already exists in /library.

    Safe to call repeatedly; it skips if transcript JSON already exists.
    A missing file is logged as an error and the call returns normally. Any
    other failure is logged and re-raised.
    """
    try:
        p = Path(path_str)
        if not p.exists():
            log({"url": path_str, "status": "error", "error": "file_not_found"})
            return
        title = p.stem
        # Build the path exactly the way transcribe() writes it
        # (base.with_suffix(".json")). For stems containing a "." this differs
        # from f"{title}.json", which made the skip check miss existing output.
        base_json = (TRN / title).with_suffix(".json")
        if base_json.exists():
            log({"url": path_str, "status": "skip", "reason": "already_transcribed"})
            return
        info = {
            "url": path_str,
            "status": "transcribing",
            "title": title,
            "uploader": p.parent.name,
            "date": "",
            "path": str(p),
            "progress": 0,
        }
        log(info)
        base = transcribe(p)
        index_meili(base.with_suffix(".json"))
        publish_to_openwebui([base.with_suffix(".txt")])
        log({**info, "status": "done"})
    except Exception as e:
        log({"url": path_str, "status": "error", "error": str(e)})
        raise
 | |
| 
 | |
def handle_web(url: str):
    """Snapshot a web page, index it, and publish the saved files to Open WebUI.

    Feed entries are logged at each stage: ``web-downloading``,
    ``web-indexing`` and ``done``. Exceptions from the snapshot or indexing
    steps propagate to the caller.
    """
    info = {"url": url, "status":"web-downloading", "title":"", "uploader":"", "date":"", "path":""}
    log(info)

    base, title, domain, date, text = save_web_snapshot(url)
    html_path = base.with_suffix('.html')
    txt_path = base.with_suffix('.txt')

    info.update({"title": title, "uploader": domain, "date": date, "path": str(html_path)})
    log({**info, "status": "web-indexing"})
    index_web(base, title, domain, date, text, url)

    # Publish the text first, then the HTML, skipping whichever is missing.
    push = []
    for artifact in (txt_path, html_path):
        if artifact.exists():
            push.append(artifact)
    publish_to_openwebui(push)
    log({**info, "status": "done"})
 | |
| 
 | |
def handle_url(url: str):
    """Process one job: a local file, a media URL, or a plain web page.

    Routing:
      * ``file://...`` or an existing absolute path goes to handle_local_file.
      * A URL that is_media_url() rejects goes to handle_web.
      * Otherwise the media is downloaded with yt_dlp into TMP, moved to
        ``LIB/<uploader>/``, transcribed, indexed and published.

    Any exception is logged to the feed with status ``error`` and re-raised.
    """
    try:
        # If a local file path (or file:// URL) is provided, process it directly
        if url.startswith("file://"):
            return handle_local_file(url[len("file://"):])
        if url.startswith("/") and Path(url).exists():
            return handle_local_file(url)

        if not is_media_url(url):
            handle_web(url)
            return

        info = {"url": url, "status": "queued", "title": "", "uploader": "", "date": "", "path": ""}
        log({**info, "status": "downloading"})

        for f in yt_dlp(url, TMP):
            # First path component below TMP is the uploader directory, if any.
            parts = f.relative_to(TMP).parts
            uploader = sanitize(parts[0]) if len(parts) > 1 else "Unknown"
            dest_dir = LIB / uploader
            dest_dir.mkdir(parents=True, exist_ok=True)
            dest = dest_dir / sanitize(f.name)
            shutil.move(str(f), dest)

            # Look-arounds instead of \b: the download uses --restrict-filenames,
            # so the date is followed by "_" (a word character) and \b never
            # matched, leaving the date empty.
            stamp = re.search(r"(?<!\d)(\d{8})(?!\d)", dest.stem)
            info.update({
                "title": dest.stem,
                "uploader": uploader,
                "date": stamp.group(1) if stamp else "",
                "path": str(dest),
            })
            log({**info, "status": "transcribing", "progress": 0})
            base = transcribe(dest)
            index_meili(base.with_suffix(".json"))
            publish_to_openwebui([base.with_suffix(".txt")])
            log({**info, "status": "done"})
    except Exception as e:
        log({"url": url, "status": "error", "error": str(e)})
        raise
 |