diff --git a/app/app.py b/app/app.py index 7c90a76..98c2d34 100644 --- a/app/app.py +++ b/app/app.py @@ -1,5 +1,6 @@ from flask import Flask, request, redirect -import os, json, requests +import os, json, time, requests +from pathlib import Path from redis import Redis from rq import Queue @@ -8,6 +9,7 @@ MEILI_KEY = os.getenv("MEILI_KEY", "") # from .env REDIS_URL = os.getenv("REDIS_URL", "redis://redis:6379/0") app = Flask(__name__) +FEED_LOG = Path(os.getenv("TRANSCRIPT_ROOT", "/transcripts")) / "_feed.log" q = Queue(connection=Redis.from_url(REDIS_URL)) PAGE = """ @@ -64,9 +66,74 @@ doSearch(); setTimeout(poll, 4000); })(); + +
+<h3>Activity</h3>
+<div id="feed">Loading…</div>
+
+ """ +def read_feed_tail(max_lines: int = 200): + if not FEED_LOG.exists(): + return [] + try: + with open(FEED_LOG, "rb") as f: + try: + f.seek(-65536, 2) # read last ~64KB + except OSError: + f.seek(0) + data = f.read().decode("utf-8", errors="ignore") + except Exception: + return [] + lines = [x.strip() for x in data.splitlines() if x.strip()] + events = [] + for ln in lines[-max_lines:]: + try: + events.append(json.loads(ln)) + except Exception: + pass + return events + +@app.get("/api/status") +def api_status(): + events = read_feed_tail(200) + last = events[-1] if events else {} + summary = { + "last_status": last.get("status"), + "last_title": last.get("title") or last.get("path") or last.get("url"), + "last_time": int(time.time()), + "count": len(events), + } + return {"ok": True, "summary": summary, "events": events} + def meili_search(qstr, limit=30): if not qstr.strip(): return [] diff --git a/app/scanner.py b/app/scanner.py index 33d0e8f..24ab518 100644 --- a/app/scanner.py +++ b/app/scanner.py @@ -14,6 +14,12 @@ TRN = Path(os.getenv("TRANSCRIPT_ROOT", "/transcripts")) REDIS_URL = os.getenv("REDIS_URL", "redis://redis:6379/0") SCAN_INTERVAL = int(os.getenv("SCAN_INTERVAL", "30")) # seconds +# RQ job configuration +JOB_TIMEOUT = int(os.getenv("JOB_TIMEOUT", "14400")) # 4 hours +JOB_TTL = int(os.getenv("JOB_TTL", "86400")) # 24 hours +RESULT_TTL = int(os.getenv("RESULT_TTL", "86400")) # 24 hours +FAILURE_TTL = int(os.getenv("FAILURE_TTL", "86400")) # 24 hours + # Media types to track MEDIA_EXT = { ".mp3", ".m4a", ".mp4", ".mkv", ".wav", ".flac", ".webm", ".ogg", ".opus" @@ -51,11 +57,20 @@ def enqueue_new_files(): continue if already_transcribed(p): _seen.add(key) + print(f"[scanner] Skip (already transcribed): {p}", flush=True) continue - # Enqueue the worker to process this local file - q.enqueue("worker.handle_local_file", key) + # Enqueue the worker to process this local file (with generous timeouts) + q.enqueue( + "worker.handle_local_file", + key, 
+ job_timeout=JOB_TIMEOUT, + ttl=JOB_TTL, + result_ttl=RESULT_TTL, + failure_ttl=FAILURE_TTL, + ) _seen.add(key) new_jobs += 1 + print(f"[scanner] Enqueued: {p}", flush=True) return new_jobs diff --git a/app/worker.py b/app/worker.py index 9db97d4..488d4df 100644 --- a/app/worker.py +++ b/app/worker.py @@ -1,5 +1,6 @@ import os, subprocess, shutil, json, re, orjson, requests from pathlib import Path +import math from faster_whisper import WhisperModel MEILI_URL = os.getenv("MEILI_URL", "http://meili:7700") @@ -77,31 +78,111 @@ def yt_dlp(url, outdir): list(outdir.rglob("*.mp3"))) return sorted(media, key=lambda p: p.stat().st_mtime)[-1:] +def extract_audio(src: Path, outdir: Path) -> Path: + """Extract mono 16kHz WAV for robust transcription (handles odd containers/codecs).""" + outdir.mkdir(parents=True, exist_ok=True) + wav_path = outdir / (src.stem + ".wav") + # Force audio-only, mono, 16kHz WAV + cmd = [ + "ffmpeg", "-nostdin", "-y", + "-i", str(src), + "-vn", "-ac", "1", "-ar", "16000", + "-f", "wav", str(wav_path), + ] + try: + subprocess.check_output(cmd, stderr=subprocess.STDOUT) + except subprocess.CalledProcessError as e: + raise RuntimeError(f"ffmpeg extract failed: {e.output.decode(errors='ignore')}") + return wav_path + +def media_duration_seconds(path: Path) -> float: + """Return duration in seconds using ffprobe; fallback to 0.0 on error.""" + try: + out = subprocess.check_output([ + "ffprobe", "-v", "error", "-show_entries", "format=duration", + "-of", "default=nokey=1:noprint_wrappers=1", str(path) + ], stderr=subprocess.STDOUT, text=True).strip() + return float(out) if out else 0.0 + except Exception: + return 0.0 + def transcribe(media_path: Path): model = get_model() + # 1) Robustly extract audio to 16k mono WAV (fixes pyAV/webm edge cases) + wav = extract_audio(media_path, TMP) + + # 2) Language lang = None if WHISPER_LANGUAGE.lower() == "auto" else WHISPER_LANGUAGE - segments, info = model.transcribe(str(media_path), vad_filter=True, 
language=lang) + + # 3) Transcribe + segments, info = model.transcribe(str(wav), vad_filter=True, language=lang) + title = media_path.stem base = TRN / title + + # Determine duration for progress; use extracted WAV (accurate for transcription input) + dur = media_duration_seconds(wav) or 0.0 + last_pct = -1 + segs, text_parts = [], [] for s in segments: - segs.append({"start": s.start, "end": s.end, "text": s.text}) + seg = {"start": s.start, "end": s.end, "text": s.text} + segs.append(seg) text_parts.append(s.text) + # progress logging every +5% + if dur > 0 and s.end is not None: + pct = int(min(100, max(0, (s.end / dur) * 100))) + if pct >= last_pct + 5: + log({ + "status": "transcribing", + "path": str(media_path), + "title": title, + "progress": pct + }) + last_pct = pct + # ensure we mark 100% on completion + if last_pct < 100: + log({"status": "transcribing", "path": str(media_path), "title": title, "progress": 100}) + txt = " ".join(text_parts).strip() - open(base.with_suffix(".json"), "wb").write(orjson.dumps({"file": str(media_path), "language": info.language, "segments": segs})) + # Write transcript artifacts + open(base.with_suffix(".json"), "wb").write(orjson.dumps({ + "file": str(media_path), + "language": info.language, + "segments": segs + })) open(base.with_suffix(".txt"), "w", encoding="utf-8").write(txt) - def fmt_ts(t): + def fmt_ts(t): h=int(t//3600); m=int((t%3600)//60); s=t-(h*3600+m*60) return f"{h:02}:{m:02}:{s:06.3f}".replace('.',',') + with open(base.with_suffix(".srt"), "w", encoding="utf-8") as srt: for i,s in enumerate(segs,1): srt.write(f"{i}\n{fmt_ts(s['start'])} --> {fmt_ts(s['end'])}\n{s['text'].strip()}\n\n") + with open(base.with_suffix(".vtt"), "w", encoding="utf-8") as vtt: vtt.write("WEBVTT\n\n") for s in segs: vtt.write(f"{fmt_ts(s['start']).replace(',', '.')} --> {fmt_ts(s['end']).replace(',', '.')} \n{s['text'].strip()}\n\n") + + # 4) Copy SRT next to media for Plex (language-suffixed) + try: + lang_code = (info.language or 
(WHISPER_LANGUAGE if WHISPER_LANGUAGE.lower() != 'auto' else 'en')).lower() + srt_src = base.with_suffix(".srt") + srt_dst = media_path.with_suffix(f".{lang_code}.srt") + shutil.copy2(srt_src, srt_dst) + except Exception as e: + # NOTE: use media_path here, not srt_dst — srt_dst is assigned inside the + # try block and may be unbound when the exception fires. + print(f"[post] could not copy srt next to {media_path}: {e}", flush=True) + + # Optional: cleanup temporary WAV + try: + if wav.exists(): + wav.unlink() + except Exception: + pass + return base def index_meili(json_path: Path): @@ -247,7 +328,7 @@ def handle_local_file(path_str: str): if base_json.exists(): log({"url": path_str, "status": "skip", "reason": "already_transcribed"}) return - info = {"url": path_str, "status": "transcribing", "title": title, "uploader": p.parent.name, "date": "", "path": str(p)} + info = {"url": path_str, "status": "transcribing", "title": title, "uploader": p.parent.name, "date": "", "path": str(p), "progress": 0} log(info) base = transcribe(p) index_meili(base.with_suffix(".json")) @@ -292,7 +373,7 @@ def handle_url(url: str): info.update({"title": dest.stem, "uploader": uploader, "date": (re.findall(r"\b(\d{8})\b", dest.stem)[0] if re.findall(r"\b(\d{8})\b", dest.stem) else ""), "path": str(dest)}) - log({**info, **{"status":"transcribing"}}) + log({**info, **{"status":"transcribing", "progress": 0}}) base = transcribe(dest) index_meili(base.with_suffix(".json")) publish_to_openwebui([base.with_suffix(".txt")]) diff --git a/docker-compose.yml b/docker-compose.yml index 7d9942d..e040922 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -30,7 +30,7 @@ services: podx-worker: build: ./app container_name: podx-worker - command: ["rq", "worker", "-u", "redis://redis:6379/0", "default"] + command: ["rq", "worker", "-u", "redis://redis:6379/0", "default"] env_file: [.env] environment: MEILI_URL: http://meili:7700