131 lines
4.7 KiB
Python
131 lines
4.7 KiB
Python
#!/usr/bin/env python3
|
|
import argparse, sqlite3, re, sys, time
|
|
from pathlib import Path
|
|
from urllib.parse import urlparse, urlunparse
|
|
import requests, tldextract
|
|
from bs4 import BeautifulSoup
|
|
from readability import Document as ReadabilityDoc
|
|
import trafilatura
|
|
from slugify import slugify
|
|
from datetime import date
|
|
|
|
def connect(db_path):
    """Open the SQLite database at *db_path* and return the connection.

    Foreign-key enforcement is switched on for the returned connection
    before it is handed back to the caller.
    """
    connection = sqlite3.connect(db_path)
    # Enable foreign-key constraint checking on this connection.
    connection.execute("PRAGMA foreign_keys=ON;")
    return connection
|
|
|
|
def upsert_source(con, url, title=None, publisher=None, date_published=None, content=None, tags=None):
    """Insert or update the ``sources`` row for *url* and return its id.

    When a row with the same ``url`` already exists, each column is only
    overwritten by a non-NULL incoming value (COALESCE keeps the stored
    value otherwise). The change is committed before returning.

    Note: *tags* is accepted for interface compatibility but is not used
    by this function.
    """
    upsert_sql = """INSERT INTO sources(url, title, publisher, date_published, content)
        VALUES (?,?,?,?,?)
        ON CONFLICT(url) DO UPDATE SET
            title=COALESCE(excluded.title, title),
            publisher=COALESCE(excluded.publisher, publisher),
            date_published=COALESCE(excluded.date_published, date_published),
            content=COALESCE(excluded.content, content)
        """
    values = (url, title, publisher, date_published, content)
    con.execute(upsert_sql, values)

    # Look the id up by URL: this works for both the insert and update paths.
    row = con.execute("SELECT id FROM sources WHERE url=?", (url,)).fetchone()
    source_id = row[0]
    con.commit()
    return source_id
|
|
|
|
def normalize_url(u: str) -> str:
    """Return a canonical form of *u* suitable for de-duplication.

    - Surrounding whitespace is stripped.
    - A missing scheme defaults to ``https``; this covers both bare
      ``host/path`` input and protocol-relative ``//host/path`` input.
    - Tracking parameters (``utm_*``, ``fbclid``, ``gclid``) are removed
      from the query string; all other parameters are kept verbatim and
      in their original order.
    - The fragment is dropped.
    """
    raw = u.strip()
    p = urlparse(raw)
    if not p.scheme:
        if p.netloc:
            # Protocol-relative URL: host is already parsed, only the scheme is missing.
            p = p._replace(scheme="https")
        else:
            # Bare "host/path": urlparse files the host under .path, so merely
            # setting the scheme would yield "https:///host/path". Re-parse instead.
            p = urlparse("https://" + raw)

    # Match on the whole parameter name so that keys which merely end in a
    # tracking name (e.g. "notgclid") are left untouched.
    tracking_key = re.compile(r"utm_[^=&]+|fbclid|gclid")
    kept = []
    for pair in p.query.split("&"):
        key, sep, _value = pair.partition("=")
        if sep and tracking_key.fullmatch(key):
            continue
        kept.append(pair)
    query = "&".join(kept)
    if query.startswith("&"):
        query = query[1:]

    return urlunparse((p.scheme, p.netloc, p.path, p.params, query, ""))
|
|
|
|
def domain(url: str) -> str:
    """Return the ``domain.suffix`` part of *url* as reported by tldextract.

    The ``domain`` and ``suffix`` fields of the extraction result are joined
    with a dot; empty fields are skipped, so the result may be a single
    label or an empty string.
    """
    extracted = tldextract.extract(url)
    pieces = []
    for piece in (extracted.domain, extracted.suffix):
        if piece:
            pieces.append(piece)
    return ".".join(pieces)
|
|
|
|
def fetch_readable(url: str, timeout=20):
    """Download *url* and extract its title and main readable text.

    Extraction is tried with trafilatura first; when it yields no text (or
    raises), readability is used as a fallback.

    Args:
        url: Address of the page to download.
        timeout: Timeout in seconds passed to ``requests.get``.

    Returns:
        A ``(title, text)`` tuple of strings. ``("", "")`` is returned when
        the download fails or when the fallback extractor raises. This
        function is best-effort and does not propagate errors.
    """
    try:
        r = requests.get(url, timeout=timeout, headers={"User-Agent": "Mozilla/5.0"})
        r.raise_for_status()
        # Without an explicit charset in the header, requests assumes
        # ISO-8859-1 for text/* responses, which garbles non-ASCII pages.
        # Fall back to content-based detection in that case.
        if "charset" not in r.headers.get("Content-Type", "").lower():
            r.encoding = r.apparent_encoding
        html = r.text
    except Exception as exc:
        # Deliberately broad: one bad URL must not abort a batch run.
        # Report the reason instead of failing silently.
        print(f"warn: fetch failed for {url}: {exc}", file=sys.stderr)
        return "", ""

    try:
        txt = trafilatura.extract(html, include_comments=False, include_tables=False, favor_recall=True)
        if txt:
            soup = BeautifulSoup(html, "html.parser")
            t = soup.title.string.strip() if soup.title and soup.title.string else ""
            return t, txt.strip()
    except Exception:
        # Best-effort: any trafilatura/parser failure falls through to readability.
        pass

    try:
        doc = ReadabilityDoc(html)
        content_html = doc.summary()
        soup = BeautifulSoup(content_html, "html.parser")
        txt = soup.get_text(separator="\n").strip()
        return (doc.short_title() or "").strip(), txt
    except Exception:
        # Both extractors failed; signal "nothing extracted" to the caller.
        return "", ""
