[FLINK-39724][python] Support InternalTypeInfo in PyFlink type extraction #28490
Open
wilmerdooley wants to merge 1 commit into
…tion

When a PyFlink DataStream is produced by sources such as CsvReaderFormat, its underlying Java TypeInformation is an InternalTypeInfo wrapping a RowData logical type. _from_java_type did not recognize this class, so DataStream.get_type() and assign_timestamps_and_watermarks() raised TypeError.

This adds a branch in _from_java_type that converts InternalTypeInfo through its logical type back to a legacy TypeInformation that maps to a PyFlink type, with a unit test covering the round-trip from InternalTypeInfo to Types.ROW(...).

Signed-off-by: wilmerdooley <wilmerdooley1@gmail.com>
Generated-by: Claude Code
15f00c4 to a3ba637
What is the purpose of the change
When a PyFlink DataStream is produced by sources such as CsvReaderFormat, its underlying Java TypeInformation is an InternalTypeInfo wrapping a RowData logical type. The _from_java_type helper in flink-python did not recognize this class, so calls like DataStream.get_type() and DataStream.assign_timestamps_and_watermarks() raised TypeError: The java type info: ... is not supported in PyFlink currently., forcing users to insert an identity map as a workaround.
Brief change log
flink-python/pyflink/common/typeinfo.py: in _from_java_type, detect InternalTypeInfo, convert it back to its logical type, then run it through LogicalTypeDataTypeConverter and LegacyTypeInfoDataTypeConverter to obtain a legacy TypeInformation that maps to a PyFlink type.
Verifying this change
This change added a unit test:
test_internal_type_info in flink-python/pyflink/common/tests/test_typeinfo.py builds an InternalTypeInfo over a RowType(VARCHAR, DOUBLE) and asserts _from_java_type round-trips it to Types.ROW([Types.STRING(), Types.DOUBLE()]). The test fails on the pre-fix code, which raised TypeError.
Does this pull request potentially affect one of the following parts
@Public(Evolving): no (the change is confined to the private _from_java_type helper in PyFlink)
Documentation
Was generative AI tooling used to co-author this PR?
Generated-by: Claude Code
JIRA: https://issues.apache.org/jira/browse/FLINK-39724
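Illustrative sketch (not the code in this PR): the conversion path described in the change log can be modeled in plain Python without a JVM. Every class and function below (FakeInternalTypeInfo, LogicalRowType, FakeLegacyRowTypeInfo, to_legacy, from_java_type) is a hypothetical stand-in invented for this example. The real helper works on Java objects through the PyFlink gateway, and the recursion into the existing legacy branch is an assumption about how the new branch is wired.

```python
from dataclasses import dataclass
from typing import Tuple


# Hypothetical stand-ins for the Java-side classes (illustration only).
@dataclass(frozen=True)
class LogicalRowType:
    """Models a row logical type as a tuple of SQL type names."""
    field_types: Tuple[str, ...]


@dataclass(frozen=True)
class FakeInternalTypeInfo:
    """Models InternalTypeInfo: type information wrapping a logical type."""
    logical_type: LogicalRowType


@dataclass(frozen=True)
class FakeLegacyRowTypeInfo:
    """Models the legacy TypeInformation that the pre-existing branches handle."""
    field_types: Tuple[str, ...]


# Assumed mapping from legacy field type names to PyFlink-style names.
_LEGACY_TO_PY = {"VARCHAR": "STRING", "DOUBLE": "DOUBLE"}


def to_legacy(logical_type: LogicalRowType) -> FakeLegacyRowTypeInfo:
    """Stands in for the two converter steps:
    logical type -> data type -> legacy type information."""
    return FakeLegacyRowTypeInfo(logical_type.field_types)


def from_java_type(j_type_info):
    # New branch: unwrap the internal type info, then reuse the legacy path.
    if isinstance(j_type_info, FakeInternalTypeInfo):
        return from_java_type(to_legacy(j_type_info.logical_type))
    # Pre-existing branch: legacy row type info maps to a row of field types.
    if isinstance(j_type_info, FakeLegacyRowTypeInfo):
        return ("ROW", tuple(_LEGACY_TO_PY[t] for t in j_type_info.field_types))
    # Anything unrecognized fails the way the pre-fix code did.
    raise TypeError(
        "The java type info: %s is not supported in PyFlink currently." % (j_type_info,)
    )
```

In this model, from_java_type(FakeInternalTypeInfo(LogicalRowType(("VARCHAR", "DOUBLE")))) returns ("ROW", ("STRING", "DOUBLE")), which mirrors the round-trip asserted by the unit test. Removing the first branch makes the same call raise TypeError, which corresponds to the pre-fix behavior.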