Skip to content

[FLINK-39775][mysql] Release snapshot split metadata after entering the stream phase#4418

Open
spoorthibasu wants to merge 1 commit into
apache:masterfrom
spoorthibasu:FLINK-39775-release-snapshot-split-metadata
Open

[FLINK-39775][mysql] Release snapshot split metadata after entering the stream phase#4418
spoorthibasu wants to merge 1 commit into
apache:masterfrom
spoorthibasu:FLINK-39775-release-snapshot-split-metadata

Conversation

@spoorthibasu
Copy link
Copy Markdown

@spoorthibasu spoorthibasu commented May 29, 2026

TL;DR: Fix a JobManager OOM on large tables by releasing the snapshot split metadata once the source enters the binlog phase, with the release tied to the current assignment so a reader failover can't drop metadata the new reader still needs.

Root Cause

After the snapshot phase finishes and the binlog split is assigned, the source coordinator keeps the full snapshot split metadata in memory indefinitely:

  • MySqlSnapshotSplitAssigner holds assignedSplits, splitFinishedOffsets, and tableSchemas.
  • snapshotState() rebuilds the complete SnapshotPendingSplitsState on every checkpoint, MySqlHybridSplitAssigner wraps it, and notifyCheckpointComplete() never drops it.
  • MySqlSourceEnumerator keeps a second copy of the same finished-split metadata in its binlogSplitMeta cache.

With many finished splits (around 300K for a 2.5B-row table at the default chunk size of 8096), every post-snapshot checkpoint re-serializes the entire snapshot state. That keeps JobManager heap high, and because the large coordinator state is copied during checkpoint serialization and transfer, it also drives up direct memory and can OOM-kill the container even when heap alone stays under its limit.

Fix

Release the metadata once it is no longer needed for correctness: after the binlog split has been assigned, its metadata fully assembled by the reader, and that is covered by a completed checkpoint.

Reader-side (completion signal)

  • When a complete binlog split enters reading (whether its metadata arrived inline or was assembled over the divided meta-group requests), MySqlSourceReader sends a BinlogSplitMetaAssembledEvent to the coordinator.
  • The event is idempotent: it is re-sent whenever a complete binlog split (re-)enters reading, including after a restore.

Coordinator-side (gated release)

  • On the event, MySqlHybridSplitAssigner records that the reader has assembled the metadata.
  • snapshotState() schedules the release at the current checkpoint id; the actual clearing happens in notifyCheckpointComplete() once that checkpoint completes.
  • releaseSnapshotMetadata() clears assignedSplits, splitFinishedOffsets, and tableSchemas. alreadyProcessedTables, the assigner status, and the chunk splitter state are kept so a restore does not re-discover tables.
  • MySqlSourceEnumerator clears its cached binlogSplitMeta at the same point.
  • Release is skipped entirely when scan.newly-added-table.enabled is set, since that flow may still need the metadata to extend the binlog split.

Why the release is safe

  • The clearing happens only in notifyCheckpointComplete(), never in snapshotState(). Flink only re-delivers (addSplitsBack) assignments not covered by the last completed checkpoint. The binlog split was assigned before the release-scheduling checkpoint, so by the time release runs it is checkpoint-covered and can never be added back into an emptied assigner. createBinlogSplit() also asserts the metadata has not been released, turning any future regression into a fast failure instead of silent data loss.
  • If the binlog split is added back before release (reader failover between scheduling and the checkpoint completing), the scheduled release is reset and the metadata is still intact, so the split is recreated normally.
  • On restore from a released ("light") state, the maps are empty, isBinlogSplitAssigned is true, and alreadyProcessedTables is non-empty, so the assigner does not re-discover tables or recreate the binlog split. The reader restores its own complete split, requests no metadata, and re-sends the assembled event, which makes the release idempotent.
  • The state and serializer format are unchanged. A released state is a normal SnapshotPendingSplitsState with empty maps, so it serializes under the existing version and stays compatible on restore.

Tying the assembled event to its assignment

The assembled event has to belong to a specific assignment. Flink does not drop source events from a failed reader attempt (in the non-speculative path SourceCoordinator hands the event to the enumerator with no attempt check), and the event itself carried no identity. So on a large table this interleaving was possible:

  1. The reader assembles the complete split over many meta-group requests and sends the assembled event.
  2. The reader fails; the split is added back and re-created with empty metadata.
  3. A fresh reader starts re-requesting meta groups, which spans several checkpoints for a large table.
  4. The stale event from the dead attempt arrives late and re-arms the release.
  5. The release fires while the fresh reader still needs the metadata, and its next meta request finds nothing.

