Conversation
…ce epoch from timestamps.
parthchandra
left a comment
The PR text says there is an end-to-end test, but there doesn't seem to be one. `CometTemporalExpressionSuite` is probably the right place to add such a test, similar to the "days - timestamp(input)" test.
```rust
)?;
let offset_secs =
    tz.offset_from_utc_datetime(&dt).fix().local_minus_utc() as i64;
let local_micros = micros + offset_secs * 1_000_000;
```
In Spark's corresponding implementation in `InMemoryBaseTable`, it looks like the session timezone is not being applied.
Can you add a unit test that reads from `InMemoryBaseTable` and compares with the results produced by Spark?
Sure, thanks @parthchandra for the review. I'll correct it, add the missing test, and update it in the next commit.
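The fix the reviewer is asking for can be sketched as follows. This is a simplified stand-alone model, not the actual Comet code: since the transform is UTC-based, the hour index comes straight from the raw microsecond value, and no session-timezone offset is added before dividing.

```rust
// Sketch (assumed simplification of the Comet hours transform): the hour
// bucket is derived directly from the raw UTC microsecond value. The
// offset_secs adjustment shown in the diff above is intentionally absent.
const MICROS_PER_HOUR: i64 = 3_600_000_000;

fn hours_since_epoch(micros: i64) -> i32 {
    // div_euclid floors toward negative infinity for a positive divisor,
    // so pre-epoch timestamps land in the correct (negative) hour bucket.
    micros.div_euclid(MICROS_PER_HOUR) as i32
}

fn main() {
    // 1970-01-01 02:30:00 UTC -> hour 2, regardless of session timezone.
    assert_eq!(hours_since_epoch(2 * MICROS_PER_HOUR + 30 * 60 * 1_000_000), 2);
    // One microsecond before the epoch falls into hour -1, not hour 0.
    assert_eq!(hours_since_epoch(-1), -1);
    println!("ok");
}
```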
```rust
match args {
    [ColumnarValue::Array(array)] => {
        let ts_array = as_primitive_array::<TimestampMicrosecondType>(&array);
```
This should come after the match on `array.data_type()`, inside the `DataType::Timestamp(Microsecond, _)` arm. This would panic for other types.
Fixed. Moved the cast inside the `DataType::Timestamp(Microsecond, _)` arm to prevent panics on unsupported types.
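The control flow being requested can be sketched like this. The arrow-rs types are stubbed out as a tiny local enum, so this is only a model of the fix, not the real Comet/arrow code: the downcast happens only inside the `Timestamp(Microsecond, _)` arm, and any other input type yields an error rather than a panic.

```rust
// Simplified model (arrow-rs DataType stubbed as a local enum) of matching
// on the data type BEFORE downcasting, so unsupported types error cleanly.
#[derive(Debug)]
enum TimeUnit { Microsecond }
#[derive(Debug)]
enum DataType { Timestamp(TimeUnit, Option<String>), Int32 }

fn eval_hours(data_type: &DataType, micros: &[i64]) -> Result<Vec<i32>, String> {
    match data_type {
        DataType::Timestamp(TimeUnit::Microsecond, _) => {
            // In the real code, this is where the primitive-array downcast
            // would run, now guaranteed to be safe by the match above.
            Ok(micros.iter().map(|m| m.div_euclid(3_600_000_000) as i32).collect())
        }
        other => Err(format!("hours: unsupported input type {:?}", other)),
    }
}

fn main() {
    let ok = eval_hours(&DataType::Timestamp(TimeUnit::Microsecond, None), &[3_600_000_000]);
    assert_eq!(ok.unwrap(), vec![1]);
    // A non-timestamp input now produces an Err instead of a panic.
    assert!(eval_hours(&DataType::Int32, &[]).is_err());
    println!("ok");
}
```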
```rust
let result: Int32Array = match array.data_type() {
    DataType::Timestamp(Microsecond, _) => {
        arrow::compute::kernels::arity::unary(ts_array, |micros| {
            micros.div_euclid(MICROS_PER_HOUR) as i32
```
Why `div_euclid`? Elsewhere the code is generally using `div_floor`.
Updated to use `div_floor` to match the rest of the codebase. Thanks for pointing this out!
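For the positive divisor used here (microseconds per hour), floor division and Euclidean division agree, so the switch is a consistency change rather than a behavior change. The sketch below checks that equivalence; the `div_floor` helper is a local stand-in written for this example (Comet uses its own helper), since signed `div_floor` is not in stable `std`.

```rust
// Sketch: for a positive divisor, div_floor == div_euclid, so switching
// from div_euclid to the codebase's div_floor does not change results.
// This div_floor is a hypothetical stand-in, not Comet's actual helper.
fn div_floor(a: i64, b: i64) -> i64 {
    let q = a / b; // Rust's `/` truncates toward zero
    let r = a % b;
    // Adjust toward negative infinity when the signs of r and b differ.
    if r != 0 && (r < 0) != (b < 0) { q - 1 } else { q }
}

fn main() {
    const MICROS_PER_HOUR: i64 = 3_600_000_000;
    for &m in &[0i64, 1, -1, MICROS_PER_HOUR, -MICROS_PER_HOUR - 1, 123_456_789] {
        assert_eq!(div_floor(m, MICROS_PER_HOUR), m.div_euclid(MICROS_PER_HOUR));
    }
    println!("ok");
}
```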
```scala
    binding: Boolean): Option[ExprOuterClass.Expr] = {
  val childExpr = exprToProtoInternal(expr.child, inputs, binding)

  if (childExpr.isDefined) {
```
It might be better to explicitly check the child expression's data type and only allow valid types, falling back otherwise.
See `CometDays` below.
Fixed. Added explicit type checking to only allow `TimestampType` and `TimestampNTZType`; other types now fall back to Spark, similar to `CometDays`.
```scala
val ts = row.getAs[java.time.LocalDateTime]("ts")
val micros = if (ts != null) {
  org.apache.spark.sql.catalyst.util.DateTimeUtils.localDateTimeToMicros(ts)
} else 0L // assuming safe non-null
```
If the timestamp produced by the generator is null, then `hours` should return null. This will return 0.
Fixed. It now properly handles null values and returns null instead of 0.
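The null-handling fix can be illustrated with a minimal sketch, with `Option<i64>` standing in for a nullable Arrow value (this is a model of the behavior, not the actual Comet code): a null timestamp must map to a null hour, not to hour 0.

```rust
// Sketch of the null-handling fix: None propagates to None instead of
// collapsing to hour 0 (which is what the `else 0L` branch produced).
const MICROS_PER_HOUR: i64 = 3_600_000_000;

fn hours_nullable(micros: Option<i64>) -> Option<i32> {
    // map() propagates None; before the fix the null case yielded Some(0).
    micros.map(|m| m.div_euclid(MICROS_PER_HOUR) as i32)
}

fn main() {
    assert_eq!(hours_nullable(Some(MICROS_PER_HOUR * 5)), Some(5));
    assert_eq!(hours_nullable(None), None);
    println!("ok");
}
```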
Thanks to @parthchandra for the review and feedback. The PR has been updated.
parthchandra
left a comment
lgtm. @0lai0 can you resolve the merge conflicts so this can be merged?
|
Thanks @parthchandra. Conflicts resolved and PR updated.
Which issue does this PR close?
Closes #3125
Rationale for this change
Comet previously did not support the Spark `hours` expression (a V2 partition transform). Queries using the `hours` function for partitioning would fall back to Spark's JVM execution instead of running natively on DataFusion. By adding native support for this expression, we allow more Spark workloads (especially those partitioned by hourly intervals) to benefit from Comet's native acceleration.
What changes are included in this PR?
This change adds end-to-end native support for the `hours` partition transform. Since `Hours` is a `PartitionTransformExpression` (and not a `TimeZoneAwareExpression`), the timezone is injected from the session configuration during the planning phase. The native implementation uses Arrow's `unary` and `try_unary` kernels for efficient vectorized computation.

The semantics are: hours since the Unix epoch (1970-01-01 00:00:00 UTC), computed by floor-dividing the raw microsecond value by 3_600_000_000. Both `TimestampType` and `TimestampNTZType` use the same arithmetic; no session timezone offset is applied, since this transform is always UTC-based.

- `expr.proto`: Added `HoursTransform` message definition to pass the child expression and session timezone.
- `datetime.scala`: Added `CometHours` serde handler to intercept the Spark `Hours` expression and read the timezone from `SQLConf`.
- `QueryPlanSerde.scala`: Registered the `CometHours` handler in the temporal expressions map.
- `hours.rs`: Added `SparkHoursTransform` UDF using efficient Arrow kernels.
- `temporal.rs` & `expression_registry.rs`: Registered the native builder for the new expression.

How are these changes tested?
Added comprehensive tests in both Rust and Scala:

- Unit tests in `hours.rs` covering:
  - `TimestampNTZType` (ensuring it ignores timezone offsets)
- Run with: `cargo test -p datafusion-comet-spark-expr -- datetime_funcs::hours`