diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lineage/LineageUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lineage/LineageUtils.java index 110365c76ee4..a7699d578b8c 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lineage/LineageUtils.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lineage/LineageUtils.java @@ -19,13 +19,19 @@ package org.apache.paimon.flink.lineage; import org.apache.paimon.CoreOptions; +import org.apache.paimon.catalog.CatalogContext; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.FormatTable; import org.apache.paimon.table.Table; +import org.apache.paimon.utils.StringUtils; import org.apache.flink.api.connector.source.Boundedness; import org.apache.flink.streaming.api.lineage.LineageDataset; import org.apache.flink.streaming.api.lineage.LineageVertex; import org.apache.flink.streaming.api.lineage.SourceLineageVertex; +import javax.annotation.Nullable; + import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -39,10 +45,23 @@ public class LineageUtils { private static final String PAIMON_DATASET_PREFIX = "paimon://"; + private static final String CATALOG_KEY = "catalog-key"; private static final Set PAIMON_OPTION_KEYS = CoreOptions.getOptions().stream().map(opt -> opt.key()).collect(Collectors.toSet()); + /** Extracts the {@link CatalogContext} from a table, or null if not available. */ + @Nullable + private static CatalogContext catalogContext(Table table) { + if (table instanceof FileStoreTable) { + return ((FileStoreTable) table).catalogEnvironment().catalogContext(); + } + if (table instanceof FormatTable) { + return ((FormatTable) table).catalogContext(); + } + return null; + } + /** * Builds the config map for a dataset facet from a {@link Table}. Includes filtered Paimon * {@link CoreOptions}, partition keys, primary keys, and the table comment (if present). @@ -78,12 +97,36 @@ public static String getNamespace(Table table) { public static SourceLineageVertex sourceLineageVertex( String name, boolean isBounded, Table table) { LineageDataset dataset = - new PaimonLineageDataset(name, getNamespace(table), buildConfigMap(table)); + new PaimonLineageDataset( + name, getNamespace(table), buildConfigMap(table), table.rowType()); Boundedness boundedness = isBounded ? Boundedness.BOUNDED : Boundedness.CONTINUOUS_UNBOUNDED; return new PaimonSourceLineageVertex(boundedness, Collections.singletonList(dataset)); } + private static String getFullName(Table table) { + String name = table.fullName(); + CatalogContext ctx = catalogContext(table); + if (ctx != null) { + String catalogKey = ctx.options().toMap().get(CATALOG_KEY); + if (!StringUtils.isNullOrWhitespaceOnly(catalogKey)) { + name = catalogKey + "." + name; + } + } + return name; + } + + /** + * Creates a {@link SourceLineageVertex} for a Paimon DataStream source table. The table name is + * derived from the table's full name, prefixed with the {@code catalog-key} if available. + * + * @param isBounded whether the source is bounded (batch) or unbounded (streaming) + * @param table the Paimon table + */ + public static SourceLineageVertex sourceLineageVertex(boolean isBounded, Table table) { + return sourceLineageVertex(getFullName(table), isBounded, table); + } + /** * Creates a {@link LineageVertex} for a Paimon sink table. * @@ -92,7 +135,18 @@ public static SourceLineageVertex sourceLineageVertex( */ public static LineageVertex sinkLineageVertex(String name, Table table) { LineageDataset dataset = - new PaimonLineageDataset(name, getNamespace(table), buildConfigMap(table)); + new PaimonLineageDataset( + name, getNamespace(table), buildConfigMap(table), table.rowType()); return new PaimonSinkLineageVertex(Collections.singletonList(dataset)); } + + /** + * Creates a {@link LineageVertex} for a Paimon DataStream sink table. The table name is derived + * from the table's full name, prefixed with the {@code catalog-key} if available. + * + * @param table the Paimon table + */ + public static LineageVertex sinkLineageVertex(Table table) { + return sinkLineageVertex(getFullName(table), table); + } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lineage/PaimonLineageDataset.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lineage/PaimonLineageDataset.java index 5e99df0b2d4d..c122d14cc83e 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lineage/PaimonLineageDataset.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lineage/PaimonLineageDataset.java @@ -18,11 +18,19 @@ package org.apache.paimon.flink.lineage; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.RowType; + import org.apache.flink.streaming.api.lineage.DatasetConfigFacet; +import org.apache.flink.streaming.api.lineage.DatasetSchemaFacet; +import org.apache.flink.streaming.api.lineage.DatasetSchemaField; import org.apache.flink.streaming.api.lineage.LineageDataset; import org.apache.flink.streaming.api.lineage.LineageDatasetFacet; +import javax.annotation.Nullable; + import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.Map; /** @@ -34,11 +42,21 @@ public class PaimonLineageDataset implements LineageDataset { private final String name; private final String namespace; private final Map tableOptions; + @Nullable private final RowType rowType; public PaimonLineageDataset(String name, String namespace, Map tableOptions) { + this(name, namespace, tableOptions, null); + } + + public PaimonLineageDataset( + String name, + String namespace, + Map tableOptions, + @Nullable RowType rowType) { this.name = name; this.namespace = namespace; this.tableOptions = tableOptions; + this.rowType = rowType; } @Override @@ -67,6 +85,39 @@ public Map config() { return tableOptions; } }); + if (rowType != null) { + facets.put( + "schema", + new DatasetSchemaFacet() { + @Override + public String name() { + return "schema"; + } + + @Override + public Map> fields() { + Map> result = new LinkedHashMap<>(); + for (DataField field : rowType.getFields()) { + String fieldName = field.name(); + String fieldType = field.type().asSQLString(); + result.put( + fieldName, + new DatasetSchemaField() { + @Override + public String name() { + return fieldName; + } + + @Override + public String type() { + return fieldType; + } + }); + } + return result; + } + }); + } return facets; } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkFormatTableDataStreamSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkFormatTableDataStreamSink.java index 3d133a9ceace..a223ad51295e 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkFormatTableDataStreamSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkFormatTableDataStreamSink.java @@ -20,6 +20,7 @@ import org.apache.paimon.data.InternalRow; import org.apache.paimon.flink.FlinkRowWrapper; +import org.apache.paimon.flink.lineage.LineageUtils; import org.apache.paimon.table.FormatTable; import org.apache.paimon.table.format.FormatTableWrite; import org.apache.paimon.table.sink.BatchTableCommit; @@ -32,6 +33,8 @@ import org.apache.flink.api.connector.sink2.WriterInitContext; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSink; +import org.apache.flink.streaming.api.lineage.LineageVertex; +import org.apache.flink.streaming.api.lineage.LineageVertexProvider; import org.apache.flink.table.data.RowData; import java.util.List; @@ -55,7 +58,7 @@ public DataStreamSink sinkFrom(DataStream dataStream) { return dataStream.sinkTo(new FormatTableSink(table, overwrite, staticPartitions)); } - private static class FormatTableSink implements Sink { + private static class FormatTableSink implements Sink, LineageVertexProvider { private final FormatTable table; private final boolean overwrite; @@ -68,6 +71,11 @@ public FormatTableSink( this.staticPartitions = staticPartitions; } + @Override + public LineageVertex getLineageVertex() { + return LineageUtils.sinkLineageVertex(table); + } + /** * Do not annotate with @override here to maintain compatibility with Flink * 2.0+. diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java index 959132ad58e0..9948863c547b 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java @@ -40,7 +40,6 @@ import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink; import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory; import javax.annotation.Nullable; @@ -240,7 +239,7 @@ public DataStreamSink doCommit(DataStream written, String commit } configureSlotSharingGroup( committed, options.get(SINK_COMMITTER_CPU), options.get(SINK_COMMITTER_MEMORY)); - return committed.sinkTo(new DiscardingSink<>()).name("end").setParallelism(1); + return committed.sinkTo(new PaimonDiscardingSink<>(table)).name("end").setParallelism(1); } public static void configureSlotSharingGroup( diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PaimonDiscardingSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PaimonDiscardingSink.java new file mode 100644 index 000000000000..84c504166966 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PaimonDiscardingSink.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.sink; + +import org.apache.paimon.flink.lineage.LineageUtils; +import org.apache.paimon.table.FileStoreTable; + +import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink; +import org.apache.flink.streaming.api.lineage.LineageVertex; +import org.apache.flink.streaming.api.lineage.LineageVertexProvider; + +/** + * A {@link DiscardingSink} that implements {@link LineageVertexProvider} so Flink's lineage graph + * discovers the Paimon sink table when using the DataStream API. + */ +public class PaimonDiscardingSink extends DiscardingSink implements LineageVertexProvider { + + private static final long serialVersionUID = 1L; + + private final FileStoreTable table; + + public PaimonDiscardingSink(FileStoreTable table) { + this.table = table; + } + + @Override + public LineageVertex getLineageVertex() { + return LineageUtils.sinkLineageVertex(table); + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java index 3e96dec1ea50..37346573a262 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java @@ -240,7 +240,7 @@ private DataStream buildAlignedContinuousFileSource() { private DataStream toDataStream(Source source) { DataStreamSource dataStream = env.fromSource( - source, + new PaimonDataStreamSource<>(source, table), watermarkStrategy == null ? WatermarkStrategy.noWatermarks() : watermarkStrategy, @@ -354,7 +354,8 @@ private DataStream buildDedicatedSplitGenSource(boolean isBounded) { unordered, outerProject(), isBounded, - limit); + limit, + table); if (parallelism != null) { dataStream.getTransformation().setParallelism(parallelism); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/PaimonDataStreamSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/PaimonDataStreamSource.java new file mode 100644 index 000000000000..bfef8bc6309d --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/PaimonDataStreamSource.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.source; + +import org.apache.paimon.flink.lineage.LineageUtils; +import org.apache.paimon.table.Table; + +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.streaming.api.lineage.LineageVertex; +import org.apache.flink.streaming.api.lineage.LineageVertexProvider; + +/** + * A {@link Source} wrapper that preserves the wrapped source behavior and exposes Paimon lineage + * for sources built through {@link FlinkSourceBuilder}. + */ +public class PaimonDataStreamSource + implements Source, LineageVertexProvider { + + private static final long serialVersionUID = 1L; + + private final Source source; + private final Table table; + + public PaimonDataStreamSource(Source source, Table table) { + this.source = source; + this.table = table; + } + + @Override + public Boundedness getBoundedness() { + return source.getBoundedness(); + } + + @Override + public SourceReader createReader(SourceReaderContext readerContext) + throws Exception { + return source.createReader(readerContext); + } + + @Override + public SplitEnumerator createEnumerator( + SplitEnumeratorContext enumContext) throws Exception { + return source.createEnumerator(enumContext); + } + + @Override + public SplitEnumerator restoreEnumerator( + SplitEnumeratorContext enumContext, CheckpointT checkpoint) throws Exception { + return source.restoreEnumerator(enumContext, checkpoint); + } + + @Override + public SimpleVersionedSerializer getSplitSerializer() { + return source.getSplitSerializer(); + } + + @Override + public SimpleVersionedSerializer getEnumeratorCheckpointSerializer() { + return source.getEnumeratorCheckpointSerializer(); + } + + @Override + public LineageVertex getLineageVertex() { + return LineageUtils.sourceLineageVertex(getBoundedness() == Boundedness.BOUNDED, table); + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorSource.java index d9b7d054cfcb..b6de64472b5b 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorSource.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorSource.java @@ -21,9 +21,12 @@ import org.apache.paimon.flink.NestedProjectedRowData; import org.apache.paimon.flink.source.AbstractNonCoordinatedSource; import org.apache.paimon.flink.source.AbstractNonCoordinatedSourceReader; +import org.apache.paimon.flink.source.NoOpEnumState; +import org.apache.paimon.flink.source.PaimonDataStreamSource; import org.apache.paimon.flink.source.SimpleSourceSplit; import org.apache.paimon.flink.source.SplitListState; import org.apache.paimon.flink.utils.JavaTypeInfo; +import org.apache.paimon.table.Table; import org.apache.paimon.table.sink.ChannelComputer; import org.apache.paimon.table.source.DataSplit; import org.apache.paimon.table.source.EndOfScanException; @@ -37,6 +40,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.connector.source.Boundedness; import org.apache.flink.api.connector.source.ReaderOutput; +import org.apache.flink.api.connector.source.Source; import org.apache.flink.api.connector.source.SourceReader; import org.apache.flink.api.connector.source.SourceReaderContext; import org.apache.flink.api.java.tuple.Tuple2; @@ -242,13 +246,43 @@ public static DataStream buildSource( NestedProjectedRowData nestedProjectedRowData, boolean isBounded, @Nullable Long limit) { + return buildSource( + env, + name, + typeInfo, + readBuilder, + monitorInterval, + emitSnapshotWatermark, + shuffleBucketWithPartition, + unordered, + nestedProjectedRowData, + isBounded, + limit, + null); + } + + public static DataStream buildSource( + StreamExecutionEnvironment env, + String name, + TypeInformation typeInfo, + ReadBuilder readBuilder, + long monitorInterval, + boolean emitSnapshotWatermark, + boolean shuffleBucketWithPartition, + boolean unordered, + NestedProjectedRowData nestedProjectedRowData, + boolean isBounded, + @Nullable Long limit, + @Nullable Table table) { + MonitorSource monitorSource = + new MonitorSource(readBuilder, monitorInterval, emitSnapshotWatermark, isBounded); + Source source = monitorSource; + if (table != null) { + source = new PaimonDataStreamSource<>(monitorSource, table); + } SingleOutputStreamOperator operator = env.fromSource( - new MonitorSource( - readBuilder, - monitorInterval, - emitSnapshotWatermark, - isBounded), + source, WatermarkStrategy.noWatermarks(), name + "-Monitor", new JavaTypeInfo<>(Split.class)) diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lineage/LineageUtilsTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lineage/LineageUtilsTest.java index 62d601ec1b23..ed78d5cec175 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lineage/LineageUtilsTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lineage/LineageUtilsTest.java @@ -19,12 +19,18 @@ package org.apache.paimon.flink.lineage; import org.apache.paimon.CoreOptions; +import org.apache.paimon.catalog.CatalogContext; import org.apache.paimon.flink.PaimonDataStreamScanProvider; -import org.apache.paimon.flink.PaimonDataStreamSinkProvider; +import org.apache.paimon.flink.sink.PaimonDiscardingSink; +import org.apache.paimon.flink.source.ContinuousFileStoreSource; +import org.apache.paimon.flink.source.PaimonDataStreamSource; +import org.apache.paimon.flink.source.operator.MonitorSource; import org.apache.paimon.fs.Path; import org.apache.paimon.fs.local.LocalFileIO; +import org.apache.paimon.options.Options; import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaManager; +import org.apache.paimon.table.CatalogEnvironment; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.FileStoreTableFactory; import org.apache.paimon.types.IntType; @@ -33,6 +39,7 @@ import org.apache.flink.api.connector.source.Boundedness; import org.apache.flink.streaming.api.lineage.DatasetConfigFacet; +import org.apache.flink.streaming.api.lineage.DatasetSchemaFacet; import org.apache.flink.streaming.api.lineage.LineageDataset; import org.apache.flink.streaming.api.lineage.LineageDatasetFacet; import org.apache.flink.streaming.api.lineage.LineageVertex; @@ -78,6 +85,24 @@ private FileStoreTable createTable( return FileStoreTableFactory.create(LocalFileIO.create(), tablePath); } + private FileStoreTable createTableWithCatalogOptions(Map catalogOptions) + throws Exception { + FileStoreTable table = + createTable(new HashMap<>(), Collections.emptyList(), Arrays.asList("f0")); + CatalogEnvironment catalogEnvironment = + new CatalogEnvironment( + null, + null, + null, + null, + null, + CatalogContext.create(Options.fromMap(catalogOptions)), + false, + false); + return FileStoreTableFactory.create( + LocalFileIO.create(), tablePath, table.schema(), catalogEnvironment); + } + @Test void testGetNamespace() throws Exception { FileStoreTable table = @@ -116,6 +141,28 @@ void testSourceLineageVertexUnbounded() throws Exception { assertThat(vertex.boundedness()).isEqualTo(Boundedness.CONTINUOUS_UNBOUNDED); } + @Test + void testSourceLineageVertexKeepsProvidedNameWhenCatalogKeyExists() throws Exception { + Map catalogOptions = new HashMap<>(); + catalogOptions.put("catalog-key", "jdbc-warehouse"); + FileStoreTable table = createTableWithCatalogOptions(catalogOptions); + + SourceLineageVertex vertex = LineageUtils.sourceLineageVertex("paimon.db.src", true, table); + + assertThat(vertex.datasets().get(0).name()).isEqualTo("paimon.db.src"); + } + + @Test + void testDataStreamSourceLineageVertexUsesCatalogKey() throws Exception { + Map catalogOptions = new HashMap<>(); + catalogOptions.put("catalog-key", "jdbc-warehouse"); + FileStoreTable table = createTableWithCatalogOptions(catalogOptions); + + SourceLineageVertex vertex = LineageUtils.sourceLineageVertex(true, table); + + assertThat(vertex.datasets().get(0).name()).isEqualTo("jdbc-warehouse." + table.fullName()); + } + @Test void testSinkLineageVertex() throws Exception { FileStoreTable table = @@ -131,6 +178,17 @@ void testSinkLineageVertex() throws Exception { assertThat(dataset.namespace()).startsWith("paimon://"); } + @Test + void testDataStreamSinkLineageVertexUsesCatalogKey() throws Exception { + Map catalogOptions = new HashMap<>(); + catalogOptions.put("catalog-key", "jdbc-warehouse"); + FileStoreTable table = createTableWithCatalogOptions(catalogOptions); + + LineageVertex vertex = LineageUtils.sinkLineageVertex(table); + + assertThat(vertex.datasets().get(0).name()).isEqualTo("jdbc-warehouse." + table.fullName()); + } + @Test void testConfigFacetContainsPartitionAndPrimaryKeys() throws Exception { FileStoreTable table = @@ -177,6 +235,24 @@ void testConfigFacetWithEmptyKeys() throws Exception { assertThat(config).containsEntry("primary-keys", ""); } + @Test + void testSchemaFacetContainsPaimonFields() throws Exception { + FileStoreTable table = + createTable(new HashMap<>(), Collections.emptyList(), Arrays.asList("f0")); + + LineageVertex vertex = LineageUtils.sinkLineageVertex("paimon.db.t", table); + LineageDataset dataset = vertex.datasets().get(0); + + Map facets = dataset.facets(); + assertThat(facets).containsKey("schema"); + + DatasetSchemaFacet schemaFacet = (DatasetSchemaFacet) facets.get("schema"); + assertThat(schemaFacet.fields()).containsOnlyKeys("f0", "f1", "f2"); + assertThat(schemaFacet.fields().get("f0").type()).isEqualTo("INT NOT NULL"); + assertThat(schemaFacet.fields().get("f1").type()).isEqualTo("VARCHAR(100)"); + assertThat(schemaFacet.fields().get("f2").type()).isEqualTo("INT"); + } + @Test void testScanProviderImplementsLineageVertexProvider() throws Exception { FileStoreTable table = @@ -193,16 +269,47 @@ void testScanProviderImplementsLineageVertexProvider() throws Exception { } @Test - void testSinkProviderImplementsLineageVertexProvider() throws Exception { + void testSinkLineageViaPaimonDiscardingSink() throws Exception { FileStoreTable table = createTable(new HashMap<>(), Collections.emptyList(), Arrays.asList("f0")); - PaimonDataStreamSinkProvider provider = - new PaimonDataStreamSinkProvider(dataStream -> null, "paimon.db.sink", table); + PaimonDiscardingSink sink = new PaimonDiscardingSink<>(table); - assertThat(provider).isInstanceOf(LineageVertexProvider.class); - LineageVertex vertex = provider.getLineageVertex(); + assertThat(sink).isInstanceOf(LineageVertexProvider.class); + LineageVertex vertex = sink.getLineageVertex(); + assertThat(vertex.datasets()).hasSize(1); + } + + @Test + void testPaimonDataStreamSourceWrapsMonitorSourceLineageVertex() throws Exception { + FileStoreTable table = + createTable(new HashMap<>(), Collections.emptyList(), Arrays.asList("f0")); + + PaimonDataStreamSource source = + new PaimonDataStreamSource<>( + new MonitorSource(table.newReadBuilder(), 10, false, true), table); + + assertThat(source).isInstanceOf(LineageVertexProvider.class); + SourceLineageVertex vertex = (SourceLineageVertex) source.getLineageVertex(); + assertThat(vertex.boundedness()).isEqualTo(Boundedness.BOUNDED); + assertThat(vertex.datasets()).hasSize(1); + assertThat(vertex.datasets().get(0).name()).isEqualTo(table.fullName()); + } + + @Test + void testPaimonDataStreamSourceWrapsFlinkSourceLineageVertex() throws Exception { + FileStoreTable table = + createTable(new HashMap<>(), Collections.emptyList(), Arrays.asList("f0")); + + PaimonDataStreamSource source = + new PaimonDataStreamSource<>( + new ContinuousFileStoreSource( + table.newReadBuilder(), table.options(), null), + table); + + SourceLineageVertex vertex = (SourceLineageVertex) source.getLineageVertex(); + assertThat(vertex.boundedness()).isEqualTo(Boundedness.CONTINUOUS_UNBOUNDED); assertThat(vertex.datasets()).hasSize(1); - assertThat(vertex.datasets().get(0).name()).isEqualTo("paimon.db.sink"); + assertThat(vertex.datasets().get(0).name()).isEqualTo(table.fullName()); } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/FlinkFormatTableDataStreamSinkTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/FlinkFormatTableDataStreamSinkTest.java new file mode 100644 index 000000000000..b97792e82425 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/FlinkFormatTableDataStreamSinkTest.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.sink; + +import org.apache.paimon.catalog.CatalogContext; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.local.LocalFileIO; +import org.apache.paimon.options.Options; +import org.apache.paimon.table.FormatTable; +import org.apache.paimon.types.IntType; +import org.apache.paimon.types.RowType; + +import org.apache.flink.streaming.api.lineage.LineageVertex; +import org.apache.flink.streaming.api.lineage.LineageVertexProvider; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.lang.reflect.Constructor; +import java.util.Collections; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link FlinkFormatTableDataStreamSink}. */ +class FlinkFormatTableDataStreamSinkTest { + + @TempDir java.nio.file.Path temp; + + @Test + void testFormatTableSinkLineageVertex() throws Exception { + FormatTable table = + FormatTable.builder() + .fileIO(LocalFileIO.create()) + .identifier(Identifier.create("test_db", "test_table")) + .rowType(RowType.of(new IntType())) + .partitionKeys(Collections.emptyList()) + .location(new Path(temp.toUri().toString()).toString()) + .format(FormatTable.Format.PARQUET) + .options(Collections.singletonMap("path", temp.toUri().toString())) + .catalogContext(CatalogContext.create(new Options())) + .build(); + + Class sinkClass = + Class.forName( + "org.apache.paimon.flink.sink.FlinkFormatTableDataStreamSink$FormatTableSink"); + Constructor constructor = + sinkClass.getDeclaredConstructor(FormatTable.class, boolean.class, Map.class); + constructor.setAccessible(true); + Object sink = constructor.newInstance(table, false, Collections.emptyMap()); + + assertThat(sink).isInstanceOf(LineageVertexProvider.class); + LineageVertex vertex = ((LineageVertexProvider) sink).getLineageVertex(); + assertThat(vertex.datasets()).hasSize(1); + assertThat(vertex.datasets().get(0).name()).isEqualTo(table.fullName()); + } +} diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/FlinkSinkBuilderLineageTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/FlinkSinkBuilderLineageTest.java new file mode 100644 index 000000000000..a7ba7f687985 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/FlinkSinkBuilderLineageTest.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.sink; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.local.LocalFileIO; +import org.apache.paimon.schema.Schema; +import org.apache.paimon.schema.SchemaManager; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.FileStoreTableFactory; +import org.apache.paimon.types.IntType; +import org.apache.paimon.types.RowType; + +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSink; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.transformations.SinkTransformation; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType; +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for sink lineage in {@link FlinkSinkBuilder}. */ +class FlinkSinkBuilderLineageTest { + + @TempDir java.nio.file.Path temp; + + @Test + void testFlinkSinkBuilderUsesPaimonDiscardingSinkForLineage() throws Exception { + FileStoreTable table = createTable(); + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + DataStream input = + env.fromCollection( + Collections.singletonList((RowData) GenericRowData.of(1)), + InternalTypeInfo.of(toLogicalType(table.rowType()))); + + DataStreamSink sink = new FlinkSinkBuilder(table).forRowData(input).build(); + + assertThat(sink.getTransformation()).isInstanceOf(SinkTransformation.class); + SinkTransformation transformation = + (SinkTransformation) sink.getTransformation(); + assertThat(transformation.getSink()).isInstanceOf(PaimonDiscardingSink.class); + } + + private FileStoreTable createTable() throws Exception { + Path tablePath = new Path(temp.toUri().toString()); + Map options = new HashMap<>(); + options.put(CoreOptions.BUCKET.key(), "-1"); + new SchemaManager(LocalFileIO.create(), tablePath) + .createTable( + new Schema( + RowType.of(new IntType()).getFields(), + Collections.emptyList(), + Collections.emptyList(), + options, + "")); + return FileStoreTableFactory.create(LocalFileIO.create(), tablePath); + } +} diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FlinkSourceBuilderTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FlinkSourceBuilderTest.java index bc2ccb0fed13..9f6c46c2a793 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FlinkSourceBuilderTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FlinkSourceBuilderTest.java @@ -22,16 +22,25 @@ import org.apache.paimon.catalog.CatalogContext; import org.apache.paimon.catalog.CatalogFactory; import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.flink.source.operator.MonitorSource; import org.apache.paimon.schema.Schema; import org.apache.paimon.table.Table; import org.apache.paimon.types.DataTypes; +import org.apache.flink.api.dag.Transformation; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.transformations.SourceTransformation; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; import java.nio.file.Path; +import static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType; +import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -114,4 +123,63 @@ public void testUnawareBucket() throws Exception { builder = new FlinkSourceBuilder(table); assertTrue(builder.isUnordered()); } + + @Test + public void testBuildWrapsStaticSourceWithPaimonDataStreamSource() throws Exception { + Table table = createTable("static_source", false, -1, true); + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream dataStream = + new FlinkSourceBuilder(table).env(env).sourceBounded(true).build(); + + assertThat(dataStream.getTransformation()).isInstanceOf(SourceTransformation.class); + SourceTransformation transformation = + (SourceTransformation) dataStream.getTransformation(); + assertThat(transformation.getSource()).isInstanceOf(PaimonDataStreamSource.class); + } + + @Test + public void testBuildWrapsContinuousSourceWithPaimonDataStreamSource() throws Exception { + Table table = createTable("continuous_source", false, -1, true); + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream dataStream = + new FlinkSourceBuilder(table).env(env).sourceBounded(false).build(); + + assertThat(dataStream.getTransformation()).isInstanceOf(SourceTransformation.class); + SourceTransformation transformation = + (SourceTransformation) dataStream.getTransformation(); + assertThat(transformation.getSource()).isInstanceOf(PaimonDataStreamSource.class); + } + + @Test + public void testMonitorSourceBuildSourceWrapsWithPaimonDataStreamSource() throws Exception { + Table table = createTable("monitor_source", false, -1, true); + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream dataStream = + MonitorSource.buildSource( + env, + "source", + InternalTypeInfo.of(toLogicalType(table.rowType())), + table.newReadBuilder(), + 10, + false, + false, + false, + null, + true, + null, + table); + + assertThat(dataStream.getTransformation().getTransitivePredecessors()) + .filteredOn(Transformation.class::isInstance) + .filteredOn(transformation -> transformation instanceof SourceTransformation) + .anySatisfy( + transformation -> + assertThat( + ((SourceTransformation) transformation) + .getSource()) + .isInstanceOf(PaimonDataStreamSource.class)); + } }