To close this, each assignment carries a generation that the coordinator bumps on every add-back. It is stamped on the meta groups served to the reader and echoed back in the assembled event, and the coordinator only releases when the reported generation matches the current one. A stale event carries an older generation and is ignored; the fresh reader learns the current generation through its own meta requests. A split small enough to ship inline never requests meta groups, so its reader reports a sentinel the coordinator always accepts; that is safe because an inline reader never needs the coordinator's metadata. This stays entirely on the transient source events, so no serializer or checkpoint-state change is needed.

Parallelism

The release logic lives only in the singleton split enumerator, and there is a single binlog split assigned to one reader, so there are no per-subtask copies of the release state to reconcile. The one parallelism-relevant case is failover of the binlog-holding reader, which the generation check and the add-back reset above already cover. No extra cross-subtask handling is required.

Tests

  • MySqlSnapshotMetadataReleaseTest (new): release after the assembled event plus a completed checkpoint; no release without the event; no release before the checkpoint completes; no release when newly-added-table is enabled; add-back resets the schedule and the binlog split is recreated with full metadata; a stale old-generation event after a failover does not trigger release; an inline split still releases after a failover bumps the generation; and a restore from a released light state does not re-discover tables or recreate the binlog split.
  • PendingSplitsStateSerializerTest#testSerializeAndDeserializeReleasedHybridState: round-trips the released ("light") hybrid state, confirming empty maps and isBinlogSplitAssigned survive serialization under the current version.
  • MySqlSourceReaderTest: the reader emits the assembled event when a binlog split enters reading, echoing the generation it was served for a split assembled from divided meta groups, and the inline sentinel for a split that arrived complete.
  • MySqlSourceEnumeratorTest: the assembled event is a safe no-op when the assigner is binlog-only rather than hybrid.
  • MySqlSourceITCase: TaskManager and JobManager failover in the binlog phase, plus reads across multiple tables at higher parallelism, all pass with no data loss.

Notes

  • The common path is unchanged until the source is well into the binlog phase; release only happens after the binlog split is assigned and one covering checkpoint completes.
  • No new configuration, and no state or serializer version bump.

@spoorthibasu spoorthibasu force-pushed the FLINK-39775-release-snapshot-split-metadata branch 3 times, most recently from a7df816 to ef24453 Compare May 31, 2026 21:32
@spoorthibasu
Copy link
Copy Markdown
Author

spoorthibasu commented May 31, 2026

Hi @lvyanquan , this PR is ready for review when you have time. It fixes a JobManager OOM on large tables by releasing the snapshot split metadata once the source is in the binlog phase. The description walks through the reasoning and the failover cases. CI is waiting on approval, could you please trigger it when you get a chance? Thanks!

…he stream phase

After the snapshot phase finishes and the binlog split is assigned, the source
coordinator kept the full snapshot split metadata (assigned splits, finished
offsets, table schemas) around forever. snapshotState() rebuilt the whole
SnapshotPendingSplitsState on every checkpoint, notifyCheckpointComplete() never
dropped it, and the enumerator held a second copy in its binlog meta cache. For a
table with many finished splits (around 300K for a 2.5B-row table at the default
chunk size) this kept JobManager memory high and pushed up direct memory during
checkpoint serialization, eventually OOM-killing the container.

The reader now tells the coordinator (BinlogSplitMetaAssembledEvent) once it
holds the complete binlog split. The coordinator schedules the release in
snapshotState() and performs it in notifyCheckpointComplete() after the covering
checkpoint completes, so the binlog split assignment is always checkpoint-covered
and can never be added back into an emptied assigner. Release is skipped when
scan.newly-added-table is enabled, and the state/serializer format is unchanged
(a released state is just empty maps).

The assembled event has to be tied to a specific assignment. Flink does not drop
source events from a failed attempt, and the event carried no identity, so after
a reader failover a leftover event from the dead attempt could reach the
coordinator and re-arm the release while the new reader was still pulling meta
groups, which spans several checkpoints for a large table. The release would then
fire and the new reader's next meta request would come back empty. Each
assignment now has a generation that bumps on every add-back; the coordinator
stamps it on the meta groups it serves and the reader echoes it back, so release
only happens when the two match and a stale event is ignored. A split small
enough to ship inline requests no meta groups, so its reader reports a sentinel
the coordinator always accepts, since an inline reader never needs the metadata.
The release state lives only in the singleton coordinator and there is a single
binlog split, so parallel readers need no handling beyond this failover case.

Tests cover the release protocol (assembled event plus a completed checkpoint, no
release without the event or before the checkpoint completes, no release with
newly-added-table on, add-back resetting the schedule, restore from a released
light state), the stale-event case where an old-generation event after a failover
must not trigger release, the inline-split case where release still happens after
a generation bump, a reader-side check that the assembled event carries the served
generation or the inline sentinel, a serializer round-trip of the released light
state, and the assembled event being a no-op for a binlog-only assigner.
@spoorthibasu spoorthibasu force-pushed the FLINK-39775-release-snapshot-split-metadata branch from ef24453 to 78e0b7f Compare May 31, 2026 23:46
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant