")
return "\n".join(html)
def _safe_under(base: Path, rel_path: str) -> Path:
"""
Resolve rel_path safely under base. If an absolute path is provided and it is
already under base, allow it. Otherwise join to base. Reject any path that
escapes base.
"""
try:
p = Path(rel_path)
if p.is_absolute():
candidate = p.resolve()
else:
candidate = (base / rel_path).resolve()
except Exception:
raise FileNotFoundError("Invalid path")
base_str = str(base.resolve())
cand_str = str(candidate)
# allow exact base or any child path
if cand_str == base_str or cand_str.startswith(base_str + os.sep):
return candidate
raise FileNotFoundError("Path escapes base")
def _vtt_header():
return "WEBVTT\n\n"
def _srt_to_vtt_text(srt_text: str) -> str:
# Minimal conversion: SRT -> VTT timestamp format + header
# Replace commas in timecodes with dots
body = re.sub(r"(?m)(\d{2}:\d{2}:\d{2}),(\d{3})", r"\1.\2", srt_text)
# Ensure a WEBVTT header
if not body.lstrip().upper().startswith("WEBVTT"):
body = _vtt_header() + body
return body
def _json_to_vtt_text(json_text: str) -> str:
    """
    Build WebVTT text from Whisper-style JSON segments.

    The JSON may be either an object with a ``"segments"`` list or a bare
    list. Each segment is expected to be an object with ``start`` and ``end``
    (seconds) and ``text``. ``start`` defaults to 0, ``end`` defaults to
    ``start + 0.5``, and empty text is rendered as "…".

    Segments that are not objects, or whose times are not finite numbers, are
    skipped. Negative times are clamped to zero. JSON that cannot be parsed
    yields just the header.

    Args:
        json_text: Serialized JSON transcript.

    Returns:
        WebVTT text with sequentially numbered cues.
    """
    def fmt(seconds: float) -> str:
        # Work in whole milliseconds so rounding can never spill into an
        # invalid "60.000" seconds field (e.g. 59.9996 -> 00:01:00.000).
        total_ms = max(0, round(seconds * 1000))
        whole_seconds, ms = divmod(total_ms, 1000)
        minutes, s = divmod(whole_seconds, 60)
        h, m = divmod(minutes, 60)
        return f"{h:02d}:{m:02d}:{s:02d}.{ms:03d}"

    try:
        data = json.loads(json_text)
    except (TypeError, ValueError, RecursionError):
        return _vtt_header()

    # Accept {"segments": [...]} as well as a raw list; anything else
    # (null, number, string, object without segments) means "no cues".
    segments = data.get("segments") if isinstance(data, dict) else data
    if not isinstance(segments, list):
        segments = []

    out = [_vtt_header()]
    idx = 1
    for seg in segments:
        if not isinstance(seg, dict):
            continue
        try:
            start = float(seg.get("start", 0))
            end = float(seg.get("end", start + 0.5))
            # round() raises on NaN/Infinity, which json.loads accepts.
            timing = f"{fmt(start)} --> {fmt(end)}"
        except (TypeError, ValueError, OverflowError):
            continue
        text = str(seg.get("text", "")).strip()
        # Cue number, timing line, payload, then the blank line that ends the cue.
        out.extend((str(idx), timing, text or "…", ""))
        idx += 1
    return "\n".join(out).rstrip() + "\n"
