diff --git a/datafusion/functions/src/string/concat.rs b/datafusion/functions/src/string/concat.rs index 0a557d62c77c4..e22c3b80dacb4 100644 --- a/datafusion/functions/src/string/concat.rs +++ b/datafusion/functions/src/string/concat.rs @@ -247,7 +247,7 @@ impl ScalarUDFImpl for ConcatFunc { columns .iter() .for_each(|column| builder.write::(column, i)); - builder.append_offset(); + builder.append_offset()?; } let string_array = builder.finish(None)?; @@ -271,7 +271,7 @@ impl ScalarUDFImpl for ConcatFunc { columns .iter() .for_each(|column| builder.write::(column, i)); - builder.append_offset(); + builder.append_offset()?; } let string_array = builder.finish(None)?; diff --git a/datafusion/functions/src/string/concat_ws.rs b/datafusion/functions/src/string/concat_ws.rs index ab26b8a39dc08..fc4cc6e43b160 100644 --- a/datafusion/functions/src/string/concat_ws.rs +++ b/datafusion/functions/src/string/concat_ws.rs @@ -335,7 +335,7 @@ impl ScalarUDFImpl for ConcatWsFunc { let mut builder = LargeStringArrayBuilder::with_capacity(len, data_size); for i in 0..len { if !sep.is_valid(i) { - builder.append_offset(); + builder.append_offset()?; continue; } let mut first = true; @@ -348,7 +348,7 @@ impl ScalarUDFImpl for ConcatWsFunc { first = false; } } - builder.append_offset(); + builder.append_offset()?; } Ok(ColumnarValue::Array(Arc::new(builder.finish(sep.nulls())?))) } @@ -356,7 +356,7 @@ impl ScalarUDFImpl for ConcatWsFunc { let mut builder = StringArrayBuilder::with_capacity(len, data_size); for i in 0..len { if !sep.is_valid(i) { - builder.append_offset(); + builder.append_offset()?; continue; } let mut first = true; @@ -369,7 +369,7 @@ impl ScalarUDFImpl for ConcatWsFunc { first = false; } } - builder.append_offset(); + builder.append_offset()?; } Ok(ColumnarValue::Array(Arc::new(builder.finish(sep.nulls())?))) } diff --git a/datafusion/functions/src/strings.rs b/datafusion/functions/src/strings.rs index ebc58490b2dc1..d0e1ecf8a8e7c 100644 --- a/datafusion/functions/src/strings.rs +++ b/datafusion/functions/src/strings.rs @@ -21,9 +21,9 @@ use datafusion_common::{Result, exec_datafusion_err, internal_err}; use arrow::array::{ Array, ArrayAccessor, ArrayDataBuilder, BinaryArray, ByteView, LargeStringArray, - StringArray, StringViewArray, StringViewBuilder, make_view, + StringArray, StringViewArray, make_view, }; -use arrow::buffer::{MutableBuffer, NullBuffer}; +use arrow::buffer::{Buffer, MutableBuffer, NullBuffer, ScalarBuffer}; use arrow::datatypes::DataType; /// Optimized version of the StringBuilder in Arrow that: @@ -106,13 +106,14 @@ impl StringArrayBuilder { } } - pub fn append_offset(&mut self) { + pub fn append_offset(&mut self) -> Result<()> { let next_offset: i32 = self .value_buffer .len() .try_into() - .expect("byte array offset overflow"); + .map_err(|_| exec_datafusion_err!("byte array offset overflow"))?; self.offsets_buffer.push(next_offset); + Ok(()) } /// Finalize the builder into a concrete [`StringArray`]. @@ -150,18 +151,25 @@ impl StringArrayBuilder { } } +/// Optimized version of Arrow's [`StringViewBuilder`]. Rather than adding NULLs +/// on a row-by-row basis, the caller should provide nulls when calling +/// [`finish`](Self::finish). This allows callers to compute nulls more +/// efficiently (e.g., via bulk bitmap operations). +/// +/// [`StringViewBuilder`]: arrow::array::StringViewBuilder pub struct StringViewArrayBuilder { - builder: StringViewBuilder, + views: Vec, + data: Vec, block: Vec, /// If true, a safety check is required during the `append_offset` call tainted: bool, } impl StringViewArrayBuilder { - pub fn with_capacity(item_capacity: usize, _data_capacity: usize) -> Self { - let builder = StringViewBuilder::with_capacity(item_capacity); + pub fn with_capacity(item_capacity: usize, data_capacity: usize) -> Self { Self { - builder, + views: Vec::with_capacity(item_capacity), + data: Vec::with_capacity(data_capacity), block: vec![], tainted: false, } @@ -214,16 +222,29 @@ impl StringViewArrayBuilder { } } + /// Finalizes the current row by converting the accumulated data into a + /// StringView and appending it to the views buffer. pub fn append_offset(&mut self) -> Result<()> { - let block_str = if self.tainted { + if self.tainted { std::str::from_utf8(&self.block) - .map_err(|_| exec_datafusion_err!("invalid UTF-8 in binary literal"))? + .map_err(|_| exec_datafusion_err!("invalid UTF-8 in binary literal"))?; + } + + let v = &self.block; + if v.len() > 12 { + let offset: u32 = self + .data + .len() + .try_into() + .map_err(|_| exec_datafusion_err!("byte array offset overflow"))?; + self.data.extend_from_slice(v); + self.views.push(make_view(v, 0, offset)); } else { - // SAFETY: all data that was appended was valid UTF8 - unsafe { std::str::from_utf8_unchecked(&self.block) } - }; - self.builder.append_value(block_str); + self.views.push(make_view(v, 0, 0)); + } + self.block.clear(); + self.tainted = false; Ok(()) } @@ -233,21 +254,35 @@ impl StringViewArrayBuilder { /// /// Returns an error when: /// - /// - the provided `null_buffer` does not match amount of `append_offset` calls. - pub fn finish(mut self, null_buffer: Option) -> Result { - let array = self.builder.finish(); - match null_buffer { - Some(nulls) if nulls.len() != array.len() => { - internal_err!("Null buffer and views buffer must be the same length") - } - Some(nulls) => { - let array_builder = array.into_data().into_builder().nulls(Some(nulls)); - // SAFETY: the underlying data is valid; we are only adding a null buffer - let array_data = unsafe { array_builder.build_unchecked() }; - Ok(StringViewArray::from(array_data)) - } - None => Ok(array), + /// - the provided `null_buffer` length does not match the row count. + pub fn finish(self, null_buffer: Option) -> Result { + if let Some(ref nulls) = null_buffer + && nulls.len() != self.views.len() + { + return internal_err!( + "Null buffer length ({}) must match row count ({})", + nulls.len(), + self.views.len() + ); } + + let buffers: Vec = if self.data.is_empty() { + vec![] + } else { + vec![Buffer::from(self.data)] + }; + + // SAFETY: views were constructed with correct lengths, offsets, and + // prefixes. UTF-8 validity was checked in append_offset() for any row + // where tainted data (e.g., binary literals) was appended. + let array = unsafe { + StringViewArray::new_unchecked( + ScalarBuffer::from(self.views), + buffers, + null_buffer, + ) + }; + Ok(array) } } @@ -328,13 +363,14 @@ impl LargeStringArrayBuilder { } } - pub fn append_offset(&mut self) { + pub fn append_offset(&mut self) -> Result<()> { let next_offset: i64 = self .value_buffer .len() .try_into() - .expect("byte array offset overflow"); + .map_err(|_| exec_datafusion_err!("byte array offset overflow"))?; self.offsets_buffer.push(next_offset); + Ok(()) } /// Finalize the builder into a concrete [`LargeStringArray`].