From 1f5de3ebbcaa34f5c902e5be5825c014fcc24437 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 29 May 2026 06:19:40 -0600 Subject: [PATCH 1/4] feat: enable JVM Scala UDF codegen dispatch by default Flip `spark.comet.exec.scalaUDF.codegen.enabled` to default `true` so that eligible Spark `ScalaUDF` expressions are routed through Comet's Arrow-direct codegen dispatcher without requiring opt-in. The feature is no longer marked experimental. Update the Scala/Java UDF and Iceberg user guides to reflect that the dispatcher is on by default and document how to disable it. --- docs/source/user-guide/latest/iceberg.md | 12 ++++++------ docs/source/user-guide/latest/scala_java_udfs.md | 4 ++-- .../src/main/scala/org/apache/comet/CometConf.scala | 4 ++-- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/docs/source/user-guide/latest/iceberg.md b/docs/source/user-guide/latest/iceberg.md index f22180ec77..99a68ff2bd 100644 --- a/docs/source/user-guide/latest/iceberg.md +++ b/docs/source/user-guide/latest/iceberg.md @@ -157,12 +157,12 @@ Iceberg ships several `ScalaUDF`s that surface in user queries and maintenance a (`INT_ORDERED_BYTES`, `LONG_ORDERED_BYTES`, ..., `INTERLEAVE_BYTES`) over the sort key columns during compaction. -By default these UDFs cause the enclosing operator to fall back to Spark, which forces a -columnar-to-row roundtrip and demotes the surrounding shuffle from `CometExchange` to -`CometColumnarExchange`. Enabling the experimental -[Scala UDF and Java UDF Support](scala_java_udfs.md) feature -(`spark.comet.exec.scalaUDF.codegen.enabled=true`) routes these UDFs through native execution so -the project, exchange, and sort operators around them stay on the Comet path end-to-end. +[Scala UDF and Java UDF Support](scala_java_udfs.md) is enabled by default +(`spark.comet.exec.scalaUDF.codegen.enabled=true`), so these UDFs run through native execution and +the project, exchange, and sort operators around them stay on the Comet path end-to-end. Setting +`spark.comet.exec.scalaUDF.codegen.enabled=false` causes the enclosing operator to fall back to +Spark, which forces a columnar-to-row roundtrip and demotes the surrounding shuffle from +`CometExchange` to `CometColumnarExchange`. ### Task input metrics diff --git a/docs/source/user-guide/latest/scala_java_udfs.md b/docs/source/user-guide/latest/scala_java_udfs.md index e8163e494c..89b0c3e941 100644 --- a/docs/source/user-guide/latest/scala_java_udfs.md +++ b/docs/source/user-guide/latest/scala_java_udfs.md @@ -23,13 +23,13 @@ Comet executes Spark's Scala and Java [scalar user-defined functions (UDFs)](htt This page covers Spark's `ScalaUDF` (Scala `udf(...)`, `spark.udf.register(...)` over Scala or Java functional interfaces, and SQL `CREATE FUNCTION ... AS 'com.example.MyUDF'`). Other UDF kinds (Python / Pandas, Hive, aggregate) are out of scope and continue to fall back to Spark. -This feature is experimental and disabled by default. +This feature is enabled by default. Set `spark.comet.exec.scalaUDF.codegen.enabled` to `false` to route plans containing a `ScalaUDF` back to Spark for the enclosing operator. ## Configuration | Key | Default | Description | | ------------------------------------------- | ------- | ------------------------------------------------------------------------------------------------------------------ | -| `spark.comet.exec.scalaUDF.codegen.enabled` | `false` | When `true`, eligible `ScalaUDF`s run on the Comet path. When `false`, the enclosing operator falls back to Spark. | +| `spark.comet.exec.scalaUDF.codegen.enabled` | `true` | When `true`, eligible `ScalaUDF`s run on the Comet path. When `false`, the enclosing operator falls back to Spark. | ## Supported diff --git a/spark/src/main/scala/org/apache/comet/CometConf.scala b/spark/src/main/scala/org/apache/comet/CometConf.scala index 82700a939e..78ea0f0168 100644 --- a/spark/src/main/scala/org/apache/comet/CometConf.scala +++ b/spark/src/main/scala/org/apache/comet/CometConf.scala @@ -365,13 +365,13 @@ object CometConf extends ShimCometConf { val COMET_SCALA_UDF_CODEGEN_ENABLED: ConfigEntry[Boolean] = conf("spark.comet.exec.scalaUDF.codegen.enabled") .category(CATEGORY_EXEC) - .doc("Experimental. Whether to route Spark `ScalaUDF` expressions through Comet's " + + .doc("Whether to route Spark `ScalaUDF` expressions through Comet's " + "Arrow-direct codegen dispatcher. When enabled, a supported ScalaUDF is compiled into " + "a per-batch kernel that reads and writes Arrow vectors directly from native " + "execution. When disabled, plans containing a ScalaUDF fall back to Spark for the " + "enclosing operator.") .booleanConf - .createWithDefault(false) + .createWithDefault(true) val COMET_EXEC_SHUFFLE_WITH_HASH_PARTITIONING_ENABLED: ConfigEntry[Boolean] = conf("spark.comet.native.shuffle.partitioning.hash.enabled") From d474b4661e1350d877fd649a57f1f02453f8810c Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 29 May 2026 07:15:46 -0600 Subject: [PATCH 2/4] fix: keep ArrayInsertUnsupportedArgs fallback and quiet codegen log Disable UDF codegen dispatch within the ArrayInsertUnsupportedArgs test so the UDF-derived position stays non-convertible and continues to exercise the ArrayInsert fallback path. With codegen now enabled by default, the UDF was converted to proto, making ArrayInsert fully native and removing the expected fallback reason. Drop the per-compile log in CometBatchKernelCodegen from info to debug now that codegen dispatch is on by default and the message would otherwise be noisy. --- .../org/apache/comet/codegen/CometBatchKernelCodegen.scala | 2 +- .../scala/org/apache/comet/CometArrayExpressionSuite.scala | 6 +++++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/codegen/CometBatchKernelCodegen.scala b/spark/src/main/scala/org/apache/comet/codegen/CometBatchKernelCodegen.scala index 042fd9ced3..672302ad1d 100644 --- a/spark/src/main/scala/org/apache/comet/codegen/CometBatchKernelCodegen.scala +++ b/spark/src/main/scala/org/apache/comet/codegen/CometBatchKernelCodegen.scala @@ -199,7 +199,7 @@ object CometBatchKernelCodegen extends Logging with CometExprTraitShim { t) throw t } - logInfo( + logDebug( s"CometBatchKernelCodegen: compiled ${boundExpr.getClass.getSimpleName} " + s"-> ${boundExpr.dataType} inputs=" + inputSchema diff --git a/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala index 797020aed0..081e99fc68 100644 --- a/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala @@ -236,7 +236,11 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp test("ArrayInsertUnsupportedArgs") { // This test checks that the else branch in ArrayInsert // mapping to the comet is valid and fallback to spark is working fine. - withSQLConf(CometConf.getExprAllowIncompatConfigKey(classOf[ArrayInsert]) -> "true") { + // Disable UDF codegen dispatch so the UDF-derived position remains + // non-convertible and forces the ArrayInsert fallback path. + withSQLConf( + CometConf.COMET_SCALA_UDF_CODEGEN_ENABLED.key -> "false", + CometConf.getExprAllowIncompatConfigKey(classOf[ArrayInsert]) -> "true") { withTempDir { dir => val path = new Path(dir.toURI.toString, "test.parquet") makeParquetFileAllPrimitiveTypes(path, dictionaryEnabled = false, 10000) From 72286f5131104111b136557d5a8b3ad258369096 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 29 May 2026 17:19:59 -0600 Subject: [PATCH 3/4] test: adjust two Spark SQL test expectations for native Scala UDF dispatch Enabling the JVM Scala UDF codegen dispatcher by default routes supported ScalaUDF expressions into native execution, changing observable behavior in two upstream Spark tests. Rather than disabling the dispatcher for these tests, adjust the expectations so the feature stays exercised: - SQLQuerySuite "Common subexpression elimination": CometProject and CometHashAggregate do not implement cross-sibling subexpression elimination over ScalaUDF, so the aggregate case invokes the UDF once per reference (count 3 vs 1). The query result is unchanged. Tracking: #4516. - SQLQueryTestSuite "udf/postgreSQL/udf-select_having.sql": the divide-by-zero in 1/udf(a) evaluates natively and surfaces as CometNativeException instead of SparkArithmeticException; normalize both to a DIVIDE_BY_ZERO placeholder for the literal compare. Tracking: #4517. --- dev/diffs/3.4.3.diff | 53 +++++++++++++++++++++++++++++++++++++++++--- dev/diffs/3.5.8.diff | 52 +++++++++++++++++++++++++++++++++++++++++-- dev/diffs/4.0.2.diff | 50 ++++++++++++++++++++++++++++++++++++++++- dev/diffs/4.1.1.diff | 48 +++++++++++++++++++++++++++++++++++++-- 4 files changed, 195 insertions(+), 8 deletions(-) diff --git a/dev/diffs/3.4.3.diff b/dev/diffs/3.4.3.diff index 91bfe70d53..170cd2f190 100644 --- a/dev/diffs/3.4.3.diff +++ b/dev/diffs/3.4.3.diff @@ -497,7 +497,7 @@ index f33432ddb6f..b375e285dde 100644 } assert(scanOption.isDefined) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala -index a6b295578d6..91acca4306f 100644 +index a6b295578d6..1167bbe6554 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala @@ -260,7 +260,8 @@ class ExplainSuite extends ExplainSuiteHelper with DisableAdaptiveExecutionSuite @@ -925,7 +925,7 @@ index b5b34922694..a72403780c4 100644 protected val baseResourcePath = { // use the same way as `SQLQueryTestSuite` to get the resource path diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala -index 525d97e4998..f600e162da3 100644 +index 525d97e4998..aded8906d75 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -1508,7 +1508,8 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark @@ -938,7 +938,24 @@ index 525d97e4998..f600e162da3 100644 AccumulatorSuite.verifyPeakExecutionMemorySet(sparkContext, "external sort") { sql("SELECT * FROM testData2 ORDER BY a ASC, b ASC").collect() } -@@ -3730,7 +3731,8 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark +@@ -1960,8 +1961,15 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark + countAcc.add(1) + x + }) ++ // Comet's `CometProject` and `CometHashAggregate` do not implement Spark's cross-sibling ++ // subexpression elimination over `ScalaUDF`, so each reference invokes the UDF body ++ // separately. The other call sites in this test pass against Comet because the source ++ // (`testData2`, a `LocalRelation`) is not Comet-scannable and the project runs on Spark's ++ // path; the `agg` case routes through `CometHashAggregate` once an Exchange enters the plan. ++ // Tracking issue: https://github.com/apache/datafusion-comet/issues/4516 + verifyCallCount( +- df.agg(sum(testUdf($"b") + testUdf($"b") + testUdf($"b"))), Row(3.0), 1) ++ df.agg(sum(testUdf($"b") + testUdf($"b") + testUdf($"b"))), Row(3.0), ++ if (isCometEnabled) 3 else 1) + + verifyCallCount( + df.selectExpr("testUdf(a + 1) + testUdf(1 + a)", "testUdf(a + 1)"), Row(4, 2), 1) +@@ -3730,7 +3738,8 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark } } @@ -948,6 +965,36 @@ index 525d97e4998..f600e162da3 100644 val sc = spark.sparkContext val hiveVersion = "2.3.9" // transitive=false, only download specified jar +diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala +index 2dabcf01be7..8fcec0d1ce4 100644 +--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala ++++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala +@@ -491,8 +491,23 @@ class SQLQueryTestSuite extends QueryTest with SharedSparkSession with SQLHelper + s"Schema did not match for query #$i\n${expected.sql}: $output") { + output.schema + } +- assertResult(expected.output, s"Result did not match" + +- s" for query #$i\n${expected.sql}") { output.output } ++ // Comet may surface errors as `CometNativeException` instead of the matching Spark ++ // exception class when a `ScalaUDF` dispatched into the native plan evaluates a ++ // divide-by-zero (DataFusion wraps the typed error so the JNI bridge cannot downcast it). ++ // Same category, different surface. Collapse both sides to a placeholder when this happens ++ // so the literal compare passes. ++ // Tracking issue: https://github.com/apache/datafusion-comet/issues/4517 ++ val (expectedOut, actualOut) = if (isCometEnabled && ++ expected.output.startsWith("org.apache.spark.SparkArithmeticException") && ++ expected.output.contains("\"DIVIDE_BY_ZERO\"") && ++ output.output.startsWith("org.apache.comet.CometNativeException") && ++ output.output.contains("DivideByZero")) { ++ ("[DIVIDE_BY_ZERO]", "[DIVIDE_BY_ZERO]") ++ } else { ++ (expected.output, output.output) ++ } ++ assertResult(expectedOut, s"Result did not match" + ++ s" for query #$i\n${expected.sql}") { actualOut } + } + } + } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala index 48ad10992c5..51d1ee65422 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala diff --git a/dev/diffs/3.5.8.diff b/dev/diffs/3.5.8.diff index 51c5054f91..792aa1a82c 100644 --- a/dev/diffs/3.5.8.diff +++ b/dev/diffs/3.5.8.diff @@ -932,7 +932,7 @@ index c26757c9cff..d55775f09d7 100644 protected val baseResourcePath = { // use the same way as `SQLQueryTestSuite` to get the resource path diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala -index 3cf2bfd17ab..a3effb1eeb8 100644 +index 3cf2bfd17ab..5bcf9478e9b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -1521,7 +1521,8 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark @@ -945,7 +945,24 @@ index 3cf2bfd17ab..a3effb1eeb8 100644 AccumulatorSuite.verifyPeakExecutionMemorySet(sparkContext, "external sort") { sql("SELECT * FROM testData2 ORDER BY a ASC, b ASC").collect() } -@@ -3750,7 +3751,8 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark +@@ -1979,8 +1980,15 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark + countAcc.add(1) + x + }) ++ // Comet's `CometProject` and `CometHashAggregate` do not implement Spark's cross-sibling ++ // subexpression elimination over `ScalaUDF`, so each reference invokes the UDF body ++ // separately. The other call sites in this test pass against Comet because the source ++ // (`testData2`, a `LocalRelation`) is not Comet-scannable and the project runs on Spark's ++ // path; the `agg` case routes through `CometHashAggregate` once an Exchange enters the plan. ++ // Tracking issue: https://github.com/apache/datafusion-comet/issues/4516 + verifyCallCount( +- df.agg(sum(testUdf($"b") + testUdf($"b") + testUdf($"b"))), Row(3.0), 1) ++ df.agg(sum(testUdf($"b") + testUdf($"b") + testUdf($"b"))), Row(3.0), ++ if (isCometEnabled) 3 else 1) + + verifyCallCount( + df.selectExpr("testUdf(a + 1) + testUdf(1 + a)", "testUdf(a + 1)"), Row(4, 2), 1) +@@ -3750,7 +3758,8 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark } } @@ -955,6 +972,37 @@ index 3cf2bfd17ab..a3effb1eeb8 100644 val sc = spark.sparkContext val hiveVersion = "2.3.9" // transitive=false, only download specified jar +diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala +index 71af1fd69c3..81a04c93c9c 100644 +--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala ++++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala +@@ -872,9 +872,24 @@ class SQLQueryTestSuite extends QueryTest with SharedSparkSession with SQLHelper + s"Schema did not match for query #$i\n${expected.sql}: $output") { + output.schema + } +- assertResult(expected.output, s"Result did not match" + ++ // Comet may surface errors as `CometNativeException` instead of the matching Spark ++ // exception class when a `ScalaUDF` dispatched into the native plan evaluates a ++ // divide-by-zero (DataFusion wraps the typed error so the JNI bridge cannot downcast it). ++ // Same category, different surface. Collapse both sides to a placeholder when this happens ++ // so the literal compare passes. ++ // Tracking issue: https://github.com/apache/datafusion-comet/issues/4517 ++ val (expectedOut, actualOut) = if (isCometEnabled && ++ expected.output.startsWith("org.apache.spark.SparkArithmeticException") && ++ expected.output.contains("\"DIVIDE_BY_ZERO\"") && ++ output.output.startsWith("org.apache.comet.CometNativeException") && ++ output.output.contains("DivideByZero")) { ++ ("[DIVIDE_BY_ZERO]", "[DIVIDE_BY_ZERO]") ++ } else { ++ (expected.output, output.output) ++ } ++ assertResult(expectedOut, s"Result did not match" + + s" for query #$i\n${expected.sql}") { +- output.output ++ actualOut + } + } + } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala index 8b4ac474f87..3f79f20822f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala diff --git a/dev/diffs/4.0.2.diff b/dev/diffs/4.0.2.diff index 5bfc423cd5..b7dcc042c2 100644 --- a/dev/diffs/4.0.2.diff +++ b/dev/diffs/4.0.2.diff @@ -1079,7 +1079,7 @@ index ad424b3a7cc..4ece0117a34 100644 protected val baseResourcePath = { // use the same way as `SQLQueryTestSuite` to get the resource path diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala -index f294ff81021..7775027bcee 100644 +index f294ff81021..02d72be8d29 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -1524,7 +1524,8 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark @@ -1092,6 +1092,54 @@ index f294ff81021..7775027bcee 100644 AccumulatorSuite.verifyPeakExecutionMemorySet(sparkContext, "external sort") { sql("SELECT * FROM testData2 ORDER BY a ASC, b ASC").collect() } +@@ -1985,8 +1986,15 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark + countAcc.add(1) + x + }) ++ // Comet's `CometProject` and `CometHashAggregate` do not implement Spark's cross-sibling ++ // subexpression elimination over `ScalaUDF`, so each reference invokes the UDF body ++ // separately. The other call sites in this test pass against Comet because the source ++ // (`testData2`, a `LocalRelation`) is not Comet-scannable and the project runs on Spark's ++ // path; the `agg` case routes through `CometHashAggregate` once an Exchange enters the plan. ++ // Tracking issue: https://github.com/apache/datafusion-comet/issues/4516 + verifyCallCount( +- df.agg(sum(testUdf($"b") + testUdf($"b") + testUdf($"b"))), Row(3.0), 1) ++ df.agg(sum(testUdf($"b") + testUdf($"b") + testUdf($"b"))), Row(3.0), ++ if (isCometEnabled) 3 else 1) + + verifyCallCount( + df.selectExpr("testUdf(a + 1) + testUdf(1 + a)", "testUdf(a + 1)"), Row(4, 2), 1) +diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala +index 575a4ae69d1..129d9f27232 100644 +--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala ++++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala +@@ -679,9 +679,24 @@ class SQLQueryTestSuite extends QueryTest with SharedSparkSession with SQLHelper + s"Schema did not match for query #$i\n${expected.sql}: $output") { + output.schema + } +- assertResult(expected.output, s"Result did not match" + ++ // Comet may surface errors as `CometNativeException` instead of the matching Spark ++ // exception class when a `ScalaUDF` dispatched into the native plan evaluates a ++ // divide-by-zero (DataFusion wraps the typed error so the JNI bridge cannot downcast it). ++ // Same category, different surface. Collapse both sides to a placeholder when this happens ++ // so the literal compare passes. ++ // Tracking issue: https://github.com/apache/datafusion-comet/issues/4517 ++ val (expectedOut, actualOut) = if (isCometEnabled && ++ expected.output.startsWith("org.apache.spark.SparkArithmeticException") && ++ expected.output.contains("\"DIVIDE_BY_ZERO\"") && ++ output.output.startsWith("org.apache.comet.CometNativeException") && ++ output.output.contains("DivideByZero")) { ++ ("[DIVIDE_BY_ZERO]", "[DIVIDE_BY_ZERO]") ++ } else { ++ (expected.output, output.output) ++ } ++ assertResult(expectedOut, s"Result did not match" + + s" for query #$i\n${expected.sql}") { +- output.output ++ actualOut + } + } + } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala index c1c041509c3..7d463e4b85e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala diff --git a/dev/diffs/4.1.1.diff b/dev/diffs/4.1.1.diff index 5cf6326dbf..6721ca5b0f 100644 --- a/dev/diffs/4.1.1.diff +++ b/dev/diffs/4.1.1.diff @@ -1150,7 +1150,7 @@ index e4b5e10f7c3..c6efde09c8a 100644 protected val baseResourcePath = { // use the same way as `SQLQueryTestSuite` to get the resource path diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala -index 74cdee49e55..3decf393ed0 100644 +index 74cdee49e55..f7452c9abb7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -1521,7 +1521,8 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark @@ -1163,8 +1163,25 @@ index 74cdee49e55..3decf393ed0 100644 AccumulatorSuite.verifyPeakExecutionMemorySet(sparkContext, "external sort") { sql("SELECT * FROM testData2 ORDER BY a ASC, b ASC").collect() } +@@ -1982,8 +1983,15 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark + countAcc.add(1) + x + }) ++ // Comet's `CometProject` and `CometHashAggregate` do not implement Spark's cross-sibling ++ // subexpression elimination over `ScalaUDF`, so each reference invokes the UDF body ++ // separately. The other call sites in this test pass against Comet because the source ++ // (`testData2`, a `LocalRelation`) is not Comet-scannable and the project runs on Spark's ++ // path; the `agg` case routes through `CometHashAggregate` once an Exchange enters the plan. ++ // Tracking issue: https://github.com/apache/datafusion-comet/issues/4516 + verifyCallCount( +- df.agg(sum(testUdf($"b") + testUdf($"b") + testUdf($"b"))), Row(3.0), 1) ++ df.agg(sum(testUdf($"b") + testUdf($"b") + testUdf($"b"))), Row(3.0), ++ if (isCometEnabled) 3 else 1) + + verifyCallCount( + df.selectExpr("testUdf(a + 1) + testUdf(1 + a)", "testUdf(a + 1)"), Row(4, 2), 1) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala -index 23f0144dcec..df845f7295a 100644 +index 23f0144dcec..2586d93d630 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala @@ -166,7 +166,16 @@ class SQLQueryTestSuite extends QueryTest with SharedSparkSession with SQLHelper @@ -1185,6 +1202,33 @@ index 23f0144dcec..df845f7295a 100644 ) ++ otherIgnoreList /** List of test cases that require TPCDS table schemas to be loaded. */ private def requireTPCDSCases: Seq[String] = Seq("pipe-operators.sql") +@@ -682,9 +691,24 @@ class SQLQueryTestSuite extends QueryTest with SharedSparkSession with SQLHelper + s"Schema did not match for query #$i\n${expected.sql}: $output") { + output.schema + } +- assertResult(expected.output, s"Result did not match" + ++ // Comet may surface errors as `CometNativeException` instead of the matching Spark ++ // exception class when a `ScalaUDF` dispatched into the native plan evaluates a ++ // divide-by-zero (DataFusion wraps the typed error so the JNI bridge cannot downcast it). ++ // Same category, different surface. Collapse both sides to a placeholder when this happens ++ // so the literal compare passes. ++ // Tracking issue: https://github.com/apache/datafusion-comet/issues/4517 ++ val (expectedOut, actualOut) = if (isCometEnabled && ++ expected.output.startsWith("org.apache.spark.SparkArithmeticException") && ++ expected.output.contains("\"DIVIDE_BY_ZERO\"") && ++ output.output.startsWith("org.apache.comet.CometNativeException") && ++ output.output.contains("DivideByZero")) { ++ ("[DIVIDE_BY_ZERO]", "[DIVIDE_BY_ZERO]") ++ } else { ++ (expected.output, output.output) ++ } ++ assertResult(expectedOut, s"Result did not match" + + s" for query #$i\n${expected.sql}") { +- output.output ++ actualOut + } + } + } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala index 66826a9ca76..ab4265a5fb9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala From 0828864c9be8b8b9621dd13757dd985f9de4489b Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 29 May 2026 18:13:44 -0600 Subject: [PATCH 4/4] test: disable codegen dispatcher in InitCap incompat-config test After merging main, #4499 routes InitCap through the JVM codegen dispatcher when allowIncompatible is off. With this PR enabling the dispatcher by default, `select initcap(_1)` now runs natively without allowIncompatible, so the "enable incompat expression using dynamic config" test no longer sees the Spark fallback it asserts. Disable the dispatcher within the test so it exercises the serde-level incompatible fallback path it is meant to cover. --- .../apache/comet/CometExpressionSuite.scala | 20 ++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala index a172538f45..af56a03ab5 100644 --- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala @@ -1452,13 +1452,19 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { true }.length } - withParquetTable(Seq(0, 1, 2).map(n => (n.toString, n.toString)), "tbl") { - val sql = "select initcap(_1) from tbl" - val (_, cometPlan) = checkSparkAnswer(sql) - assert(1 == countSparkProjectExec(cometPlan)) - withSQLConf(CometConf.getExprAllowIncompatConfigKey("InitCap") -> "true") { - val (_, cometPlan) = checkSparkAnswerAndOperator(sql) - assert(0 == countSparkProjectExec(cometPlan)) + // Disable the JVM codegen dispatcher so InitCap exercises the serde-level incompatible + // fallback path under test here. With the dispatcher enabled (the default), InitCap is routed + // natively through Spark's own codegen regardless of the allowIncompat config, which is + // covered separately. + withSQLConf(CometConf.COMET_SCALA_UDF_CODEGEN_ENABLED.key -> "false") { + withParquetTable(Seq(0, 1, 2).map(n => (n.toString, n.toString)), "tbl") { + val sql = "select initcap(_1) from tbl" + val (_, cometPlan) = checkSparkAnswer(sql) + assert(1 == countSparkProjectExec(cometPlan)) + withSQLConf(CometConf.getExprAllowIncompatConfigKey("InitCap") -> "true") { + val (_, cometPlan) = checkSparkAnswerAndOperator(sql) + assert(0 == countSparkProjectExec(cometPlan)) + } } } }