diff --git a/CHANGELOG.md b/CHANGELOG.md index 4a18397..9a24751 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,8 +27,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- **M16 — JSON codec extracted behind the `json-serialize` feature; `RecordValue::as_json()` now works on `no_std + alloc`, not just `std` ([Design 032](docs/design/032-M16-aimx-json-codec.md)).** New `aimdb-core::codec` module: `RemoteSerialize` (blanket-impl'd for every `serde` `Serialize + DeserializeOwned` type), the object-safe `JsonCodec`, and the zero-sized `SerdeJsonCodec`. `serde_json` runs on `alloc`, so embedded targets can opt in; `std` enables the feature transitively, so std builds are unaffected. ([aimdb-core](aimdb-core/CHANGELOG.md)) +- **Embassy buffer + join-queue tests now run in CI (Issue #85).** The join-queue tests previously sat behind `embassy-runtime`, which pulls `embassy-executor`'s cortex-m assembly and can't compile under `cargo test` on x86_64 — so ordering / backpressure / clone-routing regressions were never caught. The `join_queue` module is now gated on `embassy-sync`, and `make test` runs the embassy adapter's unit tests + doctests on the host (no executor). Also adds `EmbassyBuffer::peek()` and fixes a stale `EmbassyBuffer` doc example. ([aimdb-embassy-adapter](aimdb-embassy-adapter/CHANGELOG.md)) + ### Changed (breaking) +- **M15 — `latest_snapshot` removed; point-in-time reads go through the new buffer-native `DynBuffer::peek()` ([Design 031](docs/design/031-M15-remove-latest-snapshot.md)).** `TypedRecord::latest()` and AimX `record.get` read the buffer directly instead of a per-record snapshot mutex (one lock + clone off the `produce()` hot path). Consequences: a `.with_remote_access()` record with **no buffer** now fails `build()` (was a silent runtime no-op); `record.get` / `latest()` on an `SpmcRing` record returns `not_found` / `None` (rings have no canonical latest — use `record.drain` / `record.subscribe`); `SingleLatest` and `Mailbox` are unaffected. `TypedRecord::produce` is removed — all writes go through `WriteHandle::push`. Adapters implement `peek()` per buffer type. ([aimdb-core](aimdb-core/CHANGELOG.md), [aimdb-tokio-adapter](aimdb-tokio-adapter/CHANGELOG.md), [aimdb-embassy-adapter](aimdb-embassy-adapter/CHANGELOG.md), [aimdb-wasm-adapter](aimdb-wasm-adapter/CHANGELOG.md)) +- **M16 — `with_remote_access()` now requires the `json-serialize` feature (transitively enabled by `std`); `with_read_only_serialization()` removed ([Design 032](docs/design/032-M16-aimx-json-codec.md)).** The stored serializer/deserializer closures are replaced by a type-erased `Arc>`. A `Serialize`-only record can no longer be exposed read-only over remote access. ([aimdb-core](aimdb-core/CHANGELOG.md)) + - **M13 — `Spawn` trait removed across the workspace; `AimDbBuilder::build()` now returns `(AimDb, AimDbRunner)` (Issue #88, [Design 028](docs/design/028-M13-remove-spawn-trait.md)).** Every future the database drives — producer services, taps, transforms, join forwarders, connector loops, the remote-access supervisor, `on_start` tasks — is collected at build time and driven by a single `FuturesUnordered` inside `runner.run().await`. Adapter implementations (`TokioAdapter`, `EmbassyAdapter`, `WasmAdapter`) drop their `impl Spawn`. The `embassy-task-pool-8/16/32` features are deleted and `EmbassyAdapter::new_with_network` no longer takes a `Spawner`. Connector authors must update `ConnectorBuilder::build()` to return `Vec` instead of `Arc`. See each crate's CHANGELOG for the per-crate impact. ## [1.1.0] - 2026-05-22 diff --git a/Cargo.lock b/Cargo.lock index f25860c..650947a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -124,6 +124,7 @@ dependencies = [ "embassy-net", "embassy-sync", "embassy-time", + "embassy-time-driver", "embedded-hal 0.2.7", "embedded-hal-async", "embedded-hal-nb", @@ -1173,9 +1174,10 @@ dependencies = [ [[package]] name = "embassy-usb-driver" -version = "0.2.1" +version = "0.2.2" dependencies = [ "defmt 1.0.1", + "embedded-io-async 0.6.1", "embedded-io-async 0.7.0", ] diff --git a/Makefile b/Makefile index cdcec40..91ad9d1 100644 --- a/Makefile +++ b/Makefile @@ -104,6 +104,8 @@ test: cargo test --package aimdb-core --no-default-features --features "alloc,profiling" @printf "$(YELLOW) → Testing aimdb-core (no_std + alloc + metrics)$(NC)\n" cargo test --package aimdb-core --no-default-features --features "alloc,metrics" + @printf "$(YELLOW) → Testing aimdb-core (no_std + alloc + json-serialize)$(NC)\n" + cargo test --package aimdb-core --no-default-features --features "alloc,json-serialize" @printf "$(YELLOW) → Testing aimdb-core remote module$(NC)\n" cargo test --package aimdb-core --lib --features "std" remote:: @printf "$(YELLOW) → Testing tokio adapter$(NC)\n" @@ -112,6 +114,8 @@ test: cargo test --package aimdb-tokio-adapter --features "tokio-runtime,tracing,metrics" @printf "$(YELLOW) → Testing tokio adapter (with profiling)$(NC)\n" cargo test --package aimdb-tokio-adapter --features "tokio-runtime,tracing,profiling" + @printf "$(YELLOW) → Testing embassy adapter (host, no executor: buffers, join-queue, doctests)$(NC)\n" + cargo test --package aimdb-embassy-adapter --no-default-features --features "alloc,embassy-sync,embassy-time" @printf "$(YELLOW) → Testing sync wrapper$(NC)\n" cargo test --package aimdb-sync @printf "$(YELLOW) → Testing codegen library$(NC)\n" @@ -167,6 +171,8 @@ clippy: cargo clippy --package aimdb-data-contracts --no-default-features --features alloc -- -D warnings @printf "$(YELLOW) → Clippy on aimdb-core (no_std + alloc)$(NC)\n" cargo clippy --package aimdb-core --no-default-features --features alloc --all-targets -- -D warnings + @printf "$(YELLOW) → Clippy on aimdb-core (no_std + alloc + json-serialize)$(NC)\n" + cargo clippy --package aimdb-core --no-default-features --features "alloc,json-serialize" --all-targets -- -D warnings @printf "$(YELLOW) → Clippy on aimdb-core (std)$(NC)\n" cargo clippy --package aimdb-core --features "std,tracing,metrics" --all-targets -- -D warnings @printf "$(YELLOW) → Clippy on tokio adapter$(NC)\n" @@ -253,6 +259,8 @@ test-embedded: cargo check --package aimdb-data-contracts --target thumbv7em-none-eabihf --no-default-features --features alloc @printf "$(YELLOW) → Checking aimdb-core (no_std minimal) on thumbv7em-none-eabihf target$(NC)\n" cargo check --package aimdb-core --target thumbv7em-none-eabihf --no-default-features --features alloc + @printf "$(YELLOW) → Checking aimdb-core (no_std + alloc + json-serialize) on thumbv7em-none-eabihf target$(NC)\n" + cargo check --package aimdb-core --target thumbv7em-none-eabihf --no-default-features --features "alloc,json-serialize" @printf "$(YELLOW) → Checking aimdb-core (no_std/embassy) on thumbv7em-none-eabihf target$(NC)\n" cargo check --package aimdb-core --target thumbv7em-none-eabihf --no-default-features --features alloc @printf "$(YELLOW) → Checking aimdb-embassy-adapter on thumbv7em-none-eabihf target$(NC)\n" diff --git a/_external/embassy b/_external/embassy index 8fe4094..664d4ea 160000 --- a/_external/embassy +++ b/_external/embassy @@ -1 +1 @@ -Subproject commit 8fe40948b9a474b78d461a35f656407528157c40 +Subproject commit 664d4ead36bb24a63955ca649bcec66c6e70bf6d diff --git a/aimdb-core/CHANGELOG.md b/aimdb-core/CHANGELOG.md index b1f1d19..16accd3 100644 --- a/aimdb-core/CHANGELOG.md +++ b/aimdb-core/CHANGELOG.md @@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- **`json-serialize` feature + `codec` module (M16, Design 032).** New `crate::codec` module with `RemoteSerialize` (capability trait, blanket-impl'd for every `serde` `Serialize + DeserializeOwned` type), the object-safe `JsonCodec` storage trait, and the zero-sized `SerdeJsonCodec`. All three are re-exported from the crate root. The feature is `no_std + alloc` compatible (`serde_json` runs on `alloc`), so `RecordValue::as_json()` now works on embedded targets, not just `std`. `std` enables `json-serialize` transitively, so existing std builds are unaffected. +- **`DynBuffer::peek(&self) -> Option` (M15, Design 031).** Non-destructive, buffer-native point-in-time read; the default impl returns `None` (correct for buffers with no canonical latest, e.g. broadcast/SPMC rings). AimX `record.get` and `TypedRecord::latest()` now route through it. Adapters implement it per buffer type — see the tokio/embassy adapter changelogs. + ### Internal refactors - **AimX remote-access path is now spawn-free (Issue #114, Design 030).** Every remaining `tokio::spawn` in `aimdb-core/src/remote/` was removed; the supervisor's accept loop and each connection handler now own their own `FuturesUnordered` driven by `tokio::select! { biased; }`. Cancellation collapsed to one mechanism — dropping the future. @@ -17,6 +22,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed (breaking) +- **`latest_snapshot` removed from `TypedRecord`; `latest()` / AimX `record.get` read the buffer via `peek()` (M15, Design 031).** Eliminates one snapshot-mutex lock + `Option` clone per `produce()` on the hot path. Behavioural consequences: + - A record configured with `.with_remote_access()` but **no buffer** now fails `build()` with a clear error (previously a silent runtime no-op — reads returned `not_found`, writes were discarded). Add a buffer, e.g. `.buffer(BufferCfg::SingleLatest)`. + - `record.get` / `latest()` on an `SpmcRing` record now returns `not_found` / `None` — a ring keeps per-consumer history with no canonical latest. Use `record.drain` (history) or `record.subscribe` (live). `SingleLatest` and `Mailbox` are unaffected. + - On `no_std`/embedded, `latest()` now depends on the adapter implementing `peek()` (the Embassy adapter does — see its changelog). +- **`with_remote_access()` is now gated on `json-serialize` and bounded on `T: codec::RemoteSerialize` (M16, Design 032).** Same effective bound as before (`Serialize + DeserializeOwned`, blanket-impl'd), but the stored serializer/deserializer closures are replaced by a single type-erased `Arc>`. `std` enables `json-serialize`, so std callers see no change; `no_std + alloc` callers must enable the `json-serialize` feature to call it. +- **`producer_service` renamed to `producer` (M15).** `TypedRecord::set_producer_service` → `set_producer`, and `has_producer_service` → `has_producer` (the latter also on the `AnyRecord` trait). Affects code that called these methods directly; the public `.source()` registrar API is unchanged. Also collapses the std/no_std `cfg` split on `AnyRecord::buffer_info` / `transform_input_keys` into single signatures. - **`AimxConfig` lost `subscription_queue_size` (Issue #114, Design 030).** The field bounded a per-subscription mpsc channel that no longer exists — subscriptions are now one future in a `FuturesUnordered`. The builder method `.subscription_queue_size(n)` is removed; replace it with `.max_subs_per_connection(n)` if you were using the value as a soft cap on subscription count, or just delete the call. - **AimX `Welcome.max_subscriptions` now reports the real per-connection cap.** Previously it returned `subscription_queue_size` (default 100) while the actual cap was implicit; it now returns `max_subs_per_connection` (default 32). Clients that displayed this value will see the change. - **AimX `record.subscribe` response no longer carries `queue_size`.** Result object is now `{ "subscription_id": "..." }` — the previous `"queue_size"` reported a number that no longer corresponded to anything in the implementation. @@ -25,7 +36,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - **`Producer::produce` is now sync + infallible; `Consumer::subscribe` is now infallible (Design 029 follow-up, M14).** The pre-resolved `WriteHandle::push` cannot fail and the pre-resolved buffer Arc makes `subscribe()` infallible. Call sites collapse: `producer.produce(x).await?` → `producer.produce(x);` and `let Ok(reader) = consumer.subscribe() else { ... }` → `let reader = consumer.subscribe();`. The `ProducerTrait::produce_any` / `ConsumerTrait::subscribe_any` trait surfaces stay `Result`/`async` because the type-erasure downcast remains fallible. - `AimDb::produce(key, value) -> DbResult<()>` is now sync; `.await` on the call site goes away. Only the key lookup can fail. - `Database::produce` likewise sync. - - `TypedRecord::produce` is now `pub fn produce(&self, val: T)` (was `pub async fn produce`). + - `TypedRecord::produce` was made sync here (was `pub async fn produce`), then **removed entirely in M15** — see _Removed (breaking)_ below. - `aimdb-wasm-adapter`: `bindings::poll_sync` helper deleted — no remaining callers now that `TypedRecord::produce` is sync. - Dead `consumer.subscribe()` error arms in `transform/single.rs` and `transform/join.rs` removed (the `Err` branch was unreachable after M14). @@ -52,6 +63,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - On the AimX remote-access path, three `runtime.spawn(...)` call sites were temporarily bridged to bare `tokio::spawn` under `#[cfg(feature = "std")]`. These have since been removed by the AimX spawn-free follow-up — see the "AimX remote-access path is now spawn-free" entry above. - `on_start` no_std bifurcation collapsed: a single `StartFnType` alias replaces the byte-identical std/no_std pair. +### Removed (breaking) + +- **`TypedRecord::produce` removed (M15, Design 031).** The M14 step (above) made it sync; M15 removes it entirely. All writes now go through `WriteHandle::push` via `TypedRecord::writer_handle()`. `AimDb::produce` and AimX `set_from_json` route through it; as a side effect `set_from_json` now marks record metadata as updated (previously skipped on that path). `WriteHandle` / `RecordWriter` no longer carry the snapshot mutex. +- **`with_read_only_serialization()` removed (M16, Design 032).** A `Serialize`-only record can no longer be exposed read-only over remote access. Use `with_remote_access()`, which additionally requires `DeserializeOwned`. No in-tree callers existed. + ## [1.1.0] - 2026-05-22 ### Added diff --git a/aimdb-core/Cargo.toml b/aimdb-core/Cargo.toml index 660ad57..5a51d8f 100644 --- a/aimdb-core/Cargo.toml +++ b/aimdb-core/Cargo.toml @@ -23,7 +23,7 @@ std = [ "serde", "thiserror", "anyhow", - "serde_json", + "json-serialize", "tokio", "aimdb-executor/std", ] @@ -31,6 +31,11 @@ std = [ # Heap allocation in no_std environments alloc = ["serde"] # Enable heap in no_std +# JSON codec (`crate::codec`): serde_json-backed `RemoteSerialize` / `JsonCodec`. +# no_std-compatible (serde_json runs on alloc); opt in on embedded targets to +# get `record.latest()?.as_json()` without std/AimX. `std` enables it for AimX. +json-serialize = ["alloc", "serde_json"] + # Observability features (available on both std/no_std) tracing = ["dep:tracing"] # Works in both std and no_std environments defmt = ["dep:defmt"] # Embedded logging via probe (no_std) diff --git a/aimdb-core/src/buffer/traits.rs b/aimdb-core/src/buffer/traits.rs index 571ef48..e544443 100644 --- a/aimdb-core/src/buffer/traits.rs +++ b/aimdb-core/src/buffer/traits.rs @@ -75,6 +75,22 @@ pub trait DynBuffer: Send + Sync { /// Returns self as Any for downcasting to concrete buffer types fn as_any(&self) -> &dyn core::any::Any; + /// Non-destructive read of the buffer's current value. + /// + /// Returns `Some(T)` if the buffer holds a current value that can be read + /// without affecting any consumer's position. Returns `None` if the buffer + /// type has no canonical "current value" concept (e.g., SPMC Ring) or if + /// no value has been produced yet. + /// + /// This is the buffer-native point-in-time read used by AimX `record.get` + /// (design 031). Implementations must not advance any reader position. + /// + /// The default returns `None`, which is the correct behaviour for buffers + /// without a canonical latest value. + fn peek(&self) -> Option { + None + } + /// Get buffer metrics snapshot (metrics feature only) /// /// Returns `Some(snapshot)` if the buffer implementation supports metrics, diff --git a/aimdb-core/src/buffer/writer.rs b/aimdb-core/src/buffer/writer.rs index 19e48c8..e154302 100644 --- a/aimdb-core/src/buffer/writer.rs +++ b/aimdb-core/src/buffer/writer.rs @@ -1,8 +1,8 @@ //! `RecordWriter` — the sole implementor of `WriteHandle` (design 029). //! -//! Pre-binds the three Arcs a `TypedRecord` already owns (buffer, -//! latest-snapshot, metadata tracker) so `Producer` can push values without -//! holding a `Arc>` or running a `HashMap` lookup per call. +//! Pre-binds the buffer and (std-only) metadata tracker so `Producer` can +//! push values without holding a `Arc>` or running a `HashMap` +//! lookup per call. #[cfg(not(feature = "std"))] extern crate alloc; @@ -16,15 +16,9 @@ use std::sync::Arc; use super::traits::{DynBuffer, WriteHandle}; pub(crate) struct RecordWriter { - /// `None` for records that only support `latest()` (no buffer configured). + /// `None` for records without a configured buffer. buffer: Option>>, - /// Snapshot slot shared with `TypedRecord` and any `latest()` reader. - #[cfg(feature = "std")] - latest_snapshot: Arc>>, - #[cfg(not(feature = "std"))] - latest_snapshot: Arc>>, - /// Metadata tracker (already `Clone` with shared inner `Arc` / /// `Arc`). std-only. #[cfg(feature = "std")] @@ -35,39 +29,19 @@ impl RecordWriter { #[cfg(feature = "std")] pub(crate) fn new( buffer: Option>>, - latest_snapshot: Arc>>, metadata: crate::typed_record::RecordMetadataTracker, ) -> Self { - Self { - buffer, - latest_snapshot, - metadata, - } + Self { buffer, metadata } } #[cfg(not(feature = "std"))] - pub(crate) fn new( - buffer: Option>>, - latest_snapshot: Arc>>, - ) -> Self { - Self { - buffer, - latest_snapshot, - } + pub(crate) fn new(buffer: Option>>) -> Self { + Self { buffer } } } impl WriteHandle for RecordWriter { fn push(&self, value: T) { - #[cfg(feature = "std")] - { - *self.latest_snapshot.lock().unwrap() = Some(value.clone()); - } - #[cfg(not(feature = "std"))] - { - *self.latest_snapshot.lock() = Some(value.clone()); - } - if let Some(buf) = &self.buffer { buf.push(value); #[cfg(feature = "std")] diff --git a/aimdb-core/src/builder.rs b/aimdb-core/src/builder.rs index b49a914..c1ddd4e 100644 --- a/aimdb-core/src/builder.rs +++ b/aimdb-core/src/builder.rs @@ -1102,8 +1102,10 @@ impl AimDb { where T: Send + 'static + Debug + Clone, { + // Single write path via WriteHandle (design 031). For hot paths, + // prefer `db.producer::(key)` once and reuse the returned handle. let typed_rec = self.inner.get_typed_record_by_key::(key)?; - typed_rec.produce(value); + typed_rec.writer_handle().push(value); Ok(()) } diff --git a/aimdb-core/src/codec.rs b/aimdb-core/src/codec.rs new file mode 100644 index 0000000..bb9c561 --- /dev/null +++ b/aimdb-core/src/codec.rs @@ -0,0 +1,83 @@ +//! JSON codec for records (feature `json-serialize`, no_std + alloc compatible). +//! +//! A record's value `T` often has to cross a type-erasure boundary into a wire +//! format. AimX (std) crosses into `serde_json::Value`; so can a no_std +//! connector or a local `record.latest()?.as_json()` call. This module +//! provides that bridge without requiring `std` — `serde_json` runs on `alloc` +//! alone, so embedded targets can opt in via the `json-serialize` feature. +//! +//! Two layers: +//! +//! - [`RemoteSerialize`] — the capability trait. Blanket-implemented for every +//! `serde` type, so any `T: Serialize + DeserializeOwned` gets a JSON codec +//! for free. This is the AimX/connector analogue of the data-contract traits +//! (`Streamable`, `Linkable`). It lives in `aimdb-core` rather than +//! `aimdb-data-contracts` because that crate depends on `aimdb-core`, not the +//! reverse — bounding a core method on `Streamable` would be a dependency +//! cycle. Every `Streamable` type satisfies `RemoteSerialize` automatically. +//! +//! - [`JsonCodec`] — the object-safe, type-erased storage form, with the +//! zero-sized [`SerdeJsonCodec`] implementation. A record stores +//! `Option>>`; the AimX read/write/subscribe paths and +//! `RecordValue::as_json` route through it. This mirrors the connector +//! layer's `SerializerFn` / `DeserializerFn`. + +use serde::{de::DeserializeOwned, Serialize}; + +/// A record type that can be encoded to / decoded from the JSON wire format. +/// +/// Blanket-implemented for every `T: Serialize + DeserializeOwned`, so opting a +/// record into JSON via `with_remote_access()` requires no extra boilerplate. +/// Implement `Serialize` + `Deserialize` (e.g. via `derive`) and the type is +/// codec-ready. +pub trait RemoteSerialize: Sized { + /// Serialize this value to a JSON value, or `None` on failure. + fn to_json(&self) -> Option; + + /// Deserialize a JSON value into `Self`, or `None` on schema mismatch. + fn from_json(value: &serde_json::Value) -> Option; +} + +impl RemoteSerialize for T +where + T: Serialize + DeserializeOwned, +{ + fn to_json(&self) -> Option { + serde_json::to_value(self).ok() + } + + fn from_json(value: &serde_json::Value) -> Option { + serde_json::from_value(value.clone()).ok() + } +} + +/// Type-erased JSON codec for one record type. +/// +/// Stored as `Arc>` inside `TypedRecord`, where the +/// blanket `AnyRecord` impl cannot carry a `T: RemoteSerialize` bound (it must +/// also cover non-serializable record types). `T` is fixed per record, so the +/// trait is object-safe. +pub trait JsonCodec: Send + Sync { + /// Encode a typed value to JSON, or `None` on failure. + fn encode(&self, value: &T) -> Option; + + /// Decode a JSON value into `T`, or `None` on schema mismatch. + fn decode(&self, value: &serde_json::Value) -> Option; +} + +/// Zero-sized serde-backed [`JsonCodec`]. +/// +/// Constructed only under a `T: RemoteSerialize` bound (see +/// `TypedRecord::with_remote_access`), so the erased `JsonCodec` it yields is +/// always valid. +pub struct SerdeJsonCodec; + +impl JsonCodec for SerdeJsonCodec { + fn encode(&self, value: &T) -> Option { + value.to_json() + } + + fn decode(&self, value: &serde_json::Value) -> Option { + T::from_json(value) + } +} diff --git a/aimdb-core/src/ext_macros.rs b/aimdb-core/src/ext_macros.rs index b746a2a..2956294 100644 --- a/aimdb-core/src/ext_macros.rs +++ b/aimdb-core/src/ext_macros.rs @@ -116,18 +116,11 @@ macro_rules! impl_record_registrar_ext { ) -> &'a mut $crate::RecordRegistrar<'a, T, $runtime> { use $crate::buffer::Buffer; - #[cfg(feature = "std")] - { - let buffer = Box::new($buffer_new(&cfg)); - self.buffer_with_cfg(buffer, cfg) - } - - #[cfg(not(feature = "std"))] - { - extern crate alloc; - let buffer = alloc::boxed::Box::new($buffer_new(&cfg)); - self.buffer_raw(buffer) - } + extern crate alloc; + let buffer = alloc::boxed::Box::new($buffer_new(&cfg)); + // Record the cfg so buffer_info() reports the real buffer + // type/capacity for the dependency graph (std and no_std). + self.buffer_with_cfg(buffer, cfg) } fn source( @@ -250,18 +243,11 @@ macro_rules! impl_record_registrar_ext { ) -> &'a mut $crate::RecordRegistrar<'a, T, $runtime> { use $crate::buffer::Buffer; - #[cfg(feature = "std")] - { - let buffer = Box::new($buffer_new(&cfg)); - self.buffer_with_cfg(buffer, cfg) - } - - #[cfg(not(feature = "std"))] - { - extern crate alloc; - let buffer = alloc::boxed::Box::new($buffer_new(&cfg)); - self.buffer_raw(buffer) - } + extern crate alloc; + let buffer = alloc::boxed::Box::new($buffer_new(&cfg)); + // Record the cfg so buffer_info() reports the real buffer + // type/capacity for the dependency graph (std and no_std). + self.buffer_with_cfg(buffer, cfg) } fn source( diff --git a/aimdb-core/src/lib.rs b/aimdb-core/src/lib.rs index c8ffbff..6dcc24d 100644 --- a/aimdb-core/src/lib.rs +++ b/aimdb-core/src/lib.rs @@ -20,6 +20,8 @@ extern crate alloc; pub mod buffer; pub mod builder; +#[cfg(feature = "json-serialize")] +pub mod codec; pub mod connector; pub mod context; pub mod database; @@ -80,6 +82,10 @@ pub use transport::{Connector, ConnectorConfig, PublishError}; pub use typed_api::{Consumer, Producer, RecordRegistrar, RecordT, StageKind}; pub use typed_record::{AnyRecord, AnyRecordExt, TypedRecord}; +// JSON codec (feature `json-serialize`, no_std + alloc compatible) +#[cfg(feature = "json-serialize")] +pub use codec::{JsonCodec, RemoteSerialize, SerdeJsonCodec}; + // Stage profiling exports (feature-gated) #[cfg(feature = "profiling")] pub use profiling::{RecordProfilingMetrics, StageMetrics, StageProfilingInfo}; diff --git a/aimdb-core/src/typed_api.rs b/aimdb-core/src/typed_api.rs index 0f34f89..a6320c4 100644 --- a/aimdb-core/src/typed_api.rs +++ b/aimdb-core/src/typed_api.rs @@ -132,9 +132,9 @@ where /// Produce a value of type T /// - /// Push to the record's buffer, update the latest-snapshot cache, and - /// notify tap observers + outbound link connectors. Synchronous and - /// infallible — the underlying `WriteHandle::push` cannot fail. + /// Push to the record's buffer; consumer tasks and outbound link connectors + /// observe it from there. Synchronous and infallible — the underlying + /// `WriteHandle::push` cannot fail. /// pub fn produce(&self, value: T) { #[cfg(feature = "profiling")] @@ -407,7 +407,7 @@ where + 'static, Fut: Future + Send + 'static, { - self.rec.set_producer_service(f); + self.rec.set_producer(f); #[cfg(feature = "profiling")] { let (idx, _) = self.rec.profiling_mut().push_source(); @@ -470,8 +470,7 @@ where self } - /// Configures a buffer with metadata tracking (std only) - #[cfg(feature = "std")] + /// Configures a buffer with metadata tracking pub fn buffer_with_cfg( &'a mut self, buffer: Box>, @@ -482,17 +481,18 @@ where self } - /// Sets the buffer configuration for metadata tracking (std only) - #[cfg(feature = "std")] + /// Sets the buffer configuration for metadata tracking pub fn buffer_cfg(&'a mut self, cfg: crate::buffer::BufferCfg) -> &'a mut Self { self.rec.set_buffer_cfg(cfg); self } - /// Enables JSON serialization for remote access (std only) + /// Installs the JSON codec for this record (feature `json-serialize`) /// - /// Configures this record to support the `record.get` protocol method. - /// Requires `T: serde::Serialize`. + /// Enables `record.latest()?.as_json()`, and on `std` the AimX `record.get` + /// / `set` / `subscribe` protocol. Requires `T: RemoteSerialize` + /// (blanket-impl'd for every `Serialize + DeserializeOwned` type). Works on + /// no_std + alloc. /// /// # Example /// ```rust,ignore @@ -501,10 +501,10 @@ where /// .with_remote_access(); // Enable remote queries /// }); /// ``` - #[cfg(feature = "std")] + #[cfg(feature = "json-serialize")] pub fn with_remote_access(&'a mut self) -> &'a mut Self where - T: serde::Serialize + serde::de::DeserializeOwned, + T: crate::codec::RemoteSerialize + 'static, { self.rec.with_remote_access(); self @@ -1056,7 +1056,7 @@ where self.url ); } - if self.registrar.rec.has_producer_service() { + if self.registrar.rec.has_producer() { panic!( "Record already has a .source(); cannot also have a .link_from() for {}", self.url @@ -1620,7 +1620,7 @@ mod tests { fn link_from_after_source_panics() { let mut rec = crate::typed_record::TypedRecord::::new(); rec.set_buffer(Box::new(MockBuffer)); - rec.set_producer_service(|_p, _ctx| async move {}); + rec.set_producer(|_p, _ctx| async move {}); let builders: Vec>> = vec![Box::new(MockConnectorBuilder { @@ -1673,7 +1673,7 @@ mod tests { .finish(); } - rec.set_producer_service(|_p, _ctx| async move {}); + rec.set_producer(|_p, _ctx| async move {}); } #[test] diff --git a/aimdb-core/src/typed_record.rs b/aimdb-core/src/typed_record.rs index d94ac28..181b2ab 100644 --- a/aimdb-core/src/typed_record.rs +++ b/aimdb-core/src/typed_record.rs @@ -18,7 +18,7 @@ use core::fmt::Debug; extern crate alloc; #[cfg(not(feature = "std"))] -use alloc::{boxed::Box, sync::Arc, vec::Vec}; +use alloc::{boxed::Box, string::String, sync::Arc, vec::Vec}; #[cfg(not(feature = "std"))] use alloc::string::ToString; @@ -26,15 +26,33 @@ use alloc::string::ToString; #[cfg(feature = "std")] use std::{boxed::Box, string::String, sync::Arc, vec::Vec}; -use crate::buffer::DynBuffer; +#[cfg(feature = "profiling")] +use crate::profiling::RecordProfilingMetrics; -/// Type alias for JSON serializer function (std only) #[cfg(feature = "std")] -type JsonSerializer = Arc Option + Send + Sync>; +type Mutex = std::sync::Mutex; +#[cfg(not(feature = "std"))] +type Mutex = spin::Mutex; -/// Type alias for JSON deserializer function (std only) +/// Locks one of the `TypedRecord` field mutexes, hiding the std/spin API +/// difference (`std::sync::Mutex::lock` returns a `LockResult`; `spin` returns +/// the guard directly). A poisoned std mutex is unrecoverable in this code, so +/// `.unwrap()` is the correct response. The returned guard derefs to `T` on +/// both sides, so call sites are identical. #[cfg(feature = "std")] -type JsonDeserializer = Arc Option + Send + Sync>; +fn lock(m: &Mutex) -> std::sync::MutexGuard<'_, T> { + m.lock().unwrap() +} +#[cfg(not(feature = "std"))] +fn lock(m: &Mutex) -> spin::MutexGuard<'_, T> { + m.lock() +} + +use crate::buffer::DynBuffer; + +/// Type alias for a record's type-erased JSON codec (feature `json-serialize`) +#[cfg(feature = "json-serialize")] +type RecordCodec = Arc>; /// Wrapper for a record's latest value with optional serialization /// @@ -42,20 +60,20 @@ type JsonDeserializer = Arc Option + Send + /// both std and no_std. JSON serialization (`.as_json()`) requires std feature. pub struct RecordValue { value: T, - #[cfg(feature = "std")] - serializer: Option>, + #[cfg(feature = "json-serialize")] + codec: Option>, } impl RecordValue { - /// Create a new RecordValue with optional serializer - #[cfg(feature = "std")] - fn new(value: T, serializer: Option>) -> Self { - Self { value, serializer } + /// Create a new RecordValue with optional codec + #[cfg(feature = "json-serialize")] + fn new(value: T, codec: Option>) -> Self { + Self { value, codec } } - /// Create a new RecordValue without serializer (no_std) - #[cfg(not(feature = "std"))] - fn new(value: T, _serializer: Option<()>) -> Self { + /// Create a new RecordValue without codec (codec feature off) + #[cfg(not(feature = "json-serialize"))] + fn new(value: T, _codec: Option<()>) -> Self { Self { value } } @@ -69,15 +87,15 @@ impl RecordValue { self.value } - /// Serialize the value to JSON (std only) + /// Serialize the value to JSON (feature `json-serialize`) /// - /// Returns `Some(JsonValue)` if record was configured with `.with_remote_access()`, - /// otherwise `None`. Requires `serde_json` (std only). For no_std, use `.get()`, + /// Returns `Some(JsonValue)` if the record was configured with + /// `.with_remote_access()`, otherwise `None`. Available on no_std + alloc + /// when the `json-serialize` feature is enabled. Without it, use `.get()`, /// `.into_inner()`, or `Deref` for direct access. - #[cfg(feature = "std")] + #[cfg(feature = "json-serialize")] pub fn as_json(&self) -> Option { - let serializer = self.serializer.as_ref()?; - serializer(&self.value) + self.codec.as_ref()?.encode(&self.value) } } @@ -109,8 +127,8 @@ impl core::ops::Deref for RecordValue { struct JsonReaderAdapter { /// The underlying typed buffer reader inner: Box + Send>, - /// JSON serializer function (from .with_remote_access()) - serializer: JsonSerializer, + /// JSON codec (from .with_remote_access()) + codec: RecordCodec, } #[cfg(feature = "std")] @@ -129,7 +147,7 @@ impl crate::buffer::JsonBufferReader for JsonReaderAd let value = self.inner.recv().await?; // Serialize to JSON - (self.serializer)(&value).ok_or_else(|| { + self.codec.encode(&value).ok_or_else(|| { #[cfg(feature = "std")] { crate::DbError::RuntimeError { @@ -148,10 +166,12 @@ impl crate::buffer::JsonBufferReader for JsonReaderAd // Non-blocking receive from underlying typed buffer let value = self.inner.try_recv()?; - // Serialize to JSON using the configured serializer - (self.serializer)(&value).ok_or_else(|| crate::DbError::RuntimeError { - message: "Failed to serialize value to JSON".to_string(), - }) + // Serialize to JSON using the configured codec + self.codec + .encode(&value) + .ok_or_else(|| crate::DbError::RuntimeError { + message: "Failed to serialize value to JSON".to_string(), + }) } } @@ -281,7 +301,7 @@ pub trait AnyRecord: Send + Sync { fn consumer_count(&self) -> usize; /// Returns whether a producer service is registered - fn has_producer_service(&self) -> bool; + fn has_producer(&self) -> bool; /// Returns whether a transform is registered for this record fn has_transform(&self) -> bool; @@ -292,23 +312,11 @@ pub trait AnyRecord: Send + Sync { /// Returns the buffer type name and capacity (for dependency graph) /// /// Returns (buffer_type_name, optional_capacity). - #[cfg(feature = "std")] fn buffer_info(&self) -> (String, Option); - /// Returns the buffer type name and capacity (for dependency graph) - /// - /// Returns (buffer_type_name, optional_capacity). - #[cfg(not(feature = "std"))] - fn buffer_info(&self) -> (alloc::string::String, Option); - /// Returns the transform input keys (if a transform is registered) - #[cfg(feature = "std")] fn transform_input_keys(&self) -> Option>; - /// Returns the transform input keys (if a transform is registered) - #[cfg(not(feature = "std"))] - fn transform_input_keys(&self) -> Option>; - /// Sets the writable flag for this record (type-erased) /// /// Used internally by the builder to apply security policy to records. @@ -505,7 +513,7 @@ where let mut futures = Vec::new(); - if typed_record.has_producer_service() { + if typed_record.has_producer() { if let Some(f) = typed_record.collect_producer_future(runtime, db, record_key)? { futures.push(f); } @@ -534,28 +542,17 @@ pub struct TypedRecord< /// This will be auto-spawned during build() if present /// Stored as FnOnce that takes (Producer, RuntimeContext) and returns a Future /// Wrapped in Mutex for interior mutability (needed to take() during spawning) - #[cfg(feature = "std")] - producer_service: std::sync::Mutex>>, - - #[cfg(not(feature = "std"))] - producer_service: spin::Mutex>>, + producer: Mutex>>, /// List of consumer/tap tasks - wrapped in Mutex for Sync + taking out during spawn /// Each is spawned as an independent background task that subscribes to the buffer /// Using Mutex provides the Sync bound required by AnyRecord trait - #[cfg(feature = "std")] - consumers: std::sync::Mutex>>, - - #[cfg(not(feature = "std"))] - consumers: spin::Mutex>>, + consumers: Mutex>>, - /// Transform descriptor — mutually exclusive with producer_service. + /// Transform descriptor — mutually exclusive with producer. /// If set, this record is a reactive derivation from one or more input records. /// Uses the same Mutex pattern for take()-during-spawn. - #[cfg(feature = "std")] - transform: std::sync::Mutex>>, - #[cfg(not(feature = "std"))] - transform: spin::Mutex>>, + transform: Mutex>>, /// Optional buffer for async dispatch /// When present, produce() enqueues to buffer instead of direct call @@ -564,8 +561,8 @@ pub struct TypedRecord< /// a pre-resolved handle to the same buffer — design 029 hot-path change. buffer: Option>>, - /// Buffer configuration (cached for metadata, std only) - #[cfg(feature = "std")] + /// Buffer configuration cached for metadata / dependency-graph reporting. + /// Set via `set_buffer_cfg()` after `set_buffer()`. buffer_cfg: Option, /// List of outbound connector links (AimDB → External) @@ -581,33 +578,20 @@ pub struct TypedRecord< /// Stages are appended here in the same order they are registered on the /// `RecordRegistrar`, which matches the order the spawn machinery iterates them. #[cfg(feature = "profiling")] - profiling: crate::profiling::RecordProfilingMetrics, + profiling: RecordProfilingMetrics, /// Metadata tracking (std only - for remote access) #[cfg(feature = "std")] metadata: RecordMetadataTracker, - /// JSON serializer function (std only - for remote access) - /// When set via .with_remote_access(), automatically serializes values for record.get queries - /// Stores the serialization logic where T: Serialize is known at call site - #[cfg(feature = "std")] - json_serializer: Option>, - - /// JSON deserializer function (std only - for remote access) - /// When set via .with_remote_access(), automatically deserializes JSON for record.set operations - /// Stores the deserialization logic where T: Deserialize is known at call site - #[cfg(feature = "std")] - json_deserializer: Option>, - - /// Latest value snapshot - for latest() API - /// Cached atomically on every produce() call to support latest() - /// This provides a buffer-agnostic way to query the latest value - /// Available in both std and no_std environments - #[cfg(feature = "std")] - latest_snapshot: Arc>>, - - #[cfg(not(feature = "std"))] - latest_snapshot: Arc>>, + /// Type-erased JSON codec (feature `json-serialize`). + /// `Some` iff the record opted into JSON via `.with_remote_access()`. + /// `RecordValue::as_json` and — on std — the AimX read (`latest_json`), + /// write (`set_from_json`), and subscribe (`subscribe_json`) paths route + /// through it. Built from a `SerdeJsonCodec` where the `T: RemoteSerialize` + /// bound is known at the call site. + #[cfg(feature = "json-serialize")] + remote_codec: Option>, } impl @@ -618,48 +602,32 @@ impl Self { Self { - #[cfg(feature = "std")] - producer_service: std::sync::Mutex::new(None), - #[cfg(not(feature = "std"))] - producer_service: spin::Mutex::new(None), - #[cfg(feature = "std")] - consumers: std::sync::Mutex::new(Vec::new()), - #[cfg(not(feature = "std"))] - consumers: spin::Mutex::new(alloc::vec::Vec::new()), - #[cfg(feature = "std")] - transform: std::sync::Mutex::new(None), - #[cfg(not(feature = "std"))] - transform: spin::Mutex::new(None), + producer: Mutex::new(None), + consumers: Mutex::new(Vec::new()), + transform: Mutex::new(None), buffer: None, - #[cfg(feature = "std")] buffer_cfg: None, outbound_connectors: Vec::new(), inbound_connectors: Vec::new(), #[cfg(feature = "profiling")] - profiling: crate::profiling::RecordProfilingMetrics::new(), + profiling: RecordProfilingMetrics::new(), #[cfg(feature = "std")] metadata: RecordMetadataTracker::new::(), - #[cfg(feature = "std")] - json_serializer: None, - #[cfg(feature = "std")] - json_deserializer: None, - #[cfg(feature = "std")] - latest_snapshot: Arc::new(std::sync::Mutex::new(None)), - #[cfg(not(feature = "std"))] - latest_snapshot: Arc::new(spin::Mutex::new(None)), + #[cfg(feature = "json-serialize")] + remote_codec: None, } } /// Stage profiling metrics for this record (feature `profiling`). #[cfg(feature = "profiling")] - pub fn profiling(&self) -> &crate::profiling::RecordProfilingMetrics { + pub fn profiling(&self) -> &RecordProfilingMetrics { &self.profiling } /// Mutable access to the stage profiling metrics — used during registration /// to append per-stage entries and assign names. #[cfg(feature = "profiling")] - pub(crate) fn profiling_mut(&mut self) -> &mut crate::profiling::RecordProfilingMetrics { + pub(crate) fn profiling_mut(&mut self) -> &mut RecordProfilingMetrics { &mut self.profiling } @@ -670,18 +638,13 @@ impl(&mut self, f: F) + pub fn set_producer(&mut self, f: F) where F: FnOnce(crate::Producer, Arc) -> Fut + Send + 'static, Fut: core::future::Future + Send + 'static, { // Check for existing transform (mutual exclusion) - #[cfg(feature = "std")] - let has_transform = self.transform.lock().unwrap().is_some(); - #[cfg(not(feature = "std"))] - let has_transform = self.transform.lock().is_some(); - - if has_transform { + if lock(&self.transform).is_some() { panic!("Record already has a .transform(); cannot also have a .source()."); } @@ -690,12 +653,7 @@ impl BoxFuture<'static, ()> { Box::pin(f(consumer, ctx_any)) }, ); - #[cfg(feature = "std")] - { - self.consumers.lock().unwrap().push(boxed); - } - - #[cfg(not(feature = "std"))] - { - self.consumers.lock().push(boxed); - } + lock(&self.consumers).push(boxed); } /// Sets the transform descriptor for this record. @@ -770,12 +713,7 @@ impl, ) { // Enforce mutual exclusion with .source() - #[cfg(feature = "std")] - let has_source = self.producer_service.lock().unwrap().is_some(); - #[cfg(not(feature = "std"))] - let has_source = self.producer_service.lock().is_some(); - - if has_source { + if lock(&self.producer).is_some() { panic!("Record already has a .source(); cannot also have a .transform()."); } @@ -783,11 +721,7 @@ impl bool { - #[cfg(feature = "std")] - { - self.transform.lock().unwrap().is_some() - } - #[cfg(not(feature = "std"))] - { - self.transform.lock().is_some() - } + lock(&self.transform).is_some() } /// Returns how this record gets its values. @@ -815,15 +742,7 @@ impl crate::graph::RecordOrigin { // Check for transform first (most specific) - #[cfg(feature = "std")] - let transform_keys = self - .transform - .lock() - .unwrap() - .as_ref() - .map(|t| t.input_keys.clone()); - #[cfg(not(feature = "std"))] - let transform_keys = self.transform.lock().as_ref().map(|t| t.input_keys.clone()); + let transform_keys = lock(&self.transform).as_ref().map(|t| t.input_keys.clone()); if let Some(input_keys) = transform_keys { if input_keys.len() == 1 { @@ -844,12 +763,7 @@ impl Option> { - #[cfg(feature = "std")] - let guard = self.transform.lock().unwrap(); - #[cfg(not(feature = "std"))] - let guard = self.transform.lock(); - - guard.as_ref().map(|t| t.input_keys.clone()) - } - - #[cfg(not(feature = "std"))] - pub fn transform_input_keys(&self) -> Option> { - #[cfg(feature = "std")] - { - self.transform - .lock() - .unwrap() - .as_ref() - .map(|d| d.input_keys.clone()) - } - #[cfg(not(feature = "std"))] - { - self.transform.lock().as_ref().map(|d| d.input_keys.clone()) - } + lock(&self.transform).as_ref().map(|t| t.input_keys.clone()) } /// Collects the transform task future and any fan-in forwarder futures. @@ -902,16 +794,7 @@ impl>) { - // Cache buffer configuration for metadata (std only) - #[cfg(feature = "std")] - { - // Store a simplified version of the config for metadata - // We can't call cfg() on the buffer, so we'll infer from the buffer type name - self.buffer_cfg = None; // Will be set by the caller via set_buffer_cfg - } + // The buffer trait object hides the original BufferCfg, so callers must + // supply it via set_buffer_cfg() if they want accurate buffer_info(). + self.buffer_cfg = None; // `Arc::from(Box)` reuses the existing heap allocation; the // public API stays Box-flavoured to avoid churn at adapter / test call // sites, while internally we share via Arc so producers/consumers can @@ -953,14 +832,12 @@ impl (String, Option) { if let Some(cfg) = &self.buffer_cfg { let cap = match cfg { @@ -968,22 +845,14 @@ impl None, }; (cfg.name().to_string(), cap) + } else if self.buffer.is_some() { + // Buffer set via buffer_raw() without a recorded cfg. + ("unknown".to_string(), None) } else { ("none".to_string(), None) } } - /// Returns buffer type name and capacity (for dependency graph) - #[cfg(not(feature = "std"))] - pub fn buffer_info(&self) -> (alloc::string::String, Option) { - // No buffer_cfg in no_std, so we just report based on buffer presence - if self.buffer.is_some() { - ("unknown".into(), None) - } else { - ("none".into(), None) - } - } - /// Returns whether a buffer is configured /// /// # Returns @@ -1011,16 +880,12 @@ impl &mut Self where - T: serde::Serialize + serde::de::DeserializeOwned, + T: crate::codec::RemoteSerialize + 'static, { - // Store serialization function where T: Serialize is known - self.json_serializer = Some(std::sync::Arc::new(|val: &T| { - serde_json::to_value(val).ok() - })); - - // Store deserialization function where T: DeserializeOwned is known - self.json_deserializer = Some(std::sync::Arc::new(|json_val: &serde_json::Value| { - serde_json::from_value(json_val.clone()).ok() - })); + // Capture the serde-backed codec where the bound is statically known. + self.remote_codec = Some(Arc::new(crate::codec::SerdeJsonCodec)); #[cfg(feature = "tracing")] tracing::info!( @@ -1089,36 +947,6 @@ impl &mut Self - where - T: serde::Serialize, - { - // Store only serialization function - self.json_serializer = Some(std::sync::Arc::new(|val: &T| { - serde_json::to_value(val).ok() - })); - - // Deserialization intentionally left as None - will fail at runtime if - // someone tries to use record.set on this record - - #[cfg(feature = "tracing")] - tracing::info!( - "with_read_only_serialization() called for record type: {}", - core::any::type_name::() - ); - - self - } - /// Returns a reference to the registered outbound connectors /// /// # Returns @@ -1152,21 +980,11 @@ impl usize { - #[cfg(feature = "std")] - { - self.consumers.lock().unwrap().len() - } - - #[cfg(not(feature = "std"))] - { - self.consumers.lock().len() - } + lock(&self.consumers).len() } /// Collects all registered consumer (`.tap()`) futures. @@ -1220,17 +1030,7 @@ impl bool { - #[cfg(feature = "std")] - { - self.producer_service.lock().unwrap().is_some() - } - #[cfg(not(feature = "std"))] - { - self.producer_service.lock().is_some() - } + pub fn has_producer(&self) -> bool { + lock(&self.producer).is_some() } /// Marks this record as writable for remote access (std only) @@ -1399,15 +1158,15 @@ impl Option> { - #[cfg(feature = "std")] + // Read buffer-native storage via peek() (design 031). Records without + // a buffer return None — see Breaking Changes in design 031. + let value = self.buffer.as_ref()?.peek()?; + #[cfg(feature = "json-serialize")] { - let value = self.latest_snapshot.lock().unwrap().clone()?; - Some(RecordValue::new(value, self.json_serializer.clone())) + Some(RecordValue::new(value, self.remote_codec.clone())) } - - #[cfg(not(feature = "std"))] + #[cfg(not(feature = "json-serialize"))] { - let value = self.latest_snapshot.lock().clone()?; Some(RecordValue::new(value, None)) } } @@ -1449,6 +1208,18 @@ impl bool { - TypedRecord::has_producer_service(self) + fn has_producer(&self) -> bool { + TypedRecord::has_producer(self) } fn has_transform(&self) -> bool { @@ -1492,26 +1263,14 @@ impl (String, Option) { TypedRecord::buffer_info(self) } - #[cfg(not(feature = "std"))] - fn buffer_info(&self) -> (alloc::string::String, Option) { - TypedRecord::buffer_info(self) - } - - #[cfg(feature = "std")] fn transform_input_keys(&self) -> Option> { TypedRecord::transform_input_keys(self) } - #[cfg(not(feature = "std"))] - fn transform_input_keys(&self) -> Option> { - TypedRecord::transform_input_keys(self) - } - fn set_writable_erased(&self, writable: bool) { #[cfg(feature = "std")] { @@ -1556,7 +1315,7 @@ impl() ); - // Delegate to latest() which returns RecordValue with serializer attached - let result = self.latest().and_then(|v| v.as_json()); + // Read buffer-native storage via peek() (design 031). Records without + // a buffer return None — see Breaking Changes in design doc. + let value = self.buffer.as_ref()?.peek()?; + let result = self.remote_codec.as_ref()?.encode(&value); #[cfg(feature = "tracing")] tracing::debug!("Serialization result: {:?}", result.is_some()); @@ -1617,8 +1378,8 @@ impl T - let value: T = deserializer(&json_value).ok_or_else(|| DbError::RuntimeError { - message: format!( - "Failed to deserialize JSON to type '{}'. \ + let value: T = codec + .decode(&json_value) + .ok_or_else(|| DbError::RuntimeError { + message: format!( + "Failed to deserialize JSON to type '{}'. \ JSON structure does not match the expected schema.", - core::any::type_name::() - ), - })?; + core::any::type_name::() + ), + })?; #[cfg(feature = "tracing")] tracing::debug!( @@ -1713,7 +1476,9 @@ impl() ); - self.produce(value); + // Push through the unified write path (design 031). This also marks + // metadata as updated — previously skipped on this path. + self.writer_handle().push(value); #[cfg(feature = "tracing")] tracing::info!( diff --git a/aimdb-embassy-adapter/CHANGELOG.md b/aimdb-embassy-adapter/CHANGELOG.md index c107d2e..a08e5c7 100644 --- a/aimdb-embassy-adapter/CHANGELOG.md +++ b/aimdb-embassy-adapter/CHANGELOG.md @@ -7,6 +7,21 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- **`EmbassyBuffer::peek()` (M15, Design 031).** Non-destructive buffer-native read matching the Tokio adapter's semantics: `SingleLatest` (`Watch`) via `Watch::try_get()`, `Mailbox` (`Channel<_, T, 1>`) via `Channel::try_peek()`, `SpmcRing` (`PubSubChannel`) returns `None`. Neither path consumes a receiver slot or advances a cursor. +- **Embassy buffer + join-queue unit tests now run in CI on the host (Issue #85).** Previously the join-queue tests sat behind `feature = "embassy-runtime"`, which transitively pulls `embassy-executor`'s `platform-cortex-m` ARM assembly and fails to compile under `cargo test` on x86_64 — so ordering / backpressure / clone-routing regressions went uncaught. The `join_queue` module is now gated on `embassy-sync` instead (the `JoinFanInRuntime for EmbassyAdapter` impl keeps its own `embassy-runtime` gate), and `make test` runs `cargo test -p aimdb-embassy-adapter --no-default-features --features "alloc,embassy-sync,embassy-time"` (15 unit tests + doctests). A test-only no-op `#[defmt::global_logger]` / `#[defmt::panic_handler]` and a trivial `embassy-time-driver` satisfy the host link targets that `defmt` + `defmt-timestamp-uptime` would otherwise leave undefined. +- **`embassy-time-driver` dev-dependency** — provides the trivial host time driver above (no tick feature, so it unifies with the workspace `tick-hz-32_768` rather than forcing `mock-driver`/`std`'s conflicting rate). + +### Fixed + +- **`TypedRecord::latest()` no longer always returns `None` on Embassy (M15).** With `latest_snapshot` removed in `aimdb-core`, reads go straight to the buffer via `peek()`; the Embassy adapter now implements `peek()` (above) so `latest()` returns the current value on `SingleLatest` / `Mailbox` instead of `None`. +- **Stale `EmbassyBuffer` doc example.** It imported the removed `BufferBackend` trait (now `Buffer` / `BufferReader`) and put a non-`const` `new_spmc()` in a `static`; it never compiled because doctests didn't build on host before. Now corrected and exercised by `cargo test`'s doctest pass. + +### Changed + +- **`buffer()` / `buffer_sized()` now record the `BufferCfg` (via `buffer_with_cfg`).** `buffer_info()` therefore reports the real buffer type and capacity in the dependency graph on `no_std` too (previously `"unknown"`), matching std behaviour. Mirrors the `aimdb-core` `impl_record_registrar_ext!` change (M15). + ### Changed (breaking) - **Generated extension trait emits `Producer` / `Consumer`** (no `, EmbassyAdapter`) via the updated `impl_record_registrar_ext!` macro from `aimdb-core` (Design 029, M14). Embassy demo signatures collapse from `Producer` to `Producer`. diff --git a/aimdb-embassy-adapter/Cargo.toml b/aimdb-embassy-adapter/Cargo.toml index eb58616..4d72351 100644 --- a/aimdb-embassy-adapter/Cargo.toml +++ b/aimdb-embassy-adapter/Cargo.toml @@ -78,6 +78,12 @@ futures = "0.3" # Provides critical-section impl for host-side tests (embassy_sync channels need it) critical-section = { version = "1.1", features = ["std"] } +# Lets host tests register a trivial time driver (defines `_embassy_time_now`), +# pulled in by embassy-time's `defmt-timestamp-uptime`. No tick feature here so +# it unifies with the workspace's `tick-hz-32_768` (mock-driver/std would force +# a conflicting tick rate). +embassy-time-driver = { path = "../_external/embassy/embassy-time-driver" } + # For tracing tests tracing-test = "0.2" diff --git a/aimdb-embassy-adapter/src/buffer.rs b/aimdb-embassy-adapter/src/buffer.rs index e33c337..cf248e9 100644 --- a/aimdb-embassy-adapter/src/buffer.rs +++ b/aimdb-embassy-adapter/src/buffer.rs @@ -69,15 +69,15 @@ use aimdb_core::buffer::{BufferCounters, BufferMetrics, BufferMetricsSnapshot}; /// # Example /// ```no_run /// use aimdb_embassy_adapter::EmbassyBuffer; -/// use aimdb_core::buffer::{BufferBackend, BufferCfg}; +/// use aimdb_core::buffer::{Buffer, BufferReader}; /// /// // Create an SPMC ring buffer with capacity 32, 4 subscribers, 2 publishers /// type MyBuffer = EmbassyBuffer; -/// static BUFFER: MyBuffer = MyBuffer::new_spmc(); /// /// # async fn example() { -/// let mut reader = BUFFER.subscribe(); -/// BUFFER.push(42); +/// let buffer: MyBuffer = MyBuffer::new_spmc(); +/// let mut reader = buffer.subscribe(); +/// buffer.push(42); /// let value = reader.recv().await.unwrap(); /// # } /// ``` @@ -240,6 +240,20 @@ impl< self } + fn peek(&self) -> Option { + match &*self.inner { + // Watch stores the latest value natively; try_get() clones it + // without consuming a receiver slot or advancing any cursor. + EmbassyBufferInner::Watch(watch) => watch.try_get(), + // Channel<_, T, 1>::try_peek() clones the pending slot without + // removing it (the slot is drained once a consumer receives). + // Mirrors the Tokio Mailbox arm. + EmbassyBufferInner::Mailbox(channel) => channel.try_peek().ok(), + // PubSub has no canonical latest — see design 031 §SPMC Ring. + EmbassyBufferInner::SpmcRing(_) => None, + } + } + #[cfg(feature = "metrics")] fn metrics_snapshot(&self) -> Option { Some(::metrics(self)) @@ -546,6 +560,40 @@ impl< mod tests { use super::*; + // ── Host-test scaffolding ──────────────────────────────────────────── + // The crate links `defmt` (workspace dep) and embassy-time's + // `defmt-timestamp-uptime`, but on the host neither a defmt logger nor a + // time driver exists. Provide no-op stubs so the test binary links. Run via + // the `test` Make target, or directly: + // cargo test -p aimdb-embassy-adapter \ + // --no-default-features --features "alloc,embassy-sync,embassy-time" + // (`embassy-runtime` would pull the cortex-m executor, which can't host-build.) + #[defmt::global_logger] + struct TestLogger; + + unsafe impl defmt::Logger for TestLogger { + fn acquire() {} + unsafe fn flush() {} + unsafe fn release() {} + unsafe fn write(_bytes: &[u8]) {} + } + + #[defmt::panic_handler] + fn defmt_panic() -> ! { + core::panic!("defmt panic in host test") + } + + // Trivial time driver so `_embassy_time_now` resolves on the host. peek() + // never reads the clock; the driver only needs to exist for linking. + struct TestTimeDriver; + impl embassy_time_driver::Driver for TestTimeDriver { + fn now(&self) -> u64 { + 0 + } + fn schedule_wake(&self, _at: u64, _waker: &core::task::Waker) {} + } + embassy_time_driver::time_driver_impl!(static TEST_TIME_DRIVER: TestTimeDriver = TestTimeDriver); + // Note: Embassy tests typically run on actual embedded targets or with embassy-executor // For now, these are basic compilation tests. Integration tests would need embassy-executor. @@ -571,4 +619,75 @@ mod tests { let cfg3 = BufferCfg::Mailbox; let _buf3: TestBuffer = Buffer::new(&cfg3); } + + // ======================================================================== + // peek() Tests — non-destructive buffer-native reads (design 031) + // + // push()/peek() are synchronous and lock a CriticalSectionRawMutex; the + // `critical-section` std impl in dev-dependencies provides the host-side + // implementation, so these run without an embassy executor. Run with: + // cargo test -p aimdb-embassy-adapter \ + // --no-default-features --features "alloc,embassy-sync,embassy-time" + // ======================================================================== + + use aimdb_core::buffer::DynBuffer; + + type PeekBuffer = EmbassyBuffer; + + #[test] + fn test_peek_single_latest_empty() { + let buffer: PeekBuffer = PeekBuffer::new_watch(); + assert_eq!(DynBuffer::peek(&buffer), None); + } + + #[test] + fn test_peek_single_latest_returns_latest() { + let buffer: PeekBuffer = PeekBuffer::new_watch(); + DynBuffer::push(&buffer, 1); + DynBuffer::push(&buffer, 2); + DynBuffer::push(&buffer, 3); + assert_eq!(DynBuffer::peek(&buffer), Some(3)); + } + + #[test] + fn test_peek_single_latest_is_non_destructive() { + let buffer: PeekBuffer = PeekBuffer::new_watch(); + DynBuffer::push(&buffer, 42); + // Multiple peeks return the same value. + assert_eq!(DynBuffer::peek(&buffer), Some(42)); + assert_eq!(DynBuffer::peek(&buffer), Some(42)); + } + + #[test] + fn test_peek_mailbox_empty() { + let buffer: PeekBuffer = PeekBuffer::new_mailbox(); + assert_eq!(DynBuffer::peek(&buffer), None); + } + + #[test] + fn test_peek_mailbox_returns_pending() { + let buffer: PeekBuffer = PeekBuffer::new_mailbox(); + DynBuffer::push(&buffer, 7); + assert_eq!(DynBuffer::peek(&buffer), Some(7)); + // Peek is non-destructive: the slot is still occupied. + assert_eq!(DynBuffer::peek(&buffer), Some(7)); + } + + #[test] + fn test_peek_mailbox_reflects_overwrite() { + let buffer: PeekBuffer = PeekBuffer::new_mailbox(); + DynBuffer::push(&buffer, 1); + DynBuffer::push(&buffer, 2); + assert_eq!(DynBuffer::peek(&buffer), Some(2)); + } + + #[test] + fn test_peek_spmc_ring_returns_none() { + // PubSub has no canonical latest — see design 031 §SPMC Ring. + let buffer: PeekBuffer = PeekBuffer::new_spmc(); + assert_eq!(DynBuffer::peek(&buffer), None); + DynBuffer::push(&buffer, 1); + DynBuffer::push(&buffer, 2); + assert_eq!(DynBuffer::peek(&buffer), None); + } } diff --git a/aimdb-embassy-adapter/src/join_queue.rs b/aimdb-embassy-adapter/src/join_queue.rs index 8dab5c5..75d1788 100644 --- a/aimdb-embassy-adapter/src/join_queue.rs +++ b/aimdb-embassy-adapter/src/join_queue.rs @@ -102,23 +102,18 @@ impl JoinFanInRuntime for EmbassyAdapter { // These tests cover: roundtrip ordering, bounded backpressure, and sender cloning. // Embassy channels do not close — there are no QueueClosed scenarios to test. // -// NOTE: the tests themselves only depend on `embassy_sync::Channel` and -// `futures::executor::block_on`, both of which are host-portable. The -// `critical-section` dev-dep with `std` feature is provided so the -// `CriticalSectionRawMutex` link target is satisfied on host. -// -// However, the tests live in a module gated on `feature = "embassy-runtime"`, -// which transitively pulls in `embassy-executor`'s `platform-cortex-m` (ARM -// assembly) and so does not compile under `cargo test` on x86_64. As a result -// they are NOT exercised by `make check` / `make all` today — only by -// `cargo check --target thumbv7em-none-eabihf --features embassy-runtime`, -// which type-checks but does not execute them. Run them manually on an -// Embassy-capable board or ARM simulator, or via a host-side harness that -// builds the queue module without the executor. +// They run on the host: the queue types depend only on `embassy_sync::Channel` +// and `futures::executor::block_on` (the runtime-specific `JoinFanInRuntime` +// impl above carries its own `embassy-runtime` gate), so this module is gated on +// `embassy-sync` rather than `embassy-runtime` and never pulls embassy-executor's +// cortex-m assembly into the host test build. `make test` exercises them via: +// cargo test -p aimdb-embassy-adapter \ +// --no-default-features --features "alloc,embassy-sync,embassy-time" +// (the `critical-section/std` dev-dep satisfies the `CriticalSectionRawMutex` +// link target; defmt/time-driver stubs live in `buffer.rs`'s test module). #[cfg(test)] mod tests { use super::*; - use aimdb_executor::{JoinQueue as _, JoinReceiver as _, JoinSender as _}; use futures::executor::block_on; fn make_channel() -> &'static EmbassyChan { diff --git a/aimdb-embassy-adapter/src/lib.rs b/aimdb-embassy-adapter/src/lib.rs index 6aeae77..0bdd77e 100644 --- a/aimdb-embassy-adapter/src/lib.rs +++ b/aimdb-embassy-adapter/src/lib.rs @@ -66,7 +66,7 @@ extern crate alloc; #[cfg(all(not(feature = "std"), feature = "embassy-sync"))] pub mod buffer; -#[cfg(all(not(feature = "std"), feature = "embassy-runtime"))] +#[cfg(all(not(feature = "std"), feature = "embassy-sync"))] pub mod join_queue; #[cfg(not(feature = "std"))] @@ -333,7 +333,9 @@ where let buffer = Box::new(EmbassyBuffer::::new( &cfg, )); - self.buffer_raw(buffer) + // Record the cfg so buffer_info() reports the real buffer type/capacity + // for the dependency graph. + self.buffer_with_cfg(buffer, cfg) } fn source_with_context( diff --git a/aimdb-tokio-adapter/CHANGELOG.md b/aimdb-tokio-adapter/CHANGELOG.md index e7ead46..e1fa651 100644 --- a/aimdb-tokio-adapter/CHANGELOG.md +++ b/aimdb-tokio-adapter/CHANGELOG.md @@ -7,6 +7,15 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- **`TokioBuffer::peek()` (M15, Design 031).** Non-destructive buffer-native read backing AimX `record.get` / `TypedRecord::latest()`: `SingleLatest` (`Watch`) reads via `watch::Sender::borrow()`, `Mailbox` (`Notify`) clones the slot mutex, `SpmcRing` (`Broadcast`) returns `None` (no canonical latest). Unit tests cover all three buffer types (empty, populated, non-destructive, overwrite, drained). +- **`tests/remote_access_validation.rs` integration test.** Asserts that a `.with_remote_access()` record with no buffer fails `build()`, and that the same record with a `SingleLatest` buffer builds — locking in the new build-time guard from `aimdb-core` (M15). + +### Fixed + +- **`SingleLatest` no longer drops a value produced before any subscriber attached (M15).** The `Watch` push path now uses `watch::Sender::send_replace` instead of `send`. `send` returns `Err` and discards the value when there are zero receivers; `send_replace` always updates the slot, so the value is visible to `peek()` and to later subscribers reading the slot. + ### Changed (breaking) - **Generated extension trait emits `Producer` / `Consumer`** (no `, TokioAdapter`) via the updated `impl_record_registrar_ext!` macro from `aimdb-core` (Design 029, M14). User-side `.source(|ctx, producer| ...)` / `.tap(|ctx, consumer| ...)` callbacks now receive the simpler types. @@ -19,6 +28,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Notes - `BufferOps::spawn_dispatcher` (a test-only utility) is unchanged — it calls `tokio::spawn` directly and does not depend on the deleted `Spawn` trait. +- `tests/stage_profiling.rs` dropped the `avg == total / call_count` assertion: it is a tautology (`avg_time_ns()` is *defined* as that quotient) and racy while the source task is still producing. No coverage lost. ## [0.6.0] - 2026-05-22 diff --git a/aimdb-tokio-adapter/src/buffer.rs b/aimdb-tokio-adapter/src/buffer.rs index afc9c22..67b24c4 100644 --- a/aimdb-tokio-adapter/src/buffer.rs +++ b/aimdb-tokio-adapter/src/buffer.rs @@ -81,7 +81,11 @@ impl Buffer for TokioBuffer { let _ = tx.send(value); } TokioBufferInner::Watch { tx } => { - let _ = tx.send(Some(value)); + // send_replace updates the slot unconditionally; send() would + // fail (and silently drop the value) when no receivers exist, + // which would also break peek() for producers that publish + // before any subscriber attaches. + tx.send_replace(Some(value)); } TokioBufferInner::Notify { slot, notify } => { *slot.lock().unwrap() = Some(value); @@ -129,6 +133,17 @@ impl aimdb_core::buffer::DynBuffer for Toki self } + fn peek(&self) -> Option { + match &*self.inner { + // watch::Sender::borrow() reads the slot non-destructively. + TokioBufferInner::Watch { tx } => tx.borrow().clone(), + // Same Mutex the Mailbox buffer already uses for the slot. + TokioBufferInner::Notify { slot, .. } => slot.lock().unwrap().clone(), + // broadcast has no canonical latest — see design 031 §SPMC Ring. + TokioBufferInner::Broadcast { .. } => None, + } + } + #[cfg(feature = "metrics")] fn metrics_snapshot(&self) -> Option { Some(::metrics(self)) @@ -1091,6 +1106,102 @@ mod tests { assert_eq!(remaining, vec![20, 30]); } + // ======================================================================== + // peek() Tests — non-destructive buffer-native reads (design 031) + // ======================================================================== + + mod peek_tests { + use super::super::*; + use aimdb_core::buffer::DynBuffer; + + #[tokio::test] + async fn test_peek_single_latest_empty() { + let buffer = TokioBuffer::::new(&BufferCfg::SingleLatest); + assert_eq!(buffer.peek(), None); + } + + #[tokio::test] + async fn test_peek_single_latest_returns_latest() { + let buffer = TokioBuffer::::new(&BufferCfg::SingleLatest); + DynBuffer::push(&buffer, 1); + DynBuffer::push(&buffer, 2); + DynBuffer::push(&buffer, 3); + assert_eq!(buffer.peek(), Some(3)); + } + + #[tokio::test] + async fn test_peek_single_latest_is_non_destructive() { + let buffer = TokioBuffer::::new(&BufferCfg::SingleLatest); + // Subscribe BEFORE push so the receiver's version counter advances + // on send_replace. (Watch receivers created after a push will only + // wake on the *next* push — that's the gap peek() exists to fill.) + let mut reader = Buffer::subscribe(&buffer); + DynBuffer::push(&buffer, 42); + + // Multiple peeks return the same value. + assert_eq!(buffer.peek(), Some(42)); + assert_eq!(buffer.peek(), Some(42)); + + // Peek did not consume the value from the subscriber's perspective. + assert_eq!(reader.recv().await.unwrap(), 42); + + // And peek still works after the subscriber received. + assert_eq!(buffer.peek(), Some(42)); + } + + #[tokio::test] + async fn test_peek_single_latest_works_without_subscriber() { + // The exact case the design 031 snapshot was originally added for: + // a producer pushes before anyone subscribes. peek() must see it. + let buffer = TokioBuffer::::new(&BufferCfg::SingleLatest); + DynBuffer::push(&buffer, 17); + assert_eq!(buffer.peek(), Some(17)); + } + + #[tokio::test] + async fn test_peek_mailbox_empty() { + let buffer = TokioBuffer::::new(&BufferCfg::Mailbox); + assert_eq!(buffer.peek(), None); + } + + #[tokio::test] + async fn test_peek_mailbox_returns_pending() { + let buffer = TokioBuffer::::new(&BufferCfg::Mailbox); + DynBuffer::push(&buffer, 7); + assert_eq!(buffer.peek(), Some(7)); + } + + #[tokio::test] + async fn test_peek_mailbox_drained_after_recv() { + let buffer = TokioBuffer::::new(&BufferCfg::Mailbox); + DynBuffer::push(&buffer, 99); + assert_eq!(buffer.peek(), Some(99)); + // Subscriber takes the slot. + let mut reader = Buffer::subscribe(&buffer); + assert_eq!(reader.recv().await.unwrap(), 99); + // After take(), peek sees the slot is empty. + assert_eq!(buffer.peek(), None); + } + + #[tokio::test] + async fn test_peek_mailbox_reflects_overwrite() { + let buffer = TokioBuffer::::new(&BufferCfg::Mailbox); + DynBuffer::push(&buffer, 1); + DynBuffer::push(&buffer, 2); + assert_eq!(buffer.peek(), Some(2)); + } + + #[tokio::test] + async fn test_peek_spmc_ring_returns_none() { + // Broadcast has no canonical latest — see design 031 §SPMC Ring. + let buffer = TokioBuffer::::new(&BufferCfg::SpmcRing { capacity: 8 }); + assert_eq!(buffer.peek(), None); + DynBuffer::push(&buffer, 1); + DynBuffer::push(&buffer, 2); + assert_eq!(buffer.peek(), None); + } + } + // ======================================================================== // Metrics Tests (feature-gated) // ======================================================================== diff --git a/aimdb-tokio-adapter/tests/remote_access_validation.rs b/aimdb-tokio-adapter/tests/remote_access_validation.rs new file mode 100644 index 0000000..29f7382 --- /dev/null +++ b/aimdb-tokio-adapter/tests/remote_access_validation.rs @@ -0,0 +1,47 @@ +//! Build-time validation for `.with_remote_access()` records (design 031). +//! +//! Removing `latest_snapshot` means a remote-access record reads and writes +//! straight through its buffer. With no buffer there is no storage to serve: +//! `record.get`/`latest()` return not_found and `record.set` silently discards. +//! `build()` must reject this loudly instead of letting it surface as a silent +//! runtime no-op. + +use aimdb_core::buffer::BufferCfg; +use aimdb_core::AimDbBuilder; +use aimdb_tokio_adapter::{TokioAdapter, TokioRecordRegistrarExt}; +use serde::{Deserialize, Serialize}; +use std::sync::Arc; + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +struct Config { + threshold: u32, +} + +#[tokio::test] +async fn bufferless_remote_access_record_fails_build() { + let mut builder = AimDbBuilder::new().runtime(Arc::new(TokioAdapter)); + + builder.configure::("test::Config", |reg| { + // .with_remote_access() but no .buffer(...) — invalid since design 031. + reg.with_remote_access(); + }); + + assert!( + builder.build().await.is_err(), + "build() must reject a remote-access record with no buffer" + ); +} + +#[tokio::test] +async fn remote_access_record_with_buffer_builds() { + let mut builder = AimDbBuilder::new().runtime(Arc::new(TokioAdapter)); + + builder.configure::("test::Config", |reg| { + reg.buffer(BufferCfg::SingleLatest).with_remote_access(); + }); + + assert!( + builder.build().await.is_ok(), + "remote-access record with a buffer must build" + ); +} diff --git a/aimdb-tokio-adapter/tests/stage_profiling.rs b/aimdb-tokio-adapter/tests/stage_profiling.rs index 123bdfb..fa33fd0 100644 --- a/aimdb-tokio-adapter/tests/stage_profiling.rs +++ b/aimdb-tokio-adapter/tests/stage_profiling.rs @@ -72,7 +72,6 @@ async fn source_and_tap_stages_are_timed_and_named() { ); assert!(s.min_time_ns() <= s.avg_time_ns()); assert!(s.avg_time_ns() <= s.max_time_ns()); - assert_eq!(s.avg_time_ns(), s.total_time_ns() / s.call_count()); // Tap stage. let tap = prof.tap(0).expect("tap stage registered"); diff --git a/aimdb-wasm-adapter/CHANGELOG.md b/aimdb-wasm-adapter/CHANGELOG.md index 233e4a0..b4a4caf 100644 --- a/aimdb-wasm-adapter/CHANGELOG.md +++ b/aimdb-wasm-adapter/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Changed + +- **`record.set` write path routes through `Producer` (M15, Design 031).** `bindings.rs` (`set`) and `ws_bridge.rs` now call `db.producer::(key)?.produce(val)` instead of the removed `TypedRecord::produce`. Internal only — no `#[wasm_bindgen]` / JS API change. + ### Removed (breaking) - **`impl Spawn for WasmAdapter` deleted (Issue #88).** Also removed the `unsafe impl Send/Sync for WasmAdapter` — the adapter is a ZST and auto-derives both. diff --git a/aimdb-wasm-adapter/src/bindings.rs b/aimdb-wasm-adapter/src/bindings.rs index 80cbbca..9097c61 100644 --- a/aimdb-wasm-adapter/src/bindings.rs +++ b/aimdb-wasm-adapter/src/bindings.rs @@ -545,12 +545,10 @@ where let val: T = serde_wasm_bindgen::from_value(value) .map_err(|e| JsError::new(&format!("Contract violation: {e}")))?; - let inner = db.inner(); - let typed = inner - .get_typed_record_by_key::(key) - .map_err(|e| JsError::new(&format!("{e:?}")))?; - - typed.produce(val); + // Single write path via Producer (design 031). + db.producer::(key) + .map_err(|e| JsError::new(&format!("{e:?}")))? + .produce(val); Ok(()) } diff --git a/aimdb-wasm-adapter/src/ws_bridge.rs b/aimdb-wasm-adapter/src/ws_bridge.rs index 17515bf..9863525 100644 --- a/aimdb-wasm-adapter/src/ws_bridge.rs +++ b/aimdb-wasm-adapter/src/ws_bridge.rs @@ -754,15 +754,15 @@ where { match serde_json::from_value::(json) { Ok(val) => { - let inner = db.inner(); - match inner.get_typed_record_by_key::(key) { - Ok(typed) => { - typed.produce(val); + // Single write path via Producer (design 031). + match db.producer::(key) { + Ok(producer) => { + producer.produce(val); } Err(e) => { web_sys::console::warn_1( &format!( - "[WsBridge] get_typed_record_by_key failed for key='{}': {:?}", + "[WsBridge] producer lookup failed for key='{}': {:?}", key, e ) .into(), diff --git a/docs/design/031-M15-remove-latest-snapshot.md b/docs/design/031-M15-remove-latest-snapshot.md new file mode 100644 index 0000000..b34307a --- /dev/null +++ b/docs/design/031-M15-remove-latest-snapshot.md @@ -0,0 +1,648 @@ +# Remove `latest_snapshot` — Buffer-Native Reads and Real Reader Slots + +**Version:** 0.2 (proposal) +**Status:** 🔵 Proposed +**Depends on:** [M14 — Remove `R` from typed handles](029-M14-remove-r-from-typed-handles.md) +**Last Updated:** May 27, 2026 +**Milestone:** M15 — Snapshot elimination + +--- + +## Table of Contents + +- [Summary](#summary) +- [Motivation](#motivation) + - [The snapshot is redundant for every buffer type](#the-snapshot-is-redundant-for-every-buffer-type) + - [The snapshot is updated twice via divergent paths](#the-snapshot-is-updated-twice-via-divergent-paths) + - [std/no\_std cfg duplication](#stdno_std-cfg-duplication) + - [The snapshot limits remote access to latest-only semantics](#the-snapshot-limits-remote-access-to-latest-only-semantics) +- [Current Architecture](#current-architecture) + - [How `latest_snapshot` is used today](#how-latest_snapshot-is-used-today) + - [Why it was added](#why-it-was-added) + - [The two divergent write paths](#the-two-divergent-write-paths) +- [Proposed Design](#proposed-design) + - [1. `DynBuffer::peek()` — buffer-native point-in-time read](#1-dynbufferpeek--buffer-native-point-in-time-read) + - [2. AimX `record.get` uses `peek()` directly](#2-aimx-recordget-uses-peek-directly) + - [3. AimX `record.drain` and `record.subscribe` use real `BufferReader`](#3-aimx-recorddrain-and-recordsubscribe-use-real-bufferreader) + - [4. Remove `latest_snapshot` from `TypedRecord`](#4-remove-latest_snapshot-from-typedrecord) + - [5. Remove the `latest_snapshot` Arc from `RecordWriter`](#5-remove-the-latest_snapshot-arc-from-recordwriter) + - [6. Merge the two divergent update paths](#6-merge-the-two-divergent-update-paths) +- [Per-Buffer-Type Analysis](#per-buffer-type-analysis) + - [SingleLatest (watch)](#singlelatestwwatch) + - [Mailbox (notify + mutex slot)](#mailbox-notify--mutex-slot) + - [SPMC Ring (broadcast)](#spmc-ring-broadcast) + - [Bufferless records](#bufferless-records) +- [no\_std Impact](#nostd-impact) +- [Implementation Plan](#implementation-plan) +- [Breaking Changes](#breaking-changes) +- [Out of Scope](#out-of-scope) + +--- + +## Summary + +`TypedRecord` carries a `latest_snapshot: Arc>>` that +stores a redundant copy of every produced value. For `SingleLatest` records +the value already lives in the `watch::Sender`'s slot; for `Mailbox` records +it already lives in the `Mutex>` slot inside the buffer. The +snapshot is a second copy that adds a lock acquire on every `produce()` and +serves only the AimX `record.get` path. + +This design replaces it with two things: + +1. **`DynBuffer::peek()`** — a new optional method on the buffer trait that + reads the buffer's own native storage non-destructively. For `SingleLatest` + this is `watch_tx.borrow().clone()`; for `Mailbox` it is `slot.lock().clone()`; + for SPMC Ring it returns `None` (no canonical latest in a history buffer). + +2. **Real `BufferReader` slots for AimX** — `record.drain` and + `record.subscribe` already use a per-connection `Box>`. + `record.get` is changed to use `peek()` on the buffer instead of reading + the snapshot. The reader IS the buffer reader — not a shadow copy. + +The `latest_snapshot` field is removed entirely. For no\_std the benefit is +larger: the `spin::Mutex>` field and all its lock sites on the hot +`produce()` path disappear completely, since AimX does not exist in +embedded targets. + +--- + +## Motivation + +### The snapshot is redundant for every buffer type + +`TokioBufferInner` and `EmbassyBufferInner` already store the value natively: + +| Buffer type | Native storage | Snapshot storage | +|---|---|---| +| `SingleLatest` (Tokio) | `watch::Sender>` slot | `Arc>>` | +| `SingleLatest` (Embassy) | `Watch` slot | `Arc>>` | +| `Mailbox` (Tokio) | `Arc>>` + Notify | `Arc>>` | +| `Mailbox` (Embassy) | `Channel<_, T, 1>` | `Arc>>` | +| `SPMC Ring` | ordered ring (no canonical latest) | `Arc>>` | + +For `SingleLatest` and `Mailbox`, the value is stored twice. Every `produce()` +pays for two writes. For `SingleLatest` this is the most visible win: +`watch::Sender::send()` is lock-free, so removing the snapshot mutex makes +the produce path truly lock-free — the only mutex on `SingleLatest` produce +today is the snapshot one. For `Mailbox` it's a smaller change: two +uncontended `StdMutex` acquisitions collapse to one. For SPMC Ring the +snapshot is a third store alongside the ring slot and the broadcast queue. + +### The snapshot is updated twice via divergent paths + +Two independent code paths update the snapshot, with duplicated `#[cfg]` guards: + +```rust +// Path 1 — TypedRecord::produce() (called by WASM adapter, builder init) +#[cfg(feature = "std")] +*self.latest_snapshot.lock().unwrap() = Some(val.clone()); +#[cfg(not(feature = "std"))] +*self.latest_snapshot.lock() = Some(val.clone()); + +// Path 2 — RecordWriter::push() (called by Producer via WriteHandle) +#[cfg(feature = "std")] +*self.latest_snapshot.lock().unwrap() = Some(value.clone()); +#[cfg(not(feature = "std"))] +*self.latest_snapshot.lock() = Some(value.clone()); +``` + +Both also duplicate the buffer push. There is no invariant preventing the two +paths from diverging (e.g., one being updated when the other is not). + +### std/no\_std cfg duplication + +The snapshot field and all its call sites repeat the +`#[cfg(feature = "std")] / #[cfg(not(feature = "std"))]` pair at approximately +ten locations in `typed_record.rs` and `buffer/writer.rs` alone (~60–80 +cfg-conditional lines for a single conceptual thing). Removing the field +collapses all of them. + +### The snapshot limits remote access to latest-only semantics + +`record.get` today reads the single snapshot value — it can only ever return +the last value produced. For `SPMC Ring` records this is architecturally wrong: +the ring's purpose is to preserve a bounded history of N values for independent +consumers. A caller using AimX `record.get` cannot observe the buffer's window; +they always see only the most recent item. A real `BufferReader` for the ring +gives access to the full current window. + +--- + +## Current Architecture + +### How `latest_snapshot` is used today + +``` +produce(T) + ├─ TypedRecord::produce() (WASM adapter, builder) + │ ├─ snapshot.lock() = Some(T) ← snapshot write + │ └─ buffer.push(T) + └─ RecordWriter::push() (Producer via WriteHandle) + ├─ snapshot.lock() = Some(T) ← snapshot write (duplicate) + └─ buffer.push(T) + +record.get (AimX) + └─ db.try_latest_as_json() + └─ AnyRecord::latest_json() + └─ snapshot.lock().clone() ← snapshot read + +record.drain (AimX) + └─ conn_state.drain_readers[name].try_recv_json() ← real BufferReader ✓ + +record.subscribe (AimX) + └─ AnyRecord::subscribe_json() + └─ buffer.subscribe_boxed() ← real BufferReader ✓ +``` + +`record.drain` and `record.subscribe` already use real `BufferReader`s. +Only `record.get` reads the snapshot. + +### Why it was added + +The snapshot was introduced for `record.get` to provide a point-in-time read +without subscribing a persistent reader. Subscribing a `watch::Receiver` at +request time gives `has_changed() = false` (the receiver starts at the current +version), so `try_recv()` returns `BufferEmpty` even when a value exists. +`borrow()` on the watch sender bypasses this — which is exactly what `peek()` +exposes. + +### The two divergent update paths + +`TypedRecord::produce()` exists alongside `RecordWriter::push()`. Both update +the snapshot and the buffer. Call sites of `TypedRecord::produce()`: + +- `aimdb-wasm-adapter/src/bindings.rs:553` — WASM inbound JS→record handler +- `aimdb-wasm-adapter/src/ws_bridge.rs:760` — WebSocket bridge `produce_from_json` +- `aimdb-core/src/builder.rs:1106` — body of the public `AimDb::produce(key, value)` + method (per-call key lookup convenience wrapper) +- `aimdb-core/src/typed_record.rs:1716` — inside `AnyRecord::set_from_json()`, + the AimX `record.set` path +- (transitive) `aimdb-core/src/database.rs:84` and `aimdb-sync/src/handle.rs:407` + call `AimDb::produce` and are covered once the wrapper is migrated + +All of these bypass `RecordWriter` and therefore bypass `RecordMetadataTracker` +(in std), meaning metadata (last-updated timestamp) is not updated on these +paths. After unification, every produce path will mark metadata — see +[Breaking Changes](#breaking-changes). + +--- + +## Proposed Design + +### 1. `DynBuffer::peek()` — buffer-native point-in-time read + +Add an optional method to the `DynBuffer` trait in `aimdb-core`: + +```rust +pub trait DynBuffer: Send + Sync { + fn push(&self, value: T); + fn subscribe_boxed(&self) -> Box + Send>; + fn as_any(&self) -> &dyn core::any::Any; + + /// Non-destructive read of the buffer's current value. + /// + /// Returns `Some(T)` if the buffer holds a current value that can be read + /// without consuming it from any consumer's perspective. Returns `None` + /// if the buffer type has no such concept (e.g., SPMC Ring) or if no + /// value has been produced yet. + /// + /// This method does NOT advance any reader position. It is equivalent to + /// a non-blocking borrow of the buffer's internal state. + fn peek(&self) -> Option { + None + } +} +``` + +**Tokio adapter implementations:** + +```rust +// TokioBuffer::peek() +match &*self.inner { + TokioBufferInner::Watch { tx } => { + tx.borrow().clone() // reads from sender's slot, no lock, no copy unless caller clones + } + TokioBufferInner::Notify { slot, .. } => { + slot.lock().unwrap().clone() // same Mutex already held by the Mailbox buffer + } + TokioBufferInner::Broadcast { .. } => None, +} +``` + +**Embassy adapter implementations** (when embedded introspection is needed): + +```rust +// EmbassyBuffer::peek() +match &*self.inner { + EmbassyBufferInner::Watch(watch) => { + // Watch::try_get(&self) reads the current value without claiming one + // of the bounded WATCH_N receiver slots. Do NOT use watch.receiver(), + // which allocates a receiver and can fail when N is exhausted. + watch.try_get() + } + EmbassyBufferInner::Mailbox(channel) => { + // Channel<_, T, 1> has no non-destructive peek; None is acceptable + None + } + EmbassyBufferInner::SpmcRing(_) => None, +} +``` + +The Embassy implementation of `peek()` is optional — the default `None` is +correct for no\_std targets where AimX does not exist. + +### 2. AimX `record.get` uses `peek()` directly + +Replace `db.try_latest_as_json()` → `snapshot.lock().clone()` with: + +```rust +// handle_record_get: +let value = record.buffer().peek(); // Option directly from buffer native storage +match value { + Some(v) => serialize_to_json(&v), + None => Response::error(request_id, "not_found", "No value available"), +} +``` + +`AnyRecord::latest_json()` is rewritten to call `buf.peek()` and serialise +directly, bypassing `RecordValue` for this path. `latest()` and +`RecordValue` remain in place for the typed in-process API and are dealt +with separately — see [Out of Scope](#out-of-scope). After Step 3 the read +path is `record.get` → `latest_json` → `buffer.peek()` → buffer-native storage. + +**SPMC Ring (Broadcast) records — explicit semantic choice.** The Tokio +broadcast channel does not expose a non-destructive peek. There are two +options, and this design picks (B): + +(A) **Maintain a "latest" alongside the ring.** Restores today's snapshot +behaviour for the ring but partially defeats the proposal — it reintroduces +a second store on the produce hot path, only for one buffer type. + +(B) **`peek()` returns `None`, `record.get` returns `not_found`.** Clients +that need "give me the most recent value" on a ring use `record.drain` (which +consumes from their own per-connection cursor). Clients that need +point-in-time reads use `SingleLatest`, which is the buffer designed for it. +This is a behaviour change for ring-buffered records that previously called +`record.get`; callers must move to `record.drain` or reconfigure the record. + +The earlier draft proposed a fallback "drain the per-connection reader from +inside `record.get`" — that is rejected because it consumes from the cursor +shared with `record.drain`, so interleaving `get` and `drain` on the same +connection would silently drop values from the drain stream. + +### 3. AimX `record.drain` and `record.subscribe` use real `BufferReader` + +This is already implemented. The per-connection `drain_readers` map in +`ConnectionState` holds a `Box` per record, which is a +`BufferReader` wrapped with a JSON serialization adapter. No changes needed +here. + +The AimX reader IS a buffer reader — not a shadow copy. It has its own +independent position in the SPMC Ring, watches its own version counter for +`SingleLatest`, and drains the `Mailbox` slot destructively. This is identical +to how any `Consumer` operates. + +### 4. Remove `latest_snapshot` from `TypedRecord` + +```rust +// Before +pub struct TypedRecord { + // ... + #[cfg(feature = "std")] + latest_snapshot: Arc>>, + #[cfg(not(feature = "std"))] + latest_snapshot: Arc>>, + // ... +} + +// After +pub struct TypedRecord { + // latest_snapshot removed entirely + // ... +} +``` + +The `latest()` method on `TypedRecord` / `AnyRecord` is replaced: + +```rust +// Before — reads snapshot +fn latest_json(&self) -> Option { + let value = self.latest_snapshot.lock().unwrap().clone()?; + // serialize... +} + +// After — delegates to buffer peek +fn latest_json(&self) -> Option { + let buf = self.buffer.as_ref()?; + let value = buf.peek()?; + // serialize... +} +``` + +### 5. Remove the `latest_snapshot` Arc from `RecordWriter` + +`RecordWriter` in `aimdb-core/src/buffer/writer.rs` carries +`latest_snapshot: Arc>>`. Its `push()` method updates it. +Both are removed: + +```rust +// Before +pub(crate) struct RecordWriter { + buffer: Arc>, + #[cfg(feature = "std")] + latest_snapshot: Arc>>, + #[cfg(not(feature = "std"))] + latest_snapshot: Arc>>, + #[cfg(feature = "std")] + metadata: RecordMetadataTracker, +} + +// After +pub(crate) struct RecordWriter { + buffer: Arc>, + #[cfg(feature = "std")] + metadata: RecordMetadataTracker, +} +``` + +`RecordWriter::push()` becomes: + +```rust +fn push(&self, value: T) { + self.buffer.push(value); + #[cfg(feature = "std")] + self.metadata.mark_updated(); +} +``` + +The two `#[cfg]` constructor variants collapse into one. The `writer_handle()` +method in `TypedRecord` no longer passes the snapshot Arc. + +### 6. Merge the two divergent update paths + +`TypedRecord::produce()` is deleted. All call sites migrate to going through +`RecordWriter::push()` (directly or via `Producer`): + +- `aimdb-core/src/builder.rs:1106` — body of the public + `AimDb::produce(key, value)`. Rewrite as + `self.producer::(key)?.produce(value)`. This is a per-call helper that + performs a key lookup and constructs a fresh `Producer` each call; the + docstring already steers hot-path users to `db.producer::(key)` once + and reuse. `Database::produce()` (`database.rs:84`) and + `aimdb-sync/src/handle.rs:407` go through this wrapper and need no + further changes. +- `aimdb-core/src/typed_record.rs:1716` — inside + `AnyRecord::set_from_json()`. Replace `self.produce(value)` with a push + through `self.writer_handle()`: + ```rust + self.writer_handle().push(value); + ``` + This keeps `set_from_json` synchronous and avoids constructing a + `Producer` purely to throw it away. `writer_handle()` is already + `pub(crate)` and exists for this kind of internal use. +- `aimdb-wasm-adapter/src/bindings.rs:553` and `ws_bridge.rs:760` — both + perform a `get_typed_record_by_key::(key)` lookup per call and then + call `typed.produce(val)`. Replace with `typed.writer_handle().push(val)` + for the same reason (no behavioural change vs. a one-shot `Producer`, + but avoids the extra `Arc` clone). A future optimisation can cache a + `Producer` per known record key at adapter build time, but that is + not required for this milestone. + +After these migrations, `TypedRecord::produce()` has zero callers and is +removed. `RecordWriter::push()` is the single update path. + +--- + +## Per-Buffer-Type Analysis + +### SingleLatest (watch) + +`TokioBufferInner::Watch { tx: watch::Sender> }` already holds the +value in its own internal slot. `tx.borrow()` returns a reference into that +slot without copying. `peek()` clones it out for the caller. + +**Result:** zero duplication. The value exists exactly once — in the watch +sender. AimX `record.get` reads it via `peek()`. AimX `record.subscribe` +creates a `watch::Receiver` clone, which tracks its own version counter and +streams future changes via `recv()`. No extra allocation anywhere. + +### Mailbox (notify + mutex slot) + +`TokioBufferInner::Notify { slot: Arc>>, notify }` already +holds the value in `slot`. `peek()` locks and clones it — the same operation +the snapshot previously duplicated with a second mutex. + +**Result:** zero duplication. The value exists exactly once — in the mailbox +slot. Removing the snapshot removes one of two identical mutexes. + +Note: `peek()` on `Mailbox` is non-destructive (it clones without taking), +unlike `try_recv()` which calls `slot.take()`. AimX `record.get` uses `peek()`; +AimX `record.drain` uses `try_recv()` via the drain reader and consumes the +slot, matching drain semantics. + +**Behaviour change vs. today.** Currently the snapshot is independent of the +mailbox slot, so a sequence `record.set` → `record.drain` → `record.get` +returns the value from snapshot on the final call. After this change, the +final `record.get` returns `not_found` because the drain already took the +slot. This matches Mailbox's documented "single-consumer take" semantics and +is the intended result, but it is a visible change for clients that interleave +`get` and `drain` on a Mailbox record. + +### SPMC Ring (broadcast) + +`TokioBufferInner::Broadcast { tx: broadcast::Sender }` is a ring buffer +with no canonical "latest" concept. `peek()` returns `None`. + +**For `record.get`:** returns `not_found`. See the explicit choice and +rationale in [§2](#2-aimx-recordget-uses-peek-directly). + +**Result:** no duplication. The ring holds values in its own allocation; +nothing else stores them. Clients that need point-in-time reads on the +"latest produced" should use `SingleLatest`; clients that want the buffered +history should subscribe via `record.drain`. + +### Bufferless records + +Records with no buffer (passive/settable records that have no consumers and +no `BufferCfg`) have no native storage. + +`peek()` on a `None` buffer returns `None`. `record.get` returns `not_found`. +`record.set` (write path) does not need to maintain any snapshot — it produces +via `RecordWriter` which pushes to the buffer. If a buffer is later added, the +next produced value becomes the first peekable value. + +This is a semantic change: today `record.get` on a settable record returns its +last-set value via the snapshot. After this change it returns `not_found` until +a value has been produced AND a buffer is configured. If this use case needs +preserving, such records should use a `SingleLatest` buffer (one slot, zero +overhead beyond the watch sender). + +--- + +## no\_std Impact + +AimX (Unix socket, JSON protocol) is entirely `#[cfg(feature = "std")]`. No +no\_std code path ever calls `DynBuffer::peek()`, accesses `drain_readers`, or +needs point-in-time snapshot reads. The `spin::Mutex>` field is +used only: + +1. On every `produce()` call — to update the snapshot (hot path, wasted work) +2. By `TypedRecord::latest()` — which is only called by std-gated code + +Removing the field from the no\_std struct eliminates: + +- One `spin::Mutex` acquisition per `produce()` on the hot path +- One `Option` clone per `produce()` on the hot path +- ~10 `#[cfg(not(feature = "std"))]` blocks in `typed_record.rs` and `writer.rs` + +The `spin` crate dependency is **not** removed by this milestone: +`producer_service`, `consumers`, and `transform` on `TypedRecord` still use +`spin::Mutex` in no\_std. Eliminating the dep would require unifying those +fields too — out of scope. + +The Embassy adapter does not need to implement `DynBuffer::peek()`. The +default `fn peek(&self) -> Option { None }` is correct and sufficient. + +If embedded introspection is ever needed (e.g., reading current state without +subscribing a consumer), `embassy_sync::watch::Watch::receiver().try_get()` +provides it with zero additional storage. + +--- + +## Implementation Plan + +The steps below are ordered so the codebase compiles and tests pass at every +stage. + +**Step 1 — Add `DynBuffer::peek()` with default `None`** +Add the method to the trait in `aimdb-core/src/buffer/traits.rs`. No existing +code changes. All adapters inherit the default. Green. + +**Step 2 — Implement `peek()` in `TokioBuffer`** +Add `Watch` and `Notify` arms in `aimdb-tokio-adapter/src/buffer.rs` +(`Broadcast` uses the default `None`). Add unit tests for both. Green. + +**Step 3 — Migrate `record.get` to `peek()`** +Rewrite `AnyRecord::latest_json()` for `TypedRecord` to call +`self.buffer.as_ref()?.peek()` and serialize directly via the existing +`json_serializer` closure, bypassing `RecordValue`. Keep `latest()` and +`RecordValue` untouched — they remain on the typed in-process API and +still read the snapshot for now. `handle_record_get` in +`aimdb-core/src/remote/handler.rs` continues to call +`db.try_latest_as_json()`; the change is internal to `latest_json`. +Update/add integration tests for `record.get` on `SingleLatest`, `Mailbox`, +and SPMC Ring (the last asserting `not_found`). Green. + +**Step 4 — Migrate all `TypedRecord::produce()` callers** +This step must remove **every** caller before the field is deleted in Step 5, +otherwise Step 5 won't compile. + +- `aimdb-core/src/typed_record.rs:1716` (`set_from_json`) — replace + `self.produce(value)` with `self.writer_handle().push(value)`. The + buffer-presence check immediately above this line stays. +- `aimdb-core/src/builder.rs:1106` (`AimDb::produce`) — rewrite the body as + `self.producer::(key)?.produce(value)`. `Database::produce` and + `aimdb-sync/src/handle.rs` are transitive and need no edits. +- `aimdb-wasm-adapter/src/bindings.rs:553` and + `aimdb-wasm-adapter/src/ws_bridge.rs:760` — replace `typed.produce(val)` + with `typed.writer_handle().push(val)`. + +After this step `TypedRecord::produce()` itself is still in place but unused +externally; leaving it for one more step keeps the diff focused. + +**Step 5 — Remove `latest_snapshot` from `RecordWriter`** +Remove the field. Remove the snapshot update from `push()`. Remove the +snapshot Arc parameter from `RecordWriter::new()` and from the +`writer_handle()` call site in `typed_record.rs`. Collapse the two `#[cfg]` +constructor variants into one. Green. + +**Step 6 — Remove `latest_snapshot` from `TypedRecord`** +Delete `TypedRecord::produce()` (now provably unused after Step 4). Remove +the `latest_snapshot` field and its constructor initialisers. Rewrite +`TypedRecord::latest()`: + +```rust +pub fn latest(&self) -> Option> { + let value = self.buffer.as_ref()?.peek()?; + #[cfg(feature = "std")] + { Some(RecordValue::new(value, self.json_serializer.clone())) } + #[cfg(not(feature = "std"))] + { Some(RecordValue::new(value, None)) } +} +``` + +This makes `latest()` consistent with `latest_json()` (Step 3) — both read +buffer-native storage. Settable records with no buffer now return `None` +from `latest()`; see [Breaking Changes](#breaking-changes). `RecordValue` +and its `json_serializer` Arc stay — they belong to the typed API and are +not part of this milestone. Green. + +**Step 7 — Out of scope: `JsonRecord` supertrait** +The `json_serializer` / `json_deserializer` closure fields on `TypedRecord` +still exist after this milestone. A follow-up proposal will replace them +with a blanket `impl` `JsonRecord` +supertrait. Tracked separately. + +--- + +## Breaking Changes + +**`TypedRecord::latest()` semantics** — return type is unchanged +(`Option>`), but the source is now `buffer.peek()`. Records +with no buffer (today: settable records that store only in the snapshot) +will return `None`. Mitigation: any settable record that needs `latest()` +or `record.get` to work must be configured with `SingleLatest` (one slot, +overhead is one `watch::Sender>` — strictly less than the snapshot +mutex it replaces). Audit `with_remote_access()` call sites at +implementation time and either attach a `SingleLatest` automatically in the +builder or fail loudly at `build()` if no buffer is configured for a +remote-readable record. + +**`record.get` on SPMC Ring records** — today returns the most recently +produced value via the snapshot. After this change returns `not_found`. +Clients that need history use `record.drain`; clients that need +point-in-time reads should be configured with `SingleLatest`. See +[§2](#2-aimx-recordget-uses-peek-directly) and +[SPMC Ring §](#spmc-ring-broadcast). + +**`record.get` on Mailbox after `record.drain`** — today survived a drain +via the independent snapshot; after this change a drained Mailbox slot is +empty and `record.get` returns `not_found` until the next produce. See +[Mailbox §](#mailbox-notify--mutex-slot). + +**Metadata `last_update` on previously-divergent paths** — values produced +via `set_from_json` (AimX `record.set`), the public `AimDb::produce` +wrapper, and the WASM adapter previously did not call +`RecordMetadataTracker::mark_updated()`. After unification all paths do. +Behaviour change is benign (metadata becomes more accurate) but is +observable via `record.metadata`. + +**`RecordWriter::new()` signature** — internal `pub(crate)`; the +`latest_snapshot` Arc parameter is removed. + +**`TypedRecord::produce()` removed** — internal. All call sites are +migrated in Step 4. + +--- + +## Out of Scope + +- **`JsonRecord` supertrait extraction** — removing `json_serializer` / + `json_deserializer` stored closures in favour of blanket impls on + `T: Serialize + DeserializeOwned`. `RecordValue`'s `json_serializer` + Arc lives or dies with that work; this milestone leaves it in place. + +- **`std`/`no_std` Mutex unification across the remaining `TypedRecord` + fields** (`producer_service`, `consumers`, `transform`). These still use + `spin::Mutex` in no_std after this milestone, so the `spin` crate + dependency is not removed. Tracked separately. + +- **Embassy `peek()` implementation** — optional and additive; can be done + as a follow-up when embedded introspection is required. The trait default + `None` is correct. + +- **Caching `Producer` per record key in the WASM adapter** — Step 4 + uses a per-call `writer_handle()` for `bindings.rs`/`ws_bridge.rs` to keep + the diff small. A future optimisation can pre-build a key→`Producer` + map at adapter init time. diff --git a/docs/design/032-M16-aimx-json-codec.md b/docs/design/032-M16-aimx-json-codec.md new file mode 100644 index 0000000..5df1747 --- /dev/null +++ b/docs/design/032-M16-aimx-json-codec.md @@ -0,0 +1,515 @@ +# Fold the AimX serializer/deserializer into a trait-derived JSON codec + +**Version:** 0.2 (implemented — codec is no_std-capable behind the `json-serialize` feature) +**Status:** ✅ Implemented +**Depends on:** [M15 — Remove `latest_snapshot`](031-M15-remove-latest-snapshot.md) +**Last Updated:** May 28, 2026 +**Milestone:** M16 — Remote-access codec extraction + +--- + +## Table of Contents + +- [Summary](#summary) +- [Motivation](#motivation) + - [The JSON fields are a hand-rolled vtable, not data](#the-json-fields-are-a-hand-rolled-vtable-not-data) + - [The connector layer already solved this exact problem](#the-connector-layer-already-solved-this-exact-problem) + - [Post-M15, AimX reads and writes the buffer like any connector](#post-m15-aimx-reads-and-writes-the-buffer-like-any-connector) + - [The capability is fragmented across three places](#the-capability-is-fragmented-across-three-places) +- [Current Architecture](#current-architecture) +- [Proposed Design](#proposed-design) + - [1. `RemoteSerialize` — the capability trait](#1-remoteserialize--the-capability-trait) + - [2. `JsonCodec` — the type-erased storage form](#2-jsoncodect--the-type-erased-storage-form) + - [3. One codec field replaces two closures](#3-one-codec-field-replaces-two-closures) + - [4. `with_remote_access()` requires the trait](#4-with_remote_access-requires-the-trait) + - [5. `JsonReaderAdapter` holds the codec](#5-jsonreaderadapter-holds-the-codec) + - [6. The three `AnyRecord` methods delegate to the codec](#6-the-three-anyrecord-methods-delegate-to-the-codec) + - [7. Remove `with_read_only_serialization`](#7-remove-with_read_only_serialization) +- [Alternatives Considered](#alternatives-considered) +- [no\_std Impact](#nostd-impact) +- [Implementation Plan](#implementation-plan) +- [Breaking Changes](#breaking-changes) +- [Open Questions](#open-questions) +- [Out of Scope](#out-of-scope) + +--- + +## Summary + +`TypedRecord` carries two loose `Option>` fields — +`json_serializer` and `json_deserializer` — set by `.with_remote_access()`. +A third fragment, `JsonReaderAdapter`, bundles a bare serializer closure with +a `BufferReader` for the subscription path. All three exist for one reason: +the type-erased `AnyRecord` methods (`latest_json`, `subscribe_json`, +`set_from_json`) must turn `T` into/out of `serde_json::Value` from a context +where `T: Serialize` is not in scope. + +This is the same capture-the-codec-at-config-time pattern the **connector +layer** already uses (`SerializerFn` / `DeserializerFn` on `ConnectorLink` / +`InboundConnectorLink`). After M15, AimX reads and writes records through the +same buffer surface (`peek()` / `subscribe()` / `push()`) that connectors use. +AimX *is* a connector — JSON wire, RPC transport — but its codec is modelled +as bespoke record fields rather than as a connector-style codec. + +This design: + +1. Adds a capability trait **`RemoteSerialize`** in a new top-level module + **`crate::codec`**, blanket-implemented for every `T: Serialize + + DeserializeOwned`. This is the AimX/connector analogue of the data-contract + traits (`Streamable`, `Linkable`) — a named contract that "unlocks" the JSON + codec. Every `Streamable` type satisfies it for free. +2. Adds an object-safe **`JsonCodec`** (methods `encode` / `decode`) plus a + zero-sized `SerdeJsonCodec` built only under that bound — the type-erased + storage form. +3. Replaces the two closure fields with **one** `remote_codec: + Option>>`, threaded through `latest_json`, + `set_from_json`, `subscribe_json`, `JsonReaderAdapter`, and `RecordValue`. +4. Removes the dead `with_read_only_serialization` API. + +The codec is gated by a dedicated **`json-serialize`** feature (not `std`), so it is +**no_std + alloc compatible** — `serde_json` runs on `alloc` alone (the same way +the `Linkable` data contracts already serialize on embedded targets). `std` +enables `json-serialize` automatically, so AimX is unaffected; embedded targets can opt in +to get `record.latest()?.as_json()` without std/AimX. The AimX type-erased entry +points (`latest_json` / `subscribe_json` / `set_from_json`) remain `std`-gated. + +Net: three fragments collapse to one trait-derived object, the serde bound is +named instead of ad-hoc, the codec is reusable on no_std, and the JSON concern +stops looking foreign inside `TypedRecord`. It does **not** relocate the codec +off the record or fold AimX into the connector spawn machinery — those are +discussed under [Alternatives](#alternatives-considered) and deferred. + +This is the follow-up named in +[M15 Step 7 / Out of Scope](031-M15-remove-latest-snapshot.md#out-of-scope). + +--- + +## Motivation + +### The JSON fields are a hand-rolled vtable, not data + +Every other field on `TypedRecord` speaks `T` natively: the buffer stores +`T`, producer/consumer/transform move `T`, metadata stores only the type *name*. +The two JSON fields are different in kind — they capture +`serde_json::to_value::` / `from_value::` at the one site where +`T: Serialize` is statically known (`with_remote_access`, `typed_record.rs:941`) +and stash the result for replay from the type-erased `AnyRecord` impl. Rust has +no specialization, so the single blanket `impl AnyRecord for TypedRecord` +(`typed_record.rs:1246`) cannot conditionally expose JSON only for serializable +`T`. Some stored capability is unavoidable — but it should be *one named thing*, +not two loose closures plus an adapter. + +### The connector layer already solved this exact problem + +Connectors face the identical "cross the type-erasure boundary with a captured +codec" problem and solve it on a **link**: + +| Concern | Connector | AimX (today) | +|---|---|---| +| out (T → wire) | `ConnectorLink.serializer: SerializerKind` | `json_serializer: Arc` | +| in (wire → T) | `InboundConnectorLink.deserializer: DeserializerFn` | `json_deserializer: Arc` | +| streaming out | `consumer_factory` + `serializer` | `JsonReaderAdapter` | +| captured at | `.link_to(...).with_serializer(...)` | `.with_remote_access()` | +| stored as | type-erased on the link struct | loose fields on the record | + +The connector evidence says the codec is a *link-shaped* concern, captured at +config time, not a core record field. + +### Post-M15, AimX reads and writes the buffer like any connector + +M15 removed `latest_snapshot`; `record.get` now reads `buffer.peek()`, +`record.subscribe` uses a real `BufferReader`, and `record.set` pushes through +`WriteHandle`. AimX no longer has any private storage — it operates on the same +buffer surface as connectors. The three operations map one-to-one onto connector +lanes: + +``` + record.get peek() + codec.to_json (T → Value) ── outbound point read + record.subscribe subscribe() + codec.to_json (T → Value) ── outbound stream + record.set codec.from_json + push() (Value → T) ── inbound write +``` + +### The capability is fragmented across three places + +`json_serializer`, `json_deserializer`, and `JsonReaderAdapter` are one concern +("turn this record's `T` into/out of JSON across the erasure boundary") +expressed three times. Consolidating them removes the ad-hoc inline +`serde_json` closures and the bare-`Fn` field on the adapter. + +--- + +## Current Architecture + +``` + TypedRecord + ┌─────────────────────────────────────────────────────────────────┐ + │ DATA PLANE (speaks T) producer · consumers · transform · │ + │ buffer · connectors · metadata │ + ├─────────────────────────────────────────────────────────────────┤ + │ SERIALIZATION (speaks serde_json::Value) ← foreign concern │ + │ json_serializer OptionValue>> │ + │ json_deserializer OptionT>> │ + └─────────────────────────────────────────────────────────────────┘ + + AnyRecord (T erased) TypedRecord field used JsonReaderAdapter + ─────────────────── ────────────────────── ───────────────── + latest_json() ──► json_serializer — + set_from_json() ──► json_deserializer — + subscribe_json() ──► json_serializer ───────► { inner: Reader, + serializer: Fn } +``` + +Entry points: `record.get` → `db.try_latest_as_json()` (`builder.rs:276`) → +`AnyRecord::latest_json()`; `record.subscribe` → `subscribe_json()`; +`record.set` → `set_from_json()`. + +--- + +## Proposed Design + +``` + TypedRecord + ┌─────────────────────────────────────────────────────────────────┐ + │ DATA PLANE (unchanged) │ + ├─────────────────────────────────────────────────────────────────┤ + │ remote_codec: Option>> ← one named thing │ + └─────────────────────────────────────────────────────────────────┘ + ▲ built from a ZST under the T: RemoteSerialize bound + │ + latest_json / set_from_json / subscribe_json / RecordValue::as_json + all route through remote_codec.{encode, decode} +``` + +### 1. `RemoteSerialize` — the capability trait + +New, **feature `json-serialize`** (no_std + alloc compatible), in `aimdb-core` module +`crate::codec`, re-exported from the crate root: + +```rust +/// A record type that can be encoded to / decoded from the JSON wire format. +/// +/// Blanket-implemented for every `serde` type, so any `T: Serialize + +/// DeserializeOwned` gets a JSON codec for free. This is the AimX/connector +/// analogue of the data-contract capability traits (`Streamable`, `Linkable`): +/// a named contract that unlocks a feature. +pub trait RemoteSerialize: Sized { + fn to_json(&self) -> Option; + fn from_json(value: &serde_json::Value) -> Option; +} + +impl RemoteSerialize for T +where + T: serde::Serialize + serde::de::DeserializeOwned, +{ + fn to_json(&self) -> Option { + serde_json::to_value(self).ok() + } + fn from_json(value: &serde_json::Value) -> Option { + serde_json::from_value(value.clone()).ok() + } +} +``` + +The whole `crate::codec` module is `#[cfg(feature = "json-serialize")]`. The feature is +`json-serialize = ["alloc", "serde_json"]`, and `std` enables it. `serde` and `serde_json` +are already `default-features = false` + `alloc` in the workspace, so nothing +pulls in `std`. + +**Why a new trait instead of reusing `aimdb-data-contracts::Streamable`:** the +dependency runs `aimdb-data-contracts` → `aimdb-core` +(`aimdb-data-contracts/Cargo.toml:28`), not the reverse. Bounding a core method +on `Streamable` would be a cycle. `RemoteSerialize` is the core-local +equivalent, and because `Streamable: Serialize + DeserializeOwned`, every +`Streamable` (and every `Linkable`) type satisfies `RemoteSerialize` +automatically through the blanket impl. This is the direct answer to "unlock +the feature with a trait, like the data contracts" without inverting the +dependency graph. + +### 2. `JsonCodec` — the type-erased storage form + +`TypedRecord`'s `AnyRecord` impl cannot carry a `T: RemoteSerialize` +bound (it must cover non-serializable `T` too), so the capability is stored as +an object. This is the AimX counterpart of the connector layer's +`SerializerFn` / `DeserializerFn`: + +Method names are `encode` / `decode` (not `to_json` / `from_json`) — a `from_*` +method taking `&self` trips clippy's `wrong_self_convention`, and `encode` / +`decode` read naturally for a codec object. + +```rust +/// Type-erased JSON codec for one record type. Stored where T's serde bounds +/// are out of scope (inside the blanket `AnyRecord` impl). +pub trait JsonCodec: Send + Sync { + fn encode(&self, value: &T) -> Option; + fn decode(&self, value: &serde_json::Value) -> Option; +} + +/// Zero-sized serde-backed codec. Constructed only under `T: RemoteSerialize`, +/// so the erased `JsonCodec` it yields is guaranteed valid. +pub struct SerdeJsonCodec; + +impl JsonCodec for SerdeJsonCodec { + fn encode(&self, value: &T) -> Option { + value.to_json() + } + fn decode(&self, value: &serde_json::Value) -> Option { + T::from_json(value) + } +} +``` + +`JsonCodec` is object-safe (`T` is fixed per record), so +`Arc>` is well-formed. The two private type aliases +`JsonSerializer` / `JsonDeserializer` (`typed_record.rs:54-59`) are +deleted. + +### 3. One codec field replaces two closures + +```rust +// Before +#[cfg(feature = "std")] +json_serializer: Option>, +#[cfg(feature = "std")] +json_deserializer: Option>, + +// After +/// Type-erased JSON codec; `Some` iff the record opted in via with_remote_access(). +/// RecordValue::as_json and (on std) the AimX read/write/subscribe paths route through it. +#[cfg(feature = "json-serialize")] +remote_codec: Option>>, +``` + +`new()` initialises a single `remote_codec: None` under `#[cfg(feature = +"json")]`. `RecordValue` likewise carries a `#[cfg(feature = "json-serialize")] +codec: Option>>` and its `as_json()` becomes +`#[cfg(feature = "json-serialize")]`, so `record.latest()?.as_json()` works on no_std. + +### 4. `with_remote_access()` requires the trait + +```rust +#[cfg(feature = "json-serialize")] +pub fn with_remote_access(&mut self) -> &mut Self +where + T: crate::codec::RemoteSerialize + 'static, +{ + self.remote_codec = Some(Arc::new(crate::codec::SerdeJsonCodec)); + self +} +``` + +Because `RemoteSerialize` is blanket-implemented over `Serialize + +DeserializeOwned`, the bound is *source-compatible* with the previous +`T: Serialize + DeserializeOwned` — existing call sites compile unchanged. The +mirror method on `RecordRegistrar` gets the same bound and `json-serialize` gate. The name +`with_remote_access` is kept for API stability; on no_std it installs the codec +purely for local `as_json()` (there is no remote access on embedded). + +### 5. `JsonReaderAdapter` holds the codec + +```rust +// Before +struct JsonReaderAdapter { + inner: Box + Send>, + serializer: JsonSerializer, +} + +// After +struct JsonReaderAdapter { + inner: Box + Send>, + codec: Arc>, +} +// recv_json / try_recv_json call self.codec.encode(&value) +``` + +`JsonReaderAdapter` stays `#[cfg(feature = "std")]` (it implements the std-only +`JsonBufferReader` for AimX streaming); under `std` the `json-serialize` feature is always +on, so `Arc>` resolves. + +### 6. The three `AnyRecord` methods delegate to the codec + +These remain `#[cfg(feature = "std")]` (they are the AimX type-erased entry +points). They read the now-`json-serialize`-gated `remote_codec` field, which is always +present under `std`. + +```rust +fn latest_json(&self) -> Option { + let value = self.buffer.as_ref()?.peek()?; + self.remote_codec.as_ref()?.encode(&value) +} + +fn subscribe_json(&self) -> DbResult> { + let codec = self.remote_codec.clone().ok_or_else(|| /* not configured */)?; + let reader = self.subscribe()?; + Ok(Box::new(JsonReaderAdapter { inner: reader, codec })) +} + +fn set_from_json(&self, json_value: serde_json::Value) -> DbResult<()> { + // unchanged: no-producer-override + buffer-present checks stay here + let codec = self.remote_codec.clone().ok_or_else(|| /* not configured */)?; + let value: T = codec.decode(&json_value).ok_or_else(|| /* schema mismatch */)?; + self.writer_handle().push(value); + Ok(()) +} +``` + +`TypedRecord::latest()` and `RecordValue` switch from holding a closure to +holding `Option>>`; `RecordValue::as_json()` calls +`codec.encode(&self.value)`. No public signature changes on `latest()` / +`as_json()`. + +### 7. Remove `with_read_only_serialization` + +`with_read_only_serialization` (`Serialize`-only) had **zero callers** outside +its own definition (confirmed across both `aimdb` and `aimdb-pro`). It is +removed. The read-only tier disappears with it — see +[Open Questions](#open-questions). + +--- + +## Alternatives Considered + +**A — Leave it as a single bare closure field.** Collapse the two closures into +one `Option>` without a named trait. Lighter, but keeps the bound +ad-hoc and gives the reviewer's "unlock via a trait" request no answer. +Rejected in favour of the named `RemoteSerialize`. + +**B — A `RemoteAccessLink` holder (codec + policy).** Wrap the codec in a struct +that *also* owns the AimX policy currently inlined in `set_from_json` +(no-producer-override, the `writable` flag, ReadOnly enforcement), making AimX a +first-class link structurally peer to `outbound_connectors` / +`inbound_connectors`. This is the fullest realisation of "AimX is a connector," +and is the natural next step. **Deferred:** migrating the security policy widens +scope into the remote-access permission model and should be its own milestone. +This design intentionally lands only the codec so the diff stays reviewable. + +**C — Relocate the codec to a db-level registry.** Move JSON access off +`TypedRecord` entirely into a `HashMap>` built at +registration, and drop `latest_json` / `subscribe_json` / `set_from_json` from +`AnyRecord`. This makes `TypedRecord` purely data-plane (the reviewer's ideal). +**Rejected for now:** it adds a parallel structure plus a second lookup on the +AimX path, and it diverges from how connectors attach — connector links live on +the record (as `Vec<…Link>`), not in a db-side registry. If the std-only +surface of `AnyRecord` later becomes a maintenance burden, revisit. + +**D — Bound on `aimdb-data-contracts::Streamable`.** Rejected: dependency cycle +(contracts → core). `RemoteSerialize` is the core-local equivalent and +`Streamable` types satisfy it automatically. + +**E — Reuse `ConnectorLink` / the connector spawn machinery literally.** +Rejected: AimX is RPC-driven (`get`/`set` are on-demand point ops, not spawned +loops bound to a URL), carries its own security policy, and also serves +introspection (`collect_metadata`) — none of which fit a `ConnectorLink`. The +reusable unit is the *codec*, not the link/spawn plumbing. + +--- + +## no\_std Impact + +The codec is now **no_std + alloc compatible**, gated by the `json-serialize` feature +(`json-serialize = ["alloc", "serde_json"]`). The workspace already configures `serde` and +`serde_json` as `default-features = false` + `alloc`, so enabling `json-serialize` pulls in +no `std` — the same path the `Linkable` data contracts already use to serialize +JSON on embedded targets. + +Feature gating: + +| Item | Gate | +|---|---| +| `crate::codec` (`RemoteSerialize`, `JsonCodec`, `SerdeJsonCodec`) | `json-serialize` | +| `TypedRecord::remote_codec`, `with_remote_access`, `RecordValue::{codec, as_json}` | `json-serialize` | +| AimX type-erased methods (`latest_json` / `subscribe_json` / `set_from_json`), `JsonReaderAdapter` | `std` (and `std` ⇒ `json-serialize`) | + +What no_std gains: with `json-serialize` on, embedded code can call +`record.latest()?.as_json()` and install the codec via `with_remote_access()`. +What stays std: the AimX protocol itself (Unix socket, type-erased dispatch). +Verified by building `aimdb-core` under `--no-default-features --features +alloc,json-serialize` with `#![cfg_attr(not(feature = "std"), no_std)]` active. + +--- + +## Implementation Plan + +Ordered so the workspace compiles and tests pass at every stage. + +**Step 1 — Add the codec module + feature.** Add `json-serialize = ["alloc", +"serde_json"]`; make `std` enable `json-serialize`. Create `aimdb-core/src/codec.rs` +(top-level, `#[cfg(feature = "json-serialize")]`) with `RemoteSerialize` (+ blanket impl), +`JsonCodec` (`encode`/`decode`), and `SerdeJsonCodec`. Declare + re-export +from the crate root. Nothing consumes it yet. Green. + +**Step 2 — Swap the record over to the codec.** In one cohesive change: replace +the two fields with `remote_codec`; update `new()`, `with_remote_access` (`json-serialize` +gate, `T: RemoteSerialize` bound), `latest_json`, `set_from_json`, +`subscribe_json`, `JsonReaderAdapter`, `latest()`, and `RecordValue`/`as_json`. +Delete the `JsonSerializer` / `JsonDeserializer` aliases. Update the +`RecordRegistrar::with_remote_access` bound + gate. Green. + +**Step 3 — Remove `with_read_only_serialization`.** Delete the method (zero +callers). Green. + +**Step 4 — Build matrix + tests.** Build `aimdb-core` under: default (std), +`--no-default-features --features alloc` (codec absent), and +`--no-default-features --features alloc,json-serialize` (codec present, no_std). Clippy +clean on all three. Run core lib tests + the tokio adapter remote-access/drain +integration suite (`record.get` / `set` / `subscribe` on `SingleLatest` / +`Mailbox` / SPMC Ring). Green. + +--- + +## Breaking Changes + +**`with_read_only_serialization` removed** — `pub` API with no callers in either +workspace. Any external code relying on serialize-only remote access must switch +to `with_remote_access` (which now also requires `DeserializeOwned`). + +**`with_remote_access` bound + gate** — bound changes from `Serialize + +DeserializeOwned` to `RemoteSerialize` (source-compatible via the blanket impl, +no call-site changes), and the method is now gated on `json-serialize` rather than `std`. +Since `std` enables `json-serialize`, all existing std callers are unaffected. + +**New public items** — `RemoteSerialize`, `JsonCodec` (methods `encode` / +`decode`), `SerdeJsonCodec`, all in `crate::codec` (additive, feature `json-serialize`). + +**No wire/protocol change** — `record.get` / `set` / `subscribe` behave +identically; the codec produces the same JSON as the closures did. + +--- + +## Open Questions + +1. **Drop the read-only tier?** *Resolved — dropped.* `RemoteSerialize` requires + both `Serialize` and `DeserializeOwned`. `with_read_only_serialization` had no + callers in `aimdb` or `aimdb-pro`, so the `Serialize`-only tier was removed. If + a `Serialize`-but-not-`DeserializeOwned` record ever needs read-only exposure, + reintroduce a read-only codec variant. +2. **Blanket impl vs explicit marker.** The blanket impl makes every `serde` + type codec-ready with zero boilerplate. The data contracts instead use + *explicit* opt-in (`impl Streamable for T {}`). If parity / intentional opt-in + is preferred, make `RemoteSerialize` a marker the user implements. Trade-off: + ergonomics vs. explicitness. *(Shipped as blanket impl.)* +3. **Land Alternative B in the same milestone?** i.e. also migrate the AimX + security policy onto a `RemoteAccessLink`. *Resolved — no.* Kept this milestone + to the codec; the link/policy move is a future milestone. +4. **Module home.** *Resolved — `crate::codec`.* Originally proposed + `crate::remote::codec`, but since the codec is now no_std-capable and feature- + (`json-serialize`-) gated rather than AimX/`std`-specific, a neutral top-level + `crate::codec` module is the correct home. `crate::remote` is itself + `std`-gated, which would have forced the codec to be std-only. +5. **`with_remote_access` naming on no_std.** The method installs a codec; on + no_std there is no "remote access," only local `as_json()`. The name is kept + for std API stability, but a neutral alias (e.g. `with_json()`) could be added + later if the embedded ergonomics warrant it. + +--- + +## Out of Scope + +- **AimX security-policy relocation** onto a link holder (Alternative B) — the + no-producer-override / `writable` / ReadOnly checks stay inline in + `set_from_json` for this milestone. +- **Full relocation to a db-level codec registry** (Alternative C) — + `TypedRecord` keeps the (now single) codec field; `AnyRecord` keeps its three + JSON methods. +- **Folding AimX into the connector spawn machinery** (Alternative E). +- **Custom (non-JSON) AimX wire formats** — `SerdeJsonCodec` is the only codec; + a pluggable codec per connection is not introduced here. diff --git a/examples/remote-access-demo/src/client.rs b/examples/remote-access-demo/src/client.rs index 8f7b4c5..0822cca 100644 --- a/examples/remote-access-demo/src/client.rs +++ b/examples/remote-access-demo/src/client.rs @@ -139,64 +139,37 @@ fn main() -> Result<(), Box> { println!(); - // Test record.get for Temperature - println!("📤 Requesting Temperature value..."); + // ── Point-in-time reads: record.get ────────────────────────────────── + // record.get serves a single "current value", so it only works on buffers + // that have a canonical latest. SingleLatest (Config/AppSettings, below) + // does. SpmcRing does NOT: a ring keeps a *history* for independent + // consumers, so there is no one "latest" to return — record.get answers + // not_found by design. Read rings with record.drain (history) or + // record.subscribe (live), both demonstrated further down. + + println!("📤 record.get on Temperature (SpmcRing — expecting not_found)..."); let get_request = Request { id: 2, method: "record.get".to_string(), params: Some(json!({"record": "server::Temperature"})), }; - let get_request_json = serde_json::to_string(&get_request)?; - writeln!(stream, "{}", get_request_json)?; + writeln!(stream, "{}", serde_json::to_string(&get_request)?)?; stream.flush()?; - // Read response let mut get_response_line = String::new(); reader.read_line(&mut get_response_line)?; - let get_response: Response = serde_json::from_str(&get_response_line)?; match get_response { - Response::Success { id, result } => { - println!("✅ Success! (request_id: {})", id); - println!(); - println!("🌡️ Current Temperature:"); - println!("{}", serde_json::to_string_pretty(&result)?); - } - Response::Error { id, error } => { - println!("❌ Error! (request_id: {})", id); - println!(" Code: {}", error.code); - println!(" Message: {}", error.message); - if let Some(details) = error.details { - println!(" Details: {}", details); - } + Response::Error { id, error } if error.code == "not_found" => { + println!("✅ Expected not_found (request_id: {}): {}", id, error.message); + println!( + " ℹ️ Rings have no point-in-time latest — use record.drain / record.subscribe (below)." + ); } - } - - println!(); - - // Test record.get for SystemStatus - println!("📤 Requesting SystemStatus value..."); - let status_request = Request { - id: 3, - method: "record.get".to_string(), - params: Some(json!({"record": "server::SystemStatus"})), - }; - - let status_request_json = serde_json::to_string(&status_request)?; - writeln!(stream, "{}", status_request_json)?; - stream.flush()?; - - let mut status_response_line = String::new(); - reader.read_line(&mut status_response_line)?; - let status_response: Response = serde_json::from_str(&status_response_line)?; - - match status_response { Response::Success { id, result } => { - println!("✅ Success! (request_id: {})", id); - println!(); - println!("💻 Current System Status:"); + println!("⚠️ Unexpected success (request_id: {}):", id); println!("{}", serde_json::to_string_pretty(&result)?); } Response::Error { id, error } => { @@ -209,7 +182,7 @@ fn main() -> Result<(), Box> { println!(); // Test record.get for Config - println!("📤 Requesting Config value..."); + println!("📤 record.get on Config (SingleLatest — point-in-time read)..."); let config_request = Request { id: 4, method: "record.get".to_string(),