Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 68 additions & 30 deletions crates/fluss/src/client/table/log_fetch_buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
use arrow::array::RecordBatch;
use parking_lot::Mutex;

use crate::client::table::read_context_resolver::ReadContextResolver;
use crate::client::table::remote_log::{
PrefetchPermit, RemoteLogDownloadFuture, RemoteLogFile, RemoteLogSegment,
};
Expand Down Expand Up @@ -85,7 +86,7 @@ pub trait PendingFetch: Send + Sync {

/// Thread-safe buffer for completed fetches
pub struct LogFetchBuffer {
read_context: ReadContext,
resolver: Arc<ReadContextResolver>,
completed_fetches: Mutex<VecDeque<Box<dyn CompletedFetch>>>,
pending_fetches: Mutex<HashMap<TableBucket, VecDeque<Box<dyn PendingFetch>>>>,
next_in_line_fetch: Mutex<Option<Box<dyn CompletedFetch>>>,
Expand All @@ -94,9 +95,9 @@ pub struct LogFetchBuffer {
}

impl LogFetchBuffer {
pub fn new(read_context: ReadContext) -> Self {
pub fn new(resolver: Arc<ReadContextResolver>) -> Self {
Self {
read_context,
resolver,
completed_fetches: Mutex::new(VecDeque::new()),
pending_fetches: Mutex::new(HashMap::new()),
next_in_line_fetch: Mutex::new(None),
Expand Down Expand Up @@ -168,7 +169,7 @@ impl LogFetchBuffer {
api_error,
fetch_error_context,
fetch_offset,
self.read_context.clone(),
Arc::clone(&self.resolver),
);
self.completed_fetches
.lock()
Expand Down Expand Up @@ -226,7 +227,7 @@ impl LogFetchBuffer {
table_bucket.clone(),
error,
-1,
self.read_context.clone(),
Arc::clone(&self.resolver),
);
completed_to_push.push(Box::new(error_fetch));
}
Expand Down Expand Up @@ -339,7 +340,8 @@ pub struct DefaultCompletedFetch {
fetch_error_context: Option<FetchErrorContext>,
error: Option<Error>,
log_record_batch: LogRecordsBatches,
read_context: ReadContext,
resolver: Arc<ReadContextResolver>,
is_remote: bool,
next_fetch_offset: i64,
high_watermark: i64,
size_in_bytes: usize,
Expand All @@ -358,7 +360,8 @@ impl DefaultCompletedFetch {
table_bucket: TableBucket,
log_record_batch: LogRecordsBatches,
size_in_bytes: usize,
read_context: ReadContext,
resolver: Arc<ReadContextResolver>,
is_remote: bool,
fetch_offset: i64,
high_watermark: i64,
) -> Self {
Expand All @@ -368,7 +371,8 @@ impl DefaultCompletedFetch {
fetch_error_context: None,
error: None,
log_record_batch,
read_context,
resolver,
is_remote,
next_fetch_offset: fetch_offset,
high_watermark,
size_in_bytes,
Expand All @@ -387,15 +391,16 @@ impl DefaultCompletedFetch {
table_bucket: TableBucket,
error: Error,
fetch_offset: i64,
read_context: ReadContext,
resolver: Arc<ReadContextResolver>,
) -> Self {
Self {
table_bucket,
api_error: None,
fetch_error_context: None,
error: Some(error),
log_record_batch: LogRecordsBatches::new(Vec::new()),
read_context,
resolver,
is_remote: false,
next_fetch_offset: fetch_offset,
high_watermark: -1,
size_in_bytes: 0,
Expand All @@ -415,15 +420,16 @@ impl DefaultCompletedFetch {
api_error: ApiError,
fetch_error_context: FetchErrorContext,
fetch_offset: i64,
read_context: ReadContext,
resolver: Arc<ReadContextResolver>,
) -> Self {
Self {
table_bucket,
api_error: Some(api_error),
fetch_error_context: Some(fetch_error_context),
error: None,
log_record_batch: LogRecordsBatches::new(Vec::new()),
read_context,
resolver,
is_remote: false,
next_fetch_offset: fetch_offset,
high_watermark: -1,
size_in_bytes: 0,
Expand Down Expand Up @@ -451,7 +457,8 @@ impl DefaultCompletedFetch {
}
} else if let Some(batch_result) = self.log_record_batch.next() {
let batch = batch_result?;
self.current_record_iterator = Some(batch.records(&self.read_context)?);
let read_context = self.resolve_context_for_batch(&batch)?;
self.current_record_iterator = Some(batch.records(&read_context)?);
self.current_record_batch = Some(batch);
} else {
if let Some(batch) = self.current_record_batch.take() {
Expand Down Expand Up @@ -518,7 +525,8 @@ impl DefaultCompletedFetch {
};

let log_batch = log_batch_result?;
let mut record_batch = log_batch.record_batch(&self.read_context)?;
let read_context = self.resolve_context_for_batch(&log_batch)?;
let mut record_batch = log_batch.record_batch(&read_context)?;

// Skip empty batches
if record_batch.num_rows() == 0 {
Expand All @@ -544,6 +552,20 @@ impl DefaultCompletedFetch {
return Ok(Some((record_batch, effective_base_offset)));
}
}

/// Resolve the ReadContext for a given batch based on its schema_id.
fn resolve_context_for_batch(&self, batch: &LogRecordBatch) -> Result<ReadContext> {
let schema_id = batch.schema_id();
self.resolver
.resolve(schema_id, self.is_remote)
.ok_or_else(|| Error::UnexpectedError {
message: format!(
"No ReadContext found for schema_id {schema_id} in bucket {}",
self.table_bucket
),
source: None,
})
}
}

impl CompletedFetch for DefaultCompletedFetch {
Expand Down Expand Up @@ -766,7 +788,7 @@ pub struct RemotePendingFetch {
pos_in_log_segment: i32,
fetch_offset: i64,
high_watermark: i64,
read_context: ReadContext,
resolver: Arc<ReadContextResolver>,
}

impl RemotePendingFetch {
Expand All @@ -776,15 +798,15 @@ impl RemotePendingFetch {
pos_in_log_segment: i32,
fetch_offset: i64,
high_watermark: i64,
read_context: ReadContext,
resolver: Arc<ReadContextResolver>,
) -> Self {
Self {
segment,
download_future,
pos_in_log_segment,
fetch_offset,
high_watermark,
read_context,
resolver,
}
}
}
Expand Down Expand Up @@ -836,7 +858,8 @@ impl PendingFetch for RemotePendingFetch {
self.segment.table_bucket.clone(),
log_record_batch,
size_in_bytes,
self.read_context,
self.resolver,
true, // is_remote
self.fetch_offset,
self.high_watermark,
);
Expand All @@ -852,6 +875,7 @@ impl PendingFetch for RemotePendingFetch {
mod tests {
use super::*;
use crate::client::WriteRecord;
use crate::client::table::read_context_resolver::ReadContextResolver;
use crate::compression::{
ArrowCompressionInfo, ArrowCompressionRatioEstimator, ArrowCompressionType,
DEFAULT_NON_ZSTD_COMPRESSION_LEVEL,
Expand All @@ -865,13 +889,15 @@ mod tests {
use crate::test_utils::build_table_info;
use std::sync::Arc;

fn test_read_context() -> Result<ReadContext> {
fn test_resolver() -> Result<Arc<ReadContextResolver>> {
let row_type = RowType::new(vec![DataField::new("id", DataTypes::int(), None)]);
Ok(ReadContext::new(
to_arrow_schema(&row_type)?,
Arc::new(row_type),
false,
))
let arrow_schema = to_arrow_schema(&row_type)?;
let row_type_arc = Arc::new(row_type);
let local_ctx = ReadContext::new(arrow_schema.clone(), row_type_arc.clone(), false);
let remote_ctx = ReadContext::new(arrow_schema, row_type_arc, true);
Ok(Arc::new(ReadContextResolver::new(
1, local_ctx, remote_ctx, None,
)))
}

struct ErrorPendingFetch {
Expand All @@ -897,7 +923,7 @@ mod tests {

#[tokio::test]
async fn await_not_empty_returns_wakeup_error() {
let buffer = LogFetchBuffer::new(test_read_context().unwrap());
let buffer = LogFetchBuffer::new(test_resolver().unwrap());
buffer.wakeup();

let result = buffer.await_not_empty(Duration::from_millis(10)).await;
Expand All @@ -906,7 +932,7 @@ mod tests {

#[tokio::test]
async fn await_not_empty_returns_pending_error() {
let buffer = LogFetchBuffer::new(test_read_context().unwrap());
let buffer = LogFetchBuffer::new(test_resolver().unwrap());
let table_bucket = TableBucket::new(1, 0);
buffer.pend(Box::new(ErrorPendingFetch {
table_bucket: table_bucket.clone(),
Expand Down Expand Up @@ -950,12 +976,17 @@ mod tests {

let data = builder.build()?;
let log_records = LogRecordsBatches::new(data.clone());
let read_context = ReadContext::new(to_arrow_schema(&row_type)?, Arc::new(row_type), false);
let arrow_schema = to_arrow_schema(&row_type)?;
let row_type_arc = Arc::new(row_type);
let local_ctx = ReadContext::new(arrow_schema.clone(), row_type_arc.clone(), false);
let remote_ctx = ReadContext::new(arrow_schema, row_type_arc, true);
let resolver = Arc::new(ReadContextResolver::new(1, local_ctx, remote_ctx, None));
let mut fetch = DefaultCompletedFetch::new(
TableBucket::new(1, 0),
log_records,
data.len(),
read_context,
resolver,
false,
0,
0,
);
Expand Down Expand Up @@ -1018,12 +1049,19 @@ mod tests {
let new_len = ((data.len() - LOG_OVERHEAD) as i32).to_le_bytes();
data[LENGTH_OFFSET..LENGTH_OFFSET + LENGTH_LENGTH].copy_from_slice(&new_len);

let read_context = ReadContext::new(to_arrow_schema(&row_type)?, Arc::new(row_type), false);
let read_context = ReadContext::new(
to_arrow_schema(&row_type)?,
Arc::new(row_type.clone()),
false,
);
let remote_ctx = ReadContext::new(to_arrow_schema(&row_type)?, Arc::new(row_type), true);
let resolver = Arc::new(ReadContextResolver::new(1, read_context, remote_ctx, None));
let mut fetch = DefaultCompletedFetch::new(
TableBucket::new(1, 0),
LogRecordsBatches::new(data.clone()),
data.len(),
read_context,
resolver,
false,
0,
0,
);
Expand Down
1 change: 1 addition & 0 deletions crates/fluss/src/client/table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ mod lookup;

mod log_fetch_buffer;
mod partition_getter;
mod read_context_resolver;
mod reader;
mod remote_log;
mod scanner;
Expand Down
Loading
Loading