diff --git a/architecture/gateway.md b/architecture/gateway.md
index cc43374a4..9677dfc47 100644
--- a/architecture/gateway.md
+++ b/architecture/gateway.md
@@ -96,10 +96,10 @@ The gateway boots in `main()` (`crates/openshell-server/src/main.rs`) and procee
 4. **Build `Config`** -- Assembles a `openshell_core::Config` from the parsed arguments.
 5. **Call `run_server()`** (`crates/openshell-server/src/lib.rs`):
    1. Connect to the persistence store (`Store::connect`), which auto-detects SQLite vs Postgres from the URL prefix and runs migrations.
-   2. Create `ComputeRuntime` with the in-process Kubernetes compute backend (`KubernetesComputeDriver`).
+   2. Create `ComputeRuntime` with an in-process `ComputeDriverService` backed by `KubernetesComputeDriver`, so the gateway calls the `openshell.compute.v1.ComputeDriver` RPC surface even without transport.
    3. Build `ServerState` (shared via `Arc` across all handlers).
    4. **Spawn background tasks**:
-      - `ComputeRuntime::spawn_watchers` -- consumes the compute-driver watch stream, updates persisted sandbox records, and republishes platform events.
+      - `ComputeRuntime::spawn_watchers` -- consumes the compute-driver watch stream, republishes platform events, and runs a periodic `ListSandboxes` snapshot reconcile so the store-backed public sandbox reads stay aligned with the compute driver.
    5. Create `MultiplexService`.
    6. Bind `TcpListener` on `config.bind_address`.
    7. Optionally create `TlsAcceptor` from cert/key files.
@@ -149,7 +149,7 @@ pub struct ServerState {
 ```
 
 - **`store`** -- persistence backend (SQLite or Postgres) for all object types.
-- **`compute`** -- gateway-owned compute orchestration. Persists sandbox lifecycle transitions, validates create requests through the compute backend, resolves exec/SSH endpoints, consumes the backend watch stream, and periodically reconciles orphaned `Provisioning` records that no longer have a backing compute resource.
+- **`compute`** -- gateway-owned compute orchestration. Persists sandbox lifecycle transitions, validates create requests through the compute backend, resolves exec/SSH endpoints, consumes the backend watch stream, and periodically reconciles the store against `ComputeDriver/ListSandboxes` snapshots.
 - **`sandbox_index`** -- in-memory bidirectional index mapping sandbox names and agent pod names to sandbox IDs. Updated from compute-driver sandbox snapshots.
 - **`sandbox_watch_bus`** -- `broadcast`-based notification bus keyed by sandbox ID. Producers call `notify(&id)` when the persisted sandbox record changes; consumers in `WatchSandbox` streams receive `()` signals and re-read the record.
 - **`tracing_log_bus`** -- captures `tracing` events that include a `sandbox_id` field and republishes them as `SandboxLogLine` messages. Maintains a per-sandbox tail buffer (default 200 entries). Also contains a nested `PlatformEventBus` for compute-driver platform events.
@@ -381,7 +381,7 @@ All buses use `tokio::sync::broadcast` channels keyed by sandbox ID. Buffer size
 
 Broadcast lag is translated to `Status::resource_exhausted` via `broadcast_to_status()`.
 
-**Cleanup:** Each bus exposes a `remove(sandbox_id)` method that drops the broadcast sender (closing active receivers with `RecvError::Closed`) and frees internal map entries. Cleanup is wired into the compute watch reconciler, the periodic orphan sweep for stale `Provisioning` records, and the `delete_sandbox` gRPC handler to prevent unbounded memory growth from accumulated entries for deleted sandboxes.
+**Cleanup:** Each bus exposes a `remove(sandbox_id)` method that drops the broadcast sender (closing active receivers with `RecvError::Closed`) and frees internal map entries. Cleanup is wired into the compute watch reconciler, the periodic snapshot reconcile for sandboxes missing from the driver, and the `delete_sandbox` gRPC handler to prevent unbounded memory growth from accumulated entries for deleted sandboxes.
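The bus lifecycle described in the Cleanup paragraph -- `notify` on record change, `remove` on deletion so active receivers observe a closed channel and map entries are freed -- can be sketched in isolation. This is a hypothetical, dependency-free model: `std::sync::mpsc` stands in for the real `tokio::sync::broadcast`, and the struct below only mirrors, not reproduces, the repository's `SandboxWatchBus`.

```rust
use std::collections::HashMap;
use std::sync::Mutex;
use std::sync::mpsc::{self, Receiver, Sender};

/// Hypothetical per-sandbox notification bus. Producers call notify(id) after
/// a persisted record changes; each subscriber holds a Receiver and re-reads
/// the record on every () signal it receives.
#[derive(Default)]
struct SandboxWatchBus {
    senders: Mutex<HashMap<String, Vec<Sender<()>>>>,
}

impl SandboxWatchBus {
    /// Register a new watcher for `sandbox_id` and hand back its signal stream.
    fn subscribe(&self, sandbox_id: &str) -> Receiver<()> {
        let (tx, rx) = mpsc::channel();
        self.senders
            .lock()
            .unwrap()
            .entry(sandbox_id.to_string())
            .or_default()
            .push(tx);
        rx
    }

    /// Wake every watcher of `sandbox_id`.
    fn notify(&self, sandbox_id: &str) {
        if let Some(txs) = self.senders.lock().unwrap().get(sandbox_id) {
            for tx in txs {
                let _ = tx.send(());
            }
        }
    }

    /// Drop all senders for a deleted sandbox: pending recv() calls now fail
    /// (the tokio broadcast analogue is RecvError::Closed), and the map entry
    /// is freed so deleted sandboxes cannot accumulate state.
    fn remove(&self, sandbox_id: &str) {
        self.senders.lock().unwrap().remove(sandbox_id);
    }
}

fn main() {
    let bus = SandboxWatchBus::default();
    let rx = bus.subscribe("sb-1");
    bus.notify("sb-1");
    assert_eq!(rx.recv(), Ok(()));
    bus.remove("sb-1");
    assert!(rx.recv().is_err()); // senders dropped => channel closed
    println!("watch bus sketch ok");
}
```

The key design point the sketch preserves is that deletion is signalled by closing the channel rather than by a sentinel message, which is why `remove` alone is enough to unblock and terminate every `WatchSandbox` stream for that ID.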
 **Validation:** `WatchSandbox` validates that the sandbox exists before subscribing to any bus, preventing entries from being created for non-existent IDs. `PushSandboxLogs` validates sandbox existence once on the first batch of the stream.
@@ -500,7 +500,7 @@ The Helm chart template is at `deploy/helm/openshell/templates/statefulset.yaml`
 
 ### Sandbox CRD Management
 
-`KubernetesComputeDriver` (`crates/openshell-driver-kubernetes/src/driver.rs`) manages `agents.x-k8s.io/v1alpha1/Sandbox` CRDs behind the gateway's compute interface.
+`KubernetesComputeDriver` (`crates/openshell-driver-kubernetes/src/driver.rs`) manages `agents.x-k8s.io/v1alpha1/Sandbox` CRDs behind the gateway's compute interface. The gateway binds to that driver through `ComputeDriverService` (`crates/openshell-driver-kubernetes/src/grpc.rs`) in-process, so the same `openshell.compute.v1.ComputeDriver` request and response types are exercised whether the driver is invoked locally or served over gRPC.
 
 - **Get**: `GetSandbox` looks up a sandbox CRD by name and returns a driver-native platform observation (`openshell.compute.v1.DriverSandbox`) with raw status and condition data from the object.
 - **List**: `ListSandboxes` enumerates sandbox CRDs and returns driver-native platform observations for each, sorted by name for stable results.
@@ -517,6 +517,8 @@ The Kubernetes driver emits `WatchSandboxes` events through `proto/compute_drive
 - **Deleted**: Removes the sandbox record from the store and the index. Notifies the watch bus.
 - **Restarted**: Re-processes all objects (full resync).
 
+In addition to the watch stream, `ComputeRuntime` periodically calls `ComputeDriver/ListSandboxes` through the in-process `ComputeDriverService` and reconciles the store to that full driver snapshot. Public `GetSandbox` and `ListSandboxes` handlers remain store-backed, but the store is refreshed from the driver on a timer so the gateway still exercises the compute-driver RPC surface for reconciliation.
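The timer-driven snapshot reconcile described above ultimately reduces to one decision rule: a stored record is pruned only when it is absent from the driver's `ListSandboxes` snapshot *and* has outlived the grace period. A minimal sketch of that rule follows -- the `StoredSandbox` shape and the `ids_to_prune` helper are hypothetical names for illustration, and the real sweep (`reconcile_store_with_backend`) additionally re-checks `GetSandbox` and skips records updated after the sweep started before deleting anything.

```rust
use std::collections::HashSet;
use std::time::Duration;

/// Minimal view of a persisted sandbox record (hypothetical field names).
struct StoredSandbox {
    id: String,
    created_at_ms: i64,
}

/// Prune a record only if the driver snapshot no longer lists it AND it is
/// older than the grace period, so freshly created sandboxes that have not
/// yet appeared in the driver are left alone.
fn ids_to_prune(
    records: &[StoredSandbox],
    driver_ids: &HashSet<String>,
    now_ms: i64,
    grace: Duration,
) -> Vec<String> {
    let grace_ms = i64::try_from(grace.as_millis()).unwrap_or(i64::MAX);
    records
        .iter()
        .filter(|r| !driver_ids.contains(&r.id))
        .filter(|r| now_ms.saturating_sub(r.created_at_ms) >= grace_ms)
        .map(|r| r.id.clone())
        .collect()
}

fn main() {
    let records = vec![
        StoredSandbox { id: "sb-live".to_string(), created_at_ms: 0 },
        StoredSandbox { id: "sb-stale".to_string(), created_at_ms: 0 },
        StoredSandbox { id: "sb-young".to_string(), created_at_ms: 9_000 },
    ];
    // Only "sb-live" still exists on the driver side.
    let driver_ids: HashSet<String> = HashSet::from(["sb-live".to_string()]);
    let pruned = ids_to_prune(&records, &driver_ids, 10_000, Duration::from_secs(5));
    assert_eq!(pruned, vec!["sb-stale".to_string()]); // stale: missing + past grace
    println!("prune sketch ok");
}
```

The grace period is what keeps this sweep safe to run concurrently with sandbox creation: a record persisted just before the driver object becomes visible is never eligible for pruning on the first pass.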
+
 ### Gateway Phase Derivation
 
 `ComputeRuntime::derive_phase()` (`crates/openshell-server/src/compute/mod.rs`) maps driver-native compute status to the public `SandboxPhase` exposed by `proto/openshell.proto`:
diff --git a/crates/openshell-driver-kubernetes/src/grpc.rs b/crates/openshell-driver-kubernetes/src/grpc.rs
index 67e0795ec..2c5a94467 100644
--- a/crates/openshell-driver-kubernetes/src/grpc.rs
+++ b/crates/openshell-driver-kubernetes/src/grpc.rs
@@ -102,7 +102,7 @@ impl ComputeDriver for ComputeDriverService {
         self.driver
             .create_sandbox(&sandbox)
             .await
-            .map_err(|err| Status::internal(err.to_string()))?;
+            .map_err(status_from_driver_error)?;
 
         Ok(Response::new(CreateSandboxResponse {}))
     }
@@ -181,4 +181,12 @@ mod tests {
         assert_eq!(status.code(), tonic::Code::FailedPrecondition);
         assert_eq!(status.message(), "sandbox agent pod IP is not available");
     }
+
+    #[test]
+    fn already_exists_driver_errors_map_to_already_exists_status() {
+        let status = status_from_driver_error(KubernetesDriverError::AlreadyExists);
+
+        assert_eq!(status.code(), tonic::Code::AlreadyExists);
+        assert_eq!(status.message(), "sandbox already exists");
+    }
 }
diff --git a/crates/openshell-server/src/compute/mod.rs b/crates/openshell-server/src/compute/mod.rs
index f7802ea37..846782c65 100644
--- a/crates/openshell-server/src/compute/mod.rs
+++ b/crates/openshell-server/src/compute/mod.rs
@@ -4,22 +4,25 @@
 //! Gateway-owned compute orchestration over a pluggable compute backend.
 use crate::grpc::policy::{SANDBOX_SETTINGS_OBJECT_TYPE, sandbox_settings_id};
-use crate::persistence::{ObjectId, ObjectName, ObjectType, Store};
+use crate::persistence::{ObjectId, ObjectName, ObjectRecord, ObjectType, Store};
 use crate::sandbox_index::SandboxIndex;
 use crate::sandbox_watch::SandboxWatchBus;
 use crate::tracing_bus::TracingLogBus;
 use futures::{Stream, StreamExt};
 use openshell_core::proto::compute::v1::{
-    DriverCondition, DriverPlatformEvent, DriverResourceRequirements, DriverSandbox,
-    DriverSandboxSpec, DriverSandboxStatus, DriverSandboxTemplate, ResolveSandboxEndpointResponse,
-    WatchSandboxesEvent, sandbox_endpoint, watch_sandboxes_event,
+    CreateSandboxRequest, DeleteSandboxRequest, DriverCondition, DriverPlatformEvent,
+    DriverResourceRequirements, DriverSandbox, DriverSandboxSpec, DriverSandboxStatus,
+    DriverSandboxTemplate, GetCapabilitiesRequest, GetSandboxRequest, ListSandboxesRequest,
+    ResolveSandboxEndpointRequest, ResolveSandboxEndpointResponse, ValidateSandboxCreateRequest,
+    WatchSandboxesEvent, WatchSandboxesRequest, compute_driver_server::ComputeDriver,
+    sandbox_endpoint, watch_sandboxes_event,
 };
 use openshell_core::proto::{
     PlatformEvent, Sandbox, SandboxCondition, SandboxPhase, SandboxSpec, SandboxStatus,
     SandboxTemplate, SshSession,
 };
 use openshell_driver_kubernetes::{
-    KubernetesComputeConfig, KubernetesComputeDriver, KubernetesDriverError,
+    ComputeDriverService, KubernetesComputeConfig, KubernetesComputeDriver,
 };
 use prost::Message;
 use std::fmt;
@@ -27,11 +30,13 @@ use std::net::IpAddr;
 use std::pin::Pin;
 use std::sync::Arc;
 use std::time::Duration;
-use tonic::Status;
-use tracing::{debug, info, warn};
+use tokio::sync::Mutex;
+use tonic::{Code, Request, Status};
+use tracing::{info, warn};
 
-type ComputeWatchStream =
-    Pin<Box<dyn Stream<Item = Result<WatchSandboxesEvent, ComputeError>> + Send>>;
+type DriverWatchStream = Pin<Box<dyn Stream<Item = Result<WatchSandboxesEvent, Status>> + Send>>;
+type SharedComputeDriver =
+    Arc<dyn ComputeDriver<WatchSandboxesStream = DriverWatchStream> + Send + Sync>;
 
 /// Interval between store-vs-backend reconciliation sweeps.
 const RECONCILE_INTERVAL: Duration = Duration::from_secs(60);
@@ -50,108 +55,21 @@ pub enum ComputeError {
     Message(String),
 }
 
-impl From<KubernetesDriverError> for ComputeError {
-    fn from(value: KubernetesDriverError) -> Self {
-        match value {
-            KubernetesDriverError::AlreadyExists => Self::AlreadyExists,
-            KubernetesDriverError::Precondition(message) => Self::Precondition(message),
-            KubernetesDriverError::Message(message) => Self::Message(message),
-        }
-    }
-}
-
 #[derive(Debug)]
 pub enum ResolvedEndpoint {
     Ip(IpAddr, u16),
     Host(String, u16),
 }
 
-#[tonic::async_trait]
-pub trait ComputeBackend: fmt::Debug + Send + Sync {
-    fn default_image(&self) -> &str;
-    async fn validate_sandbox_create(&self, sandbox: &DriverSandbox) -> Result<(), Status>;
-    async fn create_sandbox(&self, sandbox: &DriverSandbox) -> Result<(), ComputeError>;
-    async fn delete_sandbox(&self, sandbox_name: &str) -> Result<bool, ComputeError>;
-    async fn sandbox_exists(&self, sandbox_name: &str) -> Result<bool, ComputeError>;
-    async fn resolve_sandbox_endpoint(
-        &self,
-        sandbox: &DriverSandbox,
-    ) -> Result<ResolvedEndpoint, ComputeError>;
-    async fn watch_sandboxes(&self) -> Result<ComputeWatchStream, ComputeError>;
-}
-
-#[derive(Debug)]
-pub struct InProcessKubernetesBackend {
-    driver: KubernetesComputeDriver,
-}
-
-impl InProcessKubernetesBackend {
-    #[must_use]
-    pub fn new(driver: KubernetesComputeDriver) -> Self {
-        Self { driver }
-    }
-}
-
-#[tonic::async_trait]
-impl ComputeBackend for InProcessKubernetesBackend {
-    fn default_image(&self) -> &str {
-        self.driver.default_image()
-    }
-
-    async fn validate_sandbox_create(&self, sandbox: &DriverSandbox) -> Result<(), Status> {
-        self.driver.validate_sandbox_create(sandbox).await
-    }
-
-    async fn create_sandbox(&self, sandbox: &DriverSandbox) -> Result<(), ComputeError> {
-        self.driver
-            .create_sandbox(sandbox)
-            .await
-            .map_err(Into::into)
-    }
-
-    async fn delete_sandbox(&self, sandbox_name: &str) -> Result<bool, ComputeError> {
-        self.driver
-            .delete_sandbox(sandbox_name)
-            .await
-            .map_err(ComputeError::Message)
-    }
-
-    async fn sandbox_exists(&self, sandbox_name: &str) -> Result<bool, ComputeError> {
-        self.driver
-            .sandbox_exists(sandbox_name)
-            .await
-            .map_err(ComputeError::Message)
-    }
-
-    async fn resolve_sandbox_endpoint(
-        &self,
-        sandbox: &DriverSandbox,
-    ) -> Result<ResolvedEndpoint, ComputeError> {
-        let response = self
-            .driver
-            .resolve_sandbox_endpoint(sandbox)
-            .await
-            .map_err(ComputeError::from)?;
-        resolved_endpoint_from_response(&response)
-    }
-
-    async fn watch_sandboxes(&self) -> Result<ComputeWatchStream, ComputeError> {
-        let stream = self
-            .driver
-            .watch_sandboxes()
-            .await
-            .map_err(ComputeError::Message)?;
-        Ok(Box::pin(stream.map(|item| item.map_err(Into::into))))
-    }
-}
-
 #[derive(Clone)]
 pub struct ComputeRuntime {
-    backend: Arc<dyn ComputeBackend>,
+    driver: SharedComputeDriver,
+    default_image: String,
     store: Arc<Store>,
     sandbox_index: SandboxIndex,
     sandbox_watch_bus: SandboxWatchBus,
     tracing_log_bus: TracingLogBus,
+    sync_lock: Arc<Mutex<()>>,
 }
 
 impl fmt::Debug for ComputeRuntime {
@@ -171,23 +89,37 @@ impl ComputeRuntime {
         let driver = KubernetesComputeDriver::new(config)
             .await
             .map_err(|err| ComputeError::Message(err.to_string()))?;
+        let driver: SharedComputeDriver = Arc::new(ComputeDriverService::new(driver));
+        let default_image = driver
+            .get_capabilities(Request::new(GetCapabilitiesRequest {}))
+            .await
+            .map_err(compute_error_from_status)?
+            .into_inner()
+            .default_image;
 
         Ok(Self {
-            backend: Arc::new(InProcessKubernetesBackend::new(driver)),
+            driver,
+            default_image,
             store,
             sandbox_index,
             sandbox_watch_bus,
             tracing_log_bus,
+            sync_lock: Arc::new(Mutex::new(())),
         })
     }
 
     #[must_use]
     pub fn default_image(&self) -> &str {
-        self.backend.default_image()
+        &self.default_image
     }
 
     pub async fn validate_sandbox_create(&self, sandbox: &Sandbox) -> Result<(), Status> {
         let driver_sandbox = driver_sandbox_from_public(sandbox);
-        self.backend.validate_sandbox_create(&driver_sandbox).await
+        self.driver
+            .validate_sandbox_create(Request::new(ValidateSandboxCreateRequest {
+                sandbox: Some(driver_sandbox),
+            }))
+            .await
+            .map(|_| ())
     }
 
     pub async fn create_sandbox(&self, sandbox: Sandbox) -> Result<Sandbox, Status> {
@@ -210,25 +142,34 @@ impl ComputeRuntime {
             .map_err(|e| Status::internal(format!("persist sandbox failed: {e}")))?;
 
         let driver_sandbox = driver_sandbox_from_public(&sandbox);
-        match self.backend.create_sandbox(&driver_sandbox).await {
-            Ok(()) => {
+        match self
+            .driver
+            .create_sandbox(Request::new(CreateSandboxRequest {
+                sandbox: Some(driver_sandbox),
+            }))
+            .await
+        {
+            Ok(_) => {
                 self.sandbox_watch_bus.notify(&sandbox.id);
                 Ok(sandbox)
             }
-            Err(ComputeError::AlreadyExists) => {
+            Err(status) if status.code() == Code::AlreadyExists => {
                 let _ = self.store.delete(Sandbox::object_type(), &sandbox.id).await;
                 self.sandbox_index.remove_sandbox(&sandbox.id);
                 Err(Status::already_exists("sandbox already exists"))
             }
-            Err(ComputeError::Precondition(message)) => {
+            Err(status) if status.code() == Code::FailedPrecondition => {
                 let _ = self.store.delete(Sandbox::object_type(), &sandbox.id).await;
                 self.sandbox_index.remove_sandbox(&sandbox.id);
-                Err(Status::failed_precondition(message))
+                Err(Status::failed_precondition(status.message().to_string()))
             }
             Err(err) => {
                 let _ = self.store.delete(Sandbox::object_type(), &sandbox.id).await;
                 self.sandbox_index.remove_sandbox(&sandbox.id);
-                Err(Status::internal(format!("create sandbox failed: {err}")))
+                Err(Status::internal(format!(
+                    "create sandbox failed: {}",
+                    err.message()
+                )))
             }
         }
    }
@@ -283,11 +224,16 @@ impl ComputeRuntime {
             );
         }
 
+        let driver_sandbox = driver_sandbox_from_public(&sandbox);
         let deleted = self
-            .backend
-            .delete_sandbox(&sandbox.name)
+            .driver
+            .delete_sandbox(Request::new(DeleteSandboxRequest {
+                sandbox_id: driver_sandbox.id,
+                sandbox_name: driver_sandbox.name,
+            }))
             .await
-            .map_err(|err| Status::internal(format!("delete sandbox failed: {err}")))?;
+            .map(|response| response.into_inner().deleted)
+            .map_err(|err| Status::internal(format!("delete sandbox failed: {}", err.message())))?;
 
         if !deleted && let Err(e) = self.store.delete(Sandbox::object_type(), &id).await {
             warn!(sandbox_id = %id, error = %e, "Failed to clean up store after delete");
@@ -302,12 +248,21 @@ impl ComputeRuntime {
         sandbox: &Sandbox,
     ) -> Result<ResolvedEndpoint, Status> {
         let driver_sandbox = driver_sandbox_from_public(sandbox);
-        self.backend
-            .resolve_sandbox_endpoint(&driver_sandbox)
+        self.driver
+            .resolve_sandbox_endpoint(Request::new(ResolveSandboxEndpointRequest {
+                sandbox: Some(driver_sandbox),
+            }))
             .await
-            .map_err(|err| match err {
-                ComputeError::Precondition(message) => Status::failed_precondition(message),
-                other => Status::internal(other.to_string()),
+            .map(|response| response.into_inner())
+            .map_err(|status| match status.code() {
+                Code::FailedPrecondition => {
+                    Status::failed_precondition(status.message().to_string())
+                }
+                _ => Status::internal(status.message().to_string()),
+            })
+            .and_then(|response| {
+                resolved_endpoint_from_response(&response)
+                    .map_err(|err| Status::internal(err.to_string()))
             })
     }
@@ -324,8 +279,12 @@ impl ComputeRuntime {
 
     async fn watch_loop(self: Arc<Self>) {
         loop {
-            let mut stream = match self.backend.watch_sandboxes().await {
-                Ok(stream) => stream,
+            let mut stream = match self
+                .driver
+                .watch_sandboxes(Request::new(WatchSandboxesRequest {}))
+                .await
+            {
+                Ok(response) => response.into_inner(),
                 Err(err) => {
                     warn!(error = %err, "Compute driver watch stream failed to start");
                     tokio::time::sleep(Duration::from_secs(2)).await;
@@ -357,30 +316,39 @@ impl ComputeRuntime {
     }
 
     async fn reconcile_loop(self: Arc<Self>) {
-        // Let startup settle before pruning store records.
-        tokio::time::sleep(RECONCILE_INTERVAL).await;
-
         loop {
-            if let Err(err) = self.reconcile_orphaned_sandboxes(ORPHAN_GRACE_PERIOD).await {
+            if let Err(err) = self.reconcile_store_with_backend(ORPHAN_GRACE_PERIOD).await {
                 warn!(error = %err, "Store reconciliation sweep failed");
             }
             tokio::time::sleep(RECONCILE_INTERVAL).await;
         }
     }
 
-    async fn reconcile_orphaned_sandboxes(&self, grace_period: Duration) -> Result<(), String> {
+    async fn reconcile_store_with_backend(&self, grace_period: Duration) -> Result<(), String> {
+        let sweep_started_at_ms = current_time_ms();
+        let backend_sandboxes = self
+            .driver
+            .list_sandboxes(Request::new(ListSandboxesRequest {}))
+            .await
+            .map_err(|e| e.to_string())?
+            .into_inner()
+            .sandboxes;
+        let backend_ids = backend_sandboxes
+            .iter()
+            .map(|sandbox| sandbox.id.clone())
+            .collect::<Vec<_>>();
+
+        for sandbox in backend_sandboxes {
+            self.reconcile_snapshot_sandbox(sandbox, sweep_started_at_ms)
+                .await?;
+        }
+
         let records = self
             .store
             .list(Sandbox::object_type(), 500, 0)
             .await
             .map_err(|e| e.to_string())?;
-        let now_ms = std::time::SystemTime::now()
-            .duration_since(std::time::UNIX_EPOCH)
-            .unwrap_or_default()
-            .as_millis()
-            .try_into()
-            .unwrap_or(i64::MAX);
         let grace_ms = grace_period.as_millis().try_into().unwrap_or(i64::MAX);
 
         for record in records {
@@ -392,41 +360,12 @@ impl ComputeRuntime {
                 }
             };
 
-            if sandbox.phase != SandboxPhase::Provisioning as i32 {
+            if backend_ids.contains(&sandbox.id) {
                 continue;
            }
 
-            let age_ms = now_ms.saturating_sub(record.created_at_ms);
-            if age_ms < grace_ms {
-                continue;
-            }
-
-            match self.backend.sandbox_exists(&sandbox.name).await {
-                Ok(true) => {}
-                Ok(false) => {
-                    info!(
-                        sandbox_id = %sandbox.id,
-                        sandbox_name = %sandbox.name,
-                        age_secs = age_ms / 1000,
-                        "Removing orphaned sandbox from store (no corresponding backend resource)"
-                    );
-                    if let Err(err) = self.store.delete(Sandbox::object_type(), &sandbox.id).await {
-                        warn!(sandbox_id = %sandbox.id, error = %err, "Failed to remove orphaned sandbox");
-                        continue;
-                    }
-                    self.sandbox_index.remove_sandbox(&sandbox.id);
-                    self.sandbox_watch_bus.notify(&sandbox.id);
-                    self.cleanup_sandbox_state(&sandbox.id);
-                }
-                Err(err) => {
-                    debug!(
-                        sandbox_id = %sandbox.id,
-                        sandbox_name = %sandbox.name,
-                        error = %err,
-                        "Skipping orphan check due to backend error"
-                    );
-                }
-            }
+            self.prune_missing_sandbox(record, sweep_started_at_ms, grace_ms)
+                .await?;
         }
 
         Ok(())
@@ -462,11 +401,25 @@ impl ComputeRuntime {
     }
 
     async fn apply_sandbox_update(&self, incoming: DriverSandbox) -> Result<(), String> {
+        let _guard = self.sync_lock.lock().await;
         let existing = self
             .store
-            .get_message::<Sandbox>(&incoming.id)
+            .get(Sandbox::object_type(), &incoming.id)
             .await
             .map_err(|e| e.to_string())?;
+        self.apply_sandbox_update_locked(incoming, existing).await
+    }
+
+    async fn apply_sandbox_update_locked(
+        &self,
+        incoming: DriverSandbox,
+        existing_record: Option<ObjectRecord>,
+    ) -> Result<(), String> {
+        let existing = existing_record
+            .as_ref()
+            .map(decode_sandbox_record)
+            .transpose()?;
+
         let previous = existing.clone();
         let mut status = incoming.status.as_ref().map(public_status_from_driver);
         rewrite_user_facing_conditions(
@@ -520,6 +473,10 @@ impl ComputeRuntime {
         sandbox.status = status;
         sandbox.phase = phase as i32;
 
+        if previous.as_ref() == Some(&sandbox) {
+            return Ok(());
+        }
+
         self.sandbox_index.update_from_sandbox(&sandbox);
         self.store
             .put_message(&sandbox)
@@ -530,6 +487,11 @@ impl ComputeRuntime {
     }
 
     async fn apply_deleted(&self, sandbox_id: &str) -> Result<(), String> {
+        let _guard = self.sync_lock.lock().await;
+        self.apply_deleted_locked(sandbox_id).await
+    }
+
+    async fn apply_deleted_locked(&self, sandbox_id: &str) -> Result<(), String> {
         let _ = self
             .store
             .delete(Sandbox::object_type(), sandbox_id)
@@ -546,6 +508,96 @@ impl ComputeRuntime {
         self.tracing_log_bus.platform_event_bus.remove(sandbox_id);
         self.sandbox_watch_bus.remove(sandbox_id);
     }
+
+    async fn reconcile_snapshot_sandbox(
+        &self,
+        snapshot: DriverSandbox,
+        sweep_started_at_ms: i64,
+    ) -> Result<(), String> {
+        let _guard = self.sync_lock.lock().await;
+        let Some(existing) = self
+            .store
+            .get(Sandbox::object_type(), &snapshot.id)
+            .await
+            .map_err(|e| e.to_string())?
+        else {
+            return Ok(());
+        };
+
+        if existing.updated_at_ms > sweep_started_at_ms {
+            return Ok(());
+        }
+
+        let Some(current) = self
+            .get_driver_sandbox(&snapshot.id, &snapshot.name)
+            .await?
+        else {
+            return Ok(());
+        };
+
+        self.apply_sandbox_update_locked(current, Some(existing))
+            .await
+    }
+
+    async fn prune_missing_sandbox(
+        &self,
+        record: ObjectRecord,
+        sweep_started_at_ms: i64,
+        grace_ms: i64,
+    ) -> Result<(), String> {
+        let _guard = self.sync_lock.lock().await;
+        let Some(current_record) = self
+            .store
+            .get(Sandbox::object_type(), &record.id)
+            .await
+            .map_err(|e| e.to_string())?
+        else {
+            return Ok(());
+        };
+
+        if current_record.updated_at_ms > sweep_started_at_ms {
+            return Ok(());
+        }
+
+        let sandbox = decode_sandbox_record(&current_record)?;
+        let age_ms = current_time_ms().saturating_sub(current_record.created_at_ms);
+        if age_ms < grace_ms {
+            return Ok(());
+        }
+
+        if let Some(current) = self.get_driver_sandbox(&sandbox.id, &sandbox.name).await? {
+            return self
+                .apply_sandbox_update_locked(current, Some(current_record))
+                .await;
+        }
+
+        info!(
+            sandbox_id = %sandbox.id,
+            sandbox_name = %sandbox.name,
+            age_secs = age_ms / 1000,
+            "Removing sandbox from store after it disappeared from the compute driver snapshot"
+        );
+        self.apply_deleted_locked(&sandbox.id).await
+    }
+
+    async fn get_driver_sandbox(
+        &self,
+        sandbox_id: &str,
+        sandbox_name: &str,
+    ) -> Result<Option<DriverSandbox>, String> {
+        match self
+            .driver
+            .get_sandbox(Request::new(GetSandboxRequest {
+                sandbox_id: sandbox_id.to_string(),
+                sandbox_name: sandbox_name.to_string(),
+            }))
+            .await
+        {
+            Ok(response) => Ok(response.into_inner().sandbox),
+            Err(status) if status.code() == Code::NotFound => Ok(None),
+            Err(status) => Err(status.to_string()),
+        }
+    }
 }
 
 fn driver_sandbox_from_public(sandbox: &Sandbox) -> DriverSandbox {
@@ -740,6 +792,27 @@ impl ObjectName for Sandbox {
     }
 }
 
+fn compute_error_from_status(status: Status) -> ComputeError {
+    match status.code() {
+        Code::AlreadyExists => ComputeError::AlreadyExists,
+        Code::FailedPrecondition => ComputeError::Precondition(status.message().to_string()),
+        _ => ComputeError::Message(status.message().to_string()),
+    }
+}
+
+fn current_time_ms() -> i64 {
+    std::time::SystemTime::now()
+        .duration_since(std::time::UNIX_EPOCH)
+        .unwrap_or_default()
+        .as_millis()
+        .try_into()
+        .unwrap_or(i64::MAX)
+}
+
+fn decode_sandbox_record(record: &ObjectRecord) -> Result<Sandbox, String> {
+    Sandbox::decode(record.payload.as_slice()).map_err(|e| e.to_string())
+}
+
 fn resolved_endpoint_from_response(
     response: &ResolveSandboxEndpointResponse,
 ) -> Result<ResolvedEndpoint, ComputeError> {
@@ -854,63 +927,146 @@ fn is_terminal_failure_reason(reason: &str) -> bool {
 mod tests {
     use super::*;
     use futures::stream;
+    use openshell_core::proto::compute::v1::{
+        CreateSandboxResponse, DeleteSandboxResponse, GetCapabilitiesResponse, GetSandboxRequest,
+        GetSandboxResponse, ResolveSandboxEndpointResponse, SandboxEndpoint, StopSandboxRequest,
+        StopSandboxResponse, ValidateSandboxCreateResponse, sandbox_endpoint,
+    };
     use std::sync::Arc;
 
     #[derive(Debug, Default)]
-    struct TestBackend {
-        sandbox_exists: bool,
+    struct TestDriver {
+        listed_sandboxes: Vec<DriverSandbox>,
+        current_sandboxes: Vec<DriverSandbox>,
         resolve_precondition: Option<String>,
     }
 
     #[tonic::async_trait]
-    impl ComputeBackend for TestBackend {
-        fn default_image(&self) -> &'static str {
-            "openshell/sandbox:test"
+    impl ComputeDriver for TestDriver {
+        type WatchSandboxesStream = DriverWatchStream;
+
+        async fn get_capabilities(
+            &self,
+            _request: Request<GetCapabilitiesRequest>,
+        ) -> Result<tonic::Response<GetCapabilitiesResponse>, Status> {
+            Ok(tonic::Response::new(GetCapabilitiesResponse {
+                driver_name: "test-driver".to_string(),
+                driver_version: "test".to_string(),
+                default_image: "openshell/sandbox:test".to_string(),
+                supports_gpu: true,
+            }))
         }
 
-        async fn validate_sandbox_create(&self, _sandbox: &DriverSandbox) -> Result<(), Status> {
-            Ok(())
+        async fn validate_sandbox_create(
+            &self,
+            _request: Request<ValidateSandboxCreateRequest>,
+        ) -> Result<tonic::Response<ValidateSandboxCreateResponse>, Status> {
+            Ok(tonic::Response::new(ValidateSandboxCreateResponse {}))
        }
 
-        async fn create_sandbox(&self, _sandbox: &DriverSandbox) -> Result<(), ComputeError> {
-            Ok(())
+        async fn get_sandbox(
+            &self,
+            request: Request<GetSandboxRequest>,
+        ) -> Result<tonic::Response<GetSandboxResponse>, Status> {
+            let request = request.into_inner();
+            let current = if self.current_sandboxes.is_empty() {
+                &self.listed_sandboxes
+            } else {
+                &self.current_sandboxes
+            };
+            let sandbox = current
+                .iter()
+                .find(|sandbox| {
+                    sandbox.name == request.sandbox_name
+                        && (request.sandbox_id.is_empty() || sandbox.id == request.sandbox_id)
+                })
+                .cloned()
+                .ok_or_else(|| Status::not_found("sandbox not found"))?;
+
+            if !request.sandbox_id.is_empty() && request.sandbox_id != sandbox.id {
+                return Err(Status::failed_precondition(
+                    "sandbox_id did not match the fetched sandbox",
+                ));
+            }
+
+            Ok(tonic::Response::new(GetSandboxResponse {
+                sandbox: Some(sandbox),
+            }))
         }
 
-        async fn delete_sandbox(&self, _sandbox_name: &str) -> Result<bool, ComputeError> {
-            Ok(true)
+        async fn list_sandboxes(
+            &self,
+            _request: Request<ListSandboxesRequest>,
+        ) -> Result<
+            tonic::Response<openshell_core::proto::compute::v1::ListSandboxesResponse>,
+            Status,
+        > {
+            Ok(tonic::Response::new(
+                openshell_core::proto::compute::v1::ListSandboxesResponse {
+                    sandboxes: self.listed_sandboxes.clone(),
+                },
+            ))
         }
 
-        async fn sandbox_exists(&self, _sandbox_name: &str) -> Result<bool, ComputeError> {
-            Ok(self.sandbox_exists)
+        async fn create_sandbox(
+            &self,
+            _request: Request<CreateSandboxRequest>,
+        ) -> Result<tonic::Response<CreateSandboxResponse>, Status> {
+            Ok(tonic::Response::new(CreateSandboxResponse {}))
+        }
+
+        async fn stop_sandbox(
+            &self,
+            _request: Request<StopSandboxRequest>,
+        ) -> Result<tonic::Response<StopSandboxResponse>, Status> {
+            Ok(tonic::Response::new(StopSandboxResponse {}))
+        }
+
+        async fn delete_sandbox(
+            &self,
+            _request: Request<DeleteSandboxRequest>,
+        ) -> Result<tonic::Response<DeleteSandboxResponse>, Status> {
+            Ok(tonic::Response::new(DeleteSandboxResponse {
+                deleted: true,
+            }))
         }
 
         async fn resolve_sandbox_endpoint(
             &self,
-            _sandbox: &DriverSandbox,
-        ) -> Result<ResolvedEndpoint, ComputeError> {
+            _request: Request<ResolveSandboxEndpointRequest>,
+        ) -> Result<tonic::Response<ResolveSandboxEndpointResponse>, Status> {
             if let Some(message) = &self.resolve_precondition {
-                return Err(ComputeError::Precondition(message.clone()));
+                return Err(Status::failed_precondition(message.clone()));
            }
-            Ok(ResolvedEndpoint::Host(
-                "sandbox.default.svc.cluster.local".to_string(),
-                2222,
-            ))
+            Ok(tonic::Response::new(ResolveSandboxEndpointResponse {
+                endpoint: Some(SandboxEndpoint {
+                    target: Some(sandbox_endpoint::Target::Host(
+                        "sandbox.default.svc.cluster.local".to_string(),
+                    )),
+                    port: 2222,
+                }),
+            }))
        }
 
-        async fn watch_sandboxes(&self) -> Result<ComputeWatchStream, ComputeError> {
-            Ok(Box::pin(stream::empty()))
+        async fn watch_sandboxes(
+            &self,
+            _request: Request<WatchSandboxesRequest>,
+        ) -> Result<tonic::Response<Self::WatchSandboxesStream>, Status> {
+            Ok(tonic::Response::new(Box::pin(stream::empty())))
        }
     }
 
-    async fn test_runtime(backend: Arc<dyn ComputeBackend>) -> ComputeRuntime {
+    async fn test_runtime(driver: SharedComputeDriver) -> ComputeRuntime {
         let store = Arc::new(Store::connect("sqlite::memory:").await.unwrap());
         ComputeRuntime {
-            backend,
+            driver,
+            default_image: "openshell/sandbox:test".to_string(),
             store,
             sandbox_index: SandboxIndex::new(),
             sandbox_watch_bus: SandboxWatchBus::new(),
             tracing_log_bus: TracingLogBus::new(),
+            sync_lock: Arc::new(Mutex::new(())),
         }
     }
 
@@ -1111,9 +1267,22 @@ mod tests {
         assert_eq!(status.unwrap().conditions[0].message, original);
     }
 
+    #[test]
+    fn compute_error_from_status_preserves_driver_status_codes() {
+        assert!(matches!(
+            compute_error_from_status(Status::already_exists("sandbox already exists")),
+            ComputeError::AlreadyExists
+        ));
+
+        assert!(matches!(
+            compute_error_from_status(Status::failed_precondition("sandbox agent pod IP is not available")),
+            ComputeError::Precondition(message) if message == "sandbox agent pod IP is not available"
+        ));
+    }
+
     #[tokio::test]
     async fn apply_sandbox_update_allows_delete_failures_to_recover() {
-        let runtime = test_runtime(Arc::new(TestBackend::default())).await;
+        let runtime = test_runtime(Arc::new(TestDriver::default())).await;
         let sandbox = sandbox_record("sb-1", "sandbox-a", SandboxPhase::Deleting);
         runtime.store.put_message(&sandbox).await.unwrap();
 
@@ -1155,9 +1324,9 @@ mod tests {
 
     #[tokio::test]
     async fn resolve_sandbox_endpoint_preserves_precondition_errors() {
-        let runtime = test_runtime(Arc::new(TestBackend {
-            sandbox_exists: true,
+        let runtime = test_runtime(Arc::new(TestDriver {
             resolve_precondition: Some("sandbox agent pod IP is not available".to_string()),
+            ..Default::default()
         }))
         .await;
 
@@ -1166,13 +1335,181 @@ mod tests {
             .await
             .expect_err("endpoint resolution should preserve failed-precondition errors");
 
-        assert_eq!(err.code(), tonic::Code::FailedPrecondition);
+        assert_eq!(err.code(), Code::FailedPrecondition);
         assert_eq!(err.message(), "sandbox agent pod IP is not available");
     }
 
     #[tokio::test]
-    async fn reconcile_orphaned_sandboxes_removes_stale_provisioning_records() {
-        let runtime = test_runtime(Arc::new(TestBackend::default())).await;
+    async fn reconcile_store_with_backend_applies_driver_snapshot() {
+        let runtime = test_runtime(Arc::new(TestDriver {
+            listed_sandboxes: vec![DriverSandbox {
+                id: "sb-1".to_string(),
+                name: "sandbox-a".to_string(),
+                namespace: "default".to_string(),
+                spec: None,
+                status: Some(DriverSandboxStatus {
+                    sandbox_name: "sandbox-a".to_string(),
+                    instance_id: "agent-pod".to_string(),
+                    agent_fd: String::new(),
+                    sandbox_fd: String::new(),
+                    conditions: vec![DriverCondition {
+                        r#type: "Ready".to_string(),
+                        status: "False".to_string(),
+                        reason: "DependenciesNotReady".to_string(),
+                        message: "Pod is Pending".to_string(),
+                        last_transition_time: String::new(),
+                    }],
+                    deleting: false,
+                }),
+            }],
+            current_sandboxes: vec![DriverSandbox {
+                id: "sb-1".to_string(),
+                name: "sandbox-a".to_string(),
+                namespace: "default".to_string(),
+                spec: None,
+                status: Some(DriverSandboxStatus {
+                    sandbox_name: "sandbox-a".to_string(),
+                    instance_id: "agent-pod".to_string(),
+                    agent_fd: String::new(),
+                    sandbox_fd: String::new(),
+                    conditions: vec![DriverCondition {
+                        r#type: "Ready".to_string(),
+                        status: "True".to_string(),
+                        reason: "DependenciesReady".to_string(),
+                        message: "Pod is Ready".to_string(),
+                        last_transition_time: String::new(),
+                    }],
+                    deleting: false,
+                }),
+            }],
+            ..Default::default()
+        }))
+        .await;
+
+        let sandbox = Sandbox {
+            spec: Some(SandboxSpec {
+                gpu: true,
+                ..Default::default()
+            }),
+            ..sandbox_record("sb-1", "sandbox-a", SandboxPhase::Provisioning)
+        };
+        runtime.store.put_message(&sandbox).await.unwrap();
+        runtime.sandbox_index.update_from_sandbox(&sandbox);
+
+        runtime
+            .reconcile_store_with_backend(Duration::ZERO)
+            .await
+            .unwrap();
+
+        let stored = runtime
+            .store
+            .get_message::<Sandbox>("sb-1")
+            .await
+            .unwrap()
+            .unwrap();
+        assert_eq!(
+            SandboxPhase::try_from(stored.phase).unwrap(),
+            SandboxPhase::Ready
+        );
+        assert!(stored.spec.as_ref().is_some_and(|spec| spec.gpu));
+    }
+
+    #[tokio::test]
+    async fn reconcile_store_with_backend_does_not_recreate_missing_record_from_snapshot() {
+        let runtime = test_runtime(Arc::new(TestDriver {
+            listed_sandboxes: vec![DriverSandbox {
+                id: "sb-1".to_string(),
+                name: "sandbox-a".to_string(),
+                namespace: "default".to_string(),
+                spec: None,
+                status: Some(make_driver_status(make_driver_condition(
+                    "DependenciesNotReady",
+                    "Pod exists with phase: Pending; Service Exists",
+                ))),
+            }],
+            current_sandboxes: vec![DriverSandbox {
+                id: "sb-1".to_string(),
+                name: "sandbox-a".to_string(),
+                namespace: "default".to_string(),
+                spec: None,
+                status: Some(make_driver_status(DriverCondition {
+                    r#type: "Ready".to_string(),
+                    status: "True".to_string(),
+                    reason: "DependenciesReady".to_string(),
+                    message: "Pod is Ready".to_string(),
+                    last_transition_time: String::new(),
+                })),
+            }],
+            ..Default::default()
+        }))
+        .await;
+
+        runtime
+            .reconcile_store_with_backend(Duration::ZERO)
+            .await
+            .unwrap();
+
+        assert!(
+            runtime
+                .store
+                .get_message::<Sandbox>("sb-1")
+                .await
+                .unwrap()
+                .is_none()
+        );
+    }
+
+    #[tokio::test]
+    async fn reconcile_store_with_backend_rechecks_driver_before_pruning() {
+        let runtime = test_runtime(Arc::new(TestDriver {
+            current_sandboxes: vec![DriverSandbox {
+                id: "sb-1".to_string(),
+                name: "sandbox-a".to_string(),
+                namespace: "default".to_string(),
+                spec: None,
+                status: Some(DriverSandboxStatus {
+                    sandbox_name: "sandbox-a".to_string(),
+                    instance_id: "agent-pod".to_string(),
+                    agent_fd: String::new(),
+                    sandbox_fd: String::new(),
+                    conditions: vec![DriverCondition {
+                        r#type: "Ready".to_string(),
+                        status: "True".to_string(),
+                        reason: "DependenciesReady".to_string(),
+                        message: "Pod is Ready".to_string(),
+                        last_transition_time: String::new(),
+                    }],
+                    deleting: false,
+                }),
+            }],
+            ..Default::default()
+        }))
+        .await;
+
+        let sandbox = sandbox_record("sb-1", "sandbox-a", SandboxPhase::Provisioning);
+        runtime.store.put_message(&sandbox).await.unwrap();
+        runtime.sandbox_index.update_from_sandbox(&sandbox);
+
+        runtime
+            .reconcile_store_with_backend(Duration::ZERO)
+            .await
+            .unwrap();
+
+        let stored = runtime
+            .store
+            .get_message::<Sandbox>("sb-1")
+            .await
+            .unwrap()
+            .unwrap();
+        assert_eq!(
+            SandboxPhase::try_from(stored.phase).unwrap(),
+            SandboxPhase::Ready
+        );
+    }
+
+    #[tokio::test]
+    async fn reconcile_store_with_backend_removes_stale_provisioning_records() {
+        let runtime = test_runtime(Arc::new(TestDriver::default())).await;
         let sandbox = sandbox_record("sb-1", "sandbox-a", SandboxPhase::Provisioning);
         runtime.store.put_message(&sandbox).await.unwrap();
         runtime.sandbox_index.update_from_sandbox(&sandbox);
@@ -1180,7 +1517,7 @@ mod tests {
         let mut watch_rx = runtime.sandbox_watch_bus.subscribe(&sandbox.id);
 
         runtime
-            .reconcile_orphaned_sandboxes(Duration::ZERO)
+            .reconcile_store_with_backend(Duration::ZERO)
            .await
            .unwrap();
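A thread running through this diff is the error round-trip: the driver encodes its native errors onto gRPC status codes (`status_from_driver_error` in grpc.rs), and the gateway decodes them back into domain errors (`compute_error_from_status` in mod.rs) so handlers return the same public statuses they did before the RPC surface sat in between. The sketch below illustrates that round-trip with plain enums; `Code`, `DriverError`, and `ComputeError` here are simplified stand-ins for the `tonic` and repository types, not their actual definitions.

```rust
#[derive(Debug, Clone, PartialEq)]
enum Code {
    AlreadyExists,
    FailedPrecondition,
    Internal,
}

#[derive(Debug, Clone, PartialEq)]
enum DriverError {
    AlreadyExists,
    Precondition(String),
    Message(String),
}

#[derive(Debug, Clone, PartialEq)]
enum ComputeError {
    AlreadyExists,
    Precondition(String),
    Message(String),
}

/// Driver side (cf. status_from_driver_error): pick a gRPC code that
/// preserves the error's meaning across the RPC boundary.
fn status_from_driver_error(err: DriverError) -> (Code, String) {
    match err {
        DriverError::AlreadyExists => (Code::AlreadyExists, "sandbox already exists".to_string()),
        DriverError::Precondition(m) => (Code::FailedPrecondition, m),
        DriverError::Message(m) => (Code::Internal, m),
    }
}

/// Gateway side (cf. compute_error_from_status): recover the domain error
/// from the code so public handlers keep their existing status mapping.
fn compute_error_from_status(code: Code, message: String) -> ComputeError {
    match code {
        Code::AlreadyExists => ComputeError::AlreadyExists,
        Code::FailedPrecondition => ComputeError::Precondition(message),
        Code::Internal => ComputeError::Message(message),
    }
}

fn main() {
    // A precondition failure survives the encode/decode round-trip intact.
    let (code, msg) = status_from_driver_error(DriverError::Precondition(
        "sandbox agent pod IP is not available".to_string(),
    ));
    assert_eq!(
        compute_error_from_status(code, msg),
        ComputeError::Precondition("sandbox agent pod IP is not available".to_string())
    );
    println!("error round-trip sketch ok");
}
```

This is the property the new tests in the diff pin down from both ends: `already_exists_driver_errors_map_to_already_exists_status` checks the encode direction, and `compute_error_from_status_preserves_driver_status_codes` checks the decode direction.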