From e1ba6d746dd6ab0c06bbb770266e2df289d37642 Mon Sep 17 00:00:00 2001
From: Christophe Rodriguez
Date: Fri, 19 Jun 2026 15:33:12 +0200
Subject: [PATCH] [FLINK-24544][formats] Fix Avro enum deserialization failure with Confluent Schema Registry

---
Review notes (below the "---" marker, so not part of the commit message):
- Unions are merged branch by branch and before the same-type shortcut, so
  nullable enum columns (union on both sides) are patched as well; reader
  branch order and branch count are preserved.
- The merged expected schema is cached per writer schema instead of being
  rebuilt for every record.
- Record fields are copied with the Schema.Field copy constructor; a record or
  union that needs no patching is returned unchanged.
- Name-based field access falls back to the positional lookup when the
  record's schema has no field of that name.
- The post-image blob ids in the "index" lines below were not regenerated.

 .../formats/avro/AvroToRowDataConverters.java |  23 ++-
 .../RegistryAvroDeserializationSchema.java    | 182 +++++++++++++++++-
 ...RegistryAvroDeserializationSchemaTest.java | 138 +++++++++++++
 3 files changed, 337 insertions(+), 6 deletions(-)

diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroToRowDataConverters.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroToRowDataConverters.java
index 9c63c56c4dcba..9023466758642 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroToRowDataConverters.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroToRowDataConverters.java
@@ -74,8 +74,9 @@ public static AvroToRowDataConverter createRowConverter(RowType rowType) {
 
     public static AvroToRowDataConverter createRowConverter(
             RowType rowType, boolean legacyTimestampMapping) {
+        final List<RowType.RowField> fields = rowType.getFields();
         final AvroToRowDataConverter[] fieldConverters =
-                rowType.getFields().stream()
+                fields.stream()
                         .map(RowType.RowField::getType)
                         .map(type -> createNullableConverter(type, legacyTimestampMapping))
                         .toArray(AvroToRowDataConverter[]::new);
@@ -84,10 +85,22 @@ public static AvroToRowDataConverter createRowConverter(
         return avroObject -> {
             IndexedRecord record = (IndexedRecord) avroObject;
             GenericRowData row = new GenericRowData(arity);
-            for (int i = 0; i < arity; ++i) {
-                // avro always deserialize successfully even though the type isn't matched
-                // so no need to throw exception about which field can't be deserialized
-                row.setField(i, fieldConverters[i].convert(record.get(i)));
+
+            // Resolve fields by name when possible (for field projection support). If the
+            // record's schema has no field of that name, fall back to the positional lookup
+            // that was used before name-based access was introduced.
+            if (record instanceof GenericRecord) {
+                for (int i = 0; i < arity; ++i) {
+                    final org.apache.avro.Schema.Field avroField =
+                            record.getSchema().getField(fields.get(i).getName());
+                    final int position = avroField != null ? avroField.pos() : i;
+                    row.setField(i, fieldConverters[i].convert(record.get(position)));
+                }
+            } else {
+                // Fallback to positional access for SpecificRecord
+                for (int i = 0; i < arity; ++i) {
+                    row.setField(i, fieldConverters[i].convert(record.get(i)));
+                }
             }
             return row;
         };
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RegistryAvroDeserializationSchema.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RegistryAvroDeserializationSchema.java
index 0310582f6cae2..7258d98e7e38d 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RegistryAvroDeserializationSchema.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RegistryAvroDeserializationSchema.java
@@ -100,7 +100,13 @@ public T deserialize(@Nullable byte[] message) throws IOException {
         GenericDatumReader<T> datumReader = getDatumReader();
 
         datumReader.setSchema(writerSchema);
-        datumReader.setExpected(readerSchema);
+
+        // Use a merged expected schema that:
+        // 1. Uses reader schema field structure (correct field order for field projection)
+        // 2. But patches in writer schema types where incompatible (e.g., enum instead of string)
+        // This combined with name-based field access in AvroToRowDataConverters allows both
+        // Debezium-style field projection and enum handling to work correctly.
+        datumReader.setExpected(getExpectedSchema(readerSchema, writerSchema));
 
         if (getEncoding() == AvroEncoding.JSON) {
             ((JsonDecoder) getDecoder()).configure(getInputStream());
@@ -109,6 +115,180 @@ public T deserialize(@Nullable byte[] message) throws IOException {
         return datumReader.read(null, getDecoder());
     }
 
+    /**
+     * Merged expected schemas keyed by the writer schema they were derived from, so that the
+     * merge runs once per distinct writer schema instead of once per record.
+     *
+     * <p>NOTE(review): assumes the reader schema never changes for the lifetime of this
+     * instance - confirm against {@code getReaderSchema()}.
+     */
+    private transient java.util.Map<Schema, Schema> expectedSchemaCache;
+
+    /**
+     * Returns the schema to pass to the datum reader as its expected schema.
+     *
+     * @param reader the reader schema (from Flink DDL), or {@code null} if there is none
+     * @param writer the writer schema (from schema registry)
+     * @return {@code writer} if {@code reader} is {@code null}, otherwise the cached merge
+     */
+    private Schema getExpectedSchema(@Nullable Schema reader, Schema writer) {
+        if (reader == null) {
+            return writer;
+        }
+        if (expectedSchemaCache == null) {
+            expectedSchemaCache = new java.util.HashMap<>();
+        }
+        return expectedSchemaCache.computeIfAbsent(writer, key -> mergeSchemaTypes(reader, key));
+    }
+
+    /**
+     * Merges reader and writer schemas to create a hybrid schema. Uses reader schema structure
+     * (field order) but patches in writer types for incompatible conversions.
+     *
+     * <p>Unions are checked first: a nullable field is a union on both sides, and its branches
+     * still have to be merged one by one.
+     *
+     * @param reader the reader schema (from Flink DDL)
+     * @param writer the writer schema (from schema registry)
+     * @return merged schema with reader structure but writer types where needed
+     */
+    private Schema mergeSchemaTypes(Schema reader, Schema writer) {
+        final Schema.Type readerType = reader.getType();
+        final Schema.Type writerType = writer.getType();
+
+        if (readerType == Schema.Type.UNION || writerType == Schema.Type.UNION) {
+            return mergeUnionSchema(reader, writer);
+        }
+        if (readerType == Schema.Type.RECORD && writerType == Schema.Type.RECORD) {
+            return mergeRecordSchemas(reader, writer);
+        }
+        // Type mismatch: prefer writer type if it's enum and reader is string
+        if (readerType == Schema.Type.STRING && writerType == Schema.Type.ENUM) {
+            return writer;
+        }
+        return reader;
+    }
+
+    /**
+     * Merges two record schemas field by field; writer fields are looked up by name.
+     *
+     * @return {@code reader} itself if no field type was patched, otherwise a new record with
+     *     the reader's name and field order and the patched field types
+     */
+    private Schema mergeRecordSchemas(Schema reader, Schema writer) {
+        final java.util.List<Schema.Field> mergedFields =
+                new java.util.ArrayList<>(reader.getFields().size());
+        boolean patched = false;
+
+        for (Schema.Field readerField : reader.getFields()) {
+            final Schema.Field writerField = writer.getField(readerField.name());
+            // A field that only exists in the reader keeps its type as is
+            final Schema mergedFieldSchema =
+                    writerField == null
+                            ? readerField.schema()
+                            : mergeSchemaTypes(readerField.schema(), writerField.schema());
+            patched |= mergedFieldSchema != readerField.schema();
+            // Copy constructor: same name, doc, default, order, aliases and props as the reader
+            mergedFields.add(new Schema.Field(readerField, mergedFieldSchema));
+        }
+
+        if (!patched) {
+            return reader;
+        }
+
+        final Schema merged =
+                Schema.createRecord(
+                        reader.getName(), reader.getDoc(), reader.getNamespace(), reader.isError());
+        merged.setFields(mergedFields);
+        return merged;
+    }
+
+    /**
+     * Merges schemas when at least one side is a union. Each reader branch is merged with its
+     * writer counterpart; the reader's branch order is kept, which also keeps a union field's
+     * default (it has to match the first branch) valid.
+     *
+     * @return {@code reader} itself if no branch was patched, otherwise the patched schema
+     */
+    private Schema mergeUnionSchema(Schema reader, Schema writer) {
+        final java.util.List<Schema> readerBranches =
+                reader.getType() == Schema.Type.UNION
+                        ? reader.getTypes()
+                        : java.util.Collections.singletonList(reader);
+        final java.util.List<Schema> writerBranches =
+                writer.getType() == Schema.Type.UNION
+                        ? writer.getTypes()
+                        : java.util.Collections.singletonList(writer);
+
+        final java.util.List<Schema> mergedBranches =
+                new java.util.ArrayList<>(readerBranches.size());
+        boolean patched = false;
+        for (Schema readerBranch : readerBranches) {
+            final Schema writerBranch =
+                    findWriterBranch(readerBranch, readerBranches, writerBranches);
+            final Schema mergedBranch =
+                    writerBranch == null
+                            ? readerBranch
+                            : mergeSchemaTypes(readerBranch, writerBranch);
+            patched |= mergedBranch != readerBranch;
+            mergedBranches.add(mergedBranch);
+        }
+
+        if (!patched) {
+            return reader;
+        }
+        // A reader that is not a union stays a single type: return its one patched branch
+        return reader.getType() == Schema.Type.UNION
+                ? Schema.createUnion(mergedBranches)
+                : mergedBranches.get(0);
+    }
+
+    /**
+     * Picks the writer branch that a (non-union) reader branch should be merged with.
+     *
+     * @return the writer branch with the same type and full name; else the first writer branch
+     *     of the same type; else, for a string reader branch, the first writer enum whose name
+     *     is not already a reader branch; else {@code null}. Always {@code null} for the null
+     *     type.
+     */
+    @Nullable
+    private static Schema findWriterBranch(
+            Schema readerBranch,
+            java.util.List<Schema> readerBranches,
+            java.util.List<Schema> writerBranches) {
+        final Schema.Type readerType = readerBranch.getType();
+        if (readerType == Schema.Type.NULL) {
+            return null;
+        }
+
+        Schema sameType = null;
+        for (Schema candidate : writerBranches) {
+            if (candidate.getType() != readerType) {
+                continue;
+            }
+            if (candidate.getFullName().equals(readerBranch.getFullName())) {
+                return candidate;
+            }
+            if (sameType == null) {
+                sameType = candidate;
+            }
+        }
+        if (sameType != null || readerType != Schema.Type.STRING) {
+            return sameType;
+        }
+
+        // String reader branch without a string writer branch: fall back to a writer enum.
+        // Enums the reader union declares itself are skipped to avoid a duplicate branch.
+        for (Schema candidate : writerBranches) {
+            if (candidate.getType() == Schema.Type.ENUM
+                    && readerBranches.stream()
+                            .map(Schema::getFullName)
+                            .noneMatch(candidate.getFullName()::equals)) {
+                return candidate;
+            }
+        }
+        return null;
+    }
+
     @Override
     void checkAvroInitialized() throws IOException {
         super.checkAvroInitialized();
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/RegistryAvroDeserializationSchemaTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/RegistryAvroDeserializationSchemaTest.java
index 8a8fbd9d85f56..f6500a4fb1787 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/RegistryAvroDeserializationSchemaTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/RegistryAvroDeserializationSchemaTest.java
@@ -118,4 +118,142 @@ public void writeSchema(Schema schema, OutputStream out)
         assertThat(simpleRecord.getName().toString()).isEqualTo("someName");
         assertThat(simpleRecord.getOptionalField()).isNull();
     }
+
+    @Test
+    void testNestedRecordWithEnumField() throws IOException {
+        // Writer schema with nested record containing enum field
+        // This pattern is common in CDC tools and schema registries
+        Schema writerSchema =
+                new Schema.Parser()
+                        .parse(
+                                "{\"namespace\": \"example.avro\",\n"
+                                        + " \"type\": \"record\",\n"
+                                        + " \"name\": \"Message\",\n"
+                                        + " \"fields\": [\n"
+                                        + " {\"name\": \"metadata\", \"type\": {\n"
+                                        + " \"type\": \"record\",\n"
+                                        + " \"name\": \"Metadata\",\n"
+                                        + " \"fields\": [\n"
+                                        + " {\"name\": \"operation\", \"type\": {\n"
+                                        + " \"type\": \"enum\",\n"
+                                        + " \"name\": \"Operation\",\n"
+                                        + " \"symbols\": [\"INSERT\", \"UPDATE\", \"DELETE\", \"REFRESH\"]\n"
+                                        + " }},\n"
+                                        + " {\"name\": \"timestamp\", \"type\": \"string\"}\n"
+                                        + " ]\n"
+                                        + " }}\n"
+                                        + " ]\n"
+                                        + "}");
+
+        // Reader schema with operation as string (simulates Flink DDL with VARCHAR)
+        Schema readerSchema =
+                new Schema.Parser()
+                        .parse(
+                                "{\"namespace\": \"example.avro\",\n"
+                                        + " \"type\": \"record\",\n"
+                                        + " \"name\": \"Message\",\n"
+                                        + " \"fields\": [\n"
+                                        + " {\"name\": \"metadata\", \"type\": {\n"
+                                        + " \"type\": \"record\",\n"
+                                        + " \"name\": \"Metadata\",\n"
+                                        + " \"fields\": [\n"
+                                        + " {\"name\": \"operation\", \"type\": \"string\"},\n"
+                                        + " {\"name\": \"timestamp\", \"type\": \"string\"}\n"
+                                        + " ]\n"
+                                        + " }}\n"
+                                        + " ]\n"
+                                        + "}");
+
+        // Create deserializer with reader schema (enum -> string incompatibility)
+        RegistryAvroDeserializationSchema<GenericRecord> deserializer =
+                new RegistryAvroDeserializationSchema<>(
+                        GenericRecord.class,
+                        readerSchema,
+                        () ->
+                                new SchemaCoder() {
+                                    @Override
+                                    public Schema readSchema(InputStream in) {
+                                        return writerSchema;
+                                    }
+
+                                    @Override
+                                    public void writeSchema(Schema schema, OutputStream out)
+                                            throws IOException {
+                                        // do nothing
+                                    }
+                                });
+
+        // Create test record with enum value
+        GenericData.Record record = new GenericData.Record(writerSchema);
+        Schema metadataSchema = writerSchema.getField("metadata").schema();
+        GenericData.Record metadataRecord = new GenericData.Record(metadataSchema);
+
+        Schema operationEnumSchema = metadataSchema.getField("operation").schema();
+        metadataRecord.put("operation", new GenericData.EnumSymbol(operationEnumSchema, "UPDATE"));
+        metadataRecord.put("timestamp", "2024-01-15T10:30:00Z");
+
+        record.put("metadata", metadataRecord);
+
+        // Deserialize - should succeed with the merged expected schema
+        GenericRecord result = deserializer.deserialize(writeRecord(record, writerSchema));
+
+        GenericRecord metadata = (GenericRecord) result.get("metadata");
+        assertThat(metadata.get("operation").toString()).isEqualTo("UPDATE");
+        assertThat(metadata.get("timestamp").toString()).isEqualTo("2024-01-15T10:30:00Z");
+
+        // Verify the operation field is an EnumSymbol that can be converted to string
+        // This is how Flink's AvroToRowDataConverters handles enum -> VARCHAR conversion
+        Object operation = metadata.get("operation");
+        assertThat(operation).isInstanceOf(GenericData.EnumSymbol.class);
+        assertThat(operation.toString()).isEqualTo("UPDATE");
+    }
+
+    @Test
+    void testNullableEnumFieldWithNullableStringReader() throws IOException {
+        // Nullable fields are unions on both sides; the enum branch must still be patched in
+        Schema writerSchema =
+                new Schema.Parser()
+                        .parse(
+                                "{\"type\": \"record\", \"name\": \"Message\", \"fields\": [\n"
+                                        + " {\"name\": \"operation\", \"type\": [\"null\", {\n"
+                                        + " \"type\": \"enum\",\n"
+                                        + " \"name\": \"Operation\",\n"
+                                        + " \"symbols\": [\"INSERT\", \"UPDATE\"]\n"
+                                        + " }]}\n"
+                                        + "]}");
+
+        Schema readerSchema =
+                new Schema.Parser()
+                        .parse(
+                                "{\"type\": \"record\", \"name\": \"Message\", \"fields\": [\n"
+                                        + " {\"name\": \"operation\", \"type\": [\"null\", \"string\"]}\n"
+                                        + "]}");
+
+        RegistryAvroDeserializationSchema<GenericRecord> deserializer =
+                new RegistryAvroDeserializationSchema<>(
+                        GenericRecord.class,
+                        readerSchema,
+                        () ->
+                                new SchemaCoder() {
+                                    @Override
+                                    public Schema readSchema(InputStream in) {
+                                        return writerSchema;
+                                    }
+
+                                    @Override
+                                    public void writeSchema(Schema schema, OutputStream out)
+                                            throws IOException {
+                                        // do nothing
+                                    }
+                                });
+
+        Schema operationEnumSchema = writerSchema.getField("operation").schema().getTypes().get(1);
+        GenericData.Record record = new GenericData.Record(writerSchema);
+        record.put("operation", new GenericData.EnumSymbol(operationEnumSchema, "UPDATE"));
+
+        GenericRecord result = deserializer.deserialize(writeRecord(record, writerSchema));
+
+        assertThat(result.get("operation")).isInstanceOf(GenericData.EnumSymbol.class);
+        assertThat(result.get("operation").toString()).isEqualTo("UPDATE");
+    }
 }