From c8c9bb9156370b17ca350e8b9feac239af75520d Mon Sep 17 00:00:00 2001 From: Kazantsev Maksim Date: Tue, 14 Apr 2026 20:36:17 +0400 Subject: [PATCH 1/6] Spark is_valid_utf8 function implementation --- .../src/function/string/is_valid_utf8.rs | 103 ++++++++++++++++++ datafusion/spark/src/function/string/mod.rs | 8 ++ .../test_files/spark/string/is_valid_utf8.slt | 41 +++++++ 3 files changed, 152 insertions(+) create mode 100644 datafusion/spark/src/function/string/is_valid_utf8.rs create mode 100644 datafusion/sqllogictest/test_files/spark/string/is_valid_utf8.slt diff --git a/datafusion/spark/src/function/string/is_valid_utf8.rs b/datafusion/spark/src/function/string/is_valid_utf8.rs new file mode 100644 index 0000000000000..fe09cc5c079d6 --- /dev/null +++ b/datafusion/spark/src/function/string/is_valid_utf8.rs @@ -0,0 +1,103 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::datatypes::{DataType, Field, FieldRef}; +use datafusion::logical_expr::{ColumnarValue, Signature, Volatility}; +use datafusion_common::{Result, internal_err}; +use datafusion_expr::{ReturnFieldArgs, ScalarFunctionArgs, ScalarUDFImpl}; + +use arrow::array::{Array, ArrayRef, BooleanArray}; +use datafusion_common::cast::{as_binary_array, as_string_array}; +use datafusion_common::utils::take_function_args; +use datafusion_functions::utils::make_scalar_function; + +use std::sync::Arc; + +/// Spark-compatible `is_valid_utf8` expression +/// +#[derive(Debug, PartialEq, Eq, Hash)] +pub struct SparkIsValidUtf8 { + signature: Signature, +} + +impl Default for SparkIsValidUtf8 { + fn default() -> Self { + Self::new() + } +} + +impl SparkIsValidUtf8 { + pub fn new() -> Self { + Self { + signature: Signature::uniform( + 1, + vec![ + DataType::Utf8, + DataType::LargeUtf8, + DataType::Utf8View, + DataType::Binary, + DataType::BinaryView, + DataType::LargeBinary, + ], + Volatility::Immutable, + ), + } + } +} + +impl ScalarUDFImpl for SparkIsValidUtf8 { + fn name(&self) -> &str { + "is_valid_utf8" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, _arg_types: &[DataType]) -> Result { + internal_err!("return_field_from_args should be used instead") + } + + fn return_field_from_args(&self, _args: ReturnFieldArgs) -> Result { + Ok(Arc::new(Field::new(self.name(), DataType::Boolean, true))) + } + + fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { + make_scalar_function(spark_is_valid_utf8_inner, vec![])(&args.args) + } +} + +fn spark_is_valid_utf8_inner(args: &[ArrayRef]) -> Result { + let [array] = take_function_args("is_valid_utf8", args)?; + match array.data_type() { + DataType::Utf8 => Ok(Arc::new( + as_string_array(array)? + .iter() + .map(|x| x.map(|y| String::from_utf8(y.as_bytes().to_vec()).is_ok())) + .collect::(), + )), + DataType::Binary => Ok(Arc::new( + as_binary_array(array)? + .iter() + .map(|x| x.map(|y| String::from_utf8(y.into()).is_ok())) + .collect::(), + )), + data_type => { + internal_err!("is_valid_utf8 does not support: {data_type}") + } + } +} diff --git a/datafusion/spark/src/function/string/mod.rs b/datafusion/spark/src/function/string/mod.rs index 7bcdac5d85474..b163cd3d04e1a 100644 --- a/datafusion/spark/src/function/string/mod.rs +++ b/datafusion/spark/src/function/string/mod.rs @@ -22,6 +22,7 @@ pub mod concat; pub mod elt; pub mod format_string; pub mod ilike; +pub mod is_valid_utf8; pub mod length; pub mod like; pub mod luhn_check; @@ -47,6 +48,7 @@ make_udf_function!(space::SparkSpace, space); make_udf_function!(substring::SparkSubstring, substring); make_udf_function!(base64::SparkUnBase64, unbase64); make_udf_function!(soundex::SparkSoundex, soundex); +make_udf_function!(is_valid_utf8::SparkIsValidUtf8, is_valid_utf8); pub mod expr_fn { use datafusion_functions::export_functions; @@ -113,6 +115,11 @@ pub mod expr_fn { str )); export_functions!((soundex, "Returns Soundex code of the string.", str)); + export_functions!(( + is_valid_utf8, + "Returns true if str is a valid UTF-8 string, otherwise returns false", + str + )); } pub fn functions() -> Vec> { @@ -131,5 +138,6 @@ pub fn functions() -> Vec> { substring(), unbase64(), soundex(), + is_valid_utf8(), ] } diff --git a/datafusion/sqllogictest/test_files/spark/string/is_valid_utf8.slt b/datafusion/sqllogictest/test_files/spark/string/is_valid_utf8.slt new file mode 100644 index 0000000000000..aaca350a13e15 --- /dev/null +++ b/datafusion/sqllogictest/test_files/spark/string/is_valid_utf8.slt @@ -0,0 +1,41 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +query T +SELECT is_valid_utf8('Spark'::string); +---- +true + +query T +SELECT is_valid_utf8(NULL::string); +---- +NULL + +query T +SELECT is_valid_utf8(arrow_cast(x'61', 'Binary')); +---- +true + +query T +SELECT is_valid_utf8(arrow_cast(x'80', 'Binary')); +---- +false + +query T +SELECT is_valid_utf8(arrow_cast(x'61C262', 'Binary')); +---- +false From 614a4776cd28de6df18234f7f69fdaa65bd09e5d Mon Sep 17 00:00:00 2001 From: Kazantsev Maksim Date: Tue, 14 Apr 2026 21:03:00 +0400 Subject: [PATCH 2/6] Fix tests --- .../test_files/spark/string/is_valid_utf8.slt | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/datafusion/sqllogictest/test_files/spark/string/is_valid_utf8.slt b/datafusion/sqllogictest/test_files/spark/string/is_valid_utf8.slt index aaca350a13e15..f3177c1254e9e 100644 --- a/datafusion/sqllogictest/test_files/spark/string/is_valid_utf8.slt +++ b/datafusion/sqllogictest/test_files/spark/string/is_valid_utf8.slt @@ -15,27 +15,27 @@ # specific language governing permissions and limitations # under the License. -query T +query B SELECT is_valid_utf8('Spark'::string); ---- true -query T +query B SELECT is_valid_utf8(NULL::string); ---- NULL -query T +query B SELECT is_valid_utf8(arrow_cast(x'61', 'Binary')); ---- true -query T +query B SELECT is_valid_utf8(arrow_cast(x'80', 'Binary')); ---- false -query T +query B SELECT is_valid_utf8(arrow_cast(x'61C262', 'Binary')); ---- false From 160d25f70f4b6eee60cd02297690179ebc799f52 Mon Sep 17 00:00:00 2001 From: Kazantsev Maksim Date: Tue, 14 Apr 2026 21:12:24 +0400 Subject: [PATCH 3/6] Fix tests --- datafusion/spark/src/function/string/is_valid_utf8.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/datafusion/spark/src/function/string/is_valid_utf8.rs b/datafusion/spark/src/function/string/is_valid_utf8.rs index fe09cc5c079d6..39d09db7b7bd5 100644 --- a/datafusion/spark/src/function/string/is_valid_utf8.rs +++ b/datafusion/spark/src/function/string/is_valid_utf8.rs @@ -21,7 +21,7 @@ use datafusion_common::{Result, internal_err}; use datafusion_expr::{ReturnFieldArgs, ScalarFunctionArgs, ScalarUDFImpl}; use arrow::array::{Array, ArrayRef, BooleanArray}; -use datafusion_common::cast::{as_binary_array, as_string_array}; +use datafusion_common::cast::{as_binary_array, as_string_array, as_string_view_array}; use datafusion_common::utils::take_function_args; use datafusion_functions::utils::make_scalar_function; @@ -90,6 +90,12 @@ fn spark_is_valid_utf8_inner(args: &[ArrayRef]) -> Result { .map(|x| x.map(|y| String::from_utf8(y.as_bytes().to_vec()).is_ok())) .collect::(), )), + DataType::Utf8View => Ok(Arc::new( + as_string_view_array(array)? + .iter() + .map(|x| x.map(|y| String::from_utf8(y.as_bytes().to_vec()).is_ok())) + .collect::(), + )), DataType::Binary => Ok(Arc::new( as_binary_array(array)? .iter() From 21187d1cfd63f4cecea335d44b566ee2609a324b Mon Sep 17 00:00:00 2001 From: Kazantsev Maksim Date: Wed, 15 Apr 2026 21:53:18 +0400 Subject: [PATCH 4/6] More tests --- .../src/function/string/is_valid_utf8.rs | 20 ++- .../test_files/spark/string/is_valid_utf8.slt | 148 +++++++++++++++++- 2 files changed, 163 insertions(+), 5 deletions(-) diff --git a/datafusion/spark/src/function/string/is_valid_utf8.rs b/datafusion/spark/src/function/string/is_valid_utf8.rs index 39d09db7b7bd5..1d1e162137b10 100644 --- a/datafusion/spark/src/function/string/is_valid_utf8.rs +++ b/datafusion/spark/src/function/string/is_valid_utf8.rs @@ -21,7 +21,7 @@ use datafusion_common::{Result, internal_err}; use datafusion_expr::{ReturnFieldArgs, ScalarFunctionArgs, ScalarUDFImpl}; use arrow::array::{Array, ArrayRef, BooleanArray}; -use datafusion_common::cast::{as_binary_array, as_string_array, as_string_view_array}; +use datafusion_common::cast::{as_binary_array, as_binary_view_array, as_large_binary_array, as_large_string_array, as_string_array, as_string_view_array}; use datafusion_common::utils::take_function_args; use datafusion_functions::utils::make_scalar_function; @@ -96,12 +96,30 @@ fn spark_is_valid_utf8_inner(args: &[ArrayRef]) -> Result { .map(|x| x.map(|y| String::from_utf8(y.as_bytes().to_vec()).is_ok())) .collect::(), )), + DataType::LargeUtf8 => Ok(Arc::new( + as_large_string_array(array)? + .iter() + .map(|x| x.map(|y| String::from_utf8(y.as_bytes().to_vec()).is_ok())) + .collect::(), + )), DataType::Binary => Ok(Arc::new( as_binary_array(array)? .iter() .map(|x| x.map(|y| String::from_utf8(y.into()).is_ok())) .collect::(), )), + DataType::LargeBinary => Ok(Arc::new( + as_large_binary_array(array)? + .iter() + .map(|x| x.map(|y| String::from_utf8(y.into()).is_ok())) + .collect::(), + )), + DataType::BinaryView => Ok(Arc::new( + as_binary_view_array(array)? + .iter() + .map(|x| x.map(|y| String::from_utf8(y.into()).is_ok())) + .collect::(), + )), data_type => { internal_err!("is_valid_utf8 does not support: {data_type}") } diff --git a/datafusion/sqllogictest/test_files/spark/string/is_valid_utf8.slt b/datafusion/sqllogictest/test_files/spark/string/is_valid_utf8.slt index f3177c1254e9e..8ec6eb5f2e085 100644 --- a/datafusion/sqllogictest/test_files/spark/string/is_valid_utf8.slt +++ b/datafusion/sqllogictest/test_files/spark/string/is_valid_utf8.slt @@ -16,17 +16,52 @@ # under the License. query B -SELECT is_valid_utf8('Spark'::string); +SELECT is_valid_utf8('Hello, world!'::string); ---- true query B -SELECT is_valid_utf8(NULL::string); +SELECT is_valid_utf8('😀🎉✨'::string); ---- -NULL +true + +query B +SELECT is_valid_utf8(''::string); +---- +true + +query B +SELECT is_valid_utf8('ASCII only 123 !@#'::string); +---- +true + +query B +SELECT is_valid_utf8(arrow_cast(x'C2A9', 'Binary')); +---- +true + +query B +SELECT is_valid_utf8(arrow_cast(x'C2AE', 'Binary')); +---- +true + +query B +SELECT is_valid_utf8(arrow_cast(x'E282AC', 'Binary')); +---- +true + +query B +SELECT is_valid_utf8(arrow_cast(x'E284A2', 'Binary')); +---- +true + +query B +SELECT is_valid_utf8(arrow_cast(x'F09F9880', 'Binary')); +---- +true query B -SELECT is_valid_utf8(arrow_cast(x'61', 'Binary')); +SELECT is_valid_utf8(arrow_cast(x'F09F8E89', 'Binary')); ---- true @@ -35,7 +70,112 @@ SELECT is_valid_utf8(arrow_cast(x'80', 'Binary')); ---- false +query B +SELECT is_valid_utf8(arrow_cast(x'BF', 'Binary')); +---- +false + +query B +SELECT is_valid_utf8(arrow_cast(x'808080', 'Binary')); +---- +false + +query B +SELECT is_valid_utf8(arrow_cast(x'C2', 'Binary')); +---- +false + +query B +SELECT is_valid_utf8(arrow_cast(x'E2', 'Binary')); +---- +false + +query B +SELECT is_valid_utf8(arrow_cast(x'F0', 'Binary')); +---- +false + +query B +SELECT is_valid_utf8(arrow_cast(x'E282', 'Binary')); +---- +false + +query B +SELECT is_valid_utf8(arrow_cast(x'C081', 'Binary')); +---- +false + +query B +SELECT is_valid_utf8(arrow_cast(x'E08080', 'Binary')); +---- +false + +query B +SELECT is_valid_utf8(arrow_cast(x'F0808080', 'Binary')); +---- +false + +query B +SELECT is_valid_utf8(arrow_cast(x'FE', 'Binary')); +---- +false + +query B +SELECT is_valid_utf8(arrow_cast(x'FF', 'Binary')); +---- +false + query B SELECT is_valid_utf8(arrow_cast(x'61C262', 'Binary')); ---- false + +query B +SELECT is_valid_utf8(arrow_cast(x'41BF42', 'Binary')); +---- +false + +query B +SELECT is_valid_utf8(arrow_cast(x'ED9FBF', 'Binary')); +---- +true + +query B +SELECT is_valid_utf8(arrow_cast(x'EDA080', 'Binary')); +---- +false + +query B +SELECT is_valid_utf8(arrow_cast(x'EDBFBF', 'Binary')); +---- +false + +query B +SELECT is_valid_utf8(arrow_cast(x'F48FBFBF', 'Binary')); +---- +true + +query B +SELECT is_valid_utf8(arrow_cast(x'F4908080', 'Binary')); +---- +false + +query B +SELECT is_valid_utf8(arrow_cast(x'6162C2A963', 'Binary')); +---- +true + +query B +SELECT is_valid_utf8(arrow_cast(x'6162806364', 'Binary')); +---- +false + +query B +SELECT is_valid_utf8(arrow_cast(x'610062', 'Binary')); +---- +true + +query B +SELECT is_valid_utf8(arrow_cast(x'', 'Binary')); +---- +true From d5057f18588744e164339fa365e917216c1c65c5 Mon Sep 17 00:00:00 2001 From: Kazantsev Maksim Date: Wed, 15 Apr 2026 21:54:14 +0400 Subject: [PATCH 5/6] fmt --- datafusion/spark/src/function/string/is_valid_utf8.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/datafusion/spark/src/function/string/is_valid_utf8.rs b/datafusion/spark/src/function/string/is_valid_utf8.rs index 1d1e162137b10..3304cdda94fc0 100644 --- a/datafusion/spark/src/function/string/is_valid_utf8.rs +++ b/datafusion/spark/src/function/string/is_valid_utf8.rs @@ -21,7 +21,10 @@ use datafusion_common::{Result, internal_err}; use datafusion_expr::{ReturnFieldArgs, ScalarFunctionArgs, ScalarUDFImpl}; use arrow::array::{Array, ArrayRef, BooleanArray}; -use datafusion_common::cast::{as_binary_array, as_binary_view_array, as_large_binary_array, as_large_string_array, as_string_array, as_string_view_array}; +use datafusion_common::cast::{ + as_binary_array, as_binary_view_array, as_large_binary_array, as_large_string_array, + as_string_array, as_string_view_array, +}; use datafusion_common::utils::take_function_args; use datafusion_functions::utils::make_scalar_function; From aa571f94d29828f4e910253636acd2b8ff816b62 Mon Sep 17 00:00:00 2001 From: Kazantsev Maksim Date: Fri, 17 Apr 2026 21:20:16 +0400 Subject: [PATCH 6/6] Add more tests --- .../src/function/string/is_valid_utf8.rs | 34 ++++++------------- .../test_files/spark/string/is_valid_utf8.slt | 22 ++++++++++++ 2 files changed, 33 insertions(+), 23 deletions(-) diff --git a/datafusion/spark/src/function/string/is_valid_utf8.rs b/datafusion/spark/src/function/string/is_valid_utf8.rs index 3304cdda94fc0..04958a25317d2 100644 --- a/datafusion/spark/src/function/string/is_valid_utf8.rs +++ b/datafusion/spark/src/function/string/is_valid_utf8.rs @@ -21,9 +21,9 @@ use datafusion_common::{Result, internal_err}; use datafusion_expr::{ReturnFieldArgs, ScalarFunctionArgs, ScalarUDFImpl}; use arrow::array::{Array, ArrayRef, BooleanArray}; +use arrow::buffer::BooleanBuffer; use datafusion_common::cast::{ - as_binary_array, as_binary_view_array, as_large_binary_array, as_large_string_array, - as_string_array, as_string_view_array, + as_binary_array, as_binary_view_array, as_large_binary_array, }; use datafusion_common::utils::take_function_args; use datafusion_functions::utils::make_scalar_function; @@ -87,40 +87,28 @@ impl ScalarUDFImpl for SparkIsValidUtf8 { fn spark_is_valid_utf8_inner(args: &[ArrayRef]) -> Result { let [array] = take_function_args("is_valid_utf8", args)?; match array.data_type() { - DataType::Utf8 => Ok(Arc::new( - as_string_array(array)? - .iter() - .map(|x| x.map(|y| String::from_utf8(y.as_bytes().to_vec()).is_ok())) - .collect::(), - )), - DataType::Utf8View => Ok(Arc::new( - as_string_view_array(array)? - .iter() - .map(|x| x.map(|y| String::from_utf8(y.as_bytes().to_vec()).is_ok())) - .collect::(), - )), - DataType::LargeUtf8 => Ok(Arc::new( - as_large_string_array(array)? - .iter() - .map(|x| x.map(|y| String::from_utf8(y.as_bytes().to_vec()).is_ok())) - .collect::(), - )), + DataType::Utf8 | DataType::Utf8View | DataType::LargeUtf8 => { + Ok(Arc::new(BooleanArray::new( + BooleanBuffer::new_set(array.len()), + array.nulls().cloned(), + ))) + } DataType::Binary => Ok(Arc::new( as_binary_array(array)? .iter() - .map(|x| x.map(|y| String::from_utf8(y.into()).is_ok())) + .map(|x| x.map(|y| str::from_utf8(y).is_ok())) .collect::(), )), DataType::LargeBinary => Ok(Arc::new( as_large_binary_array(array)? .iter() - .map(|x| x.map(|y| String::from_utf8(y.into()).is_ok())) + .map(|x| x.map(|y| str::from_utf8(y).is_ok())) .collect::(), )), DataType::BinaryView => Ok(Arc::new( as_binary_view_array(array)? .iter() - .map(|x| x.map(|y| String::from_utf8(y.into()).is_ok())) + .map(|x| x.map(|y| str::from_utf8(y).is_ok())) .collect::(), )), data_type => { diff --git a/datafusion/sqllogictest/test_files/spark/string/is_valid_utf8.slt b/datafusion/sqllogictest/test_files/spark/string/is_valid_utf8.slt index 8ec6eb5f2e085..9b04595334ae1 100644 --- a/datafusion/sqllogictest/test_files/spark/string/is_valid_utf8.slt +++ b/datafusion/sqllogictest/test_files/spark/string/is_valid_utf8.slt @@ -15,6 +15,28 @@ # specific language governing permissions and limitations # under the License. +statement ok +CREATE TABLE test_is_valid_utf8(value STRING) AS VALUES + (arrow_cast('Hello, world!', 'Utf8')), + (arrow_cast('Spark', 'Utf8')), + (arrow_cast('DataFusion', 'Utf8')), + (arrow_cast('ASCII only 123 !@#', 'Utf8')), + (arrow_cast(NULL, 'Utf8')); + +query B +SELECT is_valid_utf8(value) FROM test_is_valid_utf8; +---- +true +true +true +true +NULL + +query B +SELECT is_valid_utf8(NULL::string); +---- +NULL + query B SELECT is_valid_utf8('Hello, world!'::string); ----