Skip to content

Commit 028063b

Browse files
committed
Plumb incremental compaction all the way through
1 parent 626cc6a commit 028063b

File tree

6 files changed

+170
-102
lines changed

6 files changed

+170
-102
lines changed

src/persist-client/src/batch.rs

Lines changed: 30 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,21 @@ use std::mem;
1717
use std::sync::Arc;
1818
use std::time::Instant;
1919

20+
use crate::async_runtime::IsolatedRuntime;
21+
use crate::cfg::{BATCH_BUILDER_MAX_OUTSTANDING_PARTS, MiB};
22+
use crate::error::InvalidUsage;
23+
use crate::internal::compact::{CompactConfig, Compactor};
24+
use crate::internal::encoding::{LazyInlineBatchPart, LazyPartStats, LazyProto, Schemas};
25+
use crate::internal::machine::{Machine, retry_external};
26+
use crate::internal::merge::{MergeTree, Pending};
27+
use crate::internal::metrics::{BatchWriteMetrics, Metrics, RetryMetrics, ShardMetrics};
28+
use crate::internal::paths::{PartId, PartialBatchKey, WriterKey};
29+
use crate::internal::state::{
30+
BatchPart, HollowBatch, HollowBatchPart, HollowRun, HollowRunRef, ProtoInlineBatchPart,
31+
RunMeta, RunOrder, RunPart,
32+
};
33+
use crate::stats::{STATS_BUDGET_BYTES, STATS_COLLECTION_ENABLED, untrimmable_columns};
34+
use crate::{PersistConfig, ShardId};
2035
use arrow::array::{Array, Int64Array};
2136
use bytes::Bytes;
2237
use differential_dataflow::difference::Semigroup;
@@ -46,21 +61,6 @@ use timely::PartialOrder;
4661
use timely::order::TotalOrder;
4762
use timely::progress::{Antichain, Timestamp};
4863
use tracing::{Instrument, debug_span, trace_span, warn};
49-
use crate::async_runtime::IsolatedRuntime;
50-
use crate::cfg::{BATCH_BUILDER_MAX_OUTSTANDING_PARTS, MiB};
51-
use crate::error::InvalidUsage;
52-
use crate::internal::compact::{CompactConfig, Compactor};
53-
use crate::internal::encoding::{LazyInlineBatchPart, LazyPartStats, LazyProto, Schemas};
54-
use crate::internal::machine::retry_external;
55-
use crate::internal::merge::{MergeTree, Pending};
56-
use crate::internal::metrics::{BatchWriteMetrics, Metrics, RetryMetrics, ShardMetrics};
57-
use crate::internal::paths::{PartId, PartialBatchKey, WriterKey};
58-
use crate::internal::state::{
59-
BatchPart, HollowBatch, HollowBatchPart, HollowRun, HollowRunRef, ProtoInlineBatchPart,
60-
RunMeta, RunOrder, RunPart,
61-
};
62-
use crate::stats::{STATS_BUDGET_BYTES, STATS_COLLECTION_ENABLED, untrimmable_columns};
63-
use crate::{PersistConfig, ShardId};
6464

6565
include!(concat!(env!("OUT_DIR"), "/mz_persist_client.batch.rs"));
6666

@@ -681,7 +681,9 @@ where
681681
) -> Option<HollowBatch<T>> {
682682
let runs = self.parts.finish_completed_runs();
683683

684-
if runs.is_empty() { return None; }
684+
if runs.is_empty() {
685+
return None;
686+
}
685687

686688
let mut run_parts = vec![];
687689
let mut run_splits = vec![];
@@ -701,8 +703,6 @@ where
701703
});
702704
run_parts.extend(parts);
703705
}
704-
//TODO(dov): should we fetch the last part and constrain the upper bound
705-
// to whatever we have actually flushed?
706706
let desc = registered_desc;
707707

708708
let batch = HollowBatch::new(desc, run_parts, self.num_updates, run_meta, run_splits);
@@ -877,8 +877,8 @@ impl<T: Timestamp + Codec64> BatchParts<T> {
877877
shard_metrics,
878878
isolated_runtime,
879879
write_schemas,
880+
&None,
880881
None,
881-
None
882882
)
883883
.await
884884
.expect("successful compaction");
@@ -977,7 +977,6 @@ impl<T: Timestamp + Codec64> BatchParts<T> {
977977
}
978978
}
979979

