"""SQLite persistence for node settings and sampled node metrics."""

import sqlite3
from collections.abc import Iterator
from contextlib import contextmanager
from datetime import datetime, timezone
from pathlib import Path

from app.config import get_config

# Single-row table: the CHECK constraint only admits id = 1.
SETTINGS_TABLE_SQL = """
CREATE TABLE IF NOT EXISTS node_settings (
    id INTEGER PRIMARY KEY CHECK (id = 1),
    rpc_url TEXT NOT NULL DEFAULT '',
    rpc_username TEXT NOT NULL DEFAULT '',
    rpc_password TEXT NOT NULL DEFAULT '',
    rpc_wallet TEXT NOT NULL DEFAULT '',
    config_path TEXT NOT NULL DEFAULT '',
    ssh_host TEXT NOT NULL DEFAULT '',
    ssh_port INTEGER NOT NULL DEFAULT 22,
    ssh_username TEXT NOT NULL DEFAULT '',
    ssh_password TEXT NOT NULL DEFAULT '',
    ssh_key_path TEXT NOT NULL DEFAULT '',
    bitcoin_binary TEXT NOT NULL DEFAULT 'bitcoind',
    updated_at TEXT NOT NULL
);
"""

# One row per sample, keyed by its integer timestamp.
METRICS_TABLE_SQL = """
CREATE TABLE IF NOT EXISTS metrics_history (
    ts INTEGER PRIMARY KEY,
    blocks INTEGER NOT NULL,
    headers INTEGER NOT NULL,
    mempool_bytes INTEGER NOT NULL,
    peers INTEGER NOT NULL
);
"""

# NOTE(review): `ts` is declared INTEGER PRIMARY KEY, which SQLite treats as
# the rowid, so this extra index looks redundant. It is kept so the schema of
# existing databases does not change -- confirm before dropping it.
METRICS_INDEX_SQL = """
CREATE INDEX IF NOT EXISTS idx_metrics_history_ts
ON metrics_history (ts);
"""

# Values returned by get_settings() while no settings row has been saved, and
# used to fill in any key a stored row does not provide.
DEFAULT_SETTINGS = {
    "rpc_url": "http://127.0.0.1:8332",
    "rpc_username": "",
    "rpc_password": "",
    "rpc_wallet": "",
    "config_path": "",
    "ssh_host": "",
    "ssh_port": 22,
    "ssh_username": "",
    "ssh_password": "",
    "ssh_key_path": "",
    "bitcoin_binary": "bitcoind",
    "updated_at": "",
}

# Pruning limits applied on every save_metric_point() call.
METRICS_RETENTION_SECONDS = 60 * 60 * 24 * 30  # 30 days
METRICS_MAX_ROWS = 20000


def _ensure_db_parent(db_path: Path) -> None:
    """Create the directory that will hold the database file, if missing."""
    db_path.parent.mkdir(parents=True, exist_ok=True)


@contextmanager
def _connect() -> Iterator[sqlite3.Connection]:
    """Yield a connection to the configured database and close it on exit.

    The database path is read from ``get_config().database_path`` on every
    call, and its parent directory is created first. Rows are returned as
    ``sqlite3.Row`` so callers can convert them with ``dict(row)``.

    The connection is closed without committing; callers must call
    ``conn.commit()`` themselves to persist their writes.
    """
    db_path = get_config().database_path
    _ensure_db_parent(db_path)
    conn = sqlite3.connect(db_path)
    conn.row_factory = sqlite3.Row
    try:
        yield conn
    finally:
        conn.close()


def init_db() -> None:
    """Create the settings table, the metrics table and the metrics index.

    Every statement uses ``IF NOT EXISTS``, so calling this repeatedly is
    harmless.
    """
    with _connect() as conn:
        for statement in (SETTINGS_TABLE_SQL, METRICS_TABLE_SQL, METRICS_INDEX_SQL):
            conn.execute(statement)
        conn.commit()


def get_settings() -> dict:
    """Return the stored node settings, or a copy of the defaults.

    Returns:
        A new dict. When the settings row exists it holds every column of
        that row (including ``id``), with any key missing from the row filled
        in from ``DEFAULT_SETTINGS``. When no row exists it is a copy of
        ``DEFAULT_SETTINGS``.
    """
    with _connect() as conn:
        row = conn.execute("SELECT * FROM node_settings WHERE id = 1").fetchone()

    if not row:
        return DEFAULT_SETTINGS.copy()

    data = dict(row)
    for key, value in DEFAULT_SETTINGS.items():
        data.setdefault(key, value)
    return data


