
[FLINK-39062][table] Add APPLY_WATERMARK built-in function for flexible watermark assignment#27984

Draft
featzhang wants to merge 5 commits into apache:master from featzhang:feature/FLINK-39062-clean

Conversation

@featzhang
Member

What is the purpose of the change

This PR implements the APPLY_WATERMARK built-in function as proposed in the community discussion thread, enabling flexible watermark assignment on tables, views, and subqueries in Flink SQL.

Motivation:
Currently, Flink SQL requires watermarks to be defined in DDL (CREATE TABLE ... WITH WATERMARK FOR ...), which limits flexibility when:

  • Working with catalog tables without DDL modification permissions
  • Using views or complex subqueries where DDL is not applicable
  • Dynamically adjusting watermark strategies without schema changes

Solution:
Introduce APPLY_WATERMARK(table, DESCRIPTOR(rowtime_column), watermark_expr) function that:

  • Accepts any table expression (base table, view, or subquery)
  • Assigns or overrides watermark on the specified rowtime column
  • Validates column existence and TIMESTAMP/TIMESTAMP_LTZ types at compile time
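To illustrate the two compile-time checks listed above (column existence and the TIMESTAMP/TIMESTAMP_LTZ type restriction), here is a minimal standalone sketch. It does not use Flink planner internals; `ColumnType` and the schema map are invented stand-ins for the planner's type system.

```java
import java.util.Map;

// Illustrative sketch only: models the two validations described in the PR
// (column existence and TIMESTAMP/TIMESTAMP_LTZ type check) without
// depending on Flink planner internals.
public class ApplyWatermarkValidationSketch {

    enum ColumnType { INT, STRING, TIMESTAMP, TIMESTAMP_LTZ }

    static void validateRowtimeColumn(Map<String, ColumnType> schema, String column) {
        ColumnType type = schema.get(column);
        if (type == null) {
            throw new IllegalArgumentException(
                "Column '" + column + "' does not exist in the input table.");
        }
        if (type != ColumnType.TIMESTAMP && type != ColumnType.TIMESTAMP_LTZ) {
            throw new IllegalArgumentException(
                "Column '" + column + "' must be TIMESTAMP or TIMESTAMP_LTZ, but is " + type + ".");
        }
    }

    public static void main(String[] args) {
        Map<String, ColumnType> orders = Map.of(
            "order_time", ColumnType.TIMESTAMP,
            "amount", ColumnType.INT);
        validateRowtimeColumn(orders, "order_time"); // valid rowtime column
        try {
            validateRowtimeColumn(orders, "amount"); // rejected: wrong type
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In the actual PR these checks happen during SQL validation, so a bad DESCRIPTOR argument fails at compile time rather than at runtime.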

Brief change log

  • Add SqlApplyWatermarkFunction as a new built-in SQL function
  • Add LogicalApplyWatermarkRule to convert SQL function calls to logical plan nodes
  • Extend FlinkLogicalWatermarkAssigner to support SQL function path
  • Update StreamPhysicalWatermarkAssigner to integrate with existing watermark infrastructure
  • Add unit tests for function registration and validation

Verifying this change

This change adds tests and can be verified as follows:

  • Added ApplyWatermarkFunctionTest for function registration and operand validation
  • Existing watermark-related tests still pass (DDL-based watermarks remain unchanged)
  • Manual verification with example queries (see below)

Example Usage

-- Apply watermark to a catalog table
SELECT * FROM APPLY_WATERMARK(
  orders,
  DESCRIPTOR(order_time),
  order_time - INTERVAL '5' SECOND
);

-- Override watermark on a view
CREATE VIEW recent_orders AS SELECT * FROM orders WHERE order_time > CURRENT_TIMESTAMP - INTERVAL '1' DAY;

SELECT * FROM APPLY_WATERMARK(
  recent_orders,
  DESCRIPTOR(order_time),
  order_time - INTERVAL '10' SECOND
);

-- Use with subquery
SELECT * FROM APPLY_WATERMARK(
  (SELECT * FROM orders WHERE amount > 100),
  DESCRIPTOR(order_time),
  order_time - INTERVAL '3' SECOND
);

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no (internal planner API only)
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? yes
  • If yes, how is the feature documented? not documented yet (will add docs in follow-up PR after initial review)

Discussion Thread

https://lists.apache.org/thread/oonylk4h8dnsom40g8rr5k52zf3tz64v

@flinkbot
Collaborator

flinkbot commented Apr 21, 2026

CI report:

Bot commands: The @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build

@featzhang featzhang force-pushed the feature/FLINK-39062-clean branch from 84c7406 to 385b1e5 on April 21, 2026 at 13:20
…le watermark assignment

This commit implements the APPLY_WATERMARK built-in function as proposed in FLIP-XXX,
enabling flexible watermark assignment on tables, views, and subqueries in Flink SQL.

**Motivation:**
Currently, Flink SQL requires watermarks to be defined in DDL, which limits flexibility when:
- Working with catalog tables without DDL modification permissions
- Using views or complex subqueries
- Dynamically adjusting watermark strategies

**Solution:**
Introduce `APPLY_WATERMARK(table, DESCRIPTOR(rowtime_column), watermark_expr)` function that:
- Accepts any table expression (base table, view, or subquery)
- Assigns or overrides watermark on the specified rowtime column
- Validates column existence and TIMESTAMP/TIMESTAMP_LTZ types

**Implementation:**

1. **SQL Layer** (`SqlApplyWatermarkFunction.java`):
   - Registers as built-in function in `FlinkSqlOperatorTable`
   - Implements DESCRIPTOR pattern for column specification
   - Performs compile-time validation (column existence, type checking)
   - Returns table type with updated time attributes

2. **Logical Planning** (`FlinkLogicalWatermarkAssigner.scala`, `LogicalApplyWatermarkRule.java`):
   - Extends existing `FlinkLogicalWatermarkAssigner` node
   - Converts SQL APPLY_WATERMARK calls to logical watermark nodes
   - Preserves watermark semantics from DDL-based assignments

3. **Physical Planning** (`StreamPhysicalWatermarkAssigner.scala`):
   - Updates physical watermark assigner for SQL function path
   - Integrates with existing `StreamExecWatermarkAssigner` infrastructure
   - Maintains compatibility with DDL-defined watermarks

4. **Testing** (`ApplyWatermarkFunctionTest.java`):
   - Unit tests for function registration and validation
   - Type checking tests for TIMESTAMP/TIMESTAMP_LTZ
   - Column existence validation tests
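As a rough illustration of the SQL-layer step "implements DESCRIPTOR pattern for column specification" and "returns table type with updated time attributes", the sketch below resolves a DESCRIPTOR column against an input row type and returns the same columns with the rowtime attribute moved. All names here (`Column`, `applyWatermarkDescriptor`) are invented for illustration and do not correspond to the actual Flink classes.

```java
import java.util.List;
import java.util.stream.IntStream;

// Standalone model of DESCRIPTOR column resolution: find the referenced
// column's index in the input row type, then return the same schema with
// that column marked as the rowtime attribute.
public class DescriptorResolutionSketch {

    record Column(String name, boolean isRowtime) {}

    static List<Column> applyWatermarkDescriptor(List<Column> input, String descriptorColumn) {
        int index = -1;
        for (int i = 0; i < input.size(); i++) {
            if (input.get(i).name().equals(descriptorColumn)) {
                index = i;
                break;
            }
        }
        if (index < 0) {
            throw new IllegalArgumentException("Unknown descriptor column: " + descriptorColumn);
        }
        final int rowtimeIndex = index;
        // Same columns, but only the descriptor column carries the rowtime attribute.
        return IntStream.range(0, input.size())
            .mapToObj(i -> new Column(input.get(i).name(), i == rowtimeIndex))
            .toList();
    }
}
```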

**Example Usage:**
```sql
-- Apply watermark to a catalog table
SELECT * FROM APPLY_WATERMARK(
  orders,
  DESCRIPTOR(order_time),
  order_time - INTERVAL '5' SECOND
);

-- Override watermark on a view
SELECT * FROM APPLY_WATERMARK(
  recent_orders_view,
  DESCRIPTOR(event_time),
  event_time - INTERVAL '10' SECOND
);
```

**Design Discussion:**
https://lists.apache.org/thread/oonylk4h8dnsom40g8rr5k52zf3tz64v

**Related Issues:**
- Closes FLINK-39062

**Author:** FeatZhang <featzhang@apache.org>
…r changes

Restore the explainTerms override and WatermarkUtils.simplify call in
StreamPhysicalWatermarkAssigner that were accidentally removed. These
changes had altered the stream physical plan textual output for the
watermark assigner node, breaking 20+ plan-diff tests in flink-table-planner
(TableSourceTest, GroupWindowTest, TemporalJoinTest, WindowJoinTest,
MLPredictTableFunctionTest, VectorSearchTableFunctionTest, etc.).

Also disable the APPLY_WATERMARK TVF tests until the SqlTableFunction
scope wiring is implemented so the third argument (watermark expression)
can reference columns of the input table. Renamed the reserved keyword
'value' to 'val' in test DDLs to avoid SQL parser conflicts.

- Restore StreamPhysicalWatermarkAssigner.explainTerms with formatted
  watermark expression for stable plan output
- Restore WatermarkUtils.simplify in translateToExecNode to keep
  simplification semantics consistent with the logical converter
- Annotate ApplyWatermarkFunctionTest with @Disabled pending TVF scope support
- Use 'val' instead of reserved 'value' column name in test DDL
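The plan-stability point above boils down to keeping the node's textual description deterministic, since the plan-diff tests compare plan output verbatim. A generic sketch of what such an explainTerms-style formatter produces (the format string here is invented for illustration and is not Flink's actual WatermarkAssigner plan output):

```java
// Illustrative only: shows why an explicitly formatted, deterministic term
// string matters for textual plan-diff tests. The format is invented and
// is not Flink's actual WatermarkAssigner plan output.
public class ExplainTermsSketch {

    static String explainWatermarkAssigner(String rowtimeField, String watermarkExpr) {
        // Fixed ordering and formatting keeps textual plan diffs stable
        // across refactorings of the physical node.
        return "WatermarkAssigner(rowtime=[" + rowtimeField
            + "], watermark=[" + watermarkExpr + "])";
    }
}
```

Any change to this string, even an equivalent reordering of terms, would show up as a failure in every plan-diff test that contains the node, which is what the restored override avoids.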
@featzhang featzhang force-pushed the feature/FLINK-39062-clean branch from d636155 to 273bd85 on April 22, 2026 at 15:48
@featzhang
Member Author

The CI failure in Test - core for Build #74380 is caused by an unrelated flaky test RescaleTimelineITCase in the flink-runtime module:

  • testRescaleTerminatedByResourceRequirementsUpdated: Run 1 PASS, Run 2 FAIL with expected: RESOURCE_REQUIREMENTS_UPDATED but was: NO_RESOURCES_OR_PARALLELISMS_CHANGE (timing-sensitive state transition).
  • testRescaleTerminatedByJobCancelled: Run 1 AssumptionViolated, Run 2 Timeout on waitUntilConditionWithTimeout.

This PR only touches docs/ and flink-table/flink-table-planner/, with zero overlap with flink-runtime, where the failing test lives. RescaleTimelineITCase is a recently introduced adaptive-scheduler IT case (FLINK-38342/FLINK-38343/FLINK-38895, last modified 2026-04-09) and is timing-sensitive.

Re-triggering the build.

@flinkbot run azure

@spuru9
Contributor

spuru9 commented Apr 23, 2026

@featzhang I have heard flinkbot is acting weirdly. If there is a failure and you run it again, it just pulls the status of the last run after a while. Try making an empty commit: git commit -m "trigger" --allow-empty

