diff --git a/auron-flink-extension/auron-flink-planner/src/main/java/org/apache/auron/flink/table/planner/converter/FlinkNodeConverterFactory.java b/auron-flink-extension/auron-flink-planner/src/main/java/org/apache/auron/flink/table/planner/converter/FlinkNodeConverterFactory.java index a86101354..089b5ad0a 100644 --- a/auron-flink-extension/auron-flink-planner/src/main/java/org/apache/auron/flink/table/planner/converter/FlinkNodeConverterFactory.java +++ b/auron-flink-extension/auron-flink-planner/src/main/java/org/apache/auron/flink/table/planner/converter/FlinkNodeConverterFactory.java @@ -37,6 +37,10 @@ * (keyed by {@code AggregateCall} class) * * + *
The singleton returned by {@link #getInstance()} is initialized with the built-in + * {@link FlinkRexNodeConverter} implementations ({@link RexInputRefConverter}, + * {@link RexLiteralConverter}, {@link RexCallConverter}) registered. + * *
Usage: *
* FlinkNodeConverterFactory factory = FlinkNodeConverterFactory.getInstance();
@@ -52,6 +56,14 @@ public class FlinkNodeConverterFactory {
private static final FlinkNodeConverterFactory INSTANCE = new FlinkNodeConverterFactory();
+ static {
+ // Production callers reach RexNode converters through the singleton; register the
+ // built-ins eagerly so getInstance() is usable without setup.
+ INSTANCE.registerRexConverter(new RexInputRefConverter());
+ INSTANCE.registerRexConverter(new RexLiteralConverter());
+ INSTANCE.registerRexConverter(new RexCallConverter(INSTANCE));
+ }
+
private final Map, FlinkRexNodeConverter> rexConverterMap;
private final Map, FlinkAggCallConverter> aggConverterMap;
diff --git a/auron-flink-extension/auron-flink-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecCalc.java b/auron-flink-extension/auron-flink-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecCalc.java
new file mode 100644
index 000000000..9a65f05f9
--- /dev/null
+++ b/auron-flink-extension/auron-flink-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecCalc.java
@@ -0,0 +1,362 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import java.security.CodeSource;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.annotation.Nullable;
+import org.apache.auron.flink.configuration.FlinkAuronConfiguration;
+import org.apache.auron.flink.runtime.operator.FlinkAuronCalcOperator;
+import org.apache.auron.flink.table.planner.converter.ConverterContext;
+import org.apache.auron.flink.table.planner.converter.FlinkNodeConverterFactory;
+import org.apache.auron.flink.utils.SchemaConverters;
+import org.apache.auron.jni.AuronAdaptor;
+import org.apache.auron.protobuf.FFIReaderExecNode;
+import org.apache.auron.protobuf.FilterExecNode;
+import org.apache.auron.protobuf.PhysicalExprNode;
+import org.apache.auron.protobuf.PhysicalPlanNode;
+import org.apache.auron.protobuf.ProjectionExecNode;
+import org.apache.calcite.rex.RexNode;
+import org.apache.flink.FlinkVersion;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
+import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
+import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecCalc;
+import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
+import org.apache.flink.table.runtime.operators.TableStreamOperator;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.RowType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Shadows Flink's stock {@code StreamExecCalc} via fully-qualified-class-name resolution. Java's
+ * classloader resolves one class per FQCN; with {@code auron-flink-planner} ahead of {@code
+ * flink-table-planner} on the classpath, Flink's planner constructs this class whenever it builds
+ * a Calc {@code ExecNode}.
+ *
+ * {@link #translateToPlanInternal(PlannerBase, ExecNodeConfig)} attempts to translate the
+ * projection + condition into a native {@link PhysicalPlanNode} using the converter framework. On
+ * success it returns a {@link Transformation} wrapping a {@link FlinkAuronCalcOperator}; on
+ * failure it either delegates to {@code super.translateToPlanInternal} (when
+ * {@link FlinkAuronConfiguration#FAIL_BACK_FLINK_ENGINE_ENABLED} is {@code true}, the default) or
+ * throws {@link IllegalStateException}.
+ *
+ *
Activation observability: the first time {@link #translateToPlanInternal} is invoked per JVM,
+ * an INFO log line {@code "Auron StreamExecCalc shadow active"} is emitted with the class's code
+ * source. Absence of this log under SQL load indicates that {@code auron-flink-planner} is not
+ * classpath-ordered ahead of {@code flink-table-planner} and Flink's stock {@code StreamExecCalc}
+ * is being resolved instead — Auron's Calc rewriter is then silently inactive. Setting
+ * {@link FlinkAuronConfiguration#FAIL_BACK_FLINK_ENGINE_ENABLED} to {@code false} provides a
+ * stricter complementary signal: per-Calc conversion failures throw rather than fall back.
+ *
+ *
Per-fallback observability: the first time a given unsupported {@link RexNode} class is seen
+ * (per planner thread), a WARN log line is emitted naming the failing node ID and the unsupported
+ * class so users can grep for missing converter coverage. Subsequent fallbacks on the same
+ * {@link RexNode} class are silent to avoid log spam.
+ */
+@ExecNodeMetadata(
+ name = "stream-exec-calc",
+ version = 1,
+ minPlanVersion = FlinkVersion.v1_15,
+ minStateVersion = FlinkVersion.v1_15)
+public class StreamExecCalc extends CommonExecCalc implements StreamExecNode {
+
+ private static final Logger LOG = LoggerFactory.getLogger(StreamExecCalc.class);
+
+ /**
+ * Dedup set for the per-fallback WARN. A given unsupported {@link RexNode} class is logged at
+ * most once per planner thread; reused across submissions because the next submission only
+ * adds noise if a brand-new class shows up. ThreadLocal keeps the bookkeeping off the static
+ * mutable state path (planner threads in session clusters are reused).
+ */
+ private static final ThreadLocal>> WARN_DEDUP = ThreadLocal.withInitial(HashSet::new);
+
+ /**
+ * Per-thread counter incremented every time a WARN log line is actually emitted (i.e. the
+ * fallback class was new in the dedup set). Tests inspect this through {@link
+ * #peekWarnEmitCountForTest()} to assert dedup behavior without depending on a particular
+ * SLF4J binding configuration.
+ */
+ private static final ThreadLocal WARN_EMIT_COUNT = ThreadLocal.withInitial(() -> new int[1]);
+
+ /**
+ * One-shot guard for the activation INFO log so a busy planner doesn't repeat the line per
+ * Calc submission. Static-scoped (per JVM) because the activation signal is JVM-wide, unlike
+ * the per-planner-thread dedup used for fallback WARNs.
+ */
+ private static final AtomicBoolean ACTIVATION_LOGGED = new AtomicBoolean(false);
+
+ /**
+ * Constructs a stream Calc node. Matches Flink's stock {@code StreamExecCalc} primary
+ * constructor signature so the planner's reflective instantiation finds this shadowed class
+ * without changes.
+ *
+ * @param tableConfig table-level configuration to persist into the node context
+ * @param projection projection expressions evaluated per row
+ * @param condition filter expression evaluated per row, or {@code null} for no filter
+ * @param inputProperty input edge property for the single upstream operator
+ * @param outputType output {@link RowType} of this node
+ * @param description human-readable description used for logging and metrics
+ */
+ public StreamExecCalc(
+ ReadableConfig tableConfig,
+ List projection,
+ @Nullable RexNode condition,
+ InputProperty inputProperty,
+ RowType outputType,
+ String description) {
+ super(
+ ExecNodeContext.newNodeId(),
+ ExecNodeContext.newContext(StreamExecCalc.class),
+ ExecNodeContext.newPersistedConfig(StreamExecCalc.class, tableConfig),
+ projection,
+ condition,
+ TableStreamOperator.class,
+ true,
+ Collections.singletonList(inputProperty),
+ outputType,
+ description);
+ }
+
+ /**
+ * JSON deserialization constructor used for compiled plan restoration. Matches Flink's stock
+ * {@code StreamExecCalc} {@code @JsonCreator} constructor signature.
+ *
+ * @param id pre-assigned node ID
+ * @param context restored node context
+ * @param persistedConfig configuration persisted with the compiled plan
+ * @param projection projection expressions evaluated per row
+ * @param condition filter expression evaluated per row, or {@code null} for no filter
+ * @param inputProperties input edge properties for upstream operators
+ * @param outputType output {@link RowType} of this node
+ * @param description human-readable description used for logging and metrics
+ */
+ @JsonCreator
+ public StreamExecCalc(
+ @JsonProperty(FIELD_NAME_ID) int id,
+ @JsonProperty(FIELD_NAME_TYPE) ExecNodeContext context,
+ @JsonProperty(FIELD_NAME_CONFIGURATION) ReadableConfig persistedConfig,
+ @JsonProperty(FIELD_NAME_PROJECTION) List projection,
+ @Nullable @JsonProperty(FIELD_NAME_CONDITION) RexNode condition,
+ @JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List inputProperties,
+ @JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType,
+ @JsonProperty(FIELD_NAME_DESCRIPTION) String description) {
+ super(
+ id,
+ context,
+ persistedConfig,
+ projection,
+ condition,
+ TableStreamOperator.class,
+ true,
+ inputProperties,
+ outputType,
+ description);
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ protected Transformation translateToPlanInternal(PlannerBase planner, ExecNodeConfig config) {
+ logActivationOnce();
+ final Transformation upstream =
+ (Transformation) getInputEdges().get(0).translateToPlan(planner);
+ final RowType inputRowType = (RowType) getInputEdges().get(0).getOutputType();
+ final RowType outputRowType = (RowType) getOutputType();
+
+ final Optional plan = tryBuildAuronPlan(inputRowType, outputRowType);
+
+ if (!plan.isPresent()) {
+ final boolean fallbackEnabled = AuronAdaptor.getInstance()
+ .getAuronConfiguration()
+ .get(FlinkAuronConfiguration.FAIL_BACK_FLINK_ENGINE_ENABLED);
+ if (fallbackEnabled) {
+ LOG.debug("Falling back to Flink's CodeGen Calc for node {}", getId());
+ return translateToFlinkCalc(planner, config);
+ }
+ throw new IllegalStateException(
+ "Auron Calc conversion failed for node " + getId() + " and fallback is disabled");
+ }
+
+ final FlinkAuronCalcOperator operator =
+ new FlinkAuronCalcOperator(plan.get(), inputRowType, outputRowType, "FlinkAuronCalc-" + getId());
+
+ return ExecNodeUtil.createOneInputTransformation(
+ upstream,
+ createTransformationMeta(CALC_TRANSFORMATION, config),
+ SimpleOperatorFactory.of(operator),
+ InternalTypeInfo.of(outputRowType),
+ upstream.getParallelism(),
+ 0L,
+ false);
+ }
+
+ /**
+ * Indirection over {@code super.translateToPlanInternal} so tests can stub the Flink
+ * codegen-Calc fallback path without running Flink's full code-generation machinery. Production
+ * delegates straight through.
+ *
+ * @param planner the planner forwarded from {@link #translateToPlanInternal}
+ * @param config the exec-node config forwarded from {@link #translateToPlanInternal}
+ * @return Flink's stock {@link Transformation} produced by {@code CommonExecCalc}
+ */
+ protected Transformation translateToFlinkCalc(PlannerBase planner, ExecNodeConfig config) {
+ return super.translateToPlanInternal(planner, config);
+ }
+
+ /**
+ * Attempts to compose a native {@code Project[Filter?[FFIReader-placeholder]]} plan from this
+ * node's projection and condition. Returns {@link Optional#empty()} if any {@link RexNode} is
+ * unsupported by the converter framework, or if plan composition throws — both signals are
+ * the same for the caller: fall back to Flink's codegen Calc.
+ *
+ * The outer {@link Throwable} catch is defence-in-depth: the converter framework already
+ * catches per-RexNode {@link Exception}, but {@link SchemaConverters} can throw {@link
+ * UnsupportedOperationException} on a {@link RowType} containing an unsupported logical type,
+ * and protobuf composition can theoretically throw on invalid inputs. Treating any failure as
+ * fallback keeps the rewriter safe.
+ *
+ * @param inputRowType the upstream row type used by the converter context
+ * @param outputRowType the row type of this Calc's output
+ * @return a composed plan, or empty if conversion failed
+ */
+ private Optional tryBuildAuronPlan(RowType inputRowType, RowType outputRowType) {
+ try {
+ final ConverterContext ctx = new ConverterContext(
+ getPersistedConfig(),
+ AuronAdaptor.getInstance().getAuronConfiguration(),
+ Thread.currentThread().getContextClassLoader(),
+ inputRowType);
+ final FlinkNodeConverterFactory converters = FlinkNodeConverterFactory.getInstance();
+
+ PhysicalExprNode filterExpr = null;
+ if (condition != null) {
+ final Optional c = converters.convertRexNode(condition, ctx);
+ if (!c.isPresent()) {
+ recordFallback(condition.getClass());
+ return Optional.empty();
+ }
+ filterExpr = c.get();
+ }
+
+ final List projectExprs = new ArrayList<>(projection.size());
+ for (RexNode rex : projection) {
+ final Optional c = converters.convertRexNode(rex, ctx);
+ if (!c.isPresent()) {
+ recordFallback(rex.getClass());
+ return Optional.empty();
+ }
+ projectExprs.add(c.get());
+ }
+
+ // numPartitions = 1 because each parallel FlinkAuronCalcOperator subtask owns a
+ // single Java-side exporter and one corresponding native partition; per-subtask
+ // parallelism is governed by Flink's outer Transformation parallelism, not by the
+ // FFI Reader's partition count.
+ final FFIReaderExecNode ffiReader = FFIReaderExecNode.newBuilder()
+ .setNumPartitions(1)
+ .setSchema(SchemaConverters.convertToAuronSchema(inputRowType, false))
+ .setExportIterProviderResourceId("placeholder")
+ .build();
+ PhysicalPlanNode current =
+ PhysicalPlanNode.newBuilder().setFfiReader(ffiReader).build();
+
+ if (filterExpr != null) {
+ final FilterExecNode filterNode = FilterExecNode.newBuilder()
+ .setInput(current)
+ .addExpr(filterExpr)
+ .build();
+ current = PhysicalPlanNode.newBuilder().setFilter(filterNode).build();
+ }
+
+ final ProjectionExecNode.Builder proj =
+ ProjectionExecNode.newBuilder().setInput(current);
+ for (int i = 0; i < projectExprs.size(); i++) {
+ proj.addExpr(projectExprs.get(i));
+ proj.addExprName(outputRowType.getFieldNames().get(i));
+ proj.addDataType(SchemaConverters.convertToAuronArrowType(outputRowType.getTypeAt(i)));
+ }
+ return Optional.of(
+ PhysicalPlanNode.newBuilder().setProjection(proj.build()).build());
+
+ } catch (Throwable t) {
+ WARN_EMIT_COUNT.get()[0]++;
+ LOG.warn(
+ "Auron StreamExecCalc fallback (node {}): plan composition threw {}; using Flink CodeGen Calc.",
+ getId(),
+ t.getClass().getName(),
+ t);
+ return Optional.empty();
+ }
+ }
+
+ /**
+ * Emits the activation INFO log on the first call per JVM. Subsequent calls are no-ops. The
+ * code-source location is included to help diagnose classpath-ordering mistakes — if this log
+ * appears but points at an unexpected JAR, that JAR is what Flink resolved as
+ * {@code StreamExecCalc}.
+ */
+ private static void logActivationOnce() {
+ if (ACTIVATION_LOGGED.compareAndSet(false, true)) {
+ final CodeSource cs = StreamExecCalc.class.getProtectionDomain().getCodeSource();
+ LOG.info(
+ "Auron StreamExecCalc shadow active (loaded from {}).",
+ cs != null ? cs.getLocation() : "");
+ }
+ }
+
+ /**
+ * Logs a WARN line on the first occurrence of each unsupported {@link RexNode} class per
+ * planner thread. Subsequent occurrences are silent.
+ */
+ private void recordFallback(Class extends RexNode> unsupportedRexClass) {
+ if (WARN_DEDUP.get().add(unsupportedRexClass)) {
+ WARN_EMIT_COUNT.get()[0]++;
+ LOG.warn(
+ "Auron StreamExecCalc fallback (node {}): unsupported RexNode {}; using Flink CodeGen Calc.",
+ getId(),
+ unsupportedRexClass.getName());
+ }
+ }
+
+ /** Test seam: clears the per-thread dedup set and emit counter so independent tests do not
+ * share state. */
+ static void resetWarnDedupForTest() {
+ WARN_DEDUP.get().clear();
+ WARN_EMIT_COUNT.get()[0] = 0;
+ }
+
+ /** Test seam: returns the number of WARN log lines actually emitted on the current thread
+ * since the last {@link #resetWarnDedupForTest()}. */
+ static int peekWarnEmitCountForTest() {
+ return WARN_EMIT_COUNT.get()[0];
+ }
+}
diff --git a/auron-flink-extension/auron-flink-planner/src/test/java/org/apache/auron/flink/table/planner/converter/FlinkNodeConverterFactoryTest.java b/auron-flink-extension/auron-flink-planner/src/test/java/org/apache/auron/flink/table/planner/converter/FlinkNodeConverterFactoryTest.java
index ac6451747..8fb904f17 100644
--- a/auron-flink-extension/auron-flink-planner/src/test/java/org/apache/auron/flink/table/planner/converter/FlinkNodeConverterFactoryTest.java
+++ b/auron-flink-extension/auron-flink-planner/src/test/java/org/apache/auron/flink/table/planner/converter/FlinkNodeConverterFactoryTest.java
@@ -27,6 +27,8 @@
import org.apache.calcite.rel.core.AggregateCall;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
@@ -143,6 +145,22 @@ void testGetConverterAbsent() {
assertFalse(found.isPresent());
}
+ @Test
+ void testSingletonHasBuiltInRexConvertersRegistered() {
+ FlinkNodeConverterFactory singleton = FlinkNodeConverterFactory.getInstance();
+
+ Optional> inputRefConverter = singleton.getConverter(RexInputRef.class);
+ Optional> literalConverter = singleton.getConverter(RexLiteral.class);
+ Optional> callConverter = singleton.getConverter(RexCall.class);
+
+ assertTrue(inputRefConverter.isPresent());
+ assertTrue(literalConverter.isPresent());
+ assertTrue(callConverter.isPresent());
+ assertTrue(inputRefConverter.get() instanceof RexInputRefConverter);
+ assertTrue(literalConverter.get() instanceof RexLiteralConverter);
+ assertTrue(callConverter.get() instanceof RexCallConverter);
+ }
+
// ---- Test stubs ----
/** Stub FlinkRexNodeConverter for testing. */
diff --git a/auron-flink-extension/auron-flink-planner/src/test/java/org/apache/auron/flink/table/runtime/AuronCalcRewriteITCase.java b/auron-flink-extension/auron-flink-planner/src/test/java/org/apache/auron/flink/table/runtime/AuronCalcRewriteITCase.java
new file mode 100644
index 000000000..05ef8c663
--- /dev/null
+++ b/auron-flink-extension/auron-flink-planner/src/test/java/org/apache/auron/flink/table/runtime/AuronCalcRewriteITCase.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.table.runtime;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import org.apache.auron.flink.table.AuronFlinkTableTestBase;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CollectionUtil;
+import org.junit.jupiter.api.Test;
+
+/**
+ * End-to-end IT cases for the shadowed {@code StreamExecCalc}. Each test submits a real SQL job
+ * through {@link org.apache.flink.table.api.bridge.java.StreamTableEnvironment} over the {@code T1}
+ * table registered in {@link AuronFlinkTableTestBase} and asserts the final row set is correct
+ * regardless of whether the Calc executed natively or fell back to Flink's codegen.
+ */
+public class AuronCalcRewriteITCase extends AuronFlinkTableTestBase {
+
+ /** Multi-column arithmetic projection exercises the projection loop with more than one
+ * convertible expression. */
+ @Test
+ public void testMultiColumnArithmeticProjection() {
+ List rows = CollectionUtil.iteratorToList(tableEnvironment
+ .executeSql("select `int` + 1, `int` * 2 from T1")
+ .collect());
+ rows.sort(Comparator.comparingInt(o -> (int) o.getField(0)));
+ assertThat(rows).isEqualTo(Arrays.asList(Row.of(2, 2), Row.of(3, 4), Row.of(3, 4)));
+ }
+
+ /** A filter-plus-projection Calc whose condition uses a not-yet-supported comparison operator
+ * falls back to Flink's codegen path. Asserts the job still produces correct results; the
+ * Auron-side {@code Filter[FFIReader]} plan-shape coverage will be added when a
+ * predicate-returning converter lands. */
+ @Test
+ public void testFilterAndProjectEndToEnd() {
+ List rows = CollectionUtil.iteratorToList(tableEnvironment
+ .executeSql("select `int` * 2 from T1 where `int` > 1")
+ .collect());
+ rows.sort(Comparator.comparingInt(o -> (int) o.getField(0)));
+ assertThat(rows).isEqualTo(Arrays.asList(Row.of(4), Row.of(4)));
+ }
+
+ /** Unsupported expression (a string function not in the converter set) triggers silent
+ * fallback. The job must still complete and emit the correct rows. */
+ @Test
+ public void testFallbackOnUnsupportedExprStillExecutes() {
+ List rows = CollectionUtil.iteratorToList(
+ tableEnvironment.executeSql("select UPPER(`string`) from T1").collect());
+ rows.sort(Comparator.comparing(o -> (String) o.getField(0)));
+ assertThat(rows).isEqualTo(Arrays.asList(Row.of("COMMENT#1"), Row.of("COMMENT#1"), Row.of("HI")));
+ }
+
+ /** A job containing two Calcs — one whose expressions are all converter-supported and one
+ * that uses an unsupported function — must run end-to-end and emit the correct union of rows.
+ * This asserts the job-level correctness contract; observability of which Calc fell back is
+ * surfaced through the per-fallback WARN log rather than the test's value assertion. */
+ @Test
+ public void testMixedSupportedAndUnsupportedCalcs() {
+ List rows = CollectionUtil.iteratorToList(tableEnvironment
+ .executeSql("select `int` + 1 from T1 union all select CHAR_LENGTH(`string`) from T1")
+ .collect());
+ rows.sort(Comparator.comparingInt(o -> (int) o.getField(0)));
+ assertThat(rows).isEqualTo(Arrays.asList(Row.of(2), Row.of(2), Row.of(3), Row.of(3), Row.of(9), Row.of(9)));
+ }
+}
diff --git a/auron-flink-extension/auron-flink-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecCalcTest.java b/auron-flink-extension/auron-flink-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecCalcTest.java
new file mode 100644
index 000000000..1da400a8b
--- /dev/null
+++ b/auron-flink-extension/auron-flink-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecCalcTest.java
@@ -0,0 +1,713 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.auron.flink.runtime.operator.FlinkAuronCalcOperator;
+import org.apache.auron.protobuf.ArrowType;
+import org.apache.auron.protobuf.FFIReaderExecNode;
+import org.apache.auron.protobuf.FilterExecNode;
+import org.apache.auron.protobuf.PhysicalPlanNode;
+import org.apache.auron.protobuf.ProjectionExecNode;
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
+import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RawType;
+import org.apache.flink.table.types.logical.RowType;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Unit tests for the shadowed {@link StreamExecCalc}.
+ *
+ * Lives in the same package as the production class so {@code @JsonCreator} field-name
+ * constants and the {@code protected translateToFlinkCalc} hook are reachable from tests.
+ *
+ *
Test strategy:
+ *
+ *
+ * - Plan-build success paths exercise the real {@code translateToPlanInternal} end-to-end. A
+ * fake upstream {@link ExecNode} is wired in via {@link ExecNodeBase#setInputEdges} with a
+ * pre-built {@link Transformation} cached in the source's {@code transformation} field so
+ * {@code ExecNodeBase.translateToPlan} returns it without consulting a real planner.
+ *
- Fallback paths use {@link CapturingTranslator} to subclass {@code StreamExecCalc} and
+ * override {@code translateToFlinkCalc}, recording whether the super-codegen path was
+ * invoked and short-circuiting the heavy Flink machinery.
+ *
- WARN log assertions capture {@code System.err} (the default SLF4J SimpleLogger target);
+ * the matching test class also resets the dedup ThreadLocal between tests so logging
+ * behavior is deterministic.
+ *
+ */
+class StreamExecCalcTest {
+
+ private static final RelDataTypeFactory TYPE_FACTORY = new JavaTypeFactoryImpl();
+ private static final RexBuilder REX_BUILDER = new RexBuilder(TYPE_FACTORY);
+ private static final RowType TWO_INT_ROW =
+ RowType.of(new LogicalType[] {new IntType(), new IntType()}, new String[] {"f0", "f1"});
+
+ private TableConfig tableConfig;
+ private InputProperty inputProperty;
+ private ExecNodeConfig nodeConfig;
+
+ @BeforeEach
+ void setUp() throws Exception {
+ // Reset id counter so getId()-derived strings are reproducible across tests.
+ ExecNodeContext.resetIdCounter();
+ // Reset the per-fallback WARN dedup so each test starts from a clean slate.
+ invokeStatic(StreamExecCalc.class, "resetWarnDedupForTest");
+
+ tableConfig = TableConfig.getDefault();
+ inputProperty = InputProperty.DEFAULT;
+ nodeConfig = ExecNodeConfig.ofNodeConfig(new Configuration(), false);
+ }
+
+ // =====================================================================
+ // Plan-build success paths
+ // =====================================================================
+
+ /** Contract: with a non-null condition + projection of supported RexNodes, the operator wired
+ * into the returned Transformation is {@link FlinkAuronCalcOperator}. */
+ @Test
+ void testProjectAndFilterEmitsAuronOperator() throws Exception {
+ StreamExecCalc node = newCalc(
+ Arrays.asList(intRef(0), intRef(1)),
+ makeBinary(intType(), SqlStdOperatorTable.PLUS, intRef(0), intRef(1)),
+ TWO_INT_ROW);
+ wireFakeUpstream(node, TWO_INT_ROW);
+
+ Transformation result = invokeTranslate(node);
+
+ assertTrue(operatorOf(result) instanceof FlinkAuronCalcOperator);
+ }
+
+ /** Contract: with a null condition and supported projection, the operator is still Auron. */
+ @Test
+ void testProjectOnlyEmitsAuronOperator() throws Exception {
+ StreamExecCalc node = newCalc(Arrays.asList(intRef(0), intRef(1)), null, TWO_INT_ROW);
+ wireFakeUpstream(node, TWO_INT_ROW);
+
+ Transformation result = invokeTranslate(node);
+
+ FlinkAuronCalcOperator op = (FlinkAuronCalcOperator) operatorOf(result);
+ // No filter → plan is Project[FFIReader], not Project[Filter[FFIReader]].
+ PhysicalPlanNode plan = op.getPhysicalPlanNodes().get(0);
+ assertTrue(plan.hasProjection(), "Root must be a Projection");
+ assertFalse(plan.getProjection().getInput().hasFilter(), "No filter wrapper expected when condition is null");
+ }
+
+ /** Contract: an identity projection (pure {@code RexInputRef}s) still converts. */
+ @Test
+ void testIdentityProjectionEmitsAuronOperator() throws Exception {
+ StreamExecCalc node = newCalc(Arrays.asList(intRef(0), intRef(1)), null, TWO_INT_ROW);
+ wireFakeUpstream(node, TWO_INT_ROW);
+
+ Transformation result = invokeTranslate(node);
+
+ assertTrue(operatorOf(result) instanceof FlinkAuronCalcOperator);
+ }
+
+ /** Contract: the {@link ProjectionExecNode} carries field names and arrow types matching
+ * {@code outputRowType}, so a downstream reader can rebuild the output schema without
+ * consulting the JVM types again. */
+ @Test
+ void testSchemaPropagatedToProjectionExecNode() throws Exception {
+ RowType outputType = RowType.of(new LogicalType[] {new IntType(), new BigIntType()}, new String[] {"a", "b"});
+ StreamExecCalc node =
+ newCalc(Arrays.asList(intRef(0), REX_BUILDER.makeInputRef(bigintType(), 1)), null, outputType);
+ wireFakeUpstream(
+ node, RowType.of(new LogicalType[] {new IntType(), new BigIntType()}, new String[] {"f0", "f1"}));
+
+ Transformation result = invokeTranslate(node);
+
+ FlinkAuronCalcOperator op = (FlinkAuronCalcOperator) operatorOf(result);
+ PhysicalPlanNode plan = op.getPhysicalPlanNodes().get(0);
+ ProjectionExecNode proj = plan.getProjection();
+ assertEquals(Arrays.asList("a", "b"), proj.getExprNameList());
+ assertEquals(2, proj.getDataTypeCount());
+ assertEquals(ArrowType.ArrowTypeEnumCase.INT32, proj.getDataType(0).getArrowTypeEnumCase());
+ assertEquals(ArrowType.ArrowTypeEnumCase.INT64, proj.getDataType(1).getArrowTypeEnumCase());
+ // FFI Reader leaf carries the input schema for the runtime.
+ FFIReaderExecNode ffi = plan.getProjection().getInput().getFfiReader();
+ assertEquals(1, ffi.getNumPartitions());
+ assertEquals("placeholder", ffi.getExportIterProviderResourceId());
+ assertEquals(2, ffi.getSchema().getColumnsCount());
+ }
+
+ /** Contract: when both projection and condition are supported, the constructed plan is
+ * {@code Project[Filter[FFIReader]]} (Filter wrapper present). */
+ @Test
+ void testPlanShapeIsProjectFilterFFIReaderWhenConditionIsPresent() throws Exception {
+ StreamExecCalc node = newCalc(
+ Arrays.asList(intRef(0)),
+ makeBinary(intType(), SqlStdOperatorTable.PLUS, intRef(0), intRef(1)),
+ RowType.of(new IntType()));
+ wireFakeUpstream(node, TWO_INT_ROW);
+
+ Transformation result = invokeTranslate(node);
+
+ PhysicalPlanNode plan = ((FlinkAuronCalcOperator) operatorOf(result))
+ .getPhysicalPlanNodes()
+ .get(0);
+ assertTrue(plan.hasProjection());
+ PhysicalPlanNode innerProj = plan.getProjection().getInput();
+ assertTrue(innerProj.hasFilter(), "Project's child must be a Filter when condition is non-null");
+ FilterExecNode filter = innerProj.getFilter();
+ assertEquals(1, filter.getExprCount());
+ assertTrue(filter.getInput().hasFfiReader(), "Filter's child must be FFIReader");
+ }
+
+ // =====================================================================
+ // Fallback paths (default config FAIL_BACK_FLINK_ENGINE_ENABLED=true)
+ // =====================================================================
+
+ /** Contract: an unsupported RexNode in the condition triggers fallback — Auron's
+ * {@code translateToFlinkCalc} hook is invoked and its stub Transformation is returned. */
+ @Test
+ void testFallsBackWhenUnsupportedRexNodeInCondition() throws Exception {
+ Transformation stub = new FakeSourceTransformation();
+ CapturingTranslator node = new CapturingTranslator(
+ tableConfig,
+ Arrays.asList(intRef(0)),
+ makeBinary(intType(), SqlStdOperatorTable.EQUALS, intRef(0), intRef(1)),
+ inputProperty,
+ RowType.of(new IntType()),
+ "calc",
+ stub);
+ wireFakeUpstream(node, TWO_INT_ROW);
+
+ Transformation result = invokeTranslate(node);
+
+ assertSame(stub, result);
+ assertEquals(1, node.fallbackCount);
+ }
+
+ /** Contract: an unsupported RexNode in the projection triggers fallback. */
+ @Test
+ void testFallsBackWhenUnsupportedRexNodeInProjection() throws Exception {
+ Transformation stub = new FakeSourceTransformation();
+ // EQUALS produces a RexCall whose isSupported() returns false in RexCallConverter.
+ CapturingTranslator node = new CapturingTranslator(
+ tableConfig,
+ Arrays.asList(makeBinary(intType(), SqlStdOperatorTable.EQUALS, intRef(0), intRef(1))),
+ null,
+ inputProperty,
+ RowType.of(new IntType()),
+ "calc",
+ stub);
+ wireFakeUpstream(node, TWO_INT_ROW);
+
+ Transformation result = invokeTranslate(node);
+
+ assertSame(stub, result);
+ assertEquals(1, node.fallbackCount);
+ }
+
+ /** Contract: a {@code RowType} containing a RAW logical type causes
+ * {@code SchemaConverters.convertToAuronArrowType} to throw; the outer {@code Throwable}
+ * catch maps this to {@code Optional.empty()} and the operator falls back. */
+ @Test
+ void testFallsBackWhenSchemaConversionThrows() throws Exception {
+ Transformation stub = new FakeSourceTransformation();
+ // Output schema includes a RAW type — SchemaConverters does not handle this kind.
+ RowType outputWithRaw = RowType.of(
+ new LogicalType[] {new IntType(), new RawType<>(Object.class, new ObjectSerializer())},
+ new String[] {"a", "rawcol"});
+ // Use RexInputRefs that exist in the input schema (TWO_INT_ROW has 2 ints).
+ // Output type mismatch with projection type is fine for plan-build — schema conversion
+ // is the failure point.
+ CapturingTranslator node = new CapturingTranslator(
+ tableConfig, Arrays.asList(intRef(0), intRef(1)), null, inputProperty, outputWithRaw, "calc", stub);
+ wireFakeUpstream(node, TWO_INT_ROW);
+
+ Transformation result = invokeTranslate(node);
+
+ assertSame(stub, result, "Schema conversion failure must trigger fallback");
+ assertEquals(1, node.fallbackCount);
+ assertEquals(1, invokeStaticInt(StreamExecCalc.class, "peekWarnEmitCountForTest"));
+ }
+
+ // =====================================================================
+ // Strict mode (FAIL_BACK_FLINK_ENGINE_ENABLED=false)
+ // =====================================================================
+
+ /** Contract: with fallback disabled and an unsupported RexNode, the operator throws
+ * {@link IllegalStateException} rather than degrading silently. */
+ @Test
+ void testThrowsWhenFallbackDisabled() throws Exception {
+ withStrictModeConf(() -> {
+ CapturingTranslator node = new CapturingTranslator(
+ tableConfig,
+ Arrays.asList(intRef(0)),
+ makeBinary(intType(), SqlStdOperatorTable.EQUALS, intRef(0), intRef(1)),
+ inputProperty,
+ RowType.of(new IntType()),
+ "calc",
+ new FakeSourceTransformation());
+ wireFakeUpstream(node, TWO_INT_ROW);
+ IllegalStateException ex = assertThrows(IllegalStateException.class, () -> invokeTranslate(node));
+ assertTrue(
+ ex.getMessage().contains("fallback is disabled"),
+ "IllegalStateException must mention fallback is disabled, got: " + ex.getMessage());
+ // Super was never invoked because the strict path threw before the hook.
+ assertEquals(0, node.fallbackCount);
+ return null;
+ });
+ }
+
+ // =====================================================================
+ // Observability (per-fallback WARN log)
+ // =====================================================================
+
+ /** Contract: two Calcs in a single submission falling back on the same unsupported RexNode
+ * class emit exactly one WARN log line. Verified via the test-only emit counter on
+ * {@link StreamExecCalc}, which is incremented only when the dedup set treats the class as
+ * new. */
+ @Test
+ void testFallbackEmitsWarnLogOnce() throws Exception {
+ CapturingTranslator a = new CapturingTranslator(
+ tableConfig,
+ Arrays.asList(intRef(0)),
+ makeBinary(intType(), SqlStdOperatorTable.EQUALS, intRef(0), intRef(1)),
+ inputProperty,
+ RowType.of(new IntType()),
+ "calc-a",
+ new FakeSourceTransformation());
+ wireFakeUpstream(a, TWO_INT_ROW);
+ invokeTranslate(a);
+
+ CapturingTranslator b = new CapturingTranslator(
+ tableConfig,
+ Arrays.asList(intRef(0)),
+ makeBinary(intType(), SqlStdOperatorTable.EQUALS, intRef(0), intRef(1)),
+ inputProperty,
+ RowType.of(new IntType()),
+ "calc-b",
+ new FakeSourceTransformation());
+ wireFakeUpstream(b, TWO_INT_ROW);
+ invokeTranslate(b);
+
+ assertEquals(1, invokeStaticInt(StreamExecCalc.class, "peekWarnEmitCountForTest"));
+ }
+
+ /** Contract: two Calcs falling back on different unsupported RexNode classes emit two
+ * distinct WARN log lines. */
+ @Test
+ void testFallbackEmitsDistinctWarnLogsForDistinctRexClasses() throws Exception {
+ // First fallback: RexCall (EQUALS) is unsupported by RexCallConverter.isSupported.
+ CapturingTranslator a = new CapturingTranslator(
+ tableConfig,
+ Arrays.asList(intRef(0)),
+ makeBinary(intType(), SqlStdOperatorTable.EQUALS, intRef(0), intRef(1)),
+ inputProperty,
+ RowType.of(new IntType()),
+ "calc-a",
+ new FakeSourceTransformation());
+ wireFakeUpstream(a, TWO_INT_ROW);
+ invokeTranslate(a);
+
+ // Second fallback: an UnregisteredRex subclass — converter map has no entry.
+ CapturingTranslator b = new CapturingTranslator(
+ tableConfig,
+ Arrays.asList(new UnregisteredRex(intType())),
+ null,
+ inputProperty,
+ RowType.of(new IntType()),
+ "calc-b",
+ new FakeSourceTransformation());
+ wireFakeUpstream(b, TWO_INT_ROW);
+ invokeTranslate(b);
+
+ assertEquals(2, invokeStaticInt(StreamExecCalc.class, "peekWarnEmitCountForTest"));
+ }
+
+ // =====================================================================
+ // Classpath shadowing (informational; only meaningful when an assembly JAR is in front)
+ // =====================================================================
+
+ /** Contract: when running with the assembled bundle, {@code Class.forName} on the FQCN
+ * resolves to Auron's class. Skipped by default in module-isolated tests where this module
+ * provides the only copy of the class. */
+ @Test
+ void testShadowedClassResolvesViaClasspath() throws Exception {
+ Class> resolved = Class.forName("org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc");
+ assertSame(StreamExecCalc.class, resolved);
+ }
+
+ // =====================================================================
+ // Helpers
+ // =====================================================================
+
+ private static RelDataType intType() {
+ return TYPE_FACTORY.createSqlType(SqlTypeName.INTEGER);
+ }
+
+ private static RelDataType bigintType() {
+ return TYPE_FACTORY.createSqlType(SqlTypeName.BIGINT);
+ }
+
+ private static RexNode intRef(int idx) {
+ return REX_BUILDER.makeInputRef(intType(), idx);
+ }
+
+ private static RexNode makeBinary(
+ RelDataType returnType, org.apache.calcite.sql.SqlOperator op, RexNode left, RexNode right) {
+ return REX_BUILDER.makeCall(returnType, op, Arrays.asList(left, right));
+ }
+
+ private StreamExecCalc newCalc(List projection, RexNode condition, RowType outputType) {
+ return new StreamExecCalc(tableConfig, projection, condition, inputProperty, outputType, "calc");
+ }
+
+ /**
+ * Wires a single fake upstream {@link ExecEdge} whose source's cached {@code transformation}
+ * field is a pre-built stub. {@link ExecNodeBase#translateToPlan} short-circuits on a non-null
+ * cached transformation and returns it without consulting the planner.
+ */
+ private static void wireFakeUpstream(StreamExecCalc node, RowType inputRowType) throws Exception {
+ FakeSourceExecNode source = new FakeSourceExecNode<>(inputRowType, "src");
+ source.setCachedTransformation(new FakeSourceTransformation());
+ ExecEdge edge = ExecEdge.builder().source(source).target(node).build();
+ node.setInputEdges(Collections.singletonList(edge));
+ }
+
+ private Transformation invokeTranslate(StreamExecCalc node) throws Exception {
+ Method m = ExecNodeBase.class.getDeclaredMethod(
+ "translateToPlanInternal", PlannerBase.class, ExecNodeConfig.class);
+ m.setAccessible(true);
+ try {
+ @SuppressWarnings("unchecked")
+ Transformation t = (Transformation) m.invoke(node, null, nodeConfig);
+ return t;
+ } catch (java.lang.reflect.InvocationTargetException e) {
+ if (e.getCause() instanceof RuntimeException) {
+ throw (RuntimeException) e.getCause();
+ }
+ throw e;
+ }
+ }
+
+ /** Extracts the {@link org.apache.flink.streaming.api.operators.StreamOperator} wrapped in
+ * the {@link SimpleOperatorFactory} of a {@link OneInputTransformation}. */
+ private static Object operatorOf(Transformation result) {
+ assertTrue(
+ result instanceof OneInputTransformation, "Expected OneInputTransformation, got: " + result.getClass());
+ OneInputTransformation, ?> oit = (OneInputTransformation, ?>) result;
+ Object factory = oit.getOperatorFactory();
+ assertTrue(
+ factory instanceof SimpleOperatorFactory, "Expected SimpleOperatorFactory, got: " + factory.getClass());
+ return ((SimpleOperatorFactory>) factory).getOperator();
+ }
+
+ /**
+ * Runs {@code action} with the strict-mode {@code flink-conf.yaml} test resource pointed at
+ * by {@code FLINK_CONF_DIR}. The file disables fallback so the operator throws on conversion
+ * failure. Restores the prior environment after the action runs.
+ *
+ * {@link org.apache.auron.flink.configuration.FlinkAuronConfiguration} resolves its
+ * underlying Flink config via {@link org.apache.flink.configuration.GlobalConfiguration#loadConfiguration()},
+ * which consults the {@code FLINK_CONF_DIR} environment variable; flipping that pointer is
+ * the supported way to swap configurations across test cases.
+ */
+ private static T withStrictModeConf(java.util.concurrent.Callable action) throws Exception {
+ java.net.URL yaml =
+ StreamExecCalcTest.class.getClassLoader().getResource("strict-mode-flink-conf/flink-conf.yaml");
+ assertNotNull(yaml, "strict-mode-flink-conf/flink-conf.yaml test resource must exist");
+ String confDir = new java.io.File(yaml.getFile()).getParentFile().getAbsolutePath();
+ String previous = System.getenv(org.apache.flink.configuration.ConfigConstants.ENV_FLINK_CONF_DIR);
+ java.util.Map env = new java.util.HashMap<>(System.getenv());
+ env.put(org.apache.flink.configuration.ConfigConstants.ENV_FLINK_CONF_DIR, confDir);
+ mutateEnv(env);
+ try {
+ return action.call();
+ } finally {
+ java.util.Map restore = new java.util.HashMap<>(System.getenv());
+ if (previous == null) {
+ restore.remove(org.apache.flink.configuration.ConfigConstants.ENV_FLINK_CONF_DIR);
+ } else {
+ restore.put(org.apache.flink.configuration.ConfigConstants.ENV_FLINK_CONF_DIR, previous);
+ }
+ mutateEnv(restore);
+ }
+ }
+
+ /** Reflectively replaces the JVM's environment map. Test-only — based on the same pattern
+ * used by {@code CommonTestUtils.setEnv} in {@code auron-flink-runtime}. */
+ @SuppressWarnings("unchecked")
+ private static void mutateEnv(java.util.Map newEnv) {
+ try {
+ java.util.Map currentEnv = System.getenv();
+ Field f = currentEnv.getClass().getDeclaredField("m");
+ f.setAccessible(true);
+ java.util.Map backing = (java.util.Map) f.get(currentEnv);
+ backing.clear();
+ backing.putAll(newEnv);
+ try {
+ Class> peClass = Class.forName("java.lang.ProcessEnvironment");
+ Field ci = peClass.getDeclaredField("theCaseInsensitiveEnvironment");
+ ci.setAccessible(true);
+ java.util.Map cienv = (java.util.Map) ci.get(null);
+ cienv.clear();
+ cienv.putAll(newEnv);
+ } catch (NoSuchFieldException ignored) {
+ // theCaseInsensitiveEnvironment is Windows-only.
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private static void invokeStatic(Class> cls, String methodName) throws Exception {
+ Method m = cls.getDeclaredMethod(methodName);
+ m.setAccessible(true);
+ m.invoke(null);
+ }
+
+ private static int invokeStaticInt(Class> cls, String methodName) throws Exception {
+ Method m = cls.getDeclaredMethod(methodName);
+ m.setAccessible(true);
+ return (int) m.invoke(null);
+ }
+
+ // =====================================================================
+ // Test subclass: capture the fallback delegation
+ // =====================================================================
+
+ /** Subclass of {@link StreamExecCalc} that records super-codegen invocations and returns a
+ * stub Transformation instead of executing Flink's actual codegen path. */
+ static class CapturingTranslator extends StreamExecCalc {
+ int fallbackCount;
+ final Transformation fallbackStub;
+
+ CapturingTranslator(
+ org.apache.flink.configuration.ReadableConfig tableConfig,
+ List projection,
+ RexNode condition,
+ InputProperty inputProperty,
+ RowType outputType,
+ String description,
+ Transformation fallbackStub) {
+ super(tableConfig, projection, condition, inputProperty, outputType, description);
+ this.fallbackStub = fallbackStub;
+ }
+
+ // No @Override: javac on some classpaths resolves StreamExecCalc to Flink's stock
+ // class (which lacks translateToFlinkCalc). Runtime virtual dispatch still routes
+ // the shadow's call here because the loaded StreamExecCalc is our shadow.
+ protected Transformation translateToFlinkCalc(PlannerBase planner, ExecNodeConfig config) {
+ fallbackCount++;
+ return fallbackStub;
+ }
+ }
+
+ // =====================================================================
+ // Fake upstream ExecNode and Transformation
+ // =====================================================================
+
+ /**
+ * Minimal subclass of {@link ExecNodeBase} used as the source of a fake {@link ExecEdge}. Its
+ * cached {@code transformation} field is set via reflection so {@code translateToPlan}
+ * short-circuits and never touches the planner.
+ */
+ static class FakeSourceExecNode extends ExecNodeBase implements StreamExecNode {
+ FakeSourceExecNode(LogicalType outputType, String description) {
+ super(
+ ExecNodeContext.newNodeId(),
+ new ExecNodeContext("fake-src_1"),
+ new Configuration(),
+ Collections.emptyList(),
+ outputType,
+ description);
+ }
+
+ void setCachedTransformation(Transformation t) throws Exception {
+ Field f = ExecNodeBase.class.getDeclaredField("transformation");
+ f.setAccessible(true);
+ f.set(this, t);
+ }
+
+ @Override
+ protected Transformation translateToPlanInternal(PlannerBase planner, ExecNodeConfig config) {
+ // Never invoked because translateToPlan short-circuits on the cached transformation.
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ /**
+ * Minimal stub {@link Transformation} subclass — we reuse {@link SourceTransformation}'s shape
+ * via a no-op subclass so {@code getParallelism()} returns a sane integer.
+ */
+ static class FakeSourceTransformation extends Transformation {
+ FakeSourceTransformation() {
+ super("fake-src", InternalTypeInfo.of(TWO_INT_ROW), 1);
+ }
+
+ @Override
+ public List> getTransitivePredecessors() {
+ return Collections.singletonList(this);
+ }
+
+ @Override
+ public List> getInputs() {
+ return Collections.emptyList();
+ }
+ }
+
+ // =====================================================================
+ // Unregistered RexNode subclass for the dedup test
+ // =====================================================================
+
+ /** {@link RexNode} subclass that is not registered in the factory so it routes through the
+ * "no converter registered" path of {@link FlinkNodeConverterFactory#convertRexNode}. */
+ static class UnregisteredRex extends RexNode {
+ private final RelDataType type;
+
+ UnregisteredRex(RelDataType type) {
+ this.type = type;
+ }
+
+ @Override
+ public RelDataType getType() {
+ return type;
+ }
+
+ @Override
+ public R accept(org.apache.calcite.rex.RexVisitor visitor) {
+ return null;
+ }
+
+ @Override
+ public R accept(org.apache.calcite.rex.RexBiVisitor visitor, P arg) {
+ return null;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ return this == obj;
+ }
+
+ @Override
+ public int hashCode() {
+ return System.identityHashCode(this);
+ }
+ }
+
+ // =====================================================================
+ // RawType serializer (test helper for the schema-conversion failure path)
+ // =====================================================================
+
+ /** No-op TypeSerializer required to construct a {@link RawType}. */
+ static class ObjectSerializer extends org.apache.flink.api.common.typeutils.TypeSerializer