def save_settings(values: dict) -> dict:
    """Merge ``values`` over the current settings and upsert the single row.

    Args:
        values: Settings to change; keys not present keep their current
            value. ``updated_at`` is always overwritten with the current UTC
            time in ISO 8601 format.

    Returns:
        The merged dict that was bound to the statement. Keys in ``values``
        that are not columns of ``node_settings`` appear in this dict but are
        not referenced by the SQL.
    """
    # NOTE(review): the current settings are read on one connection and
    # written on another, so the read-merge-write sequence is not a single
    # transaction.
    current = get_settings()
    merged = {**current, **values}
    merged["updated_at"] = datetime.now(timezone.utc).isoformat()

    with _connect() as conn:
        conn.execute(
            """
            INSERT INTO node_settings (
                id, rpc_url, rpc_username, rpc_password, rpc_wallet,
                config_path, ssh_host, ssh_port, ssh_username, ssh_password,
                ssh_key_path, bitcoin_binary, updated_at
            ) VALUES (
                1, :rpc_url, :rpc_username, :rpc_password, :rpc_wallet,
                :config_path, :ssh_host, :ssh_port, :ssh_username, :ssh_password,
                :ssh_key_path, :bitcoin_binary, :updated_at
            )
            ON CONFLICT(id) DO UPDATE SET
                rpc_url = excluded.rpc_url,
                rpc_username = excluded.rpc_username,
                rpc_password = excluded.rpc_password,
                rpc_wallet = excluded.rpc_wallet,
                config_path = excluded.config_path,
                ssh_host = excluded.ssh_host,
                ssh_port = excluded.ssh_port,
                ssh_username = excluded.ssh_username,
                ssh_password = excluded.ssh_password,
                ssh_key_path = excluded.ssh_key_path,
                bitcoin_binary = excluded.bitcoin_binary,
                updated_at = excluded.updated_at
            """,
            merged,
        )
        conn.commit()
    return merged


def save_metric_point(
    *,
    timestamp: int,
    blocks: int,
    headers: int,
    mempool_bytes: int,
    peers: int,
) -> None:
    """Upsert one metrics sample, then prune the history table.

    A sample whose ``timestamp`` already exists replaces the stored values.
    After the write, rows older than ``timestamp - METRICS_RETENTION_SECONDS``
    are deleted, and then everything except the ``METRICS_MAX_ROWS`` rows
    with the highest ``ts`` is deleted. All three statements are committed
    together.
    """
    with _connect() as conn:
        conn.execute(
            """
            INSERT INTO metrics_history (ts, blocks, headers, mempool_bytes, peers)
            VALUES (?, ?, ?, ?, ?)
            ON CONFLICT(ts) DO UPDATE SET
                blocks = excluded.blocks,
                headers = excluded.headers,
                mempool_bytes = excluded.mempool_bytes,
                peers = excluded.peers
            """,
            (timestamp, blocks, headers, mempool_bytes, peers),
        )

        # Age-based pruning is relative to the sample being saved, not to
        # the wall clock.
        cutoff = timestamp - METRICS_RETENTION_SECONDS
        conn.execute("DELETE FROM metrics_history WHERE ts < ?", (cutoff,))

        # Size-based pruning: keep only the newest METRICS_MAX_ROWS rows.
        conn.execute(
            """
            DELETE FROM metrics_history
            WHERE ts NOT IN (
                SELECT ts FROM metrics_history ORDER BY ts DESC LIMIT ?
            )
            """,
            (METRICS_MAX_ROWS,),
        )
        conn.commit()


def get_metric_history(*, since_ts: int | None = None, limit: int = 5000) -> list[dict]:
    """Return stored metrics samples in ascending ``ts`` order.

    Args:
        since_ts: When given, only rows with ``ts >= since_ts`` are
            returned. When ``None``, the most recent rows are returned.
        limit: Maximum number of rows; clamped to ``1..METRICS_MAX_ROWS``.

    Returns:
        A list of dicts with keys ``ts``, ``blocks``, ``headers``,
        ``mempool_bytes`` and ``peers``.
    """
    limit = max(1, min(limit, METRICS_MAX_ROWS))

    with _connect() as conn:
        if since_ts is None:
            rows = conn.execute(
                """
                SELECT ts, blocks, headers, mempool_bytes, peers
                FROM metrics_history
                ORDER BY ts DESC
                LIMIT ?
                """,
                (limit,),
            ).fetchall()
            # Fetched newest-first so LIMIT keeps the latest rows; flip back
            # to chronological order for the caller.
            rows.reverse()
        else:
            # NOTE(review): with ORDER BY ts ASC the LIMIT keeps the OLDEST
            # matching rows, unlike the branch above which keeps the newest.
            # Confirm this is the intended (pagination-style) behaviour.
            rows = conn.execute(
                """
                SELECT ts, blocks, headers, mempool_bytes, peers
                FROM metrics_history
                WHERE ts >= ?
                ORDER BY ts ASC
                LIMIT ?
                """,
                (since_ts, limit),
            ).fetchall()

    return [dict(row) for row in rows]