Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions datafusion/physical-expr-common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ chrono = { workspace = true }
datafusion-common = { workspace = true }
datafusion-expr-common = { workspace = true }
datafusion-proto-models = { workspace = true, optional = true }
half = { workspace = true }
hashbrown = { workspace = true }
indexmap = { workspace = true }
itertools = { workspace = true }
Expand Down
345 changes: 343 additions & 2 deletions datafusion/physical-expr-common/src/datum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,23 @@
// under the License.

use arrow::array::BooleanArray;
use arrow::array::{ArrayRef, Datum, make_comparator};
use arrow::array::{Array, ArrayRef, AsArray, Datum, make_comparator};
use arrow::buffer::{BooleanBuffer, NullBuffer};
use arrow::compute::kernels::arity::unary;
use arrow::compute::kernels::cmp::{
distinct, eq, gt, gt_eq, lt, lt_eq, neq, not_distinct,
};
use arrow::compute::{SortOptions, ilike, like, nilike, nlike};
use arrow::datatypes::{
DataType, Float16Type, Float32Type, Float64Type, Int16Type, Int32Type, Int64Type,
};
use arrow::downcast_dictionary_array;
use arrow::error::ArrowError;
use datafusion_common::{Result, ScalarValue};
use datafusion_common::{arrow_datafusion_err, assert_or_internal_err, internal_err};
use datafusion_expr_common::columnar_value::ColumnarValue;
use datafusion_expr_common::operator::Operator;
use half::f16;
use std::sync::Arc;

/// Applies a binary [`Datum`] kernel `f` to `lhs` and `rhs`
Expand Down Expand Up @@ -84,7 +90,147 @@ pub fn apply_cmp(
}
};

apply(lhs, rhs, |l, r| Ok(Arc::new(f(l, r)?)))
let lhs = normalize_neg_zero(lhs);
let rhs = normalize_neg_zero(rhs);
apply(&lhs, &rhs, |l, r| Ok(Arc::new(f(l, r)?)))
}
}

/// Rewrites every `-0.0` in a float input to `+0.0`, so that comparison
/// operators observe IEEE 754 default semantics (`-0.0 == +0.0`).
///
/// arrow-rs' comparison kernels deliberately follow totalOrder semantics,
/// under which `-0.0` sorts strictly below `+0.0` and is not equal to it.
///
/// This is a thin dispatcher: arrays are forwarded to
/// [`normalize_neg_zero_array`] and scalars to [`normalize_neg_zero_scalar`];
/// see those functions for the per-variant behavior, including dictionary-
/// and run-end-encoded arrays.
pub fn normalize_neg_zero(value: &ColumnarValue) -> ColumnarValue {
    match value {
        ColumnarValue::Scalar(literal) => {
            let cleaned = normalize_neg_zero_scalar(literal);
            ColumnarValue::Scalar(cleaned)
        }
        ColumnarValue::Array(column) => {
            let cleaned = normalize_neg_zero_array(column);
            ColumnarValue::Array(cleaned)
        }
    }
}

/// Array variant of [`normalize_neg_zero`]. Returns the input unchanged for
/// arrays that don't contain `-0.0` (no allocation) and for arrays whose
/// (transitive) value type is not floating-point.
///
/// Dictionary- and run-end-encoded arrays are peeled to their inner values
/// and rebuilt with normalized values when needed; the keys/run-ends are
/// preserved.
pub fn normalize_neg_zero_array(array: &ArrayRef) -> ArrayRef {
if !data_type_contains_float(array.data_type()) {
return Arc::clone(array);
}

match array.data_type() {
DataType::Float16 => {
let arr = array.as_primitive::<Float16Type>();
if arr
.values()
.iter()
.any(|v| v.is_sign_negative() && *v == f16::ZERO)
{
Arc::new(unary::<Float16Type, _, Float16Type>(arr, |x| {
if x == f16::ZERO { f16::ZERO } else { x }
}))
} else {
Arc::clone(array)
}
}
DataType::Float32 => {
let arr = array.as_primitive::<Float32Type>();
if arr
.values()
.iter()
.any(|v| v.is_sign_negative() && *v == 0.0)
{
Arc::new(unary::<Float32Type, _, Float32Type>(arr, |x| {
if x == 0.0 { 0.0 } else { x }
}))
} else {
Arc::clone(array)
}
}
DataType::Float64 => {
let arr = array.as_primitive::<Float64Type>();
if arr
.values()
.iter()
.any(|v| v.is_sign_negative() && *v == 0.0)
{
Arc::new(unary::<Float64Type, _, Float64Type>(arr, |x| {
if x == 0.0 { 0.0 } else { x }
}))
} else {
Arc::clone(array)
}
}
DataType::Dictionary(_, _) => {
let dyn_array: &dyn Array = array.as_ref();
downcast_dictionary_array!(
dyn_array => {
let inner = dyn_array.values();
let normalized = normalize_neg_zero_array(inner);
if Arc::ptr_eq(&normalized, inner) {
Arc::clone(array)
} else {
Arc::new(dyn_array.with_values(normalized))
}
}
_ => unreachable!("data_type matched Dictionary"),
)
}
DataType::RunEndEncoded(run_ends_field, _) => match run_ends_field.data_type() {
DataType::Int16 => normalize_neg_zero_ree::<Int16Type>(array),
DataType::Int32 => normalize_neg_zero_ree::<Int32Type>(array),
DataType::Int64 => normalize_neg_zero_ree::<Int64Type>(array),
_ => Arc::clone(array),
},
_ => Arc::clone(array),
}
}

