diff --git a/docs/source/contributor-guide/spark_expressions_support.md b/docs/source/contributor-guide/spark_expressions_support.md index 8f82847bb6..76431a8d68 100644 --- a/docs/source/contributor-guide/spark_expressions_support.md +++ b/docs/source/contributor-guide/spark_expressions_support.md @@ -464,16 +464,34 @@ - [ ] is_variant_null - [ ] java_method - [x] monotonically_increasing_id + - Spark 3.4.3 (audited 2026-05-27): byte-for-byte identical to 4.1.1. `MonotonicallyIncreasingID() extends LeafExpression with Stateful`; produces a Long that encodes the partition id in the upper 31 bits and a per-partition row counter in the lower 33 bits. Comet emits an empty `MonotonicallyIncreasingId` proto and the native side produces the same encoding. + - Spark 3.5.8 (audited 2026-05-27): identical to 3.4.3. + - Spark 4.0.1 (audited 2026-05-27): identical to 3.4.3. + - Spark 4.1.1 (audited 2026-05-27): identical to 3.4.3. - [ ] parse_json - [ ] raise_error - [x] rand + - Spark 3.4.3 (audited 2026-05-27): identical to 3.5.8. + - Spark 3.5.8 (audited 2026-05-27): baseline. `Rand(child, hideSeed) extends RDG` (an `UnaryExpression with ExpectsInputTypes with Nondeterministic with ExpressionWithRandomSeed`); `child` is the seed expression, coerced to `IntegerType` or `LongType` via `ImplicitCastInputTypes`. Uses `XORShiftRandom(seed + partitionIndex)` per partition and returns `nextDouble()` in `[0, 1)`. NULL seed evaluates to `0L` (via `null.asInstanceOf[Long]`). + - Spark 4.0.1 (audited 2026-05-27): `RDG` is refactored from an `abstract class` into a trait, and `Rand` now extends a new `NondeterministicUnaryRDG` base. `ExpressionWithRandomSeed.expressionToSeed` is hoisted as a shared helper and throws `QueryCompilationErrors.invalidRandomSeedParameter` for non-literal seeds at analysis time. Runtime semantics unchanged. + - Spark 4.1.1 (audited 2026-05-27): identical to 4.0.1. 
+ - Comet limitation: the seed argument must be a literal (column-reference seeds are rejected via `getSupportLevel`). Pre-4.0 Spark would otherwise silently fail at runtime; Spark 4.0+ rejects non-literal seeds at analysis time, before the expression reaches Comet. - [x] randn + - Spark 3.4.3 (audited 2026-05-27): identical to 3.5.8. + - Spark 3.5.8 (audited 2026-05-27): same base as `Rand`; differs only in the eval body (`nextGaussian()` instead of `nextDouble()`), producing values from the standard normal distribution. + - Spark 4.0.1 (audited 2026-05-27): same refactor as `Rand`; runtime unchanged. + - Spark 4.1.1 (audited 2026-05-27): identical to 4.0.1. + - Comet limitation: same as `rand` — the seed argument must be a literal. - [ ] reflect - [ ] schema_of_avro - [ ] schema_of_variant - [ ] schema_of_variant_agg - [ ] session_user - [x] spark_partition_id + - Spark 3.4.3 (audited 2026-05-27): byte-for-byte identical to 4.1.1. `SparkPartitionID() extends LeafExpression with Nondeterministic`; returns the integer index of the partition being processed. Comet emits an empty `SparkPartitionId` proto. + - Spark 3.5.8 (audited 2026-05-27): identical to 3.4.3. + - Spark 4.0.1 (audited 2026-05-27): identical to 3.4.3. + - Spark 4.1.1 (audited 2026-05-27): identical to 3.4.3. - [ ] st_asbinary - [ ] st_geogfromwkb - [ ] st_geomfromwkb @@ -492,6 +510,10 @@ - [ ] try_variant_get - [ ] typeof - [x] user + - Spark 3.4.3 (audited 2026-05-27): `CurrentUser() extends LeafExpression with Unevaluable`; the analyzer's `ResolveCurrentLike` rule replaces it with a `StringType` literal of the current user name before Comet sees the plan. No Comet serde needed; the literal flows through `CometLiteral`. + - Spark 3.5.8 (audited 2026-05-27): identical to 3.4.3. + - Spark 4.0.1 (audited 2026-05-27): identical to 3.4.3 except the resulting literal carries the default string collation. + - Spark 4.1.1 (audited 2026-05-27): identical to 4.0.1.
- [ ] uuid - [ ] variant_get - [ ] version diff --git a/spark/src/main/scala/org/apache/comet/serde/nondetermenistic.scala b/spark/src/main/scala/org/apache/comet/serde/nondetermenistic.scala index 8cc46611be..8269896c2e 100644 --- a/spark/src/main/scala/org/apache/comet/serde/nondetermenistic.scala +++ b/spark/src/main/scala/org/apache/comet/serde/nondetermenistic.scala @@ -48,6 +48,17 @@ object CometMonotonicallyIncreasingId extends CometExpressionSerde[Monotonically } sealed abstract class CometRandCommonSerde[T <: Expression] extends CometExpressionSerde[T] { + protected val nonLiteralSeedReason = "The `seed` argument must be a literal value" + + override def getUnsupportedReasons(): Seq[String] = Seq(nonLiteralSeedReason) + + protected def seedExprOf(expr: T): Expression + + override def getSupportLevel(expr: T): SupportLevel = seedExprOf(expr) match { + case _: Literal => Compatible() + case _ => Unsupported(Some(nonLiteralSeedReason)) + } + protected def extractSeedFromExpr(expr: Expression): Option[Long] = { expr match { case Literal(seed: Long, _) => Some(seed) @@ -58,6 +69,8 @@ sealed abstract class CometRandCommonSerde[T <: Expression] extends CometExpress } object CometRand extends CometRandCommonSerde[Rand] { + override protected def seedExprOf(expr: Rand): Expression = expr.child + override def convert( expr: Rand, inputs: Seq[Attribute], @@ -72,6 +85,8 @@ object CometRand extends CometRandCommonSerde[Rand] { } object CometRandn extends CometRandCommonSerde[Randn] { + override protected def seedExprOf(expr: Randn): Expression = expr.child + override def convert( expr: Randn, inputs: Seq[Attribute],