From 43b3fd5450c2884a2660164f1f1e91180088b05e Mon Sep 17 00:00:00 2001
From: Dmitry Patsura
Date: Tue, 14 Apr 2026 16:20:11 +0200
Subject: [PATCH 1/2] refactor(cubestore): Replace invalidate_tables_cache flag with generi… (#10683)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Introduce a generic `set_post_commit_callback` on `BatchPipe` that fires after a successful RocksDB commit on the RW-loop thread. This replaces the hard-coded `invalidate_tables_cache` boolean flag, which was a hack that also ran before the actual commit.

- `BatchPipe` is now generic over a store type `S` (defaults to `()`), allowing callbacks to receive a typed store reference
- `RocksMetaStore::write_operation` passes `&RocksMetaStore` to callbacks, giving them direct access to `cached_tables` and other metastore state
- Moved `cached_tables` from `RocksStore` to `RocksMetaStore` where it belongs
- Cache invalidation now only happens when the commit actually succeeds
- `RocksTable` methods are generic over `S` so they work with any `BatchPipe`
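For reviewers, a minimal usage sketch of the new API. The call-site shape is taken from the metastore changes in this diff; the closure body mirrors the cache-invalidation sites below, and `row` stands in for whatever value the operation returns:

    self.write_operation("create_schema", move |db_ref, batch_pipe| {
        // Deferred work: runs on the RW-loop thread, but only after the
        // RocksDB write batch for this operation commits successfully.
        batch_pipe.set_post_commit_callback(|metastore| {
            *metastore.cached_tables.lock().unwrap() = None;
        });
        // ... the actual writes go through db_ref / batch_pipe ...
        Ok(row)
    })
    .await

If the closure returns an Err, or the commit itself fails, the callback is dropped without running, so the cached table list is only reset when the data actually changed.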
---
 .../src/cachestore/cache_rocksstore.rs | 4 +-
 rust/cubestore/cubestore/src/lib.rs | 6 +
 rust/cubestore/cubestore/src/metastore/mod.rs | 994 ++++++++----------
 .../cubestore/src/metastore/rocks_store.rs | 54 +-
 .../cubestore/src/metastore/rocks_table.rs | 44 +-
 5 files changed, 486 insertions(+), 616 deletions(-)

diff --git a/rust/cubestore/cubestore/src/cachestore/cache_rocksstore.rs b/rust/cubestore/cubestore/src/cachestore/cache_rocksstore.rs index 7055d7633964d..c5af7672b2674 100644 --- a/rust/cubestore/cubestore/src/cachestore/cache_rocksstore.rs +++ b/rust/cubestore/cubestore/src/cachestore/cache_rocksstore.rs
@@ -498,14 +498,14 @@ impl RocksCacheStore { f: F, ) -> Result<R, CubeError> where - F: for<'a> FnOnce(DbTableRef<'a>, &'a mut BatchPipe) -> Result<R, CubeError> + F: for<'a> FnOnce(DbTableRef<'a>, &mut BatchPipe<'a>) -> Result<R, CubeError> + Send + Sync + 'static, R: Send + Sync + 'static, { self.store - .write_operation_impl::<F, R>(&self.rw_loop_queue_cf, op_name, f) + .write_operation_impl::<F, R, ()>(&self.rw_loop_queue_cf, op_name, f, ()) .await }

diff --git a/rust/cubestore/cubestore/src/lib.rs b/rust/cubestore/cubestore/src/lib.rs index 791e75ef74c20..f4e862298bea8 100644 --- a/rust/cubestore/cubestore/src/lib.rs +++ b/rust/cubestore/cubestore/src/lib.rs
@@ -214,6 +214,12 @@ impl From for CubeError { } } +impl From<regex::Error> for CubeError { + fn from(v: regex::Error) -> Self { + CubeError::from_error(v) + } +} + impl From<ParquetError> for CubeError { fn from(v: ParquetError) -> Self { CubeError::from_error(v.to_string())

diff --git a/rust/cubestore/cubestore/src/metastore/mod.rs b/rust/cubestore/cubestore/src/metastore/mod.rs index 7be2c3d4e2284..942161bfde4eb 100644 --- a/rust/cubestore/cubestore/src/metastore/mod.rs +++ b/rust/cubestore/cubestore/src/metastore/mod.rs
@@ -26,6 +26,7 @@ use cuberockstore::rocksdb::{BlockBasedOptions, Cache, Env, MergeOperands, Optio use log::info; use serde::{Deserialize, Serialize}; use std::hash::Hash; +use std::sync::Mutex; use std::{env, io::Cursor, sync::Arc}; use crate::config::injection::DIService;
@@ -1355,8 +1356,10 @@ impl RocksStoreDetails for RocksMetaStoreDetails { } } +#[derive(Clone)] pub struct RocksMetaStore { store: Arc<RocksStore>, + cached_tables: Arc<Mutex<Option<Arc<Vec<TablePath>>>>>, disk_space_cache: Arc<RwLock<Option<(HashMap<String, u64>, SystemTime)>>>, upload_loop: Arc<WorkerLoop>, }
@@ -1379,13 +1382,14 @@ impl RocksMetaStore { fn new_from_store(store: Arc<RocksStore>) -> Arc<Self> { Arc::new(Self { store, + cached_tables: Arc::new(Mutex::new(None)), disk_space_cache: Arc::new(RwLock::new(None)), upload_loop: Arc::new(WorkerLoop::new("Metastore upload")), }) } pub fn reset_cached_tables(&self) { - *self.store.cached_tables.lock().unwrap() = None; + *self.cached_tables.lock().unwrap() = None; } pub async fn load_from_dump(
@@ -1512,19 +1516,24 @@ impl RocksMetaStore { #[inline(always)] pub async fn write_operation<F, R>(&self, op_name: &'static str, f: F) -> Result<R, CubeError> where - F: for<'a> FnOnce(DbTableRef<'a>, &'a mut BatchPipe) -> Result<R, CubeError> + F: for<'a> FnOnce( + DbTableRef<'a>, + &mut BatchPipe<'a, RocksMetaStore>, + ) -> Result<R, CubeError> + Send + Sync + 'static, R: Send + Sync + 'static, { - self.store.write_operation(op_name, f).await + self.store + .write_operation_impl(&self.store.rw_loop_default_cf, op_name, f, self.clone()) + .await } fn drop_table_impl( table_id: u64, db_ref: DbTableRef, - batch_pipe: &mut BatchPipe, + batch_pipe: &mut BatchPipe<'_, RocksMetaStore>, ) -> Result<IdRow<Table>, CubeError> { let tables_table = TableRocksTable::new(db_ref.clone()); let indexes_table = IndexRocksTable::new(db_ref.clone());
@@ -1555,7 +1564,7 @@ impl RocksMetaStore { fn add_index( - batch_pipe: &mut BatchPipe, + batch_pipe: &mut BatchPipe<'_, RocksMetaStore>, rocks_index: &IndexRocksTable, rocks_partition: &PartitionRocksTable, table_cols: &Vec<Column>,
@@ -1586,7 +1595,7 @@ impl RocksMetaStore { } } fn add_regular_index( - batch_pipe: &mut BatchPipe, + batch_pipe: &mut BatchPipe<'_, RocksMetaStore>, rocks_index: &IndexRocksTable, rocks_partition: &PartitionRocksTable, table_cols: &Vec<Column>,
@@ -1734,7 +1743,7 @@ impl RocksMetaStore { } fn add_aggregate_index( - batch_pipe: &mut BatchPipe, + batch_pipe: &mut BatchPipe<'_, RocksMetaStore>, rocks_index: &IndexRocksTable, rocks_partition: &PartitionRocksTable, table_cols: &Vec<Column>,
@@ -1850,7 +1859,7 @@ impl RocksMetaStore { // Must be run under write_operation(). Returns activated row count.
fn activate_chunks_impl( db_ref: DbTableRef, - batch_pipe: &mut BatchPipe, + batch_pipe: &mut BatchPipe<'_, RocksMetaStore>, uploaded_chunk_ids: &[(u64, Option<u64>)], replay_handle_id: Option<u64>, ) -> Result<(u64, HashMap<u64, u64>), CubeError> {
@@ -1907,7 +1916,9 @@ impl MetaStore for RocksMetaStore { if_not_exists: bool, ) -> Result<IdRow<Schema>, CubeError> { self.write_operation("create_schema", move |db_ref, batch_pipe| { - batch_pipe.invalidate_tables_cache(); + batch_pipe.set_post_commit_callback(|metastore| { + *metastore.cached_tables.lock().unwrap() = None; + }); let table = SchemaRocksTable::new(db_ref.clone()); if if_not_exists { let rows = table.get_rows_by_index(&schema_name, &SchemaRocksIndex::Name)?;
@@ -1968,7 +1979,9 @@ impl MetaStore for RocksMetaStore { new_schema_name: String, ) -> Result<IdRow<Schema>, CubeError> { self.write_operation("rename_schema", move |db_ref, batch_pipe| { - batch_pipe.invalidate_tables_cache(); + batch_pipe.set_post_commit_callback(|metastore| { + *metastore.cached_tables.lock().unwrap() = None; + }); let table = SchemaRocksTable::new(db_ref.clone()); let existing_keys = table.get_row_ids_by_index(&old_schema_name, &SchemaRocksIndex::Name)?;
@@ -1992,7 +2005,9 @@ impl MetaStore for RocksMetaStore { new_schema_name: String, ) -> Result<IdRow<Schema>, CubeError> { self.write_operation("rename_schema_by_id", move |db_ref, batch_pipe| { - batch_pipe.invalidate_tables_cache(); + batch_pipe.set_post_commit_callback(|metastore| { + *metastore.cached_tables.lock().unwrap() = None; + }); let table = SchemaRocksTable::new(db_ref.clone()); let old_schema = table.get_row(schema_id)?.unwrap();
@@ -2008,7 +2023,9 @@ impl MetaStore for RocksMetaStore { #[tracing::instrument(level = "trace", skip(self))] async fn delete_schema(&self, schema_name: String) -> Result<(), CubeError> { self.write_operation("delete_schema", move |db_ref, batch_pipe| { - batch_pipe.invalidate_tables_cache(); + batch_pipe.set_post_commit_callback(|metastore| { + *metastore.cached_tables.lock().unwrap() = None; + }); let table = SchemaRocksTable::new(db_ref.clone()); let existing_keys = table.get_row_ids_by_index(&schema_name, &SchemaRocksIndex::Name)?;
@@ -2035,7 +2052,9 @@ impl MetaStore for RocksMetaStore { #[tracing::instrument(level = "trace", skip(self))] async fn delete_schema_by_id(&self, schema_id: u64) -> Result<(), CubeError> { self.write_operation("delete_schema_by_id", move |db_ref, batch_pipe| { - batch_pipe.invalidate_tables_cache(); + batch_pipe.set_post_commit_callback(|metastore| { + *metastore.cached_tables.lock().unwrap() = None; + }); let tables = TableRocksTable::new(db_ref.clone()).all_rows()?; if tables .into_iter()
@@ -2100,7 +2119,9 @@ impl MetaStore for RocksMetaStore { extension: Option<String>, ) -> Result<IdRow<Table>, CubeError> { self.write_operation("create_table", move |db_ref, batch_pipe| { - batch_pipe.invalidate_tables_cache(); + batch_pipe.set_post_commit_callback(|metastore| { + *metastore.cached_tables.lock().unwrap() = None; + }); if drop_if_exists { if let Ok(exists_table) = get_table_impl(db_ref.clone(), schema_name.clone(), table_name.clone()) { RocksMetaStore::drop_table_impl(exists_table.get_id(), db_ref.clone(), batch_pipe)?; } }
@@ -2295,7 +2316,9 @@ impl MetaStore for RocksMetaStore { #[tracing::instrument(level = "trace", skip(self))] async fn table_ready(&self, id: u64, is_ready: bool) -> Result<IdRow<Table>, CubeError> { self.write_operation("table_ready", move |db_ref, batch_pipe| { - batch_pipe.invalidate_tables_cache(); + batch_pipe.set_post_commit_callback(|metastore| { + *metastore.cached_tables.lock().unwrap() = None; +
}); let rocks_table = TableRocksTable::new(db_ref.clone()); Ok(rocks_table.update_with_fn(id, |r| r.update_is_ready(is_ready), batch_pipe)?) })
@@ -2305,7 +2328,9 @@ impl MetaStore for RocksMetaStore { #[tracing::instrument(level = "trace", skip(self))] async fn seal_table(&self, id: u64) -> Result<IdRow<Table>, CubeError> { self.write_operation("seal_table", move |db_ref, batch_pipe| { - batch_pipe.invalidate_tables_cache(); + batch_pipe.set_post_commit_callback(|metastore| { + *metastore.cached_tables.lock().unwrap() = None; + }); let rocks_table = TableRocksTable::new(db_ref.clone()); Ok(rocks_table.update_with_fn(id, |r| r.update_sealed(true), batch_pipe)?) })
@@ -2334,13 +2359,16 @@ impl MetaStore for RocksMetaStore { self.write_operation( "update_location_download_size", move |db_ref, batch_pipe| { - batch_pipe.invalidate_tables_cache(); + batch_pipe.set_post_commit_callback(|metastore| { + *metastore.cached_tables.lock().unwrap() = None; + }); + let rocks_table = TableRocksTable::new(db_ref.clone()); - Ok(rocks_table.update_with_res_fn( + rocks_table.update_with_res_fn( id, |r| r.update_location_download_size(&location, download_size), batch_pipe, - )?) + ) }, ) .await
@@ -2393,14 +2421,14 @@ impl MetaStore for RocksMetaStore { }) .await } else { - let cache = self.store.cached_tables.clone(); + let cache = self.cached_tables.clone(); if let Some(t) = cube_ext::spawn_blocking(move || cache.lock().unwrap().clone()).await? { return Ok(t); } - let cache = self.store.cached_tables.clone(); + let cache = self.cached_tables.clone(); // Can't do read_operation_out_of_queue as we need to update cache on the same thread where it's dropped self.read_operation("get_tables_with_path", move |db_ref| { let cached_tables = { cache.lock().unwrap().clone() };
@@ -2465,7 +2493,9 @@ impl MetaStore for RocksMetaStore { #[tracing::instrument(level = "trace", skip(self))] async fn drop_table(&self, table_id: u64) -> Result<IdRow<Table>, CubeError> { self.write_operation("drop_table", move |db_ref, batch_pipe| { - batch_pipe.invalidate_tables_cache(); + batch_pipe.set_post_commit_callback(|metastore| { + *metastore.cached_tables.lock().unwrap() = None; + }); RocksMetaStore::drop_table_impl(table_id, db_ref, batch_pipe) }) .await
@@ -4819,7 +4849,7 @@ fn get_default_index_impl(db_ref: DbTableRef, table_id: u64) -> Result<IdRow<Index>, fn swap_active_partitions_impl( db_ref: DbTableRef, - batch_pipe: &mut BatchPipe, + batch_pipe: &mut BatchPipe<'_, RocksMetaStore>, current_active: &[(IdRow<Partition>, Vec<IdRow<Chunk>>)], new_active: &[(IdRow<Partition>, u64)], mut update_new_partition_stats: impl FnMut(/*index*/ usize, &Partition) -> Partition,
@@ -4982,6 +5012,67 @@ mod tests { use std::time::Duration; use std::{env, fs}; + #[tokio::test] + async fn test_post_commit_callback_on_success() -> Result<(), CubeError> { + let config = Config::test("test_post_commit_callback_on_success"); + let store_path = env::current_dir()?.join("test_post_commit_callback_on_success-local"); + let remote_store_path = + env::current_dir()?.join("test_post_commit_callback_on_success-remote"); + let _ = fs::remove_dir_all(store_path.clone()); + let _ = fs::remove_dir_all(remote_store_path.clone()); + let remote_fs = LocalDirRemoteFs::new(Some(remote_store_path.clone()), store_path.clone()); + + let meta_store = RocksMetaStore::new( + store_path.join("metastore").as_path(), + BaseRocksStoreFs::new_for_metastore(remote_fs.clone(), config.config_obj()), + config.config_obj(), + )?; + + // Test 1: callback fires on a successful write + { + let called = Arc::new(std::sync::atomic::AtomicBool::new(false)); + let called_clone = called.clone(); + meta_store + .write_operation("test_success", move |_db_ref, batch_pipe| {
batch_pipe.set_post_commit_callback(move |_metastore| { + called_clone.store(true, std::sync::atomic::Ordering::SeqCst); + }); + Ok(()) + }) + .await?; + + assert!( + called.load(std::sync::atomic::Ordering::SeqCst), + "post-commit callback should fire on successful write" + ); + } + + // Test 2: callback does NOT fire when the closure returns Err + { + let called = Arc::new(std::sync::atomic::AtomicBool::new(false)); + let called_clone = called.clone(); + let result: Result<(), _> = meta_store + .write_operation("test_failure", move |_db_ref, batch_pipe| { + batch_pipe.set_post_commit_callback(move |_metastore| { + called_clone.store(true, std::sync::atomic::Ordering::SeqCst); + }); + Err(CubeError::user("intentional error".to_string())) + }) + .await; + + assert!(result.is_err()); + assert!( + !called.load(std::sync::atomic::Ordering::SeqCst), + "post-commit callback should NOT fire when write fails" + ); + } + + let _ = fs::remove_dir_all(store_path); + let _ = fs::remove_dir_all(remote_store_path); + + Ok(()) + } + #[test] fn macro_test() { let s = Schema { @@ -4996,10 +5087,10 @@ mod tests { } #[tokio::test] - async fn schema_test() { + async fn schema_test() -> Result<(), CubeError> { let config = Config::test("schema_test"); - let store_path = env::current_dir().unwrap().join("test-local"); - let remote_store_path = env::current_dir().unwrap().join("test-remote"); + let store_path = env::current_dir()?.join("test-local"); + let remote_store_path = env::current_dir()?.join("test-remote"); let _ = fs::remove_dir_all(store_path.clone()); let _ = fs::remove_dir_all(remote_store_path.clone()); let remote_fs = LocalDirRemoteFs::new(Some(remote_store_path.clone()), store_path.clone()); @@ -5009,23 +5100,13 @@ mod tests { store_path.join("metastore").as_path(), BaseRocksStoreFs::new_for_metastore(remote_fs.clone(), config.config_obj()), config.config_obj(), - ) - .unwrap(); + )?; - let schema_1 = meta_store - .create_schema("foo".to_string(), false) - .await - .unwrap(); + let schema_1 = meta_store.create_schema("foo".to_string(), false).await?; println!("New id: {}", schema_1.id); - let schema_2 = meta_store - .create_schema("bar".to_string(), false) - .await - .unwrap(); + let schema_2 = meta_store.create_schema("bar".to_string(), false).await?; println!("New id: {}", schema_2.id); - let schema_3 = meta_store - .create_schema("boo".to_string(), false) - .await - .unwrap(); + let schema_3 = meta_store.create_schema("boo".to_string(), false).await?; println!("New id: {}", schema_3.id); let schema_1_id = schema_1.id; @@ -5037,34 +5118,16 @@ mod tests { .await .is_err()); - assert_eq!( - meta_store.get_schema("foo".to_string()).await.unwrap(), - schema_1 - ); - assert_eq!( - meta_store.get_schema("bar".to_string()).await.unwrap(), - schema_2 - ); - assert_eq!( - meta_store.get_schema("boo".to_string()).await.unwrap(), - schema_3 - ); + assert_eq!(meta_store.get_schema("foo".to_string()).await?, schema_1); + assert_eq!(meta_store.get_schema("bar".to_string()).await?, schema_2); + assert_eq!(meta_store.get_schema("boo".to_string()).await?, schema_3); - assert_eq!( - meta_store.get_schema_by_id(schema_1_id).await.unwrap(), - schema_1 - ); - assert_eq!( - meta_store.get_schema_by_id(schema_2_id).await.unwrap(), - schema_2 - ); - assert_eq!( - meta_store.get_schema_by_id(schema_3_id).await.unwrap(), - schema_3 - ); + assert_eq!(meta_store.get_schema_by_id(schema_1_id).await?, schema_1); + assert_eq!(meta_store.get_schema_by_id(schema_2_id).await?, schema_2); + 
assert_eq!(meta_store.get_schema_by_id(schema_3_id).await?, schema_3); assert_eq!( - meta_store.get_schemas().await.unwrap(), + meta_store.get_schemas().await?, vec![ IdRow::new( 1, Schema {
@@ -5090,8 +5153,7 @@ mod tests { assert_eq!( meta_store .rename_schema("foo".to_string(), "foo1".to_string()) - .await - .unwrap(), + .await?, IdRow::new( schema_1_id, Schema { schema_name: "foo1".to_string() } ) ); assert!(meta_store.get_schema("foo".to_string()).await.is_err()); assert_eq!( - meta_store.get_schema("foo1".to_string()).await.unwrap(), + meta_store.get_schema("foo1".to_string()).await?, IdRow::new( schema_1_id, Schema { schema_name: "foo1".to_string() } ) ); assert_eq!( - meta_store.get_schema_by_id(schema_1_id).await.unwrap(), + meta_store.get_schema_by_id(schema_1_id).await?, IdRow::new( schema_1_id, Schema { schema_name: "foo1".to_string() } ) );
@@ -5127,8 +5189,7 @@ mod tests { assert_eq!( meta_store .rename_schema_by_id(schema_2_id, "bar1".to_string()) - .await - .unwrap(), + .await?, IdRow::new( schema_2_id, Schema { schema_name: "bar1".to_string() } ) ); assert!(meta_store.get_schema("bar".to_string()).await.is_err()); assert_eq!( - meta_store.get_schema("bar1".to_string()).await.unwrap(), + meta_store.get_schema("bar1".to_string()).await?, IdRow::new( schema_2_id, Schema { schema_name: "bar1".to_string() } ) ); assert_eq!( - meta_store.get_schema_by_id(schema_2_id).await.unwrap(), + meta_store.get_schema_by_id(schema_2_id).await?, IdRow::new( schema_2_id, Schema { schema_name: "bar1".to_string() } ) ); - assert_eq!( - meta_store.delete_schema("bar1".to_string()).await.unwrap(), - () - ); + meta_store.delete_schema("bar1".to_string()).await?; assert!(meta_store.delete_schema("bar1".to_string()).await.is_err()); assert!(meta_store.delete_schema("bar".to_string()).await.is_err()); assert!(meta_store.get_schema("bar1".to_string()).await.is_err()); assert!(meta_store.get_schema("bar".to_string()).await.is_err()); - assert_eq!( - meta_store.delete_schema_by_id(schema_3_id).await.unwrap(), - () - ); + meta_store.delete_schema_by_id(schema_3_id).await?; assert!(meta_store.delete_schema_by_id(schema_2_id).await.is_err()); - assert_eq!( - meta_store.delete_schema_by_id(schema_1_id).await.unwrap(), - () - ); + meta_store.delete_schema_by_id(schema_1_id).await?; assert!(meta_store.delete_schema_by_id(schema_1_id).await.is_err()); assert!(meta_store.get_schema("foo".to_string()).await.is_err()); assert!(meta_store.get_schema("foo1".to_string()).await.is_err());
@@ -5182,13 +5234,15 @@ mod tests { } let _ = fs::remove_dir_all(store_path.clone()); let _ = fs::remove_dir_all(remote_store_path.clone()); + + Ok(()) } #[tokio::test] - async fn non_empty_schema_test() { + async fn non_empty_schema_test() -> Result<(), CubeError> { let config = Config::test("non_empty_schema_test"); - let store_path = env::current_dir().unwrap().join("test-local-ne-schema"); - let remote_store_path = env::current_dir().unwrap().join("test-remote-ne-schema"); + let store_path = env::current_dir()?.join("test-local-ne-schema"); + let remote_store_path = env::current_dir()?.join("test-remote-ne-schema"); let _ = fs::remove_dir_all(store_path.clone()); let _ = fs::remove_dir_all(remote_store_path.clone()); let remote_fs = LocalDirRemoteFs::new(Some(remote_store_path.clone()), store_path.clone());
@@ -5197,18 +5251,11 @@ mod tests { store_path.join("metastore").as_path(), BaseRocksStoreFs::new_for_metastore(remote_fs.clone(), config.config_obj()), config.config_obj(), - ) - .unwrap(); + )?; - let schema1 = meta_store - .create_schema("foo".to_string(), false) - .await
- .unwrap(); + let schema1 = meta_store.create_schema("foo".to_string(), false).await?; - let _schema2 = meta_store - .create_schema("foo2".to_string(), false) - .await - .unwrap(); + let _schema2 = meta_store.create_schema("foo2".to_string(), false).await?; let mut columns = Vec::new(); columns.push(Column::new("col1".to_string(), ColumnType::Int, 0)); @@ -5233,8 +5280,7 @@ mod tests { false, None, ) - .await - .unwrap(); + .await?; let _table2 = meta_store .create_table( @@ -5257,8 +5303,7 @@ mod tests { false, None, ) - .await - .unwrap(); + .await?; assert!(meta_store .delete_schema_by_id(schema1.get_id()) @@ -5268,13 +5313,15 @@ mod tests { let _ = fs::remove_dir_all(store_path.clone()); let _ = fs::remove_dir_all(remote_store_path.clone()); + + Ok(()) } #[tokio::test] - async fn index_repair_test() { + async fn index_repair_test() -> Result<(), CubeError> { let config = Config::test("index_repair_test"); - let store_path = env::current_dir().unwrap().join("index_repair_test-local"); - let remote_store_path = env::current_dir().unwrap().join("index_repair_test-remote"); + let store_path = env::current_dir()?.join("index_repair_test-local"); + let remote_store_path = env::current_dir()?.join("index_repair_test-remote"); let _ = fs::remove_dir_all(store_path.clone()); let _ = fs::remove_dir_all(remote_store_path.clone()); let remote_fs = LocalDirRemoteFs::new(Some(remote_store_path.clone()), store_path.clone()); @@ -5284,19 +5331,14 @@ mod tests { store_path.join("metastore").as_path(), BaseRocksStoreFs::new_for_metastore(remote_fs.clone(), config.config_obj()), config.config_obj(), - ) - .unwrap(); + )?; - meta_store - .create_schema("foo".to_string(), false) - .await - .unwrap(); + meta_store.create_schema("foo".to_string(), false).await?; meta_store .store .db - .delete(RowKey::Table(TableId::Schemas, 1).to_bytes()) - .unwrap(); + .delete(RowKey::Table(TableId::Schemas, 1).to_bytes())?; let result = meta_store.get_schema("foo".to_string()).await; println!("{:?}", result); @@ -5306,28 +5348,27 @@ mod tests { println!("Keys in db"); for kv_res in iterator { - let (key, _) = kv_res.unwrap(); + let (key, _) = kv_res?; println!("Key {:?}", RowKey::from_bytes(&key)); } sleep(Duration::from_millis(300)); - meta_store - .create_schema("foo".to_string(), false) - .await - .unwrap(); + meta_store.create_schema("foo".to_string(), false).await?; } let _ = fs::remove_dir_all(store_path.clone()); let _ = fs::remove_dir_all(remote_store_path.clone()); + + Ok(()) } #[tokio::test] - async fn table_test() { + async fn table_test() -> Result<(), CubeError> { init_test_logger().await; let config = Config::test("table_test"); - let store_path = env::current_dir().unwrap().join("test-table-local"); - let remote_store_path = env::current_dir().unwrap().join("test-table-remote"); + let store_path = env::current_dir()?.join("test-table-local"); + let remote_store_path = env::current_dir()?.join("test-table-remote"); let _ = fs::remove_dir_all(store_path.clone()); let _ = fs::remove_dir_all(remote_store_path.clone()); let remote_fs = LocalDirRemoteFs::new(Some(remote_store_path.clone()), store_path.clone()); @@ -5336,13 +5377,9 @@ mod tests { store_path.clone().join("metastore").as_path(), BaseRocksStoreFs::new_for_metastore(remote_fs.clone(), config.config_obj()), config.config_obj(), - ) - .unwrap(); + )?; - let schema_1 = meta_store - .create_schema("foo".to_string(), false) - .await - .unwrap(); + let schema_1 = meta_store.create_schema("foo".to_string(), false).await?; let mut columns = Vec::new(); 
columns.push(Column::new("col1".to_string(), ColumnType::Int, 0)); columns.push(Column::new("col2".to_string(), ColumnType::String, 1)); @@ -5382,8 +5419,7 @@ mod tests { false, None, ) - .await - .unwrap(); + .await?; let table1_id = table1.id; assert!(schema_1.id == table1.get_row().get_schema_id()); @@ -5414,8 +5450,7 @@ mod tests { assert_eq!( meta_store .get_table("foo".to_string(), "boo".to_string()) - .await - .unwrap(), + .await?, table1 ); @@ -5427,25 +5462,22 @@ mod tests { None, None, Index::index_type_default(), - ) - .unwrap(); + )?; let expected_res = vec![IdRow::new(1, expected_index)]; - assert_eq!(meta_store.get_table_indexes(1).await.unwrap(), expected_res); + assert_eq!(meta_store.get_table_indexes(1).await?, expected_res); } let _ = fs::remove_dir_all(store_path.clone()); let _ = fs::remove_dir_all(remote_store_path.clone()); + + Ok(()) } #[tokio::test] - async fn default_index_field_positions_test() { + async fn default_index_field_positions_test() -> Result<(), CubeError> { init_test_logger().await; let config = Config::test("default_index_field_positions_test"); - let store_path = env::current_dir() - .unwrap() - .join("test-default-index-positions-local"); - let remote_store_path = env::current_dir() - .unwrap() - .join("test-default-index-positions-remote"); + let store_path = env::current_dir()?.join("test-default-index-positions-local"); + let remote_store_path = env::current_dir()?.join("test-default-index-positions-remote"); let _ = fs::remove_dir_all(store_path.clone()); let _ = fs::remove_dir_all(remote_store_path.clone()); let remote_fs = LocalDirRemoteFs::new(Some(remote_store_path.clone()), store_path.clone()); @@ -5454,13 +5486,9 @@ mod tests { store_path.clone().join("metastore").as_path(), BaseRocksStoreFs::new_for_metastore(remote_fs.clone(), config.config_obj()), config.config_obj(), - ) - .unwrap(); + )?; - meta_store - .create_schema("foo".to_string(), false) - .await - .unwrap(); + meta_store.create_schema("foo".to_string(), false).await?; let mut columns = Vec::new(); columns.push(Column::new("col1".to_string(), ColumnType::Int, 0)); columns.push(Column::new("col2".to_string(), ColumnType::Bytes, 1)); @@ -5500,8 +5528,7 @@ mod tests { false, None, ) - .await - .unwrap(); + .await?; let table1_id = table1.id; let expected_columns = vec![ @@ -5520,26 +5547,23 @@ mod tests { None, None, Index::index_type_default(), - ) - .unwrap(); + )?; let expected_res = vec![IdRow::new(1, expected_index)]; - assert_eq!(meta_store.get_table_indexes(1).await.unwrap(), expected_res); + assert_eq!(meta_store.get_table_indexes(1).await?, expected_res); } let _ = fs::remove_dir_all(store_path.clone()); let _ = fs::remove_dir_all(remote_store_path.clone()); + + Ok(()) } #[tokio::test] - async fn table_with_aggregate_index_test() { + async fn table_with_aggregate_index_test() -> Result<(), CubeError> { init_test_logger().await; let config = Config::test("table_with_aggregate_index_test"); - let store_path = env::current_dir() - .unwrap() - .join("test-table-aggregate-local"); - let remote_store_path = env::current_dir() - .unwrap() - .join("test-table-aggregate-remote"); + let store_path = env::current_dir()?.join("test-table-aggregate-local"); + let remote_store_path = env::current_dir()?.join("test-table-aggregate-remote"); let _ = fs::remove_dir_all(store_path.clone()); let _ = fs::remove_dir_all(remote_store_path.clone()); let remote_fs = LocalDirRemoteFs::new(Some(remote_store_path.clone()), store_path.clone()); @@ -5548,13 +5572,9 @@ mod tests { 
store_path.clone().join("metastore").as_path(), BaseRocksStoreFs::new_for_metastore(remote_fs.clone(), config.config_obj()), config.config_obj(), - ) - .unwrap(); + )?; - meta_store - .create_schema("foo".to_string(), false) - .await - .unwrap(); + meta_store.create_schema("foo".to_string(), false).await?; let mut columns = Vec::new(); columns.push(Column::new("col1".to_string(), ColumnType::Int, 0)); columns.push(Column::new("col2".to_string(), ColumnType::String, 1)); @@ -5593,16 +5613,14 @@ mod tests { false, None, ) - .await - .unwrap(); + .await?; let table_id = table1.get_id(); assert_eq!( meta_store .get_table("foo".to_string(), "boo".to_string()) - .await - .unwrap(), + .await?, table1 ); @@ -5622,7 +5640,7 @@ mod tests { ) ); - let indexes = meta_store.get_table_indexes(table_id).await.unwrap(); + let indexes = meta_store.get_table_indexes(table_id).await?; assert_eq!(indexes.len(), 2); let ind = indexes .into_iter() @@ -5723,19 +5741,18 @@ mod tests { } let _ = fs::remove_dir_all(store_path.clone()); let _ = fs::remove_dir_all(remote_store_path.clone()); + + Ok(()) } #[tokio::test] - async fn table_with_default_index_no_measures_test() { + async fn table_with_default_index_no_measures_test() -> Result<(), CubeError> { init_test_logger().await; let config = Config::test("table_with_default_index_no_measures_test"); - let store_path = env::current_dir() - .unwrap() - .join("test-table-default-index-no-measure-local"); - let remote_store_path = env::current_dir() - .unwrap() - .join("test-table-default-index-no-measure-remote"); + let store_path = env::current_dir()?.join("test-table-default-index-no-measure-local"); + let remote_store_path = + env::current_dir()?.join("test-table-default-index-no-measure-remote"); let _ = fs::remove_dir_all(store_path.clone()); let _ = fs::remove_dir_all(remote_store_path.clone()); let remote_fs = LocalDirRemoteFs::new(Some(remote_store_path.clone()), store_path.clone()); @@ -5744,13 +5761,9 @@ mod tests { store_path.clone().join("metastore").as_path(), BaseRocksStoreFs::new_for_metastore(remote_fs.clone(), config.config_obj()), config.config_obj(), - ) - .unwrap(); + )?; - meta_store - .create_schema("foo".to_string(), false) - .await - .unwrap(); + meta_store.create_schema("foo".to_string(), false).await?; let mut columns = Vec::new(); columns.push(Column::new("col1".to_string(), ColumnType::Int, 0)); columns.push(Column::new("col2".to_string(), ColumnType::String, 1)); @@ -5784,20 +5797,18 @@ mod tests { false, None, ) - .await - .unwrap(); + .await?; let table_id = table1.get_id(); assert_eq!( meta_store .get_table("foo".to_string(), "boo".to_string()) - .await - .unwrap(), + .await?, table1 ); - let indexes = meta_store.get_table_indexes(table_id).await.unwrap(); + let indexes = meta_store.get_table_indexes(table_id).await?; assert_eq!(indexes.len(), 1); let ind = &indexes[0]; @@ -5821,10 +5832,12 @@ mod tests { } let _ = fs::remove_dir_all(store_path.clone()); let _ = fs::remove_dir_all(remote_store_path.clone()); + + Ok(()) } #[tokio::test] - async fn cold_start_test() { + async fn cold_start_test() -> Result<(), CubeError> { init_test_logger().await; { @@ -5834,76 +5847,60 @@ mod tests { let _ = fs::remove_dir_all(config.remote_dir()); let services = config.configure().await; - services.start_processing_loops().await.unwrap(); + services.start_processing_loops().await?; services .meta_store .create_schema("foo1".to_string(), false) - .await - .unwrap(); + .await?; services .rocks_meta_store .as_ref() .unwrap() .run_upload() - .await - 
.unwrap(); + .await?; services .meta_store .create_schema("foo".to_string(), false) - .await - .unwrap(); + .await?; services .rocks_meta_store .as_ref() .unwrap() .upload_check_point() - .await - .unwrap(); + .await?; services .meta_store .create_schema("bar".to_string(), false) - .await - .unwrap(); + .await?; services .rocks_meta_store .as_ref() .unwrap() .run_upload() - .await - .unwrap(); - services.stop_processing_loops().await.unwrap(); + .await?; + services.stop_processing_loops().await?; Delay::new(Duration::from_millis(2000)).await; // TODO logger init conflict - fs::remove_dir_all(config.local_dir()).unwrap(); + fs::remove_dir_all(config.local_dir())?; } { let config = Config::test("cold_start_test"); let services2 = config.configure().await; - services2 - .meta_store - .get_schema("foo1".to_string()) - .await - .unwrap(); - services2 - .meta_store - .get_schema("foo".to_string()) - .await - .unwrap(); - services2 - .meta_store - .get_schema("bar".to_string()) - .await - .unwrap(); + services2.meta_store.get_schema("foo1".to_string()).await?; + services2.meta_store.get_schema("foo".to_string()).await?; + services2.meta_store.get_schema("bar".to_string()).await?; - fs::remove_dir_all(config.local_dir()).unwrap(); - fs::remove_dir_all(config.remote_dir()).unwrap(); + fs::remove_dir_all(config.local_dir())?; + fs::remove_dir_all(config.remote_dir())?; } + + Ok(()) } #[tokio::test] - async fn get_snapshots_list() { + async fn get_snapshots_list() -> Result<(), CubeError> { { let config = Config::test("get_snapshots_list"); @@ -5911,86 +5908,76 @@ mod tests { let _ = fs::remove_dir_all(config.remote_dir()); let services = config.configure().await; - services.start_processing_loops().await.unwrap(); + services.start_processing_loops().await?; let snapshots = services .rocks_meta_store .as_ref() .unwrap() .get_snapshots_list() - .await - .unwrap(); + .await?; assert_eq!(snapshots.len(), 0); services .meta_store .create_schema("foo1".to_string(), false) - .await - .unwrap(); + .await?; assert_eq!(snapshots.len(), 0); services .rocks_meta_store .as_ref() .unwrap() .upload_check_point() - .await - .unwrap(); + .await?; let snapshots = services .rocks_meta_store .as_ref() .unwrap() .get_snapshots_list() - .await - .unwrap(); + .await?; assert_eq!(snapshots.len(), 1); assert!(snapshots[0].current); services .meta_store .create_schema("foo".to_string(), false) - .await - .unwrap(); + .await?; services .rocks_meta_store .as_ref() .unwrap() .upload_check_point() - .await - .unwrap(); + .await?; let snapshots = services .rocks_meta_store .as_ref() .unwrap() .get_snapshots_list() - .await - .unwrap(); + .await?; assert_eq!(snapshots.len(), 2); assert!(!snapshots[0].current); assert!(snapshots[1].current); services .meta_store .create_schema("bar".to_string(), false) - .await - .unwrap(); + .await?; services .rocks_meta_store .as_ref() .unwrap() .upload_check_point() - .await - .unwrap(); + .await?; let snapshots = services .rocks_meta_store .as_ref() .unwrap() .get_snapshots_list() - .await - .unwrap(); + .await?; assert_eq!(snapshots.len(), 3); assert!(!snapshots[0].current); assert!(!snapshots[1].current); assert!(snapshots[2].current); - services.stop_processing_loops().await.unwrap(); + services.stop_processing_loops().await?; Delay::new(Duration::from_millis(2000)).await; // TODO logger init conflict - fs::remove_dir_all(config.local_dir()).unwrap(); + fs::remove_dir_all(config.local_dir())?; } { @@ -6002,18 +5989,19 @@ mod tests { .as_ref() .unwrap() .get_snapshots_list() - .await - 
.unwrap(); + .await?; assert_eq!(snapshots.len(), 3); assert!(!snapshots[0].current); assert!(!snapshots[1].current); assert!(snapshots[2].current); - fs::remove_dir_all(config.local_dir()).unwrap(); - fs::remove_dir_all(config.remote_dir()).unwrap(); + fs::remove_dir_all(config.local_dir())?; + fs::remove_dir_all(config.remote_dir())?; } + + Ok(()) } #[tokio::test] - async fn set_current_snapshot() { + async fn set_current_snapshot() -> Result<(), CubeError> { init_test_logger().await; { @@ -6023,27 +6011,24 @@ mod tests { let _ = fs::remove_dir_all(config.remote_dir()); let services = config.configure().await; - services.start_processing_loops().await.unwrap(); + services.start_processing_loops().await?; let rocks_meta_store = services.rocks_meta_store.as_ref().unwrap(); services .meta_store .create_schema("foo1".to_string(), false) - .await - .unwrap(); - rocks_meta_store.upload_check_point().await.unwrap(); + .await?; + rocks_meta_store.upload_check_point().await?; services .meta_store .create_schema("foo".to_string(), false) - .await - .unwrap(); - rocks_meta_store.upload_check_point().await.unwrap(); + .await?; + rocks_meta_store.upload_check_point().await?; services .meta_store .create_schema("bar".to_string(), false) - .await - .unwrap(); - rocks_meta_store.upload_check_point().await.unwrap(); - let snapshots = services.meta_store.get_snapshots_list().await.unwrap(); + .await?; + rocks_meta_store.upload_check_point().await?; + let snapshots = services.meta_store.get_snapshots_list().await?; assert_eq!(snapshots.len(), 3); assert!(!snapshots[0].current); assert!(!snapshots[1].current); @@ -6076,42 +6061,33 @@ mod tests { services .meta_store .create_schema("bar_after".to_string(), false) - .await - .unwrap(); - rocks_meta_store.upload_check_point().await.unwrap(); - rocks_meta_store.run_upload().await.unwrap(); + .await?; + rocks_meta_store.upload_check_point().await?; + rocks_meta_store.run_upload().await?; - let snapshots = services.meta_store.get_snapshots_list().await.unwrap(); + let snapshots = services.meta_store.get_snapshots_list().await?; assert_eq!(snapshots.len(), 3); assert!(!snapshots[0].current); assert!(snapshots[1].current); assert!(!snapshots[2].current); - services.stop_processing_loops().await.unwrap(); + services.stop_processing_loops().await?; Delay::new(Duration::from_millis(2000)).await; // TODO logger init conflict - fs::remove_dir_all(config.local_dir()).unwrap(); + fs::remove_dir_all(config.local_dir())?; } { let config = Config::test("set_current_snapshot"); let services2 = config.configure().await; - let snapshots = services2.meta_store.get_snapshots_list().await.unwrap(); + let snapshots = services2.meta_store.get_snapshots_list().await?; assert_eq!(snapshots.len(), 3); assert!(!snapshots[0].current); assert!(snapshots[1].current); assert!(!snapshots[2].current); - services2 - .meta_store - .get_schema("foo1".to_string()) - .await - .unwrap(); - services2 - .meta_store - .get_schema("foo".to_string()) - .await - .unwrap(); + services2.meta_store.get_schema("foo1".to_string()).await?; + services2.meta_store.get_schema("foo".to_string()).await?; assert!(services2 .meta_store .get_schema("bar".to_string()) @@ -6129,45 +6105,34 @@ mod tests { .await; assert!(res.is_ok()); Delay::new(Duration::from_millis(2000)).await; // TODO logger init conflict - fs::remove_dir_all(config.local_dir()).unwrap(); + fs::remove_dir_all(config.local_dir())?; } { let config = Config::test("set_current_snapshot"); let services3 = config.configure().await; - let snapshots = 
services3.meta_store.get_snapshots_list().await.unwrap(); + let snapshots = services3.meta_store.get_snapshots_list().await?; assert_eq!(snapshots.len(), 3); assert!(!snapshots[0].current); assert!(!snapshots[1].current); assert!(snapshots[2].current); - services3 - .meta_store - .get_schema("foo1".to_string()) - .await - .unwrap(); - services3 - .meta_store - .get_schema("foo".to_string()) - .await - .unwrap(); - services3 - .meta_store - .get_schema("bar".to_string()) - .await - .unwrap(); + services3.meta_store.get_schema("foo1".to_string()).await?; + services3.meta_store.get_schema("foo".to_string()).await?; + services3.meta_store.get_schema("bar".to_string()).await?; services3 .meta_store .get_schema("bar_after".to_string()) - .await - .unwrap(); - fs::remove_dir_all(config.local_dir()).unwrap(); - fs::remove_dir_all(config.remote_dir()).unwrap(); + .await?; + fs::remove_dir_all(config.local_dir())?; + fs::remove_dir_all(config.remote_dir())?; } + + Ok(()) } #[tokio::test] - async fn upload_logs_without_snapshots() { + async fn upload_logs_without_snapshots() -> Result<(), CubeError> { let config = Config::test("upload_logs_without_snapshots"); let _ = fs::remove_dir_all(config.local_dir()); @@ -6175,7 +6140,7 @@ mod tests { let services = config.configure().await; - services.start_processing_loops().await.unwrap(); + services.start_processing_loops().await?; let rocks_meta_store = services.rocks_meta_store.as_ref().unwrap(); let remote_fs = services .injector @@ -6184,29 +6149,26 @@ mod tests { services .meta_store .create_schema("foo1".to_string(), false) - .await - .unwrap(); - rocks_meta_store.run_upload().await.unwrap(); + .await?; + rocks_meta_store.run_upload().await?; services .meta_store .create_schema("foo".to_string(), false) - .await - .unwrap(); - rocks_meta_store.run_upload().await.unwrap(); - let uploaded = remote_fs.list("metastore-".to_string()).await.unwrap(); + .await?; + rocks_meta_store.run_upload().await?; + let uploaded = remote_fs.list("metastore-".to_string()).await?; assert!(uploaded.is_empty()); - rocks_meta_store.upload_check_point().await.unwrap(); + rocks_meta_store.upload_check_point().await?; services .meta_store .create_schema("bar".to_string(), false) - .await - .unwrap(); + .await?; - rocks_meta_store.run_upload().await.unwrap(); + rocks_meta_store.run_upload().await?; - let uploaded = remote_fs.list("metastore-".to_string()).await.unwrap(); + let uploaded = remote_fs.list("metastore-".to_string()).await?; let logs_uploaded = uploaded .into_iter() @@ -6215,9 +6177,9 @@ mod tests { assert_eq!(logs_uploaded.len(), 1); - rocks_meta_store.run_upload().await.unwrap(); + rocks_meta_store.run_upload().await?; - let uploaded = remote_fs.list("metastore-".to_string()).await.unwrap(); + let uploaded = remote_fs.list("metastore-".to_string()).await?; let logs_uploaded = uploaded .into_iter() @@ -6229,12 +6191,11 @@ mod tests { services .meta_store .create_schema("bar2".to_string(), false) - .await - .unwrap(); + .await?; - rocks_meta_store.run_upload().await.unwrap(); + rocks_meta_store.run_upload().await?; - let uploaded = remote_fs.list("metastore-".to_string()).await.unwrap(); + let uploaded = remote_fs.list("metastore-".to_string()).await?; let logs_uploaded = uploaded .into_iter() @@ -6245,10 +6206,12 @@ mod tests { let _ = fs::remove_dir_all(config.local_dir()); let _ = fs::remove_dir_all(config.remote_dir()); + + Ok(()) } #[tokio::test] - async fn log_replay_ordering() { + async fn log_replay_ordering() -> Result<(), CubeError> { 
init_test_logger().await; { @@ -6258,27 +6221,24 @@ mod tests { let _ = fs::remove_dir_all(config.remote_dir()); let services = config.configure().await; - services.start_processing_loops().await.unwrap(); + services.start_processing_loops().await?; services .rocks_meta_store .as_ref() .unwrap() .upload_check_point() - .await - .unwrap(); + .await?; for i in 0..100 { let schema = services .meta_store .create_schema(format!("foo{}", i), false) - .await - .unwrap(); + .await?; services .rocks_meta_store .as_ref() .unwrap() .run_upload() - .await - .unwrap(); + .await?; let table = services .meta_store .create_table( @@ -6301,62 +6261,51 @@ mod tests { false, None, ) - .await - .unwrap(); + .await?; services .rocks_meta_store .as_ref() .unwrap() .run_upload() - .await - .unwrap(); - services - .meta_store - .drop_table(table.get_id()) - .await - .unwrap(); + .await?; + services.meta_store.drop_table(table.get_id()).await?; services .rocks_meta_store .as_ref() .unwrap() .run_upload() - .await - .unwrap(); + .await?; services .meta_store .delete_schema_by_id(schema.get_id()) - .await - .unwrap(); + .await?; services .rocks_meta_store .as_ref() .unwrap() .run_upload() - .await - .unwrap(); + .await?; } - services.stop_processing_loops().await.unwrap(); + services.stop_processing_loops().await?; Delay::new(Duration::from_millis(2000)).await; - fs::remove_dir_all(config.local_dir()).unwrap(); + fs::remove_dir_all(config.local_dir())?; } { let config = Config::test("log_replay_ordering"); let services2 = config.configure().await; - let tables = services2 - .meta_store - .get_tables_with_path(true) - .await - .unwrap(); + let tables = services2.meta_store.get_tables_with_path(true).await?; assert_eq!(tables.len(), 0); let _ = fs::remove_dir_all(config.local_dir()); let _ = fs::remove_dir_all(config.remote_dir()); } + + Ok(()) } #[tokio::test] - async fn discard_logs() { + async fn discard_logs() -> Result<(), CubeError> { { let config = Config::test("discard_logs"); @@ -6364,19 +6313,17 @@ mod tests { let _ = fs::remove_dir_all(config.remote_dir()); let services = config.configure().await; - services.start_processing_loops().await.unwrap(); + services.start_processing_loops().await?; services .meta_store .create_schema("foo1".to_string(), false) - .await - .unwrap(); + .await?; while !services .rocks_meta_store .as_ref() .unwrap() .has_pending_changes() - .await - .unwrap() + .await? { futures_timer::Delay::new(Duration::from_millis(100)).await; } @@ -6385,20 +6332,17 @@ mod tests { .as_ref() .unwrap() .run_upload() - .await - .unwrap(); + .await?; services .meta_store .create_schema("foo".to_string(), false) - .await - .unwrap(); + .await?; while !services .rocks_meta_store .as_ref() .unwrap() .has_pending_changes() - .await - .unwrap() + .await? { futures_timer::Delay::new(Duration::from_millis(100)).await; } @@ -6407,20 +6351,17 @@ mod tests { .as_ref() .unwrap() .upload_check_point() - .await - .unwrap(); + .await?; services .meta_store .create_schema("bar".to_string(), false) - .await - .unwrap(); + .await?; while !services .rocks_meta_store .as_ref() .unwrap() .has_pending_changes() - .await - .unwrap() + .await? 
{ futures_timer::Delay::new(Duration::from_millis(100)).await; } services .rocks_meta_store .as_ref() .unwrap() .run_upload() - .await - .unwrap(); - services.stop_processing_loops().await.unwrap(); + .await?; + services.stop_processing_loops().await?; Delay::new(Duration::from_millis(2000)).await; // TODO logger init conflict - fs::remove_dir_all(config.local_dir()).unwrap(); + fs::remove_dir_all(config.local_dir())?; let list = LocalDirRemoteFs::list_recursive( config.remote_dir().clone(), "metastore-".to_string(), ) - .await - .unwrap(); - let re = Regex::new(r"(\d+).flex").unwrap(); + .await?; + let re = Regex::new(r"(\d+).flex")?; let last_log = list .iter() .filter(|f| re.captures(f.remote_path()).is_some())
@@ -6456,37 +6395,30 @@ mod tests { println!("Truncating {:?}", file_path); let file = std::fs::OpenOptions::new() .write(true) - .open(file_path.clone()) - .unwrap(); - println!("Size {}", file.metadata().unwrap().len()); - file.set_len(50).unwrap(); + .open(file_path.clone())?; + println!("Size {}", file.metadata()?.len()); + file.set_len(50)?; } { let config = Config::test("discard_logs"); let services2 = config.configure().await; - services2 - .meta_store - .get_schema("foo1".to_string()) - .await - .unwrap(); - services2 - .meta_store - .get_schema("foo".to_string()) - .await - .unwrap(); + services2.meta_store.get_schema("foo1".to_string()).await?; + services2.meta_store.get_schema("foo".to_string()).await?; - fs::remove_dir_all(config.local_dir()).unwrap(); - fs::remove_dir_all(config.remote_dir()).unwrap(); + fs::remove_dir_all(config.local_dir())?; + fs::remove_dir_all(config.remote_dir())?; } + + Ok(()) } #[tokio::test] - async fn swap_chunks() { + async fn swap_chunks() -> Result<(), CubeError> { let config = Config::test("swap_chunks"); - let store_path = env::current_dir().unwrap().join("swap_chunks_test-local"); - let remote_store_path = env::current_dir().unwrap().join("swap_chunks_test-remote"); + let store_path = env::current_dir()?.join("swap_chunks_test-local"); + let remote_store_path = env::current_dir()?.join("swap_chunks_test-remote"); let _ = fs::remove_dir_all(store_path.clone()); let _ = fs::remove_dir_all(remote_store_path.clone()); let remote_fs = LocalDirRemoteFs::new(Some(remote_store_path.clone()), store_path.clone());
@@ -6495,12 +6427,8 @@ mod tests { store_path.join("metastore").as_path(), BaseRocksStoreFs::new_for_metastore(remote_fs.clone(), config.config_obj()), config.config_obj(), - ) - .unwrap(); - meta_store - .create_schema("foo".to_string(), false) - .await - .unwrap(); + )?; + meta_store.create_schema("foo".to_string(), false).await?; let cols = vec![Column::new("name".to_string(), ColumnType::String, 0)]; meta_store .create_table(
@@ -6523,37 +6451,32 @@ mod tests { false, None, ) - .await - .unwrap(); - let partition = meta_store.get_partition(1).await.unwrap(); + .await?; + let partition = meta_store.get_partition(1).await?; //============= trying to swap same source chunks twice ============== let mut source_ids: Vec<u64> = Vec::new(); let ch = meta_store .create_chunk(partition.get_id(), 10, None, None, true) - .await - .unwrap(); + .await?; source_ids.push(ch.get_id()); - meta_store.chunk_uploaded(ch.get_id()).await.unwrap(); + meta_store.chunk_uploaded(ch.get_id()).await?; let ch = meta_store .create_chunk(partition.get_id(), 16, None, None, true) - .await - .unwrap(); + .await?; source_ids.push(ch.get_id()); - meta_store.chunk_uploaded(ch.get_id()).await.unwrap(); + meta_store.chunk_uploaded(ch.get_id()).await?; let
dest_chunk = meta_store .create_chunk(partition.get_id(), 26, None, None, true) - .await - .unwrap(); + .await?; assert_eq!(dest_chunk.get_row().active(), false); let dest_chunk2 = meta_store .create_chunk(partition.get_id(), 26, None, None, true) - .await - .unwrap(); + .await?; assert_eq!(dest_chunk2.get_row().active(), false); meta_store .swap_chunks(
@@ -6562,15 +6485,14 @@ mod tests { vec![(dest_chunk.get_id(), Some(26))], None, ) - .await - .unwrap(); + .await?; for id in source_ids.iter() { - let ch = meta_store.get_chunk(id.to_owned()).await.unwrap(); + let ch = meta_store.get_chunk(id.to_owned()).await?; assert_eq!(ch.get_row().active(), false); } - let ch = meta_store.get_chunk(dest_chunk.get_id()).await.unwrap(); + let ch = meta_store.get_chunk(dest_chunk.get_id()).await?; assert_eq!(ch.get_row().active(), true); meta_store .swap_chunks(
@@ -6586,17 +6508,15 @@ mod tests { let mut source_ids: Vec<u64> = Vec::new(); let ch = meta_store .create_chunk(partition.get_id(), 10, None, None, true) - .await - .unwrap(); + .await?; source_ids.push(ch.get_id()); - meta_store.chunk_uploaded(ch.get_id()).await.unwrap(); + meta_store.chunk_uploaded(ch.get_id()).await?; let ch = meta_store .create_chunk(partition.get_id(), 16, None, None, true) - .await - .unwrap(); + .await?; source_ids.push(ch.get_id()); - meta_store.chunk_uploaded(ch.get_id()).await.unwrap(); + meta_store.chunk_uploaded(ch.get_id()).await?; meta_store .swap_chunks(
@@ -6610,7 +6530,7 @@ mod tests { ); for id in source_ids.iter() { - let ch = meta_store.get_chunk(id.to_owned()).await.unwrap(); + let ch = meta_store.get_chunk(id.to_owned()).await?; assert_eq!(ch.get_row().active(), true); } }
@@ -6618,22 +6538,20 @@ mod tests { assert!(true); let _ = fs::remove_dir_all(store_path.clone()); let _ = fs::remove_dir_all(remote_store_path.clone()); + + Ok(()) } #[tokio::test] - async fn delete_old_snapshots() { + async fn delete_old_snapshots() -> Result<(), CubeError> { let metastore_snapshots_lifetime_secs = 1; let config = Config::test("delete_old_snapshots").update_config(|mut obj| { obj.metastore_snapshots_lifetime = metastore_snapshots_lifetime_secs; obj.minimum_metastore_snapshots_count = 2; obj }); - let store_path = env::current_dir() - .unwrap() - .join("delete_old_snapshots-local"); - let remote_store_path = env::current_dir() - .unwrap() - .join("delete_old_snapshots-remote"); + let store_path = env::current_dir()?.join("delete_old_snapshots-local"); + let remote_store_path = env::current_dir()?.join("delete_old_snapshots-remote"); let _ = fs::remove_dir_all(&store_path); let _ = fs::remove_dir_all(&remote_store_path); let remote_fs = LocalDirRemoteFs::new(Some(remote_store_path.clone()), store_path.clone());
@@ -6642,56 +6560,38 @@ mod tests { store_path.join("metastore").as_path(), BaseRocksStoreFs::new_for_metastore(remote_fs.clone(), config.config_obj()), config.config_obj(), - ) - .unwrap(); + )?; - // let list = remote_fs.list("metastore-".to_owned()).await.unwrap(); + // let list = remote_fs.list("metastore-".to_owned()).await?; // assert_eq!(0, list.len(), "remote fs list: {:?}", list); let uploaded = - BaseRocksStoreFs::list_files_by_snapshot(remote_fs.as_ref(), "metastore") - .await - .unwrap(); + BaseRocksStoreFs::list_files_by_snapshot(remote_fs.as_ref(), "metastore").await?; assert_eq!(uploaded.len(), 0); - meta_store - .create_schema("foo1".to_string(), false) - .await - .unwrap(); + meta_store.create_schema("foo1".to_string(), false).await?; - meta_store.upload_check_point().await.unwrap(); + meta_store.upload_check_point().await?; let
uploaded1 = - BaseRocksStoreFs::list_files_by_snapshot(remote_fs.as_ref(), "metastore") - .await - .unwrap(); + BaseRocksStoreFs::list_files_by_snapshot(remote_fs.as_ref(), "metastore").await?; assert_eq!(uploaded1.len(), 1); - meta_store - .create_schema("foo2".to_string(), false) - .await - .unwrap(); + meta_store.create_schema("foo2".to_string(), false).await?; - meta_store.upload_check_point().await.unwrap(); + meta_store.upload_check_point().await?; let uploaded2 = - BaseRocksStoreFs::list_files_by_snapshot(remote_fs.as_ref(), "metastore") - .await - .unwrap(); + BaseRocksStoreFs::list_files_by_snapshot(remote_fs.as_ref(), "metastore").await?; assert_eq!(uploaded2.len(), 2); - meta_store - .create_schema("foo3".to_string(), false) - .await - .unwrap(); + meta_store.create_schema("foo3".to_string(), false).await?; - meta_store.upload_check_point().await.unwrap(); + meta_store.upload_check_point().await?; let uploaded3 = - BaseRocksStoreFs::list_files_by_snapshot(remote_fs.as_ref(), "metastore") - .await - .unwrap(); + BaseRocksStoreFs::list_files_by_snapshot(remote_fs.as_ref(), "metastore").await?; assert_eq!( uploaded3.len(), @@ -6700,21 +6600,16 @@ mod tests { uploaded3.keys().join(", ") ); - meta_store - .create_schema("foo4".to_string(), false) - .await - .unwrap(); + meta_store.create_schema("foo4".to_string(), false).await?; tokio::time::sleep(Duration::from_millis( metastore_snapshots_lifetime_secs * 1000 + 100, )) .await; - meta_store.upload_check_point().await.unwrap(); + meta_store.upload_check_point().await?; let uploaded4 = - BaseRocksStoreFs::list_files_by_snapshot(remote_fs.as_ref(), "metastore") - .await - .unwrap(); + BaseRocksStoreFs::list_files_by_snapshot(remote_fs.as_ref(), "metastore").await?; // Should have 2 remaining snapshots because 2 is the minimum. 
assert_eq!(uploaded4.len(), 2);
@@ -6722,17 +6617,15 @@ mod tests { let _ = fs::remove_dir_all(&store_path); let _ = fs::remove_dir_all(&remote_store_path); + + Ok(()) } #[tokio::test] - async fn swap_active_partitions() { + async fn swap_active_partitions() -> Result<(), CubeError> { let config = Config::test("swap_active_partitions"); - let store_path = env::current_dir() - .unwrap() - .join("swap_active_partitions_test-local"); - let remote_store_path = env::current_dir() - .unwrap() - .join("swap_active_partitions_test-remote"); + let store_path = env::current_dir()?.join("swap_active_partitions_test-local"); + let remote_store_path = env::current_dir()?.join("swap_active_partitions_test-remote"); let _ = fs::remove_dir_all(store_path.clone()); let _ = fs::remove_dir_all(remote_store_path.clone()); let remote_fs = LocalDirRemoteFs::new(Some(remote_store_path.clone()), store_path.clone());
@@ -6741,12 +6634,8 @@ mod tests { store_path.join("metastore").as_path(), BaseRocksStoreFs::new_for_metastore(remote_fs.clone(), config.config_obj()), config.config_obj(), - ) - .unwrap(); - meta_store - .create_schema("foo".to_string(), false) - .await - .unwrap(); + )?; + meta_store.create_schema("foo".to_string(), false).await?; let cols = vec![Column::new("name".to_string(), ColumnType::String, 0)]; meta_store .create_table(
@@ -6769,29 +6658,25 @@ mod tests { false, None, ) - .await - .unwrap(); - let partition = meta_store.get_partition(1).await.unwrap(); + .await?; + let partition = meta_store.get_partition(1).await?; let mut source_chunks: Vec<IdRow<Chunk>> = Vec::new(); let ch = meta_store .create_chunk(partition.get_id(), 10, None, None, true) - .await - .unwrap(); - meta_store.chunk_uploaded(ch.get_id()).await.unwrap(); + .await?; + meta_store.chunk_uploaded(ch.get_id()).await?; source_chunks.push(ch); let ch = meta_store .create_chunk(partition.get_id(), 16, None, None, true) - .await - .unwrap(); - meta_store.chunk_uploaded(ch.get_id()).await.unwrap(); + .await?; + meta_store.chunk_uploaded(ch.get_id()).await?; source_chunks.push(ch); let dest_partition = meta_store .create_partition(Partition::new_child(&partition, None)) - .await - .unwrap(); + .await?; meta_store .swap_active_partitions(
@@ -6799,34 +6684,22 @@ mod tests { vec![(dest_partition.clone(), 10)], vec![(26, (None, None), (None, None))], ) - .await - .unwrap(); + .await?; assert_eq!( - meta_store - .get_partition(1) - .await - .unwrap() - .get_row() - .is_active(), + meta_store.get_partition(1).await?.get_row().is_active(), false ); assert_eq!( meta_store .get_partition(dest_partition.get_id()) - .await - .unwrap() + .await?
.get_row() .is_active(), true ); for c in source_chunks.iter() { assert_eq!( - meta_store - .get_chunk(c.get_id()) - .await - .unwrap() - .get_row() - .active(), + meta_store.get_chunk(c.get_id()).await?.get_row().active(), false ); }
@@ -6836,22 +6709,19 @@ mod tests { let mut source_chunks: Vec<IdRow<Chunk>> = Vec::new(); let ch = meta_store .create_chunk(partition.clone().get_id(), 10, None, None, true) - .await - .unwrap(); - meta_store.chunk_uploaded(ch.get_id()).await.unwrap(); + .await?; + meta_store.chunk_uploaded(ch.get_id()).await?; source_chunks.push(ch); let ch = meta_store .create_chunk(partition.get_id(), 16, None, None, true) - .await - .unwrap(); - meta_store.chunk_uploaded(ch.get_id()).await.unwrap(); + .await?; + meta_store.chunk_uploaded(ch.get_id()).await?; source_chunks.push(ch); let dest_partition = meta_store .create_partition(Partition::new_child(&partition, None)) - .await - .unwrap(); + .await?; match meta_store .swap_active_partitions(
@@ -6873,27 +6743,23 @@ mod tests { let partition = meta_store .get_active_partitions_by_index_id(1) - .await - .unwrap() + .await? .first() .unwrap() .to_owned(); let ch = meta_store .create_chunk(partition.clone().get_id(), 10, None, None, true) - .await - .unwrap(); + .await?; source_chunks.push(ch); let ch = meta_store .create_chunk(partition.get_id(), 16, None, None, true) - .await - .unwrap(); + .await?; source_chunks.push(ch); let dest_partition = meta_store .create_partition(Partition::new_child(&partition, None)) - .await - .unwrap(); + .await?; let dest_row_count = partition.get_row().main_table_row_count() + 26;
@@ -6916,23 +6782,20 @@ mod tests { let partition = meta_store .get_active_partitions_by_index_id(1) - .await - .unwrap() + .await? .first() .unwrap() .to_owned(); let ch = meta_store .create_chunk(partition.clone().get_id(), 10, None, None, true) - .await - .unwrap(); - meta_store.chunk_uploaded(ch.get_id()).await.unwrap(); + .await?; + meta_store.chunk_uploaded(ch.get_id()).await?; source_chunks.push(ch); let ch = meta_store .create_chunk(partition.get_id(), 16, None, None, true) - .await - .unwrap(); - meta_store.chunk_uploaded(ch.get_id()).await.unwrap(); + .await?; + meta_store.chunk_uploaded(ch.get_id()).await?; source_chunks.push(ch); let dest_row_count = partition.get_row().main_table_row_count() + 26;
@@ -6955,13 +6818,15 @@ mod tests { assert!(true); let _ = fs::remove_dir_all(store_path.clone()); let _ = fs::remove_dir_all(remote_store_path.clone()); + + Ok(()) } #[tokio::test] - async fn job_priority_test() { + async fn job_priority_test() -> Result<(), CubeError> { let config = Config::test("job_priority_test"); - let store_path = env::current_dir().unwrap().join("test-job-priority-local"); - let remote_store_path = env::current_dir().unwrap().join("test-job-priority-remote"); + let store_path = env::current_dir()?.join("test-job-priority-local"); + let remote_store_path = env::current_dir()?.join("test-job-priority-remote"); let _ = fs::remove_dir_all(store_path.clone()); let _ = fs::remove_dir_all(remote_store_path.clone()); let remote_fs = LocalDirRemoteFs::new(Some(remote_store_path.clone()), store_path.clone());
@@ -6970,40 +6835,35 @@ mod tests { store_path.clone().join("metastore").as_path(), BaseRocksStoreFs::new_for_metastore(remote_fs.clone(), config.config_obj()), config.config_obj(), - ) - .unwrap(); + )?; meta_store .add_job(Job::new( RowKey::Table(TableId::Partitions, 1), JobType::InMemoryChunksCompaction, "node1".to_string(), )) - .await - .unwrap(); + .await?; meta_store .add_job(Job::new(
RowKey::Table(TableId::Partitions, 1), JobType::PartitionCompaction, "node1".to_string(), )) - .await - .unwrap(); + .await?; meta_store .add_job(Job::new( RowKey::Table(TableId::Partitions, 2), JobType::PartitionCompaction, "node1".to_string(), )) - .await - .unwrap(); + .await?; meta_store .add_job(Job::new( RowKey::Table(TableId::Partitions, 3), JobType::InMemoryChunksCompaction, "node1".to_string(), )) - .await - .unwrap(); + .await?; meta_store .add_job(Job::new( @@ -7011,37 +6871,32 @@ mod tests { JobType::PartitionCompaction, "node2".to_string(), )) - .await - .unwrap(); + .await?; meta_store .add_job(Job::new( RowKey::Table(TableId::Partitions, 12), JobType::PartitionCompaction, "node2".to_string(), )) - .await - .unwrap(); + .await?; meta_store .add_job(Job::new( RowKey::Table(TableId::Partitions, 13), JobType::InMemoryChunksCompaction, "node2".to_string(), )) - .await - .unwrap(); + .await?; meta_store .add_job(Job::new( RowKey::Table(TableId::Partitions, 11), JobType::InMemoryChunksCompaction, "node2".to_string(), )) - .await - .unwrap(); + .await?; let job = meta_store .start_processing_job("node1".to_string(), false) - .await - .unwrap() + .await? .unwrap(); assert_eq!(job.get_row().job_type(), &JobType::InMemoryChunksCompaction); assert_eq!( @@ -7051,8 +6906,7 @@ mod tests { let job = meta_store .start_processing_job("node1".to_string(), false) - .await - .unwrap() + .await? .unwrap(); assert_eq!(job.get_row().job_type(), &JobType::InMemoryChunksCompaction); assert_eq!( @@ -7062,8 +6916,7 @@ mod tests { let job = meta_store .start_processing_job("node1".to_string(), false) - .await - .unwrap() + .await? .unwrap(); assert_eq!(job.get_row().job_type(), &JobType::PartitionCompaction); assert_eq!( @@ -7073,8 +6926,7 @@ mod tests { let job = meta_store .start_processing_job("node1".to_string(), false) - .await - .unwrap() + .await? .unwrap(); assert_eq!(job.get_row().job_type(), &JobType::PartitionCompaction); assert_eq!( @@ -7084,8 +6936,7 @@ mod tests { let job = meta_store .start_processing_job("node2".to_string(), false) - .await - .unwrap() + .await? .unwrap(); assert_eq!(job.get_row().job_type(), &JobType::InMemoryChunksCompaction); assert_eq!( @@ -7095,8 +6946,7 @@ mod tests { let job = meta_store .start_processing_job("node2".to_string(), false) - .await - .unwrap() + .await? .unwrap(); assert_eq!(job.get_row().job_type(), &JobType::InMemoryChunksCompaction); assert_eq!( @@ -7106,8 +6956,7 @@ mod tests { let job = meta_store .start_processing_job("node2".to_string(), false) - .await - .unwrap() + .await? .unwrap(); assert_eq!(job.get_row().job_type(), &JobType::PartitionCompaction); assert_eq!( @@ -7117,8 +6966,7 @@ mod tests { let job = meta_store .start_processing_job("node2".to_string(), false) - .await - .unwrap() + .await? 
            .unwrap();
        assert_eq!(job.get_row().job_type(), &JobType::PartitionCompaction);
        assert_eq!(
@@ -7128,6 +6976,8 @@ mod tests {
         }
         let _ = fs::remove_dir_all(store_path.clone());
         let _ = fs::remove_dir_all(remote_store_path.clone());
+
+        Ok(())
     }
 }
@@ -7136,7 +6986,7 @@ impl RocksMetaStore {
         deactivate_ids: Vec<u64>,
         uploaded_ids_and_sizes: Vec<(u64, Option<u64>)>,
         db_ref: DbTableRef,
-        batch_pipe: &mut BatchPipe,
+        batch_pipe: &mut BatchPipe<'_, RocksMetaStore>,
         check_rows: bool,
         new_replay_handle_id: Option<u64>,
     ) -> Result<(), CubeError> {
@@ -7236,7 +7086,7 @@ impl RocksMetaStore {
     fn drop_index(
         db: DbTableRef,
-        pipe: &mut BatchPipe,
+        pipe: &mut BatchPipe<'_, RocksMetaStore>,
         index_id: u64,
         update_multi_partitions: bool,
     ) -> Result<(), CubeError> {
diff --git a/rust/cubestore/cubestore/src/metastore/rocks_store.rs b/rust/cubestore/cubestore/src/metastore/rocks_store.rs
index c6b54f7dc1a39..7615b63bfc559 100644
--- a/rust/cubestore/cubestore/src/metastore/rocks_store.rs
+++ b/rust/cubestore/cubestore/src/metastore/rocks_store.rs
@@ -1,5 +1,4 @@
 use crate::config::ConfigObj;
-use crate::metastore::table::TablePath;
 use crate::metastore::{MetaStoreEvent, MetaStoreFs};
 use crate::util::time_span::warn_long;
@@ -629,20 +628,22 @@ pub struct KeyVal {
     pub val: Vec<u8>,
 }

-pub struct BatchPipe<'a> {
+pub type PostCommitCallback<S> = Box<dyn FnOnce(&S) + Send>;
+
+pub struct BatchPipe<'a, S = ()> {
     db: &'a DB,
     write_batch: WriteBatch,
     events: Vec<MetaStoreEvent>,
-    pub invalidate_tables_cache: bool,
+    post_commit_callback: Option<PostCommitCallback<S>>,
 }

-impl<'a> BatchPipe<'a> {
-    pub fn new(db: &'a DB) -> BatchPipe<'a> {
+impl<'a, S> BatchPipe<'a, S> {
+    pub fn new(db: &'a DB) -> BatchPipe<'a, S> {
         BatchPipe {
             db,
             write_batch: WriteBatch::default(),
             events: Vec::new(),
-            invalidate_tables_cache: false,
+            post_commit_callback: None,
         }
     }
@@ -654,14 +655,24 @@ impl<'a> BatchPipe<'a> {
         self.events.push(event);
     }

-    pub fn batch_write_rows(self) -> Result<Vec<MetaStoreEvent>, CubeError> {
+    pub fn batch_write_rows(
+        self,
+    ) -> Result<(Vec<MetaStoreEvent>, Option<PostCommitCallback<S>>), CubeError> {
         let db = self.db;
         db.write(self.write_batch)?;
-        Ok(self.events)
+
+        Ok((self.events, self.post_commit_callback))
     }

-    pub fn invalidate_tables_cache(&mut self) {
-        self.invalidate_tables_cache = true;
+    /// Set the callback that runs on the RW-loop thread after the RocksDB
+    /// commit succeeds. Overwrites any previously set callback. The callback
+    /// receives the store instance so it can reach shared state. Must not
+    /// panic and must not block on async work.
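+    ///
+    /// For illustration, a hypothetical call site (an editor's sketch, not
+    /// part of this change), assuming `S = RocksMetaStore` so the callback
+    /// can clear `cached_tables`, mirroring `reset_cached_tables`:
+    ///
+    /// ```ignore
+    /// batch_pipe.set_post_commit_callback(|meta: &RocksMetaStore| {
+    ///     *meta.cached_tables.lock().unwrap() = None;
+    /// });
+    /// ```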
+    pub fn set_post_commit_callback<F>(&mut self, f: F)
+    where
+        F: FnOnce(&S) + Send + 'static,
+    {
+        self.post_commit_callback = Some(Box::new(f));
     }
 }
@@ -882,8 +893,7 @@ pub struct RocksStore {
     last_check_seq: Arc<RwLock<u64>>,
     snapshot_uploaded: Arc<RwLock<bool>>,
     snapshots_upload_stopped: Arc<AsyncMutex<bool>>,
-    pub(crate) cached_tables: Arc<Mutex<Option<Arc<Vec<TablePath>>>>>,
-    rw_loop_default_cf: RocksStoreRWLoop,
+    pub(crate) rw_loop_default_cf: RocksStoreRWLoop,
     details: Arc<dyn RocksStoreDetails>,
 }
@@ -936,7 +946,6 @@ impl RocksStore {
             last_check_seq: Arc::new(RwLock::new(db_arc.latest_sequence_number())),
             snapshots_upload_stopped: Arc::new(AsyncMutex::new(false)),
             config,
-            cached_tables: Arc::new(Mutex::new(None)),
             rw_loop_default_cf: RocksStoreRWLoop::new("metastore", "default"),
             details,
         };
@@ -1019,33 +1028,34 @@ impl RocksStore {
     #[inline(always)]
     pub async fn write_operation<R, F>(&self, op_name: &'static str, f: F) -> Result<R, CubeError>
     where
-        F: for<'a> FnOnce(DbTableRef<'a>, &'a mut BatchPipe) -> Result<R, CubeError>
+        F: for<'a> FnOnce(DbTableRef<'a>, &mut BatchPipe<'a>) -> Result<R, CubeError>
             + Send
             + Sync
             + 'static,
         R: Send + Sync + 'static,
     {
-        self.write_operation_impl::<R, F>(&self.rw_loop_default_cf, op_name, f)
+        self.write_operation_impl::<R, F, ()>(&self.rw_loop_default_cf, op_name, f, ())
             .await
     }

-    pub async fn write_operation_impl<R, F>(
+    pub async fn write_operation_impl<R, F, S>(
         &self,
         rw_loop: &RocksStoreRWLoop,
         op_name: &'static str,
         f: F,
+        store: S,
     ) -> Result<R, CubeError>
     where
-        F: for<'a> FnOnce(DbTableRef<'a>, &'a mut BatchPipe) -> Result<R, CubeError>
+        F: for<'a> FnOnce(DbTableRef<'a>, &mut BatchPipe<'a, S>) -> Result<R, CubeError>
             + Send
             + Sync
             + 'static,
         R: Send + Sync + 'static,
+        S: Send + Sync + 'static,
     {
         let db = self.db.clone();
         let mem_seq = MemorySequence::new(self.seq_store.clone());
         let db_to_send = db.clone();
-        let cached_tables = self.cached_tables.clone();
         let loop_name = rw_loop.get_name();
         let store_name = self.details.get_name();
@@ -1070,11 +1080,11 @@ impl RocksStore {
                 );
                 match res {
                     Ok(res) => {
-                        if batch.invalidate_tables_cache {
-                            *cached_tables.lock().unwrap() = None;
+                        let (events, callback) = batch.batch_write_rows()?;
+                        if let Some(cb) = callback {
+                            cb(&store);
                         }
-                        let write_result = batch.batch_write_rows()?;
-                        tx.send(Ok((res, write_result))).map_err(|_| {
+                        tx.send(Ok((res, events))).map_err(|_| {
                             CubeError::internal(format!(
                                 "[{}-{}] Write operation result receiver has been dropped",
                                 store_name, loop_name
diff --git a/rust/cubestore/cubestore/src/metastore/rocks_table.rs b/rust/cubestore/cubestore/src/metastore/rocks_table.rs
index 8b8bacacc55c4..7635459fbeb0b 100644
--- a/rust/cubestore/cubestore/src/metastore/rocks_table.rs
+++ b/rust/cubestore/cubestore/src/metastore/rocks_table.rs
@@ -498,11 +498,11 @@ pub trait RocksTable: BaseRocksTable + Debug + Send + Sync {
     }

     /// @internal Do not use this method directly, please use insert or insert_with_pk
-    fn do_insert(
+    fn do_insert<S>(
         &self,
         row_id: Option<u64>,
         row: Self::T,
-        batch_pipe: &mut BatchPipe,
+        batch_pipe: &mut BatchPipe<'_, S>,
     ) -> Result<IdRow<Self::T>, CubeError> {
         let mut ser = flexbuffers::FlexbufferSerializer::new();
         row.serialize(&mut ser).unwrap();
@@ -550,19 +550,19 @@ pub trait RocksTable: BaseRocksTable + Debug + Send + Sync {
         Ok(IdRow::new(row_id, row))
     }

-    fn insert_with_pk(
+    fn insert_with_pk<S>(
         &self,
         row_id: u64,
         row: Self::T,
-        batch_pipe: &mut BatchPipe,
+        batch_pipe: &mut BatchPipe<'_, S>,
     ) -> Result<IdRow<Self::T>, CubeError> {
         self.do_insert(Some(row_id), row, batch_pipe)
     }

-    fn insert(
+    fn insert<S>(
         &self,
         row: Self::T,
-        batch_pipe: &mut BatchPipe,
+        batch_pipe: &mut BatchPipe<'_, S>,
     ) -> Result<IdRow<Self::T>, CubeError> {
         self.do_insert(None, row, batch_pipe)
     }
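Because `do_insert`, `insert`, and `insert_with_pk` are now generic over `S`, call sites can stay store-agnostic. A minimal sketch of what that buys; the helper below is illustrative, not an API that exists in cubestore, and assumes only the signatures shown in this patch:

```rust
// Compiles against both BatchPipe<'_, ()> (as used by the cachestore) and
// BatchPipe<'_, RocksMetaStore> (as used by the metastore), because the
// table methods leave `S` unconstrained.
fn insert_row<S, R: RocksTable>(
    table: &R,
    row: R::T,
    batch_pipe: &mut BatchPipe<'_, S>,
) -> Result<IdRow<R::T>, CubeError> {
    table.insert(row, batch_pipe)
}
```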
@@ -911,34 +911,34 @@ pub trait RocksTable: BaseRocksTable + Debug + Send + Sync {
         self.get_row_by_index_opt(row_key, secondary_index, true)
     }

-    fn update_with_fn(
+    fn update_with_fn<S>(
         &self,
         row_id: u64,
         update_fn: impl FnOnce(&Self::T) -> Self::T,
-        batch_pipe: &mut BatchPipe,
+        batch_pipe: &mut BatchPipe<'_, S>,
     ) -> Result<IdRow<Self::T>, CubeError> {
         let row = self.get_row_or_not_found(row_id)?;
         let new_row = update_fn(&row.get_row());
         self.update(row_id, new_row, &row.get_row(), batch_pipe)
     }

-    fn update_with_res_fn(
+    fn update_with_res_fn<S>(
         &self,
         row_id: u64,
         update_fn: impl FnOnce(&Self::T) -> Result<Self::T, CubeError>,
-        batch_pipe: &mut BatchPipe,
+        batch_pipe: &mut BatchPipe<'_, S>,
     ) -> Result<IdRow<Self::T>, CubeError> {
         let row = self.get_row_or_not_found(row_id)?;
         let new_row = update_fn(&row.get_row())?;
         self.update(row_id, new_row, &row.get_row(), batch_pipe)
     }

-    fn update(
+    fn update<S>(
         &self,
         row_id: u64,
         new_row: Self::T,
         old_row: &Self::T,
-        batch_pipe: &mut BatchPipe,
+        batch_pipe: &mut BatchPipe<'_, S>,
     ) -> Result<IdRow<Self::T>, CubeError> {
         let deleted_row = self.delete_index_row(&old_row, row_id)?;
         for row in deleted_row {
@@ -969,7 +969,7 @@ pub trait RocksTable: BaseRocksTable + Debug + Send + Sync {
         Ok(IdRow::new(row_id, new_row))
     }

-    fn truncate(&self, batch_pipe: &mut BatchPipe) -> Result<(), CubeError> {
+    fn truncate<S>(&self, batch_pipe: &mut BatchPipe<'_, S>) -> Result<(), CubeError> {
         let iter = self.table_scan(self.snapshot())?;
         for item in iter {
@@ -981,13 +981,13 @@ pub trait RocksTable: BaseRocksTable + Debug + Send + Sync {
         Ok(())
     }

-    fn update_extended_ttl_secondary_index<'a, K: Debug>(
+    fn update_extended_ttl_secondary_index<'a, K: Debug, S>(
         &self,
         row_id: u64,
         secondary_index: &'a impl RocksSecondaryIndex<Self::T, K>,
         secondary_key_hash: SecondaryKeyHash,
         extended: RocksSecondaryIndexValueTTLExtended,
-        batch_pipe: &mut BatchPipe,
+        batch_pipe: &mut BatchPipe<'_, S>,
     ) -> Result
     where
         K: Hash,
@@ -1025,15 +1025,19 @@ pub trait RocksTable: BaseRocksTable + Debug + Send + Sync {
         }
     }

-    fn delete(&self, row_id: u64, batch_pipe: &mut BatchPipe) -> Result<IdRow<Self::T>, CubeError> {
+    fn delete<S>(
+        &self,
+        row_id: u64,
+        batch_pipe: &mut BatchPipe<'_, S>,
+    ) -> Result<IdRow<Self::T>, CubeError> {
         let row = self.get_row_or_not_found(row_id)?;
         self.delete_row(row, batch_pipe)
     }

-    fn try_delete(
+    fn try_delete<S>(
         &self,
         row_id: u64,
-        batch_pipe: &mut BatchPipe,
+        batch_pipe: &mut BatchPipe<'_, S>,
     ) -> Result<Option<IdRow<Self::T>>, CubeError> {
         if let Some(row) = self.get_row(row_id)?
{ Ok(Some(self.delete_row(row, batch_pipe)?)) @@ -1042,10 +1046,10 @@ pub trait RocksTable: BaseRocksTable + Debug + Send + Sync { } } - fn delete_row( + fn delete_row( &self, row: IdRow, - batch_pipe: &mut BatchPipe, + batch_pipe: &mut BatchPipe<'_, S>, ) -> Result, CubeError> { let deleted_row = self.delete_index_row(row.get_row(), row.get_id())?; batch_pipe.add_event(MetaStoreEvent::Delete(Self::table_id(), row.get_id())); From 34a78f4ce12251fac65367e3917495f306096029 Mon Sep 17 00:00:00 2001 From: Artyom Keydunov Date: Tue, 14 Apr 2026 08:55:40 -0700 Subject: [PATCH 2/2] docs: clean up data modeling docs and move cubes/views diagram (#10685) - Remove stale docs-mintlify/data-modeling/ folder (duplicated content) - Move cubes/views diagram to top of Getting Started page - Remove OLAP terminology reference from Concepts page - Add introductory sentence listing key concepts on Concepts page Made-with: Cursor --- docs-mintlify/data-modeling/concepts.mdx | 132 ------------------ docs-mintlify/data-modeling/overview.mdx | 87 ------------ .../docs/data-modeling/concepts/index.mdx | 16 +-- docs-mintlify/docs/data-modeling/overview.mdx | 4 + 4 files changed, 11 insertions(+), 228 deletions(-) delete mode 100644 docs-mintlify/data-modeling/concepts.mdx delete mode 100644 docs-mintlify/data-modeling/overview.mdx diff --git a/docs-mintlify/data-modeling/concepts.mdx b/docs-mintlify/data-modeling/concepts.mdx deleted file mode 100644 index 753a5c2df73e6..0000000000000 --- a/docs-mintlify/data-modeling/concepts.mdx +++ /dev/null @@ -1,132 +0,0 @@ ---- -title: Concepts -description: Core concepts of Cube data modeling — cubes, views, measures, dimensions, and joins. ---- - - Data Modeling Concepts - -## Cubes - -A cube represents a dataset in your data model. Each cube is typically mapped to a single table or view in your database: - -```yaml -cubes: - - name: users - sql_table: public.users -``` - -You can also use a SQL query as the data source: - -```yaml -cubes: - - name: active_users - sql: > - SELECT * FROM public.users - WHERE is_active = true -``` - -## Measures - -Measures are quantitative data points — the "what" you're measuring: - -```yaml -measures: - - name: count - type: count - - - name: total_revenue - type: sum - sql: revenue - - - name: average_order_value - type: avg - sql: amount -``` - -### Measure types - -| Type | Description | -|------|-------------| -| `count` | Count of rows | -| `count_distinct` | Count of distinct values | -| `sum` | Sum of values | -| `avg` | Average of values | -| `min` | Minimum value | -| `max` | Maximum value | -| `number` | Custom SQL expression | - -## Dimensions - -Dimensions are qualitative attributes — the "by what" you're slicing data: - -```yaml -dimensions: - - name: status - sql: status - type: string - - - name: created_at - sql: created_at - type: time -``` - -### Dimension types - -| Type | Description | -|------|-------------| -| `string` | Text values | -| `number` | Numeric values | -| `boolean` | True/false values | -| `time` | Timestamps and dates | -| `geo` | Geographic coordinates | - -## Joins - -Joins define relationships between cubes: - -```yaml -cubes: - - name: orders - # ... - - joins: - - name: customers - relationship: many_to_one - sql: "{CUBE}.customer_id = {customers.id}" -``` - -### Relationship types - - - - Each record in the first cube matches exactly one record in the joined cube. - - - Multiple records in the first cube match one record in the joined cube. This is the most common type (e.g., orders → customers). 
- - - One record in the first cube matches multiple records in the joined cube. - - - -## Views - -Views provide curated interfaces to your data model. They select specific measures and dimensions from one or more cubes: - -```yaml -views: - - name: order_analytics - cubes: - - join_path: orders - includes: - - count - - total_amount - - status - - join_path: orders.customers - includes: - - name: company -``` - - -Views are the recommended way to expose data to end users and AI agents. They provide a clean, focused interface without exposing the full complexity of your data graph. - diff --git a/docs-mintlify/data-modeling/overview.mdx b/docs-mintlify/data-modeling/overview.mdx deleted file mode 100644 index a68df0280382c..0000000000000 --- a/docs-mintlify/data-modeling/overview.mdx +++ /dev/null @@ -1,87 +0,0 @@ ---- -title: Data Modeling Overview -description: Learn how to define your semantic layer with Cube data models. ---- - - Data Modeling - -Data modeling in Cube is the process of defining your semantic layer — the business logic, metrics, and relationships that sit between your data warehouse and your data consumers. - -## Key concepts - -Cube's data model is built around two main objects: - -### Cubes - -Cubes represent business entities (e.g., `orders`, `customers`, `products`). They define: - -- **Measures** — aggregated values like `count`, `sum`, `avg` -- **Dimensions** — attributes used for grouping and filtering -- **Joins** — relationships between cubes - -```yaml -cubes: - - name: orders - sql_table: public.orders - - measures: - - name: count - type: count - - - name: total_amount - type: sum - sql: amount - - dimensions: - - name: id - sql: id - type: number - primary_key: true - - - name: status - sql: status - type: string - - - name: created_at - sql: created_at - type: time -``` - -### Views - -Views are curated datasets built on top of cubes. They select specific measures and dimensions to create focused data products: - -```yaml -views: - - name: order_summary - cubes: - - join_path: orders - includes: - - count - - total_amount - - status - - created_at - - - join_path: orders.customers - includes: - - name: company -``` - -## Code-first approach - -Cube data models are defined in YAML or JavaScript files, managed through version control: - - - - Create cube files in the `model/` directory with your business entities. - - - Add joins between cubes to establish entity relationships. - - - Build views that combine cubes into focused data products. - - - Use development mode to test, then deploy to production. - - diff --git a/docs-mintlify/docs/data-modeling/concepts/index.mdx b/docs-mintlify/docs/data-modeling/concepts/index.mdx index 9e3a6ae34e7eb..6a3096829bf4b 100644 --- a/docs-mintlify/docs/data-modeling/concepts/index.mdx +++ b/docs-mintlify/docs/data-modeling/concepts/index.mdx @@ -3,11 +3,13 @@ title: Concepts description: Learn foundational OLAP concepts like cubes, dimensions, measures, and joins used in Cube data modeling. --- - Concepts +Cube's key concepts are [cubes](#cubes), [views](#views), and members +([measures](#measures), [dimensions](#dimensions)). This page is intended +for both newcomers and regular users to refresh their understanding. -Cube borrows a lot of terminology from [OLAP -theory][wiki-olap], and this document is intended for both newcomers and regular -users to refresh their understanding. 
+
+
+We'll use a sample e-commerce database with two tables, `orders` and
+`line_items`, to illustrate the concepts throughout this page:
@@ -136,10 +138,6 @@ data model with which data consumers can interact. They are useful for defining
 metrics, managing governance and data access, and controlling ambiguous join
 paths.

-
-
-
-
 Views do **not** define their own members. Instead, they reference cubes by
 specific join paths and include their members.
@@ -834,7 +832,7 @@ See the reference documentation for the full list of pre-aggregation
 [ref-subquery-dimensions]: /docs/data-modeling/concepts/calculated-members#subquery-dimensions
 [ref-calculated-measures]: /docs/data-modeling/concepts/calculated-members#calculated-measures
 [ref-working-with-joins]: /docs/data-modeling/concepts/working-with-joins
-[wiki-olap]: https://en.wikipedia.org/wiki/Online_analytical_processing
+
 [wiki-view-sql]: https://en.wikipedia.org/wiki/View_(SQL)
 [ref-matching-preaggs]: /docs/pre-aggregations/matching-pre-aggregations
 [ref-syntax-references]: /docs/data-modeling/syntax#references
diff --git a/docs-mintlify/docs/data-modeling/overview.mdx b/docs-mintlify/docs/data-modeling/overview.mdx
index a0ab514241663..80da010e3109a 100644
--- a/docs-mintlify/docs/data-modeling/overview.mdx
+++ b/docs-mintlify/docs/data-modeling/overview.mdx
@@ -9,6 +9,10 @@ exposed through a [rich set of APIs][ref-apis] that allows end-users to run a
 wide variety of analytical queries without modifying the data model itself.

+
+
+
+
 You can explore a carefully crafted sample data model if you create a [demo