/// Reports whether `dt` is a floating-point type once any dictionary and
/// run-end-encoded wrappers have been stripped away.
///
/// [`normalize_neg_zero_array`] calls this first so that non-float inputs
/// are returned without inspecting their data.
fn data_type_contains_float(dt: &DataType) -> bool {
    let mut current = dt;
    loop {
        current = match current {
            // Wrapper types: keep peeling towards the value type.
            DataType::Dictionary(_, value_type) => &**value_type,
            DataType::RunEndEncoded(_, values_field) => values_field.data_type(),
            // Anything else is a leaf; it either is a float or it is not.
            leaf => {
                return matches!(
                    leaf,
                    DataType::Float16 | DataType::Float32 | DataType::Float64
                );
            }
        };
    }
}

/// Run-end-encoded arm of [`normalize_neg_zero_array`] for run-end type `R`.
///
/// Normalizes the values child of the run array. If that yields the same
/// `Arc` (nothing was rewritten) the input is returned as is; otherwise the
/// run array is rebuilt around the normalized values via `with_values`.
///
/// The caller is expected to have matched `R` against the array's run-end
/// type before calling.
fn normalize_neg_zero_ree<R: arrow::datatypes::RunEndIndexType>(
    array: &ArrayRef,
) -> ArrayRef {
    let run_array = array.as_ref().as_run::<R>();
    let rewritten = normalize_neg_zero_array(run_array.values());
    if !Arc::ptr_eq(&rewritten, run_array.values()) {
        Arc::new(run_array.with_values(rewritten))
    } else {
        Arc::clone(array)
    }
}

/// Scalar variant of [`normalize_neg_zero`].
///
/// * `Float16` / `Float32` / `Float64` scalars holding a zero of either sign
///   are returned as `+0.0`.
/// * `Dictionary` scalars are normalized recursively through their wrapped
///   value (the key type is kept), mirroring how
///   [`normalize_neg_zero_array`] looks through dictionary-encoded arrays.
///   Without this, a dictionary-wrapped `-0.0` literal would bypass
///   normalization while the equivalent array would not.
/// * Every other scalar — nulls, non-zero floats, non-float types — is
///   returned as an unchanged clone.
pub fn normalize_neg_zero_scalar(scalar: &ScalarValue) -> ScalarValue {
    match scalar {
        // `==` treats -0.0 and +0.0 as equal, so these guards catch both
        // signs and always emit the positive one.
        ScalarValue::Float16(Some(v)) if *v == f16::ZERO => {
            ScalarValue::Float16(Some(f16::ZERO))
        }
        ScalarValue::Float32(Some(v)) if *v == 0.0 => ScalarValue::Float32(Some(0.0)),
        ScalarValue::Float64(Some(v)) if *v == 0.0 => ScalarValue::Float64(Some(0.0)),
        // Peel the dictionary wrapper and normalize whatever it carries.
        ScalarValue::Dictionary(key_type, inner) => ScalarValue::Dictionary(
            key_type.clone(),
            Box::new(normalize_neg_zero_scalar(inner)),
        ),
        _ => scalar.clone(),
    }
}

Expand Down Expand Up @@ -205,3 +351,198 @@ pub fn compare_op_for_nested(
Ok(BooleanArray::new(values, nulls))
}
}

#[cfg(test)]
mod tests {
    //! Unit tests for the `-0.0` normalization helpers.
    //!
    //! Pass-through cases assert pointer identity (`Arc::ptr_eq`) to pin the
    //! "no allocation when nothing changes" contract; rewrite cases assert
    //! that no `-0.0` survives and that unrelated values are untouched.

