When a `ScalaUDF` is dispatched into the native plan via the JVM Scala UDF codegen dispatcher (enabled by default in #4514), Comet's `CometProject` and `CometHashAggregate` do not implement Spark's cross-sibling common subexpression elimination (CSE) over `ScalaUDF`. An expression such as `sum(udf(b) + udf(b) + udf(b))` therefore invokes the UDF body three times, once per reference, instead of once.
This is observable in Spark's `SQLQuerySuite` "Common subexpression elimination" test: the UDF call count for the aggregate case is 3 under Comet versus 1 in vanilla Spark. The query result is unchanged; only the number of UDF invocations differs.
Follow-on from #4514. We should extend the cross-sibling CSE that `CometProject` performs to the aggregate operator's input projection (and to any other operator that builds an input projection) so that repeated `ScalaUDF` references are evaluated once.
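The intended behaviour can be illustrated with a toy model of an input projection. This is a minimal sketch, not Comet's or Spark's implementation: `Expr`, `count_udfs`, `eval`, and `project` are hypothetical names, and the UDF is assumed to be deterministic, since reusing a result is only safe in that case. Before evaluating a row, the projection counts every UDF subtree across all of its expressions; subtrees that occur more than once, whether inside one expression or across siblings, are cached for the row so the UDF body runs once per distinct call.

```rust
use std::collections::{HashMap, HashSet};

// Toy expression tree (hypothetical; not Comet's or Spark's types).
// `Udf` stands in for a deterministic ScalaUDF applied to its argument.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
enum Expr {
    Col(usize),
    Udf(Box<Expr>),
    Add(Box<Expr>, Box<Expr>),
}

// Count every UDF subtree across all expressions of the projection.
fn count_udfs(e: &Expr, counts: &mut HashMap<Expr, usize>) {
    match e {
        Expr::Col(_) => {}
        Expr::Udf(arg) => {
            *counts.entry(e.clone()).or_insert(0) += 1;
            count_udfs(arg, counts);
        }
        Expr::Add(l, r) => {
            count_udfs(l, counts);
            count_udfs(r, counts);
        }
    }
}

// Evaluate `e` for one row, reusing cached values of common subexpressions.
fn eval(
    e: &Expr,
    row: &[i64],
    udf: &dyn Fn(i64) -> i64,
    common: &HashSet<Expr>,
    cache: &mut HashMap<Expr, i64>,
    calls: &mut usize,
) -> i64 {
    if let Some(v) = cache.get(e) {
        return *v; // already computed for this row
    }
    let v = match e {
        Expr::Col(i) => row[*i],
        Expr::Udf(arg) => {
            let a = eval(arg, row, udf, common, cache, calls);
            *calls += 1; // count real UDF body invocations
            udf(a)
        }
        Expr::Add(l, r) => {
            eval(l, row, udf, common, cache, calls) + eval(r, row, udf, common, cache, calls)
        }
    };
    // Only subexpressions that occur more than once are worth caching.
    if common.contains(e) {
        cache.insert(e.clone(), v);
    }
    v
}

// Evaluate a projection (a list of sibling expressions) over one row.
// Returns the output values and the number of UDF body invocations.
fn project(exprs: &[Expr], row: &[i64], udf: &dyn Fn(i64) -> i64, cse: bool) -> (Vec<i64>, usize) {
    let mut common = HashSet::new();
    if cse {
        let mut counts = HashMap::new();
        for e in exprs {
            count_udfs(e, &mut counts);
        }
        common = counts.into_iter().filter(|(_, n)| *n > 1).map(|(e, _)| e).collect();
    }
    let mut cache = HashMap::new();
    let mut calls = 0;
    let out = exprs
        .iter()
        .map(|e| eval(e, row, udf, &common, &mut cache, &mut calls))
        .collect();
    (out, calls)
}
```

For `udf(b) + udf(b) + udf(b)` with `b = 2` and a UDF that multiplies by 10, both modes return 60, but `project` reports 3 UDF calls with `cse = false` and 1 with `cse = true`, mirroring the 3-versus-1 difference described above. The cache is scoped to a single row, so shared results never leak between rows.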