88 replace spawn trait with futuresunordered#117
Draft
lxsaah wants to merge 11 commits into
Draft
Conversation
…of futures handling in AimDB
- Updated `WebSocketConnectorImpl` to collect outbound publisher futures instead of spawning them directly. - Refactored `ServerState` to build the WebSocket Axum server future and return it as a `BoxFuture`. - Revised design documentation to reflect the removal of the `Spawn` trait and the new approach for handling futures. - Modified examples to adapt to the new `build()` and `run()` pattern, ensuring proper initialization and concurrent execution of tasks. - Removed unnecessary `Spawn` trait bounds from various components and updated related code generation. - Cleaned up example projects to align with the new architecture, including adjustments to how the Embassy adapter is initialized and how the database runner is executed.
…rn `(AimDb, AimDbRunner)` across the workspace
- Updated `Producer<T, R>` to `Producer<T>` and `Consumer<T, R>` to `Consumer<T>`, simplifying user-facing signatures. - Removed the `.key()` method from `Producer<T>`, as the record key is now captured at registration. - Adjusted tests and examples to reflect the new type signatures. - Updated documentation to describe the architectural changes and the motivation behind the removal of `R`. - Ensured backward compatibility for existing async function signatures while optimizing the hot path for production and consumption of records.
- Changed the `produce` method in various modules from async to synchronous, removing unnecessary await calls. - Updated related documentation and examples to reflect the new synchronous behavior. - Ensured that all instances of `producer.produce(value).await` were replaced with `producer.produce(value)`. - This change simplifies the API and improves performance by eliminating the overhead of async handling where it is not needed.
9 tasks
…t and `R` parameter from `Producer<T>` and `Consumer<T>`
… config producer in server
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Two stacked architectural clean-ups in a single PR:
Spawntrait removed across the workspace. Every future the database needs (.source()/.tap()/.transform()tasks,on_starthooks, connector loops, the remote-access supervisor, join fan-in forwarders) is now collected at build time into anAimDbRunnerand driven by a singleFuturesUnorderedinsiderunner.run().await.AimDbBuilder::build()returns(AimDb<R>, AimDbRunner).Rremoved fromProducer<T>/Consumer<T>. Both types now pre-resolve their record at build time and carry anArc<dyn WriteHandle<T>>/Arc<dyn DynBuffer<T>>directly. Hot-pathproduce()collapses fromHashMaplookup +TypeIdcheck + downcast to one virtual call.produce()becomes sync + infallible;subscribe()becomes infallible.Together they eliminate the
Spawn/Rpropagation that previously threaded through every typed handle, transform, registrar, and connector builder inaimdb-core.Motivation
Spawnpermeated the type system. BecauseAimDb<R>requiredR: Spawn, andAimDb<R>was embedded in every typed handle, the bound propagated everywhere. Adding a new runtime adapter requiredimpl Spawneven when nothing dynamic was being spawned.BoxedFuture, and fed them into a compile-time-fixed task pool viaunsafe { Pin::new_unchecked(...) }. With noSpawn, the pool, theBoxedFuture, theunsafecast, and theembassy-task-pool-{8,16,32}Cargo features all disappear.Producer::produce()did real work before pushing. Every call performed aHashMap<StringKey, RecordId>probe, a bounds check, aTypeIdequality check, and adyn AnyRecord → &TypedRecord<T, R>downcast — on the hot path, at the producer'sproduce()rate. A 1 kHz sensor was paying that cost 1000×/s for no benefit. M14 pre-resolves the handle at build time so the steady-state path is one virtual call into the buffer.What changed
aimdb-coreAimDbBuilder::build()returns(AimDb<R>, AimDbRunner). No background work runs untilrunner.run().awaitis polled.AimDb::spawn_taskdeleted. Migrate toon_start()(collected at build) or to a privateFuturesUnorderedinside your own future.Runtimebundle trait no longer requiresSpawn.R: Spawnbounds replaced byR: RuntimeAdapteracrossProducer,Consumer,TypedRecord,TransformDescriptor,RecordRegistrar,RecordT,AnyRecordExt::as_typed, remote handler/supervisor,Database<A>.Producer<T, R>→Producer<T>;Consumer<T, R>→Consumer<T>. Backed by a new crate-privateWriteHandle<T>trait (aimdb-core/src/buffer/writer.rs) pre-bound to the record'sArc<dyn DynBuffer<T>>+ snapshot mutex + metadata tracker.Producer::produceis now sync + infallible.producer.produce(x).await?→producer.produce(x);. TheProducerTrait::produce_anytype-erased surface staysResult/asyncfor the downcast.Consumer::subscribeis now infallible.let Ok(reader) = consumer.subscribe() else { ... }→let reader = consumer.subscribe();.AimDb::producer<T>(key)/AimDb::consumer<T>(key)now returnDbResult<…>— they resolve the typed record up front, so callers must add?.Producer::key()removed. Capture the record key at the registration site instead..tap()on a record with no.buffer(...)surfaces asMissingConfigurationat build time (was a deferred subscribe-time error).RecordSpawner<T>→RecordFutureCollector<T>;spawn_all_tasks→collect_all_futures;spawn_consumer_tasks/spawn_producer_service/spawn_transform_task→collect_*_futures.JoinPipeline::into_descriptor()now returnsCollectedTransform { task_future, fanin_futures }; the lazyruntime.spawn(forwarder)insiderun_join_transformis gone.unsafe impl Send/Syncblocks onProducer<T, R>/Consumer<T, R>deleted — they auto-derive now thatRis gone.aimdb-executorSpawntrait deleted (includingSpawnTokenandExecutorError::SpawnFailed).futures-utiladded as a regular (alloc-only) dependency — providesFuturesUnorderedfor the new runner.JoinFanInRuntimesupertrait relaxed fromSpawntoRuntimeAdapter.aimdb-tokio-adapterimpl Spawn for TokioAdapterdeleted. Adapter is nowRuntimeAdapter + TimeOps + Logger.spawn_connectorstest-only helper andconnectormodule deleted. Outbound connector futures are collected byConnectorBuilder::build()now.Producer<T>/Consumer<T>(no, TokioAdapter).aimdb-embassy-adapterimpl Spawn for EmbassyAdapterdeleted. Staticgeneric_task_runnertask pool,BoxedFuture, and theunsafe Pin::new_uncheckedcast are all gone.embassy-task-pool-8/-16/-32Cargo features deleted.FuturesUnorderedgrows as needed within a single Embassy task's heap budget.EmbassyAdapter::new_with_spawnerdeleted.new_with_network(spawner, network)→new_with_network(network).unsafe impl Send/Sync for EmbassyAdapterretained whenembassy-net-supportis enabled (single-threaded cooperative executor makes it sound;Stackhas aRefCell).aimdb-wasm-adapterbindings::poll_synchelper deleted — no callers now thatTypedRecord::produceis sync.Connector crates (
aimdb-knx-connector,aimdb-mqtt-connector,aimdb-websocket-connector)ConnectorBuilder::build()now returnsVec<BoxFuture<'static, ()>>instead ofArc<dyn Connector>.spawn_connection_task→build_connection_future; outbound command channels are created up front and shared via the captured receiver.R: Spawn→R: RuntimeAdapterthroughout.transport::Connectorimpls on*ConnectorImpltypes removed (the discardedArc<dyn Connector>return is gone).start_server()→build_server_future()— theaxum::serve()accept loop is collected, not spawned.tokio::spawndirectly under#[cfg(feature = "std")]. To be moved to nestedFuturesUnorderedin the AimX portability follow-up.Remote access / AimX
runtime.spawn(...)sites (per-connection handler, per-subscription event stream,subscribe_record_updates) keep bridging totokio::spawnunder#[cfg(feature = "std")]and will be removed by the AimX portability follow-up.Examples
All example binaries updated to the new
let (db, runner) = builder.build()?; runner.run().awaitpattern: weather-mesh (alpha/beta/gamma), embassy-knx/mqtt-connector-demo, tokio-knx/mqtt-connector-demo, remote-access-demo, hello-single-latest-async, sync-api-demo. Producer/consumer signatures collapse fromProducer<LightControl, EmbassyAdapter>toProducer<LightControl>.Breaking changes — migration checklist
let db = builder.build()?;let (db, runner) = builder.build()?;thenrunner.run().awaitdb.spawn_task(fut)on_start(...)at build time, or ownFuturesUnorderedimpl Spawn for MyAdapter { ... }Runtimeno longer requiresSpawnproducer.produce(x).await?producer.produce(x);let Ok(reader) = consumer.subscribe() else { ... }let reader = consumer.subscribe();let p: Producer<T, TokioAdapter> = ...let p: Producer<T> = ...let c: Consumer<T, TokioAdapter> = ...let c: Consumer<T> = ...producer.key()EmbassyAdapter::new_with_network(spawner, network)EmbassyAdapter::new_with_network(network)EmbassyAdapter::new_with_spawner(spawner)EmbassyAdapter::new()ornew_with_network(network)features = ["embassy-task-pool-16"]ConnectorBuilder::build() -> Arc<dyn Connector>-> Vec<BoxFuture<'static, ()>>.tap()on a record with no.buffer(...)(silent → subscribe-time error)MissingConfigurationat buildPerformance
Producer::produce()steady state:HashMaplookup + bounds check +TypeIdcheck + downcast + buffer push → one virtual call + buffer push. On a 1 kHz source this saves ~1000 lookups/s per producer. The same simplification applies toConsumer::subscribe()(no buffer lookup; theArcis pre-resolved at build).Test plan
cargo test --workspace --all-featurescargo check -p aimdb-embassy-adapter --target thumbv7em-none-eabihf(no_std + alloc)cargo check -p aimdb-wasm-adapter --target wasm32-unknown-unknownexamples/tokio-knx-connector-demo,examples/tokio-mqtt-connector-demo,examples/remote-access-demoend-to-endexamples/embassy-knx-connector-demoandexamples/embassy-mqtt-connector-demoand confirm KNX/MQTT round-trips workexamples/weather-mesh-tokio(alpha/beta/gamma) and confirmDewPointjoin transform produces valuesexamples/sync-api-demostill compiles and runs without.awaitonproduce()cargo check -p aimdb-core --no-default-features --features alloc(no_std + alloc minimum)runner.run().awaitblocks until shutdown and that allon_start/ connector / transform futures are drivenOut of scope (follow-ups)
tokio::spawnsites under#[cfg(feature = "std")](per-connection handler, per-subscription event stream,subscribe_record_updates) are deferred to the AimX remote-access portability follow-up so this PR stays a focused trait-removal change.FuturesUnordered.