    use super::*;
    use arrow::array::{
        DictionaryArray, Float16Array, Float32Array, Float64Array, Int32Array, RunArray,
    };

    /// Float leaves and float-valued wrappers are detected; integer ones are not.
    #[test]
    fn data_type_contains_float_detects_nested() {
        use arrow::datatypes::Field;

        assert!(data_type_contains_float(&DataType::Float32));
        assert!(!data_type_contains_float(&DataType::Int64));

        let dict_of_float =
            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Float64));
        assert!(data_type_contains_float(&dict_of_float));

        let dict_of_int =
            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Int64));
        assert!(!data_type_contains_float(&dict_of_int));

        let ree_of_float = DataType::RunEndEncoded(
            Arc::new(Field::new("run_ends", DataType::Int32, false)),
            Arc::new(Field::new("values", DataType::Float64, true)),
        );
        assert!(data_type_contains_float(&ree_of_float));
    }

    /// Negative counterpart for run-end encoding: an integer values child
    /// must not be reported as float.
    #[test]
    fn data_type_contains_float_rejects_ree_of_non_float() {
        use arrow::datatypes::Field;

        let ree_of_int = DataType::RunEndEncoded(
            Arc::new(Field::new("run_ends", DataType::Int32, false)),
            Arc::new(Field::new("values", DataType::Int64, true)),
        );
        assert!(!data_type_contains_float(&ree_of_int));
    }

    /// Returns `true` if the `Float64` array has a `-0.0` in its raw values.
    fn has_neg_zero_f64(array: &dyn Array) -> bool {
        array
            .as_primitive::<Float64Type>()
            .values()
            .iter()
            .any(|v| v.is_sign_negative() && *v == 0.0)
    }

    #[test]
    fn normalize_float64_rewrites_neg_zero() {
        let array: ArrayRef = Arc::new(Float64Array::from(vec![-0.0, 0.0, 1.5, -2.0]));
        let normalized = normalize_neg_zero_array(&array);

        assert!(!Arc::ptr_eq(&normalized, &array));
        assert!(!has_neg_zero_f64(normalized.as_ref()));
        let values = normalized.as_primitive::<Float64Type>().values();
        // Both zero slots must come out as exactly +0.0 (bitwise).
        assert_eq!(values[0].to_bits(), 0.0_f64.to_bits());
        assert_eq!(values[1].to_bits(), 0.0_f64.to_bits());
        assert_eq!(values[2], 1.5);
        assert_eq!(values[3], -2.0);
    }

    #[test]
    fn normalize_float64_passthrough_when_no_neg_zero() {
        let array: ArrayRef = Arc::new(Float64Array::from(vec![0.0, 1.0, 2.0]));
        let normalized = normalize_neg_zero_array(&array);
        assert!(Arc::ptr_eq(&normalized, &array));
    }

    /// Rewriting must not disturb the validity of null slots.
    #[test]
    fn normalize_float64_preserves_nulls() {
        let array: ArrayRef =
            Arc::new(Float64Array::from(vec![Some(-0.0), None, Some(1.0)]));
        let normalized = normalize_neg_zero_array(&array);

        assert!(!Arc::ptr_eq(&normalized, &array));
        assert_eq!(normalized.len(), 3);
        assert_eq!(normalized.null_count(), 1);
        assert!(normalized.is_null(1));

        let values = normalized.as_primitive::<Float64Type>().values();
        assert_eq!(values[0].to_bits(), 0.0_f64.to_bits());
        assert_eq!(values[2], 1.0);
    }

    #[test]
    fn normalize_dict_of_float64_peels_and_rewrites() {
        let values: ArrayRef = Arc::new(Float64Array::from(vec![-0.0, 1.0, 2.0]));
        let keys = Int32Array::from(vec![0, 1, 0, 2]);
        let array: ArrayRef =
            Arc::new(DictionaryArray::<Int32Type>::try_new(keys, values).unwrap());

        let normalized = normalize_neg_zero_array(&array);
        assert!(!Arc::ptr_eq(&normalized, &array));

        let dict = normalized
            .as_ref()
            .as_any()
            .downcast_ref::<DictionaryArray<Int32Type>>()
            .unwrap();
        assert!(!has_neg_zero_f64(dict.values().as_ref()));
    }

    #[test]
    fn normalize_dict_of_non_float_passes_through() {
        let values: ArrayRef = Arc::new(Int32Array::from(vec![10, 20, 30]));
        let keys = Int32Array::from(vec![0, 1, 2, 0]);
        let array: ArrayRef =
            Arc::new(DictionaryArray::<Int32Type>::try_new(keys, values).unwrap());

        let normalized = normalize_neg_zero_array(&array);
        assert!(Arc::ptr_eq(&normalized, &array));
    }

    #[test]
    fn normalize_ree_of_float32_peels_and_rewrites() {
        let run_ends = Int32Array::from(vec![2, 3, 5]);
        let values: ArrayRef = Arc::new(Float32Array::from(vec![-0.0, 1.0, 2.0]));
        let array: ArrayRef =
            Arc::new(RunArray::<Int32Type>::try_new(&run_ends, values.as_ref()).unwrap());

        let normalized = normalize_neg_zero_array(&array);
        assert!(!Arc::ptr_eq(&normalized, &array));

        let ree = normalized.as_ref().as_run::<Int32Type>();
        let inner = ree.values().as_primitive::<Float32Type>();
        assert!(
            !inner
                .values()
                .iter()
                .any(|v| v.is_sign_negative() && *v == 0.0)
        );
    }

    #[test]
    fn normalize_float16_rewrites_neg_zero() {
        let array: ArrayRef = Arc::new(Float16Array::from(vec![
            f16::NEG_ZERO,
            f16::ZERO,
            f16::from_f32(1.5),
        ]));
        let normalized = normalize_neg_zero_array(&array);

        assert!(!Arc::ptr_eq(&normalized, &array));
        let values = normalized.as_primitive::<Float16Type>().values();
        assert!(
            !values
                .iter()
                .any(|v| v.is_sign_negative() && *v == f16::ZERO)
        );
        assert_eq!(values[2], f16::from_f32(1.5));
    }

    #[test]
    fn normalize_nested_dict_recurses() {
        // Dictionary<Int32, Dictionary<Int32, Float64>> — exercises the
        // recursive peel through two layers of dictionary encoding.
        let values: ArrayRef = Arc::new(Float64Array::from(vec![-0.0, 1.0]));
        let inner_keys = Int32Array::from(vec![0, 1, 0]);
        let inner_dict: ArrayRef =
            Arc::new(DictionaryArray::<Int32Type>::try_new(inner_keys, values).unwrap());
        let outer_keys = Int32Array::from(vec![0, 1, 2]);
        let array: ArrayRef = Arc::new(
            DictionaryArray::<Int32Type>::try_new(outer_keys, inner_dict).unwrap(),
        );

        let normalized = normalize_neg_zero_array(&array);
        assert!(!Arc::ptr_eq(&normalized, &array));

        let outer = normalized
            .as_ref()
            .as_any()
            .downcast_ref::<DictionaryArray<Int32Type>>()
            .unwrap();
        let inner = outer
            .values()
            .as_any()
            .downcast_ref::<DictionaryArray<Int32Type>>()
            .unwrap();
        assert!(!has_neg_zero_f64(inner.values().as_ref()));
    }

    #[test]
    fn normalize_scalar_float64() {
        assert_eq!(
            normalize_neg_zero_scalar(&ScalarValue::Float64(Some(-0.0))),
            ScalarValue::Float64(Some(0.0))
        );
        assert_eq!(
            normalize_neg_zero_scalar(&ScalarValue::Float64(Some(0.0))),
            ScalarValue::Float64(Some(0.0))
        );
        // Non-zero values are unchanged, including the sign of negative numbers.
        assert_eq!(
            normalize_neg_zero_scalar(&ScalarValue::Float64(Some(-1.5))),
            ScalarValue::Float64(Some(-1.5))
        );
        // Null scalars pass through.
        assert_eq!(
            normalize_neg_zero_scalar(&ScalarValue::Float64(None)),
            ScalarValue::Float64(None)
        );
    }

    #[test]
    fn normalize_scalar_float32_and_float16() {
        assert_eq!(
            normalize_neg_zero_scalar(&ScalarValue::Float32(Some(-0.0))),
            ScalarValue::Float32(Some(0.0))
        );
        assert_eq!(
            normalize_neg_zero_scalar(&ScalarValue::Float16(Some(f16::NEG_ZERO))),
            ScalarValue::Float16(Some(f16::ZERO))
        );
    }

    #[test]
    fn normalize_scalar_non_float_passthrough() {
        let s = ScalarValue::Int32(Some(0));
        assert_eq!(normalize_neg_zero_scalar(&s), s);
        let s = ScalarValue::Utf8(Some("hello".into()));
        assert_eq!(normalize_neg_zero_scalar(&s), s);
    }
}
Loading
Loading