From cbfe2ed8aaffb0ce566376b77cb5ffae60c89936 Mon Sep 17 00:00:00 2001 From: Dov Alperin Date: Tue, 13 May 2025 12:05:04 -0400 Subject: [PATCH 1/5] WIP checkin --- src/persist-client/src/batch.rs | 54 +++++++++++++ src/persist-client/src/internal/compact.rs | 25 +++++- src/persist-client/src/internal/encoding.rs | 4 +- src/persist-client/src/internal/machine.rs | 22 ++++++ src/persist-client/src/internal/state.proto | 1 + src/persist-client/src/internal/state.rs | 22 ++++++ src/persist-client/src/internal/trace.rs | 88 +++++++++++++++++++-- 7 files changed, 207 insertions(+), 9 deletions(-) diff --git a/src/persist-client/src/batch.rs b/src/persist-client/src/batch.rs index f600f1bd03988..949a6dd215fea 100644 --- a/src/persist-client/src/batch.rs +++ b/src/persist-client/src/batch.rs @@ -676,6 +676,41 @@ where } } + pub fn batch_with_finished_parts( + &self, + registered_desc: Description, + ) -> Option> { + let runs = self.parts.finish_completed_runs(); + + if runs.is_empty() { return None; } + + let mut run_parts = vec![]; + let mut run_splits = vec![]; + let mut run_meta = vec![]; + for (order, parts) in runs { + if parts.is_empty() { + continue; + } + if run_parts.len() != 0 { + run_splits.push(run_parts.len()); + } + run_meta.push(RunMeta { + order: Some(order), + schema: self.write_schemas.id, + // Field has been deprecated but kept around to roundtrip state. + deprecated_schema: None, + }); + run_parts.extend(parts); + } + //TODO(dov): should we fetch the last part and constrain the upper bound + // to whatever we have actually flushed? + let desc = registered_desc; + + let batch = HollowBatch::new(desc, run_parts, self.num_updates, run_meta, run_splits); + + Some(batch) + } + /// Finish writing this batch and return a handle to the written batch. 
/// /// This fails if any of the updates in this batch are beyond the given @@ -843,6 +878,7 @@ impl BatchParts { shard_metrics, isolated_runtime, write_schemas, + None ) .await .expect("successful compaction"); @@ -941,6 +977,7 @@ impl BatchParts { } } + pub(crate) async fn write( &mut self, write_schemas: &Schemas, @@ -1229,6 +1266,23 @@ impl BatchParts { }) } + pub(crate) fn finish_completed_runs(&self) -> Vec<(RunOrder, Vec>)> { + match &self.writing_runs { + WritingRuns::Ordered(order, tree) => tree.iter() + .take_while(|part| matches!(part, Pending::Finished(_))).map(|part| match part { + Pending::Finished(p) => (order.clone(), vec![p.clone()]), + _ => (order.clone(), vec![]), + }).collect(), + WritingRuns::Compacting(tree) => tree.iter() + .take_while(|(_, run)| matches!(run, Pending::Finished(_))) + .map(|(order, run)| match run { + Pending::Finished(parts) => (order.clone(), parts.clone()), + _ => (order.clone(), vec![]), + }) + .collect() + } + } + #[instrument(level = "debug", name = "batch::finish_upload", fields(shard = %self.shard_id))] pub(crate) async fn finish(self) -> Vec<(RunOrder, Vec>)> { match self.writing_runs { diff --git a/src/persist-client/src/internal/compact.rs b/src/persist-client/src/internal/compact.rs index 7cf476773f4c6..e9cdd43b3c937 100644 --- a/src/persist-client/src/internal/compact.rs +++ b/src/persist-client/src/internal/compact.rs @@ -636,6 +636,7 @@ where Arc::clone(&shard_metrics), Arc::clone(&isolated_runtime), write_schemas.clone(), + None ) .await?; @@ -657,7 +658,7 @@ where // Set up active compaction metadata let clock = SYSTEM_TIME.clone(); let active_compaction = if applied < total_chunked_runs - 1 { - Some(ActiveCompaction { start_ms: clock() }) + Some(ActiveCompaction { start_ms: clock(), batch_so_far: None }) } else { None }; @@ -831,6 +832,7 @@ where shard_metrics: Arc, isolated_runtime: Arc, write_schemas: Schemas, + incremental_tx: Option>> ) -> Result, anyhow::Error> { // TODO: Figure out a more principled way to allocate our memory budget. // Currently, we give any excess budget to write parallelism. If we had @@ -928,6 +930,27 @@ where else { break; }; + + if let Some(tx) = incremental_tx.as_ref() { + // This is where we record whatever parts were successfully flushed + // to blob. That way we can resume an interrupted compaction later. 
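+                // Note: `batch_with_finished_parts` reuses the full registered
+                // description and includes only the runs whose parts have already
+                // finished uploading, so the checkpointed batch covers a prefix of
+                // the eventual output's runs.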
+ let partial_batch = batch.batch_with_finished_parts(desc.clone()); + if let Some(partial_batch) = partial_batch { + match tx.send(partial_batch).await { + Ok(_) => { + // metrics.compaction.incremental_batch_sent.inc(); + }, + Err(e) => { + error!("Failed to send batch to incremental compaction: {}", e); + // metrics.compaction.incremental_batch_send_fail.inc() + } + }; + } + } + + // part + // upper bound kvt + // find the last part where the lower bound < recorded upper bound batch.flush_part(desc.clone(), updates).await; } let mut batch = batch.finish(desc.clone()).await?; diff --git a/src/persist-client/src/internal/encoding.rs b/src/persist-client/src/internal/encoding.rs index e81a462dd78e4..f65262454b373 100644 --- a/src/persist-client/src/internal/encoding.rs +++ b/src/persist-client/src/internal/encoding.rs @@ -1099,16 +1099,18 @@ impl ProtoMapEntry> for Proto } } -impl RustType for ActiveCompaction { +impl RustType for ActiveCompaction { fn into_proto(&self) -> ProtoCompaction { ProtoCompaction { start_ms: self.start_ms, + batch_so_far: self.batch_so_far.as_ref().map(|x| x.into_proto()), } } fn from_proto(proto: ProtoCompaction) -> Result { Ok(Self { start_ms: proto.start_ms, + batch_so_far: proto.batch_so_far.map(|x| x.into_rust()).transpose()?, }) } } diff --git a/src/persist-client/src/internal/machine.rs b/src/persist-client/src/internal/machine.rs index 3cbd669f314b3..757ec2346014e 100644 --- a/src/persist-client/src/internal/machine.rs +++ b/src/persist-client/src/internal/machine.rs @@ -596,6 +596,28 @@ where } } + /// As we build up batches during compaction, we incrementally commit + /// information about the completed (merged and persisted) parts. + /// That way if compaction is interrupted, we can safely resume the work. + /// + /// `checkpoint_compaction_progress` stores that in progress batch in state. + pub async fn checkpoint_compaction_progress( + &self, + batch_so_far: &HollowBatch, + current_ts: u64, + ) { + let metrics = Arc::clone(&self.applier.metrics); + + //TODO(dov): new metric + let _ = self.apply_unbatched_idempotent_cmd(&metrics.cmds.merge_res, |_, _, state| { + let ret = state.apply_compaction_progress(batch_so_far, current_ts); + if let Continue(_) = ret { + // metrics.state.compaction_progress_applied.inc(); + } + ret + }).await; + } + pub async fn merge_res( &self, res: &FueledMergeRes, diff --git a/src/persist-client/src/internal/state.proto b/src/persist-client/src/internal/state.proto index f89a4e678f851..4a370011cf1b6 100644 --- a/src/persist-client/src/internal/state.proto +++ b/src/persist-client/src/internal/state.proto @@ -131,6 +131,7 @@ message ProtoIdSpineBatch { message ProtoCompaction { uint64 start_ms = 1; + optional ProtoHollowBatch batch_so_far = 2; } message ProtoMerge { diff --git a/src/persist-client/src/internal/state.rs b/src/persist-client/src/internal/state.rs index dd47b77c476ec..ca5f88fcbbe26 100644 --- a/src/persist-client/src/internal/state.rs +++ b/src/persist-client/src/internal/state.rs @@ -1658,6 +1658,7 @@ where req.id, ActiveCompaction { start_ms: heartbeat_timestamp_ms, + batch_so_far: None }, ) } @@ -1684,6 +1685,27 @@ where Continue(merge_reqs) } + /// This is a best effort attempt to apply incremental compaction + /// to the spine. 
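+    /// The recorded progress is attached to the spine batch whose description
+    /// matches (or contains) the in-progress batch; if the spine has changed such
+    /// that no batch matches, or the shard is a tombstone, this is a no-op.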
+    pub fn apply_compaction_progress(
+        &mut self,
+        batch_so_far: &HollowBatch<T>,
+        new_ts: u64,
+    ) -> ControlFlow<NoOpStateTransition<()>, ()> {
+        if self.is_tombstone() {
+            return Break(NoOpStateTransition(()));
+        }
+
+        let new_active_compaction = ActiveCompaction {
+            start_ms: new_ts,
+            batch_so_far: Some(batch_so_far.clone())
+        };
+
+        self.trace.apply_incremental_compaction(&new_active_compaction);
+
+        Continue(())
+    }
+
     pub fn apply_merge_res(
         &mut self,
         res: &FueledMergeRes<T>,
diff --git a/src/persist-client/src/internal/trace.rs b/src/persist-client/src/internal/trace.rs
index fcf27e21afd87..4b0fce3af1105 100644
--- a/src/persist-client/src/internal/trace.rs
+++ b/src/persist-client/src/internal/trace.rs
@@ -77,7 +77,7 @@ pub struct FueledMergeReq<T> {
 #[derive(Debug)]
 pub struct FueledMergeRes<T> {
     pub output: HollowBatch<T>,
-    pub new_active_compaction: Option<ActiveCompaction>,
+    pub new_active_compaction: Option<ActiveCompaction<T>>,
 }
 
 /// An append-only collection of compactable update batches.
@@ -140,7 +140,7 @@ impl<T: PartialEq> PartialEq for ThinSpineBatch<T> {
 pub struct ThinMerge<T> {
     pub(crate) since: Antichain<T>,
     pub(crate) remaining_work: usize,
-    pub(crate) active_compaction: Option<ActiveCompaction>,
+    pub(crate) active_compaction: Option<ActiveCompaction<T>>,
 }
 
 impl<T> ThinMerge<T> {
@@ -517,7 +517,7 @@ impl<T: Timestamp + Lattice> Trace<T> {
         Self::remove_redundant_merge_reqs(merge_reqs)
     }
 
-    pub fn claim_compaction(&mut self, id: SpineId, compaction: ActiveCompaction) {
+    pub fn claim_compaction(&mut self, id: SpineId, compaction: ActiveCompaction<T>) {
         // TODO: we ought to be able to look up the id for a batch by binary searching the levels.
         // In the meantime, search backwards, since most compactions are for recent batches.
         for batch in self.spine.spine_batches_mut().rev() {
@@ -563,6 +563,15 @@ impl<T: Timestamp + Lattice> Trace<T> {
         self.spine.validate()
     }
 
+    pub fn apply_incremental_compaction(&mut self, compaction: &ActiveCompaction<T>) {
+        for batch in self.spine.spine_batches_mut().rev() {
+            let result = batch.maybe_update_active_compaction(compaction);
+            if result.matched() {
+                return;
+            }
+        }
+    }
+
     pub fn apply_merge_res(&mut self, res: &FueledMergeRes<T>) -> ApplyMergeResult {
         for batch in self.spine.spine_batches_mut().rev() {
             let result = batch.maybe_replace(res);
@@ -690,8 +699,9 @@ pub struct IdHollowBatch<T> {
 }
 
 #[derive(Debug, Clone, Eq, PartialEq, Serialize)]
-pub struct ActiveCompaction {
+pub struct ActiveCompaction<T> {
     pub start_ms: u64,
+    pub batch_so_far: Option<HollowBatch<T>>
 }
 
 #[derive(Debug, Clone, PartialEq)]
@@ -699,13 +709,13 @@ struct SpineBatch<T> {
     id: SpineId,
     desc: Description<T>,
     parts: Vec<Arc<IdHollowBatch<T>>>,
-    active_compaction: Option<ActiveCompaction>,
+    active_compaction: Option<ActiveCompaction<T>>,
     // A cached version of parts.iter().map(|x| x.len).sum()
     len: usize,
 }
 
 impl<T> SpineBatch<T> {
-    fn merged(batch: IdHollowBatch<T>, active_compaction: Option<ActiveCompaction>) -> Self
+    fn merged(batch: IdHollowBatch<T>, active_compaction: Option<ActiveCompaction<T>>) -> Self
     where
         T: Clone,
     {
@@ -828,6 +838,70 @@ impl<T: Timestamp + Lattice> SpineBatch<T> {
         })
     }
 
+    fn maybe_update_active_compaction(&mut self, active_compaction: &ActiveCompaction<T>) -> ApplyMergeResult {
+        let batch = match active_compaction.batch_so_far.as_ref() {
+            Some(batch) => batch,
+            None => return ApplyMergeResult::NotAppliedNoMatch,
+        };
+
+        // The spine's and merge res's sinces don't need to match (which could occur if Spine
+        // has been reloaded from state due to compare_and_set mismatch), but if so, the Spine
+        // since must be in advance of the merge res since.
+ if !PartialOrder::less_equal(batch.desc.since(), self.desc().since()) { + return ApplyMergeResult::NotAppliedInvalidSince; + } + + // If our merge result exactly matches a spine batch, we can swap it in directly + let exact_match = batch.desc.lower() == self.desc().lower() + && batch.desc.upper() == self.desc().upper(); + if exact_match { + self.active_compaction = Some(active_compaction.clone()); + + return ApplyMergeResult::AppliedExact; + } + + // It is possible the structure of the spine has changed since the merge res + // was created, such that it no longer exactly matches the description of a + // spine batch. This can happen if another merge has happened in the interim, + // or if spine needed to be rebuilt from state. + // + // When this occurs, we can still attempt to slot the merge res in to replace + // the parts of a fueled merge. e.g. if the res is for `[1,3)` and the parts + // are `[0,1),[1,2),[2,3),[3,4)`, we can swap out the middle two parts for res. + let SpineBatch { + id, + parts, + desc, + active_compaction: _, + len: _, + } = self; + // first, determine if a subset of parts can be cleanly replaced by the merge res + let mut lower = None; + let mut upper = None; + for (i, batch) in parts.iter().enumerate() { + if batch.batch.desc.lower() == desc.lower() { + lower = Some((i, batch.id.0)); + } + if batch.batch.desc.upper() == desc.upper() { + upper = Some((i, batch.id.1)); + } + if lower.is_some() && upper.is_some() { + break; + } + } + + match (lower, upper) { + (Some((_lower, _id_lower)), Some((_upper, _id_upper))) => { + // This is a subset of the batch, so we can update the active compaction. + // Normally we would want to actually slot the merge res in, but we aren't + // focusing on that here. + self.active_compaction = Some(active_compaction.clone()); + ApplyMergeResult::AppliedSubset + } + _ => ApplyMergeResult::NotAppliedNoMatch, + } + } + // TODO: Roundtrip the SpineId through FueledMergeReq/FueledMergeRes? 
fn maybe_replace(&mut self, res: &FueledMergeRes) -> ApplyMergeResult { // The spine's and merge res's sinces don't need to match (which could occur if Spine @@ -1876,7 +1950,7 @@ pub(crate) mod tests { .fueled_merge_reqs_before_ms(timeout_ms, None) .collect(); for req in reqs { - trace.claim_compaction(req.id, ActiveCompaction { start_ms: 0 }) + trace.claim_compaction(req.id, ActiveCompaction { start_ms: 0, batch_so_far: None }) } trace.roundtrip_structure = roundtrip_structure; trace From bd0e49186d2e61a064c7de31b7261537a015cbb1 Mon Sep 17 00:00:00 2001 From: Dov Alperin Date: Thu, 15 May 2025 18:10:46 -0400 Subject: [PATCH 2/5] More complete filtering for resumes --- src/persist-client/src/batch.rs | 2 +- src/persist-client/src/cli/admin.rs | 1 + src/persist-client/src/internal/compact.rs | 79 ++++++++++++++++++++-- src/persist-client/src/internal/machine.rs | 3 + src/persist-client/src/internal/state.rs | 29 ++++++++ src/persist-client/src/iter.rs | 32 +++++++-- src/persist-client/src/read.rs | 16 ++--- 7 files changed, 141 insertions(+), 21 deletions(-) diff --git a/src/persist-client/src/batch.rs b/src/persist-client/src/batch.rs index 949a6dd215fea..b691f2f0d13dd 100644 --- a/src/persist-client/src/batch.rs +++ b/src/persist-client/src/batch.rs @@ -46,7 +46,6 @@ use timely::PartialOrder; use timely::order::TotalOrder; use timely::progress::{Antichain, Timestamp}; use tracing::{Instrument, debug_span, trace_span, warn}; - use crate::async_runtime::IsolatedRuntime; use crate::cfg::{BATCH_BUILDER_MAX_OUTSTANDING_PARTS, MiB}; use crate::error::InvalidUsage; @@ -878,6 +877,7 @@ impl BatchParts { shard_metrics, isolated_runtime, write_schemas, + None, None ) .await diff --git a/src/persist-client/src/cli/admin.rs b/src/persist-client/src/cli/admin.rs index bb5e000081a2f..c8b4b7ba80020 100644 --- a/src/persist-client/src/cli/admin.rs +++ b/src/persist-client/src/cli/admin.rs @@ -462,6 +462,7 @@ where .into_iter() .map(|b| Arc::unwrap_or_clone(b.batch)) .collect(), + prev_batch: None, }; let parts = req.inputs.iter().map(|x| x.part_count()).sum::(); let bytes = req diff --git a/src/persist-client/src/internal/compact.rs b/src/persist-client/src/internal/compact.rs index e9cdd43b3c937..3f2134b17feb3 100644 --- a/src/persist-client/src/internal/compact.rs +++ b/src/persist-client/src/internal/compact.rs @@ -33,7 +33,7 @@ use timely::progress::{Antichain, Timestamp}; use tokio::sync::mpsc::Sender; use tokio::sync::{TryAcquireError, mpsc, oneshot}; use tracing::{Instrument, Span, debug, debug_span, error, trace, warn}; - +use mz_persist_types::arrow::ArrayBound; use crate::async_runtime::IsolatedRuntime; use crate::batch::{BatchBuilderConfig, BatchBuilderInternal, BatchParts, PartDeletes}; use crate::cfg::{ @@ -41,7 +41,7 @@ use crate::cfg::{ COMPACTION_HEURISTIC_MIN_UPDATES, COMPACTION_MEMORY_BOUND_BYTES, GC_BLOB_DELETE_CONCURRENCY_LIMIT, INCREMENTAL_COMPACTION_DISABLED, MiB, }; -use crate::fetch::FetchBatchFilter; +use crate::fetch::{EncodedPart, FetchBatchFilter}; use crate::internal::encoding::Schemas; use crate::internal::gc::GarbageCollector; use crate::internal::machine::Machine; @@ -49,7 +49,7 @@ use crate::internal::maintenance::RoutineMaintenance; use crate::internal::metrics::ShardMetrics; use crate::internal::state::{HollowBatch, RunMeta, RunOrder, RunPart}; use crate::internal::trace::{ApplyMergeResult, FueledMergeRes}; -use crate::iter::{Consolidator, StructuredSort}; +use crate::iter::{Consolidator, SortKV, StructuredSort}; use crate::{Metrics, PersistConfig, ShardId}; use 
super::trace::ActiveCompaction; @@ -68,6 +68,10 @@ pub struct CompactReq { /// The updates to include in the output batch. Any data in these outside of /// the output descriptions bounds should be ignored. pub inputs: Vec>, + + /// If this compaction is a resume of a previously interrupted compaction + /// then prev_batch contains the work done so far. + pub prev_batch: Option>, } /// A response from compaction. @@ -601,6 +605,9 @@ where let run_reserved_memory_bytes = cfg.compaction_memory_bound_bytes - in_progress_part_reserved_memory_bytes; + //if there is an active compaction + //We should call compact_runs with the same + // Flatten the input batches into a single list of runs let ordered_runs = Self::order_runs(&req, cfg.batch.preferred_order, &*blob, &*metrics).await?; @@ -636,6 +643,7 @@ where Arc::clone(&shard_metrics), Arc::clone(&isolated_runtime), write_schemas.clone(), + None, None ) .await?; @@ -826,12 +834,13 @@ where cfg: &CompactConfig, shard_id: &ShardId, desc: &Description, - runs: Vec<(&Description, &RunMeta, &[RunPart])>, + mut runs: Vec<(&Description, &RunMeta, &[RunPart])>, blob: Arc, metrics: Arc, shard_metrics: Arc, isolated_runtime: Arc, write_schemas: Schemas, + batch_so_far: Option>, incremental_tx: Option>> ) -> Result, anyhow::Error> { // TODO: Figure out a more principled way to allocate our memory budget. @@ -847,12 +856,67 @@ where let mut batch_cfg = cfg.batch.clone(); + let mut k_bound = None; + let mut v_bound = None; + let mut timestamp = None; + let mut sort: Option<(SortKV<'_>, T)> = None; + // Use compaction as a method of getting inline writes out of state, to // make room for more inline writes. We could instead do this at the end // of compaction by flushing out the batch, but doing it here based on // the config allows BatchBuilder to do its normal pipelining of writes. 
batch_cfg.inline_writes_single_max_bytes = 0; + if let Some(batch_so_far) = batch_so_far { + let last_part = batch_so_far.last_part(shard_id.clone(), &*blob, &metrics).await; + if let Some(last_part) = last_part { + let fetched = EncodedPart::fetch( + shard_id, + &*blob, + &metrics, + &shard_metrics, + &metrics.read.batch_fetcher, + &batch_so_far.desc, + &last_part, + ).await.map_err(|blob_key| anyhow!("missing key {blob_key}"))?; + + let updates = fetched.normalize(&metrics.columnar); + let structured = updates.as_structured::( + write_schemas.key.as_ref(), + write_schemas.val.as_ref(), + ); + let part = match structured.as_part() { + Some(p) => p, + None => return Err(anyhow!("unexpected empty part")), + }; + + let last = part.len() - 1; + let key_bound = ArrayBound::new(part.key.clone(), last); + let val_bound = ArrayBound::new(part.val.clone(), last); + timestamp = Some(T::decode(part.time.values()[last].to_le_bytes())); + k_bound = Some(key_bound); + v_bound = Some(val_bound); + } + }; + + if let (Some(k_ref), Some(v_ref), Some(ts_value)) = (k_bound.as_ref(), v_bound.as_ref(), timestamp) { + let key_idx = k_ref.get(); + let val_idx = v_ref.get(); + sort = Some(((key_idx, Some(val_idx)), ts_value)); + + for (_, _, run) in &mut runs { + let start = run.iter() + .position(|part| { + part.structured_key_lower() + .map_or(true, |lower| lower.get() >= key_idx) + }) + .unwrap_or(run.len()); + + *run = &run[start.saturating_sub(1)..]; + } + } + + let parts = BatchParts::new_ordered( batch_cfg, cfg.batch.preferred_order, @@ -889,9 +953,10 @@ where FetchBatchFilter::Compaction { since: desc.since().clone(), }, + sort, prefetch_budget_bytes, ); - + for (desc, meta, parts) in runs { consolidator.enqueue_run(desc, meta, parts.iter().cloned()); } @@ -1085,6 +1150,7 @@ mod tests { Antichain::from_elem(10u64), ), inputs: vec![b0, b1], + prev_batch: None, }; let schemas = Schemas { id: None, @@ -1164,6 +1230,7 @@ mod tests { Antichain::from_elem(Product::new(10, 0)), ), inputs: vec![b0, b1], + prev_batch: None, }; let schemas = Schemas { id: None, @@ -1234,6 +1301,7 @@ mod tests { Antichain::from_elem(10u64), ), inputs: vec![b0, b1], + prev_batch: None, }; write.cfg.set_config(&COMPACTION_HEURISTIC_MIN_INPUTS, 1); let compactor = write.compact.as_ref().expect("compaction hard disabled"); @@ -1279,6 +1347,7 @@ mod tests { Antichain::from_elem(20u64), ), inputs: vec![b2, b3], + prev_batch: None, }; let compactor = write.compact.as_ref().expect("compaction hard disabled"); diff --git a/src/persist-client/src/internal/machine.rs b/src/persist-client/src/internal/machine.rs index 757ec2346014e..f921cae033af0 100644 --- a/src/persist-client/src/internal/machine.rs +++ b/src/persist-client/src/internal/machine.rs @@ -278,6 +278,7 @@ where .into_iter() .map(|b| Arc::unwrap_or_clone(b.batch)) .collect(), + prev_batch: None, }) .collect(); (reqs, maintenance) @@ -500,6 +501,7 @@ where .into_iter() .map(|b| Arc::unwrap_or_clone(b.batch)) .collect(), + prev_batch: None, }; compact_reqs.push(req); } @@ -2019,6 +2021,7 @@ pub mod datadriven { shard_id: datadriven.shard_id, desc: Description::new(lower, upper, since), inputs, + prev_batch: None, }; let req_clone = req.clone(); diff --git a/src/persist-client/src/internal/state.rs b/src/persist-client/src/internal/state.rs index ca5f88fcbbe26..955cba3157521 100644 --- a/src/persist-client/src/internal/state.rs +++ b/src/persist-client/src/internal/state.rs @@ -638,6 +638,21 @@ impl RunPart { } } } + pub async fn last_part<'a>( + &'a self, + shard_id: ShardId, + blob: 
&'a dyn Blob, + metrics: &'a Metrics, + ) -> Result>, MissingBlob> { + match self { + RunPart::Single(p) => Ok(Box::new(p.clone())), + RunPart::Many(r) => { + let fetched = r.get(shard_id, blob, metrics).await.ok_or_else(|| MissingBlob(r.key.complete(&shard_id)))?; + let last_part = fetched.parts.last().ok_or_else(|| MissingBlob(r.key.complete(&shard_id)))?; + Ok(Box::pin(last_part.last_part(shard_id, blob, metrics)).await?) + }, + } + } } impl PartialOrd for BatchPart { @@ -883,6 +898,20 @@ impl HollowBatch { } } } + + pub async fn last_part<'a>( + &'a self, + shard_id: ShardId, + blob: &'a dyn Blob, + metrics: &'a Metrics, + ) -> Option> { + let last_part = self.parts.last()?; + let last_part = last_part.last_part(shard_id, blob, metrics).await; + match last_part { + Ok(part) => Some(*part), + Err(MissingBlob(_)) => None, + } + } } impl HollowBatch { /// Construct an in-memory hollow batch from the given metadata. diff --git a/src/persist-client/src/iter.rs b/src/persist-client/src/iter.rs index 129b85f065292..9911cc839176b 100644 --- a/src/persist-client/src/iter.rs +++ b/src/persist-client/src/iter.rs @@ -157,8 +157,7 @@ impl StructuredSort { } } -type SortKV<'a> = (ArrayIdx<'a>, Option>); - +pub type SortKV<'a> = (ArrayIdx<'a>, Option>); fn kv_lower(data: &FetchData) -> Option> { let key_idx = data.structured_lower.as_ref().map(|l| l.get())?; Some((key_idx, None)) @@ -332,7 +331,7 @@ impl ConsolidationPart { /// client should call `next` until it returns `None`, which signals all data has been returned... /// but it's also free to abandon the instance at any time if it eg. only needs a few entries. #[derive(Debug)] -pub(crate) struct Consolidator> { +pub(crate) struct Consolidator<'a, T, D, Sort: RowSort> { context: String, shard_id: ShardId, sort: Sort, @@ -343,6 +342,7 @@ pub(crate) struct Consolidator> { runs: Vec, usize)>>, filter: FetchBatchFilter, budget: usize, + lower_bound: Option<(SortKV<'a>, T)>, // NB: this is the tricky part! 
// One hazard of streaming consolidation is that we may start consolidating a particular KVT, // but not be able to finish, because some other part that might also contain the same KVT @@ -351,7 +351,7 @@ pub(crate) struct Consolidator> { drop_stash: Option, } -impl Consolidator +impl<'a, T, D, Sort> Consolidator<'a, T, D, Sort> where T: Timestamp + Codec64 + Lattice, D: Codec64 + Semigroup + Ord, @@ -369,6 +369,7 @@ where shard_metrics: Arc, read_metrics: ReadMetrics, filter: FetchBatchFilter, + lower_bound: Option<(SortKV<'a>, T)>, prefetch_budget_bytes: usize, ) -> Self { Self { @@ -382,12 +383,13 @@ where runs: vec![], filter, budget: prefetch_budget_bytes, + lower_bound, drop_stash: None, } } } -impl Consolidator +impl<'a, T, D, Sort> Consolidator<'a, T, D, Sort> where T: Timestamp + Codec64 + Lattice + Sync, D: Codec64 + Semigroup + Ord, @@ -485,7 +487,7 @@ where return None; } - let mut iter = ConsolidatingIter::new(&self.context, &self.filter, &mut self.drop_stash); + let mut iter = ConsolidatingIter::new(&self.context, &self.filter, &self.lower_bound, &mut self.drop_stash); for run in &mut self.runs { let last_in_run = run.len() < 2; @@ -754,7 +756,7 @@ where } } -impl> Drop for Consolidator { +impl<'a, T, D, Sort: RowSort> Drop for Consolidator<'a, T, D, Sort> { fn drop(&mut self) { for run in &self.runs { for (part, _) in run { @@ -837,6 +839,7 @@ where parts: Vec<&'a StructuredUpdates>, heap: BinaryHeap>, upper_bound: Option<(SortKV<'a>, T)>, + lower_bound: &'a Option<(SortKV<'a>, T)>, state: Option<(Indices, SortKV<'a>, T, D)>, drop_stash: &'a mut Option, } @@ -849,6 +852,7 @@ where fn new( context: &'a str, filter: &'a FetchBatchFilter, + lower_bound: &'a Option<(SortKV<'a>, T)>, drop_stash: &'a mut Option, ) -> Self { Self { @@ -857,6 +861,7 @@ where parts: vec![], heap: BinaryHeap::new(), upper_bound: None, + lower_bound, state: None, drop_stash, } @@ -926,6 +931,17 @@ where } } + // This code checks our inclusive lower bound against + if let Some((kv_lower, t_lower)) = &self.lower_bound { + if (kv_lower, t_lower) >= (kv1, t1) { + // Discard this item from the part, since it's past our lower bound. + let _ = part.pop(&self.parts, self.filter); + + // Continue to the next part, since it might still be relevant. + continue; + } + } + self.state = part.pop(&self.parts, self.filter); } } else { @@ -1093,6 +1109,7 @@ mod tests { filter, budget: 0, drop_stash: None, + lower_bound: None, }; let mut out = vec![]; @@ -1164,6 +1181,7 @@ mod tests { FetchBatchFilter::Compaction { since: desc.since().clone(), }, + None, budget, ); diff --git a/src/persist-client/src/read.rs b/src/persist-client/src/read.rs index 03cebe1e64a76..1cec58c262cb9 100644 --- a/src/persist-client/src/read.rs +++ b/src/persist-client/src/read.rs @@ -864,22 +864,22 @@ pub(crate) struct UnexpiredReadHandleState { /// client should call `next` until it returns `None`, which signals all data has been returned... /// but it's also free to abandon the instance at any time if it eg. only needs a few entries. 
#[derive(Debug)] -pub struct Cursor { - consolidator: CursorConsolidator, +pub struct Cursor<'a, K: Codec, V: Codec, T: Timestamp + Codec64, D: Codec64> { + consolidator: CursorConsolidator<'a, K, V, T, D>, _lease: Lease, read_schemas: Schemas, } #[derive(Debug)] -enum CursorConsolidator { +enum CursorConsolidator<'a, K: Codec, V: Codec, T: Timestamp + Codec64, D: Codec64> { Structured { - consolidator: Consolidator>, + consolidator: Consolidator<'a, T, D, StructuredSort>, max_len: usize, max_bytes: usize, }, } -impl Cursor +impl<'a, K, V, T, D> Cursor<'a, K, V, T, D> where K: Debug + Codec + Ord, V: Debug + Codec + Ord, @@ -950,6 +950,7 @@ where &mut self, as_of: Antichain, ) -> Result, Result), T, D)>, Since> { + let shard_metrics = &self.machine.applier.shard_metrics.clone(); let mut cursor = self.snapshot_cursor(as_of, |_| true).await?; let mut contents = Vec::new(); while let Some(iter) = cursor.next().await { @@ -962,9 +963,7 @@ where consolidate_updates(&mut contents); if old_len != contents.len() { // TODO(bkirwi): do we need more / finer-grained metrics for this? - self.machine - .applier - .shard_metrics + shard_metrics .unconsolidated_snapshot .inc(); } @@ -1001,6 +1000,7 @@ where Arc::clone(&self.machine.applier.shard_metrics), self.metrics.read.snapshot.clone(), filter, + None, COMPACTION_MEMORY_BOUND_BYTES.get(&self.cfg), ); for batch in batches { From 3f74c77b6b2f6205e18396e1835517d10911ed1c Mon Sep 17 00:00:00 2001 From: Dov Alperin Date: Thu, 15 May 2025 19:51:17 -0400 Subject: [PATCH 3/5] Use owned data instead --- src/persist-client/src/internal/compact.rs | 74 ++++++++++++---------- src/persist-client/src/iter.rs | 37 +++++++---- src/persist-client/src/read.rs | 15 +++-- src/persist-types/src/arrow.rs | 24 +++++++ 4 files changed, 99 insertions(+), 51 deletions(-) diff --git a/src/persist-client/src/internal/compact.rs b/src/persist-client/src/internal/compact.rs index 3f2134b17feb3..4a2a0faf23aeb 100644 --- a/src/persist-client/src/internal/compact.rs +++ b/src/persist-client/src/internal/compact.rs @@ -15,6 +15,23 @@ use std::pin::pin; use std::sync::Arc; use std::time::{Duration, Instant}; +use crate::async_runtime::IsolatedRuntime; +use crate::batch::{BatchBuilderConfig, BatchBuilderInternal, BatchParts, PartDeletes}; +use crate::cfg::{ + COMPACTION_HEURISTIC_MIN_INPUTS, COMPACTION_HEURISTIC_MIN_PARTS, + COMPACTION_HEURISTIC_MIN_UPDATES, COMPACTION_MEMORY_BOUND_BYTES, + GC_BLOB_DELETE_CONCURRENCY_LIMIT, INCREMENTAL_COMPACTION_DISABLED, MiB, +}; +use crate::fetch::{EncodedPart, FetchBatchFilter}; +use crate::internal::encoding::Schemas; +use crate::internal::gc::GarbageCollector; +use crate::internal::machine::Machine; +use crate::internal::maintenance::RoutineMaintenance; +use crate::internal::metrics::ShardMetrics; +use crate::internal::state::{HollowBatch, RunMeta, RunOrder, RunPart}; +use crate::internal::trace::{ApplyMergeResult, FueledMergeRes}; +use crate::iter::{Consolidator, OwnedSortKV, SortKV, StructuredSort}; +use crate::{Metrics, PersistConfig, ShardId}; use anyhow::anyhow; use differential_dataflow::difference::Semigroup; use differential_dataflow::lattice::Lattice; @@ -26,6 +43,7 @@ use mz_ore::cast::CastFrom; use mz_ore::error::ErrorExt; use mz_ore::now::SYSTEM_TIME; use mz_persist::location::Blob; +use mz_persist_types::arrow::{ArrayBound, OwnedArrayIdx}; use mz_persist_types::part::Part; use mz_persist_types::{Codec, Codec64}; use timely::PartialOrder; @@ -33,24 +51,6 @@ use timely::progress::{Antichain, Timestamp}; use tokio::sync::mpsc::Sender; 
use tokio::sync::{TryAcquireError, mpsc, oneshot}; use tracing::{Instrument, Span, debug, debug_span, error, trace, warn}; -use mz_persist_types::arrow::ArrayBound; -use crate::async_runtime::IsolatedRuntime; -use crate::batch::{BatchBuilderConfig, BatchBuilderInternal, BatchParts, PartDeletes}; -use crate::cfg::{ - COMPACTION_HEURISTIC_MIN_INPUTS, COMPACTION_HEURISTIC_MIN_PARTS, - COMPACTION_HEURISTIC_MIN_UPDATES, COMPACTION_MEMORY_BOUND_BYTES, - GC_BLOB_DELETE_CONCURRENCY_LIMIT, INCREMENTAL_COMPACTION_DISABLED, MiB, -}; -use crate::fetch::{EncodedPart, FetchBatchFilter}; -use crate::internal::encoding::Schemas; -use crate::internal::gc::GarbageCollector; -use crate::internal::machine::Machine; -use crate::internal::maintenance::RoutineMaintenance; -use crate::internal::metrics::ShardMetrics; -use crate::internal::state::{HollowBatch, RunMeta, RunOrder, RunPart}; -use crate::internal::trace::{ApplyMergeResult, FueledMergeRes}; -use crate::iter::{Consolidator, SortKV, StructuredSort}; -use crate::{Metrics, PersistConfig, ShardId}; use super::trace::ActiveCompaction; @@ -841,7 +841,7 @@ where isolated_runtime: Arc, write_schemas: Schemas, batch_so_far: Option>, - incremental_tx: Option>> + incremental_tx: Option>>, ) -> Result, anyhow::Error> { // TODO: Figure out a more principled way to allocate our memory budget. // Currently, we give any excess budget to write parallelism. If we had @@ -859,7 +859,7 @@ where let mut k_bound = None; let mut v_bound = None; let mut timestamp = None; - let mut sort: Option<(SortKV<'_>, T)> = None; + let mut sort: Option<(OwnedSortKV, T)> = None; // Use compaction as a method of getting inline writes out of state, to // make room for more inline writes. We could instead do this at the end @@ -868,7 +868,9 @@ where batch_cfg.inline_writes_single_max_bytes = 0; if let Some(batch_so_far) = batch_so_far { - let last_part = batch_so_far.last_part(shard_id.clone(), &*blob, &metrics).await; + let last_part = batch_so_far + .last_part(shard_id.clone(), &*blob, &metrics) + .await; if let Some(last_part) = last_part { let fetched = EncodedPart::fetch( shard_id, @@ -878,13 +880,13 @@ where &metrics.read.batch_fetcher, &batch_so_far.desc, &last_part, - ).await.map_err(|blob_key| anyhow!("missing key {blob_key}"))?; + ) + .await + .map_err(|blob_key| anyhow!("missing key {blob_key}"))?; let updates = fetched.normalize(&metrics.columnar); - let structured = updates.as_structured::( - write_schemas.key.as_ref(), - write_schemas.val.as_ref(), - ); + let structured = updates + .as_structured::(write_schemas.key.as_ref(), write_schemas.val.as_ref()); let part = match structured.as_part() { Some(p) => p, None => return Err(anyhow!("unexpected empty part")), @@ -899,13 +901,22 @@ where } }; - if let (Some(k_ref), Some(v_ref), Some(ts_value)) = (k_bound.as_ref(), v_bound.as_ref(), timestamp) { + if let (Some(k_ref), Some(v_ref), Some(ts_value)) = + (k_bound.as_ref(), v_bound.as_ref(), timestamp) + { let key_idx = k_ref.get(); let val_idx = v_ref.get(); - sort = Some(((key_idx, Some(val_idx)), ts_value)); + sort = Some(( + ( + OwnedArrayIdx::from(key_idx), + Some(OwnedArrayIdx::from(val_idx)), + ), + ts_value, + )); for (_, _, run) in &mut runs { - let start = run.iter() + let start = run + .iter() .position(|part| { part.structured_key_lower() .map_or(true, |lower| lower.get() >= key_idx) @@ -916,7 +927,6 @@ where } } - let parts = BatchParts::new_ordered( batch_cfg, cfg.batch.preferred_order, @@ -956,7 +966,7 @@ where sort, prefetch_budget_bytes, ); - + for (desc, meta, parts) 
in runs { consolidator.enqueue_run(desc, meta, parts.iter().cloned()); } @@ -1004,7 +1014,7 @@ where match tx.send(partial_batch).await { Ok(_) => { // metrics.compaction.incremental_batch_sent.inc(); - }, + } Err(e) => { error!("Failed to send batch to incremental compaction: {}", e); // metrics.compaction.incremental_batch_send_fail.inc() diff --git a/src/persist-client/src/iter.rs b/src/persist-client/src/iter.rs index 9911cc839176b..71a560ed22277 100644 --- a/src/persist-client/src/iter.rs +++ b/src/persist-client/src/iter.rs @@ -29,10 +29,10 @@ use mz_ore::task::JoinHandle; use mz_persist::indexed::encoding::BlobTraceUpdates; use mz_persist::location::Blob; use mz_persist::metrics::ColumnarMetrics; -use mz_persist_types::arrow::{ArrayBound, ArrayIdx, ArrayOrd}; +use mz_persist_types::arrow::{ArrayBound, ArrayIdx, ArrayOrd, OwnedArrayIdx}; use mz_persist_types::part::Part; use mz_persist_types::{Codec, Codec64}; -use semver::Version; +use semver::{Op, Version}; use timely::progress::Timestamp; use tracing::{Instrument, debug_span}; @@ -158,6 +158,13 @@ impl StructuredSort { } pub type SortKV<'a> = (ArrayIdx<'a>, Option>); +pub type OwnedSortKV = (OwnedArrayIdx, Option); + +pub fn sort_kv_from_owned<'a>(kv: &'a OwnedSortKV) -> SortKV<'a> { + let (key, value) = kv; + (key.as_ref(), value.as_ref().map(|v| v.as_ref())) +} + fn kv_lower(data: &FetchData) -> Option> { let key_idx = data.structured_lower.as_ref().map(|l| l.get())?; Some((key_idx, None)) @@ -331,7 +338,7 @@ impl ConsolidationPart { /// client should call `next` until it returns `None`, which signals all data has been returned... /// but it's also free to abandon the instance at any time if it eg. only needs a few entries. #[derive(Debug)] -pub(crate) struct Consolidator<'a, T, D, Sort: RowSort> { +pub(crate) struct Consolidator> { context: String, shard_id: ShardId, sort: Sort, @@ -342,7 +349,7 @@ pub(crate) struct Consolidator<'a, T, D, Sort: RowSort> { runs: Vec, usize)>>, filter: FetchBatchFilter, budget: usize, - lower_bound: Option<(SortKV<'a>, T)>, + lower_bound: Option<(OwnedSortKV, T)>, // NB: this is the tricky part! 
// One hazard of streaming consolidation is that we may start consolidating a particular KVT, // but not be able to finish, because some other part that might also contain the same KVT @@ -351,7 +358,7 @@ pub(crate) struct Consolidator<'a, T, D, Sort: RowSort> { drop_stash: Option, } -impl<'a, T, D, Sort> Consolidator<'a, T, D, Sort> +impl Consolidator where T: Timestamp + Codec64 + Lattice, D: Codec64 + Semigroup + Ord, @@ -369,7 +376,7 @@ where shard_metrics: Arc, read_metrics: ReadMetrics, filter: FetchBatchFilter, - lower_bound: Option<(SortKV<'a>, T)>, + lower_bound: Option<(OwnedSortKV, T)>, prefetch_budget_bytes: usize, ) -> Self { Self { @@ -389,7 +396,7 @@ where } } -impl<'a, T, D, Sort> Consolidator<'a, T, D, Sort> +impl Consolidator where T: Timestamp + Codec64 + Lattice + Sync, D: Codec64 + Semigroup + Ord, @@ -487,7 +494,12 @@ where return None; } - let mut iter = ConsolidatingIter::new(&self.context, &self.filter, &self.lower_bound, &mut self.drop_stash); + let mut iter = ConsolidatingIter::new( + &self.context, + &self.filter, + &self.lower_bound, + &mut self.drop_stash, + ); for run in &mut self.runs { let last_in_run = run.len() < 2; @@ -756,7 +768,7 @@ where } } -impl<'a, T, D, Sort: RowSort> Drop for Consolidator<'a, T, D, Sort> { +impl> Drop for Consolidator { fn drop(&mut self) { for run in &self.runs { for (part, _) in run { @@ -839,7 +851,7 @@ where parts: Vec<&'a StructuredUpdates>, heap: BinaryHeap>, upper_bound: Option<(SortKV<'a>, T)>, - lower_bound: &'a Option<(SortKV<'a>, T)>, + lower_bound: &'a Option<(OwnedSortKV, T)>, state: Option<(Indices, SortKV<'a>, T, D)>, drop_stash: &'a mut Option, } @@ -852,7 +864,7 @@ where fn new( context: &'a str, filter: &'a FetchBatchFilter, - lower_bound: &'a Option<(SortKV<'a>, T)>, + lower_bound: &'a Option<(OwnedSortKV, T)>, drop_stash: &'a mut Option, ) -> Self { Self { @@ -933,7 +945,8 @@ where // This code checks our inclusive lower bound against if let Some((kv_lower, t_lower)) = &self.lower_bound { - if (kv_lower, t_lower) >= (kv1, t1) { + let sort_kv = sort_kv_from_owned(kv_lower); + if (&sort_kv, t_lower) >= (kv1, t1) { // Discard this item from the part, since it's past our lower bound. let _ = part.pop(&self.parts, self.filter); diff --git a/src/persist-client/src/read.rs b/src/persist-client/src/read.rs index 1cec58c262cb9..01e4c17286e34 100644 --- a/src/persist-client/src/read.rs +++ b/src/persist-client/src/read.rs @@ -864,22 +864,22 @@ pub(crate) struct UnexpiredReadHandleState { /// client should call `next` until it returns `None`, which signals all data has been returned... /// but it's also free to abandon the instance at any time if it eg. only needs a few entries. 
#[derive(Debug)] -pub struct Cursor<'a, K: Codec, V: Codec, T: Timestamp + Codec64, D: Codec64> { - consolidator: CursorConsolidator<'a, K, V, T, D>, +pub struct Cursor { + consolidator: CursorConsolidator, _lease: Lease, read_schemas: Schemas, } #[derive(Debug)] -enum CursorConsolidator<'a, K: Codec, V: Codec, T: Timestamp + Codec64, D: Codec64> { +enum CursorConsolidator { Structured { - consolidator: Consolidator<'a, T, D, StructuredSort>, + consolidator: Consolidator>, max_len: usize, max_bytes: usize, }, } -impl<'a, K, V, T, D> Cursor<'a, K, V, T, D> +impl Cursor where K: Debug + Codec + Ord, V: Debug + Codec + Ord, @@ -950,7 +950,6 @@ where &mut self, as_of: Antichain, ) -> Result, Result), T, D)>, Since> { - let shard_metrics = &self.machine.applier.shard_metrics.clone(); let mut cursor = self.snapshot_cursor(as_of, |_| true).await?; let mut contents = Vec::new(); while let Some(iter) = cursor.next().await { @@ -963,7 +962,9 @@ where consolidate_updates(&mut contents); if old_len != contents.len() { // TODO(bkirwi): do we need more / finer-grained metrics for this? - shard_metrics + self.machine + .applier + .shard_metrics .unconsolidated_snapshot .inc(); } diff --git a/src/persist-types/src/arrow.rs b/src/persist-types/src/arrow.rs index c1700952f435c..1e694f5ef2027 100644 --- a/src/persist-types/src/arrow.rs +++ b/src/persist-types/src/arrow.rs @@ -449,6 +449,30 @@ pub struct ArrayIdx<'a> { pub array: &'a ArrayOrd, } +#[derive(Clone, Debug)] +pub struct OwnedArrayIdx { + pub idx: usize, + pub array: ArrayOrd, +} + +impl<'a> From> for OwnedArrayIdx { + fn from(idx: ArrayIdx<'a>) -> Self { + OwnedArrayIdx { + idx: idx.idx, + array: idx.array.clone(), + } + } +} + +impl OwnedArrayIdx { + pub fn as_ref(&self) -> ArrayIdx<'_> { + ArrayIdx { + idx: self.idx, + array: &self.array, + } + } +} + impl Display for ArrayIdx<'_> { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self.array { From 626cc6a312ea6240bf756056c30de81485e1e0cc Mon Sep 17 00:00:00 2001 From: Dov Alperin Date: Thu, 15 May 2025 20:25:01 -0400 Subject: [PATCH 4/5] Clean up the API a little bit --- src/persist-client/src/internal/compact.rs | 36 ++++++----------- src/persist-client/src/iter.rs | 46 ++++++++++++---------- src/persist-types/src/arrow.rs | 24 ----------- 3 files changed, 38 insertions(+), 68 deletions(-) diff --git a/src/persist-client/src/internal/compact.rs b/src/persist-client/src/internal/compact.rs index 4a2a0faf23aeb..840e97af67884 100644 --- a/src/persist-client/src/internal/compact.rs +++ b/src/persist-client/src/internal/compact.rs @@ -30,7 +30,7 @@ use crate::internal::maintenance::RoutineMaintenance; use crate::internal::metrics::ShardMetrics; use crate::internal::state::{HollowBatch, RunMeta, RunOrder, RunPart}; use crate::internal::trace::{ApplyMergeResult, FueledMergeRes}; -use crate::iter::{Consolidator, OwnedSortKV, SortKV, StructuredSort}; +use crate::iter::{Consolidator, LowerBound, StructuredSort}; use crate::{Metrics, PersistConfig, ShardId}; use anyhow::anyhow; use differential_dataflow::difference::Semigroup; @@ -43,7 +43,7 @@ use mz_ore::cast::CastFrom; use mz_ore::error::ErrorExt; use mz_ore::now::SYSTEM_TIME; use mz_persist::location::Blob; -use mz_persist_types::arrow::{ArrayBound, OwnedArrayIdx}; +use mz_persist_types::arrow::ArrayBound; use mz_persist_types::part::Part; use mz_persist_types::{Codec, Codec64}; use timely::PartialOrder; @@ -856,10 +856,7 @@ where let mut batch_cfg = cfg.batch.clone(); - let mut k_bound = None; - let mut v_bound = None; - 
let mut timestamp = None; - let mut sort: Option<(OwnedSortKV, T)> = None; + let mut lower_bound = None; // Use compaction as a method of getting inline writes out of state, to // make room for more inline writes. We could instead do this at the end @@ -895,31 +892,22 @@ where let last = part.len() - 1; let key_bound = ArrayBound::new(part.key.clone(), last); let val_bound = ArrayBound::new(part.val.clone(), last); - timestamp = Some(T::decode(part.time.values()[last].to_le_bytes())); - k_bound = Some(key_bound); - v_bound = Some(val_bound); + let t = T::decode(part.time.values()[last].to_le_bytes()); + lower_bound = Some(LowerBound { + val_bound, + key_bound, + t, + }); } }; - if let (Some(k_ref), Some(v_ref), Some(ts_value)) = - (k_bound.as_ref(), v_bound.as_ref(), timestamp) - { - let key_idx = k_ref.get(); - let val_idx = v_ref.get(); - sort = Some(( - ( - OwnedArrayIdx::from(key_idx), - Some(OwnedArrayIdx::from(val_idx)), - ), - ts_value, - )); - + if let Some(lower_bound) = lower_bound.as_ref() { for (_, _, run) in &mut runs { let start = run .iter() .position(|part| { part.structured_key_lower() - .map_or(true, |lower| lower.get() >= key_idx) + .map_or(true, |lower| lower.get() >= lower_bound.key_bound.get()) }) .unwrap_or(run.len()); @@ -963,7 +951,7 @@ where FetchBatchFilter::Compaction { since: desc.since().clone(), }, - sort, + lower_bound, prefetch_budget_bytes, ); diff --git a/src/persist-client/src/iter.rs b/src/persist-client/src/iter.rs index 71a560ed22277..90e2fa3f15b7c 100644 --- a/src/persist-client/src/iter.rs +++ b/src/persist-client/src/iter.rs @@ -29,7 +29,7 @@ use mz_ore::task::JoinHandle; use mz_persist::indexed::encoding::BlobTraceUpdates; use mz_persist::location::Blob; use mz_persist::metrics::ColumnarMetrics; -use mz_persist_types::arrow::{ArrayBound, ArrayIdx, ArrayOrd, OwnedArrayIdx}; +use mz_persist_types::arrow::{ArrayBound, ArrayIdx, ArrayOrd}; use mz_persist_types::part::Part; use mz_persist_types::{Codec, Codec64}; use semver::{Op, Version}; @@ -158,12 +158,6 @@ impl StructuredSort { } pub type SortKV<'a> = (ArrayIdx<'a>, Option>); -pub type OwnedSortKV = (OwnedArrayIdx, Option); - -pub fn sort_kv_from_owned<'a>(kv: &'a OwnedSortKV) -> SortKV<'a> { - let (key, value) = kv; - (key.as_ref(), value.as_ref().map(|v| v.as_ref())) -} fn kv_lower(data: &FetchData) -> Option> { let key_idx = data.structured_lower.as_ref().map(|l| l.get())?; @@ -349,7 +343,7 @@ pub(crate) struct Consolidator> { runs: Vec, usize)>>, filter: FetchBatchFilter, budget: usize, - lower_bound: Option<(OwnedSortKV, T)>, + lower_bound: Option>, // NB: this is the tricky part! 
// One hazard of streaming consolidation is that we may start consolidating a particular KVT, // but not be able to finish, because some other part that might also contain the same KVT @@ -358,6 +352,22 @@ pub(crate) struct Consolidator> { drop_stash: Option, } +#[derive(Debug)] +pub struct LowerBound { + pub(crate) key_bound: ArrayBound, + pub(crate) val_bound: ArrayBound, + pub(crate) t: T, +} + +impl LowerBound { + pub fn kvt_bound(&self) -> (SortKV<'_>, T) { + ( + (self.key_bound.get(), Some(self.val_bound.get())), + self.t.clone(), + ) + } +} + impl Consolidator where T: Timestamp + Codec64 + Lattice, @@ -376,7 +386,7 @@ where shard_metrics: Arc, read_metrics: ReadMetrics, filter: FetchBatchFilter, - lower_bound: Option<(OwnedSortKV, T)>, + lower_bound: Option>, prefetch_budget_bytes: usize, ) -> Self { Self { @@ -390,8 +400,8 @@ where runs: vec![], filter, budget: prefetch_budget_bytes, - lower_bound, drop_stash: None, + lower_bound, } } } @@ -494,12 +504,9 @@ where return None; } - let mut iter = ConsolidatingIter::new( - &self.context, - &self.filter, - &self.lower_bound, - &mut self.drop_stash, - ); + let bound = self.lower_bound.as_ref().map(|b| b.kvt_bound()); + let mut iter = + ConsolidatingIter::new(&self.context, &self.filter, bound, &mut self.drop_stash); for run in &mut self.runs { let last_in_run = run.len() < 2; @@ -851,7 +858,7 @@ where parts: Vec<&'a StructuredUpdates>, heap: BinaryHeap>, upper_bound: Option<(SortKV<'a>, T)>, - lower_bound: &'a Option<(OwnedSortKV, T)>, + lower_bound: Option<(SortKV<'a>, T)>, state: Option<(Indices, SortKV<'a>, T, D)>, drop_stash: &'a mut Option, } @@ -864,7 +871,7 @@ where fn new( context: &'a str, filter: &'a FetchBatchFilter, - lower_bound: &'a Option<(OwnedSortKV, T)>, + lower_bound: Option<(SortKV<'a>, T)>, drop_stash: &'a mut Option, ) -> Self { Self { @@ -945,8 +952,7 @@ where // This code checks our inclusive lower bound against if let Some((kv_lower, t_lower)) = &self.lower_bound { - let sort_kv = sort_kv_from_owned(kv_lower); - if (&sort_kv, t_lower) >= (kv1, t1) { + if (kv_lower, t_lower) >= (kv1, t1) { // Discard this item from the part, since it's past our lower bound. 
let _ = part.pop(&self.parts, self.filter); diff --git a/src/persist-types/src/arrow.rs b/src/persist-types/src/arrow.rs index 1e694f5ef2027..c1700952f435c 100644 --- a/src/persist-types/src/arrow.rs +++ b/src/persist-types/src/arrow.rs @@ -449,30 +449,6 @@ pub struct ArrayIdx<'a> { pub array: &'a ArrayOrd, } -#[derive(Clone, Debug)] -pub struct OwnedArrayIdx { - pub idx: usize, - pub array: ArrayOrd, -} - -impl<'a> From> for OwnedArrayIdx { - fn from(idx: ArrayIdx<'a>) -> Self { - OwnedArrayIdx { - idx: idx.idx, - array: idx.array.clone(), - } - } -} - -impl OwnedArrayIdx { - pub fn as_ref(&self) -> ArrayIdx<'_> { - ArrayIdx { - idx: self.idx, - array: &self.array, - } - } -} - impl Display for ArrayIdx<'_> { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self.array { From d95808f752a44f28f2a48d89161bf7d1d133d03a Mon Sep 17 00:00:00 2001 From: Dov Alperin Date: Fri, 16 May 2025 23:09:26 -0400 Subject: [PATCH 5/5] Plumb incremental compaction all the way through --- src/persist-client/src/batch.rs | 57 +++++------ src/persist-client/src/cli/admin.rs | 1 + src/persist-client/src/internal/compact.rs | 68 ++++++++++--- src/persist-client/src/internal/machine.rs | 19 ++-- src/persist-client/src/internal/state.rs | 21 ++-- src/persist-client/src/internal/trace.rs | 106 ++++++++++++--------- 6 files changed, 170 insertions(+), 102 deletions(-) diff --git a/src/persist-client/src/batch.rs b/src/persist-client/src/batch.rs index b691f2f0d13dd..61ace2483fd71 100644 --- a/src/persist-client/src/batch.rs +++ b/src/persist-client/src/batch.rs @@ -17,6 +17,21 @@ use std::mem; use std::sync::Arc; use std::time::Instant; +use crate::async_runtime::IsolatedRuntime; +use crate::cfg::{BATCH_BUILDER_MAX_OUTSTANDING_PARTS, MiB}; +use crate::error::InvalidUsage; +use crate::internal::compact::{CompactConfig, Compactor}; +use crate::internal::encoding::{LazyInlineBatchPart, LazyPartStats, LazyProto, Schemas}; +use crate::internal::machine::{Machine, retry_external}; +use crate::internal::merge::{MergeTree, Pending}; +use crate::internal::metrics::{BatchWriteMetrics, Metrics, RetryMetrics, ShardMetrics}; +use crate::internal::paths::{PartId, PartialBatchKey, WriterKey}; +use crate::internal::state::{ + BatchPart, HollowBatch, HollowBatchPart, HollowRun, HollowRunRef, ProtoInlineBatchPart, + RunMeta, RunOrder, RunPart, +}; +use crate::stats::{STATS_BUDGET_BYTES, STATS_COLLECTION_ENABLED, untrimmable_columns}; +use crate::{PersistConfig, ShardId}; use arrow::array::{Array, Int64Array}; use bytes::Bytes; use differential_dataflow::difference::Semigroup; @@ -46,21 +61,6 @@ use timely::PartialOrder; use timely::order::TotalOrder; use timely::progress::{Antichain, Timestamp}; use tracing::{Instrument, debug_span, trace_span, warn}; -use crate::async_runtime::IsolatedRuntime; -use crate::cfg::{BATCH_BUILDER_MAX_OUTSTANDING_PARTS, MiB}; -use crate::error::InvalidUsage; -use crate::internal::compact::{CompactConfig, Compactor}; -use crate::internal::encoding::{LazyInlineBatchPart, LazyPartStats, LazyProto, Schemas}; -use crate::internal::machine::retry_external; -use crate::internal::merge::{MergeTree, Pending}; -use crate::internal::metrics::{BatchWriteMetrics, Metrics, RetryMetrics, ShardMetrics}; -use crate::internal::paths::{PartId, PartialBatchKey, WriterKey}; -use crate::internal::state::{ - BatchPart, HollowBatch, HollowBatchPart, HollowRun, HollowRunRef, ProtoInlineBatchPart, - RunMeta, RunOrder, RunPart, -}; -use crate::stats::{STATS_BUDGET_BYTES, STATS_COLLECTION_ENABLED, 
untrimmable_columns}; -use crate::{PersistConfig, ShardId}; include!(concat!(env!("OUT_DIR"), "/mz_persist_client.batch.rs")); @@ -681,7 +681,9 @@ where ) -> Option> { let runs = self.parts.finish_completed_runs(); - if runs.is_empty() { return None; } + if runs.is_empty() { + return None; + } let mut run_parts = vec![]; let mut run_splits = vec![]; @@ -701,8 +703,6 @@ where }); run_parts.extend(parts); } - //TODO(dov): should we fetch the last part and constrain the upper bound - // to whatever we have actually flushed? let desc = registered_desc; let batch = HollowBatch::new(desc, run_parts, self.num_updates, run_meta, run_splits); @@ -877,8 +877,8 @@ impl BatchParts { shard_metrics, isolated_runtime, write_schemas, + &None, None, - None ) .await .expect("successful compaction"); @@ -977,7 +977,6 @@ impl BatchParts { } } - pub(crate) async fn write( &mut self, write_schemas: &Schemas, @@ -1268,18 +1267,22 @@ impl BatchParts { pub(crate) fn finish_completed_runs(&self) -> Vec<(RunOrder, Vec>)> { match &self.writing_runs { - WritingRuns::Ordered(order, tree) => tree.iter() - .take_while(|part| matches!(part, Pending::Finished(_))).map(|part| match part { - Pending::Finished(p) => (order.clone(), vec![p.clone()]), - _ => (order.clone(), vec![]), - }).collect(), - WritingRuns::Compacting(tree) => tree.iter() + WritingRuns::Ordered(order, tree) => tree + .iter() + .take_while(|part| matches!(part, Pending::Finished(_))) + .map(|part| match part { + Pending::Finished(p) => (order.clone(), vec![p.clone()]), + _ => (order.clone(), vec![]), + }) + .collect(), + WritingRuns::Compacting(tree) => tree + .iter() .take_while(|(_, run)| matches!(run, Pending::Finished(_))) .map(|(order, run)| match run { Pending::Finished(parts) => (order.clone(), parts.clone()), _ => (order.clone(), vec![]), }) - .collect() + .collect(), } } diff --git a/src/persist-client/src/cli/admin.rs b/src/persist-client/src/cli/admin.rs index c8b4b7ba80020..d1416b2242ba7 100644 --- a/src/persist-client/src/cli/admin.rs +++ b/src/persist-client/src/cli/admin.rs @@ -500,6 +500,7 @@ where Arc::new(IsolatedRuntime::default()), req.clone(), schemas, + &machine, ); pin_mut!(stream); diff --git a/src/persist-client/src/internal/compact.rs b/src/persist-client/src/internal/compact.rs index 840e97af67884..32bf5d09221d7 100644 --- a/src/persist-client/src/internal/compact.rs +++ b/src/persist-client/src/internal/compact.rs @@ -397,6 +397,7 @@ where Arc::clone(&machine_clone.isolated_runtime), req.clone(), compaction_schema, + &machine_clone, ); let maintenance = @@ -570,6 +571,7 @@ where isolated_runtime: Arc, req: CompactReq, write_schemas: Schemas, + machine: &Machine, ) -> impl Stream, anyhow::Error>> { async_stream::stream! 
{ let _ = Self::validate_req(&req)?; @@ -605,9 +607,6 @@ where let run_reserved_memory_bytes = cfg.compaction_memory_bound_bytes - in_progress_part_reserved_memory_bytes; - //if there is an active compaction - //We should call compact_runs with the same - // Flatten the input batches into a single list of runs let ordered_runs = Self::order_runs(&req, cfg.batch.preferred_order, &*blob, &*metrics).await?; @@ -616,6 +615,8 @@ where let chunked_runs = Self::chunk_runs(&ordered_runs, &cfg, &*metrics, run_reserved_memory_bytes); + let (incremental_tx, mut incremental_rx) = mpsc::channel(1); + let total_chunked_runs = chunked_runs.len(); let mut applied = 0; for (runs, run_chunk_max_memory_usage) in chunked_runs { @@ -643,11 +644,16 @@ where Arc::clone(&shard_metrics), Arc::clone(&isolated_runtime), write_schemas.clone(), - None, - None + &req.prev_batch, + Some(incremental_tx.clone()) ) .await?; + while let Some(res) = incremental_rx.recv().await { + let now = SYSTEM_TIME.clone(); + machine.checkpoint_compaction_progress(&res, now()).await; + } + let (parts, run_splits, run_meta, updates) = (batch.parts, batch.run_splits, batch.run_meta, batch.len); @@ -690,6 +696,7 @@ where applied += 1; } + drop(incremental_tx); } } @@ -827,6 +834,28 @@ where Ok(ordered_runs) } + fn combine_hollow_batch_with_previous( + previous_batch: &HollowBatch, + batch: &HollowBatch, + ) -> HollowBatch { + // Simplifying assumption: you can't combine batches with different descriptions. + assert_eq!(previous_batch.desc, batch.desc); + let len = previous_batch.len + batch.len; + let mut parts = Vec::with_capacity(previous_batch.parts.len() + batch.parts.len()); + parts.extend(previous_batch.parts.clone()); + parts.extend(batch.parts.clone()); + assert!(previous_batch.run_splits.is_empty()); + assert!(batch.run_splits.is_empty()); + assert_eq!(previous_batch.run_meta, batch.run_meta); + HollowBatch::new( + previous_batch.desc.clone(), + parts, + len, + previous_batch.run_meta.clone(), + previous_batch.run_splits.clone(), + ) + } + /// Compacts runs together. If the input runs are sorted, a single run will be created as output. /// /// Maximum possible memory usage is `(# runs + 2) * [crate::PersistConfig::blob_target_size]` @@ -840,7 +869,7 @@ where shard_metrics: Arc, isolated_runtime: Arc, write_schemas: Schemas, - batch_so_far: Option>, + batch_so_far: &Option>, incremental_tx: Option>>, ) -> Result, anyhow::Error> { // TODO: Figure out a more principled way to allocate our memory budget. @@ -864,7 +893,7 @@ where // the config allows BatchBuilder to do its normal pipelining of writes. batch_cfg.inline_writes_single_max_bytes = 0; - if let Some(batch_so_far) = batch_so_far { + if let Some(batch_so_far) = batch_so_far.as_ref() { let last_part = batch_so_far .last_part(shard_id.clone(), &*blob, &metrics) .await; @@ -994,12 +1023,20 @@ where break; }; + batch.flush_part(desc.clone(), updates).await; + if let Some(tx) = incremental_tx.as_ref() { // This is where we record whatever parts were successfully flushed // to blob. That way we can resume an interrupted compaction later. 
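+                // Any work checkpointed by a previous, interrupted attempt is folded
+                // in below via `combine_hollow_batch_with_previous`, so the recorded
+                // batch always reflects the total progress so far.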
let partial_batch = batch.batch_with_finished_parts(desc.clone()); + if let Some(partial_batch) = partial_batch { - match tx.send(partial_batch).await { + let hollow_batch = if let Some(batch_so_far) = batch_so_far.as_ref() { + Self::combine_hollow_batch_with_previous(batch_so_far, &partial_batch) + } else { + partial_batch + }; + match tx.send(hollow_batch).await { Ok(_) => { // metrics.compaction.incremental_batch_sent.inc(); } @@ -1010,11 +1047,6 @@ where }; } } - - // part - // upper bound kvt - // find the last part where the lower bound < recorded upper bound - batch.flush_part(desc.clone(), updates).await; } let mut batch = batch.finish(desc.clone()).await?; @@ -1036,8 +1068,14 @@ where .await; } + let hollow_batch = if let Some(batch_so_far) = batch_so_far.as_ref() { + Self::combine_hollow_batch_with_previous(batch_so_far, &batch.into_hollow_batch()) + } else { + batch.into_hollow_batch() + }; + timings.record(&metrics); - Ok(batch.into_hollow_batch()) + Ok(hollow_batch) } fn validate_req(req: &CompactReq) -> Result<(), anyhow::Error> { @@ -1163,6 +1201,7 @@ mod tests { Arc::new(IsolatedRuntime::default()), req.clone(), schemas.clone(), + &write.machine, ); let res = Compactor::::compact_all(stream, req.clone()) @@ -1243,6 +1282,7 @@ mod tests { Arc::new(IsolatedRuntime::default()), req.clone(), schemas.clone(), + &write.machine, ); let res = diff --git a/src/persist-client/src/internal/machine.rs b/src/persist-client/src/internal/machine.rs index f921cae033af0..0f105d998f8fe 100644 --- a/src/persist-client/src/internal/machine.rs +++ b/src/persist-client/src/internal/machine.rs @@ -501,7 +501,7 @@ where .into_iter() .map(|b| Arc::unwrap_or_clone(b.batch)) .collect(), - prev_batch: None, + prev_batch: req.active_compaction, }; compact_reqs.push(req); } @@ -611,13 +611,15 @@ where let metrics = Arc::clone(&self.applier.metrics); //TODO(dov): new metric - let _ = self.apply_unbatched_idempotent_cmd(&metrics.cmds.merge_res, |_, _, state| { - let ret = state.apply_compaction_progress(batch_so_far, current_ts); - if let Continue(_) = ret { - // metrics.state.compaction_progress_applied.inc(); - } - ret - }).await; + let _ = self + .apply_unbatched_idempotent_cmd(&metrics.cmds.merge_res, |_, _, state| { + let ret = state.apply_compaction_progress(batch_so_far, current_ts); + if let Continue(_) = ret { + // metrics.state.compaction_progress_applied.inc(); + } + ret + }) + .await; } pub async fn merge_res( @@ -2033,6 +2035,7 @@ pub mod datadriven { Arc::clone(&datadriven.client.isolated_runtime), req_clone, SCHEMAS.clone(), + &datadriven.machine, ); let res = Compactor::::compact_all(stream, req.clone()).await?; diff --git a/src/persist-client/src/internal/state.rs b/src/persist-client/src/internal/state.rs index 955cba3157521..1a237cf91c673 100644 --- a/src/persist-client/src/internal/state.rs +++ b/src/persist-client/src/internal/state.rs @@ -16,7 +16,7 @@ use std::collections::BTreeMap; use std::fmt::{Debug, Formatter}; use std::marker::PhantomData; use std::ops::ControlFlow::{self, Break, Continue}; -use std::ops::{Deref, DerefMut}; +use std::ops::{Add, Deref, DerefMut}; use std::time::Duration; use arrow::array::{Array, ArrayData, make_array}; @@ -647,10 +647,16 @@ impl RunPart { match self { RunPart::Single(p) => Ok(Box::new(p.clone())), RunPart::Many(r) => { - let fetched = r.get(shard_id, blob, metrics).await.ok_or_else(|| MissingBlob(r.key.complete(&shard_id)))?; - let last_part = fetched.parts.last().ok_or_else(|| MissingBlob(r.key.complete(&shard_id)))?; + let fetched = r + 
+                    .get(shard_id, blob, metrics)
+                    .await
+                    .ok_or_else(|| MissingBlob(r.key.complete(&shard_id)))?;
+                let last_part = fetched
+                    .parts
+                    .last()
+                    .ok_or_else(|| MissingBlob(r.key.complete(&shard_id)))?;
                 Ok(Box::pin(last_part.last_part(shard_id, blob, metrics)).await?)
-            },
+            }
         }
     }
 }
@@ -1687,7 +1693,7 @@ where
                     req.id,
                     ActiveCompaction {
                         start_ms: heartbeat_timestamp_ms,
-                        batch_so_far: None
+                        batch_so_far: None,
                     },
                 )
             }
@@ -1727,10 +1733,11 @@ where
 
         let new_active_compaction = ActiveCompaction {
             start_ms: new_ts,
-            batch_so_far: Some(batch_so_far.clone())
+            batch_so_far: Some(batch_so_far.clone()),
         };
 
-        self.trace.apply_incremental_compaction(&new_active_compaction);
+        self.trace
+            .apply_incremental_compaction(&new_active_compaction);
 
         Continue(())
     }
diff --git a/src/persist-client/src/internal/trace.rs b/src/persist-client/src/internal/trace.rs
index 4b0fce3af1105..d58f0cf217a02 100644
--- a/src/persist-client/src/internal/trace.rs
+++ b/src/persist-client/src/internal/trace.rs
@@ -72,6 +72,7 @@ pub struct FueledMergeReq {
     pub id: SpineId,
     pub desc: Description<T>,
     pub inputs: Vec<IdHollowBatch<T>>,
+    pub active_compaction: Option<HollowBatch<T>>,
 }
 
 #[derive(Debug)]
@@ -610,10 +611,23 @@ impl Trace {
                     .as_ref()
                     .map_or(true, move |c| c.start_ms <= threshold_ms)
             })
-            .map(|b| FueledMergeReq {
-                id: b.id,
-                desc: b.desc.clone(),
-                inputs: b.parts.clone(),
+            .map(|b| {
+                let inputs = b
+                    .active_compaction
+                    .as_ref()
+                    .and_then(|ac| ac.batch_so_far.as_ref())
+                    .map_or_else(|| b.parts.clone(), |bsf| b.filtered_batches(&bsf.desc));
+                let active_compaction_hollow_batch = b
+                    .active_compaction
+                    .as_ref()
+                    .and_then(|ac| ac.batch_so_far.as_ref())
+                    .cloned();
+                FueledMergeReq {
+                    id: b.id,
+                    desc: b.desc.clone(),
+                    inputs,
+                    active_compaction: active_compaction_hollow_batch,
+                }
             })
     }
 
@@ -701,7 +715,7 @@ pub struct IdHollowBatch {
 #[derive(Debug, Clone, Eq, PartialEq, Serialize)]
 pub struct ActiveCompaction {
     pub start_ms: u64,
-    pub batch_so_far: Option<HollowBatch<T>>
+    pub batch_so_far: Option<HollowBatch<T>>,
 }
 
 #[derive(Debug, Clone, PartialEq)]
@@ -838,11 +852,14 @@ impl SpineBatch {
         })
     }
 
-    fn maybe_update_active_compaction(&mut self, active_compaction: &ActiveCompaction) -> ApplyMergeResult {
-        let batch = match active_compaction.batch_so_far.as_ref() {
-            Some(batch) => batch,
-            None => return ApplyMergeResult::NotAppliedNoMatch,
-        };
+    fn maybe_update_active_compaction(
+        &mut self,
+        active_compaction: &ActiveCompaction,
+    ) -> ApplyMergeResult {
+        let batch = match active_compaction.batch_so_far.as_ref() {
+            Some(batch) => batch,
+            None => return ApplyMergeResult::NotAppliedNoMatch,
+        };
 
         // The spine's and merge res's sinces don't need to match (which could occur if Spine
         // has been reloaded from state due to compare_and_set mismatch), but if so, the Spine
@@ -852,56 +869,44 @@
         }
 
         // If our merge result exactly matches a spine batch, we can swap it in directly
-        let exact_match = batch.desc.lower() == self.desc().lower()
-            && batch.desc.upper() == self.desc().upper();
+        let exact_match =
+            batch.desc.lower() == self.desc().lower() && batch.desc.upper() == self.desc().upper();
         if exact_match {
             self.active_compaction = Some(active_compaction.clone());
             return ApplyMergeResult::AppliedExact;
         }
 
-        // It is possible the structure of the spine has changed since the merge res
-        // was created, such that it no longer exactly matches the description of a
-        // spine batch. This can happen if another merge has happened in the interim,
-        // or if spine needed to be rebuilt from state.
-        //
-        // When this occurs, we can still attempt to slot the merge res in to replace
-        // the parts of a fueled merge. e.g. if the res is for `[1,3)` and the parts
-        // are `[0,1),[1,2),[2,3),[3,4)`, we can swap out the middle two parts for res.
         let SpineBatch {
-            id,
-            parts,
+            id: _,
+            parts: _,
             desc,
             active_compaction: _,
             len: _,
         } = self;
 
-        // first, determine if a subset of parts can be cleanly replaced by the merge res
-        let mut lower = None;
-        let mut upper = None;
-        for (i, batch) in parts.iter().enumerate() {
-            if batch.batch.desc.lower() == desc.lower() {
-                lower = Some((i, batch.id.0));
-            }
-            if batch.batch.desc.upper() == desc.upper() {
-                upper = Some((i, batch.id.1));
-            }
-            if lower.is_some() && upper.is_some() {
-                break;
-            }
-        }
-
-        match (lower, upper) {
-            (Some((_lower, _id_lower)), Some((_upper, _id_upper))) => {
-                // This is a subset of the batch, so we can update the active compaction.
-                // Normally we would want to actually slot the merge res in, but we aren't
-                // focusing on that here.
-                self.active_compaction = Some(active_compaction.clone());
-                ApplyMergeResult::AppliedSubset
-            }
-            _ => ApplyMergeResult::NotAppliedNoMatch,
+        if PartialOrder::less_equal(desc.lower(), batch.desc.lower())
+            && PartialOrder::less_equal(batch.desc.upper(), desc.upper())
+        {
+            self.active_compaction = Some(active_compaction.clone());
+            ApplyMergeResult::AppliedSubset
+        } else {
+            ApplyMergeResult::NotAppliedNoMatch
         }
     }
 
+    fn filtered_batches(&self, desc: &Description<T>) -> Vec<IdHollowBatch<T>> {
+        self.parts
+            .iter()
+            .filter(|b| {
+                let part_desc = &b.batch.desc;
+                PartialOrder::less_equal(part_desc.lower(), desc.lower())
+                    && PartialOrder::less_equal(desc.upper(), part_desc.upper())
+            })
+            .cloned()
+            .collect()
+    }
+
     // TODO: Roundtrip the SpineId through FueledMergeReq/FueledMergeRes?
     fn maybe_replace(&mut self, res: &FueledMergeRes<T>) -> ApplyMergeResult {
         // The spine's and merge res's sinces don't need to match (which could occur if Spine
@@ -1107,6 +1112,7 @@ impl FuelingMerge {
                 id,
                 desc: desc.clone(),
                 inputs: merged_parts.clone(),
+                active_compaction: None,
             });
         }
 
@@ -1950,7 +1956,13 @@ pub(crate) mod tests {
             .fueled_merge_reqs_before_ms(timeout_ms, None)
             .collect();
         for req in reqs {
-            trace.claim_compaction(req.id, ActiveCompaction { start_ms: 0, batch_so_far: None })
+            trace.claim_compaction(
+                req.id,
+                ActiveCompaction {
+                    start_ms: 0,
+                    batch_so_far: None,
+                },
+            )
         }
         trace.roundtrip_structure = roundtrip_structure;
         trace
@@ -2020,6 +2032,7 @@ pub(crate) mod tests {
                 Antichain::new(),
             ),
             inputs: vec![],
+            active_compaction: None,
         }
     }
 
@@ -2089,6 +2102,7 @@ pub(crate) mod tests {
                 Antichain::from_elem(5),
            ),
             inputs: vec![],
+            active_compaction: None,
         };
         assert_eq!(
             Trace::remove_redundant_merge_reqs(vec![req(0, 1), req015.clone()]),