import time import threading from typing import Any from fastapi import Depends, FastAPI, HTTPException, Request, status from fastapi.responses import HTMLResponse, JSONResponse from fastapi.staticfiles import StaticFiles from fastapi.templating import Jinja2Templates from pydantic import BaseModel, Field from starlette.middleware.sessions import SessionMiddleware from app.auth import require_auth, verify_credentials from app.bitcoin_rpc import BitcoinRPCClient, BitcoinRPCError, parse_help_output from app.config import get_config from app.db import get_metric_history, get_settings, init_db, save_metric_point, save_settings from app.ssh_control import SSHControlError, build_start_command, run_remote_command cfg = get_config() CHART_WINDOW_SECONDS = { "5m": 5 * 60, "30m": 30 * 60, "2h": 2 * 60 * 60, "6h": 6 * 60 * 60, "all": None, } sampler_stop_event = threading.Event() sampler_thread: threading.Thread | None = None app = FastAPI(title="Bitcoin Core Admin Dashboard") app.add_middleware( SessionMiddleware, secret_key=cfg.session_secret, same_site="lax", https_only=cfg.session_cookie_secure, max_age=60 * 60 * 8, ) app.mount("/static", StaticFiles(directory="app/static"), name="static") templates = Jinja2Templates(directory="app/templates") @app.on_event("startup") def on_startup() -> None: global sampler_thread init_db() if sampler_thread and sampler_thread.is_alive(): return sampler_stop_event.clear() sampler_thread = threading.Thread( target=_metrics_sampler_loop, name="metrics-sampler", daemon=True, ) sampler_thread.start() @app.on_event("shutdown") def on_shutdown() -> None: global sampler_thread sampler_stop_event.set() if sampler_thread and sampler_thread.is_alive(): sampler_thread.join(timeout=2) sampler_thread = None @app.middleware("http") async def no_cache_for_api(request: Request, call_next): response = await call_next(request) if request.url.path.startswith("/api/") or request.url.path.startswith("/static/"): response.headers["Cache-Control"] = "no-store" response.headers["Pragma"] = "no-cache" response.headers["Expires"] = "0" return response @app.exception_handler(BitcoinRPCError) async def handle_rpc_error(_: Request, exc: BitcoinRPCError) -> JSONResponse: return JSONResponse(status_code=502, content={"detail": str(exc)}) @app.exception_handler(SSHControlError) async def handle_ssh_error(_: Request, exc: SSHControlError) -> JSONResponse: return JSONResponse(status_code=502, content={"detail": str(exc)}) class LoginPayload(BaseModel): username: str password: str class NodeSettingsPayload(BaseModel): rpc_url: str = Field(default="http://127.0.0.1:8332") rpc_username: str = "" rpc_password: str = "" rpc_wallet: str = "" config_path: str = "" ssh_host: str = "" ssh_port: int = 22 ssh_username: str = "" ssh_password: str = "" ssh_key_path: str = "" bitcoin_binary: str = "bitcoind" class RPCCallPayload(BaseModel): method: str params: list[Any] = Field(default_factory=list) wallet: str | None = None def _coerce_int(value: Any, default: int = 0) -> int: try: return int(value) except (TypeError, ValueError): return default def _window_to_since(window: str) -> int | None: if window not in CHART_WINDOW_SECONDS: raise HTTPException( status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=f"Unsupported window '{window}'", ) seconds = CHART_WINDOW_SECONDS[window] if seconds is None: return None return int(time.time()) - seconds def _collect_core_results(client: BitcoinRPCClient) -> dict[str, Any]: return client.batch_call( [ ("getblockchaininfo", []), ("getnetworkinfo", []), ("getmempoolinfo", []), ("getmininginfo", []), ("uptime", []), ] ) def _store_metrics_from_core(core_results: dict[str, Any], updated_at: int) -> None: chain_data = core_results.get("getblockchaininfo") or {} network_data = core_results.get("getnetworkinfo") or {} mempool_data = core_results.get("getmempoolinfo") or {} save_metric_point( timestamp=updated_at, blocks=_coerce_int(chain_data.get("blocks"), 0), headers=_coerce_int(chain_data.get("headers"), 0), mempool_bytes=_coerce_int(mempool_data.get("bytes"), 0), peers=_coerce_int(network_data.get("connections"), 0), ) def _metrics_sampler_loop() -> None: interval = cfg.metrics_sampler_interval_seconds while not sampler_stop_event.is_set(): try: settings = get_settings() client = _rpc_client_from_settings(settings) core_results = _collect_core_results(client) _store_metrics_from_core(core_results, int(time.time())) except Exception: # Background sampler is best-effort. API requests surface actionable errors. pass sampler_stop_event.wait(interval) def _rpc_client_from_settings(settings: dict) -> BitcoinRPCClient: rpc_url = (settings.get("rpc_url") or "").strip() rpc_username = settings.get("rpc_username") or "" rpc_password = settings.get("rpc_password") or "" rpc_wallet = settings.get("rpc_wallet") or "" if not rpc_url: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Node RPC URL is not configured", ) if not rpc_username: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Node RPC username is not configured", ) if not rpc_password: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Node RPC password is not configured", ) return BitcoinRPCClient( rpc_url=rpc_url, username=rpc_username, password=rpc_password, wallet=rpc_wallet, ) @app.get("/", response_class=HTMLResponse) def index(request: Request) -> HTMLResponse: return templates.TemplateResponse("index.html", {"request": request}) @app.get("/api/health") def health() -> dict: return {"ok": True} @app.get("/api/auth/me") def auth_me(request: Request) -> dict: username = request.session.get("username") return { "authenticated": bool(username and username == cfg.app_username), "username": username if username == cfg.app_username else None, } @app.post("/api/auth/login") def auth_login(payload: LoginPayload, request: Request) -> dict: if not verify_credentials(payload.username, payload.password): raise HTTPException(status_code=401, detail="Invalid username or password") request.session["username"] = payload.username return {"ok": True, "username": payload.username} @app.post("/api/auth/logout") def auth_logout(request: Request, _: str = Depends(require_auth)) -> dict: request.session.clear() return {"ok": True} @app.get("/api/settings") def read_settings(_: str = Depends(require_auth)) -> dict: return get_settings() @app.put("/api/settings") def update_settings(payload: NodeSettingsPayload, _: str = Depends(require_auth)) -> dict: values = payload.model_dump() values["rpc_url"] = values["rpc_url"].strip() for key in [ "rpc_username", "rpc_password", "rpc_wallet", "config_path", "ssh_host", "ssh_username", "ssh_password", "ssh_key_path", "bitcoin_binary", ]: values[key] = str(values.get(key, "")).strip() if values["ssh_port"] < 1 or values["ssh_port"] > 65535: raise HTTPException(status_code=422, detail="SSH port must be between 1 and 65535") return save_settings(values) @app.post("/api/rpc/call") def rpc_call(payload: RPCCallPayload, _: str = Depends(require_auth)) -> dict: settings = get_settings() client = _rpc_client_from_settings(settings) method = payload.method.strip() if not method: raise HTTPException(status_code=422, detail="RPC method is required") result = client.call(method=method, params=payload.params, wallet_override=payload.wallet) return {"method": method, "result": result} @app.get("/api/rpc/commands") def rpc_commands(_: str = Depends(require_auth)) -> dict: settings = get_settings() client = _rpc_client_from_settings(settings) help_text = client.call("help", []) if not isinstance(help_text, str): raise HTTPException(status_code=500, detail="Unexpected help response from node") commands = parse_help_output(help_text) return {"count": len(commands), "commands": commands} @app.get("/api/rpc/help/{method_name}") def rpc_method_help(method_name: str, _: str = Depends(require_auth)) -> dict: settings = get_settings() client = _rpc_client_from_settings(settings) details = client.call("help", [method_name]) return {"method": method_name, "help": details} @app.get("/api/dashboard/summary") def dashboard_summary(_: str = Depends(require_auth)) -> dict: settings = get_settings() client = _rpc_client_from_settings(settings) core_results = _collect_core_results(client) chain_data = core_results.get("getblockchaininfo") or {} network_data = core_results.get("getnetworkinfo") or {} mempool_data = core_results.get("getmempoolinfo") or {} mining_data = core_results.get("getmininginfo") or {} uptime_data = core_results.get("uptime") updated_at = int(time.time()) _store_metrics_from_core(core_results, updated_at) wallet_data = None wallet_name = settings.get("rpc_wallet") or "" if wallet_name: try: wallet_data = client.call("getwalletinfo", [], wallet_override=wallet_name) except BitcoinRPCError: wallet_data = None return { "blockchain": chain_data, "network": network_data, "mempool": mempool_data, "mining": mining_data, "uptime": uptime_data, "wallet": wallet_data, "updated_at": updated_at, } @app.get("/api/dashboard/history") def dashboard_history( window: str = "30m", limit: int = 5000, _: str = Depends(require_auth), ) -> dict: since_ts = _window_to_since(window) points = get_metric_history(since_ts=since_ts, limit=limit) return { "window": window, "count": len(points), "points": points, } @app.get("/api/dashboard/history/{window}") def dashboard_history_by_window( window: str, limit: int = 5000, _: str = Depends(require_auth), ) -> dict: since_ts = _window_to_since(window) points = get_metric_history(since_ts=since_ts, limit=limit) return { "window": window, "count": len(points), "points": points, } @app.post("/api/actions/stop") def stop_node(_: str = Depends(require_auth)) -> dict: settings = get_settings() client = _rpc_client_from_settings(settings) result = client.call("stop", []) return {"action": "stop", "result": result} @app.post("/api/actions/start") def start_node(_: str = Depends(require_auth)) -> dict: settings = get_settings() start_command = build_start_command(settings) remote = run_remote_command(settings, start_command) if remote["exit_code"] != 0: raise HTTPException( status_code=502, detail=f"Remote start command failed (exit {remote['exit_code']}): {remote['stderr'] or remote['stdout']}", ) return { "action": "start", "command": start_command, "remote": remote, } @app.post("/api/actions/restart") def restart_node(_: str = Depends(require_auth)) -> dict: settings = get_settings() stop_response = None stop_error = None try: client = _rpc_client_from_settings(settings) stop_response = client.call("stop", []) time.sleep(2) except (BitcoinRPCError, HTTPException) as exc: stop_error = str(exc) start_command = build_start_command(settings) remote = run_remote_command(settings, start_command) if remote["exit_code"] != 0: raise HTTPException( status_code=502, detail=f"Remote restart command failed (exit {remote['exit_code']}): {remote['stderr'] or remote['stdout']}", ) return { "action": "restart", "stop_result": stop_response, "stop_error": stop_error, "start_command": start_command, "remote": remote, }