From cb66e8e755824319007f5ff90e1663c9d6f51137 Mon Sep 17 00:00:00 2001 From: waralexrom <108349432+waralexrom@users.noreply.github.com> Date: Wed, 29 Apr 2026 11:43:47 +0200 Subject: [PATCH 1/3] fix(tesseract): CASE handling fixes in multi-stage measures (#10764) --- packages/cubejs-backend-native/Cargo.lock | 38 ++++-- rust/cubesqlplanner/Cargo.lock | 1 + rust/cubesqlplanner/cubesqlplanner/Cargo.toml | 1 + .../multi_stage/multi_stage_query_planner.rs | 117 +++++++++--------- .../common/integration_multi_stage.yaml | 2 +- .../integration/multi_stage/case_switch.rs | 34 +++-- ...se_switch__case_switch_dimension_only.snap | 9 ++ ...age__case_switch__case_switch_measure.snap | 9 ++ ...se_switch__case_switch_with_dimension.snap | 15 +++ ..._case_switch__case_switch_with_filter.snap | 9 ++ 10 files changed, 155 insertions(+), 80 deletions(-) create mode 100644 rust/cubesqlplanner/cubesqlplanner/src/tests/integration/multi_stage/snapshots/cubesqlplanner__tests__integration__multi_stage__case_switch__case_switch_dimension_only.snap create mode 100644 rust/cubesqlplanner/cubesqlplanner/src/tests/integration/multi_stage/snapshots/cubesqlplanner__tests__integration__multi_stage__case_switch__case_switch_measure.snap create mode 100644 rust/cubesqlplanner/cubesqlplanner/src/tests/integration/multi_stage/snapshots/cubesqlplanner__tests__integration__multi_stage__case_switch__case_switch_with_dimension.snap create mode 100644 rust/cubesqlplanner/cubesqlplanner/src/tests/integration/multi_stage/snapshots/cubesqlplanner__tests__integration__multi_stage__case_switch__case_switch_with_filter.snap diff --git a/packages/cubejs-backend-native/Cargo.lock b/packages/cubejs-backend-native/Cargo.lock index b5bb97e712466..94503944e574b 100644 --- a/packages/cubejs-backend-native/Cargo.lock +++ b/packages/cubejs-backend-native/Cargo.lock @@ -785,7 +785,7 @@ dependencies = [ "anyhow", "chrono", "cubeshared", - "indexmap 2.7.1", + "indexmap 2.14.0", "itertools 0.13.0", "neon", "serde", @@ -855,6 +855,7 @@ dependencies = [ "convert_case 0.7.1", "cubeclient", "cubenativeutils", + "indexmap 2.14.0", "indoc", "itertools 0.10.5", "lazy_static", @@ -1370,6 +1371,12 @@ dependencies = [ "foldhash", ] +[[package]] +name = "hashbrown" +version = "0.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4f467dd6dccf739c208452f8014c75c18bb8301b050ad1cfb27153803edb0f51" + [[package]] name = "heck" version = "0.3.3" @@ -1711,13 +1718,14 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.7.1" +version = "2.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8c9c992b02b5b4c94ea26e32fe5bccb7aa7d9f390ab5c1221ff895bc7ea8b652" +checksum = "d466e9454f08e4a911e14806c24e16fba1b4c121d1ea474396f396069cf949d9" dependencies = [ "equivalent", - "hashbrown 0.15.2", + "hashbrown 0.17.0", "serde", + "serde_core", ] [[package]] @@ -3084,18 +3092,28 @@ checksum = "cd0b0ec5f1c1ca621c432a25813d8d60c88abe6d3e08a3eb9cf37d97a0fe3d73" [[package]] name = "serde" -version = "1.0.217" +version = "1.0.228" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a8e94ea7f378bd32cbbd37198a4a91436180c5bb472411e48b5ec2e2124ae9e" +dependencies = [ + "serde_core", + "serde_derive", +] + +[[package]] +name = "serde_core" +version = "1.0.228" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "02fc4265df13d6fa1d00ecff087228cc0a2b5f3c0e87e258d8b94a156e984c70" +checksum = "41d385c7d4ca58e59fc732af25c3983b67ac852c1a25000afe1175de458b67ad" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.217" +version = "1.0.228" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5a9bf7cf98d04a2b28aead066b7496853d4779c9cc183c440dbac457641e19a0" +checksum = "d540f220d3187173da220f885ab66608367b6574e925011a9353e4badda91d79" dependencies = [ "proc-macro2", "quote", @@ -3108,7 +3126,7 @@ version = "1.0.138" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d434192e7da787e94a6ea7e9670b26a036d0ca41e0b7efb2676dd32bae872949" dependencies = [ - "indexmap 2.7.1", + "indexmap 2.14.0", "itoa", "memchr", "ryu", @@ -3691,7 +3709,7 @@ version = "0.20.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "396e4d48bbb2b7554c944bde63101b5ae446cff6ec4a24227428f15eb72ef338" dependencies = [ - "indexmap 2.7.1", + "indexmap 2.14.0", "toml_datetime", "winnow", ] diff --git a/rust/cubesqlplanner/Cargo.lock b/rust/cubesqlplanner/Cargo.lock index cc694cb3b7d19..1d9e35dcc8f7b 100644 --- a/rust/cubesqlplanner/Cargo.lock +++ b/rust/cubesqlplanner/Cargo.lock @@ -356,6 +356,7 @@ dependencies = [ "convert_case 0.7.1", "cubeclient", "cubenativeutils", + "indexmap 2.12.0", "indoc", "insta", "itertools", diff --git a/rust/cubesqlplanner/cubesqlplanner/Cargo.toml b/rust/cubesqlplanner/cubesqlplanner/Cargo.toml index 92720295f974f..2417b0ad4af87 100644 --- a/rust/cubesqlplanner/cubesqlplanner/Cargo.toml +++ b/rust/cubesqlplanner/cubesqlplanner/Cargo.toml @@ -22,6 +22,7 @@ lazy_static = "1.4.0" regex = "1.3.9" typed-builder = "0.21.2" indoc = "2.0.7" +indexmap = "2.12" [features] diff --git a/rust/cubesqlplanner/cubesqlplanner/src/planner/planners/multi_stage/multi_stage_query_planner.rs b/rust/cubesqlplanner/cubesqlplanner/src/planner/planners/multi_stage/multi_stage_query_planner.rs index c7e43b32d8d48..6ae27098f4c42 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/planner/planners/multi_stage/multi_stage_query_planner.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/planner/planners/multi_stage/multi_stage_query_planner.rs @@ -21,6 +21,7 @@ use crate::planner::sql_evaluator::MemberSymbol; use crate::planner::GranularityHelper; use crate::planner::QueryProperties; use cubenativeutils::CubeError; +use indexmap::IndexMap; use itertools::Itertools; use std::collections::HashSet; use std::rc::Rc; @@ -238,7 +239,6 @@ impl MultiStageQueryPlanner { Ok(()) } - //TODO refactor needed fn try_make_childs_for_case_switch( &self, case: &CaseSwitchDefinition, @@ -248,67 +248,72 @@ impl MultiStageQueryPlanner { resolved_multi_stage_dimensions: &mut HashSet, cte_state: &mut CteState, ) -> Result { - if let CaseSwitchItem::Member(switch_member) = &case.switch { - let mut processed_deps = HashSet::new(); - for itm in &case.items { - let mut state = new_state.clone_state(); - let filter = BaseFilter::try_new( - self.query_tools.clone(), - switch_member.clone(), - FilterType::Dimension, - FilterOperator::Equal, - Some(vec![Some(itm.value.clone())]), - )?; - state.add_dimension_filter(FilterItem::Item(filter)); - let state = Rc::new(state); - - for dep in itm.sql.get_dependencies() { - let dep = dep.resolve_reference_chain(); - if !processed_deps.contains(&dep.full_name()) { - processed_deps.insert(dep.full_name()); - result.push(self.make_queries_descriptions( - dep, - state.clone(), - descriptions, - resolved_multi_stage_dimensions, - cte_state, - )?); + let CaseSwitchItem::Member(switch_member) = &case.switch else { + return Ok(false); + }; + + // Collect, per dependency, the union of switch values that need it. + // `None` marks an unrestricted (open ELSE) entry: such a dependency + // must be processed without a prefilter on switch_member, since the + // outer CASE will dispatch by value at row level. + let mut deps: IndexMap, Option>)> = IndexMap::new(); + + let mut record = |dep: Rc, branch_values: Option>| { + let dep = dep.resolve_reference_chain(); + let entry = deps + .entry(dep.full_name()) + .or_insert_with(|| (dep.clone(), Some(Vec::new()))); + match (&mut entry.1, branch_values) { + (None, _) => {} // already unrestricted + (slot @ Some(_), None) => *slot = None, + (Some(values), Some(branch)) => { + for v in branch { + if !values.contains(&v) { + values.push(v); + } } } } - if let Some(else_sql) = &case.else_sql { - let mut state = new_state.clone_state(); - if let Some(else_values) = case.get_else_values() { - if !else_values.is_empty() { - let filter = BaseFilter::try_new( - self.query_tools.clone(), - switch_member.clone(), - FilterType::Dimension, - FilterOperator::Equal, - Some(else_values.into_iter().map(|v| Some(v)).collect_vec()), - )?; - state.add_dimension_filter(FilterItem::Item(filter)); - } - } - let state = Rc::new(state); - for dep in else_sql.get_dependencies() { - let dep = dep.resolve_reference_chain(); - if !processed_deps.contains(&dep.full_name()) { - processed_deps.insert(dep.full_name()); - result.push(self.make_queries_descriptions( - dep, - state.clone(), - descriptions, - resolved_multi_stage_dimensions, - cte_state, - )?); - } + }; + + for itm in &case.items { + for dep in itm.sql.get_dependencies() { + record(dep, Some(vec![itm.value.clone()])); + } + } + + if let Some(else_sql) = &case.else_sql { + let else_values = case.get_else_values(); + for dep in else_sql.get_dependencies() { + record(dep.clone(), else_values.clone()); + } + } + + for (_, (dep, values)) in deps { + let mut state = new_state.clone_state(); + if let Some(values) = values { + if !values.is_empty() { + let filter = BaseFilter::try_new( + self.query_tools.clone(), + switch_member.clone(), + FilterType::Dimension, + FilterOperator::Equal, + Some(values.into_iter().map(Some).collect_vec()), + )?; + state.add_dimension_filter(FilterItem::Item(filter)); } } - Ok(true) - } else { - Ok(false) + let state = Rc::new(state); + result.push(self.make_queries_descriptions( + dep, + state, + descriptions, + resolved_multi_stage_dimensions, + cte_state, + )?); } + + Ok(true) } fn make_queries_descriptions( diff --git a/rust/cubesqlplanner/cubesqlplanner/src/test_fixtures/schemas/yaml_files/common/integration_multi_stage.yaml b/rust/cubesqlplanner/cubesqlplanner/src/test_fixtures/schemas/yaml_files/common/integration_multi_stage.yaml index 3df3463546b7d..3d915bafeb624 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/test_fixtures/schemas/yaml_files/common/integration_multi_stage.yaml +++ b/rust/cubesqlplanner/cubesqlplanner/src/test_fixtures/schemas/yaml_files/common/integration_multi_stage.yaml @@ -223,7 +223,7 @@ cubes: dir: desc - name: status_weighted_amount - type: number + type: sum multi_stage: true case: switch: "{CUBE.status}" diff --git a/rust/cubesqlplanner/cubesqlplanner/src/tests/integration/multi_stage/case_switch.rs b/rust/cubesqlplanner/cubesqlplanner/src/tests/integration/multi_stage/case_switch.rs index 0738b8ef87fa1..3921bfae45265 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/tests/integration/multi_stage/case_switch.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/tests/integration/multi_stage/case_switch.rs @@ -9,17 +9,6 @@ fn create_context() -> TestContext { const SEED: &str = "integration_multi_stage_tables.sql"; -// BUG: Case/switch multi-stage measures generate incorrect SQL. -// The leaf CTE is filtered by the first WHEN value (e.g. status = 'completed') -// even though no such filter was specified in the query. -// Expected: each WHEN value should produce a separate leaf CTE with its own -// filter, then the final CTE combines results via CASE expression -// (cross-join pattern, similar to calc_groups test). -// Actual: only one leaf CTE with first WHEN value as filter, so only -// 'completed' rows are processed. Result shows completed totals (300, 500, 600) -// instead of weighted sums across all statuses (375, 600, 750). - -#[ignore] #[tokio::test(flavor = "multi_thread")] async fn test_case_switch_measure() { let ctx = create_context(); @@ -42,7 +31,6 @@ async fn test_case_switch_measure() { } } -#[ignore] #[tokio::test(flavor = "multi_thread")] async fn test_case_switch_with_dimension() { let ctx = create_context(); @@ -60,6 +48,27 @@ async fn test_case_switch_with_dimension() { - "2024-03-31" order: - id: orders.status + - id: orders.created_at + "#}; + + ctx.build_sql(query).unwrap(); + + if let Some(result) = ctx.try_execute_pg(query, SEED).await { + insta::assert_snapshot!(result); + } +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_case_switch_dimension_only() { + let ctx = create_context(); + + let query = indoc! {r#" + measures: + - orders.status_weighted_amount + dimensions: + - orders.status + order: + - id: orders.status "#}; ctx.build_sql(query).unwrap(); @@ -69,7 +78,6 @@ async fn test_case_switch_with_dimension() { } } -#[ignore] #[tokio::test(flavor = "multi_thread")] async fn test_case_switch_with_filter() { let ctx = create_context(); diff --git a/rust/cubesqlplanner/cubesqlplanner/src/tests/integration/multi_stage/snapshots/cubesqlplanner__tests__integration__multi_stage__case_switch__case_switch_dimension_only.snap b/rust/cubesqlplanner/cubesqlplanner/src/tests/integration/multi_stage/snapshots/cubesqlplanner__tests__integration__multi_stage__case_switch__case_switch_dimension_only.snap new file mode 100644 index 0000000000000..e4f4b53dddb8b --- /dev/null +++ b/rust/cubesqlplanner/cubesqlplanner/src/tests/integration/multi_stage/snapshots/cubesqlplanner__tests__integration__multi_stage__case_switch__case_switch_dimension_only.snap @@ -0,0 +1,9 @@ +--- +source: cubesqlplanner/src/tests/integration/multi_stage/case_switch.rs +expression: result +--- +orders__status | orders__status_weighted_amount +---------------+------------------------------- +cancelled | 0.00 +completed | 1400.00 +pending | 325.000 diff --git a/rust/cubesqlplanner/cubesqlplanner/src/tests/integration/multi_stage/snapshots/cubesqlplanner__tests__integration__multi_stage__case_switch__case_switch_measure.snap b/rust/cubesqlplanner/cubesqlplanner/src/tests/integration/multi_stage/snapshots/cubesqlplanner__tests__integration__multi_stage__case_switch__case_switch_measure.snap new file mode 100644 index 0000000000000..dd4648106605b --- /dev/null +++ b/rust/cubesqlplanner/cubesqlplanner/src/tests/integration/multi_stage/snapshots/cubesqlplanner__tests__integration__multi_stage__case_switch__case_switch_measure.snap @@ -0,0 +1,9 @@ +--- +source: cubesqlplanner/src/tests/integration/multi_stage/case_switch.rs +expression: result +--- +orders__created_at_month | orders__status_weighted_amount +-------------------------+------------------------------- +2024-01-01 00:00:00 | 375.000 +2024-02-01 00:00:00 | 600.000 +2024-03-01 00:00:00 | 750.000 diff --git a/rust/cubesqlplanner/cubesqlplanner/src/tests/integration/multi_stage/snapshots/cubesqlplanner__tests__integration__multi_stage__case_switch__case_switch_with_dimension.snap b/rust/cubesqlplanner/cubesqlplanner/src/tests/integration/multi_stage/snapshots/cubesqlplanner__tests__integration__multi_stage__case_switch__case_switch_with_dimension.snap new file mode 100644 index 0000000000000..8d7d4219d1478 --- /dev/null +++ b/rust/cubesqlplanner/cubesqlplanner/src/tests/integration/multi_stage/snapshots/cubesqlplanner__tests__integration__multi_stage__case_switch__case_switch_with_dimension.snap @@ -0,0 +1,15 @@ +--- +source: cubesqlplanner/src/tests/integration/multi_stage/case_switch.rs +expression: result +--- +orders__status | orders__created_at_month | orders__status_weighted_amount +---------------+--------------------------+------------------------------- +cancelled | 2024-01-01 00:00:00 | 0.00 +cancelled | 2024-02-01 00:00:00 | 0.00 +cancelled | 2024-03-01 00:00:00 | 0.00 +completed | 2024-01-01 00:00:00 | 300.00 +completed | 2024-02-01 00:00:00 | 500.00 +completed | 2024-03-01 00:00:00 | 600.00 +pending | 2024-01-01 00:00:00 | 75.000 +pending | 2024-02-01 00:00:00 | 100.000 +pending | 2024-03-01 00:00:00 | 150.000 diff --git a/rust/cubesqlplanner/cubesqlplanner/src/tests/integration/multi_stage/snapshots/cubesqlplanner__tests__integration__multi_stage__case_switch__case_switch_with_filter.snap b/rust/cubesqlplanner/cubesqlplanner/src/tests/integration/multi_stage/snapshots/cubesqlplanner__tests__integration__multi_stage__case_switch__case_switch_with_filter.snap new file mode 100644 index 0000000000000..1341bf8156f4f --- /dev/null +++ b/rust/cubesqlplanner/cubesqlplanner/src/tests/integration/multi_stage/snapshots/cubesqlplanner__tests__integration__multi_stage__case_switch__case_switch_with_filter.snap @@ -0,0 +1,9 @@ +--- +source: cubesqlplanner/src/tests/integration/multi_stage/case_switch.rs +expression: result +--- +orders__created_at_month | orders__status_weighted_amount +-------------------------+------------------------------- +2024-01-01 00:00:00 | 300.00 +2024-02-01 00:00:00 | 500.00 +2024-03-01 00:00:00 | 600.00 From df30aadefdc33531d5ae41e4dbb9ac9f25930962 Mon Sep 17 00:00:00 2001 From: waralexrom <108349432+waralexrom@users.noreply.github.com> Date: Wed, 29 Apr 2026 12:53:55 +0200 Subject: [PATCH 2/3] chore(cubestore): Update join partitions limit hit message (#10760) --- .../cubestore/src/queryplanner/query_executor.rs | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/rust/cubestore/cubestore/src/queryplanner/query_executor.rs b/rust/cubestore/cubestore/src/queryplanner/query_executor.rs index 5de3d4daceb66..05a5e88445c99 100644 --- a/rust/cubestore/cubestore/src/queryplanner/query_executor.rs +++ b/rust/cubestore/cubestore/src/queryplanner/query_executor.rs @@ -1538,9 +1538,14 @@ impl ClusterSendExec { // Budget for root partitions per batch. let chunk_size = if max == 0 || max <= right_active { - return Err(CubeError::user( - format!("Max number of right hand side join partitions limit is hit. Max limit is {}. Query requires {}. {}", max, right_active, config.max_joined_partitions_message()) - )); + let required = right_active + 1; + return Err(CubeError::user(format!( + "Max number of joined partitions per batch limit is hit. Max limit is {}. Query requires at least {} per batch (1 on the left and {} on the right). {}", + max, + required, + right_active, + config.max_joined_partitions_message() + ))); } else { (max - right_active).max(1) }; From 388607cf9737e5ddb5712e484fd8af4ac30cc4b4 Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Wed, 29 Apr 2026 15:21:00 +0200 Subject: [PATCH 3/3] perf(cubesql): Avoid cloning row payload in convert_transport_response (#10772) Destructure V1LoadResult to move `data` and `last_refresh_time` out of the owned value instead of cloning. Eliminates a full copy of the row Vec on every Cube load result, reducing peak memory and allocator churn. --- rust/cubesql/cubesql/src/compile/engine/df/scan.rs | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/rust/cubesql/cubesql/src/compile/engine/df/scan.rs b/rust/cubesql/cubesql/src/compile/engine/df/scan.rs index 4fe6e2bbb9062..34f412f440152 100644 --- a/rust/cubesql/cubesql/src/compile/engine/df/scan.rs +++ b/rust/cubesql/cubesql/src/compile/engine/df/scan.rs @@ -11,7 +11,7 @@ use crate::{ }; use async_trait::async_trait; use chrono::{Datelike, NaiveDate}; -use cubeclient::models::{V1LoadRequestQuery, V1LoadResponse}; +use cubeclient::models::{V1LoadRequestQuery, V1LoadResponse, V1LoadResult}; pub use datafusion::{ arrow::{ array::{ @@ -1194,9 +1194,15 @@ pub fn convert_transport_response( response .results .into_iter() - .map(|r| { - let mut response = JsonValueObject::new(r.data.clone()); - let updated_schema = if let Some(last_refresh_time) = r.last_refresh_time.clone() { + .map(|result| { + let V1LoadResult { + data, + last_refresh_time, + .. + } = result; + + let mut response = JsonValueObject::new(data); + let updated_schema = if let Some(last_refresh_time) = last_refresh_time { let mut metadata = schema.metadata().clone(); metadata.insert("lastRefreshTime".to_string(), last_refresh_time); Arc::new(Schema::new_with_metadata(