[FLINK-20454][formats] Add metadata support for Debezium Avro #28498
p-eye wants to merge 1 commit
Changes:
- Add DebeziumAvroDecodingFormat with metadata support
- Implement 6 metadata fields: ingestion-timestamp, source.timestamp, source.database, source.schema, source.table, source.properties
- Add metadata deserialization test
- Update documentation with Avro format examples
weiqingy left a comment
Thanks for taking this on — bringing the Avro Debezium format to metadata parity with debezium-json is a useful gap to close, and extracting the anonymous ProjectableDecodingFormat into a named DebeziumAvroDecodingFormat that mirrors the JSON side reads cleanly. A few questions inline, mostly around how the source block holds up across Debezium connectors.
}
}

private static final DataTypes.Field[] SOURCE_PROPERTY_FIELDS = {
debezium-json models source as a connector-agnostic MAP<STRING, STRING> (DebeziumJsonDecodingFormat's SOURCE_* entries all use DataTypes.FIELD("source", DataTypes.MAP(...))), so it reads whatever fields a given Debezium connector emits. Here source is a fixed-order ROW of 13 named fields, and several (scn, commit_scn, lcr_position) are Oracle/XStream-specific while schema is absent from MySQL's source.
Since AvroToRowDataConverters.createRowConverter reads positionally (record.get(i)), this reader ROW has to line up — via Avro name resolution — with the actual writer schema in the registry. Different connectors (MySQL: server_id/gtid/file/pos; Postgres: lsn/xmin; MongoDB: different again) emit different source shapes. How do you see this behaving when the registered schema's source doesn't match this list — does Avro resolution fill the missing fields as null, or does it reject? I'm wondering whether mirroring the JSON side's flexible map would sidestep the per-connector coupling entirely. Asking rather than asserting here — you may have a constraint I'm not seeing.
I looked into using MAP like debezium-json, but hit a constraint:
Debezium's source is an Avro record type (not map). Flink's AvroToRowDataConverters maps Avro types to Flink types — map → MAP, record → ROW. There's no record → MAP converter.
If we define source as MAP in the Flink schema, AvroSchemaConverter will generate an Avro map schema, but the registered schema has source as a record type. Under Avro's schema resolution rules a record and a map are not compatible, so this should fail during deserialization.
To make it connector-agnostic, there are two options:
1. Add a record → MAP converter to AvroToRowDataConverters, which extracts fields from the runtime schema dynamically. Clean, but touches flink-avro core.
2. Handle source in DebeziumAvroDeserializationSchema: keep it as a GenericRecord and convert it to a MAP using the runtime schema. No core changes, but bypasses the standard layer.
Which fits better with Flink's direction?
Thanks for digging into this — the record-vs-map constraint is real and correctly diagnosed. The converter switch only wires record → ROW (AvroToRowDataConverters.java:148-149) and map → MAP (:150-152), so declaring source as MAP would have AvroSchemaConverter build a valid Avro map schema that then can't resolve against the registry's record — per Avro's resolution rules a writer record and a reader map aren't compatible, so it fails at decode. Good catch.
To close the open question from my first comment (mismatched source → null or reject): it resolves by name — RegistryAvroDeserializationSchema sets both writer and reader schema on the datum reader (RegistryAvroDeserializationSchema.java:102-103), so the record comes back in reader order and the positional reads stay aligned. Resolution then splits by nullability: the connector-specific fields are nullable → withDefault(null) (AvroSchemaConverter.java:554-557), so scn/commit_scn/lcr_position come back null when a connector omits them; but version/connector/name/db/table are non-nullable → noDefault(), and a default-less reader field the writer lacks makes Avro throw and fail the whole message. So the fixed ROW's real limitation is that it can't surface fields it doesn't list (MySQL gtid, Postgres lsn) — and, latently, the non-nullable version/connector/name/db/table would hard-fail any connector whose source doesn't carry those exact names. A connector-agnostic map sidesteps both.
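To make the null-versus-reject split concrete, here is a minimal JDK-only sketch of by-name resolution. It is a model, not the Avro API: the writer record is stood in for by a name-to-value map, the reader schema by an ordered map from field name to "has a null default", and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** JDK-only model of by-name schema resolution. Hypothetical names; not the Avro API. */
final class SourceResolutionSketch {

    /**
     * Projects a writer record onto the reader's field order.
     *
     * @param reader ordered reader fields: name -> true if the reader declares a null default
     * @param writer the fields the connector actually wrote: name -> value
     */
    static List<Object> resolve(LinkedHashMap<String, Boolean> reader, Map<String, Object> writer) {
        List<Object> row = new ArrayList<>();
        for (Map.Entry<String, Boolean> field : reader.entrySet()) {
            String name = field.getKey();
            if (writer.containsKey(name)) {
                row.add(writer.get(name));      // present in both: matched by name
            } else if (field.getValue()) {
                row.add(null);                  // nullable reader field: default null fills in
            } else {
                // non-nullable reader field the writer lacks: the whole message fails
                throw new IllegalStateException("missing field without default: " + name);
            }
        }
        return row;                             // writer-only fields (e.g. gtid) are dropped
    }

    public static void main(String[] args) {
        LinkedHashMap<String, Boolean> reader = new LinkedHashMap<>();
        reader.put("db", false);
        reader.put("table", false);
        reader.put("scn", true);
        // MySQL-like source: carries gtid (not in the reader), lacks scn.
        Map<String, Object> mysql = Map.of("db", "inventory", "table", "orders", "gtid", "abc:1");
        System.out.println(resolve(reader, mysql));
    }
}
```

In this model the MySQL-like record resolves with scn as null and gtid silently dropped, while a record lacking db would throw, which mirrors the two limitations described above.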
I'd lean to option (2). Both options stringify the source fields into a MAP<STRING,STRING> — that lossiness is inherent to record→map either way — so the deciding factor is blast radius: (1) puts an opinionated record→map coercion into shared AvroToRowDataConverters, where any Avro user declaring a MAP target over a record field would hit it implicitly, to serve a Debezium-specific need; (2) keeps it local and explicit. And the registered source schema isn't available at planning time anyway (the schema option is the optional user override, normally null; the real one resolves from the registry by ID at runtime), so a runtime handler in DebeziumAvroDeserializationSchema is its natural home.
One note if you take (2): to actually gain agnosticism it should build the map from the connector's writer source record — before it's projected onto the 13-field reader ROW — otherwise the connector-only fields are already dropped and it's just the same fixed list re-shaped as a map. The typed accessors (source.database, …) could then key into that map by name, and the positional SOURCE_PROPERTY_POSITION assumption goes away.
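A rough sketch of that shape, under stated assumptions: the writer-side source record is modelled as a plain name-to-value map (in the real code it would be the GenericRecord read with the writer schema), and the class and method names are hypothetical. The only key taken from the discussion is db.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** JDK-only sketch of option (2). Hypothetical names; the map stands in for a GenericRecord. */
final class SourcePropertiesSketch {

    /** Stringifies every field the connector wrote, without consulting a fixed field list. */
    static Map<String, String> toProperties(Map<String, Object> writerSource) {
        Map<String, String> props = new LinkedHashMap<>();
        for (Map.Entry<String, Object> e : writerSource.entrySet()) {
            Object value = e.getValue();
            // Lossy by design: every value becomes a string, nulls stay null.
            props.put(e.getKey(), value == null ? null : value.toString());
        }
        return props;
    }

    /** Typed accessor keyed by name, so no positional assumption is needed. */
    static String database(Map<String, String> props) {
        return props.get("db");
    }

    public static void main(String[] args) {
        Map<String, Object> postgresSource = new LinkedHashMap<>();
        postgresSource.put("db", "postgres");
        postgresSource.put("lsn", 24023128L);
        postgresSource.put("xmin", null);
        Map<String, String> props = toProperties(postgresSource);
        System.out.println(props);
        System.out.println(database(props));
    }
}
```

Because the map is built from whatever the writer record contains, connector-only fields such as lsn survive, which is the point of doing the conversion before the projection.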
On scope, my read is that the current typed set is enough to land this PR for the initial parity — assuming the supported connectors are documented — with the connector-agnostic map as a follow-up; that keeps the change focused and unblocks the relational connectors now. The final scope and merge call is of course the committers' to make.
assertThat(row.getDouble(3)).isEqualTo(21.799999237060547);

// Metadata: ingestion-timestamp (field index 4)
assertThat(row.getTimestamp(4, 3)).isNotNull();
The two timestamp metadata fields are asserted only with isNotNull(), so a converter that extracted the wrong field (or a constant) would still pass — could these assert the exact expected values from the fixture, the way the physical columns do? Relatedly, source.schema isn't asserted at all (the comment notes it may be null for MySQL), and the fixture is a single connector. Given the source shape is the riskiest part of this change, a second connector's fixture would buy a lot of confidence — though I'd understand deferring that if test data is hard to generate.
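To show why isNotNull() is too weak here, a small JDK-only sketch: the envelope is modelled as a flat map, the keys and both epoch-millisecond values are hypothetical (they are not taken from the PR's fixture), and wrongField is a deliberately broken extractor.

```java
import java.time.Instant;
import java.util.Map;

/** JDK-only illustration; keys, values, and names are hypothetical. */
final class TimestampAssertionSketch {

    /** Intended behaviour: ingestion-timestamp comes from the envelope's top-level ts_ms. */
    static Instant ingestionTimestamp(Map<String, Long> envelope) {
        return Instant.ofEpochMilli(envelope.get("ts_ms"));
    }

    /** Deliberately wrong extractor: reads the source block's ts_ms instead. */
    static Instant wrongField(Map<String, Long> envelope) {
        return Instant.ofEpochMilli(envelope.get("source.ts_ms"));
    }

    public static void main(String[] args) {
        Map<String, Long> envelope = Map.of("ts_ms", 1589355606100L, "source.ts_ms", 1589355504000L);
        Instant expected = Instant.ofEpochMilli(1589355606100L);
        // A not-null check cannot tell the two extractors apart.
        System.out.println(wrongField(envelope) != null);
        // An exact-value check can.
        System.out.println(ingestionTimestamp(envelope).equals(expected));
        System.out.println(wrongField(envelope).equals(expected));
    }
}
```

The same idea applied to the real test would compare row.getTimestamp(...) against the value known from the fixture rather than against null.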
// Debezium Avro contains other information, e.g. "source", "ts_ms"
// but we don't need them
return (RowType)
public static RowType createDebeziumAvroRowType(
This comment moved into the parameter list and now reads the opposite of what the code does — it says "but we don't need them," yet the method now does append source/ts_ms to extract metadata. Worth updating so it describes the current behavior (and moving it back above the params for readability).
/** List of metadata that can be read with this format. */
enum ReadableMetadata {
INGESTION_TIMESTAMP(
Minor: this inner convert returning row looks unreachable — the wrapper in DebeziumAvroDeserializationSchema only calls m.converter.convert(...) when the resolved field is a GenericRowData, and the top-level ts_ms is a timestamp scalar, so this branch never fires. Could drop the body to a no-op/null to avoid implying it runs.
// ----------------------------------------------------------------------------------------

/** List of metadata that can be read with this format. */
enum ReadableMetadata {
debezium-json exposes a 7th metadata, schema (the inline Connect schema), which isn't in this enum. I'm assuming that's intentional because the Confluent format carries the schema in the registry rather than inline, so there's nothing to expose — is that the reasoning, or just out of scope for this PR?
Yes, intentional.
The debezium-json format exposes the inline schema field because it is embedded in each JSON message. However, the Confluent Avro format stores schemas in the registry and messages only carry the schema ID. Since there is no inline schema field in the Avro payload itself, there is nothing to expose as metadata.
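For illustration, a minimal sketch of reading that header, assuming the usual Confluent wire format of one magic byte (0), a 4-byte big-endian schema ID, and then the Avro-encoded payload. The class and method names are hypothetical and this is not the code path Flink uses.

```java
import java.nio.ByteBuffer;

/** JDK-only sketch of the Confluent message header. Hypothetical names. */
final class ConfluentHeaderSketch {

    /** Returns the schema ID carried in the first five bytes of a Confluent-framed message. */
    static int schemaId(byte[] message) {
        if (message.length < 5) {
            throw new IllegalArgumentException("message too short for a Confluent header");
        }
        ByteBuffer buf = ByteBuffer.wrap(message); // ByteBuffer is big-endian by default
        byte magic = buf.get();
        if (magic != 0) {
            throw new IllegalArgumentException("unknown magic byte: " + magic);
        }
        return buf.getInt(); // the remaining bytes are the Avro payload, with no inline schema
    }

    public static void main(String[] args) {
        byte[] message = {0, 0, 0, 0, 42, 1, 2};
        System.out.println(schemaId(message));
    }
}
```

Nothing after the ID describes the schema, so there is no per-message schema value that a metadata column could surface.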
What is the purpose of the change
This pull request implements metadata reading support for the debezium-avro-confluent format, bringing feature parity with the debezium-json format. Users can now access Debezium metadata fields (such as source database, schema, table, and timestamps) when using Avro-encoded Debezium messages from Kafka.
JIRA: FLINK-20454
Brief change log
- Added DebeziumAvroDecodingFormat implementing ProjectableDecodingFormat with metadata support
- Implemented 6 metadata fields (ingestion-timestamp, source.timestamp, source.database, source.schema, source.table, source.properties)
Verifying this change
This change added tests and can be verified as follows:
- Added DebeziumAvroSerDeSchemaTest.testDeserializationWithMetadata() that validates all 6 metadata fields (ingestion-timestamp, source.timestamp, source.database, source.schema, source.table, source.properties) are correctly extracted from Debezium Avro CDC messages.
- DebeziumAvroFormatFactoryTest.testSeDeSchema() and testSeDeSchemaWithSchemaOption() verify that the factory creates deserialization schemas with correct metadata support parameters.
- Existing tests (testDeleteDataDeserialization(), testSeDeSchemaWithInvalidSchemaOption()) pass without modification, ensuring backward compatibility.
Does this pull request potentially affect one of the following parts:
- @Public(Evolving): no
Documentation
Was generative AI tooling used to co-author this PR?
Generated-by: Claude Sonnet 4.5