""" Sovereign Orchestrator - Proxmox API Client Async HTTP client for the Proxmox VE REST API using token-based authentication. All methods return parsed JSON dicts or raise on error. """ from __future__ import annotations import logging from typing import Any, Optional import httpx from app.config import settings logger = logging.getLogger(__name__) class ProxmoxError(Exception): """Raised when a Proxmox API call fails.""" def __init__(self, message: str, status_code: int | None = None) -> None: super().__init__(message) self.status_code = status_code class ProxmoxClient: """Async Proxmox VE API client with token authentication.""" def __init__( self, host: str | None = None, token_id: str | None = None, token_secret: str | None = None, verify_ssl: bool = False, timeout: float = 30.0, ) -> None: self.host = (host or settings.proxmox_host).rstrip("/") self.token_id = token_id or settings.proxmox_token_id self.token_secret = token_secret or settings.proxmox_token_secret self.verify_ssl = verify_ssl self.timeout = timeout if not self.token_id or not self.token_secret: logger.warning( "Proxmox token credentials are not configured. " "API calls will fail." ) # ------------------------------------------------------------------ # Internal helpers # ------------------------------------------------------------------ def _auth_header(self) -> dict[str, str]: """Build the PVEAPIToken authorization header.""" return { "Authorization": f"PVEAPIToken={self.token_id}={self.token_secret}" } def _url(self, path: str) -> str: """Build the full API URL for *path* (should start with /).""" return f"{self.host}/api2/json{path}" async def _request( self, method: str, path: str, *, params: dict[str, Any] | None = None, data: dict[str, Any] | None = None, json_body: dict[str, Any] | None = None, ) -> Any: """Execute an API request and return the ``data`` envelope.""" url = self._url(path) headers = self._auth_header() logger.debug("%s %s params=%s", method, url, params) async with httpx.AsyncClient( verify=self.verify_ssl, timeout=self.timeout ) as client: response = await client.request( method, url, headers=headers, params=params, data=data, json=json_body, ) if response.status_code >= 400: body = response.text[:500] logger.error( "Proxmox API error: %s %s -> %d: %s", method, path, response.status_code, body, ) raise ProxmoxError( f"Proxmox API {method} {path} returned {response.status_code}: {body}", status_code=response.status_code, ) payload = response.json() return payload.get("data", payload) # ------------------------------------------------------------------ # Node operations # ------------------------------------------------------------------ async def get_nodes(self) -> list[dict[str, Any]]: """List all cluster nodes.""" return await self._request("GET", "/nodes") # ------------------------------------------------------------------ # VM operations # ------------------------------------------------------------------ async def get_vms(self, node: str | None = None) -> list[dict[str, Any]]: """List VMs on *node*.""" node = node or settings.default_node return await self._request("GET", f"/nodes/{node}/qemu") async def get_vm_status( self, node: str | None = None, vmid: int | None = None ) -> dict[str, Any]: """Get current status of a single VM.""" node = node or settings.default_node vmid = vmid if vmid is not None else settings.default_vmid return await self._request( "GET", f"/nodes/{node}/qemu/{vmid}/status/current" ) async def start_vm( self, node: str | None = None, vmid: int | None = None ) -> str: """Start a VM. Returns the Proxmox UPID task string.""" node = node or settings.default_node vmid = vmid if vmid is not None else settings.default_vmid logger.info("Starting VM %d on %s", vmid, node) return await self._request( "POST", f"/nodes/{node}/qemu/{vmid}/status/start" ) async def stop_vm( self, node: str | None = None, vmid: int | None = None ) -> str: """Stop (hard) a VM. Returns the Proxmox UPID task string.""" node = node or settings.default_node vmid = vmid if vmid is not None else settings.default_vmid logger.info("Stopping VM %d on %s", vmid, node) return await self._request( "POST", f"/nodes/{node}/qemu/{vmid}/status/stop" ) async def destroy_vm( self, node: str | None = None, vmid: int | None = None, purge: bool = True, destroy_unreferenced_disks: bool = True, ) -> str: """Delete a VM and optionally purge its disks.""" node = node or settings.default_node vmid = vmid if vmid is not None else settings.default_vmid logger.info("Destroying VM %d on %s (purge=%s)", vmid, node, purge) params: dict[str, Any] = {} if purge: params["purge"] = 1 if destroy_unreferenced_disks: params["destroy-unreferenced-disks"] = 1 return await self._request( "DELETE", f"/nodes/{node}/qemu/{vmid}", params=params ) async def create_vm( self, node: str | None = None, vmid: int | None = None, config: dict[str, Any] | None = None, ) -> str: """Create a new VM with the given configuration dict.""" node = node or settings.default_node vmid = vmid if vmid is not None else settings.default_vmid config = config or {} config.setdefault("vmid", vmid) logger.info("Creating VM %d on %s with config: %s", vmid, node, config) return await self._request( "POST", f"/nodes/{node}/qemu", data=config ) # ------------------------------------------------------------------ # ISO / storage operations # ------------------------------------------------------------------ async def get_isos( self, node: str | None = None, storage: str | None = None, ) -> list[dict[str, Any]]: """List ISO images available on *storage*.""" node = node or settings.default_node storage = storage or settings.default_storage result = await self._request( "GET", f"/nodes/{node}/storage/{storage}/content", params={"content": "iso"}, ) # The API returns all content types; filter to ISOs just in case if isinstance(result, list): return [ item for item in result if item.get("content") == "iso" or item.get("volid", "").endswith(".iso") ] return result async def upload_iso( self, filepath: str, node: str | None = None, storage: str | None = None, ) -> str: """Upload a local ISO file to Proxmox storage. Uses the Proxmox upload endpoint which expects multipart form data. """ node = node or settings.default_node storage = storage or settings.default_storage url = self._url(f"/nodes/{node}/storage/{storage}/upload") headers = self._auth_header() logger.info("Uploading ISO %s to %s:%s", filepath, node, storage) async with httpx.AsyncClient( verify=self.verify_ssl, timeout=300.0 ) as client: with open(filepath, "rb") as fh: files = {"filename": (filepath.split("/")[-1], fh, "application/octet-stream")} data = {"content": "iso"} response = await client.post( url, headers=headers, data=data, files=files ) if response.status_code >= 400: body = response.text[:500] raise ProxmoxError( f"ISO upload failed ({response.status_code}): {body}", status_code=response.status_code, ) payload = response.json() task_id = payload.get("data", "") logger.info("Upload task started: %s", task_id) return task_id # ------------------------------------------------------------------ # Task helpers # ------------------------------------------------------------------ async def wait_for_task( self, node: str, upid: str, *, poll_interval: float = 2.0, max_wait: float = 120.0, ) -> dict[str, Any]: """Poll a Proxmox task until it completes or times out.""" import asyncio elapsed = 0.0 while elapsed < max_wait: status = await self._request( "GET", f"/nodes/{node}/tasks/{upid}/status" ) if status.get("status") == "stopped": return status await asyncio.sleep(poll_interval) elapsed += poll_interval raise ProxmoxError( f"Task {upid} did not complete within {max_wait}s" ) # Module-level singleton for convenience proxmox = ProxmoxClient()