Merged
12 changes: 7 additions & 5 deletions architecture/gateway.md
@@ -96,10 +96,10 @@ The gateway boots in `main()` (`crates/openshell-server/src/main.rs`) and procee
4. **Build `Config`** -- Assembles an `openshell_core::Config` from the parsed arguments.
5. **Call `run_server()`** (`crates/openshell-server/src/lib.rs`):
1. Connect to the persistence store (`Store::connect`), which auto-detects SQLite vs Postgres from the URL prefix and runs migrations.
2. Create `ComputeRuntime` with the in-process Kubernetes compute backend (`KubernetesComputeDriver`).
2. Create `ComputeRuntime` with an in-process `ComputeDriverService` backed by `KubernetesComputeDriver`, so the gateway exercises the `openshell.compute.v1.ComputeDriver` RPC surface even without a network transport.
3. Build `ServerState` (shared via `Arc<ServerState>` across all handlers).
4. **Spawn background tasks**:
- `ComputeRuntime::spawn_watchers` -- consumes the compute-driver watch stream, updates persisted sandbox records, and republishes platform events.
- `ComputeRuntime::spawn_watchers` -- consumes the compute-driver watch stream, republishes platform events, and runs a periodic `ListSandboxes` snapshot reconcile so the store-backed public sandbox reads stay aligned with the compute driver.
5. Create `MultiplexService`.
6. Bind `TcpListener` on `config.bind_address`.
7. Optionally create `TlsAcceptor` from cert/key files.
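Step 1's URL-prefix auto-detection can be sketched as follows. This is a minimal illustration, not the real `Store::connect` code; the `Backend` enum, the `detect_backend` helper, and the exact accepted prefixes are assumptions.

```rust
// Hypothetical sketch of SQLite-vs-Postgres auto-detection from a store URL.
// The variant and prefix choices are illustrative assumptions.
#[derive(Debug, PartialEq)]
enum Backend {
    Sqlite,
    Postgres,
}

fn detect_backend(url: &str) -> Result<Backend, String> {
    if url.starts_with("sqlite:") {
        Ok(Backend::Sqlite)
    } else if url.starts_with("postgres:") || url.starts_with("postgresql:") {
        Ok(Backend::Postgres)
    } else {
        Err(format!("unrecognized store URL: {url}"))
    }
}
```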
@@ -149,7 +149,7 @@ pub struct ServerState {
```

- **`store`** -- persistence backend (SQLite or Postgres) for all object types.
- **`compute`** -- gateway-owned compute orchestration. Persists sandbox lifecycle transitions, validates create requests through the compute backend, resolves exec/SSH endpoints, consumes the backend watch stream, and periodically reconciles orphaned `Provisioning` records that no longer have a backing compute resource.
- **`compute`** -- gateway-owned compute orchestration. Persists sandbox lifecycle transitions, validates create requests through the compute backend, resolves exec/SSH endpoints, consumes the backend watch stream, and periodically reconciles the store against `ComputeDriver/ListSandboxes` snapshots.
- **`sandbox_index`** -- in-memory bidirectional index mapping sandbox names and agent pod names to sandbox IDs. Updated from compute-driver sandbox snapshots.
- **`sandbox_watch_bus`** -- `broadcast`-based notification bus keyed by sandbox ID. Producers call `notify(&id)` when the persisted sandbox record changes; consumers in `WatchSandbox` streams receive `()` signals and re-read the record.
- **`tracing_log_bus`** -- captures `tracing` events that include a `sandbox_id` field and republishes them as `SandboxLogLine` messages. Maintains a per-sandbox tail buffer (default 200 entries). Also contains a nested `PlatformEventBus` for compute-driver platform events.
@@ -381,7 +381,7 @@ All buses use `tokio::sync::broadcast` channels keyed by sandbox ID. Buffer size

Broadcast lag is translated to `Status::resource_exhausted` via `broadcast_to_status()`.

**Cleanup:** Each bus exposes a `remove(sandbox_id)` method that drops the broadcast sender (closing active receivers with `RecvError::Closed`) and frees internal map entries. Cleanup is wired into the compute watch reconciler, the periodic orphan sweep for stale `Provisioning` records, and the `delete_sandbox` gRPC handler to prevent unbounded memory growth from accumulated entries for deleted sandboxes.
**Cleanup:** Each bus exposes a `remove(sandbox_id)` method that drops the broadcast sender (closing active receivers with `RecvError::Closed`) and frees internal map entries. Cleanup is wired into the compute watch reconciler, the periodic snapshot reconcile for sandboxes missing from the driver, and the `delete_sandbox` gRPC handler to prevent unbounded memory growth from accumulated entries for deleted sandboxes.

**Validation:** `WatchSandbox` validates that the sandbox exists before subscribing to any bus, preventing entries from being created for non-existent IDs. `PushSandboxLogs` validates sandbox existence once on the first batch of the stream.
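The notify-then-re-read pattern and the `remove(sandbox_id)` cleanup described above can be sketched with a std-only stand-in. The gateway uses `tokio::sync::broadcast`; plain `std::sync::mpsc` channels are substituted here purely for illustration, and `SandboxWatchBus` is a hypothetical simplification, not the real type.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};

// Simplified stand-in for the broadcast-based watch bus: producers notify by
// sandbox ID, consumers receive `()` signals and re-read the persisted record.
struct SandboxWatchBus {
    senders: HashMap<String, Vec<Sender<()>>>,
}

impl SandboxWatchBus {
    fn new() -> Self {
        Self { senders: HashMap::new() }
    }

    fn subscribe(&mut self, sandbox_id: &str) -> Receiver<()> {
        let (tx, rx) = channel();
        self.senders.entry(sandbox_id.to_string()).or_default().push(tx);
        rx
    }

    fn notify(&self, sandbox_id: &str) {
        if let Some(txs) = self.senders.get(sandbox_id) {
            for tx in txs {
                let _ = tx.send(()); // ignore already-closed receivers
            }
        }
    }

    fn remove(&mut self, sandbox_id: &str) {
        // Dropping the senders closes active receivers, mirroring the
        // `RecvError::Closed` behavior described in the cleanup notes,
        // and frees the map entry for the deleted sandbox.
        self.senders.remove(sandbox_id);
    }
}
```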

@@ -500,7 +500,7 @@ The Helm chart template is at `deploy/helm/openshell/templates/statefulset.yaml`

### Sandbox CRD Management

`KubernetesComputeDriver` (`crates/openshell-driver-kubernetes/src/driver.rs`) manages `agents.x-k8s.io/v1alpha1/Sandbox` CRDs behind the gateway's compute interface.
`KubernetesComputeDriver` (`crates/openshell-driver-kubernetes/src/driver.rs`) manages `agents.x-k8s.io/v1alpha1/Sandbox` CRDs behind the gateway's compute interface. The gateway binds to that driver through `ComputeDriverService` (`crates/openshell-driver-kubernetes/src/grpc.rs`) in-process, so the same `openshell.compute.v1.ComputeDriver` request and response types are exercised whether the driver is invoked locally or served over gRPC.

- **Get**: `GetSandbox` looks up a sandbox CRD by name and returns a driver-native platform observation (`openshell.compute.v1.DriverSandbox`) with raw status and condition data from the object.
- **List**: `ListSandboxes` enumerates sandbox CRDs and returns driver-native platform observations for each, sorted by name for stable results.
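The "same surface in-process or over gRPC" shape can be sketched with a trait-backed service. Everything below is an illustrative stand-in (including the sorted-by-name listing noted above); the real types live in the openshell crates and speak the `openshell.compute.v1` protobuf messages.

```rust
// Illustrative driver trait: both the in-process path and the gRPC path
// would go through the same interface and exchange the same types.
trait ComputeDriver {
    fn list_sandboxes(&self) -> Vec<String>;
}

struct KubernetesComputeDriver {
    names: Vec<String>,
}

impl ComputeDriver for KubernetesComputeDriver {
    fn list_sandboxes(&self) -> Vec<String> {
        // The real driver enumerates Sandbox CRDs; results are sorted by
        // name for stable listings, as described above.
        let mut names = self.names.clone();
        names.sort();
        names
    }
}

// The in-process service binds the gateway to a driver without a transport.
struct ComputeDriverService<D: ComputeDriver> {
    driver: D,
}

impl<D: ComputeDriver> ComputeDriverService<D> {
    fn list_sandboxes(&self) -> Vec<String> {
        self.driver.list_sandboxes()
    }
}
```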
@@ -517,6 +517,8 @@ The Kubernetes driver emits `WatchSandboxes` events through `proto/compute_drive
- **Deleted**: Removes the sandbox record from the store and the index. Notifies the watch bus.
- **Restarted**: Re-processes all objects (full resync).

In addition to the watch stream, `ComputeRuntime` periodically calls `ComputeDriver/ListSandboxes` through the in-process `ComputeDriverService` and reconciles the store to that full driver snapshot. Public `GetSandbox` and `ListSandboxes` handlers remain store-backed, but the store is refreshed from the driver on a timer so the gateway still exercises the compute-driver RPC surface for reconciliation.
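One reconcile pass over a driver snapshot can be sketched as upsert-everything-seen, then drop store records the driver no longer reports. This is a hypothetical simplification: `SandboxRecord` and `reconcile_snapshot` are illustrative names, and the returned IDs stand in for the entries whose buses get cleaned up.

```rust
use std::collections::{HashMap, HashSet};

// Hypothetical simplified record; the real store persists full sandbox rows.
#[derive(Debug, PartialEq)]
struct SandboxRecord {
    id: String,
    phase: String,
}

// One snapshot reconcile: upsert every sandbox the driver reports, then
// remove (and return) store entries missing from the snapshot so their
// bus entries can be cleaned up.
fn reconcile_snapshot(
    store: &mut HashMap<String, SandboxRecord>,
    snapshot: Vec<SandboxRecord>,
) -> Vec<String> {
    let seen: HashSet<String> = snapshot.iter().map(|r| r.id.clone()).collect();
    for record in snapshot {
        store.insert(record.id.clone(), record);
    }
    let missing: Vec<String> = store
        .keys()
        .filter(|id| !seen.contains(*id))
        .cloned()
        .collect();
    for id in &missing {
        store.remove(id);
    }
    missing
}
```

In the real gateway this pass would run on a timer, with the public `GetSandbox`/`ListSandboxes` handlers continuing to read only from the store.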

### Gateway Phase Derivation

`ComputeRuntime::derive_phase()` (`crates/openshell-server/src/compute/mod.rs`) maps driver-native compute status to the public `SandboxPhase` exposed by `proto/openshell.proto`:
10 changes: 9 additions & 1 deletion crates/openshell-driver-kubernetes/src/grpc.rs
@@ -102,7 +102,7 @@ impl ComputeDriver for ComputeDriverService {
self.driver
.create_sandbox(&sandbox)
.await
.map_err(|err| Status::internal(err.to_string()))?;
.map_err(status_from_driver_error)?;
Ok(Response::new(CreateSandboxResponse {}))
}

@@ -181,4 +181,12 @@ mod tests {
assert_eq!(status.code(), tonic::Code::FailedPrecondition);
assert_eq!(status.message(), "sandbox agent pod IP is not available");
}

#[test]
fn already_exists_driver_errors_map_to_already_exists_status() {
let status = status_from_driver_error(KubernetesDriverError::AlreadyExists);

assert_eq!(status.code(), tonic::Code::AlreadyExists);
assert_eq!(status.message(), "sandbox already exists");
}
}
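The `status_from_driver_error` mapping exercised by these tests can be sketched with simplified stand-ins. The `Code`/`Status` types below mimic tonic's, and the `AgentPodIpUnavailable` and `Other` variant names are assumptions; only the two code/message pairings come from the tests in the diff.

```rust
// Hypothetical stand-ins for KubernetesDriverError and tonic::Status, so the
// mapping can be shown without the real crates.
#[derive(Debug)]
enum KubernetesDriverError {
    AlreadyExists,
    AgentPodIpUnavailable,
    Other(String),
}

#[derive(Debug, PartialEq)]
enum Code {
    AlreadyExists,
    FailedPrecondition,
    Internal,
}

#[derive(Debug)]
struct Status {
    code: Code,
    message: String,
}

// Known driver errors map to specific gRPC status codes; anything else
// falls back to `internal`, replacing the old blanket Status::internal path.
fn status_from_driver_error(err: KubernetesDriverError) -> Status {
    match err {
        KubernetesDriverError::AlreadyExists => Status {
            code: Code::AlreadyExists,
            message: "sandbox already exists".to_string(),
        },
        KubernetesDriverError::AgentPodIpUnavailable => Status {
            code: Code::FailedPrecondition,
            message: "sandbox agent pod IP is not available".to_string(),
        },
        KubernetesDriverError::Other(msg) => Status {
            code: Code::Internal,
            message: msg,
        },
    }
}
```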