Introducing OpenAI offloaded transcription

This commit is contained in:
2025-09-24 09:36:00 +02:00
parent 47d13cde83
commit 73e89b9a67
4 changed files with 125 additions and 12 deletions

View File

@@ -1,4 +1,5 @@
import os, subprocess, shutil, json, re, orjson, requests, unicodedata
from types import SimpleNamespace
from rq import Queue
from redis import Redis
from pathlib import Path
@@ -105,6 +106,13 @@ OFFLOAD_TRANSCRIBE = os.getenv("OFFLOAD_TRANSCRIBE", "1").lower() not in ("0", "
WORKER_MODE = os.getenv("WORKER_MODE", "all").strip().lower() # 'all' or 'transcribe'
JOB_QUEUES = [q.strip() for q in os.getenv("JOB_QUEUES", "default").split(",") if q.strip()]
# Remote transcription (OpenAI) configuration
TRANSCRIBE_BACKEND = os.getenv("TRANSCRIBE_BACKEND", "local").strip().lower()
OPENAI_API_KEY = (os.getenv("OPENAI_API_KEY", "") or "").strip()
OPENAI_BASE_URL = (os.getenv("OPENAI_BASE_URL", "https://api.openai.com/v1") or "https://api.openai.com/v1").rstrip("/")
OPENAI_TRANSCRIBE_MODEL = os.getenv("OPENAI_TRANSCRIBE_MODEL", "whisper-1").strip()
OPENAI_TRANSCRIBE_TIMEOUT = int(os.getenv("OPENAI_TRANSCRIBE_TIMEOUT", "600"))
def _mode_allows(task: str) -> bool:
"""Gate tasks by worker role. In 'transcribe' mode only allow transcription of local files
(including indexing and OWUI publish). "task" is one of: 'download','web','local','transcribe'."""
@@ -171,6 +179,73 @@ def run_transcribe_with_fallback(wav_path: Path, lang):
raise
raise
def run_transcribe_openai(wav_path: Path, lang_hint: str | None):
"""Transcribe audio via OpenAI's Whisper API, returning (segments, info, raw_payload)."""
if not OPENAI_API_KEY:
raise RuntimeError("OPENAI_API_KEY must be set when TRANSCRIBE_BACKEND is 'openai'")
url = f"{OPENAI_BASE_URL}/audio/transcriptions"
headers = {"Authorization": f"Bearer {OPENAI_API_KEY}"}
data: dict[str, str] = {
"model": OPENAI_TRANSCRIBE_MODEL or "whisper-1",
"response_format": "verbose_json",
}
if lang_hint:
data["language"] = lang_hint
start = time.time()
with open(wav_path, "rb") as fh:
files = {"file": (wav_path.name, fh, "audio/wav")}
resp = requests.post(
url,
headers=headers,
data=data,
files=files,
timeout=OPENAI_TRANSCRIBE_TIMEOUT,
)
elapsed = time.time() - start
try:
resp.raise_for_status()
except requests.HTTPError as exc:
print(f"[openai] transcription failed ({exc}); response={resp.text[:400]}", flush=True)
raise
payload = resp.json()
segments_raw = payload.get("segments") or []
seg_objs: list[SimpleNamespace] = []
for seg in segments_raw:
seg_objs.append(
SimpleNamespace(
start=float(seg.get("start") or 0.0),
end=float(seg.get("end") or 0.0),
text=str(seg.get("text") or ""),
)
)
if not seg_objs and payload.get("text"):
duration = float(payload.get("duration") or 0.0)
seg_objs.append(
SimpleNamespace(
start=0.0,
end=duration,
text=str(payload.get("text") or ""),
)
)
language = payload.get("language") or lang_hint or ""
info = SimpleNamespace(language=language)
print(
f"[openai] transcribed {wav_path.name} via {OPENAI_TRANSCRIBE_MODEL or 'whisper-1'} "
f"in {elapsed:.1f}s; segments={len(seg_objs)} lang={language or 'unknown'}",
flush=True,
)
return seg_objs, info, payload
def log(feed):
try:
with open(TRN / "_feed.log", "a", encoding="utf-8") as f:
@@ -908,7 +983,8 @@ def _save_partial(title: str, language: str, segs: list[dict]):
print(f"[whisper] partial txt save failed: {e}", flush=True)
def transcribe(media_path: Path):
print(f"[whisper] start transcribe: {media_path}", flush=True)
backend = TRANSCRIBE_BACKEND
print(f"[transcribe] start backend={backend}: {media_path}", flush=True)
# If paused, abort before any heavy work (no ffmpeg, no model load)
if transcribe_paused():
print(f"[pause] transcribe: pause active before heavy work; aborting {media_path}", flush=True)
@@ -927,12 +1003,13 @@ def transcribe(media_path: Path):
title = media_path.stem
base = TRN / title
resume_enabled = (backend != "openai") and WHISPER_RESUME
# Resume support: if a partial checkpoint exists, load it and trim input
resume_segments = []
resume_offset = 0.0
language_hint = None
if WHISPER_RESUME:
if resume_enabled:
pjson, ptxt = _partial_paths(title)
if pjson.exists():
try:
@@ -946,7 +1023,10 @@ def transcribe(media_path: Path):
print(f"[whisper] failed to load partial: {e}", flush=True)
# If resuming, trim WAV from last end time
wav_for_run = trim_wav(wav, resume_offset, TMP)
if resume_enabled and resume_offset > 0.0:
wav_for_run = trim_wav(wav, resume_offset, TMP)
else:
wav_for_run = wav
# 2) Language selection
lang = None if WHISPER_LANGUAGE.lower() == "auto" else WHISPER_LANGUAGE
@@ -954,14 +1034,18 @@ def transcribe(media_path: Path):
# carry hint forward if available
lang = language_hint
# 3) Transcribe
segments, info = run_transcribe_with_fallback(wav_for_run, lang)
# 3) Transcribe (local Whisper or OpenAI backend)
payload = None
if backend == "openai":
segments, info, payload = run_transcribe_openai(wav_for_run, lang)
else:
segments, info = run_transcribe_with_fallback(wav_for_run, lang)
# Determine duration for progress; use full WAV duration for consistent % regardless of resume
dur = media_duration_seconds(wav) or 0.0
# Start wall clock timer for speed/ETA
start_wall = time.time()
if WHISPER_RESUME and resume_offset and dur and resume_offset >= dur:
if resume_enabled and resume_offset and dur and resume_offset >= dur:
print(f"[whisper] resume offset {resume_offset:.2f}s >= duration {dur:.2f}s; resetting resume.", flush=True)
resume_offset = 0.0
last_pct = -1
@@ -981,7 +1065,7 @@ def transcribe(media_path: Path):
text_parts.append(s.text)
# --- Cooperative pause: save checkpoint and abort as soon as pause is requested ---
if transcribe_paused():
if resume_enabled and transcribe_paused():
try:
pct = int(min(100, max(0, (end / dur) * 100))) if dur > 0 else 0
except Exception:
@@ -1037,7 +1121,7 @@ def transcribe(media_path: Path):
# periodic partial save
seg_count_since_save += 1
if WHISPER_RESUME and seg_count_since_save >= PARTIAL_SAVE_EVERY_SEGS:
if resume_enabled and seg_count_since_save >= PARTIAL_SAVE_EVERY_SEGS:
_save_partial(title, info.language or (WHISPER_LANGUAGE if WHISPER_LANGUAGE.lower() != "auto" else "en"), segs)
seg_count_since_save = 0
@@ -1109,7 +1193,7 @@ def transcribe(media_path: Path):
pass
# Remove partial checkpoints on success
if WHISPER_RESUME:
if resume_enabled:
try:
pjson, ptxt = _partial_paths(title)
if pjson.exists(): pjson.unlink()
@@ -1124,7 +1208,10 @@ def transcribe(media_path: Path):
print(f"[whisper] avg speed ~{avg_rtf:0.2f}x (audio_seconds / wall_seconds)", flush=True)
except Exception:
pass
print(f"[whisper] finished: {media_path} lang={info.language} segments={len(segs)} dur={dur:.2f}s", flush=True)
print(
f"[transcribe] backend={backend} finished: {media_path} lang={info.language} segments={len(segs)} dur={dur:.2f}s",
flush=True,
)
return base
@@ -1776,4 +1863,4 @@ def handle_url(url: str):
log({**info, **{"status":"done"}})
except Exception as e:
log({"url": url, "status":"error", "error": str(e)})
raise
raise