diff --git a/src/persist-client/src/internal/encoding.rs b/src/persist-client/src/internal/encoding.rs
index e81a462dd78e4..3d43204c64dc0 100644
--- a/src/persist-client/src/internal/encoding.rs
+++ b/src/persist-client/src/internal/encoding.rs
@@ -61,12 +61,17 @@ use crate::internal::trace::{
 use crate::read::{LeasedReaderId, READER_LEASE_DURATION};
 use crate::{PersistConfig, ShardId, WriterId, cfg};
 
+/// A key and value `Schema` of data written to a batch or shard.
 #[derive(Debug)]
 pub struct Schemas<K: Codec, V: Codec> {
     // TODO: Remove the Option once this finishes rolling out and all shards
     // have a registered schema.
+    /// Id under which this schema is registered in the shard's schema registry,
+    /// if any.
    pub id: Option<SchemaId>,
+    /// Key `Schema`.
    pub key: Arc<K::Schema>,
+    /// Value `Schema`.
    pub val: Arc<V::Schema>,
 }
 
diff --git a/src/persist-client/src/internal/state.rs b/src/persist-client/src/internal/state.rs
index dd47b77c476ec..3912776225905 100644
--- a/src/persist-client/src/internal/state.rs
+++ b/src/persist-client/src/internal/state.rs
@@ -869,7 +869,7 @@ impl<T: Ord> Ord for HollowBatch<T> {
 }
 
 impl<T: Timestamp> HollowBatch<T> {
-    pub fn part_stream<'a>(
+    pub(crate) fn part_stream<'a>(
         &'a self,
         shard_id: ShardId,
         blob: &'a dyn Blob,
@@ -973,11 +973,11 @@ impl<T: Timestamp> HollowBatch<T> {
         self.parts.iter().map(|x| x.inline_bytes()).sum()
     }
 
-    pub fn is_empty(&self) -> bool {
+    pub(crate) fn is_empty(&self) -> bool {
         self.parts.is_empty()
     }
 
-    pub fn part_count(&self) -> usize {
+    pub(crate) fn part_count(&self) -> usize {
         self.parts.len()
     }
 
diff --git a/src/persist-client/src/lib.rs b/src/persist-client/src/lib.rs
index 9f27abe4cce23..aaeb098b585ed 100644
--- a/src/persist-client/src/lib.rs
+++ b/src/persist-client/src/lib.rs
@@ -21,27 +21,39 @@ use std::sync::Arc;
 
 use differential_dataflow::difference::Semigroup;
 use differential_dataflow::lattice::Lattice;
+use differential_dataflow::trace::Description;
 use mz_build_info::{BuildInfo, build_info};
 use mz_dyncfg::ConfigSet;
 use mz_ore::{instrument, soft_assert_or_log};
 use mz_persist::location::{Blob, Consensus, ExternalError};
 use mz_persist_types::schema::SchemaId;
 use mz_persist_types::{Codec, Codec64, Opaque};
-use timely::progress::Timestamp;
+use mz_proto::{IntoRustIfSome, ProtoType};
+use semver::Version;
+use timely::progress::{Antichain, Timestamp};
 
 use crate::async_runtime::IsolatedRuntime;
+use crate::batch::{
+    BATCH_DELETE_ENABLED, BLOB_TARGET_SIZE, Batch, BatchBuilder, BatchBuilderConfig,
+    BatchBuilderInternal, BatchParts, ProtoBatch,
+};
 use crate::cache::{PersistClientCache, StateCache};
-use crate::cfg::PersistConfig;
+use crate::cfg::{COMPACTION_MEMORY_BOUND_BYTES, PersistConfig};
 use crate::critical::{CriticalReaderId, SinceHandle};
 use crate::error::InvalidUsage;
-use crate::fetch::{BatchFetcher, BatchFetcherConfig};
-use crate::internal::compact::Compactor;
-use crate::internal::encoding::{Schemas, parse_id};
+use crate::fetch::{BatchFetcher, BatchFetcherConfig, FetchBatchFilter, Lease};
+use crate::internal::compact::{CompactConfig, Compactor};
+use crate::internal::encoding::parse_id;
 use crate::internal::gc::GarbageCollector;
 use crate::internal::machine::{Machine, retry_external};
+use crate::internal::state::RunOrder;
 use crate::internal::state_versions::StateVersions;
+use crate::iter::{Consolidator, StructuredSort};
 use crate::metrics::Metrics;
-use crate::read::{LeasedReaderId, READER_LEASE_DURATION, ReadHandle};
+use crate::read::{
+    Cursor, CursorConsolidator, LazyPartStats, LeasedReaderId, READER_LEASE_DURATION, ReadHandle,
+    Since,
+};
 use crate::rpc::PubSubSender;
 use crate::schema::CaESchema;
 use crate::write::{WriteHandle, WriterId};
@@ -121,6 +133,9 @@ pub const BUILD_INFO: BuildInfo = build_info!();
 // Re-export for convenience.
 pub use mz_persist_types::{PersistLocation, ShardId};
 
+pub use crate::internal::encoding::Schemas;
+pub use crate::internal::state::HollowBatch;
+
 /// Additional diagnostic information used within Persist
 /// e.g. for logging, metric labels, etc.
 #[derive(Clone, Debug)]
@@ -539,6 +554,188 @@ impl PersistClient {
         Ok(writer)
     }
 
+    /// Returns a [BatchBuilder] that can be used to write a batch of updates to
+    /// blob storage which can then be appended to the given shard using
+    /// [WriteHandle::compare_and_append_batch] or [WriteHandle::append_batch],
+    /// or which can be read using [PersistClient::read_batches_consolidated].
+    ///
+    /// The builder uses a bounded amount of memory, even when the number of
+    /// updates is very large. Individual records, however, should be small
+    /// enough that we can reasonably chunk them up: O(KB) is definitely fine,
+    /// O(MB) come talk to us.
+    #[instrument(level = "debug", fields(shard = %shard_id))]
+    pub async fn batch_builder<K, V, T, D>(
+        &self,
+        shard_id: ShardId,
+        write_schemas: Schemas<K, V>,
+        lower: Antichain<T>,
+    ) -> Result<BatchBuilder<K, V, T, D>, InvalidUsage<T>>
+    where
+        K: Debug + Codec,
+        V: Debug + Codec,
+        T: Timestamp + Lattice + Codec64 + Sync,
+        D: Semigroup + Ord + Codec64 + Send + Sync,
+    {
+        let cfg = CompactConfig::new(&self.cfg, shard_id);
+        // WIP: Pass this in as an argument?
+        let shard_metrics = self.metrics.shards.shard(&shard_id, "peek_stash");
+
+        let parts = if let Some(max_runs) = cfg.batch.max_runs {
+            BatchParts::new_compacting::<K, V, D>(
+                cfg,
+                Description::new(
+                    lower.clone(),
+                    Antichain::new(),
+                    Antichain::from_elem(T::minimum()),
+                ),
+                max_runs,
+                Arc::clone(&self.metrics),
+                shard_metrics,
+                shard_id,
+                Arc::clone(&self.blob),
+                Arc::clone(&self.isolated_runtime),
+                &self.metrics.user,
+                write_schemas.clone(),
+            )
+        } else {
+            BatchParts::new_ordered(
+                cfg.batch,
+                RunOrder::Unordered,
+                Arc::clone(&self.metrics),
+                shard_metrics,
+                shard_id,
+                Arc::clone(&self.blob),
+                Arc::clone(&self.isolated_runtime),
+                &self.metrics.user,
+            )
+        };
+        let builder = BatchBuilderInternal::new(
+            BatchBuilderConfig::new(&self.cfg, shard_id),
+            parts,
+            Arc::clone(&self.metrics),
+            write_schemas.clone(),
+            Arc::clone(&self.blob),
+            shard_id,
+            self.cfg.build_version.clone(),
+        );
+        Ok(BatchBuilder::new(
+            builder,
+            Description::new(lower, Antichain::new(), Antichain::from_elem(T::minimum())),
+        ))
+    }
+
+    /// Turns the given [`ProtoBatch`] back into a [`Batch`] which can be used
+    /// to append it to the given shard or to read it via
+    /// [PersistClient::read_batches_consolidated]
+    pub fn batch_from_transmittable_batch<K, V, T, D>(
+        &self,
+        shard_id: &ShardId,
+        batch: ProtoBatch,
+    ) -> Batch<K, V, T, D>
+    where
+        K: Debug + Codec,
+        V: Debug + Codec,
+        T: Timestamp + Lattice + Codec64 + Sync,
+        D: Semigroup + Ord + Codec64 + Send + Sync,
+    {
+        let batch_shard_id: ShardId = batch
+            .shard_id
+            .into_rust()
+            .expect("valid transmittable batch");
+        assert_eq!(&batch_shard_id, shard_id);
+
+        let shard_metrics = self.metrics.shards.shard(shard_id, "peek_stash");
+
+        let ret = Batch {
+            batch_delete_enabled: BATCH_DELETE_ENABLED.get(&self.cfg),
+            metrics: Arc::clone(&self.metrics),
+            shard_metrics,
+            version: Version::parse(&batch.version).expect("valid transmittable batch"),
+            batch: batch
+                .batch
+                .into_rust_if_some("ProtoBatch::batch")
+                .expect("valid transmittable batch"),
+            blob: Arc::clone(&self.blob),
+            _phantom: std::marker::PhantomData,
+        };
+
+        assert_eq!(&ret.shard_id(), shard_id);
+        ret
+    }
+
+    /// Returns a [Cursor] for reading the given batches. Yielded updates are
+    /// consolidated if the given batches contain sorted runs, which is true
+    /// when they have been written using a [BatchBuilder].
+    ///
+    /// To keep memory usage down when reading a snapshot that consolidates
+    /// well, this consolidates as it goes. However, note that only the
+    /// serialized data is consolidated: the deserialized data will only be
+    /// consolidated if your K/V codecs are one-to-one.
+    // WIP: Do we want to let callers inject sth like MFP here?
+    // WIP: This doesn't need async right now, but still might want it in the
+    // API to have the option in the future?
+    #[allow(clippy::unused_async)]
+    pub async fn read_batches_consolidated<K, V, T, D>(
+        &mut self,
+        shard_id: ShardId,
+        as_of: Antichain<T>,
+        read_schemas: Schemas<K, V>,
+        batches: &[Batch<K, V, T, D>],
+        should_fetch_part: impl for<'a> Fn(Option<&'a LazyPartStats>) -> bool,
+    ) -> Result<Cursor<K, V, T, D>, Since<T>>
+    where
+        K: Debug + Codec,
+        V: Debug + Codec,
+        T: Timestamp + Lattice + Codec64 + Sync,
+        D: Semigroup + Ord + Codec64 + Send + Sync,
+    {
+        let context = format!("{}[as_of={:?}]", shard_id, as_of.elements());
+        let filter = FetchBatchFilter::Snapshot {
+            as_of: as_of.clone(),
+        };
+
+        let shard_metrics = self.metrics.shards.shard(&shard_id, "peek_stash");
+
+        let consolidator = {
+            let mut consolidator = Consolidator::new(
+                context,
+                shard_id,
+                StructuredSort::new(read_schemas.clone()),
+                Arc::clone(&self.blob),
+                Arc::clone(&self.metrics),
+                Arc::clone(&shard_metrics),
+                self.metrics.read.snapshot.clone(),
+                filter,
+                COMPACTION_MEMORY_BOUND_BYTES.get(&self.cfg),
+            );
+            for batch in batches {
+                for (meta, run) in batch.batch.runs() {
+                    consolidator.enqueue_run(
+                        &batch.batch.desc,
+                        meta,
+                        run.into_iter()
+                            .filter(|p| should_fetch_part(p.stats()))
+                            .cloned(),
+                    );
+                }
+            }
+            CursorConsolidator::Structured {
+                consolidator,
+                // This default may end up consolidating more records than previously
+                // for cases like fast-path peeks, where only the first few entries are used.
+                // If this is a noticeable performance impact, thread the max-len in from the caller.
+                max_len: self.cfg.compaction_yield_after_n_updates,
+                max_bytes: BLOB_TARGET_SIZE.get(&self.cfg).max(1),
+            }
+        };
+
+        Ok(Cursor {
+            consolidator,
+            _lease: Lease::default(),
+            read_schemas,
+        })
+    }
+
     /// Returns the requested schema, if known at the current state.
     pub async fn get_schema<K, V, T, D>(
         &self,
diff --git a/src/persist-client/src/read.rs b/src/persist-client/src/read.rs
index 03cebe1e64a76..1a34ffe1da59b 100644
--- a/src/persist-client/src/read.rs
+++ b/src/persist-client/src/read.rs
@@ -865,13 +865,13 @@ pub(crate) struct UnexpiredReadHandleState {
 /// but it's also free to abandon the instance at any time if it eg. only needs a few entries.
 #[derive(Debug)]
 pub struct Cursor<K: Codec, V: Codec, T: Timestamp + Lattice + Codec64, D> {
-    consolidator: CursorConsolidator<K, V, T, D>,
-    _lease: Lease,
-    read_schemas: Schemas<K, V>,
+    pub(crate) consolidator: CursorConsolidator<K, V, T, D>,
+    pub(crate) _lease: Lease,
+    pub(crate) read_schemas: Schemas<K, V>,
 }
 
 #[derive(Debug)]
-enum CursorConsolidator<K: Codec, V: Codec, T: Timestamp + Lattice + Codec64, D> {
+pub(crate) enum CursorConsolidator<K: Codec, V: Codec, T: Timestamp + Lattice + Codec64, D> {
     Structured {
         consolidator: Consolidator<T, D, StructuredSort<K, V, T, D>>,
         max_len: usize,
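
For reviewers: the three new `PersistClient` methods are easiest to evaluate end to end, so here is a minimal usage sketch (not part of the patch). It assumes `String` keys and `()` values with the existing `StringSchema`/`UnitSchema` implementations, an already-opened `PersistClient`, and the pre-existing `BatchBuilder::add`/`finish`, `Batch::into_transmittable_batch`, and `Cursor::next` APIs; the function name and error handling are illustrative only.

```rust
use std::sync::Arc;

use mz_persist_client::{PersistClient, Schemas, ShardId};
use mz_persist_types::codec_impls::{StringSchema, UnitSchema};
use timely::progress::Antichain;

/// Stashes one update in a detached batch for `shard_id` and reads it back,
/// without registering a writer or reader handle for the shard.
async fn stash_and_read(client: &mut PersistClient, shard_id: ShardId) {
    // Assumption: String keys, unit values, u64 timestamps, i64 diffs.
    let schemas: Schemas<String, ()> = Schemas {
        id: None,
        key: Arc::new(StringSchema),
        val: Arc::new(UnitSchema),
    };

    // Stage updates into a batch whose lower is the minimum timestamp.
    let mut builder = client
        .batch_builder::<String, (), u64, i64>(shard_id, schemas.clone(), Antichain::from_elem(0))
        .await
        .expect("valid usage");
    builder
        .add(&"k".to_owned(), &(), &0, &1)
        .await
        .expect("valid usage");
    let batch = builder
        .finish(Antichain::from_elem(1))
        .await
        .expect("valid upper");

    // Round-trip through the transmittable proto form, e.g. to hand the
    // batch to another process before it is read (or appended).
    let proto = batch.into_transmittable_batch();
    let batch = client.batch_from_transmittable_batch::<String, (), u64, i64>(&shard_id, proto);

    // Read the batch back, consolidated, as of t=0, fetching every part.
    let mut cursor = client
        .read_batches_consolidated(
            shard_id,
            Antichain::from_elem(0),
            schemas,
            &[batch],
            |_stats| true,
        )
        .await
        .expect("as_of within the batch bounds");
    while let Some(updates) = cursor.next().await {
        for ((key, _val), ts, diff) in updates {
            println!("{:?} @ {} -> {}", key, ts, diff);
        }
    }
}
```

The `ProtoBatch` round trip is what lets a stashed batch cross process boundaries before being appended or read, and the read side goes through the same `Consolidator` machinery used for regular snapshot reads, so updates come out consolidated when the runs were written sorted.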