980-
981980
pub(crate) async fn write<K: Codec, V: Codec, D: Codec64>(
982981
&mut self,
983982
write_schemas: &Schemas<K, V>,
@@ -1268,18 +1267,22 @@ impl<T: Timestamp + Codec64> BatchParts<T> {
12681267

12691268
pub(crate) fn finish_completed_runs(&self) -> Vec<(RunOrder, Vec<RunPart<T>>)> {
12701269
match &self.writing_runs {
1271-
WritingRuns::Ordered(order, tree) => tree.iter()
1272-
.take_while(|part| matches!(part, Pending::Finished(_))).map(|part| match part {
1273-
Pending::Finished(p) => (order.clone(), vec![p.clone()]),
1274-
_ => (order.clone(), vec![]),
1275-
}).collect(),
1276-
WritingRuns::Compacting(tree) => tree.iter()
1270+
WritingRuns::Ordered(order, tree) => tree
1271+
.iter()
1272+
.take_while(|part| matches!(part, Pending::Finished(_)))
1273+
.map(|part| match part {
1274+
Pending::Finished(p) => (order.clone(), vec![p.clone()]),
1275+
_ => (order.clone(), vec![]),
1276+
})
1277+
.collect(),
1278+
WritingRuns::Compacting(tree) => tree
1279+
.iter()
12771280
.take_while(|(_, run)| matches!(run, Pending::Finished(_)))
12781281
.map(|(order, run)| match run {
12791282
Pending::Finished(parts) => (order.clone(), parts.clone()),
12801283
_ => (order.clone(), vec![]),
12811284
})
1282-
.collect()
1285+
.collect(),
12831286
}
12841287
}
12851288

src/persist-client/src/cli/admin.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -500,6 +500,7 @@ where
500500
Arc::new(IsolatedRuntime::default()),
501501
req.clone(),
502502
schemas,
503+
&machine,
503504
);
504505
pin_mut!(stream);
505506

src/persist-client/src/internal/compact.rs

Lines changed: 54 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -397,6 +397,7 @@ where
397397
Arc::clone(&machine_clone.isolated_runtime),
398398
req.clone(),
399399
compaction_schema,
400+
&machine_clone,
400401
);
401402

402403
let maintenance =
@@ -570,6 +571,7 @@ where
570571
isolated_runtime: Arc<IsolatedRuntime>,
571572
req: CompactReq<T>,
572573
write_schemas: Schemas<K, V>,
574+
machine: &Machine<K, V, T, D>,
573575
) -> impl Stream<Item = Result<FueledMergeRes<T>, anyhow::Error>> {
574576
async_stream::stream! {
575577
let _ = Self::validate_req(&req)?;
@@ -605,9 +607,6 @@ where
605607
let run_reserved_memory_bytes =
606608
cfg.compaction_memory_bound_bytes - in_progress_part_reserved_memory_bytes;
607609

608-
//if there is an active compaction
609-
//We should call compact_runs with the same
610-
611610
// Flatten the input batches into a single list of runs
612611
let ordered_runs =
613612
Self::order_runs(&req, cfg.batch.preferred_order, &*blob, &*metrics).await?;
@@ -616,6 +615,8 @@ where
616615
let chunked_runs =
617616
Self::chunk_runs(&ordered_runs, &cfg, &*metrics, run_reserved_memory_bytes);
618617

618+
let (incremental_tx, mut incremental_rx) = mpsc::channel(0);
619+
619620
let total_chunked_runs = chunked_runs.len();
620621
let mut applied = 0;
621622
for (runs, run_chunk_max_memory_usage) in chunked_runs {
@@ -643,11 +644,16 @@ where
643644
Arc::clone(&shard_metrics),
644645
Arc::clone(&isolated_runtime),
645646
write_schemas.clone(),
646-
None,
647-
None
647+
&req.prev_batch,
648+
Some(incremental_tx.clone())
648649
)
649650
.await?;
650651

652+
while let Some(res) = incremental_rx.recv().await {
653+
let now = SYSTEM_TIME.clone();
654+
machine.checkpoint_compaction_progress(&res, now()).await;
655+
}
656+
651657
let (parts, run_splits, run_meta, updates) =
652658
(batch.parts, batch.run_splits, batch.run_meta, batch.len);
653659

@@ -690,6 +696,7 @@ where
690696

691697
applied += 1;
692698
}
699+
drop(incremental_tx);
693700
}
694701
}
695702

@@ -827,6 +834,28 @@ where
827834
Ok(ordered_runs)
828835
}
829836

