feat: initial KosmoConnect platform v0.1
Includes: - Backend services: ingestion (:8001), weather API (:8002), gateway (:8003), billing (:8004) with BTCPay integration - Shared asyncpg pool, TimescaleDB hypertable, Redis, Mosquitto MQTT - React frontend: Dashboard (MapLibre) and Messaging (chat UI) - Bridge daemon for Pi + Meshtastic (Serial/TCP T-Deck support) - Production Docker Compose, Nginx reverse proxy, ops scripts - DEPLOY.md with step-by-step deployment guide
This commit is contained in:
104
backend/gateway/README.md
Normal file
104
backend/gateway/README.md
Normal file
@@ -0,0 +1,104 @@
|
||||
# KosmoConnect Gateway Service
|
||||
|
||||
The **Gateway Service** handles all web-to-mesh and mesh-to-web messaging. It is the monetization boundary of the network.
|
||||
|
||||
## What It Does
|
||||
|
||||
- **Subscription Enforcement**: Validates that the user has an active subscription and that their plan allows messaging the target node
|
||||
- **Quota Management**: Tracks monthly message usage and rejects requests when limits are exceeded
|
||||
- **Outbound Queue**: Accepts web messages, stores them in PostgreSQL, and publishes them to MQTT for bridge delivery
|
||||
- **Inbound Consumer**: Listens to `kosmo/mesh/inbound` and stores replies, automatically threading them into conversations
|
||||
- **Delivery Tracking**: Message status progresses `pending -> queued -> transmitted -> delivered` (future: bridge ACKs will update to `transmitted`)
|
||||
|
||||
## Endpoints
|
||||
|
||||
| Method | Path | Description |
|
||||
|--------|------|-------------|
|
||||
| POST | `/api/v1/messages` | Send a message to a mesh node |
|
||||
| GET | `/api/v1/messages/conversations` | List all conversations for the user |
|
||||
| GET | `/api/v1/messages/conversations/{node_id}` | Get full message history with a node |
|
||||
| GET | `/api/v1/messages/{message_id}` | Check delivery status of a message |
|
||||
|
||||
## Authentication (v0.1)
|
||||
|
||||
For rapid development, the gateway currently uses a simple `X-User-ID` header to identify the caller. In production this will be replaced with JWT/OAuth2.
|
||||
|
||||
## Billing
|
||||
|
||||
Subscription management is handled by the [Billing Service](../billing/README.md), which integrates with the Church of Kosmo's BTCPay Server at `pay.cqre.net`. The Gateway does not process payments itself; it only reads subscription state from the shared PostgreSQL database.
|
||||
|
||||
## Subscription Scopes
|
||||
|
||||
| Plan | Scope | Quota (example) |
|
||||
|------|-------|-----------------|
|
||||
| `wanderer` | Any node on the mesh | 50/month |
|
||||
| `guardian` | Only whitelisted nodes | 500/month |
|
||||
| `sanctuary` | Any node + API/webhooks | Unlimited |
|
||||
| `free` | Receive only | 0 outbound |
|
||||
|
||||
## Running Locally
|
||||
|
||||
Make sure the backend infrastructure (Postgres, MQTT) is running:
|
||||
|
||||
```bash
|
||||
cd backend
|
||||
docker-compose up -d
|
||||
```
|
||||
|
||||
Seed test users (only needed once):
|
||||
|
||||
```bash
|
||||
docker-compose exec -T timescaledb psql -U kosmo -d kosmoconnect < migrations/002_seed_test_users.sql
|
||||
```
|
||||
|
||||
Start the gateway:
|
||||
|
||||
```bash
|
||||
./run-dev.sh gateway
|
||||
```
|
||||
|
||||
## Testing with cURL
|
||||
|
||||
```bash
|
||||
# Send a message (test wanderer user)
|
||||
curl -X POST http://localhost:8003/api/v1/messages \
|
||||
-H "Content-Type: application/json" \
|
||||
-H "X-User-ID: 11111111-1111-1111-1111-111111111111" \
|
||||
-d '{"target_node_id": "!a1b2c3d4", "text": "Hello mesh"}'
|
||||
|
||||
# List conversations
|
||||
curl http://localhost:8003/api/v1/messages/conversations \
|
||||
-H "X-User-ID: 11111111-1111-1111-1111-111111111111"
|
||||
|
||||
# Check message status
|
||||
curl http://localhost:8003/api/v1/messages/{message_id} \
|
||||
-H "X-User-ID: 11111111-1111-1111-1111-111111111111"
|
||||
```
|
||||
|
||||
## Architecture
|
||||
|
||||
```
|
||||
Web Client
|
||||
|
|
||||
| POST /api/v1/messages (X-User-ID)
|
||||
v
|
||||
Gateway Service (:8003)
|
||||
|- Checks subscription + quota in PostgreSQL
|
||||
|- Writes message to mesh_messages (status=pending)
|
||||
|- Background worker publishes pending rows to MQTT
|
||||
|
|
||||
v
|
||||
MQTT Broker (kosmo/mesh/outbound/{node_id})
|
||||
|
|
||||
v
|
||||
Bridge Daemon (Pi) -> Meshtastic Mesh -> Target Node
|
||||
|
||||
Reply path:
|
||||
Target Node -> Mesh -> Bridge Daemon -> MQTT (kosmo/mesh/inbound)
|
||||
|
|
||||
v
|
||||
Gateway Service consumes MQTT and writes reply to mesh_messages
|
||||
|
|
||||
v
|
||||
Web Client reads via GET /api/v1/messages/conversations
|
||||
```
|
||||
0
backend/gateway/src/__init__.py
Normal file
0
backend/gateway/src/__init__.py
Normal file
362
backend/gateway/src/main.py
Normal file
362
backend/gateway/src/main.py
Normal file
@@ -0,0 +1,362 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
KosmoConnect Gateway Service
|
||||
|
||||
Handles web-to-mesh messaging:
|
||||
- Accepts outbound messages from web clients
|
||||
- Validates subscriptions and quotas
|
||||
- Publishes to MQTT for bridge delivery
|
||||
- Consumes inbound mesh messages and stores them as replies
|
||||
- Tracks delivery status
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
import uuid
|
||||
from contextlib import asynccontextmanager
|
||||
from datetime import datetime, timezone
|
||||
from typing import Optional
|
||||
|
||||
import aiomqtt
|
||||
from fastapi import FastAPI, Header, HTTPException
|
||||
from fastapi.middleware.cors import CORSMiddleware
|
||||
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "../../shared"))
|
||||
|
||||
from db import get_pool
|
||||
from gateway.src.models import SendMessageRequest, SendMessageResponse
|
||||
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
logger = logging.getLogger("gateway")
|
||||
|
||||
MQTT_HOST = os.getenv("MQTT_HOST", "localhost")
|
||||
MQTT_PORT = int(os.getenv("MQTT_PORT", "1883"))
|
||||
MQTT_TOPIC_INBOUND = os.getenv("MQTT_TOPIC_INBOUND", "kosmo/mesh/inbound")
|
||||
MQTT_TOPIC_OUTBOUND_PREFIX = os.getenv("MQTT_TOPIC_OUTBOUND_PREFIX", "kosmo/mesh/outbound")
|
||||
|
||||
pool = None
|
||||
|
||||
|
||||
# ============================================================
|
||||
# Subscription / Quota Enforcement
|
||||
# ============================================================
|
||||
async def get_active_subscription(user_id: str):
|
||||
async with pool.acquire() as conn:
|
||||
row = await conn.fetchrow(
|
||||
"""
|
||||
SELECT id, plan_type, message_quota, messages_used, valid_until
|
||||
FROM subscriptions
|
||||
WHERE user_id = $1 AND is_active = true
|
||||
AND valid_from <= NOW() AND (valid_until IS NULL OR valid_until > NOW())
|
||||
ORDER BY valid_from DESC
|
||||
LIMIT 1
|
||||
""",
|
||||
user_id,
|
||||
)
|
||||
return row
|
||||
|
||||
|
||||
async def can_send_to_node(user_id: str, target_node_id: str) -> bool:
|
||||
sub = await get_active_subscription(user_id)
|
||||
if not sub:
|
||||
return False
|
||||
|
||||
if sub["plan_type"] in ("wanderer", "sanctuary"):
|
||||
return True
|
||||
|
||||
if sub["plan_type"] == "guardian":
|
||||
async with pool.acquire() as conn:
|
||||
row = await conn.fetchrow(
|
||||
"SELECT 1 FROM allowed_nodes WHERE user_id = $1 AND mesh_node_id = $2",
|
||||
user_id,
|
||||
target_node_id,
|
||||
)
|
||||
return row is not None
|
||||
|
||||
# free plan cannot send outbound
|
||||
return False
|
||||
|
||||
|
||||
async def check_and_increment_quota(user_id: str) -> bool:
|
||||
sub = await get_active_subscription(user_id)
|
||||
if not sub:
|
||||
return False
|
||||
if sub["message_quota"] is not None and sub["messages_used"] >= sub["message_quota"]:
|
||||
return False
|
||||
|
||||
async with pool.acquire() as conn:
|
||||
await conn.execute(
|
||||
"UPDATE subscriptions SET messages_used = messages_used + 1 WHERE id = $1",
|
||||
sub["id"],
|
||||
)
|
||||
return True
|
||||
|
||||
|
||||
# ============================================================
|
||||
# MQTT Outbound Worker
|
||||
# ============================================================
|
||||
async def mqtt_outbound_worker():
|
||||
"""Background task: pick up pending messages and publish to MQTT."""
|
||||
logger.info("Starting MQTT outbound worker")
|
||||
while True:
|
||||
try:
|
||||
async with aiomqtt.Client(MQTT_HOST, MQTT_PORT) as client:
|
||||
while True:
|
||||
async with pool.acquire() as conn:
|
||||
rows = await conn.fetch(
|
||||
"""
|
||||
SELECT id, target_node_id, text
|
||||
FROM mesh_messages
|
||||
WHERE direction = 'outbound' AND status = 'pending'
|
||||
ORDER BY created_at ASC
|
||||
LIMIT 50
|
||||
"""
|
||||
)
|
||||
|
||||
for row in rows:
|
||||
topic = f"{MQTT_TOPIC_OUTBOUND_PREFIX}/{row['target_node_id']}"
|
||||
payload = {
|
||||
"message_id": str(row["id"]),
|
||||
"text": row["text"],
|
||||
"created_at": datetime.now(timezone.utc).isoformat(),
|
||||
}
|
||||
try:
|
||||
await client.publish(topic, json.dumps(payload), qos=1)
|
||||
async with pool.acquire() as conn:
|
||||
await conn.execute(
|
||||
"UPDATE mesh_messages SET status = 'queued', updated_at = NOW() WHERE id = $1",
|
||||
row["id"],
|
||||
)
|
||||
logger.info("Published pending message %s to %s", row["id"], topic)
|
||||
except Exception as e:
|
||||
logger.exception("Failed to publish message %s: %s", row["id"], e)
|
||||
|
||||
await asyncio.sleep(2)
|
||||
except aiomqtt.MqttError as e:
|
||||
logger.error("MQTT outbound worker error: %s. Reconnecting in 5s...", e)
|
||||
await asyncio.sleep(5)
|
||||
|
||||
|
||||
# ============================================================
|
||||
# MQTT Inbound Consumer
|
||||
# ============================================================
|
||||
async def mqtt_inbound_consumer():
|
||||
"""Background task: consume mesh->cloud messages and store replies."""
|
||||
logger.info("Starting MQTT inbound consumer")
|
||||
while True:
|
||||
try:
|
||||
async with aiomqtt.Client(MQTT_HOST, MQTT_PORT) as client:
|
||||
await client.subscribe(MQTT_TOPIC_INBOUND)
|
||||
logger.info("Subscribed to %s", MQTT_TOPIC_INBOUND)
|
||||
async for message in client.messages:
|
||||
try:
|
||||
data = json.loads(message.payload.decode())
|
||||
await handle_inbound(data)
|
||||
except Exception as e:
|
||||
logger.exception("Failed to process inbound message: %s", e)
|
||||
except aiomqtt.MqttError as e:
|
||||
logger.error("MQTT inbound consumer error: %s. Reconnecting in 5s...", e)
|
||||
await asyncio.sleep(5)
|
||||
|
||||
|
||||
async def handle_inbound(data: dict):
|
||||
source_node_id = data.get("source_node_id")
|
||||
text = data.get("text", "")
|
||||
|
||||
async with pool.acquire() as conn:
|
||||
# Try to match this inbound message to a user who has previously sent
|
||||
# an outbound message to this node. For v0.1 we attach it to the most
|
||||
# recent sender, or leave user_id NULL if unknown.
|
||||
user_row = await conn.fetchrow(
|
||||
"""
|
||||
SELECT user_id FROM mesh_messages
|
||||
WHERE direction = 'outbound' AND target_node_id = $1 AND user_id IS NOT NULL
|
||||
ORDER BY created_at DESC
|
||||
LIMIT 1
|
||||
""",
|
||||
source_node_id,
|
||||
)
|
||||
user_id = user_row["user_id"] if user_row else None
|
||||
|
||||
await conn.execute(
|
||||
"""
|
||||
INSERT INTO mesh_messages (
|
||||
id, user_id, direction, sender_node_id, gateway_node_id,
|
||||
text, status, hop_count, rssi, snr, created_at, updated_at
|
||||
) VALUES ($1, $2, 'inbound', $3, $4, $5, 'delivered', $6, $7, $8, NOW(), NOW())
|
||||
""",
|
||||
uuid.UUID(data.get("message_id")) if data.get("message_id") else uuid.uuid4(),
|
||||
user_id,
|
||||
source_node_id,
|
||||
data.get("gateway_node_id"),
|
||||
text,
|
||||
data.get("hop_count"),
|
||||
data.get("rssi"),
|
||||
data.get("snr"),
|
||||
)
|
||||
logger.info("Stored inbound message from %s", source_node_id)
|
||||
|
||||
|
||||
# ============================================================
|
||||
# FastAPI App
|
||||
# ============================================================
|
||||
@asynccontextmanager
|
||||
async def lifespan(app: FastAPI):
|
||||
global pool
|
||||
pool = await get_pool()
|
||||
t1 = asyncio.create_task(mqtt_outbound_worker())
|
||||
t2 = asyncio.create_task(mqtt_inbound_consumer())
|
||||
yield
|
||||
t1.cancel()
|
||||
t2.cancel()
|
||||
try:
|
||||
await t1
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
try:
|
||||
await t2
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
await pool.close()
|
||||
|
||||
|
||||
app = FastAPI(title="KosmoConnect Gateway Service", lifespan=lifespan)
|
||||
app.add_middleware(
|
||||
CORSMiddleware,
|
||||
allow_origins=["*"],
|
||||
allow_credentials=True,
|
||||
allow_methods=["*"],
|
||||
allow_headers=["*"],
|
||||
)
|
||||
|
||||
|
||||
@app.get("/health")
|
||||
async def health():
|
||||
return {"status": "ok", "service": "gateway"}
|
||||
|
||||
|
||||
@app.post("/api/v1/messages", status_code=202)
|
||||
async def send_message(req: SendMessageRequest, x_user_id: Optional[str] = Header(None)):
|
||||
if not x_user_id:
|
||||
raise HTTPException(status_code=401, detail="Missing X-User-ID header")
|
||||
|
||||
if not await can_send_to_node(x_user_id, req.target_node_id):
|
||||
raise HTTPException(status_code=403, detail="Subscription does not allow messaging this node")
|
||||
|
||||
if not await check_and_increment_quota(x_user_id):
|
||||
raise HTTPException(status_code=429, detail="Monthly message quota exceeded")
|
||||
|
||||
msg_id = uuid.uuid4()
|
||||
async with pool.acquire() as conn:
|
||||
await conn.execute(
|
||||
"""
|
||||
INSERT INTO mesh_messages (
|
||||
id, user_id, direction, target_node_id, text, status, created_at, updated_at
|
||||
) VALUES ($1, $2, 'outbound', $3, $4, 'pending', NOW(), NOW())
|
||||
""",
|
||||
msg_id,
|
||||
x_user_id,
|
||||
req.target_node_id,
|
||||
req.text,
|
||||
)
|
||||
|
||||
logger.info("Queued message %s from user %s to %s", msg_id, x_user_id, req.target_node_id)
|
||||
return SendMessageResponse(message_id=str(msg_id), status="pending", queued_at=datetime.now(timezone.utc))
|
||||
|
||||
|
||||
@app.get("/api/v1/messages/conversations")
|
||||
async def list_conversations(x_user_id: Optional[str] = Header(None)):
|
||||
if not x_user_id:
|
||||
raise HTTPException(status_code=401, detail="Missing X-User-ID header")
|
||||
|
||||
async with pool.acquire() as conn:
|
||||
rows = await conn.fetch(
|
||||
"""
|
||||
WITH user_msgs AS (
|
||||
SELECT
|
||||
CASE
|
||||
WHEN direction = 'outbound' THEN target_node_id
|
||||
ELSE sender_node_id
|
||||
END AS node_id,
|
||||
text,
|
||||
created_at,
|
||||
direction,
|
||||
ROW_NUMBER() OVER (PARTITION BY
|
||||
CASE
|
||||
WHEN direction = 'outbound' THEN target_node_id
|
||||
ELSE sender_node_id
|
||||
END
|
||||
ORDER BY created_at DESC
|
||||
) AS rn
|
||||
FROM mesh_messages
|
||||
WHERE user_id = $1
|
||||
)
|
||||
SELECT
|
||||
m.node_id,
|
||||
m.text AS latest_text,
|
||||
m.created_at AS latest_at,
|
||||
a.nickname,
|
||||
COUNT(*) FILTER (WHERE m.direction = 'inbound')::int AS unread_count
|
||||
FROM user_msgs m
|
||||
LEFT JOIN allowed_nodes a ON a.user_id = $1 AND a.mesh_node_id = m.node_id
|
||||
WHERE m.rn = 1
|
||||
GROUP BY m.node_id, m.text, m.created_at, a.nickname
|
||||
ORDER BY m.created_at DESC
|
||||
""",
|
||||
x_user_id,
|
||||
)
|
||||
|
||||
return {"data": [dict(r) for r in rows]}
|
||||
|
||||
|
||||
@app.get("/api/v1/messages/conversations/{node_id}")
|
||||
async def get_conversation(node_id: str, x_user_id: Optional[str] = Header(None)):
|
||||
if not x_user_id:
|
||||
raise HTTPException(status_code=401, detail="Missing X-User-ID header")
|
||||
|
||||
async with pool.acquire() as conn:
|
||||
rows = await conn.fetch(
|
||||
"""
|
||||
SELECT
|
||||
id::text,
|
||||
direction,
|
||||
sender_node_id,
|
||||
target_node_id,
|
||||
text,
|
||||
status,
|
||||
hop_count,
|
||||
rssi,
|
||||
snr,
|
||||
created_at
|
||||
FROM mesh_messages
|
||||
WHERE user_id = $1 AND (
|
||||
(direction = 'outbound' AND target_node_id = $2)
|
||||
OR
|
||||
(direction = 'inbound' AND sender_node_id = $2)
|
||||
)
|
||||
ORDER BY created_at ASC
|
||||
""",
|
||||
x_user_id,
|
||||
node_id,
|
||||
)
|
||||
|
||||
return {"data": [dict(r) for r in rows]}
|
||||
|
||||
|
||||
@app.get("/api/v1/messages/{message_id}")
|
||||
async def get_message_status(message_id: str, x_user_id: Optional[str] = Header(None)):
|
||||
if not x_user_id:
|
||||
raise HTTPException(status_code=401, detail="Missing X-User-ID header")
|
||||
|
||||
async with pool.acquire() as conn:
|
||||
row = await conn.fetchrow(
|
||||
"SELECT id::text, status, created_at, updated_at FROM mesh_messages WHERE id = $1 AND user_id = $2",
|
||||
message_id,
|
||||
x_user_id,
|
||||
)
|
||||
if not row:
|
||||
raise HTTPException(status_code=404, detail="Message not found")
|
||||
return dict(row)
|
||||
35
backend/gateway/src/models.py
Normal file
35
backend/gateway/src/models.py
Normal file
@@ -0,0 +1,35 @@
|
||||
from datetime import datetime
|
||||
from typing import Optional
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
|
||||
class SendMessageRequest(BaseModel):
|
||||
target_node_id: str
|
||||
text: str = Field(..., max_length=200)
|
||||
|
||||
|
||||
class SendMessageResponse(BaseModel):
|
||||
message_id: str
|
||||
status: str
|
||||
queued_at: datetime
|
||||
|
||||
|
||||
class MessageOut(BaseModel):
|
||||
id: str
|
||||
direction: str
|
||||
sender_node_id: Optional[str] = None
|
||||
target_node_id: Optional[str] = None
|
||||
text: Optional[str] = None
|
||||
status: Optional[str] = None
|
||||
hop_count: Optional[int] = None
|
||||
rssi: Optional[int] = None
|
||||
snr: Optional[float] = None
|
||||
created_at: datetime
|
||||
|
||||
|
||||
class ConversationSummary(BaseModel):
|
||||
node_id: str
|
||||
nickname: Optional[str] = None
|
||||
latest_text: Optional[str] = None
|
||||
latest_at: Optional[datetime] = None
|
||||
unread_count: int = 0
|
||||
Reference in New Issue
Block a user