Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,9 @@ public static AvroToRowDataConverter createRowConverter(RowType rowType) {

public static AvroToRowDataConverter createRowConverter(
RowType rowType, boolean legacyTimestampMapping) {
final List<RowType.RowField> fields = rowType.getFields();
final AvroToRowDataConverter[] fieldConverters =
rowType.getFields().stream()
fields.stream()
.map(RowType.RowField::getType)
.map(type -> createNullableConverter(type, legacyTimestampMapping))
.toArray(AvroToRowDataConverter[]::new);
Expand All @@ -84,10 +85,20 @@ public static AvroToRowDataConverter createRowConverter(
return avroObject -> {
IndexedRecord record = (IndexedRecord) avroObject;
GenericRowData row = new GenericRowData(arity);
for (int i = 0; i < arity; ++i) {
// avro always deserialize successfully even though the type isn't matched
// so no need to throw exception about which field can't be deserialized
row.setField(i, fieldConverters[i].convert(record.get(i)));

// Try to access fields by name if possible (for field projection support)
if (record instanceof GenericRecord) {
GenericRecord genericRecord = (GenericRecord) record;
for (int i = 0; i < arity; ++i) {
String fieldName = fields.get(i).getName();
Object fieldValue = genericRecord.get(fieldName);
row.setField(i, fieldConverters[i].convert(fieldValue));
}
} else {
// Fallback to positional access for SpecificRecord
for (int i = 0; i < arity; ++i) {
row.setField(i, fieldConverters[i].convert(record.get(i)));
}
}
return row;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,16 @@ public T deserialize(@Nullable byte[] message) throws IOException {
GenericDatumReader<T> datumReader = getDatumReader();

datumReader.setSchema(writerSchema);
datumReader.setExpected(readerSchema);

// Create a merged expected schema that:
// 1. Uses reader schema field structure (correct field order for field projection)
// 2. But patches in writer schema types where incompatible (e.g., enum instead of string)
// This combined with name-based field access in AvroToRowDataConverters allows both
// Debezium-style field projection and enum handling to work correctly.
Schema expectedSchema =
readerSchema != null ? mergeSchemaTypes(readerSchema, writerSchema) : writerSchema;

datumReader.setExpected(expectedSchema);

if (getEncoding() == AvroEncoding.JSON) {
((JsonDecoder) getDecoder()).configure(getInputStream());
Expand All @@ -109,6 +118,112 @@ public T deserialize(@Nullable byte[] message) throws IOException {
return datumReader.read(null, getDecoder());
}

/**
* Merges reader and writer schemas to create a hybrid schema. Uses reader schema structure
* (field order) but patches in writer types for incompatible conversions.
*
* @param reader the reader schema (from Flink DDL)
* @param writer the writer schema (from schema registry)
* @return merged schema with reader structure but writer types where needed
*/
private Schema mergeSchemaTypes(Schema reader, Schema writer) {
// If same type, check for specific incompatibilities
if (reader.getType() == writer.getType()) {
if (reader.getType() == Schema.Type.RECORD) {
return mergeRecordSchemas(reader, writer);
}
return reader;
}

// Handle union types
if (reader.getType() == Schema.Type.UNION) {
return mergeUnionSchema(reader, writer);
}
if (writer.getType() == Schema.Type.UNION) {
return mergeUnionSchema(reader, writer);
}

// Type mismatch: prefer writer type if it's enum and reader is string
if (writer.getType() == Schema.Type.ENUM && reader.getType() == Schema.Type.STRING) {
return writer;
}

return reader;
}

private Schema mergeRecordSchemas(Schema reader, Schema writer) {
java.util.List<Schema.Field> mergedFields = new java.util.ArrayList<>();

for (Schema.Field readerField : reader.getFields()) {
Schema.Field writerField = writer.getField(readerField.name());

if (writerField != null) {
// Field exists in both - merge types
Schema mergedFieldSchema =
mergeSchemaTypes(readerField.schema(), writerField.schema());
mergedFields.add(
new Schema.Field(
readerField.name(),
mergedFieldSchema,
readerField.doc(),
readerField.defaultVal()));
} else {
// Field only in reader - keep as is
mergedFields.add(
new Schema.Field(
readerField.name(),
readerField.schema(),
readerField.doc(),
readerField.defaultVal()));
}
}

Schema merged =
Schema.createRecord(
reader.getName(), reader.getDoc(), reader.getNamespace(), reader.isError());
merged.setFields(mergedFields);
return merged;
}

private Schema mergeUnionSchema(Schema reader, Schema writer) {
java.util.List<Schema> readerTypes =
reader.getType() == Schema.Type.UNION
? reader.getTypes()
: java.util.Collections.singletonList(reader);
java.util.List<Schema> writerTypes =
writer.getType() == Schema.Type.UNION
? writer.getTypes()
: java.util.Collections.singletonList(writer);

// Find non-null types
Schema readerNonNull = null;
boolean readerHasNull = false;
for (Schema type : readerTypes) {
if (type.getType() == Schema.Type.NULL) {
readerHasNull = true;
} else {
readerNonNull = type;
}
}

Schema writerNonNull = null;
for (Schema type : writerTypes) {
if (type.getType() != Schema.Type.NULL) {
writerNonNull = type;
}
}

if (readerNonNull != null && writerNonNull != null) {
Schema mergedNonNull = mergeSchemaTypes(readerNonNull, writerNonNull);
if (readerHasNull) {
return Schema.createUnion(Schema.create(Schema.Type.NULL), mergedNonNull);
}
return mergedNonNull;
}

return reader;
}

@Override
void checkAvroInitialized() throws IOException {
super.checkAvroInitialized();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,4 +118,93 @@ public void writeSchema(Schema schema, OutputStream out)
assertThat(simpleRecord.getName().toString()).isEqualTo("someName");
assertThat(simpleRecord.getOptionalField()).isNull();
}

@Test
void testNestedRecordWithEnumField() throws IOException {
// Writer schema with nested record containing enum field
// This pattern is common in CDC tools and schema registries
Schema writerSchema =
new Schema.Parser()
.parse(
"{\"namespace\": \"example.avro\",\n"
+ " \"type\": \"record\",\n"
+ " \"name\": \"Message\",\n"
+ " \"fields\": [\n"
+ " {\"name\": \"metadata\", \"type\": {\n"
+ " \"type\": \"record\",\n"
+ " \"name\": \"Metadata\",\n"
+ " \"fields\": [\n"
+ " {\"name\": \"operation\", \"type\": {\n"
+ " \"type\": \"enum\",\n"
+ " \"name\": \"Operation\",\n"
+ " \"symbols\": [\"INSERT\", \"UPDATE\", \"DELETE\", \"REFRESH\"]\n"
+ " }},\n"
+ " {\"name\": \"timestamp\", \"type\": \"string\"}\n"
+ " ]\n"
+ " }}\n"
+ " ]\n"
+ "}");

// Reader schema with operation as string (simulates Flink DDL with VARCHAR)
Schema readerSchema =
new Schema.Parser()
.parse(
"{\"namespace\": \"example.avro\",\n"
+ " \"type\": \"record\",\n"
+ " \"name\": \"Message\",\n"
+ " \"fields\": [\n"
+ " {\"name\": \"metadata\", \"type\": {\n"
+ " \"type\": \"record\",\n"
+ " \"name\": \"Metadata\",\n"
+ " \"fields\": [\n"
+ " {\"name\": \"operation\", \"type\": \"string\"},\n"
+ " {\"name\": \"timestamp\", \"type\": \"string\"}\n"
+ " ]\n"
+ " }}\n"
+ " ]\n"
+ "}");

// Create deserializer with reader schema (enum -> string incompatibility)
RegistryAvroDeserializationSchema<GenericRecord> deserializer =
new RegistryAvroDeserializationSchema<>(
GenericRecord.class,
readerSchema,
() ->
new SchemaCoder() {
@Override
public Schema readSchema(InputStream in) {
return writerSchema;
}

@Override
public void writeSchema(Schema schema, OutputStream out)
throws IOException {
// do nothing
}
});

// Create test record with enum value
GenericData.Record record = new GenericData.Record(writerSchema);
Schema metadataSchema = writerSchema.getField("metadata").schema();
GenericData.Record metadataRecord = new GenericData.Record(metadataSchema);

Schema operationEnumSchema = metadataSchema.getField("operation").schema();
metadataRecord.put("operation", new GenericData.EnumSymbol(operationEnumSchema, "UPDATE"));
metadataRecord.put("timestamp", "2024-01-15T10:30:00Z");

record.put("metadata", metadataRecord);

// Deserialize - should succeed with writer schema approach
GenericRecord result = deserializer.deserialize(writeRecord(record, writerSchema));

GenericRecord metadata = (GenericRecord) result.get("metadata");
assertThat(metadata.get("operation").toString()).isEqualTo("UPDATE");
assertThat(metadata.get("timestamp").toString()).isEqualTo("2024-01-15T10:30:00Z");

// Verify the operation field is an EnumSymbol that can be converted to string
// This is how Flink's AvroToRowDataConverters handles enum -> VARCHAR conversion
Object operation = metadata.get("operation");
assertThat(operation).isInstanceOf(GenericData.EnumSymbol.class);
assertThat(operation.toString()).isEqualTo("UPDATE");
}
}