Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Changed (breaking)

- **M13 — `Spawn` trait removed across the workspace; `AimDbBuilder::build()` now returns `(AimDb, AimDbRunner)` (Issue #88, [Design 028](docs/design/028-M13-remove-spawn-trait.md)).** Every future the database drives — producer services, taps, transforms, join forwarders, connector loops, the remote-access supervisor, `on_start` tasks — is collected at build time and driven by a single `FuturesUnordered` inside `runner.run().await`. Adapter implementations (`TokioAdapter`, `EmbassyAdapter`, `WasmAdapter`) drop their `impl Spawn`. The `embassy-task-pool-8/16/32` features are deleted and `EmbassyAdapter::new_with_network` no longer takes a `Spawner`. Connector authors must update `ConnectorBuilder::build()` to return `Vec<BoxFuture>` instead of `Arc<dyn Connector>`. See each crate's CHANGELOG for the per-crate impact.

## [1.1.0] - 2026-05-22

### Added
Expand Down
15 changes: 11 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ tracing = { version = "0.1", default-features = false }

# Async utilities
futures = "0.3"
futures-util = { version = "0.3", default-features = false, features = ["alloc"] }

# CLI (for aimdb-cli)
clap = { version = "4.0", features = ["derive"] }
Expand Down
6 changes: 6 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -277,10 +277,14 @@ examples:
@printf "$(GREEN)Building all example projects...$(NC)\n"
@printf "$(YELLOW) → Building sync-api-demo (synchronous API wrapper)$(NC)\n"
cargo build --package sync-api-demo
@printf "$(YELLOW) → Building mqtt-connector-demo-common (shared MQTT demo code, runtime-agnostic)$(NC)\n"
cargo build --package mqtt-connector-demo-common
@printf "$(YELLOW) → Building tokio-mqtt-connector-demo (native, tokio runtime)$(NC)\n"
cargo build --package tokio-mqtt-connector-demo
@printf "$(YELLOW) → Building embassy-mqtt-connector-demo (embedded, embassy runtime)$(NC)\n"
cargo build --package embassy-mqtt-connector-demo --target thumbv7em-none-eabihf
@printf "$(YELLOW) → Building knx-connector-demo-common (shared KNX demo code, runtime-agnostic)$(NC)\n"
cargo build --package knx-connector-demo-common
@printf "$(YELLOW) → Building tokio-knx-connector-demo (native, tokio runtime)$(NC)\n"
cargo build --package tokio-knx-connector-demo
@printf "$(YELLOW) → Building embassy-knx-connector-demo (embedded, embassy runtime)$(NC)\n"
Expand All @@ -295,6 +299,8 @@ examples:
cargo build --package weather-station-beta
@printf "$(YELLOW) → Building weather-station-gamma (embedded, embassy runtime)$(NC)\n"
cargo build --package weather-station-gamma --target thumbv7em-none-eabihf
@printf "$(YELLOW) → Building remote-access-demo (AimX server + client)$(NC)\n"
cargo build --package remote-access-demo
@printf "$(YELLOW) → Building hello-mailbox (sync)$(NC)\n"
cargo build --package hello-mailbox
@printf "$(YELLOW) → Building hello-single-latest-async$(NC)\n"
Expand Down
6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,11 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.finish();
});

