[FLINK-39732] Introduce TableDiscoverer SPI for flexible table subscription (with default JdbcTableDiscoverer)#4409
Open
loserwang1024 wants to merge 2 commits into
Open
Conversation
…iption (with default JdbcTableDiscoverer)
Contributor
Author
|
@leonardBang , CC |
…iption (with default JdbcTableDiscoverer)
7d3ee3a to
8903047
Compare
Contributor
There was a problem hiding this comment.
Pull request overview
Introduces a new, connector-agnostic TableDiscoverer SPI (loaded via ServiceLoader) to decouple “which tables to subscribe to” logic from individual CDC connectors, and ships a default JDBC-based implementation plus integration tests.
Changes:
- Add
TableDiscovererandTableDiscovererFactorySPI types with ServiceLoader-based resolution. - Provide the default
JdbcTableDiscoverer+JdbcTableDiscovererFactoryimplementation. - Add a MySQL Testcontainers IT case and Maven test dependencies to validate JDBC modes and SPI loading.
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 10 comments.
Show a summary per file
| File | Description |
|---|---|
| flink-cdc-common/src/main/java/org/apache/flink/cdc/common/source/discover/TableDiscoverer.java | Defines the public SPI interface and lifecycle for table discovery. |
| flink-cdc-common/src/main/java/org/apache/flink/cdc/common/source/discover/TableDiscovererFactory.java | Implements SPI factory lookup and a default Context helper. |
| flink-cdc-common/src/main/java/org/apache/flink/cdc/common/source/discover/JdbcTableDiscoverer.java | Adds the default JDBC-backed discoverer (shared-table mode + custom-query mode). |
| flink-cdc-common/src/main/java/org/apache/flink/cdc/common/source/discover/JdbcTableDiscovererFactory.java | Registers the JDBC discoverer under identifier jdbc. |
| flink-cdc-common/src/main/resources/META-INF/services/org.apache.flink.cdc.common.source.discover.TableDiscovererFactory | Adds ServiceLoader registration for the JDBC factory. |
| flink-cdc-common/src/test/java/org/apache/flink/cdc/common/source/JdbcTableDiscovererITCase.java | Adds integration tests validating SPI loading and both JDBC discovery modes. |
| flink-cdc-common/pom.xml | Adds test dependencies for MySQL Testcontainers-based integration testing. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Comment on lines
+65
to
+68
| static TableDiscoverer.Context createContext( | ||
| Configuration configuration, ClassLoader classLoader) { | ||
| return new DefaultDiscovererContext(configuration, classLoader); | ||
| } |
Comment on lines
+80
to
+85
| static TableDiscoverer createDiscoverer(String type, ClassLoader classLoader) { | ||
| ClassLoader loader = | ||
| classLoader != null ? classLoader : Thread.currentThread().getContextClassLoader(); | ||
| ServiceLoader<TableDiscovererFactory> serviceLoader = | ||
| ServiceLoader.load(TableDiscovererFactory.class, loader); | ||
|
|
Comment on lines
+52
to
+53
| * <p>using a {@link PreparedStatement} (injection-safe), and only the rows whose subscribe-id | ||
| * matches the configured value are returned. |
Comment on lines
+86
to
+87
| * <p>Null values and rows that cannot be parsed into a valid {@link TableId} are silently skipped. | ||
| */ |
Comment on lines
+171
to
+173
| if (subscribeQuery != null && !subscribeQuery.isEmpty()) { | ||
| // Mode A — custom query takes priority. Filter options are intentionally ignored. | ||
| this.sql = subscribeQuery; |
Comment on lines
+179
to
+181
| } else { | ||
| // Mode B — shared-table filter; subscribe-id is mandatory. | ||
| String tableName = requireNonEmpty(config, TABLE_NAME); |
Comment on lines
+241
to
+248
| private static String requireNonEmpty(Configuration config, ConfigOption<String> option) { | ||
| String value = config.get(option); | ||
| if (value == null || value.isEmpty()) { | ||
| throw new IllegalArgumentException( | ||
| "'" + option.key() + "' is required for JdbcTableDiscoverer."); | ||
| } | ||
| return value; | ||
| } |
Comment on lines
+234
to
+237
| } catch (IllegalArgumentException e) { | ||
| LOG.warn( | ||
| "Skipping invalid table name '{}' returned by JdbcTableDiscoverer.", value); | ||
| } |
Comment on lines
+54
to
+56
| private static final MySQLContainer<?> MYSQL = | ||
| new MySQLContainer<>("mysql:8.0") | ||
| .withDatabaseName("meta_db") |
Comment on lines
+32
to
+37
| <dependency> | ||
| <groupId>mysql</groupId> | ||
| <artifactId>mysql-connector-java</artifactId> | ||
| <version>8.0.27</version> | ||
| <scope>test</scope> | ||
| </dependency> |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
fix https://issues.apache.org/jira/browse/FLINK-39732
Summary
Today, every Flink CDC source connector hard-codes its own way of deciding which tables to subscribe to — typically a regex on table names, a hard-coded list, or a connector-specific include/exclude DSL. As Flink CDC grows beyond a single source family (MySQL, Postgres, Fluss, …), this duplication becomes painful:
This issue proposes a small, connector-agnostic, pluggable SPI —
TableDiscoverer— together with a default JDBC-backed implementation that covers the most common "subscription metadata table" pattern. The SPI is loaded via JavaServiceLoader, selected by a single config optiontable.discoverer.type, and shares a unifiedtable.discoverer.*configuration namespace.Motivation
The pluggable subscription mechanism aims to satisfy three demands at once:
A non-goal of this issue is to refactor existing source connectors to use the new SPI. That migration will happen incrementally in follow-up issues; this PR only introduces and ships the SPI plus a default implementation so that downstream connectors can opt in.
Public API
TableDiscovererLifecycle contract:
open(Context)is invoked exactly once before the firstdiscover().discover()may be called multiple times during the source lifetime; implementations are encouraged to be idempotent and fast.close()is called when the discoverer is no longer needed (job teardown, source restart) and must release I/O resources held inopen().TableDiscovererFactoryResolution rules:
ServiceLoader<TableDiscovererFactory>.table.discoverer.typeis matched case-insensitively againstidentifier().IllegalStateExceptionat startup (clear error message lists offending classes).IllegalArgumentExceptionlisting the set of known identifiers, which makes config typos easy to debug.Configuration namespace
All built-in and third-party discoverers are expected to use the unified prefix
table.discoverer.*. The single global key is:table.discoverer.typejdbc).Implementation-specific keys live under
table.discoverer.<identifier>.*.Default implementation:
JdbcTableDiscovererThe JDBC discoverer reads the subscription list from any JDBC-compatible database. It supports two modes, chosen implicitly by the configuration:
Mode A — Shared subscription table (default & recommended)
Designed for the common multi-tenant pattern where many CDC jobs share one metadata table and each job filters by a
subscribe-id.Effective query (built with a
PreparedStatement, injection-safe on the parameter):table.discoverer.jdbc.urltable.discoverer.jdbc.usernametable.discoverer.jdbc.passwordtable.discoverer.jdbc.table-nametable.discoverer.jdbc.subscribe-idtable.discoverer.jdbc.column-namesubscribe_table_nametable.discoverer.jdbc.subscribe-id-columnsubscribe_idWHEREclauseRecommended schema:
The legacy "unset
subscribe-id⇒ scan the whole metadata table" fallback has been intentionally removed: leavingsubscribe-idblank is now an error, which prevents silent over-subscription in shared deployments.Mode B — Custom SELECT (escape hatch)
For uncommon layouts (JOINs, soft filters, multi-column metadata) users may set:
table.discoverer.jdbc.subscribe-queryExample