Files
tellscoin/app/main.py
2026-02-15 16:28:38 +00:00

414 lines
12 KiB
Python

import time
import threading
from typing import Any
from fastapi import Depends, FastAPI, HTTPException, Request, status
from fastapi.responses import HTMLResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from pydantic import BaseModel, Field
from starlette.middleware.sessions import SessionMiddleware
from app.auth import require_auth, verify_credentials
from app.bitcoin_rpc import BitcoinRPCClient, BitcoinRPCError, parse_help_output
from app.config import get_config
from app.db import get_metric_history, get_settings, init_db, save_metric_point, save_settings
from app.ssh_control import SSHControlError, build_start_command, run_remote_command
cfg = get_config()
CHART_WINDOW_SECONDS = {
"5m": 5 * 60,
"30m": 30 * 60,
"2h": 2 * 60 * 60,
"6h": 6 * 60 * 60,
"all": None,
}
sampler_stop_event = threading.Event()
sampler_thread: threading.Thread | None = None
app = FastAPI(title="Bitcoin Core Admin Dashboard")
app.add_middleware(
SessionMiddleware,
secret_key=cfg.session_secret,
same_site="lax",
https_only=cfg.session_cookie_secure,
max_age=60 * 60 * 8,
)
app.mount("/static", StaticFiles(directory="app/static"), name="static")
templates = Jinja2Templates(directory="app/templates")
@app.on_event("startup")
def on_startup() -> None:
global sampler_thread
init_db()
if sampler_thread and sampler_thread.is_alive():
return
sampler_stop_event.clear()
sampler_thread = threading.Thread(
target=_metrics_sampler_loop,
name="metrics-sampler",
daemon=True,
)
sampler_thread.start()
@app.on_event("shutdown")
def on_shutdown() -> None:
global sampler_thread
sampler_stop_event.set()
if sampler_thread and sampler_thread.is_alive():
sampler_thread.join(timeout=2)
sampler_thread = None
@app.middleware("http")
async def no_cache_for_api(request: Request, call_next):
response = await call_next(request)
if request.url.path.startswith("/api/") or request.url.path.startswith("/static/"):
response.headers["Cache-Control"] = "no-store"
response.headers["Pragma"] = "no-cache"
response.headers["Expires"] = "0"
return response
@app.exception_handler(BitcoinRPCError)
async def handle_rpc_error(_: Request, exc: BitcoinRPCError) -> JSONResponse:
return JSONResponse(status_code=502, content={"detail": str(exc)})
@app.exception_handler(SSHControlError)
async def handle_ssh_error(_: Request, exc: SSHControlError) -> JSONResponse:
return JSONResponse(status_code=502, content={"detail": str(exc)})
class LoginPayload(BaseModel):
username: str
password: str
class NodeSettingsPayload(BaseModel):
rpc_url: str = Field(default="http://127.0.0.1:8332")
rpc_username: str = ""
rpc_password: str = ""
rpc_wallet: str = ""
config_path: str = ""
ssh_host: str = ""
ssh_port: int = 22
ssh_username: str = ""
ssh_password: str = ""
ssh_key_path: str = ""
bitcoin_binary: str = "bitcoind"
class RPCCallPayload(BaseModel):
method: str
params: list[Any] = Field(default_factory=list)
wallet: str | None = None
def _coerce_int(value: Any, default: int = 0) -> int:
try:
return int(value)
except (TypeError, ValueError):
return default
def _window_to_since(window: str) -> int | None:
if window not in CHART_WINDOW_SECONDS:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=f"Unsupported window '{window}'",
)
seconds = CHART_WINDOW_SECONDS[window]
if seconds is None:
return None
return int(time.time()) - seconds
def _collect_core_results(client: BitcoinRPCClient) -> dict[str, Any]:
return client.batch_call(
[
("getblockchaininfo", []),
("getnetworkinfo", []),
("getmempoolinfo", []),
("getmininginfo", []),
("uptime", []),
]
)
def _store_metrics_from_core(core_results: dict[str, Any], updated_at: int) -> None:
chain_data = core_results.get("getblockchaininfo") or {}
network_data = core_results.get("getnetworkinfo") or {}
mempool_data = core_results.get("getmempoolinfo") or {}
save_metric_point(
timestamp=updated_at,
blocks=_coerce_int(chain_data.get("blocks"), 0),
headers=_coerce_int(chain_data.get("headers"), 0),
mempool_bytes=_coerce_int(mempool_data.get("bytes"), 0),
peers=_coerce_int(network_data.get("connections"), 0),
)
def _metrics_sampler_loop() -> None:
interval = cfg.metrics_sampler_interval_seconds
while not sampler_stop_event.is_set():
try:
settings = get_settings()
client = _rpc_client_from_settings(settings)
core_results = _collect_core_results(client)
_store_metrics_from_core(core_results, int(time.time()))
except Exception:
# Background sampler is best-effort. API requests surface actionable errors.
pass
sampler_stop_event.wait(interval)
def _rpc_client_from_settings(settings: dict) -> BitcoinRPCClient:
rpc_url = (settings.get("rpc_url") or "").strip()
rpc_username = settings.get("rpc_username") or ""
rpc_password = settings.get("rpc_password") or ""
rpc_wallet = settings.get("rpc_wallet") or ""
if not rpc_url:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Node RPC URL is not configured",
)
if not rpc_username:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Node RPC username is not configured",
)
if not rpc_password:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Node RPC password is not configured",
)
return BitcoinRPCClient(
rpc_url=rpc_url,
username=rpc_username,
password=rpc_password,
wallet=rpc_wallet,
)
@app.get("/", response_class=HTMLResponse)
def index(request: Request) -> HTMLResponse:
return templates.TemplateResponse("index.html", {"request": request})
@app.get("/api/health")
def health() -> dict:
return {"ok": True}
@app.get("/api/auth/me")
def auth_me(request: Request) -> dict:
username = request.session.get("username")
return {
"authenticated": bool(username and username == cfg.app_username),
"username": username if username == cfg.app_username else None,
}
@app.post("/api/auth/login")
def auth_login(payload: LoginPayload, request: Request) -> dict:
if not verify_credentials(payload.username, payload.password):
raise HTTPException(status_code=401, detail="Invalid username or password")
request.session["username"] = payload.username
return {"ok": True, "username": payload.username}
@app.post("/api/auth/logout")
def auth_logout(request: Request, _: str = Depends(require_auth)) -> dict:
request.session.clear()
return {"ok": True}
@app.get("/api/settings")
def read_settings(_: str = Depends(require_auth)) -> dict:
return get_settings()
@app.put("/api/settings")
def update_settings(payload: NodeSettingsPayload, _: str = Depends(require_auth)) -> dict:
values = payload.model_dump()
values["rpc_url"] = values["rpc_url"].strip()
for key in [
"rpc_username",
"rpc_password",
"rpc_wallet",
"config_path",
"ssh_host",
"ssh_username",
"ssh_password",
"ssh_key_path",
"bitcoin_binary",
]:
values[key] = str(values.get(key, "")).strip()
if values["ssh_port"] < 1 or values["ssh_port"] > 65535:
raise HTTPException(status_code=422, detail="SSH port must be between 1 and 65535")
return save_settings(values)
@app.post("/api/rpc/call")
def rpc_call(payload: RPCCallPayload, _: str = Depends(require_auth)) -> dict:
settings = get_settings()
client = _rpc_client_from_settings(settings)
method = payload.method.strip()
if not method:
raise HTTPException(status_code=422, detail="RPC method is required")
result = client.call(method=method, params=payload.params, wallet_override=payload.wallet)
return {"method": method, "result": result}
@app.get("/api/rpc/commands")
def rpc_commands(_: str = Depends(require_auth)) -> dict:
settings = get_settings()
client = _rpc_client_from_settings(settings)
help_text = client.call("help", [])
if not isinstance(help_text, str):
raise HTTPException(status_code=500, detail="Unexpected help response from node")
commands = parse_help_output(help_text)
return {"count": len(commands), "commands": commands}
@app.get("/api/rpc/help/{method_name}")
def rpc_method_help(method_name: str, _: str = Depends(require_auth)) -> dict:
settings = get_settings()
client = _rpc_client_from_settings(settings)
details = client.call("help", [method_name])
return {"method": method_name, "help": details}
@app.get("/api/dashboard/summary")
def dashboard_summary(_: str = Depends(require_auth)) -> dict:
settings = get_settings()
client = _rpc_client_from_settings(settings)
core_results = _collect_core_results(client)
chain_data = core_results.get("getblockchaininfo") or {}
network_data = core_results.get("getnetworkinfo") or {}
mempool_data = core_results.get("getmempoolinfo") or {}
mining_data = core_results.get("getmininginfo") or {}
uptime_data = core_results.get("uptime")
updated_at = int(time.time())
_store_metrics_from_core(core_results, updated_at)
wallet_data = None
wallet_name = settings.get("rpc_wallet") or ""
if wallet_name:
try:
wallet_data = client.call("getwalletinfo", [], wallet_override=wallet_name)
except BitcoinRPCError:
wallet_data = None
return {
"blockchain": chain_data,
"network": network_data,
"mempool": mempool_data,
"mining": mining_data,
"uptime": uptime_data,
"wallet": wallet_data,
"updated_at": updated_at,
}
@app.get("/api/dashboard/history")
def dashboard_history(
window: str = "30m",
limit: int = 5000,
_: str = Depends(require_auth),
) -> dict:
since_ts = _window_to_since(window)
points = get_metric_history(since_ts=since_ts, limit=limit)
return {
"window": window,
"count": len(points),
"points": points,
}
@app.get("/api/dashboard/history/{window}")
def dashboard_history_by_window(
window: str,
limit: int = 5000,
_: str = Depends(require_auth),
) -> dict:
since_ts = _window_to_since(window)
points = get_metric_history(since_ts=since_ts, limit=limit)
return {
"window": window,
"count": len(points),
"points": points,
}
@app.post("/api/actions/stop")
def stop_node(_: str = Depends(require_auth)) -> dict:
settings = get_settings()
client = _rpc_client_from_settings(settings)
result = client.call("stop", [])
return {"action": "stop", "result": result}
@app.post("/api/actions/start")
def start_node(_: str = Depends(require_auth)) -> dict:
settings = get_settings()
start_command = build_start_command(settings)
remote = run_remote_command(settings, start_command)
if remote["exit_code"] != 0:
raise HTTPException(
status_code=502,
detail=f"Remote start command failed (exit {remote['exit_code']}): {remote['stderr'] or remote['stdout']}",
)
return {
"action": "start",
"command": start_command,
"remote": remote,
}
@app.post("/api/actions/restart")
def restart_node(_: str = Depends(require_auth)) -> dict:
settings = get_settings()
stop_response = None
stop_error = None
try:
client = _rpc_client_from_settings(settings)
stop_response = client.call("stop", [])
time.sleep(2)
except (BitcoinRPCError, HTTPException) as exc:
stop_error = str(exc)
start_command = build_start_command(settings)
remote = run_remote_command(settings, start_command)
if remote["exit_code"] != 0:
raise HTTPException(
status_code=502,
detail=f"Remote restart command failed (exit {remote['exit_code']}): {remote['stderr'] or remote['stdout']}",
)
return {
"action": "restart",
"stop_result": stop_response,
"stop_error": stop_error,
"start_command": start_command,
"remote": remote,
}