diff --git a/crates/fluss/src/client/table/log_fetch_buffer.rs b/crates/fluss/src/client/table/log_fetch_buffer.rs index dee68c95..550335e0 100644 --- a/crates/fluss/src/client/table/log_fetch_buffer.rs +++ b/crates/fluss/src/client/table/log_fetch_buffer.rs @@ -18,6 +18,7 @@ use arrow::array::RecordBatch; use parking_lot::Mutex; +use crate::client::table::read_context_resolver::ReadContextResolver; use crate::client::table::remote_log::{ PrefetchPermit, RemoteLogDownloadFuture, RemoteLogFile, RemoteLogSegment, }; @@ -85,7 +86,7 @@ pub trait PendingFetch: Send + Sync { /// Thread-safe buffer for completed fetches pub struct LogFetchBuffer { - read_context: ReadContext, + resolver: Arc, completed_fetches: Mutex>>, pending_fetches: Mutex>>>, next_in_line_fetch: Mutex>>, @@ -94,9 +95,9 @@ pub struct LogFetchBuffer { } impl LogFetchBuffer { - pub fn new(read_context: ReadContext) -> Self { + pub fn new(resolver: Arc) -> Self { Self { - read_context, + resolver, completed_fetches: Mutex::new(VecDeque::new()), pending_fetches: Mutex::new(HashMap::new()), next_in_line_fetch: Mutex::new(None), @@ -168,7 +169,7 @@ impl LogFetchBuffer { api_error, fetch_error_context, fetch_offset, - self.read_context.clone(), + Arc::clone(&self.resolver), ); self.completed_fetches .lock() @@ -226,7 +227,7 @@ impl LogFetchBuffer { table_bucket.clone(), error, -1, - self.read_context.clone(), + Arc::clone(&self.resolver), ); completed_to_push.push(Box::new(error_fetch)); } @@ -339,7 +340,8 @@ pub struct DefaultCompletedFetch { fetch_error_context: Option, error: Option, log_record_batch: LogRecordsBatches, - read_context: ReadContext, + resolver: Arc, + is_remote: bool, next_fetch_offset: i64, high_watermark: i64, size_in_bytes: usize, @@ -358,7 +360,8 @@ impl DefaultCompletedFetch { table_bucket: TableBucket, log_record_batch: LogRecordsBatches, size_in_bytes: usize, - read_context: ReadContext, + resolver: Arc, + is_remote: bool, fetch_offset: i64, high_watermark: i64, ) -> Self { @@ -368,7 +371,8 @@ impl DefaultCompletedFetch { fetch_error_context: None, error: None, log_record_batch, - read_context, + resolver, + is_remote, next_fetch_offset: fetch_offset, high_watermark, size_in_bytes, @@ -387,7 +391,7 @@ impl DefaultCompletedFetch { table_bucket: TableBucket, error: Error, fetch_offset: i64, - read_context: ReadContext, + resolver: Arc, ) -> Self { Self { table_bucket, @@ -395,7 +399,8 @@ impl DefaultCompletedFetch { fetch_error_context: None, error: Some(error), log_record_batch: LogRecordsBatches::new(Vec::new()), - read_context, + resolver, + is_remote: false, next_fetch_offset: fetch_offset, high_watermark: -1, size_in_bytes: 0, @@ -415,7 +420,7 @@ impl DefaultCompletedFetch { api_error: ApiError, fetch_error_context: FetchErrorContext, fetch_offset: i64, - read_context: ReadContext, + resolver: Arc, ) -> Self { Self { table_bucket, @@ -423,7 +428,8 @@ impl DefaultCompletedFetch { fetch_error_context: Some(fetch_error_context), error: None, log_record_batch: LogRecordsBatches::new(Vec::new()), - read_context, + resolver, + is_remote: false, next_fetch_offset: fetch_offset, high_watermark: -1, size_in_bytes: 0, @@ -451,7 +457,8 @@ impl DefaultCompletedFetch { } } else if let Some(batch_result) = self.log_record_batch.next() { let batch = batch_result?; - self.current_record_iterator = Some(batch.records(&self.read_context)?); + let read_context = self.resolve_context_for_batch(&batch)?; + self.current_record_iterator = Some(batch.records(&read_context)?); self.current_record_batch = Some(batch); } else { if let Some(batch) = self.current_record_batch.take() { @@ -518,7 +525,8 @@ impl DefaultCompletedFetch { }; let log_batch = log_batch_result?; - let mut record_batch = log_batch.record_batch(&self.read_context)?; + let read_context = self.resolve_context_for_batch(&log_batch)?; + let mut record_batch = log_batch.record_batch(&read_context)?; // Skip empty batches if record_batch.num_rows() == 0 { @@ -544,6 +552,20 @@ impl DefaultCompletedFetch { return Ok(Some((record_batch, effective_base_offset))); } } + + /// Resolve the ReadContext for a given batch based on its schema_id. + fn resolve_context_for_batch(&self, batch: &LogRecordBatch) -> Result { + let schema_id = batch.schema_id(); + self.resolver + .resolve(schema_id, self.is_remote) + .ok_or_else(|| Error::UnexpectedError { + message: format!( + "No ReadContext found for schema_id {schema_id} in bucket {}", + self.table_bucket + ), + source: None, + }) + } } impl CompletedFetch for DefaultCompletedFetch { @@ -766,7 +788,7 @@ pub struct RemotePendingFetch { pos_in_log_segment: i32, fetch_offset: i64, high_watermark: i64, - read_context: ReadContext, + resolver: Arc, } impl RemotePendingFetch { @@ -776,7 +798,7 @@ impl RemotePendingFetch { pos_in_log_segment: i32, fetch_offset: i64, high_watermark: i64, - read_context: ReadContext, + resolver: Arc, ) -> Self { Self { segment, @@ -784,7 +806,7 @@ impl RemotePendingFetch { pos_in_log_segment, fetch_offset, high_watermark, - read_context, + resolver, } } } @@ -836,7 +858,8 @@ impl PendingFetch for RemotePendingFetch { self.segment.table_bucket.clone(), log_record_batch, size_in_bytes, - self.read_context, + self.resolver, + true, // is_remote self.fetch_offset, self.high_watermark, ); @@ -852,6 +875,7 @@ impl PendingFetch for RemotePendingFetch { mod tests { use super::*; use crate::client::WriteRecord; + use crate::client::table::read_context_resolver::ReadContextResolver; use crate::compression::{ ArrowCompressionInfo, ArrowCompressionRatioEstimator, ArrowCompressionType, DEFAULT_NON_ZSTD_COMPRESSION_LEVEL, @@ -865,13 +889,15 @@ mod tests { use crate::test_utils::build_table_info; use std::sync::Arc; - fn test_read_context() -> Result { + fn test_resolver() -> Result> { let row_type = RowType::new(vec![DataField::new("id", DataTypes::int(), None)]); - Ok(ReadContext::new( - to_arrow_schema(&row_type)?, - Arc::new(row_type), - false, - )) + let arrow_schema = to_arrow_schema(&row_type)?; + let row_type_arc = Arc::new(row_type); + let local_ctx = ReadContext::new(arrow_schema.clone(), row_type_arc.clone(), false); + let remote_ctx = ReadContext::new(arrow_schema, row_type_arc, true); + Ok(Arc::new(ReadContextResolver::new( + 1, local_ctx, remote_ctx, None, + ))) } struct ErrorPendingFetch { @@ -897,7 +923,7 @@ mod tests { #[tokio::test] async fn await_not_empty_returns_wakeup_error() { - let buffer = LogFetchBuffer::new(test_read_context().unwrap()); + let buffer = LogFetchBuffer::new(test_resolver().unwrap()); buffer.wakeup(); let result = buffer.await_not_empty(Duration::from_millis(10)).await; @@ -906,7 +932,7 @@ mod tests { #[tokio::test] async fn await_not_empty_returns_pending_error() { - let buffer = LogFetchBuffer::new(test_read_context().unwrap()); + let buffer = LogFetchBuffer::new(test_resolver().unwrap()); let table_bucket = TableBucket::new(1, 0); buffer.pend(Box::new(ErrorPendingFetch { table_bucket: table_bucket.clone(), @@ -950,12 +976,17 @@ mod tests { let data = builder.build()?; let log_records = LogRecordsBatches::new(data.clone()); - let read_context = ReadContext::new(to_arrow_schema(&row_type)?, Arc::new(row_type), false); + let arrow_schema = to_arrow_schema(&row_type)?; + let row_type_arc = Arc::new(row_type); + let local_ctx = ReadContext::new(arrow_schema.clone(), row_type_arc.clone(), false); + let remote_ctx = ReadContext::new(arrow_schema, row_type_arc, true); + let resolver = Arc::new(ReadContextResolver::new(1, local_ctx, remote_ctx, None)); let mut fetch = DefaultCompletedFetch::new( TableBucket::new(1, 0), log_records, data.len(), - read_context, + resolver, + false, 0, 0, ); @@ -1018,12 +1049,19 @@ mod tests { let new_len = ((data.len() - LOG_OVERHEAD) as i32).to_le_bytes(); data[LENGTH_OFFSET..LENGTH_OFFSET + LENGTH_LENGTH].copy_from_slice(&new_len); - let read_context = ReadContext::new(to_arrow_schema(&row_type)?, Arc::new(row_type), false); + let read_context = ReadContext::new( + to_arrow_schema(&row_type)?, + Arc::new(row_type.clone()), + false, + ); + let remote_ctx = ReadContext::new(to_arrow_schema(&row_type)?, Arc::new(row_type), true); + let resolver = Arc::new(ReadContextResolver::new(1, read_context, remote_ctx, None)); let mut fetch = DefaultCompletedFetch::new( TableBucket::new(1, 0), LogRecordsBatches::new(data.clone()), data.len(), - read_context, + resolver, + false, 0, 0, ); diff --git a/crates/fluss/src/client/table/mod.rs b/crates/fluss/src/client/table/mod.rs index 657a44bf..41cbadab 100644 --- a/crates/fluss/src/client/table/mod.rs +++ b/crates/fluss/src/client/table/mod.rs @@ -30,6 +30,7 @@ mod lookup; mod log_fetch_buffer; mod partition_getter; +mod read_context_resolver; mod reader; mod remote_log; mod scanner; diff --git a/crates/fluss/src/client/table/read_context_resolver.rs b/crates/fluss/src/client/table/read_context_resolver.rs new file mode 100644 index 00000000..cf8f373c --- /dev/null +++ b/crates/fluss/src/client/table/read_context_resolver.rs @@ -0,0 +1,186 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Per-schema `ReadContext` cache for schema evolution support. +//! +//! In DYNAMIC mode (no projection), records are returned with their write-time +//! schema: old-schema batches return fewer columns, new-schema batches return +//! more columns. +//! +//! When projection is active, the schema is pinned at scanner creation time +//! and all batches use the initial ReadContext regardless of schema_id. + +use crate::error::Result; +use crate::metadata::Schema; +use crate::record::{ReadContext, to_arrow_schema}; +use parking_lot::RwLock; +use std::collections::HashMap; +use std::sync::Arc; + +/// Resolves `ReadContext` per schema version to support schema evolution. +pub(crate) struct ReadContextResolver { + /// Schema ID at scanner creation time. + initial_schema_id: i16, + /// ReadContexts keyed by schema_id. Contains both local and remote contexts. + contexts: RwLock>, + /// When Some, projection is active and schema is pinned to the initial one. + projected_fields: Option>, +} + +/// A pair of ReadContexts for local and remote reads. +struct ResolvedContexts { + local: ReadContext, + remote: ReadContext, +} + +impl ReadContextResolver { + /// Create a new resolver with the initial schema's ReadContexts. + pub fn new( + initial_schema_id: i16, + local_context: ReadContext, + remote_context: ReadContext, + projected_fields: Option>, + ) -> Self { + let mut map = HashMap::new(); + map.insert( + initial_schema_id, + ResolvedContexts { + local: local_context, + remote: remote_context, + }, + ); + Self { + initial_schema_id, + contexts: RwLock::new(map), + projected_fields, + } + } + + /// Resolve the ReadContext for the given schema_id. + /// Returns the initial context if projection is active (schema pinned). + /// Returns None if the schema_id is not yet cached. + pub fn resolve(&self, schema_id: i16, is_remote: bool) -> Option { + // If projection is active, always return the initial context + let effective_id = if self.projected_fields.is_some() { + self.initial_schema_id + } else { + schema_id + }; + + let guard = self.contexts.read(); + guard.get(&effective_id).map(|ctx| { + if is_remote { + ctx.remote.clone() + } else { + ctx.local.clone() + } + }) + } + + /// Check if a schema_id is already cached. + pub fn contains(&self, schema_id: i16) -> bool { + if self.projected_fields.is_some() { + // projection pinned, always have the answer + true + } else { + self.contexts.read().contains_key(&schema_id) + } + } + + /// Register a new schema by its ID. Builds ReadContexts from the Schema. + /// No-op if already cached or if projection is active. + pub fn register_schema(&self, schema_id: i16, schema: &Schema) -> Result<()> { + if self.projected_fields.is_some() { + // Projection pins the schema, no need to register new ones + return Ok(()); + } + if self.contexts.read().contains_key(&schema_id) { + return Ok(()); + } + + let row_type = schema.row_type(); + let arrow_schema = to_arrow_schema(row_type)?; + let row_type_arc = Arc::new(row_type.clone()); + + let local_context = ReadContext::new(arrow_schema.clone(), row_type_arc.clone(), false) + .with_fluss_row_type(row_type_arc.clone()); + let remote_context = ReadContext::new(arrow_schema, row_type_arc.clone(), true) + .with_fluss_row_type(row_type_arc); + + self.contexts.write().insert( + schema_id, + ResolvedContexts { + local: local_context, + remote: remote_context, + }, + ); + Ok(()) + } + + /// Returns the initial schema ID. + pub fn initial_schema_id(&self) -> i16 { + self.initial_schema_id + } + + /// Returns the projection fields if active, used for fetch request pushdown. + #[allow(dead_code)] + pub fn projected_fields(&self) -> Option<&[usize]> { + self.projected_fields.as_deref() + } +} + +/// Extract all unique schema_ids from raw log record batch bytes. +/// +/// Scans through the concatenated batch buffer reading each batch header +/// to extract the schema_id field. Used to pre-resolve schemas asynchronously +/// before synchronous record decoding. +pub(crate) fn extract_schema_ids(data: &[u8]) -> Vec { + use crate::record::{LENGTH_OFFSET, LOG_OVERHEAD, SCHEMA_ID_OFFSET}; + use byteorder::{ByteOrder, LittleEndian}; + + let mut schema_ids = Vec::new(); + let mut seen = std::collections::HashSet::new(); + let mut pos = 0; + + while pos + LOG_OVERHEAD <= data.len() { + // Read batch length at LENGTH_OFFSET within the current batch + let length_pos = pos + LENGTH_OFFSET; + if length_pos + 4 > data.len() { + break; + } + let batch_size_bytes = LittleEndian::read_i32(&data[length_pos..length_pos + 4]); + if batch_size_bytes < 0 { + break; + } + let batch_total_size = batch_size_bytes as usize + LOG_OVERHEAD; + + // Read schema_id + let schema_id_pos = pos + SCHEMA_ID_OFFSET; + if schema_id_pos + 2 > data.len() { + break; + } + let schema_id = LittleEndian::read_i16(&data[schema_id_pos..schema_id_pos + 2]); + if seen.insert(schema_id) { + schema_ids.push(schema_id); + } + + // Advance to next batch + pos += batch_total_size; + } + + schema_ids +} diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs index 287fcd06..24eee1d8 100644 --- a/crates/fluss/src/client/table/scanner.rs +++ b/crates/fluss/src/client/table/scanner.rs @@ -24,6 +24,7 @@ use crate::client::table::log_fetch_buffer::{ CompletedFetch, DefaultCompletedFetch, FetchErrorAction, FetchErrorContext, FetchErrorLogLevel, LogFetchBuffer, RemotePendingFetch, }; +use crate::client::table::read_context_resolver::{ReadContextResolver, extract_schema_ids}; use crate::client::table::remote_log::{RemoteLogDownloader, RemoteLogFetchInfo}; use crate::config::Config; use crate::error::Error::UnsupportedOperation; @@ -326,12 +327,14 @@ impl<'a> TableScan<'a> { pub fn create_log_scanner(self) -> Result { self.reject_limit("LogScanner")?; validate_scan_support_inner(&self.table_info.table_path, &self.table_info, true)?; + let admin = self.conn.get_admin()?; let inner = LogScannerInner::new( &self.table_info, self.metadata.clone(), self.conn.get_connections(), self.conn.config(), self.projected_fields, + admin, )?; Ok(LogScanner { inner: Arc::new(inner), @@ -347,12 +350,14 @@ impl<'a> TableScan<'a> { pub fn create_record_batch_log_scanner(self) -> Result { self.reject_limit("RecordBatchLogScanner")?; validate_scan_support(&self.table_info.table_path, &self.table_info)?; + let admin = self.conn.get_admin()?; let inner = LogScannerInner::new( &self.table_info, self.metadata.clone(), self.conn.get_connections(), self.conn.config(), self.projected_fields, + admin, )?; Ok(RecordBatchLogScanner { inner: Arc::new(inner), @@ -537,6 +542,7 @@ impl LogScannerInner { connections: Arc, config: &Config, projected_fields: Option>, + admin: Arc, ) -> Result { let log_scanner_status = Arc::new(LogScannerStatus::new()); @@ -553,6 +559,15 @@ impl LogScannerInner { None => to_arrow_schema(full_row_type)?, }; + // Create schema getter for schema evolution support + let latest_schema = + SchemaInfo::new(table_info.get_schema().clone(), table_info.get_schema_id()); + let schema_getter = Arc::new(ClientSchemaGetter::new( + table_info.table_path.clone(), + admin, + latest_schema, + )); + let metrics = Arc::new(ScannerMetrics::new(&table_info.table_path)); let last_poll_unix_ms = Arc::new(AtomicI64::new(0)); let last_poll_seconds_ago_task = spawn_last_poll_seconds_ago_ticker( @@ -573,6 +588,7 @@ impl LogScannerInner { config, projected_fields, Arc::clone(&metrics), + schema_getter, )?, arrow_schema, reader_active: std::sync::atomic::AtomicBool::new(false), @@ -1122,8 +1138,8 @@ struct LogFetcher { table_path: TablePath, is_partitioned: bool, log_scanner_status: Arc, - read_context: ReadContext, - remote_read_context: ReadContext, + resolver: Arc, + schema_getter: Arc, remote_log_downloader: Arc, /// Background security token manager for remote filesystem access. /// Kept alive to run the background refresh task; stopped on drop. @@ -1145,8 +1161,8 @@ struct FetchResponseContext { metadata: Arc, log_fetch_buffer: Arc, log_scanner_status: Arc, - read_context: ReadContext, - remote_read_context: ReadContext, + resolver: Arc, + schema_getter: Arc, remote_log_downloader: Arc, /// Per-table scanner metric handles for `scanner.fetch_*` recording. metrics: Arc, @@ -1156,6 +1172,7 @@ struct FetchResponseContext { } impl LogFetcher { + #[allow(clippy::too_many_arguments)] pub fn new( table_info: TableInfo, conns: Arc, @@ -1164,6 +1181,7 @@ impl LogFetcher { config: &Config, projected_fields: Option>, metrics: Arc, + schema_getter: Arc, ) -> Result { let full_row_type = table_info.get_row_type(); let full_arrow_schema = to_arrow_schema(full_row_type)?; @@ -1191,8 +1209,16 @@ impl LogFetcher { )? .with_fluss_row_type(projected_row_type); + let initial_schema_id = table_info.get_schema_id() as i16; + let resolver = Arc::new(ReadContextResolver::new( + initial_schema_id, + read_context, + remote_read_context, + projected_fields, + )); + let tmp_dir = TempDir::with_prefix("fluss-remote-logs")?; - let log_fetch_buffer = Arc::new(LogFetchBuffer::new(read_context.clone())); + let log_fetch_buffer = Arc::new(LogFetchBuffer::new(Arc::clone(&resolver))); // Create security token manager for background token refresh let security_token_manager = @@ -1219,8 +1245,8 @@ impl LogFetcher { table_path: table_info.table_path.clone(), is_partitioned: table_info.is_partitioned(), log_scanner_status, - read_context, - remote_read_context, + resolver, + schema_getter, remote_log_downloader, security_token_manager, log_fetch_buffer, @@ -1393,8 +1419,8 @@ impl LogFetcher { let conns = Arc::clone(&self.conns); let log_fetch_buffer = self.log_fetch_buffer.clone(); let log_scanner_status = self.log_scanner_status.clone(); - let read_context = self.read_context.clone(); - let remote_read_context = self.remote_read_context.clone(); + let resolver = Arc::clone(&self.resolver); + let schema_getter = Arc::clone(&self.schema_getter); let remote_log_downloader = Arc::clone(&self.remote_log_downloader); let nodes_with_pending = self.nodes_with_pending_fetch_requests.clone(); let metadata = self.metadata.clone(); @@ -1456,8 +1482,8 @@ impl LogFetcher { metadata: metadata.clone(), log_fetch_buffer, log_scanner_status, - read_context, - remote_read_context, + resolver, + schema_getter, remote_log_downloader, metrics, request_start_time, @@ -1487,8 +1513,8 @@ impl LogFetcher { metadata, log_fetch_buffer, log_scanner_status, - read_context, - remote_read_context, + resolver, + schema_getter, remote_log_downloader, metrics, request_start_time, @@ -1581,7 +1607,7 @@ impl LogFetcher { Self::pending_remote_fetches( remote_log_downloader.clone(), log_fetch_buffer.clone(), - remote_read_context.clone(), + Arc::clone(&resolver), &table_bucket, remote_fetch_info, fetch_offset, @@ -1592,13 +1618,35 @@ impl LogFetcher { let high_watermark = fetch_log_for_bucket.high_watermark.unwrap_or(-1); let records = fetch_log_for_bucket.records.unwrap_or(vec![]); let size_in_bytes = records.len(); - let log_record_batch = LogRecordsBatches::new(records); + // Pre-resolve schemas for schema evolution support + let schema_ids = extract_schema_ids(&records); + for schema_id in schema_ids { + if !resolver.contains(schema_id) { + match schema_getter.get_schema(schema_id as i32).await { + Ok(schema) => { + if let Err(e) = resolver.register_schema(schema_id, &schema) { + warn!( + "Failed to register schema {schema_id} for bucket {table_bucket}: {e}" + ); + } + } + Err(e) => { + warn!( + "Failed to fetch schema {schema_id} for bucket {table_bucket}: {e}" + ); + } + } + } + } + + let log_record_batch = LogRecordsBatches::new(records); let completed_fetch = DefaultCompletedFetch::new( table_bucket.clone(), log_record_batch, size_in_bytes, - read_context.clone(), + Arc::clone(&resolver), + false, // is_remote fetch_offset, high_watermark, ); @@ -1611,7 +1659,7 @@ impl LogFetcher { fn pending_remote_fetches( remote_log_downloader: Arc, log_fetch_buffer: Arc, - read_context: ReadContext, + resolver: Arc, table_bucket: &TableBucket, remote_fetch_info: RemoteLogFetchInfo, fetch_offset: i64, @@ -1647,7 +1695,7 @@ impl LogFetcher { pos_in_log_segment, current_fetch_offset, high_watermark, - read_context.clone(), + Arc::clone(&resolver), ); // Add to pending fetches in buffer (similar to Java's logFetchBuffer.pend) log_fetch_buffer.pend(Box::new(pending_fetch)); @@ -2057,11 +2105,15 @@ impl LogFetcher { if ready_for_fetch_count == 0 { HashMap::new() } else { - let (projection_enabled, projected_fields) = - match self.read_context.project_fields_in_order() { - None => (false, vec![]), - Some(fields) => (true, fields.iter().map(|&i| i as i32).collect()), - }; + let initial_ctx = self + .resolver + .resolve(self.resolver.initial_schema_id(), false) + .expect("initial ReadContext must exist"); + let (projection_enabled, projected_fields) = match initial_ctx.project_fields_in_order() + { + None => (false, vec![]), + Some(fields) => (true, fields.iter().map(|&i| i as i32).collect()), + }; fetch_log_req_for_buckets .into_iter() @@ -2280,7 +2332,9 @@ fn validate_scan_support_inner( mod tests { use super::*; use crate::client::WriteRecord; + use crate::client::admin::FlussAdmin; use crate::client::metadata::Metadata; + use crate::client::table::read_context_resolver::ReadContextResolver; use crate::compression::{ ArrowCompressionInfo, ArrowCompressionRatioEstimator, ArrowCompressionType, DEFAULT_NON_ZSTD_COMPRESSION_LEVEL, @@ -2294,6 +2348,41 @@ mod tests { assert_scanner_entries_labeled, build_cluster_arc, build_table_info, test_scanner_metrics, }; + fn test_admin(metadata: &Arc) -> Arc { + Arc::new(FlussAdmin::new( + Arc::new(RpcClient::new()), + metadata.clone(), + )) + } + + fn test_schema_getter( + table_info: &TableInfo, + metadata: &Arc, + ) -> Arc { + let latest = SchemaInfo::new(table_info.get_schema().clone(), table_info.get_schema_id()); + Arc::new(ClientSchemaGetter::new( + table_info.table_path.clone(), + test_admin(metadata), + latest, + )) + } + + fn test_resolver(table_info: &TableInfo) -> Arc { + let row_type = table_info.get_row_type(); + let arrow_schema = to_arrow_schema(row_type).unwrap(); + let row_type_arc = Arc::new(row_type.clone()); + let local_ctx = ReadContext::new(arrow_schema.clone(), row_type_arc.clone(), false) + .with_fluss_row_type(row_type_arc.clone()); + let remote_ctx = ReadContext::new(arrow_schema, row_type_arc.clone(), true) + .with_fluss_row_type(row_type_arc); + Arc::new(ReadContextResolver::new( + table_info.get_schema_id() as i16, + local_ctx, + remote_ctx, + None, + )) + } + fn build_records(table_info: &TableInfo, table_path: Arc) -> Result> { let mut builder = MemoryLogRecordsArrowBuilder::new( 1, @@ -2326,11 +2415,12 @@ mod tests { let fetcher = LogFetcher::new( table_info.clone(), Arc::new(RpcClient::new()), - metadata, + metadata.clone(), status.clone(), &Config::default(), None, test_scanner_metrics(&table_path), + test_schema_getter(&table_info, &metadata), )?; let bucket = TableBucket::new(1, 0); @@ -2338,10 +2428,16 @@ mod tests { let data = build_records(&table_info, Arc::new(table_path))?; let log_records = LogRecordsBatches::new(data.clone()); - let row_type = Arc::new(table_info.get_row_type().clone()); - let read_context = ReadContext::new(to_arrow_schema(&row_type)?, row_type, false); - let completed = - DefaultCompletedFetch::new(bucket.clone(), log_records, data.len(), read_context, 0, 0); + let resolver = test_resolver(&table_info); + let completed = DefaultCompletedFetch::new( + bucket.clone(), + log_records, + data.len(), + resolver, + false, + 0, + 0, + ); fetcher.log_fetch_buffer.add(Box::new(completed)); let fetched = fetcher.collect_fetches()?; @@ -2360,23 +2456,24 @@ mod tests { let fetcher = LogFetcher::new( table_info.clone(), Arc::new(RpcClient::new()), - metadata, + metadata.clone(), status, &Config::default(), None, test_scanner_metrics(&table_path), + test_schema_getter(&table_info, &metadata), )?; let bucket = TableBucket::new(1, 0); let data = build_records(&table_info, Arc::new(table_path))?; let log_records = LogRecordsBatches::new(data.clone()); - let row_type = Arc::new(table_info.get_row_type().clone()); - let read_context = ReadContext::new(to_arrow_schema(&row_type)?, row_type, false); + let resolver = test_resolver(&table_info); let mut completed: Box = Box::new(DefaultCompletedFetch::new( bucket, log_records, data.len(), - read_context, + resolver, + false, 0, 0, )); @@ -2396,13 +2493,14 @@ mod tests { let status = Arc::new(LogScannerStatus::new()); status.assign_scan_bucket(TableBucket::new(1, 0), 0); let fetcher = LogFetcher::new( - table_info, + table_info.clone(), Arc::new(RpcClient::new()), - metadata, + metadata.clone(), status, &Config::default(), None, test_scanner_metrics(&table_path), + test_schema_getter(&table_info, &metadata), )?; fetcher.nodes_with_pending_fetch_requests.lock().insert(1); @@ -2428,6 +2526,7 @@ mod tests { &Config::default(), None, test_scanner_metrics(&table_path), + test_schema_getter(&table_info, &metadata), )?; let response = FetchLogResponse { @@ -2451,8 +2550,8 @@ mod tests { metadata: metadata.clone(), log_fetch_buffer: fetcher.log_fetch_buffer.clone(), log_scanner_status: fetcher.log_scanner_status.clone(), - read_context: fetcher.read_context.clone(), - remote_read_context: fetcher.remote_read_context.clone(), + resolver: Arc::clone(&fetcher.resolver), + schema_getter: Arc::clone(&fetcher.schema_getter), remote_log_downloader: fetcher.remote_log_downloader.clone(), metrics: Arc::clone(&fetcher.metrics), request_start_time: Instant::now(), @@ -2482,6 +2581,7 @@ mod tests { &Config::default(), None, test_scanner_metrics(&table_path), + test_schema_getter(&table_info, &metadata), )?; let bucket = TableBucket::new(1, 0); @@ -2508,8 +2608,8 @@ mod tests { metadata: metadata.clone(), log_fetch_buffer: fetcher.log_fetch_buffer.clone(), log_scanner_status: fetcher.log_scanner_status.clone(), - read_context: fetcher.read_context.clone(), - remote_read_context: fetcher.remote_read_context.clone(), + resolver: Arc::clone(&fetcher.resolver), + schema_getter: Arc::clone(&fetcher.schema_getter), remote_log_downloader: fetcher.remote_log_downloader.clone(), metrics: Arc::clone(&fetcher.metrics), request_start_time: Instant::now(), @@ -2608,13 +2708,14 @@ mod tests { }; let fetcher = LogFetcher::new( - table_info, + table_info.clone(), Arc::new(RpcClient::new()), - metadata, + metadata.clone(), status, &config, None, test_scanner_metrics(&table_path), + test_schema_getter(&table_info, &metadata), )?; let requests = fetcher.prepare_fetch_log_requests().await; @@ -2648,12 +2749,18 @@ mod tests { let table_info = build_table_info(table_path.clone(), 1, 1); let cluster = build_cluster_arc(&table_path, 1, 1); let metadata = Arc::new(Metadata::new_for_test(cluster)); + let rpc_client = Arc::new(RpcClient::new()); + let admin = Arc::new(crate::client::admin::FlussAdmin::new( + rpc_client.clone(), + metadata.clone(), + )); let inner = LogScannerInner::new( &table_info, metadata, - Arc::new(RpcClient::new()), + rpc_client, &Config::default(), None, + admin, ) .expect("build LogScannerInner"); body(&inner); @@ -2813,13 +2920,14 @@ mod tests { let status = Arc::new(LogScannerStatus::new()); status.assign_scan_bucket(TableBucket::new(1, 0), 5); let fetcher = LogFetcher::new( - table_info, + table_info.clone(), Arc::new(RpcClient::new()), metadata.clone(), status, &Config::default(), None, test_scanner_metrics(&table_path), + test_schema_getter(&table_info, &metadata), ) .expect("build LogFetcher"); @@ -2844,8 +2952,8 @@ mod tests { metadata: metadata.clone(), log_fetch_buffer: fetcher.log_fetch_buffer.clone(), log_scanner_status: fetcher.log_scanner_status.clone(), - read_context: fetcher.read_context.clone(), - remote_read_context: fetcher.remote_read_context.clone(), + resolver: Arc::clone(&fetcher.resolver), + schema_getter: Arc::clone(&fetcher.schema_getter), remote_log_downloader: fetcher.remote_log_downloader.clone(), metrics: Arc::clone(&fetcher.metrics), request_start_time: Instant::now(), @@ -3156,12 +3264,18 @@ mod tests { let table_info = build_table_info(table_path.clone(), 1, 1); let cluster = build_cluster_arc(&table_path, 1, 1); let metadata = Arc::new(Metadata::new_for_test(cluster)); + let rpc_client = Arc::new(RpcClient::new()); + let admin = Arc::new(crate::client::admin::FlussAdmin::new( + rpc_client.clone(), + metadata.clone(), + )); let inner = LogScannerInner::new( &table_info, metadata, - Arc::new(RpcClient::new()), + rpc_client, &Config::default(), None, + admin, ) .expect("build LogScannerInner"); diff --git a/crates/fluss/tests/integration/log_table.rs b/crates/fluss/tests/integration/log_table.rs index 625f48ac..db59f291 100644 --- a/crates/fluss/tests/integration/log_table.rs +++ b/crates/fluss/tests/integration/log_table.rs @@ -26,13 +26,16 @@ mod table_test { }; use arrow::array::record_batch; use fluss::client::{EARLIEST_OFFSET, FlussTable, TableScan}; - use fluss::metadata::{DataField, DataTypes, Schema, TableDescriptor, TablePath}; + use fluss::metadata::{ + AddColumn, AlterTableChanges, ColumnPositionType, DataField, DataTypes, JsonSerde, Schema, + TableDescriptor, TablePath, + }; use fluss::record::ScanRecord; use fluss::row::binary_array::FlussArrayWriter; use fluss::row::binary_map::FlussMapWriter; use fluss::row::{ - DataGetters, Date, Datum, Decimal, GenericRow, InternalArray, InternalMap, Time, - TimestampLtz, TimestampNtz, + DataGetters, Date, Datum, Decimal, GenericRow, InternalArray, InternalMap, InternalRow, + Time, TimestampLtz, TimestampNtz, }; use fluss::rpc::message::OffsetSpec; use std::collections::HashMap; @@ -1949,4 +1952,175 @@ mod table_test { admin.drop_table(&table_path, false).await.expect("drop"); } + + /// Test that a single log scanner can read records across a schema change + /// (add column). Records written before the change should have NULL for the + /// new column, and records written after should carry the new column's value. + #[tokio::test] + async fn schema_evolution_add_column_log_scanner() { + let cluster = get_shared_cluster(); + let connection = cluster.get_fluss_connection().await; + let admin = connection.get_admin().expect("Failed to get admin"); + + let table_path = TablePath::new("fluss", "test_schema_evolution_log_scanner"); + + // 1. Create table with initial schema: (id INT, name STRING) + let table_descriptor = TableDescriptor::builder() + .schema( + Schema::builder() + .column("id", DataTypes::int()) + .column("name", DataTypes::string()) + .build() + .expect("Failed to build schema"), + ) + .build() + .expect("Failed to build table"); + + create_table(&admin, &table_path, &table_descriptor).await; + wait_for_table_ready(&admin, &table_path).await; + + // 2. Get table handle and create scanner + subscribe from EARLIEST + let table = connection + .get_table(&table_path) + .await + .expect("Failed to get table"); + let log_scanner = table + .new_scan() + .create_log_scanner() + .expect("Failed to create log scanner"); + let num_buckets = table.get_table_info().get_num_buckets(); + for bucket_id in 0..num_buckets { + log_scanner + .subscribe(bucket_id, EARLIEST_OFFSET) + .await + .expect("Failed to subscribe"); + } + + // 3. Write records with old schema + let writer_v0 = table + .new_append() + .expect("append") + .create_writer() + .expect("writer"); + writer_v0 + .append_arrow_batch( + record_batch!( + ("id", Int32, [1, 2, 3]), + ("name", Utf8, ["alice", "bob", "charlie"]) + ) + .unwrap(), + ) + .expect("Failed to append old-schema batch"); + writer_v0.flush().await.expect("flush"); + + // 4. Poll old-schema records + let mut old_records: Vec<(i32, String)> = Vec::new(); + let start = std::time::Instant::now(); + while old_records.len() < 3 && start.elapsed() < Duration::from_secs(10) { + let recs = log_scanner + .poll(Duration::from_millis(500)) + .await + .expect("poll"); + for rec in recs { + let row = rec.row(); + old_records.push(( + row.get_int(0).unwrap(), + row.get_string(1).unwrap().to_string(), + )); + } + } + assert_eq!(old_records.len(), 3, "Expected 3 old-schema records"); + old_records.sort_by_key(|r| r.0); + assert_eq!( + old_records, + vec![ + (1, "alice".to_string()), + (2, "bob".to_string()), + (3, "charlie".to_string()), + ] + ); + + // 5. Alter table: add column "age INT" + let age_type_json = serde_json::to_vec( + &DataTypes::int() + .serialize_json() + .expect("serialize INT type"), + ) + .expect("to_vec"); + admin + .alter_table( + &table_path, + false, + AlterTableChanges { + add_columns: vec![AddColumn { + column_name: "age".to_string(), + data_type_json: age_type_json, + comment: None, + position: ColumnPositionType::Last, + }], + ..Default::default() + }, + ) + .await + .expect("Failed to alter table"); + + // 6. Get a new table handle with the updated schema and write new records + let table_v1 = connection + .get_table(&table_path) + .await + .expect("Failed to get updated table"); + let writer_v1 = table_v1 + .new_append() + .expect("append") + .create_writer() + .expect("writer"); + writer_v1 + .append_arrow_batch( + record_batch!( + ("id", Int32, [4, 5, 6]), + ("name", Utf8, ["dave", "eve", "frank"]), + ("age", Int32, [30, 25, 40]) + ) + .unwrap(), + ) + .expect("Failed to append new-schema batch"); + writer_v1.flush().await.expect("flush"); + + // 7. Continue reading from the SAME scanner — it should handle schema + // evolution and return the new-schema records with the extra column. + let mut new_records: Vec<(i32, String, Option)> = Vec::new(); + let start = std::time::Instant::now(); + while new_records.len() < 3 && start.elapsed() < Duration::from_secs(10) { + let recs = log_scanner + .poll(Duration::from_millis(500)) + .await + .expect("poll"); + for rec in recs { + let row = rec.row(); + let id = row.get_int(0).unwrap(); + let name = row.get_string(1).unwrap().to_string(); + let age = if row.get_field_count() > 2 && !row.is_null_at(2).unwrap_or(true) { + Some(row.get_int(2).unwrap()) + } else { + None + }; + new_records.push((id, name, age)); + } + } + assert_eq!(new_records.len(), 3, "Expected 3 new-schema records"); + new_records.sort_by_key(|r| r.0); + assert_eq!( + new_records, + vec![ + (4, "dave".to_string(), Some(30)), + (5, "eve".to_string(), Some(25)), + (6, "frank".to_string(), Some(40)), + ] + ); + + admin + .drop_table(&table_path, false) + .await + .expect("Failed to drop table"); + } }