From d8a217ea198b263e9b29dbf16089be93ade4483f Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Fri, 29 May 2026 18:49:24 +0800 Subject: [PATCH 1/4] feat: enhance HashJoinStream lifecycle management and testing - Added BuildReportHandle to streamline operations - Centralized the schedule, deliver, cancel, and finalize lifecycle processes - Moved OnceFut waiter functionality into the handle for better management - Simplified the drop and wait path for HashJoinStream - Introduced three focused lifecycle tests for improved coverage - Added test-only helpers in shared_bounds for stream lifecycle tests - Reused existing accumulator test setup for efficiency --- .../src/joins/hash_join/shared_bounds.rs | 85 +++++--- .../src/joins/hash_join/stream.rs | 200 +++++++++++++++--- 2 files changed, 223 insertions(+), 62 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs b/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs index fba6b2c2db2e2..69798d7956349 100644 --- a/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs +++ b/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs @@ -699,43 +699,70 @@ impl fmt::Debug for SharedBuildAccumulator { } } +#[cfg(test)] +pub(super) fn make_partitioned_accumulator_for_test( + num_partitions: usize, +) -> SharedBuildAccumulator { + let probe_schema = Arc::new(Schema::new(vec![Field::new( + "probe_key", + DataType::Int32, + false, + )])); + let dynamic_filter = Arc::new(DynamicFilterPhysicalExpr::new(vec![], lit(true))); + SharedBuildAccumulator { + inner: Mutex::new(AccumulatorState { + data: AccumulatedBuildData::Partitioned { + partitions: vec![PartitionStatus::Pending; num_partitions], + completed_partitions: 0, + }, + completion: CompletionState::Pending, + }), + completion_notify: Notify::new(), + dynamic_filter, + on_right: vec![], + repartition_random_state: SeededRandomState::with_seed(1), + probe_schema, + } +} + +#[cfg(test)] +pub(super) fn completed_partitions_for_test(acc: &SharedBuildAccumulator) -> usize { + let guard = acc.inner.lock(); + let AccumulatedBuildData::Partitioned { + completed_partitions, + .. + } = &guard.data + else { + panic!("expected partitioned accumulator"); + }; + *completed_partitions +} + +#[cfg(test)] +fn partitioned_state_for_test( + acc: &SharedBuildAccumulator, +) -> (Vec, usize) { + let guard = acc.inner.lock(); + let AccumulatedBuildData::Partitioned { + partitions, + completed_partitions, + } = &guard.data + else { + panic!("expected partitioned accumulator"); + }; + (partitions.clone(), *completed_partitions) +} + #[cfg(test)] mod tests { use super::*; fn make_partitioned_accumulator(num_partitions: usize) -> SharedBuildAccumulator { - let probe_schema = Arc::new(Schema::new(vec![Field::new( - "probe_key", - DataType::Int32, - false, - )])); - let dynamic_filter = Arc::new(DynamicFilterPhysicalExpr::new(vec![], lit(true))); - SharedBuildAccumulator { - inner: Mutex::new(AccumulatorState { - data: AccumulatedBuildData::Partitioned { - partitions: vec![PartitionStatus::Pending; num_partitions], - completed_partitions: 0, - }, - completion: CompletionState::Pending, - }), - completion_notify: Notify::new(), - dynamic_filter, - on_right: vec![], - repartition_random_state: SeededRandomState::with_seed(1), - probe_schema, - } + make_partitioned_accumulator_for_test(num_partitions) } fn partitioned_state(acc: &SharedBuildAccumulator) -> (Vec, usize) { - let guard = acc.inner.lock(); - let AccumulatedBuildData::Partitioned { - partitions, - completed_partitions, - } = &guard.data - else { - panic!("expected partitioned accumulator"); - }; - (partitions.clone(), *completed_partitions) + partitioned_state_for_test(acc) } // Regression guard for the build-report lifecycle fix: on `Drop`, a stream diff --git a/datafusion/physical-plan/src/joins/hash_join/stream.rs b/datafusion/physical-plan/src/joins/hash_join/stream.rs index 040470c9be12b..4c8ed08ce1a70 100644 --- a/datafusion/physical-plan/src/joins/hash_join/stream.rs +++ b/datafusion/physical-plan/src/joins/hash_join/stream.rs @@ -173,15 +173,106 @@ impl ProcessProbeBatchState { /// Lifecycle of this partition's build-data report to the shared coordinator. /// -/// `ReportScheduled` means the reporting `OnceFut` has been constructed but is -/// lazy: the coordinator has not yet observed the report. Only `ReportDelivered` -/// guarantees the coordinator saw it, so `Drop` must still cancel the partition -/// when the state is `ReportScheduled` — otherwise sibling partitions wait -/// forever for a report that never runs. +/// `Scheduled` means the reporting `OnceFut` has been constructed but is lazy: +/// the coordinator has not necessarily observed the report. Only `Delivered` +/// guarantees the coordinator saw it, so `Drop` must still cancel a `Scheduled` +/// partition — otherwise sibling partitions can wait forever for a report that +/// never runs. enum BuildReportState { NotReported, - ReportScheduled, - ReportDelivered, + Scheduled, + Delivered, + Canceled, + Finalized, +} + +/// Owns the stream-side lifecycle for one partition's build-data report. +struct BuildReportHandle { + partition: usize, + mode: PartitionMode, + build_accumulator: Option>, + waiter: Option>, + state: BuildReportState, +} + +impl BuildReportHandle { + fn new( + partition: usize, + mode: PartitionMode, + build_accumulator: Option>, + ) -> Self { + Self { + partition, + mode, + build_accumulator, + waiter: None, + state: BuildReportState::NotReported, + } + } + + fn has_accumulator(&self) -> bool { + self.build_accumulator.is_some() + } + + fn schedule(&mut self, build_data: PartitionBuildData) { + let Some(build_accumulator) = &self.build_accumulator else { + self.finalize(); + return; + }; + + debug_assert!(matches!(self.state, BuildReportState::NotReported)); + let acc = Arc::clone(build_accumulator); + self.waiter = Some(OnceFut::new(async move { + acc.report_build_data(build_data).await + })); + self.state = BuildReportState::Scheduled; + } + + fn wait_for_delivery(&mut self, cx: &mut std::task::Context<'_>) -> Poll> { + if let Some(ref mut fut) = self.waiter { + ready!(fut.get_shared(cx))?; + self.mark_delivered(); + } + Poll::Ready(Ok(())) + } + + fn mark_delivered(&mut self) { + if matches!(self.state, BuildReportState::Delivered) { + return; + } + debug_assert!(matches!(self.state, BuildReportState::Scheduled)); + self.state = BuildReportState::Delivered; + } + + fn cancel_if_pending(&mut self) { + if matches!( + self.state, + BuildReportState::Delivered + | BuildReportState::Canceled + | BuildReportState::Finalized + ) { + return; + } + + if self.mode == PartitionMode::Partitioned + && let Some(build_accumulator) = &self.build_accumulator + { + build_accumulator.report_canceled_partition(self.partition); + self.state = BuildReportState::Canceled; + } else { + self.finalize(); + } + } + + fn finalize(&mut self) { + self.state = BuildReportState::Finalized; + } +} + +impl Drop for BuildReportHandle { + fn drop(&mut self) { + self.cancel_if_pending(); + } } /// [`Stream`] for [`super::HashJoinExec`] that does the actual join. @@ -228,13 +319,8 @@ pub(super) struct HashJoinStream { build_indices_buffer: Vec, /// Specifies whether the right side has an ordering to potentially preserve right_side_ordered: bool, - /// Shared build accumulator for coordinating dynamic filter updates (collects hash maps and/or bounds, optional) - build_accumulator: Option>, - /// Optional future to signal when build information has been reported by all partitions - /// and the dynamic filter has been updated - build_waiter: Option>, - /// Tracks where this partition is in the build-data reporting lifecycle. - build_report_state: BuildReportState, + /// Owns this partition's build-data report lifecycle. + build_report: BuildReportHandle, /// Partitioning mode to use mode: PartitionMode, /// Output buffer for coalescing small batches into larger ones with optional fetch limit. @@ -414,9 +500,7 @@ impl HashJoinStream { probe_indices_buffer: Vec::with_capacity(batch_size), build_indices_buffer: Vec::with_capacity(batch_size), right_side_ordered, - build_accumulator, - build_waiter: None, - build_report_state: BuildReportState::NotReported, + build_report: BuildReportHandle::new(partition, mode, build_accumulator), mode, output_buffer, null_aware, @@ -449,9 +533,9 @@ impl HashJoinStream { &mut self, left_data: &Arc, ) -> HashJoinStreamState { - let Some(build_accumulator) = self.build_accumulator.as_ref() else { + if !self.build_report.has_accumulator() { return Self::state_after_build_ready(self.join_type, left_data.as_ref()); - }; + } let pushdown = left_data.membership().clone(); let bounds = left_data @@ -473,11 +557,7 @@ impl HashJoinStream { ), }; - let acc = Arc::clone(build_accumulator); - self.build_waiter = Some(OnceFut::new(async move { - acc.report_build_data(build_data).await - })); - self.build_report_state = BuildReportState::ReportScheduled; + self.build_report.schedule(build_data); HashJoinStreamState::WaitPartitionBoundsReport } @@ -541,10 +621,7 @@ impl HashJoinStream { &mut self, cx: &mut std::task::Context<'_>, ) -> Poll>>> { - if let Some(ref mut fut) = self.build_waiter { - ready!(fut.get_shared(cx))?; - self.build_report_state = BuildReportState::ReportDelivered; - } + ready!(self.build_report.wait_for_delivery(cx))?; let build_side = self.build_side.try_as_ready()?; self.state = Self::state_after_build_ready(self.join_type, build_side.left_data.as_ref()); @@ -968,12 +1045,69 @@ impl Stream for HashJoinStream { impl Drop for HashJoinStream { fn drop(&mut self) { - if self.mode == PartitionMode::Partitioned - && !matches!(self.build_report_state, BuildReportState::ReportDelivered) - && let Some(build_accumulator) = &self.build_accumulator + self.build_report.cancel_if_pending(); + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::joins::hash_join::shared_bounds::{ + PushdownStrategy, completed_partitions_for_test, + make_partitioned_accumulator_for_test, + }; + + fn empty_build_data(partition_id: usize) -> PartitionBuildData { + PartitionBuildData::Partitioned { + partition_id, + pushdown: PushdownStrategy::Empty, + bounds: PartitionBounds::new(vec![]), + } + } + + #[test] + fn build_report_handle_cancels_scheduled_partition_on_drop() { + let acc = Arc::new(make_partitioned_accumulator_for_test(2)); + { - build_accumulator.report_canceled_partition(self.partition); - self.build_report_state = BuildReportState::ReportDelivered; + let mut handle = BuildReportHandle::new( + 0, + PartitionMode::Partitioned, + Some(Arc::clone(&acc)), + ); + handle.schedule(empty_build_data(0)); + } + + assert_eq!(completed_partitions_for_test(&acc), 1); + } + + #[test] + fn build_report_handle_does_not_cancel_delivered_partition_on_drop() { + let acc = Arc::new(make_partitioned_accumulator_for_test(2)); + + { + let mut handle = BuildReportHandle::new( + 0, + PartitionMode::Partitioned, + Some(Arc::clone(&acc)), + ); + handle.schedule(empty_build_data(0)); + handle.mark_delivered(); } + + assert_eq!(completed_partitions_for_test(&acc), 0); + } + + #[test] + fn build_report_handle_cancel_if_pending_is_idempotent() { + let acc = Arc::new(make_partitioned_accumulator_for_test(2)); + let mut handle = + BuildReportHandle::new(0, PartitionMode::Partitioned, Some(Arc::clone(&acc))); + handle.schedule(empty_build_data(0)); + + handle.cancel_if_pending(); + handle.cancel_if_pending(); + + assert_eq!(completed_partitions_for_test(&acc), 1); } } From f80f08964fe4d083fb648cd61b68104d54af99fd Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Fri, 29 May 2026 18:55:50 +0800 Subject: [PATCH 2/4] feat(stream): tighten delivered-state transition and streamline cleanup MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Updated `wait_for_delivery()` to directly set `Delivered` after a successful `fut.get_shared(cx)`. - Removed the `mark_delivered()` helper to prevent independent marking of delivered states in tests and internal code. - Eliminated redundant stream-level drop cleanup entry point; cleanup is now handled by `BuildReportHandle`’s Drop implementation. - Updated the delivered-path test for improved verification of the waiter flow: - Changed test to use a one-partition accumulator. - Polls `wait_for_delivery()` with a noop waker/context. - Asserts that the completion count is 1 to confirm expected behavior without extra cancellation side effects. --- .../src/joins/hash_join/stream.rs | 30 ++++++++----------- 1 file changed, 12 insertions(+), 18 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join/stream.rs b/datafusion/physical-plan/src/joins/hash_join/stream.rs index 4c8ed08ce1a70..b26c92e5e5ce8 100644 --- a/datafusion/physical-plan/src/joins/hash_join/stream.rs +++ b/datafusion/physical-plan/src/joins/hash_join/stream.rs @@ -231,19 +231,14 @@ impl BuildReportHandle { fn wait_for_delivery(&mut self, cx: &mut std::task::Context<'_>) -> Poll> { if let Some(ref mut fut) = self.waiter { ready!(fut.get_shared(cx))?; - self.mark_delivered(); + if !matches!(self.state, BuildReportState::Delivered) { + debug_assert!(matches!(self.state, BuildReportState::Scheduled)); + self.state = BuildReportState::Delivered; + } } Poll::Ready(Ok(())) } - fn mark_delivered(&mut self) { - if matches!(self.state, BuildReportState::Delivered) { - return; - } - debug_assert!(matches!(self.state, BuildReportState::Scheduled)); - self.state = BuildReportState::Delivered; - } - fn cancel_if_pending(&mut self) { if matches!( self.state, @@ -1043,12 +1038,6 @@ impl Stream for HashJoinStream { } } -impl Drop for HashJoinStream { - fn drop(&mut self) { - self.build_report.cancel_if_pending(); - } -} - #[cfg(test)] mod tests { use super::*; @@ -1083,7 +1072,7 @@ mod tests { #[test] fn build_report_handle_does_not_cancel_delivered_partition_on_drop() { - let acc = Arc::new(make_partitioned_accumulator_for_test(2)); + let acc = Arc::new(make_partitioned_accumulator_for_test(1)); { let mut handle = BuildReportHandle::new( @@ -1092,10 +1081,15 @@ mod tests { Some(Arc::clone(&acc)), ); handle.schedule(empty_build_data(0)); - handle.mark_delivered(); + let waker = futures::task::noop_waker_ref(); + let mut cx = std::task::Context::from_waker(waker); + assert!(matches!( + handle.wait_for_delivery(&mut cx), + Poll::Ready(Ok(())) + )); } - assert_eq!(completed_partitions_for_test(&acc), 0); + assert_eq!(completed_partitions_for_test(&acc), 1); } #[test] From a0d9214650224c0377037c1adaa98a9ce68eecda Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Fri, 29 May 2026 19:04:47 +0800 Subject: [PATCH 3/4] chore: clean up test helpers and simplify code in shared_bounds and stream modules - shared_bounds.rs: - Removed unused top-level test helper - Removed forwarding test wrapper - Kept only cross-module helpers at the top level - stream.rs: - Added partitioned_handle test helper - Removed repeated BuildReportHandle::new(...) boilerplate - Simplified noop context creation - Documented defensive no-accumulator fallback in schedule --- .../src/joins/hash_join/shared_bounds.rs | 33 +++++++------------ .../src/joins/hash_join/stream.rs | 24 ++++++-------- 2 files changed, 21 insertions(+), 36 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs b/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs index 69798d7956349..0af4015ff7239 100644 --- a/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs +++ b/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs @@ -738,31 +738,20 @@ pub(super) fn completed_partitions_for_test(acc: &SharedBuildAccumulator) -> usi *completed_partitions } -#[cfg(test)] -fn partitioned_state_for_test( - acc: &SharedBuildAccumulator, -) -> (Vec, usize) { - let guard = acc.inner.lock(); - let AccumulatedBuildData::Partitioned { - partitions, - completed_partitions, - } = &guard.data - else { - panic!("expected partitioned accumulator"); - }; - (partitions.clone(), *completed_partitions) -} - #[cfg(test)] mod tests { use super::*; - fn make_partitioned_accumulator(num_partitions: usize) -> SharedBuildAccumulator { - make_partitioned_accumulator_for_test(num_partitions) - } - fn partitioned_state(acc: &SharedBuildAccumulator) -> (Vec, usize) { - partitioned_state_for_test(acc) + let guard = acc.inner.lock(); + let AccumulatedBuildData::Partitioned { + partitions, + completed_partitions, + } = &guard.data + else { + panic!("expected partitioned accumulator"); + }; + (partitions.clone(), *completed_partitions) } // Regression guard for the build-report lifecycle fix: on `Drop`, a stream @@ -775,7 +764,7 @@ mod tests { // `Reported`. This test pins that invariant. #[test] fn report_canceled_partition_is_noop_after_report() { - let acc = make_partitioned_accumulator(2); + let acc = make_partitioned_accumulator_for_test(2); { let mut guard = acc.inner.lock(); @@ -807,7 +796,7 @@ mod tests { // which is what unblocks sibling partitions waiting on the coordinator. #[test] fn report_canceled_partition_marks_pending_partition_canceled() { - let acc = make_partitioned_accumulator(2); + let acc = make_partitioned_accumulator_for_test(2); acc.report_canceled_partition(0); let (partitions, completed) = partitioned_state(&acc); diff --git a/datafusion/physical-plan/src/joins/hash_join/stream.rs b/datafusion/physical-plan/src/joins/hash_join/stream.rs index b26c92e5e5ce8..94903a477f957 100644 --- a/datafusion/physical-plan/src/joins/hash_join/stream.rs +++ b/datafusion/physical-plan/src/joins/hash_join/stream.rs @@ -216,6 +216,8 @@ impl BuildReportHandle { fn schedule(&mut self, build_data: PartitionBuildData) { let Some(build_accumulator) = &self.build_accumulator else { + // Defensive no-op terminal state; current callers avoid scheduling + // unless an accumulator is present. self.finalize(); return; }; @@ -1054,16 +1056,16 @@ mod tests { } } + fn partitioned_handle(acc: &Arc) -> BuildReportHandle { + BuildReportHandle::new(0, PartitionMode::Partitioned, Some(Arc::clone(acc))) + } + #[test] fn build_report_handle_cancels_scheduled_partition_on_drop() { let acc = Arc::new(make_partitioned_accumulator_for_test(2)); { - let mut handle = BuildReportHandle::new( - 0, - PartitionMode::Partitioned, - Some(Arc::clone(&acc)), - ); + let mut handle = partitioned_handle(&acc); handle.schedule(empty_build_data(0)); } @@ -1075,14 +1077,9 @@ mod tests { let acc = Arc::new(make_partitioned_accumulator_for_test(1)); { - let mut handle = BuildReportHandle::new( - 0, - PartitionMode::Partitioned, - Some(Arc::clone(&acc)), - ); + let mut handle = partitioned_handle(&acc); handle.schedule(empty_build_data(0)); - let waker = futures::task::noop_waker_ref(); - let mut cx = std::task::Context::from_waker(waker); + let mut cx = std::task::Context::from_waker(futures::task::noop_waker_ref()); assert!(matches!( handle.wait_for_delivery(&mut cx), Poll::Ready(Ok(())) @@ -1095,8 +1092,7 @@ mod tests { #[test] fn build_report_handle_cancel_if_pending_is_idempotent() { let acc = Arc::new(make_partitioned_accumulator_for_test(2)); - let mut handle = - BuildReportHandle::new(0, PartitionMode::Partitioned, Some(Arc::clone(&acc))); + let mut handle = partitioned_handle(&acc); handle.schedule(empty_build_data(0)); handle.cancel_if_pending(); From 64115cceb0cd96d5fef163012dcc4b99a28efc3b Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Fri, 29 May 2026 20:58:39 +0800 Subject: [PATCH 4/4] feat(stream): rename delivery methods and enhance BuildReportState - Renamed `wait_for_delivery` to `poll_delivery` - Renamed `cancel_if_pending` to `cancel_pending` - Implemented `Debug`, `PartialEq`, and `Eq` for `BuildReportState` - Added test-only `state()` accessor - Included state assertions in tests - Added no-accumulator finalize test - Updated idempotency test name to align with new method naming --- .../src/joins/hash_join/stream.rs | 38 +++++++++++++------ 1 file changed, 27 insertions(+), 11 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join/stream.rs b/datafusion/physical-plan/src/joins/hash_join/stream.rs index 94903a477f957..d403fa43cda4b 100644 --- a/datafusion/physical-plan/src/joins/hash_join/stream.rs +++ b/datafusion/physical-plan/src/joins/hash_join/stream.rs @@ -178,6 +178,7 @@ impl ProcessProbeBatchState { /// guarantees the coordinator saw it, so `Drop` must still cancel a `Scheduled` /// partition — otherwise sibling partitions can wait forever for a report that /// never runs. +#[derive(Debug, PartialEq, Eq)] enum BuildReportState { NotReported, Scheduled, @@ -230,7 +231,7 @@ impl BuildReportHandle { self.state = BuildReportState::Scheduled; } - fn wait_for_delivery(&mut self, cx: &mut std::task::Context<'_>) -> Poll> { + fn poll_delivery(&mut self, cx: &mut std::task::Context<'_>) -> Poll> { if let Some(ref mut fut) = self.waiter { ready!(fut.get_shared(cx))?; if !matches!(self.state, BuildReportState::Delivered) { @@ -241,7 +242,7 @@ impl BuildReportHandle { Poll::Ready(Ok(())) } - fn cancel_if_pending(&mut self) { + fn cancel_pending(&mut self) { if matches!( self.state, BuildReportState::Delivered @@ -264,11 +265,16 @@ impl BuildReportHandle { fn finalize(&mut self) { self.state = BuildReportState::Finalized; } + + #[cfg(test)] + fn state(&self) -> &BuildReportState { + &self.state + } } impl Drop for BuildReportHandle { fn drop(&mut self) { - self.cancel_if_pending(); + self.cancel_pending(); } } @@ -618,7 +624,7 @@ impl HashJoinStream { &mut self, cx: &mut std::task::Context<'_>, ) -> Poll>>> { - ready!(self.build_report.wait_for_delivery(cx))?; + ready!(self.build_report.poll_delivery(cx))?; let build_side = self.build_side.try_as_ready()?; self.state = Self::state_after_build_ready(self.join_type, build_side.left_data.as_ref()); @@ -1067,6 +1073,7 @@ mod tests { { let mut handle = partitioned_handle(&acc); handle.schedule(empty_build_data(0)); + assert_eq!(handle.state(), &BuildReportState::Scheduled); } assert_eq!(completed_partitions_for_test(&acc), 1); @@ -1080,24 +1087,33 @@ mod tests { let mut handle = partitioned_handle(&acc); handle.schedule(empty_build_data(0)); let mut cx = std::task::Context::from_waker(futures::task::noop_waker_ref()); - assert!(matches!( - handle.wait_for_delivery(&mut cx), - Poll::Ready(Ok(())) - )); + assert!(matches!(handle.poll_delivery(&mut cx), Poll::Ready(Ok(())))); + assert_eq!(handle.state(), &BuildReportState::Delivered); } assert_eq!(completed_partitions_for_test(&acc), 1); } #[test] - fn build_report_handle_cancel_if_pending_is_idempotent() { + fn build_report_handle_cancel_pending_is_idempotent() { let acc = Arc::new(make_partitioned_accumulator_for_test(2)); let mut handle = partitioned_handle(&acc); handle.schedule(empty_build_data(0)); - handle.cancel_if_pending(); - handle.cancel_if_pending(); + handle.cancel_pending(); + handle.cancel_pending(); + assert_eq!(handle.state(), &BuildReportState::Canceled); assert_eq!(completed_partitions_for_test(&acc), 1); } + + #[test] + fn build_report_handle_no_accumulator_finalizes() { + let mut handle = BuildReportHandle::new(0, PartitionMode::Partitioned, None); + + handle.schedule(empty_build_data(0)); + handle.cancel_pending(); + + assert_eq!(handle.state(), &BuildReportState::Finalized); + } }