837+
/// Builds a new `HollowBatch` holding the parts of `previous_batch` followed by
/// the parts of `batch`.
///
/// The result reuses `previous_batch`'s description, run metadata and run
/// splits, and its length is the sum of the two input lengths. Neither input
/// is modified; their parts are cloned into the result.
///
/// # Panics
///
/// Panics if the two batches have different descriptions, if either batch has
/// a non-empty `run_splits`, or if their `run_meta` differ.
fn combine_hollow_batch_with_previous(
838+
previous_batch: &HollowBatch<T>,
839+
batch: &HollowBatch<T>,
840+
) -> HollowBatch<T> {
841+
// Simplifying assumption: you can't combine batches with different descriptions,
// so both inputs must carry exactly the same `desc`.
842+
assert_eq!(previous_batch.desc, batch.desc);
843+
let len = previous_batch.len + batch.len;
844+
let mut parts = Vec::with_capacity(previous_batch.parts.len() + batch.parts.len());
845+
// Order matters: the previous batch's parts come first, then the new ones.
parts.extend(previous_batch.parts.clone());
846+
parts.extend(batch.parts.clone());
847+
// Only batches without run splits are supported, so the split list passed to
// `HollowBatch::new` below is empty.
// NOTE(review): presumably an empty split list means the combined parts form
// a single run — confirm against `HollowBatch::new`.
assert!(previous_batch.run_splits.is_empty());
848+
assert!(batch.run_splits.is_empty());
849+
assert_eq!(previous_batch.run_meta, batch.run_meta);
850+
HollowBatch::new(
851+
previous_batch.desc.clone(),
852+
parts,
853+
len,
854+
previous_batch.run_meta.clone(),
855+
previous_batch.run_splits.clone(),
856+
)
857+
}
858+
830859
/// Compacts runs together. If the input runs are sorted, a single run will be created as output.
831860
///
832861
/// Maximum possible memory usage is `(# runs + 2) * [crate::PersistConfig::blob_target_size]`
@@ -840,7 +869,7 @@ where
840869
shard_metrics: Arc<ShardMetrics>,
841870
isolated_runtime: Arc<IsolatedRuntime>,
842871
write_schemas: Schemas<K, V>,
843-
batch_so_far: Option<HollowBatch<T>>,
872+
batch_so_far: &Option<HollowBatch<T>>,
844873
incremental_tx: Option<Sender<HollowBatch<T>>>,
845874
) -> Result<HollowBatch<T>, anyhow::Error> {
846875
// TODO: Figure out a more principled way to allocate our memory budget.
@@ -864,7 +893,7 @@ where
864893
// the config allows BatchBuilder to do its normal pipelining of writes.
865894
batch_cfg.inline_writes_single_max_bytes = 0;
866895

867-
if let Some(batch_so_far) = batch_so_far {
896+
if let Some(batch_so_far) = batch_so_far.as_ref() {
868897
let last_part = batch_so_far
869898
.last_part(shard_id.clone(), &*blob, &metrics)
870899
.await;
@@ -994,12 +1023,20 @@ where
9941023
break;
9951024
};
9961025

1026+
batch.flush_part(desc.clone(), updates).await;
1027+
9971028
if let Some(tx) = incremental_tx.as_ref() {
9981029
// This is where we record whatever parts were successfully flushed
9991030
// to blob. That way we can resume an interrupted compaction later.
10001031
let partial_batch = batch.batch_with_finished_parts(desc.clone());
1032+
10011033
if let Some(partial_batch) = partial_batch {
1002-
match tx.send(partial_batch).await {
1034+
let hollow_batch = if let Some(batch_so_far) = batch_so_far.as_ref() {
1035+
Self::combine_hollow_batch_with_previous(batch_so_far, &partial_batch)
1036+
} else {
1037+
partial_batch
1038+
};
1039+
match tx.send(hollow_batch).await {
10031040
Ok(_) => {
10041041
// metrics.compaction.incremental_batch_sent.inc();
10051042
}
@@ -1010,11 +1047,6 @@ where
10101047
};
10111048
}
10121049
}
1013-
1014-
// part
1015-
// upper bound kvt
1016-
// find the last part where the lower bound < recorded upper bound
1017-
batch.flush_part(desc.clone(), updates).await;
10181050
}
10191051
let mut batch = batch.finish(desc.clone()).await?;
10201052

@@ -1036,8 +1068,14 @@ where
10361068
.await;
10371069
}
10381070

1071+
let hollow_batch = if let Some(batch_so_far) = batch_so_far.as_ref() {
1072+
Self::combine_hollow_batch_with_previous(batch_so_far, &batch.into_hollow_batch())
1073+
} else {
1074+
batch.into_hollow_batch()
1075+
};
1076+
10391077
timings.record(&metrics);
1040-
Ok(batch.into_hollow_batch())
1078+
Ok(hollow_batch)
10411079
}
10421080

