Skip to content

[flink] add support for flink 2.3 #3521

Open
sd4324530 wants to merge 3 commits into apache:main from sd4324530:support-flink-2.3

@sd4324530 sd4324530 commented Jun 25, 2026

Purpose

Linked issue: close #3520

This PR adds Flink 2.3 engine support to Apache Fluss by introducing a new fluss-flink-2.3 module. To accommodate the API changes in CatalogMaterializedTable / IntervalFreshness introduced in Flink 2.3, corresponding version adapters are introduced in the common module so that cross-version connector code can compile against multiple Flink versions.

Brief change log

1. New fluss-flink-2.3 module

  • Added fluss-flink-2.3 as a sub-module in fluss-flink/pom.xml to provide connector support for Flink 2.3-based deployments.
  • The module mirrors the structure of fluss-flink-2.2 and ships with complete source and test directories. Version-specific adapters (e.g., MultipleParameterToolAdapter, SchemaAdapter, SinkAdapter, TypeInformationAdapter) follow the existing pattern and are provided alongside the module.

2. New version adapters in the common module

To handle the API changes of CatalogMaterializedTable / IntervalFreshness in Flink 2.3, the following adapters are introduced under fluss-flink-common/src/main/java/org/apache/fluss/flink/adapter/:

  • CatalogMaterializedTableAdapter: wraps CatalogMaterializedTable.Builder to abstract away differences introduced in Flink 2.3 (such as the new originalQuery / expandedQuery fields), so the shared common code does not depend on a specific Flink version.

  • IntervalFreshnessAdapter: a new adapter for IntervalFreshness and its inner TimeUnit enum. In Flink 2.3 the IntervalFreshness.TimeUnit type has been reworked (moved/repackaged), so this adapter provides a unified way to parse and serialize time units — converting between String names and the version-specific TimeUnit enum (via the TimeUnitAdapter wrapper). The common module no longer needs to depend on a Flink-version-specific IntervalFreshness.TimeUnit class, and the existing MATERIALIZED_TABLE_INTERVAL_FRESHNESS_TIME_UNIT config can keep its stringType() form unchanged.

    ⚠️ Note on the diff: the deletion of ResolvedCatalogMaterializedTableAdapter and the addition of IntervalFreshnessAdapter may show up in the GitHub PR diff as a rename (51% similarity). This is a false positive: their Apache 2.0 license headers happen to push the file-level similarity above Git's 50% rename-detection threshold, but the two classes have independent responsibilities and share no business logic. The change is in reality a pure add + delete:

    • Removed (fluss-flink-common/src/test/java/.../adapter/ResolvedCatalogMaterializedTableAdapter.java): the old test-only static helper that faked a 2-arg ResolvedCatalogMaterializedTable constructor call as a workaround for https://issues.apache.org/jira/browse/FLINK-38532. The removal does not mean FLINK-38532 has been fixed. Rather, the test code has been refactored to use a template-method pattern (see point 5 below), so the workaround is no longer referenced anywhere and the helper has become dead code.
    • Added (fluss-flink-common/src/main/java/.../adapter/IntervalFreshnessAdapter.java): a fresh public adapter for IntervalFreshness.TimeUnit parsing/serialization, unrelated to the removed helper.

    Reviewers can confirm this by running git show 21c65a6c --no-renames --name-status, which reveals the real A + D pair.
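
The String-to-enum conversion that IntervalFreshnessAdapter is described as providing can be sketched in plain Java as follows. This is only an illustration under stated assumptions: SketchTimeUnit is a stand-in enum rather than Flink's IntervalFreshness.TimeUnit, the class and method names are hypothetical, and the case-insensitive parsing is an assumption, not something the PR states.

```java
import java.util.Locale;

/** Stand-in for a version-specific time-unit enum; NOT the real Flink type. */
enum SketchTimeUnit {
    SECOND, MINUTE, HOUR, DAY
}

/** Hypothetical adapter: shared code only ever handles String names. */
final class TimeUnitNameAdapterSketch {

    private TimeUnitNameAdapterSketch() {}

    /** Parses a configured name; ignoring case and whitespace is an assumption. */
    static SketchTimeUnit parse(String name) {
        if (name == null) {
            throw new IllegalArgumentException("time unit must not be null");
        }
        try {
            return SketchTimeUnit.valueOf(name.trim().toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            throw new IllegalArgumentException("Unsupported time unit: " + name, e);
        }
    }

    /** Serializes the enum back to the String form kept in the table options. */
    static String serialize(SketchTimeUnit unit) {
        return unit.name();
    }
}
```

Because only String values cross the module boundary, a string-typed option can stay unchanged while each version-specific module maps names onto whichever enum class its Flink version ships.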

3. Configuration and serialization compatibility

  • FlinkConnectorOptions.MATERIALIZED_TABLE_INTERVAL_FRESHNESS_TIME_UNIT is changed from enumType(IntervalFreshness.TimeUnit.class) to stringType(), avoiding a hard dependency on a Flink-version-specific enum class in the common module; concrete enum parsing is delegated to IntervalFreshnessAdapter.
  • FlinkConversions is updated to use the new adapters for materialized-table serialization/deserialization, ensuring consistent semantics across versions.

4. Back-port to fluss-flink-2.2

CatalogMaterializedTableAdapter is also added to fluss-flink-2.2, and Flink22CatalogTest is updated accordingly, so that the 2.2 module continues to share the same common code after the new adapter is introduced.

The same Git rename false positive applies here: the deletion of fluss-flink-2.2/src/test/.../ResolvedCatalogMaterializedTableAdapter.java and the addition of fluss-flink-2.2/src/main/.../CatalogMaterializedTableAdapter.java are also displayed as a 50%-similarity rename for the same license-header reason. They are independent changes.

5. Test refactor: template-method pattern for version-specific constructors

The old ResolvedCatalogMaterializedTableAdapter.create() helper masked the Flink-version-specific ResolvedCatalogMaterializedTable constructor signature with a 2-arg fake. To support Flink 2.3's new 5-arg constructor (which adds StartMode), the catalog test hierarchy is refactored to a template-method pattern:

  • The parent FlinkCatalogTest (in fluss-flink-common) now exposes a protected createResolvedCatalogMaterializedTable(...) method with a default 2-arg-constructor implementation.
  • Flink22CatalogTest overrides it to use the 4-arg constructor (origin, resolvedSchema, refreshMode, intervalFreshness).
  • Flink23CatalogTest overrides it to use the 5-arg constructor (additionally passing StartMode.of(StartMode.StartModeKind.FROM_BEGINNING)).

This lets each Flink version exercise its native constructor signature, removes the need for the static helper, and makes the test code self-documenting about which Flink version it targets. Note that the parent FlinkCatalogTest no longer imports ResolvedCatalogMaterializedTableAdapter.
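
The shape of this refactor can be sketched with plain-Java stand-ins. Every class name below is hypothetical, and the "constructor" is modeled as a list of argument labels rather than the real ResolvedCatalogMaterializedTable, so the sketch shows only the template-method structure, not the actual Flink signatures.

```java
import java.util.Arrays;
import java.util.List;

/** Stand-in for the resolved table; it records which arguments were supplied. */
final class ResolvedTableSketch {
    final List<String> constructorArgs;

    ResolvedTableSketch(String... constructorArgs) {
        this.constructorArgs = Arrays.asList(constructorArgs);
    }
}

/** Parent test (hypothetical): shared test logic calls the factory hook. */
class CatalogTestSketch {

    /** Template-method hook; the default mimics the 2-arg constructor. */
    protected ResolvedTableSketch createResolvedTable(String origin, String resolvedSchema) {
        return new ResolvedTableSketch(origin, resolvedSchema);
    }

    /** Shared logic written once against the hook, never against a constructor. */
    final int constructorArity() {
        return createResolvedTable("origin", "resolvedSchema").constructorArgs.size();
    }
}

/** Models the 2.2 override: four constructor arguments. */
class Flink22CatalogTestSketch extends CatalogTestSketch {
    @Override
    protected ResolvedTableSketch createResolvedTable(String origin, String resolvedSchema) {
        return new ResolvedTableSketch(origin, resolvedSchema, "refreshMode", "intervalFreshness");
    }
}

/** Models the 2.3 override: a fifth argument standing in for the start mode. */
class Flink23CatalogTestSketch extends CatalogTestSketch {
    @Override
    protected ResolvedTableSketch createResolvedTable(String origin, String resolvedSchema) {
        return new ResolvedTableSketch(
                origin, resolvedSchema, "refreshMode", "intervalFreshness", "startMode");
    }
}
```

The shared test body stays in the parent, and each subclass is the only place that needs to know its Flink version's constructor signature.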

6. Test coverage

  • A complete ITCase suite (Flink23*ITCase) is added under the fluss-flink-2.3 module, covering catalog, metrics, procedure, authorization, sink, source (including binlog/changelog virtual tables, delta join, failover), and tiering.
  • Flink23MultipleParameterToolTest is added to validate MultipleParameterToolAdapter behavior.
  • FlinkCatalogTest and Flink22CatalogTest are updated for the adapter-related cases.

7. Test compatibility fix: Flink 2.3 ON CONFLICT validation vs. Delta Join tests

While porting the Delta Join ITCases to Flink 2.3, an unexpected upstream planner behavior change was discovered. This PR works around it in the test code only; the broader impact on Fluss end-users upgrading to Flink 2.3 is left for community discussion.

  • What changed in Flink 2.3. Flink 2.3 introduces a new planner option ExecutionConfigOptions.TABLE_EXEC_SINK_REQUIRE_ON_CONFLICT (table.exec.sink.require-on-conflict, default true). Inside FlinkChangelogModeInferenceProgram.SatisfyUpdateKindTraitVisitor.analyzeUpsertMaterializeStrategy, Flink now throws ValidationException("The query has an upsert key that differs from the primary key of the sink table ...") whenever:

    • TABLE_EXEC_SINK_UPSERT_MATERIALIZE = AUTO (default), and
    • the sink table's primary key does not contain the query's upsert keys, and
    • no ON CONFLICT clause is supplied.

    In Flink 2.2 this validation did not exist.

  • Why this affects Fluss's Delta Join tests. Delta Join semantics define the upsert key from the join condition, not from the sink's primary key. In several Flink23DeltaJoinITCase scenarios (e.g., testDeltaJoinWithJoinKeyExceedsPrimaryKey with join condition c1=c2 AND d1=d2 AND e1=e2 into a sink whose PK is (c1, d1)), the upsert key (c1, d1, e1) legitimately exceeds the sink PK. Under Flink 2.2 these tests asserted that StreamPhysicalDeltaJoinForceValidator throws "The current sql doesn't support to do delta join optimization." Under Flink 2.3 the new ON CONFLICT validator fires before the Delta Join validator is reached, so the original error message is never produced and the assertions fail.

  • Why this PR only touches the tests. The new Flink validation is the correct behavior in the general case — silently allowing mismatched upsert keys leads to non-deterministic results at the sink, which is exactly what Flink 2.3 is trying to prevent. Disabling it in the connector would silently regress that protection for all Fluss users. Whether/how Fluss should expose this knob (e.g., as a Fluss-level connector option, or by injecting a ConflictStrategy from FlinkTableSink when the underlying Fluss table has a last_row merge engine) is a product-level decision that should be discussed with the community and is out of scope for this PR.

    The workaround is minimal and scoped: Flink23DeltaJoinITCase disables TABLE_EXEC_SINK_REQUIRE_ON_CONFLICT in its beforeEach, and Flink22DeltaJoinITCase is untouched (the option did not exist in 2.2).
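
To make the trigger conditions concrete, here is a plain-Java model of the check as this description states it. It is a sketch, not Flink's planner code: the class and method names are hypothetical, and treating the require-on-conflict option as a simple gate in front of the three listed conditions is an assumption based on the option's documented default.

```java
import java.util.Set;

/** Hypothetical model of the validation conditions described above. */
final class OnConflictCheckSketch {

    private OnConflictCheckSketch() {}

    /**
     * Returns true when the modeled planner would raise the ValidationException.
     *
     * @param requireOnConflict     value of table.exec.sink.require-on-conflict
     * @param upsertMaterializeAuto whether upsert materialization is set to AUTO
     * @param sinkPrimaryKey        primary-key columns of the sink table
     * @param queryUpsertKey        upsert-key columns derived from the query
     * @param hasOnConflictClause   whether the INSERT supplies ON CONFLICT
     */
    static boolean wouldReject(
            boolean requireOnConflict,
            boolean upsertMaterializeAuto,
            Set<String> sinkPrimaryKey,
            Set<String> queryUpsertKey,
            boolean hasOnConflictClause) {
        // "Does not contain" is modeled as: some upsert-key column is missing from the PK.
        boolean primaryKeyCoversUpsertKey = sinkPrimaryKey.containsAll(queryUpsertKey);
        return requireOnConflict
                && upsertMaterializeAuto
                && !primaryKeyCoversUpsertKey
                && !hasOnConflictClause;
    }
}
```

In the testDeltaJoinWithJoinKeyExceedsPrimaryKey scenario, the sink PK (c1, d1) does not cover the upsert key (c1, d1, e1), so the model rejects the query under the defaults and accepts it once the option is switched off, which is the effect the test-side workaround relies on.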

8. Test compatibility fix: Flink 2.3 ON CONFLICT validation vs. Table Sink partial-upsert tests

A second, distinct impact of the same Flink 2.3 validation was uncovered while running Flink23TableSinkITCase on CI: the Fluss partial-upsert test path is also blocked. Same planner option, same root cause, but a different surface — addressed with the same minimal-scoped pattern.

  • The fix. Flink23TableSinkITCase was previously an empty subclass of FlinkTableSinkITCase. This PR turns it into a proper subclass with a single @BeforeEach that disables TABLE_EXEC_SINK_REQUIRE_ON_CONFLICT on the streaming TableConfig, mirroring the precedent set in Section 7.

  • Why this PR only touches the tests. Same reasoning as Section 7: Flink 2.3's validation is the correct general-case behavior, and silently turning it off at the connector level would regress the protection for real users. The proper follow-up is for Fluss to participate in the new model — most naturally by having FlinkTableSink.applyOperations(...) (or getSinkRuntimeProvider) inject a ConflictStrategy (e.g., DEDUPLICATE, semantically equivalent to Fluss's current first-write-wins plus target-column overwrite) when the sink is a Fluss PK table, so partial upserts remain expressible from SQL. That is a product/API change and is explicitly out of scope for this engine-port PR.

    Flink22TableSinkITCase is untouched (the option did not exist in 2.2).

Tests

  • Unit tests:
    • Flink23MultipleParameterToolTest
    • Flink23CatalogTest
    • Flink23TieringCommitOperatorTest
    • FlinkCatalogTest, Flink22CatalogTest (adapter-related cases updated; Flink22/23CatalogTest now exercise their native ResolvedCatalogMaterializedTable constructors via the new template-method override)
  • Integration tests (ITCases, all under the fluss-flink-2.3 module):
    • Flink23CatalogITCase, Flink23MaterializedTableITCase
    • Flink23MetricsITCase
    • Flink23ProcedureITCase
    • Flink23AuthorizationITCase
    • Flink23ComplexTypeITCase, Flink23TableSinkITCase (now works around Flink 2.3's new ON CONFLICT planner validation for partial upserts, see Section 8), Flink23UndoRecoveryITCase
    • Flink23BinlogVirtualTableITCase, Flink23ChangelogVirtualTableITCase, Flink23DeltaJoinITCase (now works around Flink 2.3's new ON CONFLICT planner validation, see Section 7), Flink23TableSourceBatchITCase, Flink23TableSourceFailOverITCase, Flink23TableSourceITCase
    • Flink23TieringITCase

API and Format

No breaking changes to the public API.

Documentation

Signed-off-by: Pei Yu <125331682@qq.com>
@sd4324530 sd4324530 force-pushed the support-flink-2.3 branch 3 times, most recently from 781644b to d5a43c4 Compare June 26, 2026 03:37
…Case

Flink 2.3 introduces ExecutionConfigOptions.TABLE_EXEC_SINK_REQUIRE_ON_CONFLICT
(table.exec.sink.require-on-conflict), defaulting to true. In
FlinkChangelogModeInferenceProgram, this triggers a ValidationException
("upsert key differs from primary key") before the
StreamPhysicalDeltaJoinForceValidator runs, so the Delta Join ITCases can no
longer reach the original "doesn't support to do delta join optimization"
error path.

Disable the option in Flink23DeltaJoinITCase#beforeEach so the existing
assertions remain valid. Production-side impact (real Fluss users hitting
this on multi-table joins / group-by + insert) is left to community
discussion.

Signed-off-by: Pei Yu <125331682@qq.com>
@sd4324530 sd4324530 force-pushed the support-flink-2.3 branch from d5a43c4 to 859d4b7 Compare June 26, 2026 04:52
…Case

Flink 2.3 introduces ExecutionConfigOptions.TABLE_EXEC_SINK_REQUIRE_ON_CONFLICT
(table.exec.sink.require-on-conflict), defaulting to true. In
FlinkChangelogModeInferenceProgram, this triggers a ValidationException
("upsert key differs from primary key") before the partial-update handling
in FlinkTableSink#getSinkRuntimeProvider runs, so the partial upsert ITCases
(testPartialUpsert and testPartialUpsertDuringAddColumn) in
FlinkTableSinkITCase can no longer reach the Fluss sink layer.

Disable the option in Flink23TableSinkITCase#beforeEach so the existing
partial-upsert assertions remain valid.

Signed-off-by: Pei Yu <125331682@qq.com>
@sd4324530 sd4324530 force-pushed the support-flink-2.3 branch from bd9decf to 018112e Compare June 26, 2026 07:26

Development

Successfully merging this pull request may close these issues.

[flink] support flink 2.3
