diff --git a/crates/fluss/src/client/admin.rs b/crates/fluss/src/client/admin.rs index 7015e573..56d4ef9f 100644 --- a/crates/fluss/src/client/admin.rs +++ b/crates/fluss/src/client/admin.rs @@ -45,7 +45,7 @@ use crate::rpc::{RpcClient, ServerConnection}; use crate::error::{Error, Result}; use crate::proto::GetTableInfoResponse; -use crate::{BucketId, PartitionId, TableId}; +use crate::{BucketId, PartitionId, SnapshotId, TableId}; use std::collections::{HashMap, HashSet}; use std::sync::Arc; use tokio::task::JoinHandle; @@ -595,7 +595,7 @@ impl FlussAdmin { table_id: TableId, partition_id: Option, bucket_id: BucketId, - snapshot_id: i64, + snapshot_id: SnapshotId, ) -> Result { let response = self .admin_gateway() @@ -633,7 +633,7 @@ impl FlussAdmin { pub async fn get_lake_snapshot( &self, table_path: &TablePath, - snapshot_id: Option, + snapshot_id: Option, readable: Option, ) -> Result { let response = self diff --git a/crates/fluss/src/lib.rs b/crates/fluss/src/lib.rs index 62d4c57d..59951ad7 100644 --- a/crates/fluss/src/lib.rs +++ b/crates/fluss/src/lib.rs @@ -145,6 +145,7 @@ mod test_utils; pub type TableId = i64; pub type PartitionId = i64; pub type BucketId = i32; +pub type SnapshotId = i64; pub(crate) mod proto { // Generated; not every 1.x message is wired up to a caller yet. diff --git a/crates/fluss/src/metadata/kv_snapshot.rs b/crates/fluss/src/metadata/kv_snapshot.rs index 054254ad..2a2d508e 100644 --- a/crates/fluss/src/metadata/kv_snapshot.rs +++ b/crates/fluss/src/metadata/kv_snapshot.rs @@ -19,7 +19,7 @@ use crate::proto::{ AcquireKvSnapshotLeaseResponse, GetKvSnapshotMetadataResponse, GetLatestKvSnapshotsResponse, ListKvSnapshotsResponse, PbKvSnapshot, PbRemotePathAndLocalFile, }; -use crate::{BucketId, PartitionId, TableId}; +use crate::{BucketId, PartitionId, SnapshotId, TableId}; use crate::metadata::KvSnapshotLeaseForTable; @@ -27,7 +27,7 @@ use crate::metadata::KvSnapshotLeaseForTable; #[derive(Debug, Clone, PartialEq, Eq)] pub struct KvSnapshot { pub bucket_id: BucketId, - pub snapshot_id: Option, + pub snapshot_id: Option, pub log_offset: Option, } diff --git a/crates/fluss/src/metadata/kv_snapshot_lease.rs b/crates/fluss/src/metadata/kv_snapshot_lease.rs index 98638f5d..11fda914 100644 --- a/crates/fluss/src/metadata/kv_snapshot_lease.rs +++ b/crates/fluss/src/metadata/kv_snapshot_lease.rs @@ -16,14 +16,14 @@ // under the License. use crate::proto::{PbKvSnapshotLeaseForBucket, PbKvSnapshotLeaseForTable}; -use crate::{BucketId, PartitionId, TableId}; +use crate::{BucketId, PartitionId, SnapshotId, TableId}; /// One bucket's slot in a KV-snapshot lease request. #[derive(Debug, Clone, PartialEq, Eq)] pub struct KvSnapshotLeaseForBucket { pub partition_id: Option, pub bucket_id: BucketId, - pub snapshot_id: i64, + pub snapshot_id: SnapshotId, } impl KvSnapshotLeaseForBucket { diff --git a/crates/fluss/src/metadata/lake_snapshot.rs b/crates/fluss/src/metadata/lake_snapshot.rs index 5ff50ad6..0cbe1a72 100644 --- a/crates/fluss/src/metadata/lake_snapshot.rs +++ b/crates/fluss/src/metadata/lake_snapshot.rs @@ -16,7 +16,7 @@ // under the License. use crate::proto::{GetLakeSnapshotResponse, PbLakeSnapshotForBucket}; -use crate::{BucketId, PartitionId, TableId}; +use crate::{BucketId, PartitionId, SnapshotId, TableId}; /// One bucket's slice of a lake snapshot. #[derive(Debug, Clone, PartialEq, Eq)] @@ -44,7 +44,7 @@ impl LakeBucketSnapshot { #[derive(Debug, Clone, PartialEq, Eq)] pub struct LakeSnapshotInfo { pub table_id: TableId, - pub snapshot_id: i64, + pub snapshot_id: SnapshotId, pub bucket_snapshots: Vec, } diff --git a/crates/fluss/src/metadata/table.rs b/crates/fluss/src/metadata/table.rs index a796ed98..79755d4c 100644 --- a/crates/fluss/src/metadata/table.rs +++ b/crates/fluss/src/metadata/table.rs @@ -22,7 +22,7 @@ use crate::metadata::DataLakeFormat; use crate::metadata::datatype::{ DataField, DataType, RowType, UNASSIGNED_FIELD_ID, reassign_field_ids, }; -use crate::{BucketId, PartitionId, TableId}; +use crate::{BucketId, PartitionId, SnapshotId, TableId}; use core::fmt; use serde::{Deserialize, Serialize}; use std::collections::{HashMap, HashSet}; @@ -1501,19 +1501,19 @@ impl Display for TableBucket { #[derive(Debug, Clone, Serialize, Deserialize)] pub struct LakeSnapshot { - pub snapshot_id: i64, + pub snapshot_id: SnapshotId, pub table_buckets_offset: HashMap, } impl LakeSnapshot { - pub fn new(snapshot_id: i64, table_buckets_offset: HashMap) -> Self { + pub fn new(snapshot_id: SnapshotId, table_buckets_offset: HashMap) -> Self { Self { snapshot_id, table_buckets_offset, } } - pub fn snapshot_id(&self) -> i64 { + pub fn snapshot_id(&self) -> SnapshotId { self.snapshot_id } diff --git a/crates/fluss/src/rpc/message/get_kv_snapshot_metadata.rs b/crates/fluss/src/rpc/message/get_kv_snapshot_metadata.rs index cf7f9a2c..6526295e 100644 --- a/crates/fluss/src/rpc/message/get_kv_snapshot_metadata.rs +++ b/crates/fluss/src/rpc/message/get_kv_snapshot_metadata.rs @@ -18,7 +18,7 @@ use crate::rpc::api_key::ApiKey; use crate::rpc::frame::{ReadError, WriteError}; use crate::rpc::message::{ReadType, RequestBody, WriteType}; -use crate::{BucketId, PartitionId, TableId, impl_read_type, impl_write_type, proto}; +use crate::{BucketId, PartitionId, SnapshotId, TableId, impl_read_type, impl_write_type, proto}; use bytes::{Buf, BufMut}; use prost::Message; @@ -32,7 +32,7 @@ impl GetKvSnapshotMetadataRequest { table_id: TableId, partition_id: Option, bucket_id: BucketId, - snapshot_id: i64, + snapshot_id: SnapshotId, ) -> Self { GetKvSnapshotMetadataRequest { inner_request: proto::GetKvSnapshotMetadataRequest { diff --git a/crates/fluss/src/rpc/message/get_lake_snapshot.rs b/crates/fluss/src/rpc/message/get_lake_snapshot.rs index 6273f2f3..f045b32e 100644 --- a/crates/fluss/src/rpc/message/get_lake_snapshot.rs +++ b/crates/fluss/src/rpc/message/get_lake_snapshot.rs @@ -20,7 +20,7 @@ use crate::rpc::api_key::ApiKey; use crate::rpc::convert::to_table_path; use crate::rpc::frame::{ReadError, WriteError}; use crate::rpc::message::{ReadType, RequestBody, WriteType}; -use crate::{impl_read_type, impl_write_type, proto}; +use crate::{SnapshotId, impl_read_type, impl_write_type, proto}; use bytes::{Buf, BufMut}; use prost::Message; @@ -30,7 +30,11 @@ pub struct GetLakeSnapshotRequest { } impl GetLakeSnapshotRequest { - pub fn new(table_path: &TablePath, snapshot_id: Option, readable: Option) -> Self { + pub fn new( + table_path: &TablePath, + snapshot_id: Option, + readable: Option, + ) -> Self { GetLakeSnapshotRequest { inner_request: proto::GetLakeSnapshotRequest { table_path: to_table_path(table_path),