diff --git a/flink-python/pyflink/common/tests/test_typeinfo.py b/flink-python/pyflink/common/tests/test_typeinfo.py index c40f91d5cb26c..c909d863b59dd 100644 --- a/flink-python/pyflink/common/tests/test_typeinfo.py +++ b/flink-python/pyflink/common/tests/test_typeinfo.py @@ -17,7 +17,9 @@ ################################################################################ from pyflink.common.typeinfo import Types, RowTypeInfo, TupleTypeInfo, _from_java_type +from pyflink.java_gateway import get_gateway from pyflink.testing.test_case_utils import PyFlinkTestCase +from pyflink.util.java_utils import to_jarray class TypeInfoTests(PyFlinkTestCase): @@ -143,3 +145,21 @@ def test_from_java_type(self): list_type_info = Types.LIST(Types.INT()) self.assertEqual(list_type_info, _from_java_type(list_type_info.get_java_type_info())) + + def test_internal_type_info(self): + # InternalTypeInfo is produced by sources such as CsvReaderFormat. It bridges to + # RowData internally, so it must be converted through its logical type back to a + # PyFlink type. See FLINK-39724. + gateway = get_gateway() + JInternalTypeInfo = \ + gateway.jvm.org.apache.flink.table.runtime.typeutils.InternalTypeInfo + JVarCharType = gateway.jvm.org.apache.flink.table.types.logical.VarCharType + JDoubleType = gateway.jvm.org.apache.flink.table.types.logical.DoubleType + JRowType = gateway.jvm.org.apache.flink.table.types.logical.RowType + JLogicalType = gateway.jvm.org.apache.flink.table.types.logical.LogicalType + + j_field_types = to_jarray(JLogicalType, [JVarCharType(), JDoubleType()]) + j_internal_type_info = JInternalTypeInfo.of(JRowType.of(j_field_types)) + + self.assertEqual(Types.ROW([Types.STRING(), Types.DOUBLE()]), + _from_java_type(j_internal_type_info)) diff --git a/flink-python/pyflink/common/typeinfo.py b/flink-python/pyflink/common/typeinfo.py index 861231ed167dc..2081ef61c7fc4 100644 --- a/flink-python/pyflink/common/typeinfo.py +++ b/flink-python/pyflink/common/typeinfo.py @@ -1115,6 +1115,16 @@ def _from_java_type(j_type_info: JavaObject) -> TypeInformation: return ExternalTypeInfo(_from_java_type( TypeInfoDataTypeConverter.toLegacyTypeInfo(j_type_info.getDataType()))) + JInternalTypeInfo = gateway.jvm.org.apache.flink.table.runtime.typeutils.InternalTypeInfo + if _is_instance_of(j_type_info, JInternalTypeInfo): + LogicalTypeDataTypeConverter = \ + gateway.jvm.org.apache.flink.table.types.utils.LogicalTypeDataTypeConverter + TypeInfoDataTypeConverter = \ + gateway.jvm.org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter + return _from_java_type( + TypeInfoDataTypeConverter.toLegacyTypeInfo( + LogicalTypeDataTypeConverter.toDataType(j_type_info.toLogicalType()))) + raise TypeError("The java type info: %s is not supported in PyFlink currently." % j_type_info)