[Improve](streaming-job) support specifying offset for StreamingInsertJob create and alter#62490
JNSimba wants to merge 22 commits into apache:master
…tJob create and alter Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
… alter offset test
- CDC ALTER only allows JSON specific offset (reject named modes like initial/latest)
- ALTER offset uses PROPERTIES('offset'='{"file":"xxx","pos":"yyy"}') syntax
- Update regression cases to use PROPERTIES for ALTER
- Add cdc_stream TVF ALTER offset regression test
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
run buildall
…urceType().name() Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
/review
Pull request overview
This PR extends StreamingInsertJob CDC offset support so users can specify named offsets (initial/snapshot/latest/earliest) and JSON “specific offsets” when creating jobs via FROM MYSQL/POSTGRES, and JSON offsets when altering jobs (with validation and BE parsing support).
Changes:
- FE: extend source config validation to accept additional offset modes and JSON offsets; wire `validateSource` to be data-source-type aware; allow CDC jobs to use the `offset` property and restrict ALTER to JSON offsets for CDC.
- FE: implement/adjust JDBC offset property deserialization and improve `JdbcOffset` behaviors (`isValidOffset`, `toSerializedJson`).
- BE: add PostgreSQL JSON LSN offset support and handle `SPECIFIC_OFFSETS` startup mode.
Reviewed changes
Copilot reviewed 12 out of 12 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| regression-test/suites/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_mysql_alter_offset.groovy | Adds TVF-path regression coverage for ALTER with JSON MySQL binlog offset. |
| regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_special_offset.groovy | Adds PG regression coverage for create offsets + ALTER with JSON LSN, plus invalid cases. |
| regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_special_offset_restart_fe.groovy | Adds FE-restart regression coverage for MySQL JSON binlog offset persistence. |
| regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_special_offset.groovy | Adds MySQL regression coverage for earliest/latest/JSON offsets and ALTER behavior. |
| fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java | Adds JSON LSN startup offset handling for PostgreSQL. |
| fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java | Handles SPECIFIC_OFFSETS startup mode by constructing offset from config. |
| fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateJobCommand.java | Passes source type into source config validation for offset-type-specific rules. |
| fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java | Passes data source type into source config validation during ALTER. |
| fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java | Implements offset property deserialization for named modes and JSON. |
| fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcOffset.java | Adjusts offset validity and serialization behavior. |
| fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java | Enables offset property validation beyond S3 and restricts CDC ALTER to JSON offsets. |
| fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java | Adds data-source-type-aware offset validation and JSON detection helper. |
Found 1 blocking issue.
- High - now serializes JDBC offsets in a different shape from what still reads. For binlog offsets the new JSON nests fields under , but the deserialize/replay path still expects a flattened list of string maps. persists this serialized value immediately after , so cloud replay / offset restore can no longer round-trip the altered CDC offset correctly.
Critical checkpoints:
- Goal of task: Support special offsets on create/alter. Partially achieved; create/alter paths were extended and regression tests were added, but the new serialized offset format breaks persisted/replayed CDC offsets in cloud mode.
- Modification size/focus: Focused overall.
- Concurrency: No new concurrency concerns identified in the touched code paths.
- Lifecycle/static init: No special lifecycle or static-init issues identified.
- Configuration: No new configs added.
- Compatibility/storage format: Blocking issue. The PR changes serialized JDBC offset shape without updating the corresponding deserialize/replay path.
- Parallel code paths: Checked both and job paths; the persistence problem sits in shared JDBC offset serialization.
- Special conditional checks: The new ALTER restriction to JSON offsets is understandable and commented.
- Test coverage: New regression tests cover MySQL/Postgres create/alter and one FE-restart scenario, but they do not cover cloud-mode reset/replay of an altered JSON offset.
- Observability: Existing logs seem adequate for this path.
- Transaction/persistence: Problematic because offset reset/replay persists a format the current reader does not consume.
- Data write/atomicity: No direct new write-atomicity issue beyond the persisted offset corruption risk.
- FE-BE variable passing: No new FE-BE field propagation issues identified.
- Performance: No material performance regressions identified in the reviewed diff.
- Other issues: None beyond the blocking serialization mismatch.
…vider, fix serialization
- Add validateAlterOffset() to SourceOffsetProvider interface, CDC impl
rejects non-JSON offset, S3 impl allows any format (default no-op)
- StreamingInsertJob delegates ALTER offset validation to provider
- AlterJobCommand.validateProps calls validateAlterOffset before validateOffset
- Restore S3-only guard in initInsertJob for PROPERTIES offset
- Fix JdbcOffset.toSerializedJson() to produce flat format compatible
with deserializeOffset(): [{"split_id":"binlog-split","file":"xxx","pos":"yyy"}]
- Remove unnecessary ts_usec padding in PostgresSourceReader JSON offset
- Remove instanceof check in modifyPropertiesInternal (validation moved to AlterJobCommand)
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
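The flat format named in this commit message can be illustrated with a small sketch. It is not the PR's `JdbcOffset.toSerializedJson()`; the class `FlatOffsetJson` and its method are hypothetical, and the only thing taken from the conversation is the target shape `[{"split_id":"binlog-split","file":"xxx","pos":"yyy"}]`, where the split id sits next to the offset fields instead of wrapping them.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration only; not the PR's JdbcOffset code.
final class FlatOffsetJson {

    // Builds a one-element JSON array whose object holds split_id and the
    // offset fields side by side (flat), rather than nesting the fields.
    static String toFlatJson(String splitId, Map<String, String> offset) {
        Map<String, String> flat = new LinkedHashMap<>();
        flat.put("split_id", splitId);
        flat.putAll(offset); // assumes offset has no "split_id" key and no null values

        StringBuilder sb = new StringBuilder("[{");
        boolean first = true;
        for (Map.Entry<String, String> e : flat.entrySet()) {
            if (!first) {
                sb.append(',');
            }
            sb.append(quote(e.getKey())).append(':').append(quote(e.getValue()));
            first = false;
        }
        return sb.append("}]").toString();
    }

    // Minimal string escaping: backslashes and double quotes only.
    private static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }
}
```

Passing an insertion-ordered map with `file` then `pos` yields the shape quoted in the commit message, which a reader expecting a list of flat string maps could consume unchanged.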
/review
run buildall
…izeOffsetProperty Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Found 2 blocking correctness issues.
- FE accepts any JSON offset on ALTER for JDBC jobs. `JdbcSourceOffsetProvider.validateAlterOffset()` and `deserializeOffsetProperty()` only gate on `isJsonOffset()`, so a PostgreSQL job can accept MySQL-style JSON like `{"file":"binlog.000001","pos":"154"}` (and vice versa). ALTER succeeds, but the resumed job then fails in the broker because `PostgresSourceReader.generatePostgresConfig()` requires `lsn`.
- ALTER on the `FROM MYSQL/POSTGRES ... TO` path does not persist the new startup offset into `sourceProperties`. `modifyPropertiesInternal()` updates only the in-memory provider, while replay/reconstruction uses `sourceProperties` (for example in `gsonPostProcess()` and when creating the next multi-table task). If FE restarts before the first post-ALTER commit, the altered offset is lost and the job resumes from the old mode/offset.
Critical checkpoint conclusions:
- Goal of current task: Partially accomplished. The PR adds special-offset support, but the ALTER path is still incorrect in the two scenarios above, so the feature is not correct end-to-end.
- Modification size/clarity: Mostly focused, but the ALTER validation/persistence logic is split in a way that misses source-specific constraints and durable state updates.
- Concurrency: No new lock-order or thread-safety issue found in the touched code.
- Lifecycle/static initialization: No special lifecycle or static-init issue found.
- Configuration items: No new config items added.
- Compatibility: No incompatible wire/storage format change identified.
- Parallel code paths: The CREATE path and downstream readers enforce source-specific semantics, but the ALTER path does not; that inconsistency is the root cause of one blocker.
- Special conditional checks: The new JSON-only check is too broad and should be source-type-aware.
- Test coverage: New regression tests cover many happy paths, but they do not cover invalid cross-source JSON on ALTER or FE restart before the first post-ALTER commit on the `FROM ... TO` path.
- Observability: Existing logging looks sufficient to diagnose these failures.
- Transaction/persistence: There is a persistence bug for altered offsets on the `FROM ... TO` path because replay reconstructs from stale `sourceProperties`.
- Data writes/modifications: The restart case can resume consumption from the wrong offset, so data correctness is at risk.
- FE/BE variable passing: No new propagation issue beyond the stale `sourceProperties` state above.
- Performance: No material performance regression found in the touched code.
- Other issues: None beyond the two blocking findings above.
run buildall
FE Regression Coverage Report
Increment line coverage
a543e1b to b7c7a15
…oris into improve_special_offset
…mprove test cases
- MySQL BE: supplement kind=SPECIFIC for ALTER offset in createBinlogSplit, add WARN log for fallback to config/earliest offset
- PG BE: supplement ts_usec=0 in createOffset for ALTER offset
- FE: sync offsetProviderPersist after ALTER offset update, remove debug logs from hasMoreDataToConsume
- Improve test cases: verify ALTER offset truly takes effect by checking data before/after the offset mark position
- Add golden files for qt_select assertions
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
/review
Findings:
- `fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java`: FE now accepts any JSON object as a JDBC offset, but the MySQL/PostgreSQL readers only support source-specific shapes (`file` + `pos` for MySQL, `lsn` for PostgreSQL). That means malformed JSON offsets can pass DDL-time validation and only fail later at runtime.
- `fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java`: `ALTER JOB ... FROM MYSQL/POSTGRES (... 'offset'=...)` is validated as if it were supported, but the altered source offset is never applied to `offsetProvider.currentOffset`, so the resumed job continues from the old position.
Critical checkpoint conclusions:
- Goal of the task: Partially accomplished. The PR adds offset support for CDC streaming jobs, but the two issues above leave important alter/validation paths incorrect. Existing regression tests cover happy paths, but they do not prove malformed JSON shape rejection or the `ALTER JOB ... FROM ...` offset-reset behavior.
- Small / clear / focused: Mostly yes. The change set is focused on CDC offset handling, FE validation, and regression coverage.
- Concurrency: No new blocking concurrency issue found in the reviewed FE/CDC paths. The changes stay on existing job/task flows and do not introduce new lock ordering problems in the touched code.
- Lifecycle / static initialization: No new special lifecycle or static initialization issue found.
- Configuration items: No new config item added.
- Compatibility / incompatible change: No storage-format or protocol compatibility break was identified, but FE/BE offset-shape validation is now inconsistent.
- Parallel code paths: Not fully handled. The TVF `PROPERTIES('offset'=...)` path and the `FROM MYSQL/POSTGRES` alter path are not updated consistently.
- Special conditional checks: Not sufficient. JSON-offset checks currently validate only “is object JSON”, not whether the shape is valid for the selected source.
- Test coverage: Incomplete for the new behavior. Missing negative coverage for source-specific malformed JSON offsets and for `ALTER JOB ... FROM MYSQL/POSTGRES (... 'offset'=...)` actually taking effect.
- Modified test results: The added `.out` files look consistent with the checked-in suites, but they do not cover the blocking cases above.
- Observability: Existing logging looks adequate for these paths.
- Transaction / persistence: No new edit-log format issue found, but the `FROM ...` alter path fails to update the provider state that actually drives subsequent consumption.
- Data writes / modifications: Blocking issue present. A user can request an offset reset through the `FROM MYSQL/POSTGRES` alter syntax and the job can continue consuming from the previous offset instead.
- FE/BE variable passing: The logical contract for offset format is incomplete; unsupported JSON shapes are allowed through FE and then rejected later by the CDC reader implementation.
- Performance: No new blocking performance issue found.
- Other issues: None beyond the two findings above.
Requesting changes because both findings can produce incorrect user-visible behavior for the new offset feature.
… instead of 2 Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
… guide user to PROPERTIES Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
… add test cases Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…rease PG await timeout Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
run buildall
FE Regression Coverage Report
Increment line coverage
run buildall
FE UT Coverage Report
Increment line coverage
FE Regression Coverage Report
Increment line coverage
/review
Found 3 blocking correctness issues.
- High - `StreamingInsertJob.replayOnUpdated()` reuses `modifyPropertiesInternal()` during edit-log and image replay, and the new CDC path now drives `resetCloudProgress()` from that helper. In cloud mode `reset_streaming_job_offset` overwrites the commit-attach offset stored in meta-service. Replaying an old ALTER entry can therefore rewind the cloud progress back to the ALTER offset, so the next `replayOnCloudMode()` may resume from stale progress and re-consume data.
- High - `JdbcSourceOffsetProvider.validateAlterOffset()` and `deserializeOffsetProperty()` still treat any JSON object as a valid JDBC offset. For example, a MySQL job can now be altered to `PROPERTIES("offset"="{"lsn":"123"}")`. FE accepts it, but `MySqlSourceReader.createBinlogSplit()` cannot build a specific binlog position from that meta and falls back to the old startup offset or earliest, so the requested reset is silently ignored.
- High - the new offset validation only covers `PROPERTIES("offset"=...)`. `ALTER JOB <name> DO INSERT ... SELECT ... FROM cdc_stream(...)` can still change the TVF SQL offset, because `checkUnmodifiableProperties()` does not forbid it and `alterJob()` only refreshes `originTvfProps`. The provider state is never validated or reset from that new SQL, so the resumed job keeps consuming from the old offset while the SQL now shows a different one.
Critical checkpoint conclusions:
- Goal of the current task: Partially accomplished. The PR adds special-offset support, but the three paths above still leave ALTER and replay behavior incorrect end to end.
- Modification size and focus: Focused overall, but the ALTER logic is now split across multiple surfaces and they are not kept consistent.
- Concurrency: No new lock-order or thread-safety bug found in the touched FE and CDC code paths.
- Lifecycle and static initialization: No special lifecycle or static-init issue found.
- Configuration items: No new config item added.
- Compatibility and persistence: Blocking issue present in cloud mode because replay now performs a destructive progress-reset RPC from an edit-log replay path.
- Parallel code paths: Not fully handled. `PROPERTIES("offset"=...)` got new validation, but the SQL rewrite path for `cdc_stream` still bypasses it.
- Special conditional checks: The new JSON-only check is too broad. It validates only that the value is a JSON object, not that the shape matches the selected source.
- Test coverage: New regression tests cover happy paths, but they do not cover cloud replay after ALTER, malformed or cross-source JSON on ALTER, or SQL-based ALTER of `cdc_stream` offsets.
- Modified test results: The added `.out` files are consistent with the checked-in suites, but they do not exercise the blocking cases above.
- Observability: Existing logs are sufficient to diagnose these failures.
- Transaction and persistence: Blocking issue present in cloud replay, because progress in meta-service can be rewound during journal replay.
- Data writes and modifications: Blocking issue present. Both the replay regression and the malformed-offset acceptance can make the job resume from the wrong position.
- FE and BE variable passing: No new field-propagation issue found, but the FE and BE offset contract is still incomplete because FE accepts unsupported JSON shapes.
- Performance: No material performance regression found in the reviewed diff.
- Other issues: None beyond the three blockers above.
Requesting changes because these issues can rewind or ignore the requested CDC offset.
this.offsetProviderPersist = offsetProvider.getPersistInfo();
log.info("modifyPropertiesInternal: offset updated to {}, job {}",
        inputStreamProps.getOffsetProperty(), getJobId());
if (Config.isCloudMode()) {
replayOnUpdated() calls modifyPropertiesInternal() while the update-job edit log is being replayed. With this change, any CDC job that has properties["offset"] now reaches resetCloudProgress() here as well. In cloud mode that RPC is not read-only: reset_streaming_job_offset overwrites the commit-attach offset stored in meta-service. Replaying an old ALTER entry on follower startup can therefore rewind the job progress back to the ALTER offset, and the next replayOnCloudMode() will resume from stale progress. This helper needs a replay-safe path that skips the RPC, or replay should restore only persisted provider state.
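One way to read the "replay-safe path" suggestion is a flag that separates the live ALTER from journal replay. The sketch below is only an illustration under that assumption: `ReplaySafeOffsetUpdate`, `ProgressResetter`, and the `isReplay` parameter are hypothetical names, and the interface merely stands in for the cloud reset RPC described in the comment.

```java
// Hypothetical sketch; none of these names come from the Doris code base.
final class ReplaySafeOffsetUpdate {

    // Stand-in for the non-read-only progress-reset RPC to meta-service.
    interface ProgressResetter {
        void reset(String offset);
    }

    private final ProgressResetter resetter;
    private final boolean cloudMode;
    private String currentOffset;

    ReplaySafeOffsetUpdate(ProgressResetter resetter, boolean cloudMode) {
        this.resetter = resetter;
        this.cloudMode = cloudMode;
    }

    // Local state is always restored; the external reset runs only for a
    // live ALTER, never while an old journal entry is being replayed.
    void applyOffset(String newOffset, boolean isReplay) {
        currentOffset = newOffset;
        if (cloudMode && !isReplay) {
            resetter.reset(newOffset);
        }
    }

    String currentOffset() {
        return currentOffset;
    }
}
```

With this split, replaying an old ALTER entry on follower startup restores in-memory state without touching the progress already committed remotely.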
}

@Override
public void validateAlterOffset(String offset) throws Exception {
isJsonOffset() is too weak for JDBC ALTER validation. deserializeOffsetProperty() plus JdbcOffset.isValidOffset() treat any non-empty JSON object as valid, so a MySQL job can be altered to something like {"lsn":"123"} or {"foo":"bar"}. On resume, MySqlSourceReader.createBinlogSplit() cannot build a specific binlog offset from that meta and falls back to the old startup offset or earliest, so the requested reset is silently ignored. This validator needs to enforce source-specific keys such as file plus pos for MySQL and lsn for PostgreSQL before FE accepts the ALTER.
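A source-specific check along the lines requested here might look like the following sketch. It assumes the JSON offset has already been parsed into a string map, and `OffsetShapeValidator` with its `SourceType` enum is a hypothetical helper, not the PR's validator. The required keys (`file` plus `pos` for MySQL, `lsn` for PostgreSQL) are the ones named in the review.

```java
import java.util.Map;
import java.util.Set;

// Hypothetical helper for illustration; not JdbcSourceOffsetProvider itself.
final class OffsetShapeValidator {

    enum SourceType { MYSQL, POSTGRES }

    // Keys each source needs before a specific offset can be built.
    private static final Map<SourceType, Set<String>> REQUIRED_KEYS = Map.of(
            SourceType.MYSQL, Set.of("file", "pos"),
            SourceType.POSTGRES, Set.of("lsn"));

    // Throws if any required key is missing or empty for the given source.
    static void validate(SourceType type, Map<String, String> offset) {
        for (String key : REQUIRED_KEYS.get(type)) {
            String value = offset.get(key);
            if (value == null || value.isEmpty()) {
                throw new IllegalArgumentException(
                        "offset for " + type + " must contain a non-empty '" + key + "'");
            }
        }
    }
}
```

Under this check, `{"lsn":"123"}` or `{"foo":"bar"}` on a MySQL job would be rejected at DDL time instead of being silently ignored on resume.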
// from to job no need valiate offset in job properties
if (streamingJob.getDataSourceType() == null
        && jobProperties.getOffsetProperty() != null) {
if (jobProperties.getOffsetProperty() != null) {
This only validates the PROPERTIES("offset"=...) path. ALTER JOB <name> DO INSERT ... SELECT ... FROM cdc_stream(...) still goes through the SQL-rewrite branch: checkUnmodifiableProperties() allows changing offset, alterJob() refreshes originTvfProps, but it never calls validateAlterOffset(), validateOffset(), or offsetProvider.updateOffset(). After resume, JdbcTvfSourceOffsetProvider.getNextOffset() keeps using the old provider state, so the SQL-level offset change is silently ignored and it also bypasses the new JSON-only rule. Please either reject SQL-side offset changes for cdc_stream or route them through the same validated reset path.
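The first option mentioned above, rejecting SQL-side offset changes for `cdc_stream`, could be sketched as a comparison of the old and new TVF property maps. `TvfOffsetGuard` and the `"offset"` key lookup are assumptions made for illustration; the real `checkUnmodifiableProperties()` may be structured quite differently.

```java
import java.util.Map;
import java.util.Objects;

// Hypothetical guard for illustration; not the actual AlterJobCommand logic.
final class TvfOffsetGuard {

    // Rejects an ALTER whose rewritten SQL carries a different TVF offset,
    // so the offset can only change through the validated PROPERTIES path.
    static void rejectOffsetChange(Map<String, String> oldTvfProps,
                                   Map<String, String> newTvfProps) {
        String before = oldTvfProps.get("offset");
        String after = newTvfProps.get("offset");
        if (!Objects.equals(before, after)) {
            throw new IllegalArgumentException(
                    "offset cannot be changed in the cdc_stream SQL; use PROPERTIES('offset'=...)");
        }
    }
}
```

The alternative named in the review, routing the SQL-side change through the same validated reset path, would keep both surfaces usable at the cost of more shared plumbing.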
Summary
- … `FROM MYSQL/POSTGRES` path.
- … `DataSourceConfigValidator` to validate offset formats (initial/snapshot/latest/earliest/JSON); `earliest` is MySQL-only, rejected for PostgreSQL. Implement `JdbcSourceOffsetProvider.deserializeOffsetProperty()` for named modes and JSON offset parsing. Remove S3-only restriction in `StreamingInsertJob.initInsertJob()` and `modifyPropertiesInternal()` so CDC jobs can also use offset property. On ALTER, sync offset to `sourceProperties` for the FROM...TO path.
- … `{"lsn":"N"}` in `PostgresSourceReader.generatePostgresConfig()`, and handle `SPECIFIC_OFFSETS` mode in `JdbcIncrementalSourceReader.getStartOffsetFromConfig()`.
- … `JdbcOffset.isValidOffset()` and `toSerializedJson()` to return meaningful values instead of hardcoded false/null.

Test plan
- `test_streaming_mysql_job_special_offset`: earliest/latest/JSON binlog offset with data sync verification, ALTER JOB offset change, invalid format rejection
- `test_streaming_postgres_job_special_offset`: initial/latest with data sync, ALTER JOB with JSON LSN offset and data sync verification, earliest rejection for PG, invalid format rejection
- `test_streaming_mysql_job_special_offset_restart_fe`: create job with JSON binlog offset, verify data sync, restart FE, verify job recovery and continued sync

🤖 Generated with Claude Code
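The create-time rules stated in the summary (named modes initial/snapshot/latest/earliest plus JSON, with `earliest` rejected for PostgreSQL) can be sketched as below. `CreateOffsetCheck` is a hypothetical name, the JSON test is a deliberately crude brace check rather than real parsing, and exact lowercase matching of mode names is an assumption; the actual `DataSourceConfigValidator` may differ on all three points.

```java
import java.util.Set;

// Hypothetical sketch of the create-time offset rules; not the PR's validator.
final class CreateOffsetCheck {

    enum SourceType { MYSQL, POSTGRES }

    private static final Set<String> NAMED_MODES =
            Set.of("initial", "snapshot", "latest", "earliest");

    static boolean isAcceptedOnCreate(SourceType type, String offset) {
        if (offset == null || offset.trim().isEmpty()) {
            return false;
        }
        String value = offset.trim();
        // Crude JSON-object detection; shape validation is assumed to happen elsewhere.
        if (value.startsWith("{") && value.endsWith("}")) {
            return true;
        }
        // Assumption: mode names are matched exactly, in lowercase.
        if (!NAMED_MODES.contains(value)) {
            return false;
        }
        // Per the summary, earliest is MySQL-only.
        return !(value.equals("earliest") && type == SourceType.POSTGRES);
    }
}
```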