#!/usr/bin/env python3
import argparse, sqlite3, re, sys, time
from datetime import date
from pathlib import Path
from urllib.parse import urlparse, urlunparse

import requests, tldextract
from bs4 import BeautifulSoup
from readability import Document as ReadabilityDoc
import trafilatura
from slugify import slugify


def connect(db_path):
    con = sqlite3.connect(db_path)
    con.execute("PRAGMA foreign_keys=ON;")
    return con
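
# NOTE: the script never creates the `sources` table itself; it assumes one
# already exists with a UNIQUE constraint on `url`, which the ON CONFLICT(url)
# upsert below relies on. The helper that follows is a hypothetical sketch of
# that assumed schema (columns inferred from upsert_source), not part of the
# original tool -- the real project may create the table elsewhere.
def ensure_schema(con):
    """Create a minimal `sources` table matching what this script expects."""
    con.execute(
        """CREATE TABLE IF NOT EXISTS sources(
               id             INTEGER PRIMARY KEY AUTOINCREMENT,
               url            TEXT NOT NULL UNIQUE,
               title          TEXT,
               publisher      TEXT,
               date_published TEXT,
               content        TEXT
           )"""
    )
    con.commit()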


def upsert_source(con, url, title=None, publisher=None, date_published=None, content=None, tags=None):
    # `tags` is accepted for call-site symmetry but not stored yet.
    con.execute(
        """INSERT INTO sources(url, title, publisher, date_published, content)
           VALUES (?,?,?,?,?)
           ON CONFLICT(url) DO UPDATE SET
               title=COALESCE(excluded.title, title),
               publisher=COALESCE(excluded.publisher, publisher),
               date_published=COALESCE(excluded.date_published, date_published),
               content=COALESCE(excluded.content, content)""",
        (url, title, publisher, date_published, content),
    )
    sid = con.execute("SELECT id FROM sources WHERE url=?", (url,)).fetchone()[0]
    con.commit()
    return sid


def normalize_url(u: str) -> str:
    u = u.strip()
    p = urlparse(u)
    if not p.scheme:
        # Bare domains ("example.com/path") parse with everything in .path;
        # re-parse with an explicit scheme so the host lands in .netloc.
        p = urlparse("https://" + u)
    # Strip common tracking parameters.
    query = re.sub(r'(&|\?)?(utm_[^=&]+|fbclid|gclid)=[^&]*', '', p.query)
    if query.startswith('&'):
        query = query[1:]
    # Rebuild the URL without the fragment.
    return urlunparse((p.scheme, p.netloc, p.path, p.params, query, ""))


def domain(url: str) -> str:
    ext = tldextract.extract(url)
    return ".".join(part for part in [ext.domain, ext.suffix] if part)


def fetch_readable(url: str, timeout=20):
    """Return (title, text) for a URL; empty strings if fetching or extraction fails."""
    try:
        r = requests.get(url, timeout=timeout, headers={"User-Agent": "Mozilla/5.0"})
        r.raise_for_status()
        html = r.text
    except Exception:
        return "", ""
    # First try trafilatura, which usually yields the cleanest article text.
    try:
        txt = trafilatura.extract(html, include_comments=False, include_tables=False, favor_recall=True)
        if txt:
            soup = BeautifulSoup(html, "html.parser")
            t = soup.title.string.strip() if soup.title and soup.title.string else ""
            return t, txt.strip()
    except Exception:
        pass
    # Fall back to readability-lxml.
    try:
        doc = ReadabilityDoc(html)
        soup = BeautifulSoup(doc.summary(), "html.parser")
        txt = soup.get_text(separator="\n").strip()
        return (doc.short_title() or "").strip(), txt
    except Exception:
        return "", ""


def write_stub(entries_dir: Path, title: str, url: str, source_name: str | None, text: str | None):
    entries_dir.mkdir(parents=True, exist_ok=True)
    slug = slugify(title or domain(url) or "clanek")[:60] or "clanek"  # "clanek" = Czech for "article"
    path = entries_dir / f"{slug}.md"
    body = text.strip() if text else ""
    # Replace double quotes so the quoted YAML front matter stays valid.
    safe_title = (title or "").replace('"', "'")
    safe_source = (source_name or domain(url) or "").replace('"', "'")
    fm = f'''---
title: "{safe_title}"
source_name: "{safe_source}"
url: "{url}"
tags: []
status: "todo"
---

{body}
'''
    path.write_text(fm, encoding="utf-8")
    return path


def read_lines(source_path: str | None):
    if source_path:
        return Path(source_path).read_text(encoding="utf-8", errors="ignore").splitlines()
    return sys.stdin.read().splitlines()


def main():
    ap = argparse.ArgumentParser(description="Ingest a list of URLs into SQLite and optional entry stubs")
    ap.add_argument("--db", default="data/newsletter.db")
    ap.add_argument("--list", help="Text file with URLs (one per line). If omitted, read from STDIN.")
    ap.add_argument("--fetch", action="store_true", help="Fetch & extract readable text")
    ap.add_argument("--sleep", type=float, default=0.0)
    ap.add_argument("--stubs", action="store_true")
    ap.add_argument("--date", default=date.today().isoformat())
    args = ap.parse_args()

    con = connect(args.db)
    lines = read_lines(args.list)

    # Keep only lines that look like URLs or bare domains; skip blanks and comments.
    urls = []
    for ln in lines:
        ln = ln.strip()
        if not ln or ln.startswith("#"):
            continue
        if re.match(r"^\w+://", ln) or re.match(r"^[\w\.-]+\.[a-z]{2,}(/|$)", ln):
            urls.append(normalize_url(ln))
    # Deduplicate while preserving order.
    seen = set()
    urls = [u for u in urls if not (u in seen or seen.add(u))]

    stubs_dir = Path(f"entries/{args.date}") if args.stubs else None
    kept = 0
    for url in urls:
        pub = domain(url)
        title, text = "", ""
        if args.fetch:
            title, text = fetch_readable(url)
        upsert_source(con, url=url, title=(title or None), publisher=pub, content=(text or None))
        kept += 1
        if stubs_dir:
            stub = write_stub(stubs_dir, title or url, url, pub, text)
            print(f"Stub: {stub}")
        if args.sleep:
            time.sleep(args.sleep)

    con.close()
    print(f"Ingested: {kept} URLs into {args.db}")


if __name__ == "__main__":
    main()
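
# Example invocation (the file names "ingest_urls.py" and "urls.txt" are
# illustrative placeholders, not defined anywhere in this script):
#
#   python ingest_urls.py --list urls.txt --fetch --stubs --sleep 1.0
#   cat urls.txt | python ingest_urls.py --db data/newsletter.db --fetch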