Kafka Connect: Make KafkaMetadataTransform configuration per-instance#16602
Open
wombatu-kun wants to merge 1 commit into
Open
Kafka Connect: Make KafkaMetadataTransform configuration per-instance#16602wombatu-kun wants to merge 1 commit into
wombatu-kun wants to merge 1 commit into
Conversation
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Closes #16601
Problem
KafkaMetadataTransformheld its configuration (recordAppender) in astaticfield, so every instance in a worker JVM shared one configuration and the lastconfigure()call won. Two differently-configured instances (two connectors that use the SMT, or two transform aliases in one chain) would emit each other's metadata field names and structure. It is also a thread-safety hazard, since the field is mutated from each task'sconfigure()and read from each task'sapply(). The other SMTs in the module (CopyValue,DebeziumTransform) already use instance fields.Solution
Make
recordAppenderan instance field. All reads are in instance methods, so the change is minimal, and the appender is still built once per instance inconfigure(), so there is no added cost.Tests
Added
TestKafkaMetadataTransform.testConfigIsNotSharedAcrossInstances: it configures two instances with different field names and verifies each instance uses its own configuration. It fails before the change and passes after.