From 1d3e730320fb601eb026130ec09fb71459466745 Mon Sep 17 00:00:00 2001
From: Weiqing Yang
Date: Thu, 21 May 2026 23:35:35 -0700
Subject: [PATCH 1/7] [AURON #1853] Register built-in RexNode converters; add
 fallback config option

Prerequisite infrastructure for the shadowed StreamExecCalc landing in a
subsequent commit:

- FlinkNodeConverterFactory now ships the three built-in converters
  (RexInputRefConverter, RexLiteralConverter, RexCallConverter) on the
  singleton at class load, so production callers reach a usable factory
  without explicit registration. Tests creating fresh instances via the
  package-private constructor stay unaffected.
- FlinkAuronConfiguration gains FAIL_BACK_FLINK_ENGINE_ENABLED (boolean,
  default true), keyed auron.failback.flink.engine.enabled. Controls
  whether conversion failure falls back to the Flink engine silently
  (default) or fails the job at submit time.

Tests: FlinkAuronConfigurationTest 2/2, FlinkNodeConverterFactoryTest
9/9, checkstyle 0 violations.

Issue: https://github.com/apache/auron/issues/1853
---
 .../converter/FlinkNodeConverterFactory.java   | 12 ++++++++++++
 .../FlinkNodeConverterFactoryTest.java         | 18 ++++++++++++++++++
 .../configuration/FlinkAuronConfiguration.java | 13 +++++++++++++
 .../FlinkAuronConfigurationTest.java           |  8 ++++++++
 4 files changed, 51 insertions(+)

diff --git a/auron-flink-extension/auron-flink-planner/src/main/java/org/apache/auron/flink/table/planner/converter/FlinkNodeConverterFactory.java b/auron-flink-extension/auron-flink-planner/src/main/java/org/apache/auron/flink/table/planner/converter/FlinkNodeConverterFactory.java
index a86101354..089b5ad0a 100644
--- a/auron-flink-extension/auron-flink-planner/src/main/java/org/apache/auron/flink/table/planner/converter/FlinkNodeConverterFactory.java
+++ b/auron-flink-extension/auron-flink-planner/src/main/java/org/apache/auron/flink/table/planner/converter/FlinkNodeConverterFactory.java
@@ -37,6 +37,10 @@
  *       (keyed by {@code AggregateCall} class)
  * </ul>
  *
+ * <p>The singleton returned by {@link #getInstance()} is initialized with the built-in
+ * {@link FlinkRexNodeConverter} implementations ({@link RexInputRefConverter},
+ * {@link RexLiteralConverter}, {@link RexCallConverter}) registered.
+ *
  * <p>Usage:
  * <pre>
  *   FlinkNodeConverterFactory factory = FlinkNodeConverterFactory.getInstance();
@@ -52,6 +56,14 @@ public class FlinkNodeConverterFactory {
 
     private static final FlinkNodeConverterFactory INSTANCE = new FlinkNodeConverterFactory();
 
+    static {
+        // Runs once at class load, after INSTANCE above has been constructed: registers the
+        // built-in RexNode converters on the singleton so getInstance() is usable without setup.
+        INSTANCE.registerRexConverter(new RexInputRefConverter());
+        INSTANCE.registerRexConverter(new RexLiteralConverter());
+        INSTANCE.registerRexConverter(new RexCallConverter(INSTANCE));
+    }
+
     private final Map, FlinkRexNodeConverter> rexConverterMap;
     private final Map, FlinkAggCallConverter> aggConverterMap;
 
diff --git a/auron-flink-extension/auron-flink-planner/src/test/java/org/apache/auron/flink/table/planner/converter/FlinkNodeConverterFactoryTest.java b/auron-flink-extension/auron-flink-planner/src/test/java/org/apache/auron/flink/table/planner/converter/FlinkNodeConverterFactoryTest.java
index ac6451747..8fb904f17 100644
--- a/auron-flink-extension/auron-flink-planner/src/test/java/org/apache/auron/flink/table/planner/converter/FlinkNodeConverterFactoryTest.java
+++ b/auron-flink-extension/auron-flink-planner/src/test/java/org/apache/auron/flink/table/planner/converter/FlinkNodeConverterFactoryTest.java
@@ -27,6 +27,8 @@
 import org.apache.calcite.rel.core.AggregateCall;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
 import org.apache.calcite.rex.RexLiteral;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.sql.fun.SqlStdOperatorTable;
@@ -143,6 +145,22 @@ void testGetConverterAbsent() {
         assertFalse(found.isPresent());
     }
 
+    @Test
+    void testSingletonHasBuiltInRexConvertersRegistered() {
+        FlinkNodeConverterFactory singleton = FlinkNodeConverterFactory.getInstance();
+        // Look up each built-in by the RexNode class it is expected to be registered under.
+        Optional> inputRefConverter = singleton.getConverter(RexInputRef.class);
+        Optional> literalConverter = singleton.getConverter(RexLiteral.class);
+        Optional> callConverter = singleton.getConverter(RexCall.class);
+        // Each lookup must hit, and must resolve to the matching built-in implementation.
+        assertTrue(inputRefConverter.isPresent());
+        assertTrue(literalConverter.isPresent());
+        assertTrue(callConverter.isPresent());
+        assertTrue(inputRefConverter.get() instanceof RexInputRefConverter);
+        assertTrue(literalConverter.get() instanceof RexLiteralConverter);
+        assertTrue(callConverter.get() instanceof RexCallConverter);
+    }
+
     // ---- Test stubs ----
 
     /** Stub FlinkRexNodeConverter for testing. */
diff --git a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/configuration/FlinkAuronConfiguration.java b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/configuration/FlinkAuronConfiguration.java
index 69d76bac4..ec3e77983 100644
--- a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/configuration/FlinkAuronConfiguration.java
+++ b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/configuration/FlinkAuronConfiguration.java
@@ -40,6 +40,19 @@ public class FlinkAuronConfiguration extends AuronConfiguration {
             .withDescription("The auron native memory size to use.")
             .withDefaultValue(256 * 1024 * 1024L); // 256 MB
 
+    /**
+     * Boolean option keyed {@code auron.failback.flink.engine.enabled}. When an Auron operator
+     * conversion fails at planning time, it controls whether the job falls back to Flink's
+     * stock engine for that operator ({@code true}) or fails the submission with an
+     * {@link IllegalStateException} ({@code false}). Defaults to {@code true}; set it to
+     * {@code false} in CI or new-converter development to surface gaps in Auron coverage.
+     */
+    public static final ConfigOption FAIL_BACK_FLINK_ENGINE_ENABLED = new ConfigOption<>(Boolean.class)
+            .withKey("auron.failback.flink.engine.enabled")
+            .withDescription("When an Auron operator conversion fails, does it fall back to "
+                    + "the Flink engine for execution?")
+            .withDefaultValue(true);
+
     private final Configuration flinkConfig;
 
     public FlinkAuronConfiguration() {
diff --git a/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/configuration/FlinkAuronConfigurationTest.java b/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/configuration/FlinkAuronConfigurationTest.java
index fe98b927e..ea198a914 100644
--- a/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/configuration/FlinkAuronConfigurationTest.java
+++ b/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/configuration/FlinkAuronConfigurationTest.java
@@ -46,5 +46,13 @@ public void testGetConfigFromFlinkConfig() {
         assertEquals(config.get(AuronConfiguration.BATCH_SIZE), 9999);
         assertEquals(config.get(AuronConfiguration.NATIVE_LOG_LEVEL), "DEBUG");
         assertEquals(config.get(AuronConfiguration.MEMORY_FRACTION), 0.6); // default value
+        assertEquals(true, config.get(FlinkAuronConfiguration.FAIL_BACK_FLINK_ENGINE_ENABLED));
+    }
+
+    @Test
+    public void testFailBackFlinkEngineEnabledDefaultsToTrue() {
+        assertEquals(
+                "auron.failback.flink.engine.enabled", FlinkAuronConfiguration.FAIL_BACK_FLINK_ENGINE_ENABLED.key());
+        assertEquals(Boolean.TRUE, FlinkAuronConfiguration.FAIL_BACK_FLINK_ENGINE_ENABLED.defaultValue());
     }
 }

From 78ad58a5e804346c5c2533d194749d4cde51ff5a Mon Sep 17 00:00:00 2001
From: Weiqing Yang 
Date: Fri, 22 May 2026 00:24:11 -0700
Subject: [PATCH 2/7] [AURON #1853] Shadow Flink's StreamExecCalc to attempt
 native Calc
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

The shadowed class lives at the same FQCN as Flink's stock
StreamExecCalc; classpath ordering (auron-flink-planner ahead of
flink-table-planner via the standard auron-flink-assembly packaging)
makes the planner construct this class whenever it builds a Calc
ExecNode. Same pattern as Apache Gluten's gluten-flink.

At translateToPlanInternal time, the class attempts to translate its
projection + condition into a native Project[Filter?[FFIReader]] plan
using the converter framework. On success, constructs a
FlinkAuronCalcOperator inline and wraps it in a OneInputTransformation.
On any failure (unsupported RexNode or exception during composition),
falls back to Flink's stock CodeGenOperator via
super.translateToPlanInternal — gated by FAIL_BACK_FLINK_ENGINE_ENABLED
(default true falls back, false throws IllegalStateException).

Observability: one WARN per unique unsupported RexNode class
(deduplicated per planner thread), plus one WARN for every
plan-composition exception (not deduplicated, logged with stack
trace). The per-submission INFO summary is deferred — Flink 1.18's
PlannerBase exposes no clean submission-end hook.

Also: NativeRuntimeFactory now extends java.io.Serializable
(@VisibleForTesting interface). Without the marker, Flink's operator
dispatch to TaskManagers throws NotSerializableException — a latent
defect in #1857 that the E2E ITCase in the next commit will exercise.
Marker interfaces are non-breaking.

Tests: StreamExecCalcTest 12/12, FlinkAuronCalcOperatorTest 14/14,
checkstyle 0 violations.

Issue: https://github.com/apache/auron/issues/1853
---
 .../nodes/exec/stream/StreamExecCalc.java     | 329 ++++++++
 .../nodes/exec/stream/StreamExecCalcTest.java | 705 ++++++++++++++++++
 .../org.apache.auron.jni.AuronAdaptorProvider |  17 +
 .../strict-mode-flink-conf/flink-conf.yaml    |  16 +
 .../operator/FlinkAuronCalcOperator.java      |   8 +-
 5 files changed, 1073 insertions(+), 2 deletions(-)
 create mode 100644 auron-flink-extension/auron-flink-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecCalc.java
 create mode 100644 auron-flink-extension/auron-flink-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecCalcTest.java
 create mode 100644 auron-flink-extension/auron-flink-planner/src/test/resources/META-INF/services/org.apache.auron.jni.AuronAdaptorProvider
 create mode 100644 auron-flink-extension/auron-flink-planner/src/test/resources/strict-mode-flink-conf/flink-conf.yaml

diff --git a/auron-flink-extension/auron-flink-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecCalc.java b/auron-flink-extension/auron-flink-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecCalc.java
new file mode 100644
index 000000000..d0efc5672
--- /dev/null
+++ b/auron-flink-extension/auron-flink-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecCalc.java
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.auron.flink.configuration.FlinkAuronConfiguration;
+import org.apache.auron.flink.runtime.operator.FlinkAuronCalcOperator;
+import org.apache.auron.flink.table.planner.converter.ConverterContext;
+import org.apache.auron.flink.table.planner.converter.FlinkNodeConverterFactory;
+import org.apache.auron.flink.utils.SchemaConverters;
+import org.apache.auron.jni.AuronAdaptor;
+import org.apache.auron.protobuf.FFIReaderExecNode;
+import org.apache.auron.protobuf.FilterExecNode;
+import org.apache.auron.protobuf.PhysicalExprNode;
+import org.apache.auron.protobuf.PhysicalPlanNode;
+import org.apache.auron.protobuf.ProjectionExecNode;
+import org.apache.calcite.rex.RexNode;
+import org.apache.flink.FlinkVersion;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
+import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
+import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecCalc;
+import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
+import org.apache.flink.table.runtime.operators.TableStreamOperator;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.RowType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Shadows Flink's stock {@code StreamExecCalc} via fully-qualified-class-name resolution. Java's
+ * classloader resolves one class per FQCN; with {@code auron-flink-planner} ahead of {@code
+ * flink-table-planner} on the classpath, Flink's planner constructs this class whenever it builds
+ * a Calc {@code ExecNode}.
+ *
+ * 

{@link #translateToPlanInternal(PlannerBase, ExecNodeConfig)} attempts to translate the + * projection + condition into a native {@link PhysicalPlanNode} using the converter framework. On + * success it returns a {@link Transformation} wrapping a {@link FlinkAuronCalcOperator}; on + * failure it either delegates to {@code super.translateToPlanInternal} (when + * {@link FlinkAuronConfiguration#FAIL_BACK_FLINK_ENGINE_ENABLED} is {@code true}, the default) or + * throws {@link IllegalStateException}. + * + *

Per-fallback observability: the first time a given unsupported {@link RexNode} class is seen + * (per planner thread), a WARN log line is emitted naming the failing node ID and the unsupported + * class so users can grep for missing converter coverage. Subsequent fallbacks on the same + * {@link RexNode} class are silent to avoid log spam. + */ +@ExecNodeMetadata( + name = "stream-exec-calc", + version = 1, + minPlanVersion = FlinkVersion.v1_15, + minStateVersion = FlinkVersion.v1_15) +public class StreamExecCalc extends CommonExecCalc implements StreamExecNode { + + private static final Logger LOG = LoggerFactory.getLogger(StreamExecCalc.class); + + /** + * Dedup set for the per-fallback WARN. A given unsupported {@link RexNode} class is logged at + * most once per planner thread; reused across submissions because the next submission only + * adds noise if a brand-new class shows up. ThreadLocal keeps the bookkeeping off the static + * mutable state path (planner threads in session clusters are reused). + */ + private static final ThreadLocal>> WARN_DEDUP = ThreadLocal.withInitial(HashSet::new); + + /** + * Per-thread counter incremented every time a WARN log line is actually emitted (i.e. the + * fallback class was new in the dedup set). Tests inspect this through {@link + * #peekWarnEmitCountForTest()} to assert dedup behavior without depending on a particular + * SLF4J binding configuration. + */ + private static final ThreadLocal WARN_EMIT_COUNT = ThreadLocal.withInitial(() -> new int[1]); + + /** + * Constructs a stream Calc node. Matches Flink's stock {@code StreamExecCalc} primary + * constructor signature so the planner's reflective instantiation finds this shadowed class + * without changes. 
+ * + * @param tableConfig table-level configuration to persist into the node context + * @param projection projection expressions evaluated per row + * @param condition filter expression evaluated per row, or {@code null} for no filter + * @param inputProperty input edge property for the single upstream operator + * @param outputType output {@link RowType} of this node + * @param description human-readable description used for logging and metrics + */ + public StreamExecCalc( + ReadableConfig tableConfig, + List projection, + @Nullable RexNode condition, + InputProperty inputProperty, + RowType outputType, + String description) { + super( + ExecNodeContext.newNodeId(), + ExecNodeContext.newContext(StreamExecCalc.class), + ExecNodeContext.newPersistedConfig(StreamExecCalc.class, tableConfig), + projection, + condition, + TableStreamOperator.class, + true, + Collections.singletonList(inputProperty), + outputType, + description); + } + + /** + * JSON deserialization constructor used for compiled plan restoration. Matches Flink's stock + * {@code StreamExecCalc} {@code @JsonCreator} constructor signature. 
+ * + * @param id pre-assigned node ID + * @param context restored node context + * @param persistedConfig configuration persisted with the compiled plan + * @param projection projection expressions evaluated per row + * @param condition filter expression evaluated per row, or {@code null} for no filter + * @param inputProperties input edge properties for upstream operators + * @param outputType output {@link RowType} of this node + * @param description human-readable description used for logging and metrics + */ + @JsonCreator + public StreamExecCalc( + @JsonProperty(FIELD_NAME_ID) int id, + @JsonProperty(FIELD_NAME_TYPE) ExecNodeContext context, + @JsonProperty(FIELD_NAME_CONFIGURATION) ReadableConfig persistedConfig, + @JsonProperty(FIELD_NAME_PROJECTION) List projection, + @Nullable @JsonProperty(FIELD_NAME_CONDITION) RexNode condition, + @JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List inputProperties, + @JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType, + @JsonProperty(FIELD_NAME_DESCRIPTION) String description) { + super( + id, + context, + persistedConfig, + projection, + condition, + TableStreamOperator.class, + true, + inputProperties, + outputType, + description); + } + + @Override + @SuppressWarnings("unchecked") + protected Transformation translateToPlanInternal(PlannerBase planner, ExecNodeConfig config) { + final Transformation upstream = + (Transformation) getInputEdges().get(0).translateToPlan(planner); + final RowType inputRowType = (RowType) getInputEdges().get(0).getOutputType(); + final RowType outputRowType = (RowType) getOutputType(); + + final Optional plan = tryBuildAuronPlan(inputRowType, outputRowType); + + if (!plan.isPresent()) { + final boolean fallbackEnabled = AuronAdaptor.getInstance() + .getAuronConfiguration() + .get(FlinkAuronConfiguration.FAIL_BACK_FLINK_ENGINE_ENABLED); + if (fallbackEnabled) { + LOG.debug("Falling back to Flink's CodeGen Calc for node {}", getId()); + return translateToFlinkCalc(planner, config); + } + 
throw new IllegalStateException( + "Auron Calc conversion failed for node " + getId() + " and fallback is disabled"); + } + + final FlinkAuronCalcOperator operator = + new FlinkAuronCalcOperator(plan.get(), inputRowType, outputRowType, "FlinkAuronCalc-" + getId()); + + return ExecNodeUtil.createOneInputTransformation( + upstream, + createTransformationMeta(CALC_TRANSFORMATION, config), + SimpleOperatorFactory.of(operator), + InternalTypeInfo.of(outputRowType), + upstream.getParallelism(), + 0L, + false); + } + + /** + * Indirection over {@code super.translateToPlanInternal} so tests can stub the Flink + * codegen-Calc fallback path without running Flink's full code-generation machinery. Production + * delegates straight through. + * + * @param planner the planner forwarded from {@link #translateToPlanInternal} + * @param config the exec-node config forwarded from {@link #translateToPlanInternal} + * @return Flink's stock {@link Transformation} produced by {@code CommonExecCalc} + */ + protected Transformation translateToFlinkCalc(PlannerBase planner, ExecNodeConfig config) { + return super.translateToPlanInternal(planner, config); + } + + /** + * Attempts to compose a native {@code Project[Filter?[FFIReader-placeholder]]} plan from this + * node's projection and condition. Returns {@link Optional#empty()} if any {@link RexNode} is + * unsupported by the converter framework, or if plan composition throws — both signals are + * the same for the caller: fall back to Flink's codegen Calc. + * + *

The outer {@link Throwable} catch is defence-in-depth: the converter framework already + * catches per-RexNode {@link Exception}, but {@link SchemaConverters} can throw {@link + * UnsupportedOperationException} on a {@link RowType} containing an unsupported logical type, + * and protobuf composition can theoretically throw on invalid inputs. Treating any failure as + * fallback keeps the rewriter safe. + * + * @param inputRowType the upstream row type used by the converter context + * @param outputRowType the row type of this Calc's output + * @return a composed plan, or empty if conversion failed + */ + private Optional tryBuildAuronPlan(RowType inputRowType, RowType outputRowType) { + try { + final ConverterContext ctx = new ConverterContext( + getPersistedConfig(), + AuronAdaptor.getInstance().getAuronConfiguration(), + Thread.currentThread().getContextClassLoader(), + inputRowType); + final FlinkNodeConverterFactory converters = FlinkNodeConverterFactory.getInstance(); + + PhysicalExprNode filterExpr = null; + if (condition != null) { + final Optional c = converters.convertRexNode(condition, ctx); + if (!c.isPresent()) { + recordFallback(condition.getClass()); + return Optional.empty(); + } + filterExpr = c.get(); + } + + final List projectExprs = new ArrayList<>(projection.size()); + for (RexNode rex : projection) { + final Optional c = converters.convertRexNode(rex, ctx); + if (!c.isPresent()) { + recordFallback(rex.getClass()); + return Optional.empty(); + } + projectExprs.add(c.get()); + } + + // numPartitions = 1 because each parallel FlinkAuronCalcOperator subtask owns a + // single Java-side exporter and one corresponding native partition; per-subtask + // parallelism is governed by Flink's outer Transformation parallelism, not by the + // FFI Reader's partition count. 
+ final FFIReaderExecNode ffiReader = FFIReaderExecNode.newBuilder() + .setNumPartitions(1) + .setSchema(SchemaConverters.convertToAuronSchema(inputRowType, false)) + .setExportIterProviderResourceId("placeholder") + .build(); + PhysicalPlanNode current = + PhysicalPlanNode.newBuilder().setFfiReader(ffiReader).build(); + + if (filterExpr != null) { + final FilterExecNode filterNode = FilterExecNode.newBuilder() + .setInput(current) + .addExpr(filterExpr) + .build(); + current = PhysicalPlanNode.newBuilder().setFilter(filterNode).build(); + } + + final ProjectionExecNode.Builder proj = + ProjectionExecNode.newBuilder().setInput(current); + for (int i = 0; i < projectExprs.size(); i++) { + proj.addExpr(projectExprs.get(i)); + proj.addExprName(outputRowType.getFieldNames().get(i)); + proj.addDataType(SchemaConverters.convertToAuronArrowType(outputRowType.getTypeAt(i))); + } + return Optional.of( + PhysicalPlanNode.newBuilder().setProjection(proj.build()).build()); + + } catch (Throwable t) { + WARN_EMIT_COUNT.get()[0]++; + LOG.warn( + "Auron StreamExecCalc fallback (node {}): plan composition threw {}; using Flink CodeGen Calc.", + getId(), + t.getClass().getName(), + t); + return Optional.empty(); + } + } + + /** + * Logs a WARN line on the first occurrence of each unsupported {@link RexNode} class per + * planner thread. Subsequent occurrences are silent. + */ + private void recordFallback(Class unsupportedRexClass) { + if (WARN_DEDUP.get().add(unsupportedRexClass)) { + WARN_EMIT_COUNT.get()[0]++; + LOG.warn( + "Auron StreamExecCalc fallback (node {}): unsupported RexNode {}; using Flink CodeGen Calc.", + getId(), + unsupportedRexClass.getName()); + } + } + + /** Test seam: clears the per-thread dedup set and emit counter so independent tests do not + * share state. 
*/ + static void resetWarnDedupForTest() { + WARN_DEDUP.get().clear(); + WARN_EMIT_COUNT.get()[0] = 0; + } + + /** Test seam: returns the number of WARN log lines actually emitted on the current thread + * since the last {@link #resetWarnDedupForTest()}. */ + static int peekWarnEmitCountForTest() { + return WARN_EMIT_COUNT.get()[0]; + } +} diff --git a/auron-flink-extension/auron-flink-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecCalcTest.java b/auron-flink-extension/auron-flink-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecCalcTest.java new file mode 100644 index 000000000..5da71b88f --- /dev/null +++ b/auron-flink-extension/auron-flink-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecCalcTest.java @@ -0,0 +1,705 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.planner.plan.nodes.exec.stream; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import org.apache.auron.flink.runtime.operator.FlinkAuronCalcOperator; +import org.apache.auron.protobuf.ArrowType; +import org.apache.auron.protobuf.FFIReaderExecNode; +import org.apache.auron.protobuf.FilterExecNode; +import org.apache.auron.protobuf.PhysicalPlanNode; +import org.apache.auron.protobuf.ProjectionExecNode; +import org.apache.calcite.jdbc.JavaTypeFactoryImpl; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.fun.SqlStdOperatorTable; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.flink.api.dag.Transformation; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.operators.SimpleOperatorFactory; +import org.apache.flink.streaming.api.transformations.OneInputTransformation; +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.planner.delegation.PlannerBase; +import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge; +import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase; +import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig; +import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext; +import 
org.apache.flink.table.planner.plan.nodes.exec.InputProperty; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.table.types.logical.BigIntType; +import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RawType; +import org.apache.flink.table.types.logical.RowType; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +/** + * Unit tests for the shadowed {@link StreamExecCalc}. + * + *

Lives in the same package as the production class so {@code @JsonCreator} field-name + * constants and the {@code protected translateToFlinkCalc} hook are reachable from tests. + * + *

Test strategy: + * + *

    + *
  • Plan-build success paths exercise the real {@code translateToPlanInternal} end-to-end. A + * fake upstream {@link ExecNode} is wired in via {@link ExecNodeBase#setInputEdges} with a + * pre-built {@link Transformation} cached in the source's {@code transformation} field so + * {@code ExecNodeBase.translateToPlan} returns it without consulting a real planner. + *
  • Fallback paths use {@link CapturingTranslator} to subclass {@code StreamExecCalc} and + * override {@code translateToFlinkCalc}, recording whether the super-codegen path was + * invoked and short-circuiting the heavy Flink machinery. + *
  • WARN log assertions capture {@code System.err} (the default SLF4J SimpleLogger target); + * the matching test class also resets the dedup ThreadLocal between tests so logging + * behavior is deterministic. + *
+ */ +class StreamExecCalcTest { + + private static final RelDataTypeFactory TYPE_FACTORY = new JavaTypeFactoryImpl(); + private static final RexBuilder REX_BUILDER = new RexBuilder(TYPE_FACTORY); + private static final RowType TWO_INT_ROW = + RowType.of(new LogicalType[] {new IntType(), new IntType()}, new String[] {"f0", "f1"}); + + private TableConfig tableConfig; + private InputProperty inputProperty; + private ExecNodeConfig nodeConfig; + + @BeforeEach + void setUp() throws Exception { + // Reset id counter so getId()-derived strings are reproducible across tests. + ExecNodeContext.resetIdCounter(); + // Reset the per-fallback WARN dedup so each test starts from a clean slate. + invokeStatic(StreamExecCalc.class, "resetWarnDedupForTest"); + + tableConfig = TableConfig.getDefault(); + inputProperty = InputProperty.DEFAULT; + nodeConfig = ExecNodeConfig.ofNodeConfig(new Configuration(), false); + } + + // ===================================================================== + // Plan-build success paths + // ===================================================================== + + /** Contract: with a non-null condition + projection of supported RexNodes, the operator wired + * into the returned Transformation is {@link FlinkAuronCalcOperator}. */ + @Test + void testProjectAndFilterEmitsAuronOperator() throws Exception { + StreamExecCalc node = newCalc( + Arrays.asList(intRef(0), intRef(1)), + makeBinary(intType(), SqlStdOperatorTable.PLUS, intRef(0), intRef(1)), + TWO_INT_ROW); + wireFakeUpstream(node, TWO_INT_ROW); + + Transformation result = invokeTranslate(node); + + assertTrue(operatorOf(result) instanceof FlinkAuronCalcOperator); + } + + /** Contract: with a null condition and supported projection, the operator is still Auron. 
*/ + @Test + void testProjectOnlyEmitsAuronOperator() throws Exception { + StreamExecCalc node = newCalc(Arrays.asList(intRef(0), intRef(1)), null, TWO_INT_ROW); + wireFakeUpstream(node, TWO_INT_ROW); + + Transformation result = invokeTranslate(node); + + FlinkAuronCalcOperator op = (FlinkAuronCalcOperator) operatorOf(result); + // No filter → plan is Project[FFIReader], not Project[Filter[FFIReader]]. + PhysicalPlanNode plan = op.getPhysicalPlanNodes().get(0); + assertTrue(plan.hasProjection(), "Root must be a Projection"); + assertFalse(plan.getProjection().getInput().hasFilter(), "No filter wrapper expected when condition is null"); + } + + /** Contract: an identity projection (pure {@code RexInputRef}s) still converts. */ + @Test + void testIdentityProjectionEmitsAuronOperator() throws Exception { + StreamExecCalc node = newCalc(Arrays.asList(intRef(0), intRef(1)), null, TWO_INT_ROW); + wireFakeUpstream(node, TWO_INT_ROW); + + Transformation result = invokeTranslate(node); + + assertTrue(operatorOf(result) instanceof FlinkAuronCalcOperator); + } + + /** Contract: the {@link ProjectionExecNode} carries field names and arrow types matching + * {@code outputRowType}, so a downstream reader can rebuild the output schema without + * consulting the JVM types again. 
*/ + @Test + void testSchemaPropagatedToProjectionExecNode() throws Exception { + RowType outputType = RowType.of(new LogicalType[] {new IntType(), new BigIntType()}, new String[] {"a", "b"}); + StreamExecCalc node = + newCalc(Arrays.asList(intRef(0), REX_BUILDER.makeInputRef(bigintType(), 1)), null, outputType); + wireFakeUpstream( + node, RowType.of(new LogicalType[] {new IntType(), new BigIntType()}, new String[] {"f0", "f1"})); + + Transformation result = invokeTranslate(node); + + FlinkAuronCalcOperator op = (FlinkAuronCalcOperator) operatorOf(result); + PhysicalPlanNode plan = op.getPhysicalPlanNodes().get(0); + ProjectionExecNode proj = plan.getProjection(); + assertEquals(Arrays.asList("a", "b"), proj.getExprNameList()); + assertEquals(2, proj.getDataTypeCount()); + assertEquals(ArrowType.ArrowTypeEnumCase.INT32, proj.getDataType(0).getArrowTypeEnumCase()); + assertEquals(ArrowType.ArrowTypeEnumCase.INT64, proj.getDataType(1).getArrowTypeEnumCase()); + // FFI Reader leaf carries the input schema for the runtime. + FFIReaderExecNode ffi = plan.getProjection().getInput().getFfiReader(); + assertEquals(1, ffi.getNumPartitions()); + assertEquals("placeholder", ffi.getExportIterProviderResourceId()); + assertEquals(2, ffi.getSchema().getColumnsCount()); + } + + /** Contract: when both projection and condition are supported, the constructed plan is + * {@code Project[Filter[FFIReader]]} (Filter wrapper present). 
*/ + @Test + void testPlanShapeIsProjectFilterFFIReaderWhenConditionIsPresent() throws Exception { + StreamExecCalc node = newCalc( + Arrays.asList(intRef(0)), + makeBinary(intType(), SqlStdOperatorTable.PLUS, intRef(0), intRef(1)), + RowType.of(new IntType())); + wireFakeUpstream(node, TWO_INT_ROW); + + Transformation result = invokeTranslate(node); + + PhysicalPlanNode plan = ((FlinkAuronCalcOperator) operatorOf(result)) + .getPhysicalPlanNodes() + .get(0); + assertTrue(plan.hasProjection()); + PhysicalPlanNode innerProj = plan.getProjection().getInput(); + assertTrue(innerProj.hasFilter(), "Project's child must be a Filter when condition is non-null"); + FilterExecNode filter = innerProj.getFilter(); + assertEquals(1, filter.getExprCount()); + assertTrue(filter.getInput().hasFfiReader(), "Filter's child must be FFIReader"); + } + + // ===================================================================== + // Fallback paths (default config FAIL_BACK_FLINK_ENGINE_ENABLED=true) + // ===================================================================== + + /** Contract: an unsupported RexNode in the condition triggers fallback — Auron's + * {@code translateToFlinkCalc} hook is invoked and its stub Transformation is returned. */ + @Test + void testFallsBackWhenUnsupportedRexNodeInCondition() throws Exception { + Transformation stub = new FakeSourceTransformation(); + CapturingTranslator node = new CapturingTranslator( + tableConfig, + Arrays.asList(intRef(0)), + makeBinary(intType(), SqlStdOperatorTable.EQUALS, intRef(0), intRef(1)), + inputProperty, + RowType.of(new IntType()), + "calc", + stub); + wireFakeUpstream(node, TWO_INT_ROW); + + Transformation result = invokeTranslate(node); + + assertSame(stub, result); + assertEquals(1, node.fallbackCount); + } + + /** Contract: an unsupported RexNode in the projection triggers fallback. 
*/ + @Test + void testFallsBackWhenUnsupportedRexNodeInProjection() throws Exception { + Transformation stub = new FakeSourceTransformation(); + // EQUALS produces a RexCall whose isSupported() returns false in RexCallConverter. + CapturingTranslator node = new CapturingTranslator( + tableConfig, + Arrays.asList(makeBinary(intType(), SqlStdOperatorTable.EQUALS, intRef(0), intRef(1))), + null, + inputProperty, + RowType.of(new IntType()), + "calc", + stub); + wireFakeUpstream(node, TWO_INT_ROW); + + Transformation result = invokeTranslate(node); + + assertSame(stub, result); + assertEquals(1, node.fallbackCount); + } + + /** Contract: a {@code RowType} containing a RAW logical type causes + * {@code SchemaConverters.convertToAuronArrowType} to throw; the outer {@code Throwable} + * catch maps this to {@code Optional.empty()} and the operator falls back. */ + @Test + void testFallsBackWhenSchemaConversionThrows() throws Exception { + Transformation stub = new FakeSourceTransformation(); + // Output schema includes a RAW type — SchemaConverters does not handle this kind. + RowType outputWithRaw = RowType.of( + new LogicalType[] {new IntType(), new RawType<>(Object.class, new ObjectSerializer())}, + new String[] {"a", "rawcol"}); + // Use RexInputRefs that exist in the input schema (TWO_INT_ROW has 2 ints). + // Output type mismatch with projection type is fine for plan-build — schema conversion + // is the failure point. 
+ CapturingTranslator node = new CapturingTranslator( + tableConfig, Arrays.asList(intRef(0), intRef(1)), null, inputProperty, outputWithRaw, "calc", stub); + wireFakeUpstream(node, TWO_INT_ROW); + + Transformation result = invokeTranslate(node); + + assertSame(stub, result, "Schema conversion failure must trigger fallback"); + assertEquals(1, node.fallbackCount); + assertEquals(1, StreamExecCalc.peekWarnEmitCountForTest()); + } + + // ===================================================================== + // Strict mode (FAIL_BACK_FLINK_ENGINE_ENABLED=false) + // ===================================================================== + + /** Contract: with fallback disabled and an unsupported RexNode, the operator throws + * {@link IllegalStateException} rather than degrading silently. */ + @Test + void testThrowsWhenFallbackDisabled() throws Exception { + withStrictModeConf(() -> { + CapturingTranslator node = new CapturingTranslator( + tableConfig, + Arrays.asList(intRef(0)), + makeBinary(intType(), SqlStdOperatorTable.EQUALS, intRef(0), intRef(1)), + inputProperty, + RowType.of(new IntType()), + "calc", + new FakeSourceTransformation()); + wireFakeUpstream(node, TWO_INT_ROW); + IllegalStateException ex = assertThrows(IllegalStateException.class, () -> invokeTranslate(node)); + assertTrue( + ex.getMessage().contains("fallback is disabled"), + "IllegalStateException must mention fallback is disabled, got: " + ex.getMessage()); + // Super was never invoked because the strict path threw before the hook. + assertEquals(0, node.fallbackCount); + return null; + }); + } + + // ===================================================================== + // Observability (per-fallback WARN log) + // ===================================================================== + + /** Contract: two Calcs in a single submission falling back on the same unsupported RexNode + * class emit exactly one WARN log line. 
Verified via the test-only emit counter on + * {@link StreamExecCalc}, which is incremented only when the dedup set treats the class as + * new. */ + @Test + void testFallbackEmitsWarnLogOnce() throws Exception { + CapturingTranslator a = new CapturingTranslator( + tableConfig, + Arrays.asList(intRef(0)), + makeBinary(intType(), SqlStdOperatorTable.EQUALS, intRef(0), intRef(1)), + inputProperty, + RowType.of(new IntType()), + "calc-a", + new FakeSourceTransformation()); + wireFakeUpstream(a, TWO_INT_ROW); + invokeTranslate(a); + + CapturingTranslator b = new CapturingTranslator( + tableConfig, + Arrays.asList(intRef(0)), + makeBinary(intType(), SqlStdOperatorTable.EQUALS, intRef(0), intRef(1)), + inputProperty, + RowType.of(new IntType()), + "calc-b", + new FakeSourceTransformation()); + wireFakeUpstream(b, TWO_INT_ROW); + invokeTranslate(b); + + assertEquals(1, StreamExecCalc.peekWarnEmitCountForTest()); + } + + /** Contract: two Calcs falling back on different unsupported RexNode classes emit two + * distinct WARN log lines. */ + @Test + void testFallbackEmitsDistinctWarnLogsForDistinctRexClasses() throws Exception { + // First fallback: RexCall (EQUALS) is unsupported by RexCallConverter.isSupported. + CapturingTranslator a = new CapturingTranslator( + tableConfig, + Arrays.asList(intRef(0)), + makeBinary(intType(), SqlStdOperatorTable.EQUALS, intRef(0), intRef(1)), + inputProperty, + RowType.of(new IntType()), + "calc-a", + new FakeSourceTransformation()); + wireFakeUpstream(a, TWO_INT_ROW); + invokeTranslate(a); + + // Second fallback: an UnregisteredRex subclass — converter map has no entry. 
+ CapturingTranslator b = new CapturingTranslator( + tableConfig, + Arrays.asList(new UnregisteredRex(intType())), + null, + inputProperty, + RowType.of(new IntType()), + "calc-b", + new FakeSourceTransformation()); + wireFakeUpstream(b, TWO_INT_ROW); + invokeTranslate(b); + + assertEquals(2, StreamExecCalc.peekWarnEmitCountForTest()); + } + + // ===================================================================== + // Classpath shadowing (informational; only meaningful when an assembly JAR is in front) + // ===================================================================== + + /** Contract: {@code Class.forName} on the shadowed FQCN resolves to the same class object as + * the {@code StreamExecCalc} literal. NOTE(review): nothing in this method skips or guards it, + * so it always runs; it is presumably informative only with the assembled bundle in front. */ + @Test + void testShadowedClassResolvesViaClasspath() throws Exception { + Class resolved = Class.forName("org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc"); + assertSame(StreamExecCalc.class, resolved); + } + + // ===================================================================== + // Helpers + // ===================================================================== + + private static RelDataType intType() { + return TYPE_FACTORY.createSqlType(SqlTypeName.INTEGER); + } + + private static RelDataType bigintType() { + return TYPE_FACTORY.createSqlType(SqlTypeName.BIGINT); + } + + private static RexNode intRef(int idx) { + return REX_BUILDER.makeInputRef(intType(), idx); + } + + private static RexNode makeBinary( + RelDataType returnType, org.apache.calcite.sql.SqlOperator op, RexNode left, RexNode right) { + return REX_BUILDER.makeCall(returnType, op, Arrays.asList(left, right)); + } + + private StreamExecCalc newCalc(List projection, RexNode condition, RowType outputType) { + return new StreamExecCalc(tableConfig, projection, condition, inputProperty, outputType, "calc"); + } + + /** + * Wires a single fake upstream {@link ExecEdge} whose 
source's cached {@code transformation} + * field is a pre-built stub. {@link ExecNodeBase#translateToPlan} short-circuits on a non-null + * cached transformation and returns it without consulting the planner. + */ + private static void wireFakeUpstream(StreamExecCalc node, RowType inputRowType) throws Exception { + FakeSourceExecNode source = new FakeSourceExecNode<>(inputRowType, "src"); + source.setCachedTransformation(new FakeSourceTransformation()); + ExecEdge edge = ExecEdge.builder().source(source).target(node).build(); + node.setInputEdges(Collections.singletonList(edge)); + } + + private Transformation invokeTranslate(StreamExecCalc node) throws Exception { + Method m = ExecNodeBase.class.getDeclaredMethod( + "translateToPlanInternal", PlannerBase.class, ExecNodeConfig.class); + m.setAccessible(true); + try { + @SuppressWarnings("unchecked") + Transformation t = (Transformation) m.invoke(node, null, nodeConfig); + return t; + } catch (java.lang.reflect.InvocationTargetException e) { + if (e.getCause() instanceof RuntimeException) { + throw (RuntimeException) e.getCause(); + } + throw e; + } + } + + /** Extracts the {@link org.apache.flink.streaming.api.operators.StreamOperator} wrapped in + * the {@link SimpleOperatorFactory} of a {@link OneInputTransformation}. */ + private static Object operatorOf(Transformation result) { + assertTrue( + result instanceof OneInputTransformation, "Expected OneInputTransformation, got: " + result.getClass()); + OneInputTransformation oit = (OneInputTransformation) result; + Object factory = oit.getOperatorFactory(); + assertTrue( + factory instanceof SimpleOperatorFactory, "Expected SimpleOperatorFactory, got: " + factory.getClass()); + return ((SimpleOperatorFactory) factory).getOperator(); + } + + /** + * Runs {@code action} with the strict-mode {@code flink-conf.yaml} test resource pointed at + * by {@code FLINK_CONF_DIR}. The file disables fallback so the operator throws on conversion + * failure. 
Restores the prior environment after the action runs. + * + * <p>{@link org.apache.auron.flink.configuration.FlinkAuronConfiguration} resolves its + * underlying Flink config via {@link org.apache.flink.configuration.GlobalConfiguration#loadConfiguration()}, + * which consults {@code FLINK_CONF_DIR}, so the variable is swapped for the call. The resource URL is + * converted through its URI so percent-encoded characters (e.g. spaces) in the path are decoded. + */ + private static <T> T withStrictModeConf(java.util.concurrent.Callable<T> action) throws Exception { + java.net.URL yaml = + StreamExecCalcTest.class.getClassLoader().getResource("strict-mode-flink-conf/flink-conf.yaml"); + assertNotNull(yaml, "strict-mode-flink-conf/flink-conf.yaml test resource must exist"); + String confDir = java.nio.file.Paths.get(yaml.toURI()).getParent().toAbsolutePath().toString(); + String previous = System.getenv(org.apache.flink.configuration.ConfigConstants.ENV_FLINK_CONF_DIR); + java.util.Map<String, String> env = new java.util.HashMap<>(System.getenv()); + env.put(org.apache.flink.configuration.ConfigConstants.ENV_FLINK_CONF_DIR, confDir); + mutateEnv(env); + try { + return action.call(); + } finally { + java.util.Map<String, String> restore = new java.util.HashMap<>(System.getenv()); + if (previous == null) { + restore.remove(org.apache.flink.configuration.ConfigConstants.ENV_FLINK_CONF_DIR); + } else { + restore.put(org.apache.flink.configuration.ConfigConstants.ENV_FLINK_CONF_DIR, previous); + } + mutateEnv(restore); + } + } + + /** Reflectively replaces the JVM's environment map. Test-only — based on the same pattern + * used by {@code CommonTestUtils.setEnv} in {@code auron-flink-runtime}. 
*/ + @SuppressWarnings("unchecked") + private static void mutateEnv(java.util.Map newEnv) { + try { + java.util.Map currentEnv = System.getenv(); + Field f = currentEnv.getClass().getDeclaredField("m"); + f.setAccessible(true); + java.util.Map backing = (java.util.Map) f.get(currentEnv); + backing.clear(); + backing.putAll(newEnv); + try { + Class peClass = Class.forName("java.lang.ProcessEnvironment"); + Field ci = peClass.getDeclaredField("theCaseInsensitiveEnvironment"); + ci.setAccessible(true); + java.util.Map cienv = (java.util.Map) ci.get(null); + cienv.clear(); + cienv.putAll(newEnv); + } catch (NoSuchFieldException ignored) { + // theCaseInsensitiveEnvironment is Windows-only. + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private static void invokeStatic(Class cls, String methodName) throws Exception { + Method m = cls.getDeclaredMethod(methodName); + m.setAccessible(true); + m.invoke(null); + } + + // ===================================================================== + // Test subclass: capture the fallback delegation + // ===================================================================== + + /** Subclass of {@link StreamExecCalc} that records super-codegen invocations and returns a + * stub Transformation instead of executing Flink's actual codegen path. 
*/ + static class CapturingTranslator extends StreamExecCalc { + int fallbackCount; + final Transformation fallbackStub; + + CapturingTranslator( + org.apache.flink.configuration.ReadableConfig tableConfig, + List projection, + RexNode condition, + InputProperty inputProperty, + RowType outputType, + String description, + Transformation fallbackStub) { + super(tableConfig, projection, condition, inputProperty, outputType, description); + this.fallbackStub = fallbackStub; + } + + @Override + protected Transformation translateToFlinkCalc(PlannerBase planner, ExecNodeConfig config) { + fallbackCount++; + return fallbackStub; + } + } + + // ===================================================================== + // Fake upstream ExecNode and Transformation + // ===================================================================== + + /** + * Minimal subclass of {@link ExecNodeBase} used as the source of a fake {@link ExecEdge}. Its + * cached {@code transformation} field is set via reflection so {@code translateToPlan} + * short-circuits and never touches the planner. + */ + static class FakeSourceExecNode extends ExecNodeBase implements StreamExecNode { + FakeSourceExecNode(LogicalType outputType, String description) { + super( + ExecNodeContext.newNodeId(), + new ExecNodeContext("fake-src_1"), + new Configuration(), + Collections.emptyList(), + outputType, + description); + } + + void setCachedTransformation(Transformation t) throws Exception { + Field f = ExecNodeBase.class.getDeclaredField("transformation"); + f.setAccessible(true); + f.set(this, t); + } + + @Override + protected Transformation translateToPlanInternal(PlannerBase planner, ExecNodeConfig config) { + // Never invoked because translateToPlan short-circuits on the cached transformation. 
+ throw new UnsupportedOperationException(); + } + } + + /** + * Minimal stub {@link Transformation} subclass that mimics a {@link SourceTransformation}: it + * extends {@code Transformation} directly, reports no inputs, and fixes parallelism at 1. + */ + static class FakeSourceTransformation extends Transformation { + FakeSourceTransformation() { + super("fake-src", InternalTypeInfo.of(TWO_INT_ROW), 1); + } + + @Override + public List> getTransitivePredecessors() { + return Collections.singletonList(this); + } + + @Override + public List> getInputs() { + return Collections.emptyList(); + } + } + + // ===================================================================== + // Unregistered RexNode subclass for the dedup test + // ===================================================================== + + /** {@link RexNode} subclass that is not registered in the factory so it routes through the + * "no converter registered" path of {@link FlinkNodeConverterFactory#convertRexNode}. */ + static class UnregisteredRex extends RexNode { + private final RelDataType type; + + UnregisteredRex(RelDataType type) { + this.type = type; + } + + @Override + public RelDataType getType() { + return type; + } + + @Override + public R accept(org.apache.calcite.rex.RexVisitor visitor) { + return null; + } + + @Override + public R accept(org.apache.calcite.rex.RexBiVisitor visitor, P arg) { + return null; + } + + @Override + public boolean equals(Object obj) { + return this == obj; + } + + @Override + public int hashCode() { + return System.identityHashCode(this); + } + } + + // ===================================================================== + // RawType serializer (test helper for the schema-conversion failure path) + // ===================================================================== + + /** No-op TypeSerializer required to construct a {@link RawType}. 
*/ + static class ObjectSerializer extends org.apache.flink.api.common.typeutils.TypeSerializer { + private static final long serialVersionUID = 1L; + + @Override + public boolean isImmutableType() { + return true; + } + + @Override + public org.apache.flink.api.common.typeutils.TypeSerializer duplicate() { + return this; + } + + @Override + public Object createInstance() { + return new Object(); + } + + @Override + public Object copy(Object from) { + return from; + } + + @Override + public Object copy(Object from, Object reuse) { + return from; + } + + @Override + public int getLength() { + return -1; + } + + @Override + public void serialize(Object record, org.apache.flink.core.memory.DataOutputView target) { + throw new UnsupportedOperationException(); + } + + @Override + public Object deserialize(org.apache.flink.core.memory.DataInputView source) { + throw new UnsupportedOperationException(); + } + + @Override + public Object deserialize(Object reuse, org.apache.flink.core.memory.DataInputView source) { + return deserialize(source); + } + + @Override + public void copy( + org.apache.flink.core.memory.DataInputView source, org.apache.flink.core.memory.DataOutputView target) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean equals(Object obj) { + return obj instanceof ObjectSerializer; + } + + @Override + public int hashCode() { + return ObjectSerializer.class.hashCode(); + } + + @Override + public org.apache.flink.api.common.typeutils.TypeSerializerSnapshot snapshotConfiguration() { + throw new UnsupportedOperationException(); + } + } +} diff --git a/auron-flink-extension/auron-flink-planner/src/test/resources/META-INF/services/org.apache.auron.jni.AuronAdaptorProvider b/auron-flink-extension/auron-flink-planner/src/test/resources/META-INF/services/org.apache.auron.jni.AuronAdaptorProvider new file mode 100644 index 000000000..ae3315d5f --- /dev/null +++ 
b/auron-flink-extension/auron-flink-planner/src/test/resources/META-INF/services/org.apache.auron.jni.AuronAdaptorProvider @@ -0,0 +1,17 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +org.apache.auron.jni.FlinkAuronAdaptorProvider diff --git a/auron-flink-extension/auron-flink-planner/src/test/resources/strict-mode-flink-conf/flink-conf.yaml b/auron-flink-extension/auron-flink-planner/src/test/resources/strict-mode-flink-conf/flink-conf.yaml new file mode 100644 index 000000000..c591a1145 --- /dev/null +++ b/auron-flink-extension/auron-flink-planner/src/test/resources/strict-mode-flink-conf/flink-conf.yaml @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +flink.auron.failback.flink.engine.enabled: false diff --git a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/runtime/operator/FlinkAuronCalcOperator.java b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/runtime/operator/FlinkAuronCalcOperator.java index eb4bbfcb9..baee1338a 100644 --- a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/runtime/operator/FlinkAuronCalcOperator.java +++ b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/runtime/operator/FlinkAuronCalcOperator.java @@ -377,9 +377,13 @@ interface NativeRuntime extends AutoCloseable { void close(); } - /** Factory for {@link NativeRuntime} instances, swappable in tests. */ + /** + * Factory for {@link NativeRuntime} instances, swappable in tests. Extends {@link + * java.io.Serializable} because the field that holds an instance is part of the operator and + * Flink serializes operators to dispatch them to TaskManagers. + */ @VisibleForTesting - interface NativeRuntimeFactory { + interface NativeRuntimeFactory extends java.io.Serializable { /** * Creates a native runtime for the given plan. 
* From cce8ac29fc460764fd62f5095c7978473b7c17a6 Mon Sep 17 00:00:00 2001 From: Weiqing Yang Date: Fri, 22 May 2026 00:45:07 -0700 Subject: [PATCH 3/7] [AURON #1853] Add AuronCalcRewriteITCase for end-to-end Flink SQL coverage MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Four tests exercising distinct paths of the shadowed StreamExecCalc: - testMultiColumnArithmeticProjection — Auron path with multi-expression projection (`int + 1, int * 2`). - testFilterAndProjectEndToEnd — Calc-with-condition; GREATER_THAN is not yet converter-supported, so this verifies fallback-path correctness. The Auron-side Filter[FFIReader] plan-shape coverage lands once a predicate-returning converter does. - testFallbackOnUnsupportedExprStillExecutes — UPPER(string) triggers silent fallback; the job emits the correct UPPERed rows. - testMixedSupportedAndUnsupportedCalcs — UNION ALL of one convertible and one non-convertible Calc; verifies per-Calc granularity at the job-level correctness layer. No duplicate with AuronFlinkCalcITCase.testPlus (single-expression arithmetic). Two of the four tests pass without the native library; the other two share testPlus's native-lib prerequisite. Tests: compile clean, checkstyle 0 violations. 
Issue: https://github.com/apache/auron/issues/1853 --- .../table/runtime/AuronCalcRewriteITCase.java | 83 +++++++++++++++++++ 1 file changed, 83 insertions(+) create mode 100644 auron-flink-extension/auron-flink-planner/src/test/java/org/apache/auron/flink/table/runtime/AuronCalcRewriteITCase.java diff --git a/auron-flink-extension/auron-flink-planner/src/test/java/org/apache/auron/flink/table/runtime/AuronCalcRewriteITCase.java b/auron-flink-extension/auron-flink-planner/src/test/java/org/apache/auron/flink/table/runtime/AuronCalcRewriteITCase.java new file mode 100644 index 000000000..05ef8c663 --- /dev/null +++ b/auron-flink-extension/auron-flink-planner/src/test/java/org/apache/auron/flink/table/runtime/AuronCalcRewriteITCase.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.auron.flink.table.runtime; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.Arrays; +import java.util.Comparator; +import java.util.List; +import org.apache.auron.flink.table.AuronFlinkTableTestBase; +import org.apache.flink.types.Row; +import org.apache.flink.util.CollectionUtil; +import org.junit.jupiter.api.Test; + +/** + * End-to-end IT cases for the shadowed {@code StreamExecCalc}. Each test submits a real SQL job + * through {@link org.apache.flink.table.api.bridge.java.StreamTableEnvironment} over the {@code T1} + * table registered in {@link AuronFlinkTableTestBase} and asserts the final row set is correct + * regardless of whether the Calc executed natively or fell back to Flink's codegen. + */ +public class AuronCalcRewriteITCase extends AuronFlinkTableTestBase { + + /** Multi-column arithmetic projection exercises the projection loop with more than one + * convertible expression. */ + @Test + public void testMultiColumnArithmeticProjection() { + List rows = CollectionUtil.iteratorToList(tableEnvironment + .executeSql("select `int` + 1, `int` * 2 from T1") + .collect()); + rows.sort(Comparator.comparingInt(o -> (int) o.getField(0))); + assertThat(rows).isEqualTo(Arrays.asList(Row.of(2, 2), Row.of(3, 4), Row.of(3, 4))); + } + + /** A filter-plus-projection Calc whose condition uses a not-yet-supported comparison operator + * falls back to Flink's codegen path. Asserts the job still produces correct results; the + * Auron-side {@code Filter[FFIReader]} plan-shape coverage will be added when a + * predicate-returning converter lands. 
*/ + @Test + public void testFilterAndProjectEndToEnd() { + List rows = CollectionUtil.iteratorToList(tableEnvironment + .executeSql("select `int` * 2 from T1 where `int` > 1") + .collect()); + rows.sort(Comparator.comparingInt(o -> (int) o.getField(0))); + assertThat(rows).isEqualTo(Arrays.asList(Row.of(4), Row.of(4))); + } + + /** Unsupported expression (a string function not in the converter set) triggers silent + * fallback. The job must still complete and emit the correct rows. */ + @Test + public void testFallbackOnUnsupportedExprStillExecutes() { + List rows = CollectionUtil.iteratorToList( + tableEnvironment.executeSql("select UPPER(`string`) from T1").collect()); + rows.sort(Comparator.comparing(o -> (String) o.getField(0))); + assertThat(rows).isEqualTo(Arrays.asList(Row.of("COMMENT#1"), Row.of("COMMENT#1"), Row.of("HI"))); + } + + /** A job containing two Calcs — one whose expressions are all converter-supported and one + * that uses an unsupported function — must run end-to-end and emit the correct union of rows. + * This asserts the job-level correctness contract; observability of which Calc fell back is + * surfaced through the per-fallback WARN log rather than the test's value assertion. */ + @Test + public void testMixedSupportedAndUnsupportedCalcs() { + List rows = CollectionUtil.iteratorToList(tableEnvironment + .executeSql("select `int` + 1 from T1 union all select CHAR_LENGTH(`string`) from T1") + .collect()); + rows.sort(Comparator.comparingInt(o -> (int) o.getField(0))); + assertThat(rows).isEqualTo(Arrays.asList(Row.of(2), Row.of(2), Row.of(3), Row.of(3), Row.of(9), Row.of(9))); + } +} From c84abc1aadabdbef15ec8019368abfc4ae7f66e0 Mon Sep 17 00:00:00 2001 From: Weiqing Yang Date: Sun, 24 May 2026 11:03:09 -0700 Subject: [PATCH 4/7] [AURON #1853] Decouple StreamExecCalcTest from javac's StreamExecCalc binding The shadowed StreamExecCalc shares its FQCN with Flink's stock class. 
Maven's scala-maven-plugin testCompile classpath ordering is not always deterministic across environments: on Linux CI runners, javac resolves StreamExecCalc to flink-table-planner_2.12-1.18.1.jar instead of the local target/classes, so symbols only present on the shadow (peekWarnEmitCountForTest, translateToFlinkCalc) are not visible at compile time. - Add invokeStaticInt helper and reach peekWarnEmitCountForTest via reflection, matching the existing pattern used for resetWarnDedupForTest. - Drop @Override on CapturingTranslator.translateToFlinkCalc so javac no longer requires the parent class to declare it. Runtime virtual dispatch is unaffected: the loaded StreamExecCalc is the shadow, signatures match, and translateToPlanInternal's invocation routes to the subclass override as before. Tested: StreamExecCalcTest 12/12 locally on JDK 8 + JDK 11; spotless and checkstyle clean; isolated javac run against only the stock Flink JAR (no local target/classes) compiles the test cleanly, reproducing the CI classpath condition. 
--- .../nodes/exec/stream/StreamExecCalcTest.java | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/auron-flink-extension/auron-flink-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecCalcTest.java b/auron-flink-extension/auron-flink-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecCalcTest.java index 5da71b88f..1da400a8b 100644 --- a/auron-flink-extension/auron-flink-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecCalcTest.java +++ b/auron-flink-extension/auron-flink-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecCalcTest.java @@ -268,7 +268,7 @@ void testFallsBackWhenSchemaConversionThrows() throws Exception { assertSame(stub, result, "Schema conversion failure must trigger fallback"); assertEquals(1, node.fallbackCount); - assertEquals(1, StreamExecCalc.peekWarnEmitCountForTest()); + assertEquals(1, invokeStaticInt(StreamExecCalc.class, "peekWarnEmitCountForTest")); } // ===================================================================== @@ -331,7 +331,7 @@ void testFallbackEmitsWarnLogOnce() throws Exception { wireFakeUpstream(b, TWO_INT_ROW); invokeTranslate(b); - assertEquals(1, StreamExecCalc.peekWarnEmitCountForTest()); + assertEquals(1, invokeStaticInt(StreamExecCalc.class, "peekWarnEmitCountForTest")); } /** Contract: two Calcs falling back on different unsupported RexNode classes emit two @@ -362,7 +362,7 @@ void testFallbackEmitsDistinctWarnLogsForDistinctRexClasses() throws Exception { wireFakeUpstream(b, TWO_INT_ROW); invokeTranslate(b); - assertEquals(2, StreamExecCalc.peekWarnEmitCountForTest()); + assertEquals(2, invokeStaticInt(StreamExecCalc.class, "peekWarnEmitCountForTest")); } // ===================================================================== @@ -507,6 +507,12 @@ private static void invokeStatic(Class cls, String methodName) throws Excepti m.invoke(null); 
} + private static int invokeStaticInt(Class cls, String methodName) throws Exception { + Method m = cls.getDeclaredMethod(methodName); + m.setAccessible(true); + return (int) m.invoke(null); + } + // ===================================================================== // Test subclass: capture the fallback delegation // ===================================================================== @@ -529,7 +535,9 @@ static class CapturingTranslator extends StreamExecCalc { this.fallbackStub = fallbackStub; } - @Override + // No @Override: javac on some classpaths resolves StreamExecCalc to Flink's stock + // class (which lacks translateToFlinkCalc). Runtime virtual dispatch still routes + // the shadow's call here because the loaded StreamExecCalc is our shadow. protected Transformation translateToFlinkCalc(PlannerBase planner, ExecNodeConfig config) { fallbackCount++; return fallbackStub; From 2f205bb02fcce530fff65588205b9fe7d0051b1b Mon Sep 17 00:00:00 2001 From: Weiqing Yang Date: Sun, 24 May 2026 14:15:43 -0700 Subject: [PATCH 5/7] [AURON #1853] Complete native Calc runtime path for Flink Three end-to-end runtime defects surfaced once the shadowed StreamExecCalc began converting Calcs that other tests were already exercising: - FlinkAuronConfiguration was missing INPUT_BATCH_STATISTICS_ENABLE. The native execution context queries it on every Auron-native plan via JniBridge.getConfValue, which resolves config keys by reflection on the active AuronConfiguration class; absence of the field threw NPE before the first batch executed. - FlinkAuronCalcOperator.open built the FlinkMetricNode with an empty children list, but native finalization walks the metric tree in lockstep with the plan tree (Project[Filter?[FFIReader]]). The first MetricNode.getChild(0) call therefore threw IndexOutOfBoundsException. buildMetricTree now mirrors the plan shape so each plan node has a matching metric child. 
- The native runtime was constructed eagerly in open(), but JniBridge.callNative spawns a tokio task that immediately starts streaming output through the FFI Reader. On the first pull from an empty exporter the reader sees end-of-stream and calls exporter.close(), nulling the writer; the next processElement then NPE'd on writer.write(row). Construction is now deferred to the first non-empty drainNative call, and reinitExporterAndRuntime leaves nativeRuntime null between cycles so subsequent drains also start against a populated buffer. Tests updated: FlinkAuronCalcOperatorTest's multi-cycle drain contract asserts factoryCalls == drain cycles (no eager open()/reinit construction inflating the count). Issue: https://github.com/apache/auron/issues/1853 --- .../FlinkAuronConfiguration.java | 10 ++++ .../operator/FlinkAuronCalcOperator.java | 59 ++++++++++++++----- .../operator/FlinkAuronCalcOperatorTest.java | 20 +++---- 3 files changed, 65 insertions(+), 24 deletions(-) diff --git a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/configuration/FlinkAuronConfiguration.java b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/configuration/FlinkAuronConfiguration.java index ec3e77983..bfbe151b1 100644 --- a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/configuration/FlinkAuronConfiguration.java +++ b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/configuration/FlinkAuronConfiguration.java @@ -53,6 +53,16 @@ public class FlinkAuronConfiguration extends AuronConfiguration { + "the Flink engine for execution?") .withDefaultValue(true); + /** + * Whether the native execution context records per-batch input statistics for monitoring. + * Queried by the native engine on every Auron-native operator path; the field must exist + * on this class so the JniBridge reflection lookup does not NPE. 
+ */ + public static final ConfigOption INPUT_BATCH_STATISTICS_ENABLE = new ConfigOption<>(Boolean.class) + .withKey("auron.input.batch.statistics.enable") + .withDescription("Enable collection of additional metrics for input batch statistics.") + .withDefaultValue(false); + private final Configuration flinkConfig; public FlinkAuronConfiguration() { diff --git a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/runtime/operator/FlinkAuronCalcOperator.java b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/runtime/operator/FlinkAuronCalcOperator.java index baee1338a..ea3f9d19f 100644 --- a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/runtime/operator/FlinkAuronCalcOperator.java +++ b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/runtime/operator/FlinkAuronCalcOperator.java @@ -153,7 +153,7 @@ public void open() throws Exception { String opIdWithSubtask = rc.getOperatorUniqueID() + "-" + rc.getIndexOfThisSubtask(); this.childAllocator = FlinkArrowUtils.createChildAllocator("FlinkAuronCalc-" + opIdWithSubtask); - this.metricNode = new FlinkMetricNode(getMetricGroup(), Collections.emptyList()); + this.metricNode = buildMetricTree(plan, getMetricGroup()); this.exporter = new FlinkArrowFFIExporter(childAllocator, inputRowType, BATCH_ROW_LIMIT); // UUID disambiguates re-runs after operator restart so a stale registration cannot // collide with the new one. 
@@ -164,8 +164,10 @@ public void open() throws Exception { this.nativeMemory = AuronAdaptor.getInstance().getAuronConfiguration().getLong(FlinkAuronConfiguration.NATIVE_MEMORY_SIZE); - this.nativeRuntime = - nativeRuntimeFactory.create(FlinkArrowUtils.getRootAllocator(), runtimePlan, metricNode, nativeMemory); + // Native runtime construction starts a tokio task that immediately begins streaming + // output through the FFI Reader; on the first pull from an empty exporter the reader + // sees end-of-stream and calls exporter.close(), nulling the writer. Defer construction + // to the first non-empty drain so the runtime always starts against a populated buffer. this.outputRecord = new StreamRecord<>(null); } @@ -196,11 +198,11 @@ public void close() throws Exception { // we'd allocate a new wrapper + exporter root only to tear them down immediately. closing = true; try { - // Both exporter and nativeRuntime must be initialized before draining: if open() - // failed between exporter creation and nativeRuntime creation, drainNative() would - // NPE on nativeRuntime.loadNextBatch(...). Each resource is null-guarded - // individually in the nested finally so partial-init still releases what exists. - if (exporter != null && nativeRuntime != null) { + // drainNative is responsible for constructing the native runtime on demand when + // the exporter has rows, so close() only needs to guard against open() failing + // before the exporter was created. Each resource is null-guarded individually in + // the nested finally below so partial-init still releases what exists. 
+ if (exporter != null) { drainNative(); } } finally { @@ -253,6 +255,10 @@ private void drainNative() { if (exporter.isEmpty()) { return; } + if (nativeRuntime == null) { + this.nativeRuntime = nativeRuntimeFactory.create( + FlinkArrowUtils.getRootAllocator(), runtimePlan, metricNode, nativeMemory); + } while (nativeRuntime.loadNextBatch(this::emitArrowBatch)) { // loop } @@ -262,16 +268,16 @@ private void drainNative() { } /** - * Re-prepares the exporter, re-registers it under {@link #resourceId}, and builds a fresh - * {@link NativeRuntime} via the factory. {@link #resourceId}, {@link #runtimePlan}, - * {@link #metricNode}, and {@link #nativeMemory} are all set once in {@link #open()} and - * reused across every reinit, so per-subtask operator identity is preserved. + * Re-prepares the exporter and re-registers it under {@link #resourceId} so the next drain + * cycle can buffer rows. The native runtime is intentionally left {@code null}: the next + * {@link #drainNative()} call constructs a fresh one once the exporter has rows, so the + * tokio batch producer never starts against an empty buffer (which would otherwise short the + * FFI Reader's read loop on its very first pull and close the exporter mid-cycle). */ private void reinitExporterAndRuntime() { exporter.reset(); JniBridge.putResource(resourceId, exporter); - this.nativeRuntime = - nativeRuntimeFactory.create(FlinkArrowUtils.getRootAllocator(), runtimePlan, metricNode, nativeMemory); + this.nativeRuntime = null; } /** @@ -330,6 +336,31 @@ static PhysicalPlanNode injectFfiReaderLeaf(PhysicalPlanNode node, String resour + node.getPhysicalPlanTypeCase()); } + /** + * Recursively constructs a {@link FlinkMetricNode} whose shape mirrors {@code node}'s plan + * tree. 
Native code walks the plan tree at finalization time and indexes into the parallel + * metric tree via {@link org.apache.auron.metric.MetricNode#getChild(int)}; an empty children + * list would throw {@link IndexOutOfBoundsException} on the first {@code getChild(0)} call. + * All levels share the same Flink {@link org.apache.flink.metrics.MetricGroup} so named + * counters aggregate at the operator scope. + */ + private static FlinkMetricNode buildMetricTree(PhysicalPlanNode node, org.apache.flink.metrics.MetricGroup mg) { + final List children; + if (node.hasFfiReader()) { + children = Collections.emptyList(); + } else if (node.hasProjection()) { + children = Collections.singletonList( + buildMetricTree(node.getProjection().getInput(), mg)); + } else if (node.hasFilter()) { + children = + Collections.singletonList(buildMetricTree(node.getFilter().getInput(), mg)); + } else { + throw new IllegalArgumentException( + "Unexpected plan node type for metric tree: " + node.getPhysicalPlanTypeCase()); + } + return new FlinkMetricNode(mg, children); + } + // ==================================================================== // SupportsAuronNative // ==================================================================== diff --git a/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/runtime/operator/FlinkAuronCalcOperatorTest.java b/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/runtime/operator/FlinkAuronCalcOperatorTest.java index 72791afe2..b9f5aab66 100644 --- a/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/runtime/operator/FlinkAuronCalcOperatorTest.java +++ b/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/runtime/operator/FlinkAuronCalcOperatorTest.java @@ -257,13 +257,13 @@ public void testProcessElementDrainsWhenBatchFull() throws Exception { * Contract: the real {@link org.apache.auron.jni.AuronCallNativeWrapper#loadNextBatch} * auto-closes 
its native runtime when {@code JniBridge.nextBatch} returns false, and the * native FFI Reader's drop calls Java {@code close()} on the exporter. The operator - * therefore must reinitialize both {@link FlinkArrowFFIExporter} and the - * {@link FlinkAuronCalcOperator.NativeRuntime} after every drain cycle so subsequent - * {@code processElement} calls work correctly. Three consecutive batch-full triggers must - * produce no exception; the factory must be invoked once for {@code open()} plus once per - * drain cycle (= 4 total); the exporter must be re-registered under the same - * {@code resourceId} after each cycle; and the operator must still accept offers after the - * reinit cycles. + * therefore must re-prepare the exporter after every drain cycle so subsequent + * {@code processElement} calls work correctly. The native runtime is constructed lazily + * on the next drain (not eagerly between cycles) so the tokio batch producer never starts + * against an empty buffer. Three consecutive batch-full triggers must produce no + * exception; the factory must be invoked once per drain cycle (= 3 total); the exporter + * must be re-registered under the same {@code resourceId} after each cycle; and the + * operator must still accept offers after the reinit cycles. */ @Test public void testProcessElementMultiCycleDrainReinitializesWrapper() throws Exception { @@ -297,10 +297,10 @@ public void testProcessElementMultiCycleDrainReinitializesWrapper() throws Excep } assertEquals( - 4, + 3, factoryCalls.get(), - "Native runtime must be reinitialized after each batch-full drain cycle " - + "(1 for open() + 3 reinit)"); + "Native runtime must be constructed once per batch-full drain cycle " + + "(deferred from open() to first non-empty drain)"); // After three reinit cycles, the operator must still accept input. 
Offer one more // row and verify the tracker (re-registered each cycle, never closed by our fake) From d163005651098f6bbca0d34e85e082f9d547cf69 Mon Sep 17 00:00:00 2001 From: Weiqing Yang Date: Sun, 24 May 2026 14:59:33 -0700 Subject: [PATCH 6/7] [AURON #1853] Read Arrow timestamp unit from vector instead of assuming microseconds MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ArrowTimestampColumnVector hard-coded a divide-by-1000 on every read, treating the underlying value as microseconds. When native produces a plan with Timestamp(Millisecond) output — e.g. a TUMBLE window's window_start column — the raw millisecond value was divided by 1000 again, shrinking 2026 timestamps down to 1970 (off-by-1000x). Now read the unit from the vector's ArrowType.Timestamp metadata at construction and convert per unit (SEC / MS / US / NS) in getTimestamp. Added FlinkArrowReaderTest coverage for the three units that were previously untested. Issue: https://github.com/apache/auron/issues/1853 --- .../vectors/ArrowTimestampColumnVector.java | 43 +++++++----- .../flink/arrow/FlinkArrowReaderTest.java | 69 +++++++++++++++++++ 2 files changed, 96 insertions(+), 16 deletions(-) diff --git a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowTimestampColumnVector.java b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowTimestampColumnVector.java index d10b2d307..a2961665c 100644 --- a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowTimestampColumnVector.java +++ b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/vectors/ArrowTimestampColumnVector.java @@ -17,6 +17,8 @@ package org.apache.auron.flink.arrow.vectors; import org.apache.arrow.vector.TimeStampVector; +import org.apache.arrow.vector.types.TimeUnit; +import org.apache.arrow.vector.types.pojo.ArrowType; 
import org.apache.flink.table.data.TimestampData; import org.apache.flink.table.data.columnar.vector.TimestampColumnVector; import org.apache.flink.util.Preconditions; @@ -25,26 +27,27 @@ * A Flink {@link TimestampColumnVector} backed by an Arrow {@link TimeStampVector}. * *

This wrapper delegates all reads to the underlying Arrow vector, providing zero-copy access - * to Arrow data from Flink's columnar batch execution engine. It handles both {@code - * TimeStampMicroVector} (TIMESTAMP) and {@code TimeStampMicroTZVector} (TIMESTAMP_LTZ) by - * accepting their common parent type {@link TimeStampVector}. The native engine (DataFusion) - * uses microsecond precision for all temporal types. Microsecond values are converted to Flink's + * to Arrow data from Flink's columnar batch execution engine. It handles all four Arrow + * timestamp time units (second, millisecond, microsecond, nanosecond) by reading the unit from + * the vector's {@link ArrowType.Timestamp} metadata and converting to Flink's * {@link TimestampData} representation (epoch millis + sub-millisecond nanos). */ public final class ArrowTimestampColumnVector implements TimestampColumnVector { private final TimeStampVector vector; + private final TimeUnit timeUnit; /** * Creates a new wrapper around the given Arrow {@link TimeStampVector}. * - *

<p>Accepts both {@code TimeStampMicroVector} and {@code TimeStampMicroTZVector} since they - * share the same storage format and parent type. + *

<p>Accepts any of {@code TimeStampSecVector}, {@code TimeStampMilliVector}, + * {@code TimeStampMicroVector}, {@code TimeStampNanoVector} (and their TZ variants). * * @param vector the Arrow vector to wrap, must not be null */ public ArrowTimestampColumnVector(TimeStampVector vector) { this.vector = Preconditions.checkNotNull(vector); + this.timeUnit = ((ArrowType.Timestamp) vector.getField().getType()).getUnit(); } /** {@inheritDoc} */ @@ -56,21 +59,29 @@ public boolean isNullAt(int i) { /** * Returns the timestamp at the given index as a {@link TimestampData}. * - *

<p>The underlying Arrow vector stores microseconds since epoch. This method splits the value - * into epoch milliseconds and sub-millisecond nanoseconds to construct a {@link TimestampData}. + *

Reads the underlying Arrow value (whose unit is determined by the vector's + * {@link ArrowType.Timestamp} metadata, captured at construction) and converts to Flink's + * {@link TimestampData} representation (epoch millis + sub-millisecond nanos). * * @param i the row index - * @param precision the timestamp precision (unused; conversion is always from microseconds) + * @param precision the timestamp precision (unused; conversion is driven by the Arrow vector's unit) * @return the timestamp value */ @Override public TimestampData getTimestamp(int i, int precision) { - long micros = vector.get(i); - // Use floor-based division so that for negative micros (pre-epoch), the remainder is - // non-negative and nanoOfMillisecond stays within [0, 999_999], as required by - // TimestampData.fromEpochMillis. - long millis = Math.floorDiv(micros, 1000); - int nanoOfMillisecond = ((int) Math.floorMod(micros, 1000)) * 1000; - return TimestampData.fromEpochMillis(millis, nanoOfMillisecond); + long raw = vector.get(i); + switch (timeUnit) { + case SECOND: + return TimestampData.fromEpochMillis(raw * 1000L, 0); + case MILLISECOND: + return TimestampData.fromEpochMillis(raw, 0); + case MICROSECOND: + return TimestampData.fromEpochMillis(Math.floorDiv(raw, 1000), ((int) Math.floorMod(raw, 1000)) * 1000); + case NANOSECOND: + return TimestampData.fromEpochMillis( + Math.floorDiv(raw, 1_000_000), (int) Math.floorMod(raw, 1_000_000)); + default: + throw new IllegalStateException("Unsupported Arrow timestamp unit: " + timeUnit); + } } } diff --git a/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/FlinkArrowReaderTest.java b/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/FlinkArrowReaderTest.java index 1816c6c87..6ca36944e 100644 --- a/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/FlinkArrowReaderTest.java +++ 
b/auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/FlinkArrowReaderTest.java @@ -37,6 +37,9 @@ import org.apache.arrow.vector.TimeMicroVector; import org.apache.arrow.vector.TimeStampMicroTZVector; import org.apache.arrow.vector.TimeStampMicroVector; +import org.apache.arrow.vector.TimeStampMilliVector; +import org.apache.arrow.vector.TimeStampNanoVector; +import org.apache.arrow.vector.TimeStampSecVector; import org.apache.arrow.vector.TinyIntVector; import org.apache.arrow.vector.VarBinaryVector; import org.apache.arrow.vector.VarCharVector; @@ -436,6 +439,72 @@ public void testTimestampLtzVector() { } } + @Test + public void testTimestampMilliVector() { + try (BufferAllocator allocator = + FlinkArrowUtils.ROOT_ALLOCATOR.newChildAllocator("testTsMilli", 0, Long.MAX_VALUE)) { + TimeStampMilliVector vec = new TimeStampMilliVector("col", allocator); + vec.allocateNew(1); + vec.setSafe(0, 1_773_662_580_000L); // 2026-03-16T12:03:00.000 + vec.setValueCount(1); + + VectorSchemaRoot root = new VectorSchemaRoot(Collections.singletonList(vec)); + RowType rowType = RowType.of(new TimestampType(3)); + FlinkArrowReader reader = FlinkArrowReader.create(root, rowType); + + TimestampData ts = reader.read(0).getTimestamp(0, 3); + assertEquals(1_773_662_580_000L, ts.getMillisecond()); + assertEquals(0, ts.getNanoOfMillisecond()); + + reader.close(); + root.close(); + } + } + + @Test + public void testTimestampSecVector() { + try (BufferAllocator allocator = + FlinkArrowUtils.ROOT_ALLOCATOR.newChildAllocator("testTsSec", 0, Long.MAX_VALUE)) { + TimeStampSecVector vec = new TimeStampSecVector("col", allocator); + vec.allocateNew(1); + vec.setSafe(0, 1_773_662_580L); // 2026-03-16T12:03:00 (seconds) + vec.setValueCount(1); + + VectorSchemaRoot root = new VectorSchemaRoot(Collections.singletonList(vec)); + RowType rowType = RowType.of(new TimestampType(0)); + FlinkArrowReader reader = FlinkArrowReader.create(root, rowType); + + TimestampData ts = 
reader.read(0).getTimestamp(0, 0); + assertEquals(1_773_662_580_000L, ts.getMillisecond()); + assertEquals(0, ts.getNanoOfMillisecond()); + + reader.close(); + root.close(); + } + } + + @Test + public void testTimestampNanoVector() { + try (BufferAllocator allocator = + FlinkArrowUtils.ROOT_ALLOCATOR.newChildAllocator("testTsNano", 0, Long.MAX_VALUE)) { + TimeStampNanoVector vec = new TimeStampNanoVector("col", allocator); + vec.allocateNew(1); + vec.setSafe(0, 1_672_531_200_000_123_456L); // 2023-01-01T00:00:00.000123456 + vec.setValueCount(1); + + VectorSchemaRoot root = new VectorSchemaRoot(Collections.singletonList(vec)); + RowType rowType = RowType.of(new TimestampType(9)); + FlinkArrowReader reader = FlinkArrowReader.create(root, rowType); + + TimestampData ts = reader.read(0).getTimestamp(0, 9); + assertEquals(1_672_531_200_000L, ts.getMillisecond()); + assertEquals(123_456, ts.getNanoOfMillisecond()); + + reader.close(); + root.close(); + } + } + @Test public void testNegativeTimestamp() { try (BufferAllocator allocator = From b52fe3868b208a2dd4bbd676c59b57df35891387 Mon Sep 17 00:00:00 2001 From: Weiqing Yang Date: Sun, 24 May 2026 23:37:37 -0700 Subject: [PATCH 7/7] [AURON #1853] Emit one-shot activation log when StreamExecCalc shadow is loaded The JAR-shadowing substitution is silent by design: if the classpath ordering puts flink-table-planner ahead of auron-flink-planner, Flink resolves its stock StreamExecCalc instead of this one and the rewriter quietly does nothing. Emit an INFO log on the first translateToPlanInternal call per JVM with the resolved class's code source so operators have a positive signal to grep for. Absence of the line under SQL load means the shadow is not active. Class Javadoc documents the failure mode and points at FAIL_BACK_FLINK_ENGINE_ENABLED=false as a complementary stricter signal. 
--- .../nodes/exec/stream/StreamExecCalc.java | 33 +++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/auron-flink-extension/auron-flink-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecCalc.java b/auron-flink-extension/auron-flink-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecCalc.java index d0efc5672..9a65f05f9 100644 --- a/auron-flink-extension/auron-flink-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecCalc.java +++ b/auron-flink-extension/auron-flink-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecCalc.java @@ -16,12 +16,14 @@ */ package org.apache.flink.table.planner.plan.nodes.exec.stream; +import java.security.CodeSource; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Optional; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import javax.annotation.Nullable; import org.apache.auron.flink.configuration.FlinkAuronConfiguration; import org.apache.auron.flink.runtime.operator.FlinkAuronCalcOperator; @@ -68,6 +70,14 @@ * {@link FlinkAuronConfiguration#FAIL_BACK_FLINK_ENGINE_ENABLED} is {@code true}, the default) or * throws {@link IllegalStateException}. * + *

Activation observability: the first time {@link #translateToPlanInternal} is invoked per JVM, + * an INFO log line {@code "Auron StreamExecCalc shadow active"} is emitted with the class's code + * source. Absence of this log under SQL load indicates that {@code auron-flink-planner} is not + * classpath-ordered ahead of {@code flink-table-planner} and Flink's stock {@code StreamExecCalc} + * is being resolved instead — Auron's Calc rewriter is then silently inactive. Setting + * {@link FlinkAuronConfiguration#FAIL_BACK_FLINK_ENGINE_ENABLED} to {@code false} provides a + * stricter complementary signal: per-Calc conversion failures throw rather than fall back. + * *

Per-fallback observability: the first time a given unsupported {@link RexNode} class is seen * (per planner thread), a WARN log line is emitted naming the failing node ID and the unsupported * class so users can grep for missing converter coverage. Subsequent fallbacks on the same @@ -98,6 +108,13 @@ public class StreamExecCalc extends CommonExecCalc implements StreamExecNode WARN_EMIT_COUNT = ThreadLocal.withInitial(() -> new int[1]); + /** + * One-shot guard for the activation INFO log so a busy planner doesn't repeat the line per + * Calc submission. Static-scoped (per JVM) because the activation signal is JVM-wide, unlike + * the per-planner-thread dedup used for fallback WARNs. + */ + private static final AtomicBoolean ACTIVATION_LOGGED = new AtomicBoolean(false); + /** * Constructs a stream Calc node. Matches Flink's stock {@code StreamExecCalc} primary * constructor signature so the planner's reflective instantiation finds this shadowed class @@ -169,6 +186,7 @@ public StreamExecCalc( @Override @SuppressWarnings("unchecked") protected Transformation translateToPlanInternal(PlannerBase planner, ExecNodeConfig config) { + logActivationOnce(); final Transformation upstream = (Transformation) getInputEdges().get(0).translateToPlan(planner); final RowType inputRowType = (RowType) getInputEdges().get(0).getOutputType(); @@ -300,6 +318,21 @@ private Optional tryBuildAuronPlan(RowType inputRowType, RowTy } } + /** + * Emits the activation INFO log on the first call per JVM. Subsequent calls are no-ops. The + * code-source location is included to help diagnose classpath-ordering mistakes — if this log + * appears but points at an unexpected JAR, that JAR is what Flink resolved as + * {@code StreamExecCalc}. 
+ */ + private static void logActivationOnce() { + if (ACTIVATION_LOGGED.compareAndSet(false, true)) { + final CodeSource cs = StreamExecCalc.class.getProtectionDomain().getCodeSource(); + LOG.info( + "Auron StreamExecCalc shadow active (loaded from {}).", + cs != null ? cs.getLocation() : ""); + } + } + /** * Logs a WARN line on the first occurrence of each unsupported {@link RexNode} class per * planner thread. Subsequent occurrences are silent.