10431081
fn validate_req(req: &CompactReq<T>) -> Result<(), anyhow::Error> {
@@ -1163,6 +1201,7 @@ mod tests {
11631201
Arc::new(IsolatedRuntime::default()),
11641202
req.clone(),
11651203
schemas.clone(),
1204+
&write.machine,
11661205
);
11671206

11681207
let res = Compactor::<String, String, u64, i64>::compact_all(stream, req.clone())
@@ -1243,6 +1282,7 @@ mod tests {
12431282
Arc::new(IsolatedRuntime::default()),
12441283
req.clone(),
12451284
schemas.clone(),
1285+
&write.machine,
12461286
);
12471287

12481288
let res =

src/persist-client/src/internal/machine.rs

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -501,7 +501,7 @@ where
501501
.into_iter()
502502
.map(|b| Arc::unwrap_or_clone(b.batch))
503503
.collect(),
504-
prev_batch: None,
504+
prev_batch: req.active_compaction,
505505
};
506506
compact_reqs.push(req);
507507
}
@@ -611,13 +611,15 @@ where
611611
let metrics = Arc::clone(&self.applier.metrics);
612612

613613
//TODO(dov): new metric
614-
let _ = self.apply_unbatched_idempotent_cmd(&metrics.cmds.merge_res, |_, _, state| {
615-
let ret = state.apply_compaction_progress(batch_so_far, current_ts);
616-
if let Continue(_) = ret {
617-
// metrics.state.compaction_progress_applied.inc();
618-
}
619-
ret
620-
}).await;
614+
let _ = self
615+
.apply_unbatched_idempotent_cmd(&metrics.cmds.merge_res, |_, _, state| {
616+
let ret = state.apply_compaction_progress(batch_so_far, current_ts);
617+
if let Continue(_) = ret {
618+
// metrics.state.compaction_progress_applied.inc();
619+
}
620+
ret
621+
})
622+
.await;
621623
}
622624

623625
pub async fn merge_res(
@@ -2033,6 +2035,7 @@ pub mod datadriven {
20332035
Arc::clone(&datadriven.client.isolated_runtime),
20342036
req_clone,
20352037
SCHEMAS.clone(),
2038+
&datadriven.machine,
20362039
);
20372040

20382041
let res = Compactor::<String, (), u64, i64>::compact_all(stream, req.clone()).await?;

src/persist-client/src/internal/state.rs

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use std::collections::BTreeMap;
1616
use std::fmt::{Debug, Formatter};
1717
use std::marker::PhantomData;
1818
use std::ops::ControlFlow::{self, Break, Continue};
19-
use std::ops::{Deref, DerefMut};
19+
use std::ops::{Add, Deref, DerefMut};
2020
use std::time::Duration;
2121

2222
use arrow::array::{Array, ArrayData, make_array};
@@ -647,10 +647,16 @@ impl<T: Timestamp + Codec64 + Sync> RunPart<T> {
647647
match self {
648648
RunPart::Single(p) => Ok(Box::new(p.clone())),
649649
RunPart::Many(r) => {
650-
let fetched = r.get(shard_id, blob, metrics).await.ok_or_else(|| MissingBlob(r.key.complete(&shard_id)))?;
651-
let last_part = fetched.parts.last().ok_or_else(|| MissingBlob(r.key.complete(&shard_id)))?;
650+
let fetched = r
651+
.get(shard_id, blob, metrics)
652+
.await
653+
.ok_or_else(|| MissingBlob(r.key.complete(&shard_id)))?;
654+
let last_part = fetched
655+
.parts
656+
.last()
657+
.ok_or_else(|| MissingBlob(r.key.complete(&shard_id)))?;
652658
Ok(Box::pin(last_part.last_part(shard_id, blob, metrics)).await?)
653-
},
659+
}
654660
}
655661
}
656662
}
@@ -1687,7 +1693,7 @@ where
16871693
req.id,
16881694
ActiveCompaction {
16891695
start_ms: heartbeat_timestamp_ms,
1690-
batch_so_far: None
1696+
batch_so_far: None,
16911697
},
16921698
)
16931699
}
@@ -1727,10 +1733,11 @@ where
17271733

17281734
let new_active_compaction = ActiveCompaction {
17291735
start_ms: new_ts,
1730-
batch_so_far: Some(batch_so_far.clone())
1736+
batch_so_far: Some(batch_so_far.clone()),
17311737
};
17321738

1733-
self.trace.apply_incremental_compaction(&new_active_compaction);
1739+
self.trace
1740+
.apply_incremental_compaction(&new_active_compaction);
17341741

17351742
Continue(())
17361743
}

0 commit comments

Comments
 (0)