From d8cc5e76698a90102b332f820c6121567f15ac0f Mon Sep 17 00:00:00 2001 From: kould Date: Mon, 15 Jun 2026 03:54:48 +0800 Subject: [PATCH] perf: stream analyze statistics with KLL sketch --- .github/workflows/ci.yml | 91 ++-- Cargo.lock | 2 +- Cargo.toml | 2 +- src/execution/ddl/create_index.rs | 2 +- src/execution/dml/analyze.rs | 4 +- src/execution/dml/delete.rs | 2 +- src/execution/dml/insert.rs | 2 +- src/execution/dml/update.rs | 4 +- src/execution/dql/sort.rs | 5 - src/execution/mod.rs | 22 +- src/optimizer/core/cm_sketch.rs | 65 ++- src/optimizer/core/histogram.rs | 472 +++++++++--------- src/optimizer/core/kll_sketch.rs | 451 +++++++++++++++++ src/optimizer/core/mod.rs | 1 + src/optimizer/core/statistics_meta.rs | 41 +- .../rule/implementation/dql/table_scan.rs | 41 +- .../rule/normalization/agg_elimination.rs | 28 +- src/storage/table_codec.rs | 6 +- src/types/index.rs | 23 +- src/types/value.rs | 65 +++ 20 files changed, 969 insertions(+), 360 deletions(-) create mode 100644 src/optimizer/core/kll_sketch.rs diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 2dd8cece..e9044108 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -15,67 +15,71 @@ jobs: # 1 check: name: Rust project check - runs-on: [self-hosted, Linux, X64] + runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 - - name: Use preinstalled tools - run: | - echo "$HOME/.cargo/bin" >> "$GITHUB_PATH" - echo "$HOME/actions-runner/externals/node20/bin" >> "$GITHUB_PATH" + - uses: actions/checkout@v4 + - uses: dtolnay/rust-toolchain@stable + - uses: Swatinem/rust-cache@v2 - name: Run cargo test run: make test # 2 fmt: name: Rust fmt - runs-on: [self-hosted, Linux, X64] + runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 - - name: Use preinstalled tools - run: | - echo "$HOME/.cargo/bin" >> "$GITHUB_PATH" - echo "$HOME/actions-runner/externals/node20/bin" >> "$GITHUB_PATH" + - uses: actions/checkout@v4 + - uses: dtolnay/rust-toolchain@stable + with: + components: rustfmt + - uses: Swatinem/rust-cache@v2 - name: Run cargo fmt run: make fmt # 3 e2e: name: Rust e2e sqllogictest - runs-on: [self-hosted, Linux, X64] + runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 - - name: Use preinstalled tools - run: | - echo "$HOME/.cargo/bin" >> "$GITHUB_PATH" - echo "$HOME/actions-runner/externals/node20/bin" >> "$GITHUB_PATH" + - uses: actions/checkout@v4 + - uses: dtolnay/rust-toolchain@stable + - uses: Swatinem/rust-cache@v2 - name: Run sqllogictest suite run: make test-slt # 4 wasm-tests: name: Wasm cargo tests - runs-on: [self-hosted, Linux, X64] + runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v4 + - uses: dtolnay/rust-toolchain@stable + with: + targets: wasm32-unknown-unknown + - uses: actions/setup-node@v4 + with: + node-version: 20 + - uses: Swatinem/rust-cache@v2 - - name: Use preinstalled tools - run: | - echo "$HOME/.cargo/bin" >> "$GITHUB_PATH" - echo "$HOME/actions-runner/externals/node20/bin" >> "$GITHUB_PATH" + - uses: taiki-e/install-action@wasm-pack - name: Run wasm-bindgen tests (wasm32 target) run: make test-wasm # 5 wasm-examples: name: Wasm examples (nodejs) - runs-on: [self-hosted, Linux, X64] + runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v4 + - uses: dtolnay/rust-toolchain@stable + with: + targets: wasm32-unknown-unknown + - uses: actions/setup-node@v4 + with: + node-version: 20 + - uses: Swatinem/rust-cache@v2 - - name: Use preinstalled tools - run: | - echo "$HOME/.cargo/bin" >> "$GITHUB_PATH" - echo "$HOME/actions-runner/externals/node20/bin" >> "$GITHUB_PATH" + - uses: taiki-e/install-action@wasm-pack - name: Build wasm package run: make wasm-build @@ -85,28 +89,25 @@ jobs: # 6 native-examples: name: Native examples - runs-on: [self-hosted, Linux, X64] + runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 - - - name: Use preinstalled tools - run: | - echo "$HOME/.cargo/bin" >> "$GITHUB_PATH" - echo "$HOME/actions-runner/externals/node20/bin" >> "$GITHUB_PATH" + - uses: actions/checkout@v4 + - uses: dtolnay/rust-toolchain@stable + - uses: Swatinem/rust-cache@v2 - name: Run native examples run: make native-examples # 7 python-tests: name: Python bindings tests - runs-on: [self-hosted, Linux, X64] + runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 - - - name: Use preinstalled tools - run: | - echo "$HOME/.cargo/bin" >> "$GITHUB_PATH" - echo "$HOME/actions-runner/externals/node20/bin" >> "$GITHUB_PATH" + - uses: actions/checkout@v4 + - uses: dtolnay/rust-toolchain@stable + - uses: actions/setup-python@v5 + with: + python-version: "3.12" + - uses: Swatinem/rust-cache@v2 - name: Run python binding tests - run: make test-python + run: PYO3_PYTHON="$(which python)" make test-python diff --git a/Cargo.lock b/Cargo.lock index 8a9bb0e5..b2d5bbdf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1073,7 +1073,7 @@ dependencies = [ [[package]] name = "kite_sql" -version = "0.3.0" +version = "0.3.1" dependencies = [ "bumpalo", "chrono", diff --git a/Cargo.toml b/Cargo.toml index b854a42f..fb482419 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,7 +2,7 @@ [package] name = "kite_sql" -version = "0.3.0" +version = "0.3.1" edition = "2021" build = "build.rs" authors = ["Kould ", "Xwg "] diff --git a/src/execution/ddl/create_index.rs b/src/execution/ddl/create_index.rs index 340d0b13..6e8f34e0 100644 --- a/src/execution/ddl/create_index.rs +++ b/src/execution/ddl/create_index.rs @@ -140,7 +140,7 @@ impl<'a, T: Transaction + 'a> ExecutorNode<'a, T> for CreateIndex { with_projection_tmp_value(arena, None, &column_exprs, |arena, value| { let mut state = arena.local_state(plan_arena); let (transaction, table_codec) = state.transaction_codec_mut(); - let index = Index::new(index_id, value, ty); + let index = Index::new(index_id, &value, ty); transaction.add_index(table_codec, table_name.as_ref(), index, &tuple_pk) })?; } diff --git a/src/execution/dml/analyze.rs b/src/execution/dml/analyze.rs index 7dcb4320..cf20cdd0 100644 --- a/src/execution/dml/analyze.rs +++ b/src/execution/dml/analyze.rs @@ -20,7 +20,7 @@ use crate::execution::{ }; use crate::expression::ScalarExpression; use crate::iter_ext::Itertools; -use crate::optimizer::core::histogram::HistogramBuilder; +use crate::optimizer::core::histogram::{HistogramBuilder, ANALYZE_STATISTICS_RELATIVE_ERROR}; use crate::optimizer::core::statistics_meta::StatisticsMeta; use crate::planner::operator::analyze::AnalyzeOperator; use crate::planner::LogicalPlan; @@ -105,7 +105,7 @@ impl<'a, T: Transaction + 'a> ExecutorNode<'a, T> for Analyze { Ok(State { index_id: index.id, exprs: index.column_exprs(table, plan_arena)?, - builder: HistogramBuilder::new(index, None), + builder: HistogramBuilder::new(index, ANALYZE_STATISTICS_RELATIVE_ERROR)?, histogram_buckets: self.histogram_buckets, }) }) diff --git a/src/execution/dml/delete.rs b/src/execution/dml/delete.rs index 5fd87880..7dd33e99 100644 --- a/src/execution/dml/delete.rs +++ b/src/execution/dml/delete.rs @@ -102,7 +102,7 @@ impl<'a, T: Transaction + 'a> ExecutorNode<'a, T> for Delete { transaction.del_index( table_codec, &self.table_name, - &Index::new(*index_id, value, *index_ty), + &Index::new(*index_id, &value, *index_ty), tuple_id, ) })?; diff --git a/src/execution/dml/insert.rs b/src/execution/dml/insert.rs index b294cf35..71867b45 100644 --- a/src/execution/dml/insert.rs +++ b/src/execution/dml/insert.rs @@ -170,7 +170,7 @@ impl<'a, T: Transaction + 'a> ExecutorNode<'a, T> for Insert { with_projection_tmp_value(arena, Some(&tuple), exprs, |arena, value| { let mut state = arena.local_state(plan_arena); let (transaction, table_codec) = state.transaction_codec_mut(); - let index = Index::new(index_meta.id, value, index_meta.ty); + let index = Index::new(index_meta.id, &value, index_meta.ty); transaction.add_index(table_codec, &self.table_name, index, tuple_id) })?; } diff --git a/src/execution/dml/update.rs b/src/execution/dml/update.rs index 783f31a6..b0f7bfa5 100644 --- a/src/execution/dml/update.rs +++ b/src/execution/dml/update.rs @@ -124,7 +124,7 @@ impl<'a, T: Transaction + 'a> ExecutorNode<'a, T> for Update { with_projection_tmp_value(arena, Some(&tuple), exprs, |arena, value| { let mut state = arena.local_state(plan_arena); let (transaction, table_codec) = state.transaction_codec_mut(); - let index = Index::new(index_meta.id, value, index_meta.ty); + let index = Index::new(index_meta.id, &value, index_meta.ty); transaction.del_index(table_codec, &self.table_name, &index, &old_pk) })?; } @@ -152,7 +152,7 @@ impl<'a, T: Transaction + 'a> ExecutorNode<'a, T> for Update { with_projection_tmp_value(arena, Some(&tuple), exprs, |arena, value| { let mut state = arena.local_state(plan_arena); let (transaction, table_codec) = state.transaction_codec_mut(); - let index = Index::new(index_meta.id, value, index_meta.ty); + let index = Index::new(index_meta.id, &value, index_meta.ty); transaction.add_index(table_codec, &self.table_name, index, new_pk) })?; } diff --git a/src/execution/dql/sort.rs b/src/execution/dql/sort.rs index 9b032b36..c3bddc94 100644 --- a/src/execution/dql/sort.rs +++ b/src/execution/dql/sort.rs @@ -65,11 +65,6 @@ impl<'a, T> NullableVec<'a, T> { unsafe { self.0[offset].assume_init_read() } } - #[inline] - pub(crate) fn get(&self, offset: usize) -> &T { - unsafe { self.0[offset].assume_init_ref() } - } - #[inline] pub(crate) fn len(&self) -> usize { self.0.len() diff --git a/src/execution/mod.rs b/src/execution/mod.rs index db16ec13..f2948c79 100644 --- a/src/execution/mod.rs +++ b/src/execution/mod.rs @@ -413,7 +413,7 @@ pub(crate) fn with_projection_tmp_value<'a, T: Transaction + 'a>( arena: &mut ExecArena<'a, T>, tuple: Option<&dyn TupleLike>, exprs: &[ScalarExpression], - f: impl FnOnce(&mut ExecArena<'a, T>, &DataValue) -> Result<(), DatabaseError>, + f: impl FnOnce(&mut ExecArena<'a, T>, DataValue) -> Result<(), DatabaseError>, ) -> Result<(), DatabaseError> { arena.with_projection_tmp(|arena, projection_tmp| { { @@ -424,16 +424,16 @@ pub(crate) fn with_projection_tmp_value<'a, T: Transaction + 'a>( } } - if projection_tmp.len() > 1 { - let value = DataValue::Tuple(std::mem::take(projection_tmp), false); - let ret = f(arena, &value); - let DataValue::Tuple(values, _) = value else { - unreachable!() - }; - *projection_tmp = values; - ret?; - } else if let Some(value) = projection_tmp.first() { - f(arena, value)?; + match projection_tmp.len() { + 0 => {} + 1 => { + let value = projection_tmp.pop().expect("projection has one value"); + f(arena, value)?; + } + _ => { + let value = DataValue::Tuple(std::mem::take(projection_tmp), false); + f(arena, value)?; + } } Ok(()) }) diff --git a/src/optimizer/core/cm_sketch.rs b/src/optimizer/core/cm_sketch.rs index 122daaab..582c4e06 100644 --- a/src/optimizer/core/cm_sketch.rs +++ b/src/optimizer/core/cm_sketch.rs @@ -20,10 +20,10 @@ use crate::storage::Transaction; use crate::types::value::DataValue; use kite_sql_serde_macros::ReferenceSerialization; use std::borrow::Borrow; +use std::cmp; use std::hash::{Hash, Hasher}; use std::io::{Read, Write}; use std::marker::PhantomData; -use std::{cmp, mem}; pub(crate) const COUNT_MIN_SKETCH_STORAGE_PAGE_LEN: usize = 16 * 1024; @@ -146,7 +146,6 @@ impl CountMinSketch { "count-min sketch width must be a power of two".to_string(), )); } - let mut counters = vec![Vec::with_capacity(width); k_num]; let mut expected_page_idx = vec![0usize; k_num]; @@ -213,20 +212,34 @@ impl CountMinSketch { } impl CountMinSketch { - pub fn new(capacity: usize, probability: f64, tolerance: f64) -> Self { - let width = Self::optimal_width(capacity, tolerance); + pub fn with_relative_error( + probability: f64, + relative_error: f64, + ) -> Result { + if !(0.0..1.0).contains(&probability) { + return Err(DatabaseError::InvalidValue(format!( + "count-min sketch probability must be between 0 and 1, got {probability}" + ))); + } + if relative_error <= 0.0 || relative_error.is_nan() { + return Err(DatabaseError::InvalidValue(format!( + "count-min sketch relative error must be greater than zero, got {relative_error}" + ))); + } + + let width = Self::optimal_relative_width(relative_error)?; let k_num = Self::optimal_k_num(probability); let counters = vec![vec![0; width]; k_num]; let offsets = vec![0; k_num]; let hashers = Self::new_hashers(); - CountMinSketch { + Ok(CountMinSketch { counters, offsets, hashers, mask: Self::mask(width), k_num, phantom_k: PhantomData, - } + }) } pub fn add(&mut self, key: &Q, value: usize) @@ -271,17 +284,6 @@ impl CountMinSketch { .unwrap() } - #[allow(dead_code)] - pub fn estimate_memory( - capacity: usize, - probability: f64, - tolerance: f64, - ) -> Result { - let width = Self::optimal_width(capacity, tolerance); - let k_num = Self::optimal_k_num(probability); - Ok(width * mem::size_of::() * k_num) - } - #[allow(dead_code)] pub fn clear(&mut self) { for k_i in 0..self.k_num { @@ -292,12 +294,21 @@ impl CountMinSketch { self.hashers = Self::new_hashers(); } - fn optimal_width(capacity: usize, tolerance: f64) -> usize { - let e = tolerance / (capacity as f64); - let width = (2.0 / e).round() as usize; - cmp::max(2, width) + fn optimal_relative_width(relative_error: f64) -> Result { + let width = (2.0 / relative_error).ceil(); + if !width.is_finite() || width > usize::MAX as f64 { + return Err(DatabaseError::InvalidValue(format!( + "count-min sketch relative error is too small: {relative_error}" + ))); + } + + cmp::max(2, width as usize) .checked_next_power_of_two() - .expect("Width would be way too large") + .ok_or_else(|| { + DatabaseError::InvalidValue(format!( + "count-min sketch width overflow for relative error: {relative_error}" + )) + }) } fn mask(width: usize) -> usize { @@ -307,7 +318,7 @@ impl CountMinSketch { } fn optimal_k_num(probability: f64) -> usize { - cmp::max(1, ((1.0 - probability).ln() / 0.5f64.ln()) as usize) + cmp::max(1, ((1.0 - probability).ln() / 0.5f64.ln()).ceil() as usize) } fn new_hashers() -> [StableHasher; 2] { @@ -391,7 +402,7 @@ mod tests { #[test] fn test_increment() { - let mut cms = CountMinSketch::<&str>::new(100, 0.95, 10.0); + let mut cms = CountMinSketch::<&str>::with_relative_error(0.95, 0.01).unwrap(); for _ in 0..300 { cms.increment("key"); } @@ -400,7 +411,7 @@ mod tests { #[test] fn test_increment_multi() { - let mut cms = CountMinSketch::::new(100, 0.99, 2.0); + let mut cms = CountMinSketch::::with_relative_error(0.99, 0.001).unwrap(); for i in 0..1_000_000 { cms.increment(&(i % 100)); } @@ -411,7 +422,7 @@ mod tests { #[test] fn test_collect_count() { - let mut cms = CountMinSketch::::new(100, 0.95, 10.0); + let mut cms = CountMinSketch::::with_relative_error(0.95, 0.01).unwrap(); for _ in 0..300 { cms.increment(&DataValue::Int32(300)); } @@ -429,7 +440,7 @@ mod tests { #[test] fn test_storage_parts_roundtrip() { - let mut cms = CountMinSketch::::new(128, 0.95, 10.0); + let mut cms = CountMinSketch::::with_relative_error(0.95, 0.01).unwrap(); for i in 0..256 { cms.increment(&DataValue::Int32(i % 17)); } diff --git a/src/optimizer/core/histogram.rs b/src/optimizer/core/histogram.rs index 2d89a177..c49be367 100644 --- a/src/optimizer/core/histogram.rs +++ b/src/optimizer/core/histogram.rs @@ -13,34 +13,31 @@ // limitations under the License. use crate::errors::DatabaseError; -use crate::execution::dql::sort::{radix_sort, BumpVec, NullableVec}; use crate::expression::range_detacher::Range; use crate::expression::BinaryOperator; use crate::optimizer::core::cm_sketch::CountMinSketch; -use crate::storage::table_codec::BumpBytes; +use crate::optimizer::core::kll_sketch::KllSketchBuilder; use crate::types::evaluator::{binary_create, BinaryEvaluatorRef}; use crate::types::index::{IndexId, IndexMeta}; use crate::types::value::DataValue; use crate::types::LogicalType; -use bumpalo::Bump; use kite_sql_serde_macros::ReferenceSerialization; use ordered_float::OrderedFloat; use std::borrow::Cow; +use std::cmp; use std::collections::Bound; +use std::mem; use std::sync::OnceLock; -use std::{cmp, mem}; + +const ANALYZE_STATISTICS_CONFIDENCE: f64 = 0.95; +pub(crate) const ANALYZE_STATISTICS_RELATIVE_ERROR: f64 = 0.001; pub struct HistogramBuilder { - arena: Bump, index_id: IndexId, - capacity: Option, - is_init: bool, - null_count: usize, - values: Option>, - sort_keys: Option)>>, - - value_index: usize, + values_len: usize, + quantile: KllSketchBuilder, + sketch: CountMinSketch, } #[derive(Debug)] @@ -97,138 +94,90 @@ pub struct Bucket { } impl HistogramBuilder { - #[allow(clippy::missing_transmute_annotations)] - pub(crate) fn init(&mut self) { - if self.is_init { - return; - } - let (values, sort_keys) = self - .capacity - .map(|capacity| { - ( - NullableVec::<(usize, DataValue)>::with_capacity(capacity, &self.arena), - BumpVec::<(usize, BumpBytes<'static>)>::with_capacity_in(capacity, &self.arena), - ) - }) - .unwrap_or_else(|| (NullableVec::new(&self.arena), BumpVec::new_in(&self.arena))); - - self.values = Some(unsafe { mem::transmute::<_, _>(values) }); - self.sort_keys = Some(unsafe { mem::transmute::<_, _>(sort_keys) }); - self.is_init = true; - } - - pub fn new(index_meta: &IndexMeta, capacity: Option) -> Self { - Self { - arena: Default::default(), + pub fn new(index_meta: &IndexMeta, relative_error: f64) -> Result { + Ok(Self { index_id: index_meta.id, - capacity, - is_init: false, null_count: 0, - values: None, - sort_keys: None, - value_index: 0, - } + values_len: 0, + quantile: KllSketchBuilder::with_relative_error(relative_error)?, + sketch: CountMinSketch::with_relative_error( + ANALYZE_STATISTICS_CONFIDENCE, + relative_error, + )?, + }) } - #[allow(clippy::missing_transmute_annotations)] - pub fn append(&mut self, value: &DataValue) -> Result<(), DatabaseError> { - self.init(); + pub fn append(&mut self, value: DataValue) -> Result<(), DatabaseError> { if value.is_null() { self.null_count += 1; } else { - let mut bytes = BumpBytes::new_in(&self.arena); - - value.memcomparable_encode(&mut bytes)?; - self.values - .as_mut() - .unwrap() - .put((self.value_index, value.clone())); - self.sort_keys - .as_mut() - .unwrap() - .put((self.value_index, unsafe { mem::transmute::<_, _>(bytes) })) + self.sketch.increment(&value); + self.quantile.insert(value)?; + self.values_len += 1; } - self.value_index += 1; - Ok(()) } pub fn build( - mut self, + self, number_of_buckets: usize, ) -> Result<(Histogram, CountMinSketch), DatabaseError> { - self.init(); - let values_len = self.values.as_ref().unwrap().len(); + if number_of_buckets == 0 { + return Err(DatabaseError::InvalidValue( + "histogram bucket count must be greater than zero".to_string(), + )); + } + + let values_len = self.values_len; if number_of_buckets > values_len { return Err(DatabaseError::TooManyBuckets(number_of_buckets, values_len)); } - let mut sketch = CountMinSketch::new(values_len, 0.95, 1.0); let HistogramBuilder { - arena, index_id, null_count, - values, - sort_keys, + quantile, + mut sketch, .. } = self; - let mut values = values.unwrap(); - let mut sort_keys = sort_keys.unwrap(); let mut buckets = Vec::with_capacity(number_of_buckets); - let bucket_len = if values_len.is_multiple_of(number_of_buckets) { - values_len / number_of_buckets - } else { - (values_len + number_of_buckets) / number_of_buckets - }; - radix_sort(&mut sort_keys, &arena); + let quantile = quantile.build()?; + let correlation = quantile.correlation(); + let ranks = (0..number_of_buckets).flat_map(|i| { + [ + i * values_len / number_of_buckets, + ((i + 1) * values_len / number_of_buckets) - 1, + ] + }); + let mut values = quantile.values_at_ranks(ranks); for i in 0..number_of_buckets { - let mut bucket = Bucket::empty(); - let j = (i + 1) * bucket_len; - - bucket.upper = values - .get(sort_keys.get(cmp::min(j, values_len) - 1).0) - .1 - .clone(); - buckets.push(bucket); - } - let mut corr_xy_sum = 0.0; - let mut number_of_distinct_value = 0; - let mut last_value: Option = None; - - for (i, (index, _)) in sort_keys.into_iter().enumerate() { - let (ordinal, value) = values.take(index); - sketch.increment(&value); - - if let None | Some(true) = last_value.as_ref().map(|last_value| last_value != &value) { - last_value = Some(value.clone()); - number_of_distinct_value += 1; - } - - let bucket = &mut buckets[i / bucket_len]; - - if bucket.lower.is_null() { - bucket.lower = value; - } - bucket.count += 1; - - corr_xy_sum += i as f64 * ordinal as f64; + let lower_rank = i * values_len / number_of_buckets; + let upper_rank = ((i + 1) * values_len / number_of_buckets) - 1; + let lower = values.next().flatten().ok_or_else(|| { + DatabaseError::InvalidValue("KLL sketch failed to produce bucket lower".to_string()) + })?; + let upper = values.next().flatten().ok_or_else(|| { + DatabaseError::InvalidValue("KLL sketch failed to produce bucket upper".to_string()) + })?; + buckets.push(Bucket { + lower, + upper, + count: (upper_rank - lower_rank + 1) as u64, + }); } sketch.add(&DataValue::Null, self.null_count); - drop(values); - drop(arena); - Ok(( Histogram { meta: HistogramMeta { index_id, - number_of_distinct_value, + number_of_distinct_value: values_len, null_count, values_len, buckets_len: buckets.len(), - correlation: Self::calc_correlation(corr_xy_sum, values_len), + correlation, }, buckets, comparator: OnceLock::new(), @@ -236,18 +185,6 @@ impl HistogramBuilder { sketch, )) } - - // https://github.com/pingcap/tidb/blob/6957170f1147e96958e63db48148445a7670328e/pkg/statistics/builder.go#L210 - fn calc_correlation(corr_xy_sum: f64, values_len: usize) -> f64 { - if values_len == 1 { - return 1.0; - } - let item_count = values_len as f64; - let corr_x_sum = (item_count - 1.0) * item_count / 2.0; - let corr_x2_sum = (item_count - 1.0) * item_count * (2.0 * item_count - 1.0) / 6.0; - (item_count * corr_xy_sum - corr_x_sum * corr_x_sum) - / (item_count * corr_x2_sum - corr_x_sum * corr_x_sum) - } } impl BoundComparator { @@ -534,7 +471,7 @@ impl Histogram { Range::Scope { min, max } => { let bucket = &self.buckets[*bucket_i]; let mut bucket_count = bucket.count as usize; - if *bucket_i == 0 { + if *bucket_i == 0 && scope_lower_includes_null(min) { bucket_count += self.meta.null_count; } @@ -561,14 +498,14 @@ impl Histogram { } Bound::Excluded(val) => ( calc_fraction(&bucket.lower, &bucket.upper, val)?, - Some(sketch.estimate(val)), + endpoint_count(val, bucket, sketch), ), Bound::Unbounded => unreachable!(), }; let ratio = *distinct_1.max(OrderedFloat(temp_ratio).min(OrderedFloat(1.0))); temp_count += (bucket_count as f64 * ratio).ceil() as usize; if let Some(count) = option { - temp_count = temp_count.saturating_sub(count); + temp_count = subtract_endpoint_count(temp_count, count); } *bucket_i += 1; } else if is_under(comparator, &bucket.upper, max, false)? { @@ -578,14 +515,14 @@ impl Histogram { } Bound::Excluded(val) => ( calc_fraction(&bucket.lower, &bucket.upper, val)?, - Some(sketch.estimate(val)), + endpoint_count(val, bucket, sketch), ), Bound::Unbounded => unreachable!(), }; let ratio = *distinct_1.max(OrderedFloat(temp_ratio).min(OrderedFloat(1.0))); temp_count += (bucket_count as f64 * (1.0 - ratio)).ceil() as usize; if let Some(count) = option { - temp_count = temp_count.saturating_sub(count); + temp_count = subtract_endpoint_count(temp_count, count); } *bucket_i += 1; } else { @@ -595,7 +532,7 @@ impl Histogram { } Bound::Excluded(val) => ( calc_fraction(&bucket.lower, &bucket.upper, val)?, - Some(sketch.estimate(val)), + endpoint_count(val, bucket, sketch), ), Bound::Unbounded => unreachable!(), }; @@ -605,7 +542,7 @@ impl Histogram { } Bound::Excluded(val) => ( calc_fraction(&bucket.lower, &bucket.upper, val)?, - Some(sketch.estimate(val)), + endpoint_count(val, bucket, sketch), ), Bound::Unbounded => unreachable!(), }; @@ -613,17 +550,21 @@ impl Histogram { .max(OrderedFloat(temp_ratio_max - temp_ratio_min).min(OrderedFloat(1.0))); temp_count += (bucket_count as f64 * ratio).ceil() as usize; if let Some(count) = option_max { - temp_count = temp_count.saturating_sub(count); + temp_count = subtract_endpoint_count(temp_count, count); } if let Some(count) = option_min { - temp_count = temp_count.saturating_sub(count); + temp_count = subtract_endpoint_count(temp_count, count); } *binary_i += 1; } *count += cmp::max(temp_count, 0); } Range::Eq(value) => { - *count += sketch.estimate(value); + *count += if value.is_null() { + self.meta.null_count + } else { + sketch.estimate(value) + }; *binary_i += 1 } Range::Dummy => return Ok(true), @@ -634,6 +575,44 @@ impl Histogram { } } +fn subtract_endpoint_count(count: usize, endpoint_count: usize) -> usize { + if endpoint_count < count { + count.saturating_sub(endpoint_count) + } else if endpoint_count == count && count > 1 { + count - 1 + } else { + count + } +} + +fn endpoint_count( + value: &DataValue, + bucket: &Bucket, + sketch: &CountMinSketch, +) -> Option { + let bucket_key_type = bucket.lower.logical_type(); + debug_assert_eq!(bucket_key_type, bucket.upper.logical_type()); + + if value.logical_type() == bucket_key_type { + match value { + DataValue::Tuple(values, true) => { + Some(sketch.estimate(&DataValue::Tuple(values.clone(), false))) + } + _ => Some(sketch.estimate(value)), + } + } else { + None + } +} + +fn scope_lower_includes_null(min: &Bound) -> bool { + match min { + Bound::Unbounded => true, + Bound::Included(value) => value.is_null(), + Bound::Excluded(_) => false, + } +} + impl HistogramMeta { pub fn index_id(&self) -> IndexId { self.index_id @@ -648,23 +627,14 @@ impl HistogramMeta { } } -impl Bucket { - fn empty() -> Self { - let empty_value = DataValue::Null; - - Bucket { - lower: empty_value.clone(), - upper: empty_value, - count: 0, - } - } -} - #[cfg(all(test, not(target_arch = "wasm32")))] mod tests { use crate::errors::DatabaseError; use crate::expression::range_detacher::Range; - use crate::optimizer::core::histogram::{Bucket, HistogramBuilder}; + use crate::optimizer::core::cm_sketch::CountMinSketch; + use crate::optimizer::core::histogram::{ + Bucket, HistogramBuilder, ANALYZE_STATISTICS_RELATIVE_ERROR, + }; use crate::types::index::{IndexMeta, IndexType}; use crate::types::value::DataValue; use crate::types::LogicalType; @@ -684,28 +654,28 @@ mod tests { #[test] fn test_sort_tuples_on_histogram() -> Result<(), DatabaseError> { - let mut builder = HistogramBuilder::new(&index_meta(), Some(15)); + let mut builder = HistogramBuilder::new(&index_meta(), ANALYZE_STATISTICS_RELATIVE_ERROR)?; - builder.append(&DataValue::Int32(0))?; - builder.append(&DataValue::Int32(1))?; - builder.append(&DataValue::Int32(2))?; - builder.append(&DataValue::Int32(3))?; - builder.append(&DataValue::Int32(4))?; + builder.append(DataValue::Int32(0))?; + builder.append(DataValue::Int32(1))?; + builder.append(DataValue::Int32(2))?; + builder.append(DataValue::Int32(3))?; + builder.append(DataValue::Int32(4))?; - builder.append(&DataValue::Int32(5))?; - builder.append(&DataValue::Int32(6))?; - builder.append(&DataValue::Int32(7))?; - builder.append(&DataValue::Int32(8))?; - builder.append(&DataValue::Int32(9))?; + builder.append(DataValue::Int32(5))?; + builder.append(DataValue::Int32(6))?; + builder.append(DataValue::Int32(7))?; + builder.append(DataValue::Int32(8))?; + builder.append(DataValue::Int32(9))?; - builder.append(&DataValue::Int32(10))?; - builder.append(&DataValue::Int32(11))?; - builder.append(&DataValue::Int32(12))?; - builder.append(&DataValue::Int32(13))?; - builder.append(&DataValue::Int32(14))?; + builder.append(DataValue::Int32(10))?; + builder.append(DataValue::Int32(11))?; + builder.append(DataValue::Int32(12))?; + builder.append(DataValue::Int32(13))?; + builder.append(DataValue::Int32(14))?; - builder.append(&DataValue::Null)?; - builder.append(&DataValue::Null)?; + builder.append(DataValue::Null)?; + builder.append(DataValue::Null)?; // assert!(matches!(builder.build(10), Err(DataBaseError::TooManyBuckets))); @@ -750,28 +720,28 @@ mod tests { #[test] fn test_rev_sort_tuples_on_histogram() -> Result<(), DatabaseError> { - let mut builder = HistogramBuilder::new(&index_meta(), Some(15)); + let mut builder = HistogramBuilder::new(&index_meta(), ANALYZE_STATISTICS_RELATIVE_ERROR)?; - builder.append(&DataValue::Int32(14))?; - builder.append(&DataValue::Int32(13))?; - builder.append(&DataValue::Int32(12))?; - builder.append(&DataValue::Int32(11))?; - builder.append(&DataValue::Int32(10))?; + builder.append(DataValue::Int32(14))?; + builder.append(DataValue::Int32(13))?; + builder.append(DataValue::Int32(12))?; + builder.append(DataValue::Int32(11))?; + builder.append(DataValue::Int32(10))?; - builder.append(&DataValue::Int32(9))?; - builder.append(&DataValue::Int32(8))?; - builder.append(&DataValue::Int32(7))?; - builder.append(&DataValue::Int32(6))?; - builder.append(&DataValue::Int32(5))?; + builder.append(DataValue::Int32(9))?; + builder.append(DataValue::Int32(8))?; + builder.append(DataValue::Int32(7))?; + builder.append(DataValue::Int32(6))?; + builder.append(DataValue::Int32(5))?; - builder.append(&DataValue::Int32(4))?; - builder.append(&DataValue::Int32(3))?; - builder.append(&DataValue::Int32(2))?; - builder.append(&DataValue::Int32(1))?; - builder.append(&DataValue::Int32(0))?; + builder.append(DataValue::Int32(4))?; + builder.append(DataValue::Int32(3))?; + builder.append(DataValue::Int32(2))?; + builder.append(DataValue::Int32(1))?; + builder.append(DataValue::Int32(0))?; - builder.append(&DataValue::Null)?; - builder.append(&DataValue::Null)?; + builder.append(DataValue::Null)?; + builder.append(DataValue::Null)?; let (histogram, _) = builder.build(5)?; @@ -814,28 +784,28 @@ mod tests { #[test] fn test_non_average_on_histogram() -> Result<(), DatabaseError> { - let mut builder = HistogramBuilder::new(&index_meta(), Some(15)); + let mut builder = HistogramBuilder::new(&index_meta(), ANALYZE_STATISTICS_RELATIVE_ERROR)?; - builder.append(&DataValue::Int32(14))?; - builder.append(&DataValue::Int32(13))?; - builder.append(&DataValue::Int32(12))?; - builder.append(&DataValue::Int32(11))?; - builder.append(&DataValue::Int32(10))?; + builder.append(DataValue::Int32(14))?; + builder.append(DataValue::Int32(13))?; + builder.append(DataValue::Int32(12))?; + builder.append(DataValue::Int32(11))?; + builder.append(DataValue::Int32(10))?; - builder.append(&DataValue::Int32(4))?; - builder.append(&DataValue::Int32(3))?; - builder.append(&DataValue::Int32(2))?; - builder.append(&DataValue::Int32(1))?; - builder.append(&DataValue::Int32(0))?; + builder.append(DataValue::Int32(4))?; + builder.append(DataValue::Int32(3))?; + builder.append(DataValue::Int32(2))?; + builder.append(DataValue::Int32(1))?; + builder.append(DataValue::Int32(0))?; - builder.append(&DataValue::Int32(9))?; - builder.append(&DataValue::Int32(8))?; - builder.append(&DataValue::Int32(7))?; - builder.append(&DataValue::Int32(6))?; - builder.append(&DataValue::Int32(5))?; + builder.append(DataValue::Int32(9))?; + builder.append(DataValue::Int32(8))?; + builder.append(DataValue::Int32(7))?; + builder.append(DataValue::Int32(6))?; + builder.append(DataValue::Int32(5))?; - builder.append(&DataValue::Null)?; - builder.append(&DataValue::Null)?; + builder.append(DataValue::Null)?; + builder.append(DataValue::Null)?; let (histogram, _) = builder.build(4)?; @@ -847,23 +817,23 @@ mod tests { vec![ Bucket { lower: DataValue::Int32(0), - upper: DataValue::Int32(3), - count: 4, + upper: DataValue::Int32(2), + count: 3, }, Bucket { - lower: DataValue::Int32(4), - upper: DataValue::Int32(7), + lower: DataValue::Int32(3), + upper: DataValue::Int32(6), count: 4, }, Bucket { - lower: DataValue::Int32(8), - upper: DataValue::Int32(11), + lower: DataValue::Int32(7), + upper: DataValue::Int32(10), count: 4, }, Bucket { - lower: DataValue::Int32(12), + lower: DataValue::Int32(11), upper: DataValue::Int32(14), - count: 3, + count: 4, }, ] ); @@ -873,27 +843,27 @@ mod tests { #[test] fn test_collect_count() -> Result<(), DatabaseError> { - let mut builder = HistogramBuilder::new(&index_meta(), Some(15)); + let mut builder = HistogramBuilder::new(&index_meta(), ANALYZE_STATISTICS_RELATIVE_ERROR)?; - builder.append(&DataValue::Int32(14))?; - builder.append(&DataValue::Int32(13))?; - builder.append(&DataValue::Int32(12))?; - builder.append(&DataValue::Int32(11))?; - builder.append(&DataValue::Int32(10))?; + builder.append(DataValue::Int32(14))?; + builder.append(DataValue::Int32(13))?; + builder.append(DataValue::Int32(12))?; + builder.append(DataValue::Int32(11))?; + builder.append(DataValue::Int32(10))?; - builder.append(&DataValue::Int32(4))?; - builder.append(&DataValue::Int32(3))?; - builder.append(&DataValue::Int32(2))?; - builder.append(&DataValue::Int32(1))?; - builder.append(&DataValue::Int32(0))?; + builder.append(DataValue::Int32(4))?; + builder.append(DataValue::Int32(3))?; + builder.append(DataValue::Int32(2))?; + builder.append(DataValue::Int32(1))?; + builder.append(DataValue::Int32(0))?; - builder.append(&DataValue::Int32(9))?; - builder.append(&DataValue::Int32(8))?; - builder.append(&DataValue::Int32(7))?; - builder.append(&DataValue::Int32(6))?; - builder.append(&DataValue::Int32(5))?; + builder.append(DataValue::Int32(9))?; + builder.append(DataValue::Int32(8))?; + builder.append(DataValue::Int32(7))?; + builder.append(DataValue::Int32(6))?; + builder.append(DataValue::Int32(5))?; - builder.append(&DataValue::Null)?; + builder.append(DataValue::Null)?; let (histogram, sketch) = builder.build(4)?; @@ -958,7 +928,7 @@ mod tests { &sketch, )?; - assert_eq!(count_6, 13); + assert_eq!(count_6, 12); let count_7 = histogram.collect_count( &[Range::Scope { @@ -968,7 +938,7 @@ mod tests { &sketch, )?; - assert_eq!(count_7, 14); + assert_eq!(count_7, 13); let count_8 = histogram.collect_count( &[Range::Scope { @@ -998,7 +968,7 @@ mod tests { &sketch, )?; - assert_eq!(count_10, 3); + assert_eq!(count_10, 2); let count_11 = histogram.collect_count( &[Range::Scope { @@ -1012,4 +982,58 @@ mod tests { Ok(()) } + + #[test] + fn test_collect_count_ignores_tuple_prefix_endpoint_count() -> Result<(), DatabaseError> { + let mut builder = HistogramBuilder::new(&index_meta(), ANALYZE_STATISTICS_RELATIVE_ERROR)?; + + for value in 0..15 { + builder.append(DataValue::Tuple( + vec![DataValue::Int32(value), DataValue::Int32(value)], + false, + ))?; + } + + let (histogram, mut sketch) = builder.build(5)?; + let ranges = [Range::Scope { + min: Bound::Excluded(DataValue::Tuple(vec![DataValue::Int32(0)], false)), + max: Bound::Excluded(DataValue::Tuple(vec![DataValue::Int32(8)], true)), + }]; + let clean_count = histogram.collect_count(&ranges, &sketch)?; + + sketch.increment(&DataValue::Tuple(vec![DataValue::Int32(0)], false)); + sketch.increment(&DataValue::Tuple(vec![DataValue::Int32(8)], true)); + + assert_eq!(histogram.collect_count(&ranges, &sketch)?, clean_count); + + Ok(()) + } + + #[test] + fn test_endpoint_count_uses_only_full_histogram_keys() -> Result<(), DatabaseError> { + let bucket = Bucket { + lower: DataValue::Tuple(vec![DataValue::Int32(0), DataValue::Int32(0)], false), + upper: DataValue::Tuple(vec![DataValue::Int32(10), DataValue::Int32(10)], false), + count: 11, + }; + let mut sketch = CountMinSketch::with_relative_error( + super::ANALYZE_STATISTICS_CONFIDENCE, + ANALYZE_STATISTICS_RELATIVE_ERROR, + )?; + + let real_key = DataValue::Tuple(vec![DataValue::Int32(8), DataValue::Int32(8)], false); + let upper_bound = DataValue::Tuple(vec![DataValue::Int32(8), DataValue::Int32(8)], true); + let prefix_bound = DataValue::Tuple(vec![DataValue::Int32(8)], true); + + sketch.increment(&real_key); + sketch.increment(&prefix_bound); + + assert_eq!( + super::endpoint_count(&upper_bound, &bucket, &sketch), + Some(1) + ); + assert_eq!(super::endpoint_count(&prefix_bound, &bucket, &sketch), None); + + Ok(()) + } } diff --git a/src/optimizer/core/kll_sketch.rs b/src/optimizer/core/kll_sketch.rs new file mode 100644 index 00000000..5a6697fb --- /dev/null +++ b/src/optimizer/core/kll_sketch.rs @@ -0,0 +1,451 @@ +// Copyright 2024 KipData/KiteSQL +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::errors::DatabaseError; +use crate::types::value::DataValue; +use std::cmp::Ordering; +use std::iter::Peekable; + +#[derive(Debug, Clone, PartialEq, Eq)] +struct KllSketchItem { + value: DataValue, + ordinal: usize, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +struct KllWeightedItem { + value: DataValue, + ordinal: usize, + weight: usize, +} + +pub struct KllSketchBuilder { + level_capacity: usize, + len: usize, + levels: Vec>, + min_value: Option, + max_value: Option, + compact_next_odd: bool, +} + +impl KllSketchBuilder { + fn new(level_capacity: usize) -> Result { + if level_capacity < 2 { + return Err(DatabaseError::InvalidValue(format!( + "KLL sketch level capacity must be at least 2, got {level_capacity}" + ))); + } + + Ok(Self { + level_capacity, + len: 0, + levels: vec![Vec::with_capacity(level_capacity)], + min_value: None, + max_value: None, + compact_next_odd: false, + }) + } + + pub fn with_relative_error(relative_error: f64) -> Result { + if relative_error <= 0.0 || relative_error.is_nan() { + return Err(DatabaseError::InvalidValue(format!( + "KLL sketch relative error must be greater than zero, got {relative_error}" + ))); + } + + Self::new(Self::capacity_for_relative_error(relative_error)?) + } + + pub fn insert(&mut self, value: DataValue) -> Result<(), DatabaseError> { + self.update_bounds(&value)?; + self.levels[0].push(KllSketchItem { + value, + ordinal: self.len, + }); + self.len += 1; + self.compact_if_needed(0) + } + + pub fn build(self) -> Result { + let Self { + len, + levels, + min_value, + max_value, + .. + } = self; + let retained_len = levels.iter().map(Vec::len).sum(); + let mut items = Vec::with_capacity(retained_len); + + for (level_idx, level) in levels.into_iter().enumerate() { + let weight = 1usize << level_idx; + items.extend(level.into_iter().map(|item| KllWeightedItem { + value: item.value, + ordinal: item.ordinal, + weight, + })); + } + items.sort_by(|left, right| compare_values(&left.value, &right.value)); + + Ok(KllSketch { + len, + items, + min_value, + max_value, + }) + } + + #[cfg(test)] + fn len(&self) -> usize { + self.len + } + + #[cfg(test)] + fn levels_len(&self) -> usize { + self.levels.len() + } + + #[cfg(test)] + fn retained_len(&self) -> usize { + self.levels.iter().map(Vec::len).sum() + } + + fn update_bounds(&mut self, value: &DataValue) -> Result<(), DatabaseError> { + if let Some(min_value) = &self.min_value { + if compare_values(value, min_value) == Ordering::Less { + self.min_value = Some(value.clone()); + } + } else { + self.min_value = Some(value.clone()); + } + + if let Some(max_value) = &self.max_value { + if compare_values(value, max_value) == Ordering::Greater { + self.max_value = Some(value.clone()); + } + } else { + self.max_value = Some(value.clone()); + } + Ok(()) + } + + fn compact_if_needed(&mut self, level_idx: usize) -> Result<(), DatabaseError> { + if self.levels[level_idx].len() <= self.level_capacity { + return Ok(()); + } + + self.compact(level_idx)?; + self.compact_if_needed(level_idx + 1) + } + + fn compact(&mut self, level_idx: usize) -> Result<(), DatabaseError> { + let level = &mut self.levels[level_idx]; + level.sort_by(|left, right| compare_values(&left.value, &right.value)); + + let keep_odd = self.compact_next_odd; + self.compact_next_odd = !self.compact_next_odd; + + let mut promoted = Vec::with_capacity(level.len().div_ceil(2)); + let mut retained = Vec::new(); + if level.len() % 2 == 1 { + retained.push(level.pop().expect("odd level has a tail item")); + } + + for (idx, item) in level.drain(..).enumerate() { + if idx % 2 == usize::from(keep_odd) { + promoted.push(item); + } + } + *level = retained; + + if self.levels.len() == level_idx + 1 { + self.levels.push(Vec::with_capacity(self.level_capacity)); + } + self.levels[level_idx + 1].extend(promoted); + Ok(()) + } + + fn capacity_for_relative_error(relative_error: f64) -> Result { + let capacity = (2.0 / relative_error).ceil(); + if !capacity.is_finite() || capacity > usize::MAX as f64 { + return Err(DatabaseError::InvalidValue(format!( + "KLL sketch relative error is too small: {relative_error}" + ))); + } + + Ok(capacity as usize) + } +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct KllSketch { + len: usize, + items: Vec, + min_value: Option, + max_value: Option, +} + +impl KllSketch { + pub fn correlation(&self) -> f64 { + if self.len == 0 { + return 0.0; + } + + let mut corr_xy_sum = 0.0; + let mut rank = 0usize; + for item in self.items.iter() { + let next_rank = rank + item.weight; + let rank_sum = (rank + next_rank - 1) as f64 * item.weight as f64 / 2.0; + corr_xy_sum += rank_sum * item.ordinal as f64; + rank = next_rank; + } + + Self::calc_correlation(corr_xy_sum, self.len) + } + + pub fn values_at_ranks(self, ranks: I) -> impl Iterator> + where + I: IntoIterator, + { + let Self { + len, + items, + min_value, + max_value, + } = self; + + KllValuesAtRanks { + len, + ranks: ranks.into_iter().peekable(), + items: items.into_iter(), + current: None, + accumulated: 0, + last_rank: None, + min_value, + max_value, + } + } + + #[allow(dead_code)] + fn len(&self) -> usize { + self.len + } + + // https://github.com/pingcap/tidb/blob/6957170f1147e96958e63db48148445a7670328e/pkg/statistics/builder.go#L210 + fn calc_correlation(corr_xy_sum: f64, values_len: usize) -> f64 { + if values_len == 1 { + return 1.0; + } + let item_count = values_len as f64; + let corr_x_sum = (item_count - 1.0) * item_count / 2.0; + let corr_x2_sum = (item_count - 1.0) * item_count * (2.0 * item_count - 1.0) / 6.0; + (item_count * corr_xy_sum - corr_x_sum * corr_x_sum) + / (item_count * corr_x2_sum - corr_x_sum * corr_x_sum) + } +} + +struct KllValuesAtRanks> { + len: usize, + ranks: Peekable, + items: std::vec::IntoIter, + current: Option, + accumulated: usize, + last_rank: Option, + min_value: Option, + max_value: Option, +} + +impl> Iterator for KllValuesAtRanks { + type Item = Option; + + fn next(&mut self) -> Option { + let rank = self.ranks.next()?; + debug_assert!( + self.last_rank.is_none_or(|last_rank| rank >= last_rank), + "KLL ranks must be sorted" + ); + self.last_rank = Some(rank); + if self.len == 0 { + return Some(None); + } + + let ordinal = rank.saturating_add(1); + let next_ordinal = self.ranks.peek().map(|rank| rank.saturating_add(1)); + if ordinal == 1 { + return Some(Self::emit_value(&mut self.min_value, next_ordinal, 1)); + } + if ordinal >= self.len { + return Some(Self::emit_value( + &mut self.max_value, + next_ordinal, + usize::MAX, + )); + } + + while self.accumulated < ordinal { + if let Some(item) = self.items.next() { + self.accumulated += item.weight; + self.current = Some(item); + } else { + self.current = None; + break; + } + } + + Some(if self.current.is_some() && self.accumulated >= ordinal { + let upper_ordinal = self.accumulated.min(self.len - 1); + self.emit_current(next_ordinal, upper_ordinal) + } else { + None + }) + } +} + +impl> KllValuesAtRanks { + fn emit_value( + value: &mut Option, + next_ordinal: Option, + upper_ordinal: usize, + ) -> Option { + if next_ordinal.is_some_and(|next_ordinal| next_ordinal <= upper_ordinal) { + value.clone() + } else { + value.take() + } + } + + fn emit_current( + &mut self, + next_ordinal: Option, + upper_ordinal: usize, + ) -> Option { + if next_ordinal.is_some_and(|next_ordinal| next_ordinal <= upper_ordinal) { + self.current.as_ref().map(|item| item.value.clone()) + } else { + self.current.take().map(|item| item.value) + } + } +} + +#[inline] +fn compare_values(left: &DataValue, right: &DataValue) -> Ordering { + left.partial_cmp(right) + .expect("KLL sketch values must be mutually comparable") +} + +#[cfg(test)] +mod tests { + use super::KllSketchBuilder; + use crate::errors::DatabaseError; + use crate::types::value::DataValue; + + fn build_builder(level_capacity: usize, len: i32) -> Result { + let mut builder = KllSketchBuilder::new(level_capacity)?; + for value in 0..len { + builder.insert(DataValue::Int32(value))?; + } + Ok(builder) + } + + #[test] + fn kll_sketch_keeps_bounded_retained_items() -> Result<(), DatabaseError> { + let builder = build_builder(32, 10_000)?; + + assert_eq!(builder.len(), 10_000); + assert!(builder.levels_len() > 1); + assert!(builder.retained_len() < 32 * 16); + + Ok(()) + } + + #[test] + fn kll_sketch_estimates_quantiles() -> Result<(), DatabaseError> { + let sketch = build_builder(256, 10_000)?.build()?; + let values = sketch + .values_at_ranks([0, 4_999, 8_999, 9_999]) + .collect::>(); + + assert_eq!(values[0], Some(DataValue::Int32(0))); + assert_eq!(values[3], Some(DataValue::Int32(9999))); + + let p50 = match values[1].clone().unwrap() { + DataValue::Int32(value) => value, + value => panic!("unexpected value: {value:?}"), + }; + let p90 = match values[2].clone().unwrap() { + DataValue::Int32(value) => value, + value => panic!("unexpected value: {value:?}"), + }; + + assert!((4_500..=5_500).contains(&p50), "p50={p50}"); + assert!((8_500..=9_500).contains(&p90), "p90={p90}"); + + Ok(()) + } + + #[test] + fn kll_sketch_handles_reversed_input() -> Result<(), DatabaseError> { + let mut builder = KllSketchBuilder::new(64)?; + for value in (0..1_000).rev() { + builder.insert(DataValue::Int32(value))?; + } + let sketch = builder.build()?; + + let quantiles = sketch + .values_at_ranks([0, 249, 499, 749, 999]) + .map(|value| value.expect("non-empty sketch")) + .collect::>(); + assert_eq!(quantiles.first(), Some(&DataValue::Int32(0))); + assert_eq!(quantiles.last(), Some(&DataValue::Int32(999))); + assert_eq!(quantiles.len(), 5); + + Ok(()) + } + + #[test] + fn kll_sketch_correlation_tracks_original_order() -> Result<(), DatabaseError> { + let ascending = build_builder(64, 15)?.build()?; + assert_eq!(ascending.correlation(), 1.0); + + let mut descending = KllSketchBuilder::new(64)?; + for value in (0..15).rev() { + descending.insert(DataValue::Int32(value))?; + } + assert_eq!(descending.build()?.correlation(), -1.0); + + let mut mixed = KllSketchBuilder::new(64)?; + for value in [14, 13, 12, 11, 10, 4, 3, 2, 1, 0, 9, 8, 7, 6, 5] { + mixed.insert(DataValue::Int32(value))?; + } + assert!(mixed.build()?.correlation() < 0.0); + + Ok(()) + } + + #[test] + fn kll_sketch_values_at_ranks_accepts_sorted_ranks() -> Result<(), DatabaseError> { + let sketch = build_builder(64, 1_000)?.build()?; + let values = sketch.values_at_ranks([0, 499, 999]).collect::>(); + + assert_eq!(values[0], Some(DataValue::Int32(0))); + assert_eq!(values[2], Some(DataValue::Int32(999))); + let mid = match values[1].clone().unwrap() { + DataValue::Int32(value) => value, + value => panic!("unexpected value: {value:?}"), + }; + assert!((400..=600).contains(&mid), "mid={mid}"); + + Ok(()) + } +} diff --git a/src/optimizer/core/mod.rs b/src/optimizer/core/mod.rs index 87ee196a..e43353d7 100644 --- a/src/optimizer/core/mod.rs +++ b/src/optimizer/core/mod.rs @@ -14,6 +14,7 @@ pub(crate) mod cm_sketch; pub(crate) mod histogram; +pub(crate) mod kll_sketch; pub(crate) mod pattern; pub(crate) mod rule; pub(crate) mod statistics_meta; diff --git a/src/optimizer/core/statistics_meta.rs b/src/optimizer/core/statistics_meta.rs index 410e51c7..4c042b6a 100644 --- a/src/optimizer/core/statistics_meta.rs +++ b/src/optimizer/core/statistics_meta.rs @@ -141,12 +141,11 @@ impl StatisticsMeta { #[cfg(all(test, not(target_arch = "wasm32")))] mod tests { use crate::errors::DatabaseError; - use crate::optimizer::core::histogram::HistogramBuilder; + use crate::optimizer::core::histogram::{HistogramBuilder, ANALYZE_STATISTICS_RELATIVE_ERROR}; use crate::optimizer::core::statistics_meta::StatisticsMeta; use crate::types::index::{IndexMeta, IndexType}; use crate::types::value::DataValue; use crate::types::LogicalType; - use std::sync::Arc; #[test] fn test_into_parts_and_from_parts() -> Result<(), DatabaseError> { @@ -160,25 +159,25 @@ mod tests { ty: IndexType::PrimaryKey { is_multiple: false }, }; - let mut builder = HistogramBuilder::new(&index, Some(15)); - - builder.append(&Arc::new(DataValue::Int32(14)))?; - builder.append(&Arc::new(DataValue::Int32(13)))?; - builder.append(&Arc::new(DataValue::Int32(12)))?; - builder.append(&Arc::new(DataValue::Int32(11)))?; - builder.append(&Arc::new(DataValue::Int32(10)))?; - builder.append(&Arc::new(DataValue::Int32(4)))?; - builder.append(&Arc::new(DataValue::Int32(3)))?; - builder.append(&Arc::new(DataValue::Int32(2)))?; - builder.append(&Arc::new(DataValue::Int32(1)))?; - builder.append(&Arc::new(DataValue::Int32(0)))?; - builder.append(&Arc::new(DataValue::Int32(9)))?; - builder.append(&Arc::new(DataValue::Int32(8)))?; - builder.append(&Arc::new(DataValue::Int32(7)))?; - builder.append(&Arc::new(DataValue::Int32(6)))?; - builder.append(&Arc::new(DataValue::Int32(5)))?; - builder.append(&Arc::new(DataValue::Null))?; - builder.append(&Arc::new(DataValue::Null))?; + let mut builder = HistogramBuilder::new(&index, ANALYZE_STATISTICS_RELATIVE_ERROR)?; + + builder.append(DataValue::Int32(14))?; + builder.append(DataValue::Int32(13))?; + builder.append(DataValue::Int32(12))?; + builder.append(DataValue::Int32(11))?; + builder.append(DataValue::Int32(10))?; + builder.append(DataValue::Int32(4))?; + builder.append(DataValue::Int32(3))?; + builder.append(DataValue::Int32(2))?; + builder.append(DataValue::Int32(1))?; + builder.append(DataValue::Int32(0))?; + builder.append(DataValue::Int32(9))?; + builder.append(DataValue::Int32(8))?; + builder.append(DataValue::Int32(7))?; + builder.append(DataValue::Int32(6))?; + builder.append(DataValue::Int32(5))?; + builder.append(DataValue::Null)?; + builder.append(DataValue::Null)?; let (histogram, sketch) = builder.build(4)?; let expected_estimate = sketch.estimate(&DataValue::Int32(7)); diff --git a/src/optimizer/rule/implementation/dql/table_scan.rs b/src/optimizer/rule/implementation/dql/table_scan.rs index d2d725bf..b32bdc9f 100644 --- a/src/optimizer/rule/implementation/dql/table_scan.rs +++ b/src/optimizer/rule/implementation/dql/table_scan.rs @@ -13,6 +13,7 @@ // limitations under the License. use crate::errors::DatabaseError; +use crate::expression::range_detacher::Range; use crate::optimizer::core::pattern::{Pattern, PatternChildrenPredicate}; use crate::optimizer::core::rule::{BestPhysicalOption, ImplementationRule, MatchPattern}; use crate::optimizer::core::statistics_meta::StatisticMetaLoader; @@ -96,6 +97,7 @@ impl ImplementationRule for IndexScanImplementation { if let Some(mut row_count) = loader.collect_count(&scan_op.table_name, index_meta.id, range)? { + row_count = adjust_index_row_count(index_meta.ty, range, row_count); if index_info.covered_deserializers.is_none() && !matches!(index_meta.ty, IndexType::PrimaryKey { .. }) { @@ -118,9 +120,19 @@ impl ImplementationRule for IndexScanImplementation { // | // └────────────────────── hint_sum // 2 4 8 16 32 - let hint_sum = index_info.sort_elimination_hint.unwrap_or(0) - + index_info.stream_distinct_hint.unwrap_or(0); + let hint_sum = index_info + .sort_elimination_hint + .map(|hint| hint.cover_num()) + .unwrap_or(0) + + index_info + .stream_distinct_hint + .map(|hint| hint.cover_num()) + .unwrap_or(0); if hint_sum > 0 { + // TODO: use histogram correlation to refine the cost of ordered index + // scans. A hint here means an ancestor has an ordering requirement that + // index_info.sort_option can satisfy; correlation can estimate whether the + // underlying row access is closer to sequential IO or random IO. let rows = row_count.max(1) as f64; let raw_bonus = rows * rows.log2(); let hint_weight = (hint_sum as f64).log2().max(1.0); @@ -147,3 +159,28 @@ impl ImplementationRule for IndexScanImplementation { } } } + +fn adjust_index_row_count(index_type: IndexType, range: &Range, row_count: usize) -> usize { + let row_count = unique_eq_row_count(index_type, range).unwrap_or(row_count); + if row_count == 0 && !matches!(range, Range::Dummy) { + 1 + } else { + row_count + } +} + +fn unique_eq_row_count(index_type: IndexType, range: &Range) -> Option { + match range { + Range::Dummy => Some(0), + Range::Eq(value) + if !value.is_null() + && matches!(index_type, IndexType::PrimaryKey { .. } | IndexType::Unique) => + { + Some(1) + } + Range::SortedRanges(ranges) => ranges.iter().try_fold(0usize, |count, range| { + unique_eq_row_count(index_type, range).map(|row_count| count + row_count) + }), + _ => None, + } +} diff --git a/src/optimizer/rule/normalization/agg_elimination.rs b/src/optimizer/rule/normalization/agg_elimination.rs index 99c806bd..d23b04d3 100644 --- a/src/optimizer/rule/normalization/agg_elimination.rs +++ b/src/optimizer/rule/normalization/agg_elimination.rs @@ -21,6 +21,7 @@ use crate::planner::operator::sort::SortField; use crate::planner::operator::table_scan::TableScanOperator; use crate::planner::operator::{Operator, PhysicalOption, PlanImpl, SortOption}; use crate::planner::{Childrens, LogicalPlan}; +use crate::types::index::IndexOrderHint; pub struct EliminateRedundantSort; @@ -152,18 +153,18 @@ pub(crate) fn apply_scan_order_hint( let covered = hint_len(required); match hint { OrderHintKind::SortElimination => { - index_info.sort_elimination_hint = Some( - index_info - .sort_elimination_hint - .map_or(covered, |old| old.max(covered)), - ); + if let Some(hint) = &mut index_info.sort_elimination_hint { + hint.merge_cover_num(covered); + } else { + index_info.sort_elimination_hint = Some(IndexOrderHint::new(covered)); + } } OrderHintKind::StreamDistinct => { - index_info.stream_distinct_hint = Some( - index_info - .stream_distinct_hint - .map_or(covered, |old| old.max(covered)), - ); + if let Some(hint) = &mut index_info.stream_distinct_hint { + hint.merge_cover_num(covered); + } else { + index_info.stream_distinct_hint = Some(IndexOrderHint::new(covered)); + } } } } @@ -695,7 +696,12 @@ mod tests { }; assert_eq!(scan_op.index_infos.len(), 1); - assert_eq!(scan_op.index_infos[0].stream_distinct_hint, Some(1)); + assert_eq!( + scan_op.index_infos[0] + .stream_distinct_hint + .map(|hint| hint.cover_num()), + Some(1) + ); Ok(()) } diff --git a/src/storage/table_codec.rs b/src/storage/table_codec.rs index 1b6b9a42..3377eb12 100644 --- a/src/storage/table_codec.rs +++ b/src/storage/table_codec.rs @@ -1004,7 +1004,7 @@ mod tests { use crate::catalog::{ColumnCatalog, ColumnDesc, ColumnRelation, TableCatalog, TableMeta}; use crate::errors::DatabaseError; use crate::iter_ext::Itertools; - use crate::optimizer::core::histogram::HistogramBuilder; + use crate::optimizer::core::histogram::{HistogramBuilder, ANALYZE_STATISTICS_RELATIVE_ERROR}; use crate::optimizer::core::statistics_meta::StatisticsMeta; use crate::planner::{PlanArena, TableArenaCell}; use crate::serdes::ReferenceTables; @@ -1102,10 +1102,10 @@ mod tests { name: "pk_c1".to_string(), ty: IndexType::PrimaryKey { is_multiple: false }, }; - let mut builder = HistogramBuilder::new(&index_meta, Some(8)); + let mut builder = HistogramBuilder::new(&index_meta, ANALYZE_STATISTICS_RELATIVE_ERROR)?; for value in 0..4 { - builder.append(&DataValue::Int32(value))?; + builder.append(DataValue::Int32(value))?; } let (histogram, sketch) = builder.build(2)?; let (root, buckets, _) = StatisticsMeta::new(histogram, sketch.clone()).into_parts(); diff --git a/src/types/index.rs b/src/types/index.rs index 2404c285..c122d3bb 100644 --- a/src/types/index.rs +++ b/src/types/index.rs @@ -75,8 +75,27 @@ pub struct IndexInfo { pub(crate) lookup: Option, pub(crate) covered_deserializers: Option>, pub(crate) cover_mapping: Option>, - pub(crate) sort_elimination_hint: Option, - pub(crate) stream_distinct_hint: Option, + pub(crate) sort_elimination_hint: Option, + pub(crate) stream_distinct_hint: Option, +} + +#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, ReferenceSerialization)] +pub struct IndexOrderHint { + cover_num: usize, +} + +impl IndexOrderHint { + pub(crate) fn new(cover_num: usize) -> Self { + Self { cover_num } + } + + pub(crate) fn cover_num(self) -> usize { + self.cover_num + } + + pub(crate) fn merge_cover_num(&mut self, cover_num: usize) { + self.cover_num = self.cover_num.max(cover_num); + } } #[derive(Debug, Clone, Eq, PartialEq, Hash, ReferenceSerialization)] diff --git a/src/types/value.rs b/src/types/value.rs index 138a1191..12299827 100644 --- a/src/types/value.rs +++ b/src/types/value.rs @@ -326,6 +326,52 @@ impl PartialEq for DataValue { } } +fn tuple_partial_cmp( + (left, left_is_upper): (&[DataValue], bool), + (right, right_is_upper): (&[DataValue], bool), +) -> Option { + let mut left_iter = left.iter(); + let mut right_iter = right.iter(); + + loop { + match (left_iter.next(), right_iter.next()) { + (Some(left), Some(right)) => { + let ordering = tuple_element_partial_cmp(left, right)?; + if ordering != Ordering::Equal { + return Some(ordering); + } + } + (Some(_), None) => { + return Some(if right_is_upper { + Ordering::Less + } else { + Ordering::Greater + }); + } + (None, Some(_)) => { + return Some(if left_is_upper { + Ordering::Greater + } else { + Ordering::Less + }); + } + (None, None) => return Some(Ordering::Equal), + } + } +} + +fn tuple_element_partial_cmp(left: &DataValue, right: &DataValue) -> Option { + match (left, right) { + (DataValue::Null, DataValue::Null) => Some(Ordering::Equal), + (DataValue::Null, _) => Some(Ordering::Greater), + (_, DataValue::Null) => Some(Ordering::Less), + (DataValue::Tuple(left, left_is_upper), DataValue::Tuple(right, right_is_upper)) => { + tuple_partial_cmp((left, *left_is_upper), (right, *right_is_upper)) + } + _ => left.partial_cmp(right), + } +} + impl PartialOrd for DataValue { fn partial_cmp(&self, other: &Self) -> Option { use DataValue::*; @@ -368,6 +414,9 @@ impl PartialOrd for DataValue { (Decimal(v1), Decimal(v2)) => v1.partial_cmp(v2), #[cfg(feature = "decimal")] (Decimal(_), _) => None, + (Tuple(v1, is_upper1), Tuple(v2, is_upper2)) => { + tuple_partial_cmp((v1, *is_upper1), (v2, *is_upper2)) + } (Tuple(..), _) => None, } } @@ -1680,8 +1729,24 @@ mod test { use bumpalo::Bump; use ordered_float::OrderedFloat; use rust_decimal::Decimal; + use std::cmp::Ordering; use std::io::Cursor; + #[test] + fn test_tuple_partial_cmp() { + let tuple_1 = DataValue::Tuple(vec![DataValue::Int32(1), DataValue::Int32(2)], false); + let tuple_2 = DataValue::Tuple(vec![DataValue::Int32(1), DataValue::Int32(3)], false); + let tuple_with_null = DataValue::Tuple(vec![DataValue::Int32(1), DataValue::Null], false); + let lower_prefix = DataValue::Tuple(vec![DataValue::Int32(1)], false); + let upper_prefix = DataValue::Tuple(vec![DataValue::Int32(1)], true); + + assert_eq!(tuple_1.partial_cmp(&tuple_2), Some(Ordering::Less)); + assert_eq!(tuple_2.partial_cmp(&tuple_with_null), Some(Ordering::Less)); + assert_eq!(lower_prefix.partial_cmp(&tuple_1), Some(Ordering::Less)); + assert_eq!(upper_prefix.partial_cmp(&tuple_1), Some(Ordering::Greater)); + assert_eq!(DataValue::Null.partial_cmp(&DataValue::Int32(1)), None); + } + #[test] fn test_mem_comparable_null() -> Result<(), DatabaseError> { let arena = Bump::new();