|
|
|
|
def write_stub(entries_dir: Path, title: str, url: str, source_name: str|None, text: str|None):
    """Write a Markdown entry stub with YAML front matter and return its path.

    Args:
        entries_dir: Directory for the stub; created (with parents) if missing.
        title: Entry title; also the basis of the file name.
        url: Source URL stored in the front matter.
        source_name: Display name of the source; falls back to ``domain(url)``.
        text: Optional body text placed below the front matter.

    Returns:
        Path of the written ``<slug>.md`` file. An existing file with the
        same slug is overwritten.
    """
    def _yaml_quoted(value: str) -> str:
        # Escape for a YAML double-quoted scalar: backslashes first, then
        # quotes, so the escaping backslashes are not doubled again.
        return value.replace("\\", "\\\\").replace('"', '\\"')

    entries_dir.mkdir(parents=True, exist_ok=True)

    # Slug is capped at 60 characters; "clanek" is the last-resort name.
    slug = slugify(title or domain(url) or "clanek")[:60] or "clanek"
    path = entries_dir / f"{slug}.md"
    body = text.strip() if text else ""

    title_value = _yaml_quoted(title or "")
    source_value = _yaml_quoted(source_name or domain(url) or "")
    url_value = _yaml_quoted(url)

    # Built from a list so the front matter starts at column 0 regardless of
    # how this function is indented.
    fm = "\n".join([
        "---",
        f'title: "{title_value}"',
        f'source_name: "{source_value}"',
        f'url: "{url_value}"',
        "tags: []",
        'status: "todo"',
        "---",
        "",
        body,
        "",
    ])
    path.write_text(fm, encoding="utf-8")
    return path
|
|
|
|
def read_lines(source_path: str|None):
|
|
if source_path:
|
|
return Path(source_path).read_text(encoding="utf-8", errors="ignore").splitlines()
|
|
return sys.stdin.read().splitlines()
|
|
|
|
def main():
    """Command-line entry point: ingest a list of URLs into SQLite.

    Reads URLs (one per line) from ``--list`` or STDIN, skips blank lines
    and ``#`` comments, normalizes and de-duplicates them, then upserts
    each one into the database. With ``--fetch`` the page is downloaded and
    its readable text stored; with ``--stubs`` a Markdown stub is written
    under ``entries/<date>/`` for every URL.
    """
    ap = argparse.ArgumentParser(description="Ingest list of URLs into SQLite and optional entry stubs")
    ap.add_argument("--db", default="data/newsletter.db")
    ap.add_argument("--list", help="Text file with URLs (one per line). If omitted, read from STDIN.")
    ap.add_argument("--fetch", action="store_true", help="Fetch & extract readable text")
    ap.add_argument("--sleep", type=float, default=0.0)
    ap.add_argument("--stubs", action="store_true")
    ap.add_argument("--date", default=date.today().isoformat())
    args = ap.parse_args()

    # A line counts as a URL if it has an explicit scheme or looks like "host.tld[/...]".
    has_scheme = re.compile(r"^\w+://")
    looks_like_host = re.compile(r"^[\w\.-]+\.[a-z]{2,}(/|$)")

    con = connect(args.db)
    try:
        candidates = []
        for raw in read_lines(args.list):
            line = raw.strip()
            if not line or line.startswith("#"):
                continue
            if has_scheme.match(line) or looks_like_host.match(line):
                candidates.append(normalize_url(line))
        # dict preserves insertion order, so this de-duplicates while keeping
        # the first occurrence of each URL in input order.
        urls = list(dict.fromkeys(candidates))

        stubs_dir = Path(f"entries/{args.date}") if args.stubs else None
        kept = 0
        for url in urls:
            pub = domain(url)
            title, text = "", ""
            if args.fetch:
                title, text = fetch_readable(url)
            # Empty strings become NULL so the upsert keeps previously stored values.
            upsert_source(con, url=url, title=(title or None), publisher=pub, content=(text or None))
            kept += 1
            if stubs_dir:
                stub = write_stub(stubs_dir, title or url, url, pub, text)
                print(f"Stub: {stub}")
            if args.sleep:
                time.sleep(args.sleep)
    finally:
        # Release the database handle even when ingestion fails midway.
        con.close()

    print(f"Ingested: {kept} URLs into {args.db}")
|
|
|
|
# Run the CLI only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()
|