From f279d1792e53c8636395982aeedb193b8443de89 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 27 May 2026 17:23:02 -0600 Subject: [PATCH] chore(audit): audit hash expressions across Spark 3.4.3, 3.5.8, 4.0.1, 4.1.1 Add per-version audit sub-bullets to `crc32`, `hash`, `md5`, `sha`, `sha1`, `sha2`, and `xxhash64` in `docs/source/contributor-guide/spark_expressions_support.md`. `sha` is a registry alias of `Sha1`. Spark 4.0 only adds the `DefaultStringProducingExpression` trait and the `nullIntolerant` field refactor across this category; no runtime behaviour change. Apply support-level consistency fixes surfaced by the audit: - Refactor `HashUtils` to return reasons (`unsupportedReasonFor`, `supportLevelForChildren`, `unsupportedReasons`) instead of calling `withInfo` from inside the helper. The recursive type check no longer side-effects on the expression tree at type-check time. - `CometXxHash64`, `CometMurmur3Hash`, `CometSha1`, `CometSha2`: override `getSupportLevel` and `getUnsupportedReasons` so the unsupported-child-type and (for Sha2) the non-foldable-numBits restrictions reach the dispatcher and the compatibility doc. No correctness divergences were found, so no new tracking issues are filed. The known `TimeType` gap (Spark 4.0+) is covered by the existing #4418 EPIC; the `DecimalType`-precision-18 gap is a documented Spark semantic difference (BigDecimal hashing). --- .../spark_expressions_support.md | 29 +++++ .../scala/org/apache/comet/serde/hash.scala | 109 ++++++++++-------- 2 files changed, 88 insertions(+), 50 deletions(-) diff --git a/docs/source/contributor-guide/spark_expressions_support.md b/docs/source/contributor-guide/spark_expressions_support.md index 8f82847bb6..39f75fd893 100644 --- a/docs/source/contributor-guide/spark_expressions_support.md +++ b/docs/source/contributor-guide/spark_expressions_support.md @@ -315,12 +315,41 @@ ### hash_funcs - [x] crc32 + - Spark 3.4.3 (audited 2026-05-27): identical to 3.5.8. + - Spark 3.5.8 (audited 2026-05-27): baseline. `Crc32(child) extends UnaryExpression`; `inputTypes = Seq(BinaryType) -> LongType`. Wired as `CometScalarFunction("crc32")`. + - Spark 4.0.1 (audited 2026-05-27): semantics unchanged; `NullIntolerant` trait replaced by `nullIntolerant: Boolean` override. + - Spark 4.1.1 (audited 2026-05-27): identical to 4.0.1. - [x] hash + - Spark 3.4.3 (audited 2026-05-27): identical to 3.5.8. + - Spark 3.5.8 (audited 2026-05-27): baseline. `Murmur3Hash(children, seed) extends HashExpression[Int]`; produces a Murmur3 hash with a configurable Int seed and `IntegerType` result. Comet routes via `CometMurmur3Hash` to the native `murmur3_hash` UDF. + - Spark 4.0.1 (audited 2026-05-27): semantics unchanged; some inner helper refactors only. + - Spark 4.1.1 (audited 2026-05-27): identical to 4.0.1. + - Known limitation: `DecimalType` children with precision > 18 fall back because Spark hashes them through Java `BigDecimal`; `TimeType` (Spark 4.0+) is also unsupported. The same limitations apply to `xxhash64`, `sha1`, `sha2` through the shared `HashUtils`. - [x] md5 + - Spark 3.4.3 (audited 2026-05-27): identical to 3.5.8. + - Spark 3.5.8 (audited 2026-05-27): baseline. `Md5(child) extends UnaryExpression`; `inputTypes = Seq(BinaryType) -> StringType`. Wired as `CometScalarFunction("md5")`. + - Spark 4.0.1 (audited 2026-05-27): semantics unchanged; trait set gains `DefaultStringProducingExpression` and the `nullIntolerant: Boolean` refactor. + - Spark 4.1.1 (audited 2026-05-27): identical to 4.0.1. - [x] sha + - Spark 3.4.3 (audited 2026-05-27): registry alias of `Sha1`. Same support as `sha1`. + - Spark 3.5.8 (audited 2026-05-27): identical to 3.4.3. + - Spark 4.0.1 (audited 2026-05-27): identical to 3.4.3. + - Spark 4.1.1 (audited 2026-05-27): identical to 3.4.3. - [x] sha1 + - Spark 3.4.3 (audited 2026-05-27): identical to 3.5.8. + - Spark 3.5.8 (audited 2026-05-27): baseline. `Sha1(child) extends UnaryExpression with NullIntolerant`; `inputTypes = Seq(BinaryType) -> StringType`. Comet routes via `CometSha1` to the native `sha1` UDF. + - Spark 4.0.1 (audited 2026-05-27): trait set gains `DefaultStringProducingExpression` and `NullIntolerant` is replaced by `nullIntolerant: Boolean`. Runtime unchanged. + - Spark 4.1.1 (audited 2026-05-27): identical to 4.0.1. - [x] sha2 + - Spark 3.4.3 (audited 2026-05-27): identical to 3.5.8. + - Spark 3.5.8 (audited 2026-05-27): baseline. `Sha2(left, right) extends BinaryExpression`; `inputTypes = Seq(BinaryType, IntegerType) -> StringType`. The `numBits` argument selects SHA-224/256/384/512 (0 is treated as 256); other values return NULL. Comet routes via `CometSha2` to the native `sha2` UDF; non-foldable `numBits` falls back to Spark. + - Spark 4.0.1 (audited 2026-05-27): trait set gains `DefaultStringProducingExpression` and the `nullIntolerant: Boolean` refactor. Runtime unchanged. + - Spark 4.1.1 (audited 2026-05-27): identical to 4.0.1. - [x] xxhash64 + - Spark 3.4.3 (audited 2026-05-27): identical to 3.5.8. + - Spark 3.5.8 (audited 2026-05-27): baseline. `XxHash64(children, seed) extends HashExpression[Long]`; produces an xxHash64 hash with a configurable Long seed and `LongType` result. Comet routes via `CometXxHash64` to the native `xxhash64` UDF. + - Spark 4.0.1 (audited 2026-05-27): semantics unchanged. + - Spark 4.1.1 (audited 2026-05-27): identical to 4.0.1. ### json_funcs diff --git a/spark/src/main/scala/org/apache/comet/serde/hash.scala b/spark/src/main/scala/org/apache/comet/serde/hash.scala index a58e81b02d..ee3e80059d 100644 --- a/spark/src/main/scala/org/apache/comet/serde/hash.scala +++ b/spark/src/main/scala/org/apache/comet/serde/hash.scala @@ -22,17 +22,19 @@ package org.apache.comet.serde import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Murmur3Hash, Sha1, Sha2, XxHash64} import org.apache.spark.sql.types.{ArrayType, DataType, DecimalType, IntegerType, LongType, MapType, StringType, StructType} -import org.apache.comet.CometSparkSessionExtensions.withInfo import org.apache.comet.serde.QueryPlanSerde.{exprToProtoInternal, isTimeType, scalarFunctionExprToProtoWithReturnType, serializeDataType, supportedDataType} object CometXxHash64 extends CometExpressionSerde[XxHash64] { + + override def getUnsupportedReasons(): Seq[String] = HashUtils.unsupportedReasons + + override def getSupportLevel(expr: XxHash64): SupportLevel = + HashUtils.supportLevelForChildren(expr) + override def convert( expr: XxHash64, inputs: Seq[Attribute], binding: Boolean): Option[ExprOuterClass.Expr] = { - if (!HashUtils.isSupportedType(expr)) { - return None - } val exprs = expr.children.map(exprToProtoInternal(_, inputs, binding)) val seedBuilder = LiteralOuterClass.Literal .newBuilder() @@ -45,13 +47,16 @@ object CometXxHash64 extends CometExpressionSerde[XxHash64] { } object CometMurmur3Hash extends CometExpressionSerde[Murmur3Hash] { + + override def getUnsupportedReasons(): Seq[String] = HashUtils.unsupportedReasons + + override def getSupportLevel(expr: Murmur3Hash): SupportLevel = + HashUtils.supportLevelForChildren(expr) + override def convert( expr: Murmur3Hash, inputs: Seq[Attribute], binding: Boolean): Option[ExprOuterClass.Expr] = { - if (!HashUtils.isSupportedType(expr)) { - return None - } val exprs = expr.children.map(exprToProtoInternal(_, inputs, binding)) val seedBuilder = LiteralOuterClass.Literal .newBuilder() @@ -68,21 +73,25 @@ object CometMurmur3Hash extends CometExpressionSerde[Murmur3Hash] { } object CometSha2 extends CometExpressionSerde[Sha2] { - override def convert( - expr: Sha2, - inputs: Seq[Attribute], - binding: Boolean): Option[ExprOuterClass.Expr] = { - if (!HashUtils.isSupportedType(expr)) { - return None - } - // It's possible for spark to dynamically compute the number of bits from input - // expression, however DataFusion does not support that yet. + private val nonFoldableNumBitsReason = + "The `numBits` argument must be a foldable literal value" + + override def getUnsupportedReasons(): Seq[String] = + HashUtils.unsupportedReasons :+ nonFoldableNumBitsReason + + override def getSupportLevel(expr: Sha2): SupportLevel = { if (!expr.right.foldable) { - withInfo(expr, "For Sha2, non literal numBits is not supported") - return None + Unsupported(Some(nonFoldableNumBitsReason)) + } else { + HashUtils.supportLevelForChildren(expr) } + } + override def convert( + expr: Sha2, + inputs: Seq[Attribute], + binding: Boolean): Option[ExprOuterClass.Expr] = { val leftExpr = exprToProtoInternal(expr.left, inputs, binding) val numBitsExpr = exprToProtoInternal(expr.right, inputs, binding) scalarFunctionExprToProtoWithReturnType("sha2", StringType, false, leftExpr, numBitsExpr) @@ -90,50 +99,50 @@ object CometSha2 extends CometExpressionSerde[Sha2] { } object CometSha1 extends CometExpressionSerde[Sha1] { + + override def getUnsupportedReasons(): Seq[String] = HashUtils.unsupportedReasons + + override def getSupportLevel(expr: Sha1): SupportLevel = + HashUtils.supportLevelForChildren(expr) + override def convert( expr: Sha1, inputs: Seq[Attribute], binding: Boolean): Option[ExprOuterClass.Expr] = { - if (!HashUtils.isSupportedType(expr)) { - withInfo(expr, s"HashUtils doesn't support dataType: ${expr.child.dataType}") - return None - } val childExpr = exprToProtoInternal(expr.child, inputs, binding) scalarFunctionExprToProtoWithReturnType("sha1", StringType, false, childExpr) } } private object HashUtils { - def isSupportedType(expr: Expression): Boolean = { - for (child <- expr.children) { - if (!isSupportedDataType(expr, child.dataType)) { - return false - } + + private val unsupportedDecimalReason = + "`DecimalType` with precision > 18 is not supported (Spark hashes via Java `BigDecimal`)" + private val unsupportedTimeTypeReason = "`TimeType` is not supported" + + val unsupportedReasons: Seq[String] = + Seq(unsupportedDecimalReason, unsupportedTimeTypeReason, "Unsupported child data type") + + def supportLevelForChildren(expr: Expression): SupportLevel = { + expr.children.iterator + .flatMap(c => unsupportedReasonFor(c.dataType).iterator) + .toSeq + .headOption match { + case Some(reason) => Unsupported(Some(reason)) + case None => Compatible() } - true } - private def isSupportedDataType(expr: Expression, dt: DataType): Boolean = { - dt match { - case d: DecimalType if d.precision > 18 => - // Spark converts decimals with precision > 18 into - // Java BigDecimal before hashing - withInfo(expr, s"Unsupported datatype: $dt (precision > 18)") - false - case s: StructType => - s.fields.forall(f => isSupportedDataType(expr, f.dataType)) - case a: ArrayType => - isSupportedDataType(expr, a.elementType) - case m: MapType => - isSupportedDataType(expr, m.keyType) && isSupportedDataType(expr, m.valueType) - case dt if isTimeType(dt) => - withInfo(expr, s"Unsupported datatype $dt") - false - case _ if !supportedDataType(dt, allowComplex = true) => - withInfo(expr, s"Unsupported datatype $dt") - false - case _ => - true - } + private def unsupportedReasonFor(dt: DataType): Option[String] = dt match { + case d: DecimalType if d.precision > 18 => Some(unsupportedDecimalReason) + case s: StructType => + s.fields.iterator.flatMap(f => unsupportedReasonFor(f.dataType).iterator).toSeq.headOption + case a: ArrayType => unsupportedReasonFor(a.elementType) + case m: MapType => + unsupportedReasonFor(m.keyType).orElse(unsupportedReasonFor(m.valueType)) + case dt if isTimeType(dt) => Some(unsupportedTimeTypeReason) + case _ if !supportedDataType(dt, allowComplex = true) => + Some(s"Unsupported child data type: $dt") + case _ => None } }