Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions datafusion/functions/src/string/concat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ impl ScalarUDFImpl for ConcatFunc {
columns
.iter()
.for_each(|column| builder.write::<true>(column, i));
builder.append_offset();
builder.append_offset()?;
}

let string_array = builder.finish(None)?;
Expand All @@ -271,7 +271,7 @@ impl ScalarUDFImpl for ConcatFunc {
columns
.iter()
.for_each(|column| builder.write::<true>(column, i));
builder.append_offset();
builder.append_offset()?;
}

let string_array = builder.finish(None)?;
Expand Down
8 changes: 4 additions & 4 deletions datafusion/functions/src/string/concat_ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ impl ScalarUDFImpl for ConcatWsFunc {
let mut builder = LargeStringArrayBuilder::with_capacity(len, data_size);
for i in 0..len {
if !sep.is_valid(i) {
builder.append_offset();
builder.append_offset()?;
continue;
}
let mut first = true;
Expand All @@ -348,15 +348,15 @@ impl ScalarUDFImpl for ConcatWsFunc {
first = false;
}
}
builder.append_offset();
builder.append_offset()?;
}
Ok(ColumnarValue::Array(Arc::new(builder.finish(sep.nulls())?)))
}
_ => {
let mut builder = StringArrayBuilder::with_capacity(len, data_size);
for i in 0..len {
if !sep.is_valid(i) {
builder.append_offset();
builder.append_offset()?;
continue;
}
let mut first = true;
Expand All @@ -369,7 +369,7 @@ impl ScalarUDFImpl for ConcatWsFunc {
first = false;
}
}
builder.append_offset();
builder.append_offset()?;
}
Ok(ColumnarValue::Array(Arc::new(builder.finish(sep.nulls())?)))
}
Expand Down
96 changes: 66 additions & 30 deletions datafusion/functions/src/strings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ use datafusion_common::{Result, exec_datafusion_err, internal_err};

use arrow::array::{
Array, ArrayAccessor, ArrayDataBuilder, BinaryArray, ByteView, LargeStringArray,
StringArray, StringViewArray, StringViewBuilder, make_view,
StringArray, StringViewArray, make_view,
};
use arrow::buffer::{MutableBuffer, NullBuffer};
use arrow::buffer::{Buffer, MutableBuffer, NullBuffer, ScalarBuffer};
use arrow::datatypes::DataType;

/// Optimized version of the StringBuilder in Arrow that:
Expand Down Expand Up @@ -106,13 +106,14 @@ impl StringArrayBuilder {
}
}

pub fn append_offset(&mut self) {
pub fn append_offset(&mut self) -> Result<()> {
let next_offset: i32 = self
.value_buffer
.len()
.try_into()
.expect("byte array offset overflow");
.map_err(|_| exec_datafusion_err!("byte array offset overflow"))?;
self.offsets_buffer.push(next_offset);
Ok(())
}

/// Finalize the builder into a concrete [`StringArray`].
Expand Down Expand Up @@ -150,18 +151,25 @@ impl StringArrayBuilder {
}
}

/// Optimized version of Arrow's [`StringViewBuilder`]. Rather than adding NULLs
/// on a row-by-row basis, the caller should provide nulls when calling
/// [`finish`](Self::finish). This allows callers to compute nulls more
/// efficiently (e.g., via bulk bitmap operations).
///
/// [`StringViewBuilder`]: arrow::array::StringViewBuilder
pub struct StringViewArrayBuilder {
builder: StringViewBuilder,
views: Vec<u128>,
data: Vec<u8>,
block: Vec<u8>,
/// If true, a safety check is required during the `append_offset` call
tainted: bool,
}

impl StringViewArrayBuilder {
pub fn with_capacity(item_capacity: usize, _data_capacity: usize) -> Self {
let builder = StringViewBuilder::with_capacity(item_capacity);
pub fn with_capacity(item_capacity: usize, data_capacity: usize) -> Self {
Self {
builder,
views: Vec::with_capacity(item_capacity),
data: Vec::with_capacity(data_capacity),
block: vec![],
tainted: false,
}
Expand Down Expand Up @@ -214,16 +222,29 @@ impl StringViewArrayBuilder {
}
}

/// Finalizes the current row by converting the accumulated data into a
/// StringView and appending it to the views buffer.
pub fn append_offset(&mut self) -> Result<()> {
let block_str = if self.tainted {
if self.tainted {
std::str::from_utf8(&self.block)
.map_err(|_| exec_datafusion_err!("invalid UTF-8 in binary literal"))?
.map_err(|_| exec_datafusion_err!("invalid UTF-8 in binary literal"))?;
}

let v = &self.block;
if v.len() > 12 {
let offset: u32 = self
.data
.len()
.try_into()
.map_err(|_| exec_datafusion_err!("byte array offset overflow"))?;
self.data.extend_from_slice(v);
self.views.push(make_view(v, 0, offset));
} else {
// SAFETY: all data that was appended was valid UTF8
unsafe { std::str::from_utf8_unchecked(&self.block) }
};
self.builder.append_value(block_str);
self.views.push(make_view(v, 0, 0));
}

self.block.clear();
self.tainted = false;
Ok(())
}

Expand All @@ -233,21 +254,35 @@ impl StringViewArrayBuilder {
///
/// Returns an error when:
///
/// - the provided `null_buffer` does not match amount of `append_offset` calls.
pub fn finish(mut self, null_buffer: Option<NullBuffer>) -> Result<StringViewArray> {
let array = self.builder.finish();
match null_buffer {
Some(nulls) if nulls.len() != array.len() => {
internal_err!("Null buffer and views buffer must be the same length")
}
Some(nulls) => {
let array_builder = array.into_data().into_builder().nulls(Some(nulls));
// SAFETY: the underlying data is valid; we are only adding a null buffer
let array_data = unsafe { array_builder.build_unchecked() };
Ok(StringViewArray::from(array_data))
}
None => Ok(array),
/// - the provided `null_buffer` length does not match the row count.
pub fn finish(self, null_buffer: Option<NullBuffer>) -> Result<StringViewArray> {
if let Some(ref nulls) = null_buffer
&& nulls.len() != self.views.len()
{
return internal_err!(
"Null buffer length ({}) must match row count ({})",
nulls.len(),
self.views.len()
);
}

let buffers: Vec<Buffer> = if self.data.is_empty() {
vec![]
} else {
vec![Buffer::from(self.data)]
};

// SAFETY: views were constructed with correct lengths, offsets, and
// prefixes. UTF-8 validity was checked in append_offset() for any row
// where tainted data (e.g., binary literals) was appended.
let array = unsafe {
StringViewArray::new_unchecked(
ScalarBuffer::from(self.views),
buffers,
null_buffer,
)
};
Ok(array)
}
}

Expand Down Expand Up @@ -328,13 +363,14 @@ impl LargeStringArrayBuilder {
}
}

pub fn append_offset(&mut self) {
pub fn append_offset(&mut self) -> Result<()> {
let next_offset: i64 = self
.value_buffer
.len()
.try_into()
.expect("byte array offset overflow");
.map_err(|_| exec_datafusion_err!("byte array offset overflow"))?;
self.offsets_buffer.push(next_offset);
Ok(())
}

/// Finalize the builder into a concrete [`LargeStringArray`].
Expand Down
Loading