builder.build()?.run().await?;
// `.run()` builds the database, collects every producer/consumer/transform
// future, and drives them all on a single `FuturesUnordered`. It blocks
// until shutdown. For programmatic access to the `AimDb` handle, call
// `.build().await?` directly — it returns `(AimDb, AimDbRunner)`.
builder.run().await?;
Ok(())
}
```
Expand Down
2 changes: 1 addition & 1 deletion _external/embassy
Submodule embassy updated 181 files
5 changes: 5 additions & 0 deletions aimdb-codegen/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Changed (breaking)

- Emitted task scaffolds now use `Producer<T>` / `Consumer<T>` (no `, TokioAdapter` second parameter) and emitted doc tables show the same form, matching the M14 cleanup in `aimdb-core` (Design 029). Regenerate downstream scaffolds after upgrading.
- Emitted `configure_schema` signature changed from `<R: Spawn + 'static>` to `<R: RuntimeAdapter + 'static>`; emitted prelude now imports `aimdb_executor::RuntimeAdapter` instead of `Spawn` (Issue #88). Regenerate downstream schemas.

## [0.2.0] - 2026-05-22

### Changed
Expand Down
28 changes: 14 additions & 14 deletions aimdb-codegen/src/rust.rs
Original file line number Diff line number Diff line change
Expand Up @@ -387,12 +387,12 @@ pub fn generate_tasks_rs(state: &ArchitectureState, binary_name: &str) -> Option
for input in &task.inputs {
let arg_name = format_ident!("{}", to_snake_case(&input.record));
let value_type = format_ident!("{}Value", input.record);
params.push(quote! { #arg_name: Consumer<#value_type, TokioAdapter> });
params.push(quote! { #arg_name: Consumer<#value_type> });
}
for output in &task.outputs {
let arg_name = format_ident!("{}", to_snake_case(&output.record));
let value_type = format_ident!("{}Value", output.record);
params.push(quote! { #arg_name: Producer<#value_type, TokioAdapter> });
params.push(quote! { #arg_name: Producer<#value_type> });
}

let todo_msg = match &task.task_type {
Expand Down Expand Up @@ -557,7 +557,7 @@ fn emit_imports(state: &ArchitectureState) -> TokenStream {
use aimdb_core::builder::AimDbBuilder;
use aimdb_core::RecordKey;
use aimdb_data_contracts::{#(#contract_traits),*};
use aimdb_executor::Spawn;
use aimdb_executor::RuntimeAdapter;
use serde::{Deserialize, Serialize};
}
}
Expand Down Expand Up @@ -731,7 +731,7 @@ fn emit_configure_schema(state: &ArchitectureState) -> TokenStream {
/// addresses. Producers, consumers, serializers, and deserializers contain
/// business logic and must be provided by application code — they are not
/// generated here.
pub fn configure_schema<R: Spawn + 'static>(builder: &mut AimDbBuilder<R>) {
pub fn configure_schema<R: RuntimeAdapter + 'static>(builder: &mut AimDbBuilder<R>) {
#(#record_blocks)*
}
}
Expand Down Expand Up @@ -1250,7 +1250,7 @@ pub fn generate_hub_schema_rs(state: &ArchitectureState) -> String {
let file_tokens = quote! {
use aimdb_core::buffer::BufferCfg;
use aimdb_core::builder::AimDbBuilder;
use aimdb_executor::Spawn;
use aimdb_executor::RuntimeAdapter;
use #common_crate::*;

#configure_fn
Expand Down Expand Up @@ -1611,10 +1611,10 @@ fn build_transform_call(task: &TaskDef, variant_ident: &syn::Ident) -> TokenStre
///
/// | Inputs | Outputs | API | Generated stub |
/// |--------|---------|-----------------------|---------------------------|
/// | N > 1 | ≥ 1 | `.transform_join()` | `async fn task_handler(JoinEventRx, Producer<O, R>)` |
/// | N > 1 | ≥ 1 | `.transform_join()` | `async fn task_handler(JoinEventRx, Producer<O>)` |
/// | 1 | ≥ 1 | `.transform().map()` | `fn task_transform(&Input) -> Option<Output>` |
/// | 0 | ≥ 1 | `.source()` | `async fn task(RuntimeContext, Producer<O, R>)` |
/// | ≥ 1 | 0 | `.tap()` | `async fn task(RuntimeContext, Consumer<I, R>)` |
/// | 0 | ≥ 1 | `.source()` | `async fn task(RuntimeContext, Producer<O>)` |
/// | ≥ 1 | 0 | `.tap()` | `async fn task(RuntimeContext, Consumer<I>)` |
pub fn generate_hub_tasks_rs(state: &ArchitectureState) -> String {
let project = state
.project
Expand Down Expand Up @@ -1660,7 +1660,7 @@ pub fn generate_hub_tasks_rs(state: &ArchitectureState) -> String {
/// {inputs_doc}\n\
pub async fn {handler}(\n\
mut _rx: aimdb_core::transform::JoinEventRx,\n\
_producer: aimdb_core::Producer<{out_t}, TokioAdapter>,\n\
_producer: aimdb_core::Producer<{out_t}>,\n\
) {{\n\
while let Ok(_trigger) = _rx.recv().await {{\n\
todo!(\"implement {handler}\")\n\
Expand Down Expand Up @@ -1689,7 +1689,7 @@ pub fn {handler}(input: &{in_t}) -> Option<{out_t}> {{\n\
fns.push_str(&format!(
"pub async fn {}(\n\
_ctx: aimdb_core::RuntimeContext<TokioAdapter>,\n\
_producer: aimdb_core::Producer<{out_t}, TokioAdapter>,\n\
_producer: aimdb_core::Producer<{out_t}>,\n\
) {{\n\
todo!(\"implement {}\")\n\
}}\n\n",
Expand All @@ -1700,7 +1700,7 @@ pub fn {handler}(input: &{in_t}) -> Option<{out_t}> {{\n\
fns.push_str(&format!(
"pub async fn {}(\n\
_ctx: aimdb_core::RuntimeContext<TokioAdapter>,\n\
_consumer: aimdb_core::Consumer<{in_t}, TokioAdapter>,\n\
_consumer: aimdb_core::Consumer<{in_t}>,\n\
) {{\n\
todo!(\"implement {}\")\n\
}}\n\n",
Expand Down Expand Up @@ -1833,8 +1833,8 @@ url = "mqtt://ota/cmd/{variant}"
"Missing RecordKey import:\n{out}"
);
assert!(
out.contains("use aimdb_executor::Spawn;"),
"Missing Spawn import:\n{out}"
out.contains("use aimdb_executor::RuntimeAdapter;"),
"Missing RuntimeAdapter import:\n{out}"
);
assert!(
out.contains("use serde::{Deserialize, Serialize};"),
Expand Down Expand Up @@ -1938,7 +1938,7 @@ url = "mqtt://ota/cmd/{variant}"
let out = generated();
assert!(
out.contains(
"pub fn configure_schema<R: Spawn + 'static>(builder: &mut AimDbBuilder<R>)"
"pub fn configure_schema<R: RuntimeAdapter + 'static>(builder: &mut AimDbBuilder<R>)"
),
"Missing configure_schema function:\n{out}"
);
Expand Down
32 changes: 32 additions & 0 deletions aimdb-core/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,38 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Changed (breaking)

- **`Producer::produce` is now sync + infallible; `Consumer::subscribe` is now infallible (Design 029 follow-up, M14).** The pre-resolved `WriteHandle::push` cannot fail and the pre-resolved buffer Arc makes `subscribe()` infallible. Call sites collapse: `producer.produce(x).await?` → `producer.produce(x);` and `let Ok(reader) = consumer.subscribe() else { ... }` → `let reader = consumer.subscribe();`. The `ProducerTrait::produce_any` / `ConsumerTrait::subscribe_any` trait surfaces stay `Result`/`async` because the type-erasure downcast remains fallible.
- `AimDb::produce<T>(key, value) -> DbResult<()>` is now sync; `.await` on the call site goes away. Only the key lookup can fail.
- `Database::produce` likewise sync.
- `TypedRecord::produce` is now `pub fn produce(&self, val: T)` (was `pub async fn produce`).
- `aimdb-wasm-adapter`: `bindings::poll_sync` helper deleted — no remaining callers now that `TypedRecord::produce` is sync.
- Dead `consumer.subscribe()` error arms in `transform/single.rs` and `transform/join.rs` removed (the `Err` branch was unreachable after M14).

- **`Producer<T>` / `Consumer<T>` drop the runtime parameter `R` and pre-resolve the record at build time (Design 029, M14).** Producer/Consumer become handles to a buffer rather than tickets to look one up: `produce()` is one virtual call (no `HashMap<key>` probe, no `TypeId` check, no downcast), and `subscribe()` collapses to `buffer.subscribe_boxed()`. The internal mechanic is a new crate-private `WriteHandle<T>` trait backed by `RecordWriter<T>` (in `aimdb-core/src/buffer/writer.rs`), pre-bound to the record's `Arc<dyn DynBuffer<T>>` + snapshot mutex + metadata tracker.
- `Producer<T, R>` → `Producer<T>`; `Consumer<T, R>` → `Consumer<T>`. User code that names the two-parameter form must drop the trailing adapter arg.
- `Producer::key(&self) -> &str` is **removed**. Capture the record key at the registration site instead.
- `Producer::produce(value) -> ()` and `Consumer::subscribe() -> Box<dyn BufferReader<T> + Send>` (v0.4 revision — see the sync/infallible bullet above for the rationale and migration). The `ProducerTrait::produce_any` / `ConsumerTrait::subscribe_any` trait surfaces retain `async`/`Result` for the type-erased downcast that can still fail.
- `AimDb::producer<T>(key)` / `AimDb::consumer<T>(key)` now return `DbResult<…>` (was infallible). They resolve the typed record up front, so callers that previously assumed inference must add `?`.
- `Consumer<T>` cannot exist without a buffer: `.tap()` on a record with no `.buffer(...)` now surfaces as `MissingConfiguration` at build time (was a deferred subscribe-time error).
- `TypedRecord::buffer` field is `Option<Arc<dyn DynBuffer<T>>>` (was `Box`); `TypedRecord::set_buffer(Box<…>)` keeps its public signature and converts via `Arc::from(box_)` internally.
- `TypedRecord::create_producer_trait(&self)` no longer takes `db` / `record_key` — it uses the new `writer_handle()`.
- `ConnectorBuilder<R>` cascade is zero-LOC: no connector struct carried `R` after M13. The outbound `consumer_factory` / inbound `producer_factory` callbacks now resolve the record once at link-startup time (via `db.inner().get_typed_record_by_key`) and construct the new handles.
- Codegen-emitted task scaffolds use `Producer<T>` / `Consumer<T>` (no `, TokioAdapter`).
- `data-contracts` `log_tap` parameter is `Consumer<T>`.

- **`Spawn` trait removed; `AimDbBuilder::build()` now returns `(AimDb<R>, AimDbRunner)` (Issue #88, Design 028).** Every future the database needs — `.source()`/`.tap()`/`.transform()` tasks, on_start hooks, connector loops, the remote-access supervisor — is collected at build time into the new `AimDbRunner`, then driven by a single `FuturesUnordered` from `runner.run().await`. No background work runs until the runner is polled.
- `AimDb::spawn_task` is **deleted**. Migrate to `on_start()` (collected at build) or to a private `FuturesUnordered` inside your own future.
- The `Runtime` bundle no longer supertrait-requires `Spawn`. Custom adapters drop `impl Spawn`.
- `R: Spawn` bounds are gone everywhere in `aimdb-core` (`Producer`, `Consumer`, `TypedRecord`, `TransformDescriptor`, `RecordRegistrar`, `RecordT`, `AnyRecordExt::as_typed`, remote handler/supervisor, `Database<A>`) — replaced by `R: RuntimeAdapter`.
- `RecordSpawner<T>` renamed to `RecordFutureCollector<T>`; its `spawn_all_tasks` → `collect_all_futures`. Internal `spawn_consumer_tasks`/`spawn_producer_service`/`spawn_transform_task` on `TypedRecord` become `collect_consumer_futures`/`collect_producer_future`/`collect_transform_futures`.
- Join transforms now hoist their per-input forwarder construction to build time — `JoinPipeline::into_descriptor()` returns a `CollectedTransform { task_future, fanin_futures }` and the lazy `runtime.spawn(forwarder)` inside `run_join_transform` is gone.
- `ConnectorBuilder::build()` now returns `Vec<BoxFuture<'static, ()>>` instead of `Arc<dyn Connector>` (which `AimDbBuilder` already discarded).
- Unsafe `impl Send/Sync` blocks on `Producer<T, R>` / `Consumer<T, R>` deleted — they auto-derive now.
- On the AimX remote-access path, three `runtime.spawn(...)` call sites bridge to `tokio::spawn` directly under `#[cfg(feature = "std")]`. These (per-connection handler, per-subscription event stream, `subscribe_record_updates`) are addressed in the AimX portability follow-up.
- `on_start` no_std bifurcation collapsed: a single `StartFnType<R>` alias replaces the byte-identical std/no_std pair.

