From d0617a0158ebb504cef563f6185b096082fa9fd2 Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Wed, 15 Apr 2026 13:29:02 +0200 Subject: [PATCH 1/4] feat(cubestore): Sync tables cache instead of reset (#10686) Instead of resetting the entire tables cache on every write operation, introduce CachedTables with update_table_by_id_or_reset and remove_by_table_id_or_reset methods for surgical in-place mutations. table_ready, seal_table, update_location_download_size, and drop_table now sync the cache without forcing a full reload from RocksDB. --- rust/cubestore/cubestore/src/metastore/mod.rs | 191 +++++++++++++++--- 1 file changed, 160 insertions(+), 31 deletions(-) diff --git a/rust/cubestore/cubestore/src/metastore/mod.rs b/rust/cubestore/cubestore/src/metastore/mod.rs index 942161bfde4eb..1edda6fb859a3 100644 --- a/rust/cubestore/cubestore/src/metastore/mod.rs +++ b/rust/cubestore/cubestore/src/metastore/mod.rs @@ -1356,10 +1356,106 @@ impl RocksStoreDetails for RocksMetaStoreDetails { } } +pub struct CachedTables { + tables: Mutex>>>, +} + +impl CachedTables { + pub fn new() -> Self { + Self { + tables: Mutex::new(None), + } + } + + pub fn reset(&self) { + *self.tables.lock().unwrap() = None; + } + + pub fn get(&self) -> Option>> { + self.tables.lock().unwrap().clone() + } + + pub fn set(&self, tables: Arc>) { + *self.tables.lock().unwrap() = Some(tables); + } + + /// Surgically update a single entry in the cache by table ID. + /// The closure receives a mutable reference to the entry for in-place mutation. + /// If the entry is not found or the cache is not populated, resets the cache. + #[inline(always)] + pub fn update_table_by_id_or_reset(&self, table_id: u64, f: F) + where + F: FnOnce(&mut TablePath), + { + let mut guard = self.tables.lock().unwrap(); + let Some(cached) = guard.as_mut() else { + return; + }; + + // Check existence on the immutable Arc first to avoid a wasted + // deep-clone from make_mut when the entry is absent. + let Some(idx) = cached.iter().position(|tp| tp.table.get_id() == table_id) else { + log::warn!( + "Table with id: {} not found in cache, completely resetting cache", + table_id + ); + + *guard = None; + return; + }; + + let tables = Arc::make_mut(cached); + f(&mut tables[idx]); + + // Remove entry if it's no longer ready (cache only stores ready tables) + if !tables[idx].table.get_row().is_ready() { + tables.swap_remove(idx); + } + } + + pub fn upsert_table_by_id(&self, table_id: u64, entry: TablePath) { + let mut guard = self.tables.lock().unwrap(); + let Some(cached) = guard.as_mut() else { + return; + }; + + let tables = Arc::make_mut(cached); + if let Some(idx) = tables.iter().position(|tp| tp.table.get_id() == table_id) { + tables.remove(idx); + } + + // Paranoid check + if entry.table.get_row().is_ready() { + tables.push(entry); + } else { + debug_assert!( + false, + "upsert_table_by_id called with non-ready table (id: {})", + table_id + ); + } + } + + pub fn remove_by_table_id_or_reset(&self, table_id: u64) { + let mut guard = self.tables.lock().unwrap(); + let Some(cached) = guard.as_mut() else { + return; + }; + + let tables = Arc::make_mut(cached); + let Some(idx) = tables.iter().position(|tp| tp.table.get_id() == table_id) else { + *guard = None; + return; + }; + + tables.remove(idx); + } +} + #[derive(Clone)] pub struct RocksMetaStore { store: Arc, - cached_tables: Arc>>>>, + cached_tables: Arc, disk_space_cache: Arc, SystemTime)>>>, upload_loop: Arc, } @@ -1382,14 +1478,14 @@ impl RocksMetaStore { fn new_from_store(store: Arc) -> Arc { Arc::new(Self { store, - cached_tables: Arc::new(Mutex::new(None)), + cached_tables: Arc::new(CachedTables::new()), disk_space_cache: Arc::new(RwLock::new(None)), upload_loop: Arc::new(WorkerLoop::new("Metastore upload")), }) } pub fn reset_cached_tables(&self) { - *self.cached_tables.lock().unwrap() = None; + self.cached_tables.reset(); } pub async fn load_from_dump( @@ -1917,7 +2013,7 @@ impl MetaStore for RocksMetaStore { ) -> Result, CubeError> { self.write_operation("create_schema", move |db_ref, batch_pipe| { batch_pipe.set_post_commit_callback(|metastore| { - *metastore.cached_tables.lock().unwrap() = None; + metastore.cached_tables.reset(); }); let table = SchemaRocksTable::new(db_ref.clone()); if if_not_exists { @@ -1980,7 +2076,7 @@ impl MetaStore for RocksMetaStore { ) -> Result, CubeError> { self.write_operation("rename_schema", move |db_ref, batch_pipe| { batch_pipe.set_post_commit_callback(|metastore| { - *metastore.cached_tables.lock().unwrap() = None; + metastore.cached_tables.reset(); }); let table = SchemaRocksTable::new(db_ref.clone()); let existing_keys = @@ -2006,7 +2102,7 @@ impl MetaStore for RocksMetaStore { ) -> Result, CubeError> { self.write_operation("rename_schema_by_id", move |db_ref, batch_pipe| { batch_pipe.set_post_commit_callback(|metastore| { - *metastore.cached_tables.lock().unwrap() = None; + metastore.cached_tables.reset(); }); let table = SchemaRocksTable::new(db_ref.clone()); @@ -2024,7 +2120,7 @@ impl MetaStore for RocksMetaStore { async fn delete_schema(&self, schema_name: String) -> Result<(), CubeError> { self.write_operation("delete_schema", move |db_ref, batch_pipe| { batch_pipe.set_post_commit_callback(|metastore| { - *metastore.cached_tables.lock().unwrap() = None; + metastore.cached_tables.reset(); }); let table = SchemaRocksTable::new(db_ref.clone()); let existing_keys = @@ -2053,7 +2149,7 @@ impl MetaStore for RocksMetaStore { async fn delete_schema_by_id(&self, schema_id: u64) -> Result<(), CubeError> { self.write_operation("delete_schema_by_id", move |db_ref, batch_pipe| { batch_pipe.set_post_commit_callback(|metastore| { - *metastore.cached_tables.lock().unwrap() = None; + metastore.cached_tables.reset(); }); let tables = TableRocksTable::new(db_ref.clone()).all_rows()?; if tables @@ -2120,13 +2216,15 @@ impl MetaStore for RocksMetaStore { ) -> Result, CubeError> { self.write_operation("create_table", move |db_ref, batch_pipe| { batch_pipe.set_post_commit_callback(|metastore| { - *metastore.cached_tables.lock().unwrap() = None; + metastore.cached_tables.reset(); }); + if drop_if_exists { if let Ok(exists_table) = get_table_impl(db_ref.clone(), schema_name.clone(), table_name.clone()) { RocksMetaStore::drop_table_impl(exists_table.get_id(), db_ref.clone(), batch_pipe)?; } } + let rocks_table = TableRocksTable::new(db_ref.clone()); let rocks_index = IndexRocksTable::new(db_ref.clone()); let rocks_schema = SchemaRocksTable::new(db_ref.clone()); @@ -2316,11 +2414,25 @@ impl MetaStore for RocksMetaStore { #[tracing::instrument(level = "trace", skip(self))] async fn table_ready(&self, id: u64, is_ready: bool) -> Result, CubeError> { self.write_operation("table_ready", move |db_ref, batch_pipe| { - batch_pipe.set_post_commit_callback(|metastore| { - *metastore.cached_tables.lock().unwrap() = None; - }); let rocks_table = TableRocksTable::new(db_ref.clone()); - Ok(rocks_table.update_with_fn(id, |r| r.update_is_ready(is_ready), batch_pipe)?) + let entry = + rocks_table.update_with_fn(id, |r| r.update_is_ready(is_ready), batch_pipe)?; + + if is_ready { + let schema = SchemaRocksTable::new(db_ref) + .get_row_or_not_found(entry.get_row().get_schema_id())?; + let table_path = TablePath::new(Arc::new(schema), entry.clone()); + + batch_pipe.set_post_commit_callback(move |metastore| { + metastore.cached_tables.upsert_table_by_id(id, table_path); + }); + } else { + batch_pipe.set_post_commit_callback(move |metastore| { + metastore.cached_tables.remove_by_table_id_or_reset(id); + }); + } + + Ok(entry) }) .await } @@ -2328,11 +2440,19 @@ impl MetaStore for RocksMetaStore { #[tracing::instrument(level = "trace", skip(self))] async fn seal_table(&self, id: u64) -> Result, CubeError> { self.write_operation("seal_table", move |db_ref, batch_pipe| { - batch_pipe.set_post_commit_callback(|metastore| { - *metastore.cached_tables.lock().unwrap() = None; - }); let rocks_table = TableRocksTable::new(db_ref.clone()); - Ok(rocks_table.update_with_fn(id, |r| r.update_sealed(true), batch_pipe)?) + let entry = rocks_table.update_with_fn(id, |r| r.update_sealed(true), batch_pipe)?; + + let table_to_move = entry.get_row().clone(); + batch_pipe.set_post_commit_callback(move |metastore| { + metastore + .cached_tables + .update_table_by_id_or_reset(id, |tp| { + tp.table = IdRow::new(tp.table.get_id(), table_to_move); + }); + }); + + Ok(entry) }) .await } @@ -2359,16 +2479,24 @@ impl MetaStore for RocksMetaStore { self.write_operation( "update_location_download_size", move |db_ref, batch_pipe| { - batch_pipe.set_post_commit_callback(|metastore| { - *metastore.cached_tables.lock().unwrap() = None; - }); - let rocks_table = TableRocksTable::new(db_ref.clone()); - rocks_table.update_with_res_fn( + let entry = rocks_table.update_with_res_fn( id, |r| r.update_location_download_size(&location, download_size), batch_pipe, - ) + )?; + + let table_to_move = entry.get_row().clone(); + + batch_pipe.set_post_commit_callback(move |metastore| { + metastore + .cached_tables + .update_table_by_id_or_reset(id, |tp| { + tp.table = IdRow::new(tp.table.get_id(), table_to_move); + }); + }); + + Ok(entry) }, ) .await @@ -2423,18 +2551,17 @@ impl MetaStore for RocksMetaStore { } else { let cache = self.cached_tables.clone(); - if let Some(t) = cube_ext::spawn_blocking(move || cache.lock().unwrap().clone()).await? - { + if let Some(t) = cube_ext::spawn_blocking(move || cache.get()).await? { return Ok(t); } let cache = self.cached_tables.clone(); // Can't do read_operation_out_of_queue as we need to update cache on the same thread where it's dropped self.read_operation("get_tables_with_path", move |db_ref| { - let cached_tables = { cache.lock().unwrap().clone() }; - if let Some(t) = cached_tables { + if let Some(t) = cache.get() { return Ok(t); } + let table_rocks_table = TableRocksTable::new(db_ref.clone()); let mut tables = Vec::new(); for t in table_rocks_table.scan_all_rows()? { @@ -2450,8 +2577,7 @@ impl MetaStore for RocksMetaStore { |table, schema| TablePath::new(schema, table), )?); - let to_cache = tables.clone(); - *cache.lock().unwrap() = Some(to_cache); + cache.set(tables.clone()); Ok(tables) }) @@ -2493,9 +2619,12 @@ impl MetaStore for RocksMetaStore { #[tracing::instrument(level = "trace", skip(self))] async fn drop_table(&self, table_id: u64) -> Result, CubeError> { self.write_operation("drop_table", move |db_ref, batch_pipe| { - batch_pipe.set_post_commit_callback(|metastore| { - *metastore.cached_tables.lock().unwrap() = None; + batch_pipe.set_post_commit_callback(move |metastore| { + metastore + .cached_tables + .remove_by_table_id_or_reset(table_id); }); + RocksMetaStore::drop_table_impl(table_id, db_ref, batch_pipe) }) .await From 78f80c9c12b4fed628dd3b3b9678d86472d14109 Mon Sep 17 00:00:00 2001 From: waralexrom <108349432+waralexrom@users.noreply.github.com> Date: Wed, 15 Apr 2026 14:16:40 +0200 Subject: [PATCH 2/4] fix(tesseract): Issue with multistage rolling window without time dimension (#10691) --- .../multi_stage/multi_stage_query_planner.rs | 19 +++++--- .../common/integration_rolling_window.yaml | 13 ++++++ ...ations__to_date_avg_no_time_dimension.snap | 7 +++ ...ns__to_date_no_granularity_multistage.snap | 7 +++ .../rolling_window/to_date_variations.rs | 44 ++++++++++++++++++- 5 files changed, 82 insertions(+), 8 deletions(-) create mode 100644 rust/cubesqlplanner/cubesqlplanner/src/tests/integration/rolling_window/snapshots/cubesqlplanner__tests__integration__rolling_window__to_date_variations__to_date_avg_no_time_dimension.snap create mode 100644 rust/cubesqlplanner/cubesqlplanner/src/tests/integration/rolling_window/snapshots/cubesqlplanner__tests__integration__rolling_window__to_date_variations__to_date_no_granularity_multistage.snap diff --git a/rust/cubesqlplanner/cubesqlplanner/src/planner/planners/multi_stage/multi_stage_query_planner.rs b/rust/cubesqlplanner/cubesqlplanner/src/planner/planners/multi_stage/multi_stage_query_planner.rs index aa8d17200a1c3..2acd2e109cf33 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/planner/planners/multi_stage/multi_stage_query_planner.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/planner/planners/multi_stage/multi_stage_query_planner.rs @@ -473,15 +473,21 @@ impl MultiStageQueryPlanner { } } + let base_member = MemberSymbol::new_measure(measure.new_unrolling()); + if time_dimensions.is_empty() { let base_state = self.replace_date_range_for_rolling_window(&rolling_window, state.clone())?; - let rolling_base = self.add_rolling_window_base( - member.clone(), - base_state, - ungrouped, - descriptions, - )?; + let rolling_base = if !measure.is_multi_stage() { + self.add_rolling_window_base(base_member, base_state, false, descriptions)? + } else { + self.make_queries_descriptions( + base_member, + base_state, + descriptions, + resolved_multi_stage_dimensions, + )? + }; return Ok(Some(rolling_base)); } let uniq_time_dimensions = time_dimensions @@ -504,7 +510,6 @@ impl MultiStageQueryPlanner { &rolling_window, state.clone(), )?; - let base_member = MemberSymbol::new_measure(measure.new_unrolling()); let time_series = self.add_time_series(time_dimension.clone(), state.clone(), descriptions)?; diff --git a/rust/cubesqlplanner/cubesqlplanner/src/test_fixtures/schemas/yaml_files/common/integration_rolling_window.yaml b/rust/cubesqlplanner/cubesqlplanner/src/test_fixtures/schemas/yaml_files/common/integration_rolling_window.yaml index 5e93e06aeb7c3..b1c81941710a6 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/test_fixtures/schemas/yaml_files/common/integration_rolling_window.yaml +++ b/rust/cubesqlplanner/cubesqlplanner/src/test_fixtures/schemas/yaml_files/common/integration_rolling_window.yaml @@ -85,6 +85,19 @@ cubes: rolling_window: type: to_date granularity: month + - name: rolling_avg_to_date + type: avg + sql: amount + rolling_window: + type: to_date + granularity: month + - name: rolling_sum_to_date_multistage + multi_stage: true + type: sum + sql: "{total_amount}" + rolling_window: + type: to_date + granularity: month # Cat 2 — different aggregation types (all trailing 7 day) - name: rolling_count_7d type: count diff --git a/rust/cubesqlplanner/cubesqlplanner/src/tests/integration/rolling_window/snapshots/cubesqlplanner__tests__integration__rolling_window__to_date_variations__to_date_avg_no_time_dimension.snap b/rust/cubesqlplanner/cubesqlplanner/src/tests/integration/rolling_window/snapshots/cubesqlplanner__tests__integration__rolling_window__to_date_variations__to_date_avg_no_time_dimension.snap new file mode 100644 index 0000000000000..3fd21de02538d --- /dev/null +++ b/rust/cubesqlplanner/cubesqlplanner/src/tests/integration/rolling_window/snapshots/cubesqlplanner__tests__integration__rolling_window__to_date_variations__to_date_avg_no_time_dimension.snap @@ -0,0 +1,7 @@ +--- +source: cubesqlplanner/src/tests/integration/rolling_window/to_date_variations.rs +expression: result +--- +orders__rolling_avg_to_date +--------------------------- +137.7500000000000000 diff --git a/rust/cubesqlplanner/cubesqlplanner/src/tests/integration/rolling_window/snapshots/cubesqlplanner__tests__integration__rolling_window__to_date_variations__to_date_no_granularity_multistage.snap b/rust/cubesqlplanner/cubesqlplanner/src/tests/integration/rolling_window/snapshots/cubesqlplanner__tests__integration__rolling_window__to_date_variations__to_date_no_granularity_multistage.snap new file mode 100644 index 0000000000000..391feafbb17bf --- /dev/null +++ b/rust/cubesqlplanner/cubesqlplanner/src/tests/integration/rolling_window/snapshots/cubesqlplanner__tests__integration__rolling_window__to_date_variations__to_date_no_granularity_multistage.snap @@ -0,0 +1,7 @@ +--- +source: cubesqlplanner/src/tests/integration/rolling_window/to_date_variations.rs +expression: result +--- +orders__rolling_sum_to_date_multistage +-------------------------------------- +2755.00 diff --git a/rust/cubesqlplanner/cubesqlplanner/src/tests/integration/rolling_window/to_date_variations.rs b/rust/cubesqlplanner/cubesqlplanner/src/tests/integration/rolling_window/to_date_variations.rs index 96421862e43df..5a341fde73bc0 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/tests/integration/rolling_window/to_date_variations.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/tests/integration/rolling_window/to_date_variations.rs @@ -166,7 +166,49 @@ async fn test_to_date_no_granularity() { "#}; let result = ctx.build_sql(query); - // to_date without query granularity — may work or error + match result { + Ok(_sql) => { + if let Some(result) = ctx.try_execute_pg(query, SEED).await { + insta::assert_snapshot!(result); + } + } + Err(e) => { + insta::assert_snapshot!("to_date_no_granularity_error", e.to_string()); + } + } +} +#[tokio::test(flavor = "multi_thread")] +async fn test_to_date_no_granularity_multistage() { + let ctx = create_context(); + + let query = indoc! {r#" + measures: + - orders.rolling_sum_to_date_multistage + "#}; + + let result = ctx.build_sql(query); + match result { + Ok(_sql) => { + if let Some(result) = ctx.try_execute_pg(query, SEED).await { + insta::assert_snapshot!(result); + } + } + Err(e) => { + insta::assert_snapshot!("to_date_no_granularity_error", e.to_string()); + } + } +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_to_date_avg_no_time_dimension() { + let ctx = create_context(); + + let query = indoc! {r#" + measures: + - orders.rolling_avg_to_date + "#}; + + let result = ctx.build_sql(query); match result { Ok(_sql) => { if let Some(result) = ctx.try_execute_pg(query, SEED).await { From 9ba289732c5d6c2545ce3ce086003d1caea3dd58 Mon Sep 17 00:00:00 2001 From: Konstantin Burkalev Date: Wed, 15 Apr 2026 15:18:45 +0300 Subject: [PATCH 3/4] feat(api-gateway): Introduce Query format Convertion API (#10286) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This PR introduces new API endpoint /v1/convert that allows you to convert queries between different formats. Right now only sql→rest conversion is implemented, but this might be extended in the future. --------- Co-authored-by: Dmitry Patsura Co-authored-by: claude[bot] <209825114+claude[bot]@users.noreply.github.com> --- packages/cubejs-api-gateway/src/gateway.ts | 38 ++++ packages/cubejs-api-gateway/src/sql-server.ts | 6 + .../cubejs-api-gateway/src/types/request.ts | 12 ++ packages/cubejs-backend-native/js/index.ts | 13 ++ packages/cubejs-backend-native/src/lib.rs | 1 + .../cubejs-backend-native/src/node_export.rs | 2 + .../cubejs-backend-native/src/rest4sql.rs | 175 ++++++++++++++++++ .../birdbox-fixtures/smoke/schema/Orders.js | 14 +- .../__snapshots__/smoke-cubesql.test.ts.snap | 84 +++++++++ .../cubejs-testing/test/smoke-cubesql.test.ts | 55 +++++- 10 files changed, 394 insertions(+), 6 deletions(-) create mode 100644 packages/cubejs-backend-native/src/rest4sql.rs diff --git a/packages/cubejs-api-gateway/src/gateway.ts b/packages/cubejs-api-gateway/src/gateway.ts index 8b8bb95adfbe0..049c4e30391bc 100644 --- a/packages/cubejs-api-gateway/src/gateway.ts +++ b/packages/cubejs-api-gateway/src/gateway.ts @@ -56,6 +56,7 @@ import { SqlApiRequest, MetaResponseResultFn, RequestQuery, + QueryConvertRequest, } from './types/request'; import { CheckAuthInternalOptions, @@ -418,6 +419,14 @@ class ApiGateway { }); })); + app.post(`${this.basePath}/v1/convert-query`, jsonParser, userMiddlewares, userAsyncHandler(async (req, res) => { + await this.convertQuery({ + payload: req.body, + context: req.context, + res: this.resToResultFn(res) + }); + })); + /** ************************************************************** * meta scope * *************************************************************** */ @@ -1601,6 +1610,35 @@ class ApiGateway { }; } + protected async convertQuery({ payload, context, res }: QueryConvertRequest) { + try { + await this.assertApiScope('sql', context.securityContext); + + if (payload.input !== 'sql') { + throw new Error(`Unexpected input parameter value '${payload.input}'`); + } + + if (payload.output !== 'rest') { + throw new Error(`Unexpected output parameter value '${payload.output}'`); + } + + if (typeof payload.query !== 'string' || !payload.query.trim()) { + throw new Error('query parameter must be a non-empty string'); + } + + const result = await this.sqlServer.rest4sql(payload.query, context.securityContext); + + await res(result); + } catch (e: any) { + this.handleError({ + e, + context, + query: payload, + res, + }); + } + } + protected async dryRun({ query, context, res }: QueryRequest) { const requestStarted = new Date(); diff --git a/packages/cubejs-api-gateway/src/sql-server.ts b/packages/cubejs-api-gateway/src/sql-server.ts index fe59c4bfdc713..28d2cc94c0a44 100644 --- a/packages/cubejs-api-gateway/src/sql-server.ts +++ b/packages/cubejs-api-gateway/src/sql-server.ts @@ -4,10 +4,12 @@ import { shutdownInterface, execSql, sql4sql, + rest4sql, SqlInterfaceInstance, Request as NativeRequest, LoadRequestMeta, Sql4SqlResponse, + QueryConvertResponse, } from '@cubejs-backend/native'; import type { ShutdownMode } from '@cubejs-backend/native'; import { displayCLIWarning, getEnv, CacheMode } from '@cubejs-backend/shared'; @@ -82,6 +84,10 @@ export class SQLServer { return sql4sql(this.getSqlInterfaceInstance(), sqlQuery, disablePostProcessing, securityContext); } + public async rest4sql(sqlQuery: string, securityContext?: unknown): Promise { + return rest4sql(this.getSqlInterfaceInstance(), sqlQuery, securityContext); + } + protected buildCheckSqlAuth(options: SQLServerOptions): CheckSQLAuthFn { return (options.checkSqlAuth && this.wrapCheckSqlAuthFn(options.checkSqlAuth)) || this.createDefaultCheckSqlAuthFn(options); diff --git a/packages/cubejs-api-gateway/src/types/request.ts b/packages/cubejs-api-gateway/src/types/request.ts index 024352615bba7..c7804f84a5762 100644 --- a/packages/cubejs-api-gateway/src/types/request.ts +++ b/packages/cubejs-api-gateway/src/types/request.ts @@ -142,6 +142,16 @@ type QueryRequest = BaseRequest & { cacheMode?: CacheMode; }; +type ConvertQuery = { + input: 'sql'; + output: 'rest'; + query: string; +}; + +type QueryConvertRequest = BaseRequest & { + payload: ConvertQuery; +}; + type SqlApiRequest = BaseRequest & { query: Record; sqlQuery?: [string, string[]]; @@ -215,6 +225,7 @@ type PreAggJobStatusResponse = | PreAggJobStatusObject; export { + ConvertQuery, RequestContext, RequestExtension, ExtendedRequestContext, @@ -228,6 +239,7 @@ export { BaseRequest, RequestQuery, QueryRequest, + QueryConvertRequest, PreAggsJobsRequest, PreAggsSelector, PreAggJob, diff --git a/packages/cubejs-backend-native/js/index.ts b/packages/cubejs-backend-native/js/index.ts index 9abdde2e3b17a..df6989e3bdc95 100644 --- a/packages/cubejs-backend-native/js/index.ts +++ b/packages/cubejs-backend-native/js/index.ts @@ -161,8 +161,15 @@ export type Sql4SqlCommon = { pushdown: boolean; } }; + export type Sql4SqlResponse = Sql4SqlCommon & (Sql4SqlOk | Sql4SqlError); +export type QueryConvertResponse = { + status: string; + query: any; + error?: string; +}; + let loadedNative: any = null; export function loadNative() { @@ -451,6 +458,12 @@ export const sql4sql = async (instance: SqlInterfaceInstance, sqlQuery: string, return native.sql4sql(instance, sqlQuery, disablePostProcessing, securityContext ? JSON.stringify(securityContext) : null); }; +export const rest4sql = async (instance: SqlInterfaceInstance, sqlQuery: string, securityContext?: unknown): Promise => { + const native = loadNative(); + + return native.rest4sql(instance, sqlQuery, securityContext ? JSON.stringify(securityContext) : null); +}; + export const buildSqlAndParams = (cubeEvaluator: any): any[] => { const native = loadNative(); return native.buildSqlAndParams(cubeEvaluator); diff --git a/packages/cubejs-backend-native/src/lib.rs b/packages/cubejs-backend-native/src/lib.rs index 6f16a6ec3777d..34c1188933f2d 100644 --- a/packages/cubejs-backend-native/src/lib.rs +++ b/packages/cubejs-backend-native/src/lib.rs @@ -16,6 +16,7 @@ pub mod node_obj_serializer; pub mod orchestrator; #[cfg(feature = "python")] pub mod python; +pub mod rest4sql; pub mod sql4sql; pub mod stream; pub mod template; diff --git a/packages/cubejs-backend-native/src/node_export.rs b/packages/cubejs-backend-native/src/node_export.rs index 79f03263252be..f1cfe06ed1d18 100644 --- a/packages/cubejs-backend-native/src/node_export.rs +++ b/packages/cubejs-backend-native/src/node_export.rs @@ -16,6 +16,7 @@ use crate::config::{NodeConfiguration, NodeConfigurationFactoryOptions, NodeCube use crate::cross::CLRepr; use crate::cubesql_utils::with_session; use crate::logger::NodeBridgeLogger; +use crate::rest4sql::rest4sql; use crate::sql4sql::sql4sql; use crate::stream::OnDrainHandler; use crate::tokio_runtime_node; @@ -788,6 +789,7 @@ pub fn register_module_exports( cx.export_function("shutdownInterface", shutdown_interface)?; cx.export_function("execSql", exec_sql)?; cx.export_function("sql4sql", sql4sql)?; + cx.export_function("rest4sql", rest4sql)?; cx.export_function("isFallbackBuild", is_fallback_build)?; cx.export_function("__js_to_clrepr_to_js", debug_js_to_clrepr_to_js)?; diff --git a/packages/cubejs-backend-native/src/rest4sql.rs b/packages/cubejs-backend-native/src/rest4sql.rs new file mode 100644 index 0000000000000..fc9893bc48490 --- /dev/null +++ b/packages/cubejs-backend-native/src/rest4sql.rs @@ -0,0 +1,175 @@ +use std::sync::Arc; + +use neon::prelude::*; +use serde_json; + +use crate::auth::NativeSQLAuthContext; +use crate::config::NodeCubeServices; +use crate::cubesql_utils::with_session; +use crate::tokio_runtime_node; +use crate::utils::NonDebugInRelease; +use cubesql::compile::convert_sql_to_cube_query; +use cubesql::compile::datafusion::logical_plan::LogicalPlan; +use cubesql::compile::engine::df::scan::CubeScanNode; +use cubesql::transport::TransportLoadRequestQuery; +use cubesql::CubeError; + +fn json_value_to_js<'ctx>( + cx: &mut impl Context<'ctx>, + value: &serde_json::Value, +) -> JsResult<'ctx, JsValue> { + match value { + serde_json::Value::Null => Ok(cx.null().upcast()), + serde_json::Value::Bool(b) => Ok(cx.boolean(*b).upcast()), + serde_json::Value::Number(n) => { + if let Some(i) = n.as_i64() { + Ok(cx.number(i as f64).upcast()) + } else if let Some(f) = n.as_f64() { + Ok(cx.number(f).upcast()) + } else { + cx.throw_error("Number conversion failed") + } + } + serde_json::Value::String(s) => Ok(cx.string(s).upcast()), + serde_json::Value::Array(arr) => { + let js_array = cx.empty_array(); + for (i, item) in arr.iter().enumerate() { + let js_value = json_value_to_js(cx, item)?; + js_array.set(cx, i as u32, js_value)?; + } + Ok(js_array.upcast()) + } + serde_json::Value::Object(obj) => { + let js_obj = cx.empty_object(); + for (key, val) in obj.iter() { + let js_value = json_value_to_js(cx, val)?; + js_obj.set(cx, key.as_str(), js_value)?; + } + Ok(js_obj.upcast()) + } + } +} + +#[derive(Debug)] +enum Rest4SqlResponse { + Ok { + status: String, + query: Box, + }, + Error { + status: String, + error: String, + }, +} + +impl Rest4SqlResponse { + pub fn to_js<'ctx>(&self, cx: &mut impl Context<'ctx>) -> JsResult<'ctx, JsObject> { + let obj = cx.empty_object(); + + match &self { + Rest4SqlResponse::Ok { status, query } => { + let status = cx.string(status); + obj.set(cx, "status", status)?; + + let query_json = serde_json::to_value(query) + .or_else(|e| cx.throw_error(format!("Failed to serialize query: {}", e)))?; + let query_js = json_value_to_js(cx, &query_json)?; + obj.set(cx, "query", query_js)?; + } + Rest4SqlResponse::Error { error, status } => { + let status = cx.string(status); + obj.set(cx, "status", status)?; + + let error = cx.string(error); + obj.set(cx, "error", error)?; + } + } + + Ok(obj) + } +} + +async fn handle_rest4sql_query( + services: Arc, + native_auth_ctx: Arc, + sql_query: &str, +) -> Result { + with_session(&services, native_auth_ctx.clone(), |session| async move { + let transport = session.server.transport.clone(); + let meta_context = transport + .meta(native_auth_ctx) + .await + .map_err(|err| CubeError::internal(format!("Failed to get meta context: {err}")))?; + let query_plan = + convert_sql_to_cube_query(sql_query, meta_context.clone(), session.clone()).await?; + let logical_plan = query_plan.try_as_logical_plan()?; + + match logical_plan { + LogicalPlan::Extension(extension) => { + if let Some(cube_scan) = extension.node.as_any().downcast_ref::() { + return Ok(Rest4SqlResponse::Ok { + status: "ok".to_string(), + query: Box::new(cube_scan.request.clone()), + }); + } + + Ok(Rest4SqlResponse::Error { + status: "error".to_string(), + error: "Provided sql query can not be converted to rest query.".to_string(), + }) + } + _ => Ok(Rest4SqlResponse::Error { + status: "error".to_string(), + error: "Provided sql query can not be converted to rest query.".to_string(), + }), + } + }) + .await +} + +pub fn rest4sql(mut cx: FunctionContext) -> JsResult { + let interface = cx.argument::>(0)?; + let sql_query = cx.argument::(1)?.value(&mut cx); + + let security_context: Option = match cx.argument::(2) { + Ok(string) => match string.downcast::(&mut cx) { + Ok(v) => v.value(&mut cx).parse::().ok(), + Err(_) => None, + }, + Err(_) => None, + }; + + let services = interface.services.clone(); + let runtime = tokio_runtime_node(&mut cx)?; + + let channel = cx.channel(); + + let native_auth_ctx = Arc::new(NativeSQLAuthContext { + user: Some(String::from("unknown")), + superuser: false, + security_context: NonDebugInRelease::from(security_context), + }); + + let (deferred, promise) = cx.promise(); + + // Note: if the spawned task panics or is aborted before settling, + // Neon's Drop implementation for Deferred automatically rejects the promise on the JS side. + runtime.spawn(async move { + let result = handle_rest4sql_query(services, native_auth_ctx, &sql_query).await; + + if let Err(err) = deferred.try_settle_with(&channel, move |mut cx| { + // `neon::result::ResultExt` is implemented only for Result, even though Ok variant is not touched + let response = result.or_else(|err| cx.throw_error(err.to_string()))?; + let response = response.to_js(&mut cx)?; + Ok(response) + }) { + // There is not much we can do at this point + // TODO lift this error to task => JoinHandle => JS watchdog + log::error!( + "Unable to settle JS promise from tokio task, try_settle_with failed, err: {err}" + ); + } + }); + + Ok(promise.upcast::()) +} diff --git a/packages/cubejs-testing/birdbox-fixtures/smoke/schema/Orders.js b/packages/cubejs-testing/birdbox-fixtures/smoke/schema/Orders.js index bea02baa829c3..de3f521566d0d 100644 --- a/packages/cubejs-testing/birdbox-fixtures/smoke/schema/Orders.js +++ b/packages/cubejs-testing/birdbox-fixtures/smoke/schema/Orders.js @@ -1,14 +1,14 @@ cube(`Orders`, { sql: ` - select 1 as id, 100 as amount, 'new' status + select 1 as id, 100 as amount, 'new' status, CAST('2025-01-01' AS DATE) AS createdAt, UNION ALL - select 2 as id, 200 as amount, 'new' status + select 2 as id, 200 as amount, 'new' status, CAST('2025-01-02' AS DATE) AS createdAt, UNION ALL - select 3 as id, 300 as amount, 'processed' status + select 3 as id, 300 as amount, 'processed' status, CAST('2025-01-03' AS DATE) AS createdAt, UNION ALL - select 4 as id, 500 as amount, 'processed' status + select 4 as id, 500 as amount, 'processed' status, CAST('2025-01-04' AS DATE) AS createdAt, UNION ALL - select 5 as id, 600 as amount, 'shipped' status + select 5 as id, 600 as amount, 'shipped' status, CAST('2025-01-05' AS DATE) AS createdAt, `, measures: { @@ -29,5 +29,9 @@ cube(`Orders`, { sql: `status`, type: `string`, }, + createdAt: { + sql: `createdAt`, + type: `time`, + }, }, }); diff --git a/packages/cubejs-testing/test/__snapshots__/smoke-cubesql.test.ts.snap b/packages/cubejs-testing/test/__snapshots__/smoke-cubesql.test.ts.snap index 886271f355aaf..8dc04ced96c68 100644 --- a/packages/cubejs-testing/test/__snapshots__/smoke-cubesql.test.ts.snap +++ b/packages/cubejs-testing/test/__snapshots__/smoke-cubesql.test.ts.snap @@ -1,5 +1,89 @@ // Jest Snapshot v1, https://goo.gl/fbAQLP +exports[`SQL API Cube SQL over HTTP Query convert API regular query 1`] = ` +Object { + "body": Object { + "query": Object { + "dimensions": Array [], + "measures": Array [ + "Orders.totalAmount", + ], + "order": Array [], + "segments": Array [], + }, + "status": "ok", + }, + "status": 200, + "statusText": "OK", +} +`; + +exports[`SQL API Cube SQL over HTTP Query convert API regular query with filter 1`] = ` +Object { + "body": Object { + "query": Object { + "dimensions": Array [], + "filters": Array [ + Object { + "member": "Orders.status", + "operator": "equals", + "values": Array [ + "foo", + ], + }, + ], + "measures": Array [ + "Orders.totalAmount", + ], + "order": Array [], + "segments": Array [], + }, + "status": "ok", + }, + "status": 200, + "statusText": "OK", +} +`; + +exports[`SQL API Cube SQL over HTTP Query convert API regular query with time dimension filter 1`] = ` +Object { + "body": Object { + "query": Object { + "dimensions": Array [ + "Orders.status", + ], + "measures": Array [], + "order": Array [], + "segments": Array [], + "timeDimensions": Array [ + Object { + "dateRange": Array [ + "2024-01-01T00:00:00.001Z", + "2025-12-31T23:59:59.999Z", + ], + "dimension": "Orders.createdAt", + }, + ], + "ungrouped": true, + }, + "status": "ok", + }, + "status": 200, + "statusText": "OK", +} +`; + +exports[`SQL API Cube SQL over HTTP Query convert API wrapper with parameters 1`] = ` +Object { + "body": Object { + "error": "Provided sql query can not be converted to rest query.", + "status": "error", + }, + "status": 200, + "statusText": "OK", +} +`; + exports[`SQL API Cube SQL over HTTP sql4sql double aggregation post-processing 1`] = ` Object { "body": Object { diff --git a/packages/cubejs-testing/test/smoke-cubesql.test.ts b/packages/cubejs-testing/test/smoke-cubesql.test.ts index 04e614cbc8101..1dc83840f00f4 100644 --- a/packages/cubejs-testing/test/smoke-cubesql.test.ts +++ b/packages/cubejs-testing/test/smoke-cubesql.test.ts @@ -341,6 +341,59 @@ describe('SQL API', () => { `)).toMatchSnapshot(); }); }); + + describe('Query convert API', () => { + async function generateSql(query: string) { + const response = await fetch(`${birdbox.configuration.apiUrl}/convert-query`, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + Authorization: token, + }, + body: JSON.stringify({ + input: 'sql', + output: 'rest', + query, + }), + }); + const { status, statusText } = response; + const body = await response.json(); + + // To stabilize responses + delete body.requestId; + + return { + status, + statusText, + body, + }; + } + + it('regular query', async () => { + expect(await generateSql('SELECT SUM(totalAmount) AS total FROM Orders;')).toMatchSnapshot(); + }); + + it('regular query with filter', async () => { + expect(await generateSql('SELECT SUM(totalAmount) AS total FROM Orders WHERE status = \'foo\';')).toMatchSnapshot(); + }); + + it('regular query with time dimension filter', async () => { + expect(await generateSql(` + SELECT status + FROM Orders + WHERE createdAt > CAST('2024-01-01' AS DATE) and createdAt < CAST('2026-01-01' AS DATE) + `)).toMatchSnapshot(); + }); + + it('wrapper with parameters', async () => { + expect(await generateSql(` + SELECT + SUM(totalAmount) AS total + FROM Orders + WHERE LOWER(status) = 'foo' + `)).toMatchSnapshot(); + }); + }); }); describe('Postgres (Auth)', () => { @@ -998,7 +1051,7 @@ filter_subq AS ( COUNT(DISTINCT CASE WHEN status = 'shipped' THEN orderCount END) AS new_count_distinct - + /* Works but testing Postgres does not include "hll_hash_any" function APPROX_DISTINCT(CASE WHEN status = 'shipped' THEN approxOrderCount From 9f6d48972905b9d06828fb29ca2fd724b83cde93 Mon Sep 17 00:00:00 2001 From: "mintlify[bot]" <109931778+mintlify[bot]@users.noreply.github.com> Date: Wed, 15 Apr 2026 14:32:02 +0200 Subject: [PATCH 4/4] docs: Add /v1/convert-query endpoint to REST API reference (#10689) --------- Co-Authored-By: Igor Lukanin Co-authored-by: mintlify[bot] <109931778+mintlify[bot]@users.noreply.github.com> Co-authored-by: Dmitry Patsura --- .../core-data-apis/rest-api/reference.mdx | 65 ++++++++++++++++++ .../core-data-apis/rest-api/reference.mdx | 66 +++++++++++++++++++ 2 files changed, 131 insertions(+) diff --git a/docs-mintlify/reference/core-data-apis/rest-api/reference.mdx b/docs-mintlify/reference/core-data-apis/rest-api/reference.mdx index 92a802a0ddc64..3aed19103d90f 100644 --- a/docs-mintlify/reference/core-data-apis/rest-api/reference.mdx +++ b/docs-mintlify/reference/core-data-apis/rest-api/reference.mdx @@ -243,6 +243,71 @@ Response: } ``` +## `{base_path}/v1/convert-query` + +Takes an API query in the specified input format and converts it to the specified +output format. Currently, only conversion from [SQL API](/reference/core-data-apis/sql-api) queries to +[REST API](/reference/core-data-apis/rest-api) queries is supported. + +This endpoint is useful for translating SQL API queries into equivalent REST API +queries that can be used with the [`/v1/load`](#base_path%2Fv1%2Fload) endpoint. + +Request parameters (JSON body): + +| Parameter, type | Description | Required | +| --- | --- | --- | +| `input`, `string` | Input query format. Use `sql` for [SQL API](/reference/core-data-apis/sql-api) queries | ✅ Yes | +| `output`, `string` | Output query format. Use `rest` for [REST API](/reference/core-data-apis/rest-api) queries | ✅ Yes | +| `query`, `string` | Input query in the specified format | ✅ Yes | + +The response will contain a JSON object with the following properties: + +| Property, type | Description | +| --- | --- | +| `status`, `string` | Query conversion status, `ok` or `error` | +| `query`, `object` | Converted query in the [REST API query format](/reference/core-data-apis/rest-api/query-format) (only present when `status` is `ok`) | +| `error`, `string` | Error message (only present when `status` is `error`) | + +An error will be returned if the input query can't be converted to the specified +output format, e.g., if the SQL API query requires post-processing on top of +REST API capabilities or if the SQL API query results in multiple REST API queries. + +### Example + +Request to convert a SQL API query with a filter: + +```bash +curl \ + -X POST \ + -H "Authorization: TOKEN" \ + -H "Content-Type: application/json" \ + -d '{"input": "sql", "output": "rest", "query": "SELECT MEASURE(total_amount) FROM orders WHERE status = '\''foo'\''"}' \ + http://localhost:4000/cubejs-api/v1/convert-query +``` + +Successful response: + +```json +{ + "status": "ok", + "query": { + "measures": [ + "orders.total_amount" + ], + "dimensions": [], + "filters": [ + { + "member": "orders.status", + "operator": "equals", + "values": ["foo"] + } + ], + "segments": [], + "order": [] + } +} +``` + ## `{base_path}/v1/meta` diff --git a/docs/content/product/apis-integrations/core-data-apis/rest-api/reference.mdx b/docs/content/product/apis-integrations/core-data-apis/rest-api/reference.mdx index 592e04f7a029c..872ad2012bc8f 100644 --- a/docs/content/product/apis-integrations/core-data-apis/rest-api/reference.mdx +++ b/docs/content/product/apis-integrations/core-data-apis/rest-api/reference.mdx @@ -242,6 +242,71 @@ Response: } ``` +## `{base_path}/v1/convert-query` + +Takes an API query in the specified input format and converts it to the specified +output format. Currently, only conversion from [SQL API][ref-sql-api] queries to +[REST API][ref-rest-api] queries is supported. + +This endpoint is useful for translating SQL API queries into equivalent REST API +queries that can be used with the [`/v1/load`](#base_pathv1load) endpoint. + +Request parameters (JSON body): + +| Parameter, type | Description | Required | +| --- | --- | --- | +| `input`, `string` | Input query format. Use `sql` for [SQL API][ref-sql-api] queries | ✅ Yes | +| `output`, `string` | Output query format. Use `rest` for [REST API][ref-rest-api] queries | ✅ Yes | +| `query`, `string` | Input query in the specified format | ✅ Yes | + +The response will contain a JSON object with the following properties: + +| Property, type | Description | +| --- | --- | +| `status`, `string` | Query conversion status, `ok` or `error` | +| `query`, `object` | Converted query in the [REST API query format][ref-rest-api-query-format] (only present when `status` is `ok`) | +| `error`, `string` | Error message (only present when `status` is `error`) | + +An error will be returned if the input query can't be converted to the specified +output format, e.g., if the SQL API query requires post-processing on top of +REST API capabilities or if the SQL API query results in multiple REST API queries. + +### Example + +Request to convert a SQL API query with a filter: + +```bash +curl \ + -X POST \ + -H "Authorization: TOKEN" \ + -H "Content-Type: application/json" \ + -d '{"input": "sql", "output": "rest", "query": "SELECT MEASURE(total_amount) FROM orders WHERE status = '\''foo'\''"}' \ + http://localhost:4000/cubejs-api/v1/convert-query +``` + +Successful response: + +```json +{ + "status": "ok", + "query": { + "measures": [ + "orders.total_amount" + ], + "dimensions": [], + "filters": [ + { + "member": "orders.status", + "operator": "equals", + "values": ["foo"] + } + ], + "segments": [], + "order": [] + } +} +``` + ## `{base_path}/v1/meta` @@ -889,6 +954,7 @@ Keep-Alive: timeout=5 [ref-datasources]: /product/configuration/advanced/multiple-data-sources [ref-sql-api]: /product/apis-integrations/sql-api [ref-rest-api]: /product/apis-integrations/rest-api +[ref-rest-api-query-format]: /product/apis-integrations/rest-api/query-format [ref-regular-queries]: /product/apis-integrations/queries#regular-query [ref-query-wpp]: /product/apis-integrations/queries#query-with-post-processing [ref-query-wpd]: /product/apis-integrations/queries#query-with-pushdown