[FEAT] Backpressure-aware produce_async / subscribe_async #115

@lxsaah

Background

After M14 (Design 029), Producer::produce is synchronous and infallible:

impl<T> Producer<T> {
    pub fn produce(&self, value: T);
}

That's correct for today's buffers (SpmcRing, SingleLatest, Mailbox)
because all three are non-blocking and overwrite on overflow — WriteHandle::push
genuinely cannot fail or block. The M14 design explicitly defers
backpressure-aware variants until a buffer that needs them actually exists.

Problem

Buffer types we'll want next don't fit the "infallible push" model:

  • Persistent / disk-backed — push performs an I/O write that can block on
    fsync or fail mid-write.
  • Network-backed (e.g. a record that publishes directly to a remote AimX
    shard or a distributed broker) — push is genuinely async, may need
    reconnect, may fail.
  • Bounded with await-on-full semantics — block the producer until a slot
    frees up rather than overwriting. Maps to tokio::sync::mpsc::Sender::send.

For all of these, the producer needs an async entry point that returns a
Result so it can await space / I/O completion and surface failure.
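The mismatch described above can be illustrated with the standard library alone. The following is a minimal sketch, not AimDB code: `OverwritingRing` is a hypothetical stand-in for an overwrite-on-overflow buffer, and `std::sync::mpsc::sync_channel` stands in for a bounded buffer. The first can always accept a value; the second has to either reject the value or make the caller wait.

```rust
use std::collections::VecDeque;
use std::sync::mpsc::{sync_channel, TrySendError};

/// Hypothetical overwrite-on-overflow buffer: `push` can neither fail nor block.
struct OverwritingRing<T> {
    slots: VecDeque<T>,
    capacity: usize,
}

impl<T> OverwritingRing<T> {
    fn new(capacity: usize) -> Self {
        Self { slots: VecDeque::with_capacity(capacity), capacity }
    }

    /// Infallible: when the ring is full, the oldest value is dropped.
    fn push(&mut self, value: T) {
        if self.slots.len() == self.capacity {
            self.slots.pop_front();
        }
        self.slots.push_back(value);
    }
}

/// Pushes three values into a capacity-2 ring and a capacity-2 bounded channel.
/// Returns the ring contents and whether the channel rejected the third value.
fn overflow_demo() -> (Vec<u32>, bool) {
    let mut ring = OverwritingRing::new(2);
    for v in [1, 2, 3] {
        ring.push(v); // never fails, silently overwrites
    }

    // `_rx` is kept alive so the channel stays connected.
    let (tx, _rx) = sync_channel::<u32>(2);
    tx.try_send(1).unwrap();
    tx.try_send(2).unwrap();
    // A bounded buffer has no slot left: the caller must wait or see an error.
    let rejected = matches!(tx.try_send(3), Err(TrySendError::Full(3)));

    (ring.slots.into_iter().collect(), rejected)
}
```

The ring ends up holding the two newest values, while the bounded channel reports `Full`. That second outcome is the one a synchronous, infallible `produce` has no way to express.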

Proposed API

Add an async variant on Producer<T> alongside today's produce:

impl<T> Producer<T> {
    /// Sync, infallible. Today's API. Best for overwriting buffers.
    pub fn produce(&self, value: T);

    /// Backpressure-aware. Awaits space (bounded buffer) or I/O completion
    /// (persistent / network buffers). For overwriting buffers this is
    /// equivalent to `produce()` + `Ok(())` and completes immediately.
    pub async fn produce_async(&self, value: T) -> DbResult<()>;
}

A symmetric consumer-side subscribe_async is only needed if a buffer
genuinely requires async attachment (e.g. registering with a remote broker
before it can deliver). Defer until that case exists — BufferReader<T>
already exposes recv().await for the actual data path.
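As a rough sketch of how the two entry points could sit side by side, the example below wires a `Producer<T>` to a `WriteHandle<T>` and calls both methods. Several pieces are assumptions made only so the sketch compiles on its own: `DbError` is a placeholder error type, the `handle` field is a guess at how `Producer<T>` stores its write handle, `Latest` is a hypothetical overwriting buffer, and `block_on` is a tiny busy-polling executor standing in for a real runtime.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

// Assumption: stand-ins for the crate's real error and result types.
#[derive(Debug, PartialEq)]
pub struct DbError;
pub type DbResult<T> = Result<T, DbError>;

// Trait as proposed in this issue (public here so the sketch stands alone).
pub trait WriteHandle<T: Clone + Send + 'static>: Send + Sync {
    fn push(&self, value: T);
    fn push_async<'a>(
        &'a self,
        value: T,
    ) -> Pin<Box<dyn Future<Output = DbResult<()>> + Send + 'a>> {
        self.push(value);
        Box::pin(std::future::ready(Ok(())))
    }
}

// Hypothetical shape: the real Producer<T> may hold its handle differently.
pub struct Producer<T: Clone + Send + 'static> {
    handle: Arc<dyn WriteHandle<T>>,
}

impl<T: Clone + Send + 'static> Producer<T> {
    /// Sync, infallible.
    pub fn produce(&self, value: T) {
        self.handle.push(value);
    }

    /// Backpressure-aware: resolves once the buffer has accepted the value.
    pub async fn produce_async(&self, value: T) -> DbResult<()> {
        self.handle.push_async(value).await
    }
}

// Hypothetical overwriting buffer that keeps only the latest value.
#[derive(Default)]
struct Latest(Mutex<Option<u32>>);

impl WriteHandle<u32> for Latest {
    fn push(&self, value: u32) {
        *self.0.lock().unwrap() = Some(value);
    }
}

// Minimal executor for the sketch: polls in a loop with a no-op waker.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
        std::thread::yield_now();
    }
}

/// Calls both entry points; returns the buffer's latest value and the async result.
fn demo() -> (Option<u32>, DbResult<()>) {
    let buffer = Arc::new(Latest::default());
    let handle: Arc<dyn WriteHandle<u32>> = buffer.clone();
    let producer = Producer { handle };

    producer.produce(1);
    let result = block_on(producer.produce_async(2));

    let latest = *buffer.0.lock().unwrap();
    (latest, result)
}
```

For an overwriting buffer such as `Latest`, `produce_async` resolves on the first poll with `Ok(())`, which matches the "completes immediately" wording in the doc comment above.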

WriteHandle trait extension

Extend the crate-private trait in aimdb-core/src/buffer/traits.rs with a
default-implemented async method so existing buffer impls compile
unchanged:

pub(crate) trait WriteHandle<T: Clone + Send + 'static>: Send + Sync {
    fn push(&self, value: T);

    /// Default: delegate to `push` and return `Ok(())` immediately.
    /// Backpressure-aware buffers (persistent, network, bounded-await) override.
    fn push_async<'a>(
        &'a self,
        value: T,
    ) -> Pin<Box<dyn Future<Output = DbResult<()>> + Send + 'a>> {
        self.push(value);
        Box::pin(core::future::ready(Ok(())))
    }
}

This means SpmcRing / SingleLatest / Mailbox need no changes —
they get produce_async for free with semantically correct
"complete-immediately" behaviour.
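One consequence of the default implementation as written is worth spelling out: `push` runs when `push_async` is called, not when the returned future is first polled. The sketch below demonstrates this with a hypothetical `CountingBuffer`; `DbError` is again a placeholder for the crate's real error type.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};

// Assumption: stand-ins for the crate's real error and result types.
#[derive(Debug)]
pub struct DbError;
pub type DbResult<T> = Result<T, DbError>;

// Trait as proposed in this issue (public here so the sketch stands alone).
pub trait WriteHandle<T: Clone + Send + 'static>: Send + Sync {
    fn push(&self, value: T);
    fn push_async<'a>(
        &'a self,
        value: T,
    ) -> Pin<Box<dyn Future<Output = DbResult<()>> + Send + 'a>> {
        self.push(value); // runs eagerly, before any poll
        Box::pin(std::future::ready(Ok(())))
    }
}

// Hypothetical buffer that only counts how often `push` was called.
#[derive(Default)]
struct CountingBuffer {
    pushes: AtomicUsize,
}

impl WriteHandle<u8> for CountingBuffer {
    fn push(&self, _value: u8) {
        self.pushes.fetch_add(1, Ordering::SeqCst);
    }
}

/// Creates the future, never polls it, and reports how many pushes happened.
fn pushes_before_first_poll() -> usize {
    let buffer = CountingBuffer::default();
    let fut = buffer.push_async(7);
    let count = buffer.pushes.load(Ordering::SeqCst);
    drop(fut); // dropped without ever being awaited
    count
}
```

The count is already 1 although the future was never awaited. If lazy semantics are preferred, one option (untested against the real crate) would be to move the `push` call into an `async move` block inside the default method, so that dropping an un-awaited future drops the value without writing it.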

Open questions

  1. ProducerTrait::produce_any symmetry — should the type-erased trait
    (used by inbound connector routing) gain produce_any_async? Connector
    routes today bind to a single produce-fn flavour at link-startup; the
    simplest approach is to keep produce_any as-is (calls sync push) and
    add produce_any_async only when a connector needs it.

  2. Naming: produce_async (mirrors how we'd add subscribe_async) vs.
    send_async (closer Tokio parity). Settle before merge.

  3. aimdb-sync interaction: SyncProducer::send already bridges into a
    bounded tokio channel. Should it internally call produce_async instead of
    produce, so backpressure surfaces back to the sync caller? Probably yes,
    but it only matters once a backpressure-aware buffer ships.

  4. Producer<T>: Send + 'static for produce_async. The returned future
    must be Send so that user services awaiting it can still be spawned on a
    multi-threaded executor. Verify the
    Pin<Box<dyn Future + Send + '_>> shape composes with WriteHandle: Send + Sync
    without lifetime gymnastics.
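Open question 4 can be checked at compile time rather than by inspection. The sketch below is one way to do that, under the same assumptions as the rest of this issue's examples: `DbError` is a placeholder, the `handle` field is a guess at `Producer<T>`'s internals, and `Discard`, `assert_send` and `assert_send_static` are hypothetical helpers. If the future returned by `produce_async` were not `Send`, this would fail to compile.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;

// Assumption: stand-ins for the crate's real error and result types.
#[derive(Debug)]
pub struct DbError;
pub type DbResult<T> = Result<T, DbError>;

// Trait as proposed in this issue (public here so the sketch stands alone).
pub trait WriteHandle<T: Clone + Send + 'static>: Send + Sync {
    fn push(&self, value: T);
    fn push_async<'a>(
        &'a self,
        value: T,
    ) -> Pin<Box<dyn Future<Output = DbResult<()>> + Send + 'a>> {
        self.push(value);
        Box::pin(std::future::ready(Ok(())))
    }
}

// Hypothetical shape: the real Producer<T> may hold its handle differently.
pub struct Producer<T: Clone + Send + 'static> {
    handle: Arc<dyn WriteHandle<T>>,
}

impl<T: Clone + Send + 'static> Producer<T> {
    pub async fn produce_async(&self, value: T) -> DbResult<()> {
        self.handle.push_async(value).await
    }
}

// Hypothetical buffer that drops every value.
struct Discard;
impl WriteHandle<String> for Discard {
    fn push(&self, _value: String) {}
}

// Compile-time probes: they only type-check, they do nothing at runtime.
fn assert_send<F: Send>(value: F) -> F {
    value
}
fn assert_send_static<V: Send + 'static>() {}

/// Compiles only if Producer<String> is Send + 'static and the
/// future returned by produce_async is Send.
fn send_checks_compile() -> bool {
    assert_send_static::<Producer<String>>();
    let producer = Producer::<String> { handle: Arc::new(Discard) };
    let fut = assert_send(producer.produce_async("x".to_string()));
    drop(fut);
    true
}
```

In this sketch the bounds compose without extra lifetime annotations: the `Send + Sync` supertraits make `dyn WriteHandle<T>` itself `Send + Sync`, which is what lets `&Producer<T>` be captured by a `Send` future. Whether the same holds for the real `Producer<T>` depends on its actual fields.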

Out of scope

  • Implementing an actual backpressure-aware buffer (separate milestone,
    driven by a concrete need: persistence / remote AimX / bounded fan-in).
  • Consumer<T>::subscribe_async — not yet needed.
  • AimX remote-access path — has its own backpressure story.
  • try_produce — covered by the sibling issue (non-blocking variant).

Acceptance criteria

  • Producer::produce unchanged (no call-site breakage).
  • Producer::produce_async exists, returns DbResult<()>.
  • WriteHandle::push_async has a default impl that delegates to push.
  • Existing buffers (SpmcRing, SingleLatest, Mailbox) compile and
    test unchanged.
  • At least one test buffer overrides push_async to exercise the async
    path (e.g. a fake "await for slot" buffer) — otherwise we're adding API
    surface without validation.
  • Doc-comment on produce and produce_async explains when to pick which.
  • make check clean.
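The test-buffer criterion above could be met with something like the following sketch. Everything besides the `WriteHandle` trait itself is hypothetical: `OneSlot` is a one-element buffer whose `push_async` waits for an empty slot, `WaitForSlot` is its future, `PushFut` is local shorthand for the boxed future type, `WakeFlag` is a waker that only records that it fired, and `DbError` is a placeholder error type. The future is polled by hand so that the pending and ready states are both visible.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

// Assumption: stand-ins for the crate's real error and result types.
#[derive(Debug, PartialEq)]
pub struct DbError;
pub type DbResult<T> = Result<T, DbError>;

type PushFut<'a> = Pin<Box<dyn Future<Output = DbResult<()>> + Send + 'a>>;

// Trait as proposed in this issue (public here so the sketch stands alone).
pub trait WriteHandle<T: Clone + Send + 'static>: Send + Sync {
    fn push(&self, value: T);
    fn push_async<'a>(&'a self, value: T) -> PushFut<'a> {
        self.push(value);
        Box::pin(std::future::ready(Ok(())))
    }
}

// Hypothetical test buffer: one slot plus the waker of a parked producer.
#[derive(Default)]
struct OneSlot {
    state: Mutex<(Option<u32>, Option<Waker>)>,
}

impl OneSlot {
    // Consumer side: empty the slot and wake a producer waiting for it.
    fn take(&self) -> Option<u32> {
        let mut state = self.state.lock().unwrap();
        let value = state.0.take();
        if let Some(waker) = state.1.take() {
            waker.wake();
        }
        value
    }
}

struct WaitForSlot<'a> {
    buffer: &'a OneSlot,
    value: Option<u32>,
}

impl Future for WaitForSlot<'_> {
    type Output = DbResult<()>;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut(); // allowed because all fields are Unpin
        let mut state = this.buffer.state.lock().unwrap();
        if state.0.is_some() {
            state.1 = Some(cx.waker().clone()); // park until take() runs
            return Poll::Pending;
        }
        state.0 = this.value.take();
        Poll::Ready(Ok(()))
    }
}

impl WriteHandle<u32> for OneSlot {
    // Sync path keeps today's overwrite semantics.
    fn push(&self, value: u32) {
        self.state.lock().unwrap().0 = Some(value);
    }
    // Async path waits for an empty slot instead of overwriting.
    fn push_async<'a>(&'a self, value: u32) -> PushFut<'a> {
        Box::pin(WaitForSlot { buffer: self, value: Some(value) })
    }
}

// Waker that only records that it was woken.
struct WakeFlag(AtomicBool);
impl Wake for WakeFlag {
    fn wake(self: Arc<Self>) {
        self.0.store(true, Ordering::SeqCst);
    }
}

// Returns (pending while full, value taken, producer woken, ready after take, value stored).
fn await_for_slot_demo() -> (bool, Option<u32>, bool, bool, Option<u32>) {
    let buffer = OneSlot::default();
    buffer.push(1); // occupy the only slot
    let flag = Arc::new(WakeFlag(AtomicBool::new(false)));
    let waker = Waker::from(flag.clone());
    let mut cx = Context::from_waker(&waker);

    let mut fut = buffer.push_async(2);
    let pending = fut.as_mut().poll(&mut cx).is_pending();
    let taken = buffer.take(); // frees the slot and wakes the producer
    let woken = flag.0.load(Ordering::SeqCst);
    let ready = matches!(fut.as_mut().poll(&mut cx), Poll::Ready(Ok(())));
    drop(fut);
    (pending, taken, woken, ready, buffer.take())
}
```

The sync `push` still overwrites, so existing call sites keep their behaviour, while `push_async` leaves the old value in place until a consumer takes it. A real test would likely drive the future with the project's runtime instead of polling it manually.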

Depends on / blocks

  • Depends on: Design 029 — Remove R from Producer<T> / Consumer<T>
    (M14, ✅ Implemented). The WriteHandle<T> indirection is what makes this
    cheap to add.
  • Blocks: any new buffer impl with persistent / network-backed / bounded-
    await semantics.

References

  • docs/design/029-M14-remove-r-from-typed-handles.md, Decisions item 2
    (kept async fn produce -> DbResult<()> "for headroom" — that headroom is
    what this issue redeems).
  • aimdb-core/src/buffer/traits.rs: WriteHandle<T> trait.
  • aimdb-core/src/typed_api.rs: Producer<T>::produce to extend.
