From 8a5fefc9d0dfa6cde5d0a50d7893002d1e68a14a Mon Sep 17 00:00:00 2001 From: RAprogramm Date: Mon, 11 May 2026 17:50:39 +0700 Subject: [PATCH] #133 fix(aggregate_root): emit Created event from save() for streams entities MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `crates/entity-derive-impl/src/entity/sql/postgres/save.rs::save_method` already wrapped the INSERT in a transaction (#118 era), but for entities with both `aggregate_root` and `streams` enabled it never spliced `pg_notify`. Subscribers missed every new aggregate-root insert silently, even though the row write itself was atomic. Splice `self.notify_created()` into the existing transaction body, after the INSERT but before the commit. The notify executes against `&mut *tx`, the same handle the INSERT uses, so Postgres only broadcasts `Created` on commit and discards it on rollback — the same guarantee already in place for `create_method` (#125). When `streams` is off, `notify_created()` returns an empty TokenStream, so non-streams aggregate roots stay one round-trip with no regression. Tests (3 new lib tests in `save::tests`): - `save_emits_pg_notify_when_streams_enabled`: combined attribute set produces a save body that contains `pg_notify` and executes it on `&mut *tx`. - `save_omits_pg_notify_when_streams_disabled`: aggregate_root without streams does NOT emit `pg_notify` (perf regression guard). - `save_is_empty_for_non_aggregate_root`: untouched control case — `save_method` returns empty when aggregate_root is off. Bump: - entity-derive-impl: 0.6.3 -> 0.6.4 - entity-derive: 0.8.4 -> 0.8.5 `entity-core` is unchanged. Closes #133 --- crates/entity-derive-impl/Cargo.toml | 2 +- .../src/entity/sql/postgres/save.rs | 85 +++++++++++++++++++ crates/entity-derive/Cargo.toml | 2 +- 3 files changed, 87 insertions(+), 2 deletions(-) diff --git a/crates/entity-derive-impl/Cargo.toml b/crates/entity-derive-impl/Cargo.toml index 8c8ec6f..ba243c7 100644 --- a/crates/entity-derive-impl/Cargo.toml +++ b/crates/entity-derive-impl/Cargo.toml @@ -3,7 +3,7 @@ [package] name = "entity-derive-impl" -version = "0.6.3" +version = "0.6.4" edition.workspace = true rust-version.workspace = true authors.workspace = true diff --git a/crates/entity-derive-impl/src/entity/sql/postgres/save.rs b/crates/entity-derive-impl/src/entity/sql/postgres/save.rs index f4e13d5..ee3883e 100644 --- a/crates/entity-derive-impl/src/entity/sql/postgres/save.rs +++ b/crates/entity-derive-impl/src/entity/sql/postgres/save.rs @@ -75,6 +75,14 @@ impl Context<'_> { let span = instrument(&entity_name.to_string(), "save"); + // `notify_created` returns an empty TokenStream when streams are off, + // so the same generator covers both streams-enabled and plain + // aggregate roots. When streams ARE on, the splice runs against the + // same transaction (`&mut *tx`) that wraps the INSERT, so Postgres + // only broadcasts the `Created` event on commit and discards it on + // rollback — atomic with the row write. See #133. + let notify = self.notify_created(); + quote! { #span async fn save(&self, new: #new_name) -> Result<#entity_name, #error_type> { @@ -89,9 +97,86 @@ impl Context<'_> { .fetch_one(&mut *tx).await?; entity = #entity_name::from(row); + #notify + tx.commit().await?; Ok(entity) } } } } + +#[cfg(test)] +mod tests { + use syn::parse_quote; + + use super::*; + use crate::entity::parse::EntityDef; + + fn ctx_for(input: syn::DeriveInput) -> Context<'static> { + // Leak the parsed entity so the borrowed-against `Context` outlives + // the test scope. Acceptable in test code; saves shuffling lifetimes. + let entity: &'static EntityDef = Box::leak(Box::new( + EntityDef::from_derive_input(&input).expect("parse ok") + )); + Context::new(entity) + } + + #[test] + fn save_emits_pg_notify_when_streams_enabled() { + let ctx = ctx_for(parse_quote! { + #[entity(table = "users", aggregate_root, streams)] + pub struct User { + #[id] + pub id: ::uuid::Uuid, + #[field(create, update, response)] + pub email: String + } + }); + let tokens = ctx.save_method().to_string(); + assert!( + tokens.contains("pg_notify"), + "streams + aggregate_root must splice pg_notify into save(), got: {tokens}" + ); + // The notify must run on the transaction handle so it commits + // atomically with the INSERT, not after. + assert!( + tokens.contains("& mut * tx"), + "pg_notify must execute on `&mut *tx`, got: {tokens}" + ); + } + + #[test] + fn save_omits_pg_notify_when_streams_disabled() { + let ctx = ctx_for(parse_quote! { + #[entity(table = "users", aggregate_root)] + pub struct User { + #[id] + pub id: ::uuid::Uuid, + #[field(create, update, response)] + pub email: String + } + }); + let tokens = ctx.save_method().to_string(); + assert!( + !tokens.contains("pg_notify"), + "non-streams aggregate root must NOT emit pg_notify (perf regression guard), got: {tokens}" + ); + } + + #[test] + fn save_is_empty_for_non_aggregate_root() { + let ctx = ctx_for(parse_quote! { + #[entity(table = "users")] + pub struct User { + #[id] + pub id: ::uuid::Uuid + } + }); + let tokens = ctx.save_method(); + assert!( + tokens.is_empty(), + "save() must not be generated unless aggregate_root is on, got: {tokens}" + ); + } +} diff --git a/crates/entity-derive/Cargo.toml b/crates/entity-derive/Cargo.toml index 3873565..7f7aff6 100644 --- a/crates/entity-derive/Cargo.toml +++ b/crates/entity-derive/Cargo.toml @@ -3,7 +3,7 @@ [package] name = "entity-derive" -version = "0.8.4" +version = "0.8.5" edition.workspace = true rust-version.workspace = true authors.workspace = true