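# podx ingest worker: pulls media with yt-dlp, transcribes it locally with
# faster-whisper, snapshots plain web articles with trafilatura, indexes
# everything into a Meilisearch "library" index, and (optionally) pushes the
# extracted text into an Open WebUI knowledge base.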
import os, subprocess, shutil, json, re, orjson, requests
from pathlib import Path

from faster_whisper import WhisperModel

# Meilisearch, storage and Whisper settings
MEILI_URL = os.getenv("MEILI_URL", "http://meili:7700")
MEILI_KEY = os.getenv("MEILI_KEY", "")
LIB = Path(os.getenv("LIBRARY_ROOT", "/library"))
TRN = Path(os.getenv("TRANSCRIPT_ROOT", "/transcripts"))
TMP = Path(os.getenv("TMP_ROOT", "/tmpdl"))
MODEL_NAME = os.getenv("WHISPER_MODEL", "large-v3")
COMPUTE = os.getenv("WHISPER_PRECISION", "int8")

# Optional Open WebUI integration (skipped entirely if URL or key is unset)
OWUI_URL = os.getenv("OPENWEBUI_URL", "").rstrip("/")
OWUI_KEY = os.getenv("OPENWEBUI_API_KEY", "")
OWUI_KB = os.getenv("OPENWEBUI_KB_NAME", "Homelab Library")

TRN.mkdir(parents=True, exist_ok=True)
LIB.mkdir(parents=True, exist_ok=True)
TMP.mkdir(parents=True, exist_ok=True)

# Loaded once at startup; int8 keeps memory use modest on CPU-only hosts
model = WhisperModel(MODEL_NAME, compute_type=COMPUTE)
def log(feed):
    """Append a status event to the JSONL feed log; never let logging break a job."""
    try:
        with open(TRN / "_feed.log", "a", encoding="utf-8") as f:
            f.write(orjson.dumps(feed).decode() + "\n")
    except Exception:
        pass


def sanitize(name):
    """Strip characters that are unsafe in file names."""
    return re.sub(r'[\\/:"*?<>|]+', ' ', name).strip()
def yt_dlp(url, outdir):
    # Download best audio+video, then extract the audio track to m4a; the thumbnail
    # is kept alongside it. --restrict-filenames keeps names ASCII and space-free.
    outtmpl = str(outdir / "%(uploader)s/%(upload_date)s - %(title)s.%(ext)s")
    cmd = [
        "yt-dlp", "-o", outtmpl,
        "-f", "bv*+ba/best",
        "-x", "--audio-format", "m4a",
        "--write-thumbnail",
        "--no-playlist", "--no-warnings", "--restrict-filenames",
        url,
    ]
    subprocess.check_call(cmd)
    media = (list(outdir.rglob("*.[mM][pP]4")) + list(outdir.rglob("*.mkv"))
             + list(outdir.rglob("*.m4a")) + list(outdir.rglob("*.mp3")))
    # Return only the most recently written file, i.e. the download we just made
    return sorted(media, key=lambda p: p.stat().st_mtime)[-1:]
def transcribe(media_path: Path):
    # language=None lets faster-whisper auto-detect; passing "auto" is rejected
    segments, info = model.transcribe(str(media_path), vad_filter=True, language=None)
    base = TRN / media_path.stem
    segs, text_parts = [], []
    for s in segments:
        segs.append({"start": s.start, "end": s.end, "text": s.text})
        text_parts.append(s.text)
    txt = " ".join(text_parts).strip()

    base.with_suffix(".json").write_bytes(orjson.dumps(
        {"file": str(media_path), "language": info.language, "segments": segs}))
    base.with_suffix(".txt").write_text(txt, encoding="utf-8")

    def fmt_ts(t):
        h = int(t // 3600); m = int((t % 3600) // 60); s = t - (h * 3600 + m * 60)
        return f"{h:02}:{m:02}:{s:06.3f}".replace('.', ',')

    with open(base.with_suffix(".srt"), "w", encoding="utf-8") as srt:
        for i, s in enumerate(segs, 1):
            srt.write(f"{i}\n{fmt_ts(s['start'])} --> {fmt_ts(s['end'])}\n{s['text'].strip()}\n\n")
    with open(base.with_suffix(".vtt"), "w", encoding="utf-8") as vtt:
        vtt.write("WEBVTT\n\n")
        for s in segs:
            vtt.write(f"{fmt_ts(s['start']).replace(',', '.')} --> {fmt_ts(s['end']).replace(',', '.')}\n{s['text'].strip()}\n\n")
    return base
def index_meili(json_path: Path):
    doc = json.loads(json_path.read_text(encoding="utf-8"))
    title = Path(doc["file"]).stem
    date = re.findall(r"\b(\d{8})\b", title)
    payload = {
        # Meilisearch ids may only contain letters, digits, hyphens and underscores
        "id": re.sub(r"[^A-Za-z0-9_-]+", "_", title),
        "type": "podcast",
        "title": title,
        "date": date[0] if date else "",
        "source": doc["file"],  # already the full path of the media file under LIB
        "text": " ".join(s["text"] for s in doc.get("segments", [])),
        "segments": doc.get("segments", []),
        "meta": {"language": doc.get("language", "")},
    }
    r = requests.post(f"{MEILI_URL}/indexes/library/documents",
                      headers={"Authorization": f"Bearer {MEILI_KEY}",
                               "Content-Type": "application/json"},
                      data=orjson.dumps(payload))
    r.raise_for_status()
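# One-time index setup (a hedged sketch, not called by the worker itself):
# Meilisearch auto-creates the "library" index on the first document add, but
# filtering by type or date has to be enabled explicitly. The attribute choices
# below are assumptions based on the payloads built above, not part of the
# original pipeline.
def setup_meili_index():
    r = requests.patch(f"{MEILI_URL}/indexes/library/settings",
                       headers={"Authorization": f"Bearer {MEILI_KEY}",
                                "Content-Type": "application/json"},
                       data=orjson.dumps({
                           "filterableAttributes": ["type", "date"],
                           "sortableAttributes": ["date"],
                       }))
    r.raise_for_status()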
import tldextract, trafilatura  # requests is already imported at the top


def slugify(text):
    # File-system friendly slug for web snapshots
    text = re.sub(r'[^A-Za-z0-9\-._ ]+', '', text).strip().replace(' ', '_')
    return text[:120] or 'page'
def save_web_snapshot(url: str):
    r = requests.get(url, timeout=30, headers={"User-Agent": "Mozilla/5.0"})
    r.raise_for_status()
    html = r.text
    # trafilatura accepts the raw HTML string directly
    text = trafilatura.extract(html, url=url, include_comments=False, include_images=False) or ""
    meta = trafilatura.metadata.extract_metadata(html)
    title_match = re.search(r'<title[^>]*>(.*?)</title>', html, re.I | re.S)
    title = (getattr(meta, "title", None) if meta else None) \
            or (title_match.group(1).strip() if title_match else url)
    date = (getattr(meta, "date", None) if meta else None) or ""
    parts = tldextract.extract(url)
    domain = ".".join([p for p in [parts.domain, parts.suffix] if p])
    slug = slugify(title)
    outdir = LIB / "web" / domain
    outdir.mkdir(parents=True, exist_ok=True)
    base = outdir / slug
    base.with_suffix(".html").write_text(html, encoding="utf-8", errors="ignore")
    base.with_suffix(".txt").write_text(text, encoding="utf-8", errors="ignore")
    return base, title, domain, date, text
def index_web(base: Path, title: str, domain: str, date: str, text: str, url: str):
    payload = {
        # Meilisearch ids may only contain letters, digits, hyphens and underscores
        "id": re.sub(r"[^A-Za-z0-9_-]+", "_", f"web:{domain}:{base.stem}"),
        "type": "web",
        "title": title,
        "date": re.sub(r'[^0-9]', '', date)[:8] if date else "",
        "source": f"file://{base.with_suffix('.html')}",
        "text": text,
        "segments": [],
        "meta": {"url": url, "domain": domain},
    }
    r = requests.post(f"{MEILI_URL}/indexes/library/documents",
                      headers={"Authorization": f"Bearer {MEILI_KEY}",
                               "Content-Type": "application/json"},
                      data=orjson.dumps(payload))
    r.raise_for_status()
def is_media_url(url: str):
    lowered = url.lower()
    media_hosts = ["youtube.com", "youtu.be", "rumble.com", "vimeo.com", "soundcloud.com",
                   "spotify.com", "podbean.com", "buzzsprout.com"]
    return any(h in lowered for h in media_hosts)


def owui_headers():
    return {"Authorization": f"Bearer {OWUI_KEY}"} if OWUI_KEY else {}
def owui_get_or_create_kb():
    """Return the id of the Open WebUI knowledge base, creating it if it does not exist."""
    if not OWUI_URL or not OWUI_KEY:
        return None
    try:
        r = requests.get(f"{OWUI_URL}/api/v1/knowledge/list", headers=owui_headers(), timeout=15)
        r.raise_for_status()
        for kb in r.json().get("data", []):
            if kb.get("name") == OWUI_KB:
                return kb["id"]
    except Exception:
        pass
    r = requests.post(
        f"{OWUI_URL}/api/v1/knowledge/create",
        headers={**owui_headers(), "Content-Type": "application/json"},
        data=orjson.dumps({"name": OWUI_KB, "description": "All local content indexed by podx"}),
        timeout=15,
    )
    r.raise_for_status()
    return r.json()["data"]["id"]
def owui_upload_and_attach(path: Path, kb_id: str):
    with open(path, "rb") as f:
        r = requests.post(f"{OWUI_URL}/api/v1/files/", headers=owui_headers(),
                          files={"file": (path.name, f)}, timeout=60 * 10)
    r.raise_for_status()
    file_id = r.json()["data"]["id"]
    r = requests.post(
        f"{OWUI_URL}/api/v1/knowledge/{kb_id}/file/add",
        headers={**owui_headers(), "Content-Type": "application/json"},
        data=orjson.dumps({"file_id": file_id}),
        timeout=60,
    )
    r.raise_for_status()
    return True
def publish_to_openwebui(paths):
    if not OWUI_URL or not OWUI_KEY:
        return
    try:
        kb_id = owui_get_or_create_kb()
        for p in paths:
            p = Path(p)
            if not p.exists():
                continue
            try:
                owui_upload_and_attach(p, kb_id)
            except Exception as e:
                log({"url": str(p), "status": "owui_error", "error": str(e)})
    except Exception as e:
        log({"status": "owui_error", "error": str(e)})
def handle_web(url: str):
    info = {"url": url, "status": "web-downloading", "title": "", "uploader": "", "date": "", "path": ""}
    log(info)
    base, title, domain, date, text = save_web_snapshot(url)
    info.update({"title": title, "uploader": domain, "date": date, "path": str(base.with_suffix('.html'))})
    log({**info, "status": "web-indexing"})
    index_web(base, title, domain, date, text, url)
    push = [p for p in [base.with_suffix('.txt'), base.with_suffix('.html')] if p.exists()]
    publish_to_openwebui(push)
    log({**info, "status": "done"})
def handle_url(url: str):
    try:
        if not is_media_url(url):
            handle_web(url)
            return
        info = {"url": url, "status": "queued", "title": "", "uploader": "", "date": "", "path": ""}
        log({**info, "status": "downloading"})
        files = yt_dlp(url, TMP)
        for f in files:
            # First path component under TMP is the uploader directory from the output template
            parts = f.relative_to(TMP).parts
            uploader = sanitize(parts[0]) if len(parts) > 1 else "Unknown"
            dest_dir = LIB / uploader
            dest_dir.mkdir(parents=True, exist_ok=True)
            dest = dest_dir / sanitize(f.name)
            shutil.move(str(f), dest)
            dates = re.findall(r"\b(\d{8})\b", dest.stem)
            info.update({"title": dest.stem, "uploader": uploader,
                         "date": dates[0] if dates else "",
                         "path": str(dest)})
            log({**info, "status": "transcribing"})
            base = transcribe(dest)
            index_meili(base.with_suffix(".json"))
            publish_to_openwebui([base.with_suffix(".txt")])
            log({**info, "status": "done"})
    except Exception as e:
        log({"url": url, "status": "error", "error": str(e)})
        raise
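# Minimal entry-point sketch (an assumption; the original listing does not show
# how the worker is fed). Here URLs are taken from the command line, e.g.
#   python worker.py https://youtu.be/... https://example.com/article
# The real service may instead watch a queue or an inbox file.
if __name__ == "__main__":
    import sys
    for url in sys.argv[1:]:
        handle_url(url)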