From a3ba6376700662f8284a7482d71d32b5641e94ad Mon Sep 17 00:00:00 2001 From: wilmerdooley Date: Fri, 19 Jun 2026 19:03:31 +0000 Subject: [PATCH] [FLINK-39724][python] Support InternalTypeInfo in PyFlink type extraction When a PyFlink DataStream is produced by sources such as CsvReaderFormat, its underlying Java TypeInformation is an InternalTypeInfo wrapping a RowData logical type. _from_java_type did not recognize this class, so DataStream.get_type() and assign_timestamps_and_watermarks() raised TypeError. This adds a branch in _from_java_type that converts InternalTypeInfo through its logical type back to a legacy TypeInformation that maps to a PyFlink type, with a unit test covering the round-trip from InternalTypeInfo to Types.ROW(...). Signed-off-by: wilmerdooley Generated-by: Claude Code --- .../pyflink/common/tests/test_typeinfo.py | 20 +++++++++++++++++++ flink-python/pyflink/common/typeinfo.py | 10 ++++++++++ 2 files changed, 30 insertions(+) diff --git a/flink-python/pyflink/common/tests/test_typeinfo.py b/flink-python/pyflink/common/tests/test_typeinfo.py index c40f91d5cb26c..c909d863b59dd 100644 --- a/flink-python/pyflink/common/tests/test_typeinfo.py +++ b/flink-python/pyflink/common/tests/test_typeinfo.py @@ -17,7 +17,9 @@ ################################################################################ from pyflink.common.typeinfo import Types, RowTypeInfo, TupleTypeInfo, _from_java_type +from pyflink.java_gateway import get_gateway from pyflink.testing.test_case_utils import PyFlinkTestCase +from pyflink.util.java_utils import to_jarray class TypeInfoTests(PyFlinkTestCase): @@ -143,3 +145,21 @@ def test_from_java_type(self): list_type_info = Types.LIST(Types.INT()) self.assertEqual(list_type_info, _from_java_type(list_type_info.get_java_type_info())) + + def test_internal_type_info(self): + # InternalTypeInfo is produced by sources such as CsvReaderFormat. It bridges to + # RowData internally, so it must be converted through its logical type back to a + # PyFlink type. See FLINK-39724. + gateway = get_gateway() + JInternalTypeInfo = \ + gateway.jvm.org.apache.flink.table.runtime.typeutils.InternalTypeInfo + JVarCharType = gateway.jvm.org.apache.flink.table.types.logical.VarCharType + JDoubleType = gateway.jvm.org.apache.flink.table.types.logical.DoubleType + JRowType = gateway.jvm.org.apache.flink.table.types.logical.RowType + JLogicalType = gateway.jvm.org.apache.flink.table.types.logical.LogicalType + + j_field_types = to_jarray(JLogicalType, [JVarCharType(), JDoubleType()]) + j_internal_type_info = JInternalTypeInfo.of(JRowType.of(j_field_types)) + + self.assertEqual(Types.ROW([Types.STRING(), Types.DOUBLE()]), + _from_java_type(j_internal_type_info)) diff --git a/flink-python/pyflink/common/typeinfo.py b/flink-python/pyflink/common/typeinfo.py index 861231ed167dc..2081ef61c7fc4 100644 --- a/flink-python/pyflink/common/typeinfo.py +++ b/flink-python/pyflink/common/typeinfo.py @@ -1115,6 +1115,16 @@ def _from_java_type(j_type_info: JavaObject) -> TypeInformation: return ExternalTypeInfo(_from_java_type( TypeInfoDataTypeConverter.toLegacyTypeInfo(j_type_info.getDataType()))) + JInternalTypeInfo = gateway.jvm.org.apache.flink.table.runtime.typeutils.InternalTypeInfo + if _is_instance_of(j_type_info, JInternalTypeInfo): + LogicalTypeDataTypeConverter = \ + gateway.jvm.org.apache.flink.table.types.utils.LogicalTypeDataTypeConverter + TypeInfoDataTypeConverter = \ + gateway.jvm.org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter + return _from_java_type( + TypeInfoDataTypeConverter.toLegacyTypeInfo( + LogicalTypeDataTypeConverter.toDataType(j_type_info.toLogicalType()))) + raise TypeError("The java type info: %s is not supported in PyFlink currently." % j_type_info)