def _parse_vtt_to_cues(vtt_text: str):
"""Very small VTT parser -> list of dicts {start,end,text} (seconds, seconds, str)."""
def to_seconds(ts: str) -> float:
# 00:00:00.000 or 00:00.000 (allow both)
parts = ts.replace(",", ".").split(":")
try:
if len(parts) == 3:
h, m, s = int(parts[0]), int(parts[1]), float(parts[2])
else:
h, m, s = 0, int(parts[0]), float(parts[1])
return h*3600 + m*60 + s
except Exception:
return 0.0
cues = []
lines = [ln.rstrip("\n\r") for ln in vtt_text.splitlines()]
i = 0
while i < len(lines):
ln = lines[i].strip()
i += 1
if not ln or ln.upper().startswith("WEBVTT"):
continue
# Optional numeric counter line
if ln.isdigit() and i < len(lines):
ln = lines[i].strip(); i += 1
if "-->" in ln:
try:
l, r = ln.split("-->", 1)
start = to_seconds(l.strip())
end = to_seconds(r.strip().split(" ")[0])
except Exception:
start = end = 0.0
texts = []
while i < len(lines) and lines[i].strip() != "":
texts.append(lines[i])
i += 1
# skip blank separator
while i < len(lines) and lines[i].strip() == "":
i += 1
cue_text = " ".join([t.strip() for t in texts]).strip()
if cue_text:
cues.append({"start": start, "end": end, "text": cue_text})
return cues
def _load_transcript_variants(basename: str):
"""
Return tuple (kind, content_text, path_used) where kind in {'vtt','srt','json','txt',None}
- Tries exact filename matches first.
- If not found, falls back to the first file whose name starts with the basename (prefix match).
"""
root = TRANSCRIPT_ROOT
def try_read(path: Path, k: str):
try:
rp = path.resolve()
if not str(rp).startswith(str(root)):
return None
if rp.exists():
with open(rp, "r", encoding="utf-8", errors="ignore") as f:
return (k, f.read(), str(rp))
except Exception:
return None
return None
# 1) exact matches
exact = [
(root / f"{basename}.vtt", "vtt"),
(root / f"{basename}.srt", "srt"),
(root / f"{basename}.json", "json"),
(root / f"{basename}.txt", "txt"),
]
for p, k in exact:
got = try_read(p, k)
if got:
return got
# 2) prefix/fuzzy matches (e.g., "*.vtt", "*.txt", etc.)
exts = [("vtt","vtt"), ("srt","srt"), ("json","json"), ("txt","txt")]
for ext, k in exts:
try:
for gp in root.glob(f"{basename}*.{ext}"):
got = try_read(gp, k)
if got:
return got
except Exception:
continue
return (None, "", "")
@app.get("/search")
def search():
# GET /search: pass the ?q= text to meili_search and build one output entry per hit.
# NOTE(review): this copy of the file has lost its indentation and the markup
# inside several string literals (the f-string passed to out.append below is
# split across lines with nothing in it). Restore the literals from version
# control before changing any logic here.
# NOTE(review): hit fields (title/source/text) are presumably interpolated into
# HTML by the missing markup — confirm they are escaped when it is restored.
qstr = request.args.get("q","")
hits = meili_search(qstr)
out=[]
for h in hits:
# Per-hit fields, each with an empty default when the key is absent.
t = h.get("title","")
src = h.get("source","")
typ = h.get("type","")
# Prefer the "_formatted" text when present; otherwise fall back to the first
# 300 characters of the plain text.
ctx = h.get("_formatted",{}).get("text", h.get("text","")[:300])
segs = h.get("segments",[])
# Start offset: integer "start" of the first segment, or 0 when there are none.
ts = int(segs[0]["start"]) if segs else 0
# Only podcast hits carry the &t= offset in the /play link.
if typ == 'podcast':
open_link = f"/play?file={requests.utils.quote(src)}&t={ts}"
else:
open_link = f"/play?file={requests.utils.quote(src)}"
# Non-empty only for podcast hits.
transcript_link = f" | Transcript" if typ == 'podcast' else ""
badge = f"{typ}"
out.append(
f"
"
)
# Entries are newline-joined; an empty result list yields the placeholder text.
return "\n".join(out) or "No results yet."
@app.get("/open")
def open_local():
# GET /open: echo the requested ?file= value and the ?t= start offset (seconds).
# NOTE(review): int() raises ValueError when ?t= is empty or non-numeric; nothing
# here catches it.
# NOTE(review): the returned literal is split across lines in this copy of the
# file (markup appears to have been stripped); `file` is interpolated as-is, so
# confirm escaping when the original literal is restored.
file = request.args.get("file","")
t = int(request.args.get("t","0"))
return f"
{file}\nStart at: {t} sec
"
@app.get('/media')
def media():
    """
    GET /media: stream a file from ``LIBRARY_ROOT``.

    The ``file`` query parameter is resolved with ``_safe_under`` so it cannot
    point outside ``LIBRARY_ROOT``. Responds with 404 when the path is invalid,
    escapes the root, or does not name a regular file.

    Returns:
        The ``send_file`` response with conditional-request handling enabled;
        the mimetype is left for Flask to guess.
    """
    rel = request.args.get('file', '')
    try:
        full = _safe_under(LIBRARY_ROOT, rel)
    except (OSError, ValueError, RuntimeError):
        # FileNotFoundError (an OSError) is what _safe_under raises for
        # invalid or escaping paths; the others cover path-resolution errors.
        return abort(404)
    # is_file() rather than exists(): a directory (including the library root
    # itself when ?file= is empty) must not reach send_file.
    if not full.is_file():
        return abort(404)
    # Let Flask guess mimetype
    return send_file(str(full), conditional=True)