## [1.1.0] - 2026-05-22

### Added
Expand Down
4 changes: 2 additions & 2 deletions aimdb-core/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ Portable code uses `R: Runtime` — no platform imports needed:

```rust
/// Producer: reads a sensor and pushes typed values into AimDB.
async fn sensor_producer<R: Runtime>(ctx: RuntimeContext<R>, producer: Producer<Temperature, R>) {
async fn sensor_producer<R: Runtime>(ctx: RuntimeContext<R>, producer: Producer<Temperature>) {
loop {
let reading = read_sensor().await;
producer.produce(Temperature {
Expand All @@ -65,7 +65,7 @@ async fn sensor_producer<R: Runtime>(ctx: RuntimeContext<R>, producer: Producer<
}

/// Consumer: subscribes to the buffer and reacts to every new value.
async fn temp_logger<R: Runtime>(ctx: RuntimeContext<R>, consumer: Consumer<Temperature, R>) {
async fn temp_logger<R: Runtime>(ctx: RuntimeContext<R>, consumer: Consumer<Temperature>) {
let mut reader = consumer.subscribe().unwrap();
while let Ok(temp) = reader.recv().await {
ctx.log().info(&format!("{}: {:.1}°C", temp.sensor_id, temp.celsius));
Expand Down
5 changes: 5 additions & 0 deletions aimdb-core/src/buffer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,16 @@ mod cfg;
#[cfg(feature = "metrics")]
mod counters;
mod traits;
mod writer;

// Public API exports
pub use cfg::BufferCfg;
pub use traits::{Buffer, BufferReader, DynBuffer};

// Crate-private — used by Producer<T> to push without per-call lookup
pub(crate) use traits::WriteHandle;
pub(crate) use writer::RecordWriter;

// JSON streaming support (std only)
#[cfg(feature = "std")]
pub use traits::JsonBufferReader;
Expand Down
20 changes: 20 additions & 0 deletions aimdb-core/src/buffer/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,26 @@ pub trait DynBuffer<T: Clone + Send>: Send + Sync {
fn reset_metrics(&self) {}
}

/// Write-side handle for a single record (design 029, M14).
///
/// `Producer<T>` holds an `Arc<dyn WriteHandle<T>>` so it can be parameterised
/// over `T` alone — no runtime adapter `R` and no per-call record-key string
/// lookup on the produce hot path. The implementor (`RecordWriter<T>`)
/// pre-binds the underlying buffer, the latest-snapshot slot, and the metadata
/// tracker at build time.
///
/// Crate-private on purpose. `Producer<T>::new` is the only construction path;
/// external test code that needs a fake Producer should go through a future
/// `Producer::for_testing(...)` helper rather than implementing `WriteHandle`
/// directly.
pub(crate) trait WriteHandle<T: Clone + Send + 'static>: Send + Sync {
/// Push a value into the buffer, update the latest-snapshot cache, and
/// (when a buffer is present) mark the metadata `last_update` timestamp.
/// Infallible — all three operations are synchronous and lock-free or
/// spin-locked.
fn push(&self, value: T);
}

/// Reader trait for consuming values from a buffer
///
/// All read operations are async. Each reader is independent with its own state.
Expand Down
Loading