@app.get('/play')
def play():
# GET /play: build the player page for ?file=, with an optional ?t= start offset.
# NOTE(review): the string literals in the return below have lost their markup
# in this copy of the file (several are empty and one is split across lines);
# restore them from version control before editing.
rel = request.args.get('file', '')
# `or 0` covers an empty ?t= value; a non-numeric value still raises ValueError.
t = int(request.args.get('t', '0') or 0)
# URL-quoted links to the media stream and to the VTT subtitle track for the
# same file. NOTE(review): `t`, `src` and `track` are not referenced in the
# visible literals — presumably they were used inside the stripped markup.
src = f"/media?file={requests.utils.quote(rel)}"
track = f"/subtitle?file={requests.utils.quote(rel)}&format=vtt"
# NOTE(review): `rel` is interpolated as-is below; confirm escaping once the
# original markup is restored.
return (
""
"Play"
""
f"
{rel}
"
f""
""
)
@app.get("/subtitle")
def subtitle():
file = request.args.get("file", "")
fmt = request.args.get("format", "").lower() # when 'vtt', serve as text/vtt for player
base = os.path.splitext(os.path.basename(file))[0]
kind, content, used_path = _load_transcript_variants(base)
# Build a VTT if requested/needed and we can
if fmt == "vtt":
vtt = ""
if kind == "vtt":
vtt = content if content.lstrip().upper().startswith("WEBVTT") else _vtt_header() + content
elif kind == "srt":
vtt = _srt_to_vtt_text(content)
elif kind == "json":
vtt = _json_to_vtt_text(content)
else:
# No structured timing available
return abort(404)
resp = make_response(vtt)
resp.headers["Content-Type"] = "text/vtt; charset=utf-8"
return resp
# Otherwise, render a simple HTML preview
if kind in ("vtt", "srt", "json"):
# Normalize to VTT first
if kind == "vtt":
vtt_text = content if content.lstrip().upper().startswith("WEBVTT") else _vtt_header() + content
elif kind == "srt":
vtt_text = _srt_to_vtt_text(content)
else:
vtt_text = _json_to_vtt_text(content)
# If ?raw=1 is present, show raw VTT for debugging
if request.args.get("raw") == "1":
return (
""
"Transcript"
""
f"
Transcript (raw VTT): {base}
"
f"
{vtt_text}
"
)
# Otherwise render a readable transcript with clickable timestamps
cues = _parse_vtt_to_cues(vtt_text)
# Build HTML list
items = []
for c in cues:
mm = int(c["start"] // 60)
ss = int(c["start"] % 60)
hh = int(c["start"] // 3600)
ts_label = f"{hh:02d}:{mm%60:02d}:{ss:02d}" if hh else f"{mm:02d}:{ss:02d}"
items.append(
"
"
f""
f"{c['text']}"
"
"
)
html = (
""
"Transcript"
""
f"
Transcript: {base}
"
"
Click a timestamp to open the player at that point.
"
f"
{''.join(items) or 'No cues found.'}
"
"
"
)
return html
elif kind == "txt":
# Normalize and lightly beautify plain text transcripts
safe = content.strip()
# Remove common timestamp patterns like [00:12:34], (00:12), 00:12:34 -
safe = re.sub(r"\[(\d{1,2}:){1,2}\d{2}(?:\.\d{1,3})?\]\s*", "", safe)
safe = re.sub(r"\((\d{1,2}:){1,2}\d{2}(?:\.\d{1,3})?\)\s*", "", safe)
safe = re.sub(r"(?m)^\s*(\d{1,2}:){1,2}\d{2}(?:\.\d{1,3})?\s*[-–—]?\s*", "", safe)
# Collapse multiple blank lines
safe = re.sub(r"\n{3,}", "\n\n", safe)
# Paragraphization: split on blank lines, collapse inner newlines to spaces
paras = [p.strip() for p in re.split(r"\n{2,}", safe) if p.strip()]
clean_paras = [re.sub(r'[\n\r]+', ' ', p) for p in paras[:2000]]
items = "".join(f"
{p}
" for p in clean_paras)
fallback = f"
{safe}
"
body = items if items else fallback
return (
""
"Transcript"
""
f"