From 73c41a5f67a6747cdfac16a0c203c5c1b24e4460 Mon Sep 17 00:00:00 2001 From: zhangjunfan Date: Fri, 26 Jun 2026 14:56:49 +0800 Subject: [PATCH] [flink] add support of hybrid lookup with lake --- .../org/apache/fluss/metrics/MetricNames.java | 15 + .../Flink120HybridLakeLookupITCase.java | 21 + .../fluss/flink/FlinkConnectorOptions.java | 41 + .../flink/catalog/FlinkTableFactory.java | 15 + .../fluss/flink/source/FlinkTableSource.java | 197 +++++ .../lookup/HybridLakeAsyncLookupFunction.java | 743 ++++++++++++++++++ .../flink/catalog/FlinkTableFactoryTest.java | 180 ++++- .../flink/source/HybridLakeLookupITCase.java | 241 ++++++ .../TestingPaimonLakeStoragePlugin.java | 175 +++++ ...e.fluss.lake.lakestorage.LakeStoragePlugin | 3 +- 10 files changed, 1627 insertions(+), 4 deletions(-) create mode 100644 fluss-flink/fluss-flink-1.20/src/test/java/org/apache/fluss/flink/source/Flink120HybridLakeLookupITCase.java create mode 100644 fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/lookup/HybridLakeAsyncLookupFunction.java create mode 100644 fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/HybridLakeLookupITCase.java create mode 100644 fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/lake/values/TestingPaimonLakeStoragePlugin.java diff --git a/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java b/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java index fe34deba95..c3ba879cc1 100644 --- a/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java +++ b/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java @@ -284,6 +284,21 @@ public class MetricNames { public static final String SCANNER_REMOTE_FETCH_RATE = "remoteFetchRequestsPerSecond"; public static final String SCANNER_REMOTE_FETCH_ERROR_RATE = "remoteFetchErrorPerSecond"; + // for flink hybrid lake lookup + public static final String LOOKUP_HOT_FLUSS_HITS_TOTAL = "lookupHotFlussHitsTotal"; + public static final String LOOKUP_HOT_FLUSS_MISSES_TOTAL = "lookupHotFlussMissesTotal"; + public static final String LOOKUP_COLD_FLUSS_HITS_TOTAL = "lookupColdFlussHitsTotal"; + public static final String LOOKUP_COLD_FLUSS_MISSES_TOTAL = "lookupColdFlussMissesTotal"; + public static final String LAKE_FALLBACK_REQUESTS_TOTAL = "lakeFallbackRequestsTotal"; + public static final String LAKE_FALLBACK_HITS_TOTAL = "lakeFallbackHitsTotal"; + public static final String LAKE_FALLBACK_MISSES_TOTAL = "lakeFallbackMissesTotal"; + public static final String LAKE_FALLBACK_FAILURES_TOTAL = "lakeFallbackFailuresTotal"; + public static final String LAKE_FALLBACK_TIMEOUTS_TOTAL = "lakeFallbackTimeoutsTotal"; + public static final String LAKE_FALLBACK_REJECTED_TOTAL = "lakeFallbackRejectedTotal"; + public static final String LAKE_FALLBACK_LATENCY_MS = "lakeFallbackLatencyMs"; + public static final String LAKE_FALLBACK_PENDING_COUNT = "lakeFallbackPendingCount"; + public static final String LAKE_FALLBACK_QUEUE_SIZE = "lakeFallbackQueueSize"; + // for netty public static final String NETTY_USED_DIRECT_MEMORY = "usedDirectMemory"; public static final String NETTY_NUM_DIRECT_ARENAS = "numDirectArenas"; diff --git a/fluss-flink/fluss-flink-1.20/src/test/java/org/apache/fluss/flink/source/Flink120HybridLakeLookupITCase.java b/fluss-flink/fluss-flink-1.20/src/test/java/org/apache/fluss/flink/source/Flink120HybridLakeLookupITCase.java new file mode 100644 index 0000000000..a74ca9ebe0 --- /dev/null +++ b/fluss-flink/fluss-flink-1.20/src/test/java/org/apache/fluss/flink/source/Flink120HybridLakeLookupITCase.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.source; + +/** IT case for hybrid lake lookup in Flink 1.20. */ +public class Flink120HybridLakeLookupITCase extends HybridLakeLookupITCase {} diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java index 23d8f0b2e9..483f782bac 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java @@ -115,6 +115,47 @@ public class FlinkConnectorOptions { + "with the lookup key values. This feature cannot be used with PREFIX_LOOKUP type. " + "Default is false."); + public static final ConfigOption LOOKUP_LAKE_FALLBACK_ENABLED = + ConfigOptions.key("lookup.lake-fallback.enabled") + .booleanType() + .defaultValue(false) + .withDescription( + "Whether to fall back to the lake table when asynchronous lookup misses in Fluss."); + + public static final ConfigOption LOOKUP_HOT_WINDOW = + ConfigOptions.key("lookup.hot-window") + .durationType() + .noDefaultValue() + .withDescription( + "The hot data window for lake fallback lookup. Lookup keys inside this window only query Fluss."); + + public static final ConfigOption LOOKUP_TIME_ZONE = + ConfigOptions.key("lookup.time-zone") + .stringType() + .defaultValue(java.time.ZoneId.systemDefault().getId()) + .withDescription( + "The time zone used to interpret hour partition values for lake fallback lookup."); + + public static final ConfigOption LOOKUP_LAKE_FALLBACK_TIMEOUT = + ConfigOptions.key("lookup.lake-fallback.timeout") + .durationType() + .defaultValue(Duration.ofSeconds(30)) + .withDescription("The timeout for a single lake fallback lookup."); + + public static final ConfigOption LOOKUP_LAKE_FALLBACK_EXECUTOR_THREADS = + ConfigOptions.key("lookup.lake-fallback.executor-threads") + .intType() + .defaultValue(4) + .withDescription( + "The number of worker threads used for blocking lake fallback lookups."); + + public static final ConfigOption LOOKUP_LAKE_FALLBACK_MAX_CONCURRENCY = + ConfigOptions.key("lookup.lake-fallback.max-concurrency") + .intType() + .defaultValue(1024) + .withDescription( + "The maximum number of active and queued lake fallback lookups per lookup function instance."); + // -------------------------------------------------------------------------------------------- // Scan specific options // -------------------------------------------------------------------------------------------- diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java index ab64959abd..04dbc8c05a 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java @@ -51,6 +51,7 @@ import org.apache.flink.table.factories.FactoryUtil; import org.apache.flink.table.types.logical.RowType; +import java.time.Duration; import java.time.ZoneId; import java.util.ArrayList; import java.util.Arrays; @@ -148,6 +149,8 @@ public DynamicTableSource createDynamicTableSource(Context context) { .toMillis(); int splitAssignmentBatchSize = tableOptions.get(FlinkConnectorOptions.SCAN_SPLIT_ASSIGNMENT_BATCH_SIZE); + Duration lookupHotWindow = + tableOptions.getOptional(FlinkConnectorOptions.LOOKUP_HOT_WINDOW).orElse(null); LeaseContext leaseContext = LeaseContext.fromConf(tableOptions); return new FlinkTableSource( @@ -163,6 +166,12 @@ public DynamicTableSource createDynamicTableSource(Context context) { startupOptions, tableOptions.get(FlinkConnectorOptions.LOOKUP_ASYNC), tableOptions.get(FlinkConnectorOptions.LOOKUP_INSERT_IF_NOT_EXISTS), + tableOptions.get(FlinkConnectorOptions.LOOKUP_LAKE_FALLBACK_ENABLED), + lookupHotWindow, + tableOptions.get(FlinkConnectorOptions.LOOKUP_TIME_ZONE), + tableOptions.get(FlinkConnectorOptions.LOOKUP_LAKE_FALLBACK_TIMEOUT), + tableOptions.get(FlinkConnectorOptions.LOOKUP_LAKE_FALLBACK_EXECUTOR_THREADS), + tableOptions.get(FlinkConnectorOptions.LOOKUP_LAKE_FALLBACK_MAX_CONCURRENCY), cache, partitionDiscoveryIntervalMs, splitAssignmentBatchSize, @@ -242,6 +251,12 @@ public Set> optionalOptions() { FlinkConnectorOptions.SCAN_KV_SNAPSHOT_LEASE_DURATION, FlinkConnectorOptions.LOOKUP_ASYNC, FlinkConnectorOptions.LOOKUP_INSERT_IF_NOT_EXISTS, + FlinkConnectorOptions.LOOKUP_LAKE_FALLBACK_ENABLED, + FlinkConnectorOptions.LOOKUP_HOT_WINDOW, + FlinkConnectorOptions.LOOKUP_TIME_ZONE, + FlinkConnectorOptions.LOOKUP_LAKE_FALLBACK_TIMEOUT, + FlinkConnectorOptions.LOOKUP_LAKE_FALLBACK_EXECUTOR_THREADS, + FlinkConnectorOptions.LOOKUP_LAKE_FALLBACK_MAX_CONCURRENCY, FlinkConnectorOptions.SINK_IGNORE_DELETE, FlinkConnectorOptions.SINK_BUCKET_SHUFFLE, FlinkConnectorOptions.SINK_DISTRIBUTION_MODE, diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java index 24a8d00d59..5b2c5df16f 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java @@ -26,6 +26,7 @@ import org.apache.fluss.flink.source.deserializer.RowDataDeserializationSchema; import org.apache.fluss.flink.source.lookup.FlinkAsyncLookupFunction; import org.apache.fluss.flink.source.lookup.FlinkLookupFunction; +import org.apache.fluss.flink.source.lookup.HybridLakeAsyncLookupFunction; import org.apache.fluss.flink.source.lookup.LookupNormalizer; import org.apache.fluss.flink.source.reader.LeaseContext; import org.apache.fluss.flink.utils.FlinkConnectorOptionsUtils; @@ -36,6 +37,7 @@ import org.apache.fluss.lake.source.LakeSource; import org.apache.fluss.lake.source.LakeSplit; import org.apache.fluss.metadata.ChangelogImage; +import org.apache.fluss.metadata.DataLakeFormat; import org.apache.fluss.metadata.DeleteBehavior; import org.apache.fluss.metadata.MergeEngineType; import org.apache.fluss.metadata.TablePath; @@ -54,6 +56,7 @@ import org.apache.flink.api.connector.source.Source; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.TableException; import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.table.connector.ProviderContext; import org.apache.flink.table.connector.RowLevelModificationScanContext; @@ -87,6 +90,8 @@ import javax.annotation.Nullable; +import java.time.Duration; +import java.time.ZoneId; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -134,6 +139,12 @@ public class FlinkTableSource // options for lookup source private final boolean lookupAsync; private final boolean insertIfNotExists; + private final boolean lakeFallbackEnabled; + @Nullable private final Duration lookupHotWindow; + private final ZoneId lookupTimeZone; + private final Duration lakeFallbackTimeout; + private final int lakeFallbackExecutorThreads; + private final int lakeFallbackMaxConcurrency; @Nullable private final LookupCache cache; private final long scanPartitionDiscoveryIntervalMs; @@ -205,6 +216,62 @@ public FlinkTableSource( startupOptions, lookupAsync, insertIfNotExists, + false, + null, + FlinkConnectorOptions.LOOKUP_TIME_ZONE.defaultValue(), + FlinkConnectorOptions.LOOKUP_LAKE_FALLBACK_TIMEOUT.defaultValue(), + FlinkConnectorOptions.LOOKUP_LAKE_FALLBACK_EXECUTOR_THREADS.defaultValue(), + FlinkConnectorOptions.LOOKUP_LAKE_FALLBACK_MAX_CONCURRENCY.defaultValue(), + cache, + scanPartitionDiscoveryIntervalMs, + isDataLakeEnabled, + mergeEngineType, + tableOptions, + leaseContext); + } + + public FlinkTableSource( + TablePath tablePath, + Configuration flussConfig, + TableConfig tableConfig, + org.apache.flink.table.types.logical.RowType tableOutputType, + int[] primaryKeyIndexes, + int[] bucketKeyIndexes, + int[] partitionKeyIndexes, + boolean streaming, + FlinkConnectorOptionsUtils.StartupOptions startupOptions, + boolean lookupAsync, + boolean insertIfNotExists, + boolean lakeFallbackEnabled, + @Nullable Duration lookupHotWindow, + String lookupTimeZone, + Duration lakeFallbackTimeout, + int lakeFallbackExecutorThreads, + int lakeFallbackMaxConcurrency, + @Nullable LookupCache cache, + long scanPartitionDiscoveryIntervalMs, + boolean isDataLakeEnabled, + @Nullable MergeEngineType mergeEngineType, + Map tableOptions, + LeaseContext leaseContext) { + this( + tablePath, + flussConfig, + tableConfig, + tableOutputType, + primaryKeyIndexes, + bucketKeyIndexes, + partitionKeyIndexes, + streaming, + startupOptions, + lookupAsync, + insertIfNotExists, + lakeFallbackEnabled, + lookupHotWindow, + lookupTimeZone, + lakeFallbackTimeout, + lakeFallbackExecutorThreads, + lakeFallbackMaxConcurrency, cache, scanPartitionDiscoveryIntervalMs, FlinkConnectorOptions.SCAN_SPLIT_ASSIGNMENT_BATCH_SIZE.defaultValue(), @@ -233,6 +300,58 @@ public FlinkTableSource( @Nullable MergeEngineType mergeEngineType, Map tableOptions, LeaseContext leaseContext) { + this( + tablePath, + flussConfig, + tableConfig, + tableOutputType, + primaryKeyIndexes, + bucketKeyIndexes, + partitionKeyIndexes, + streaming, + startupOptions, + lookupAsync, + insertIfNotExists, + false, + null, + FlinkConnectorOptions.LOOKUP_TIME_ZONE.defaultValue(), + FlinkConnectorOptions.LOOKUP_LAKE_FALLBACK_TIMEOUT.defaultValue(), + FlinkConnectorOptions.LOOKUP_LAKE_FALLBACK_EXECUTOR_THREADS.defaultValue(), + FlinkConnectorOptions.LOOKUP_LAKE_FALLBACK_MAX_CONCURRENCY.defaultValue(), + cache, + scanPartitionDiscoveryIntervalMs, + splitPerAssignmentBatchSize, + isDataLakeEnabled, + mergeEngineType, + tableOptions, + leaseContext); + } + + public FlinkTableSource( + TablePath tablePath, + Configuration flussConfig, + TableConfig tableConfig, + org.apache.flink.table.types.logical.RowType tableOutputType, + int[] primaryKeyIndexes, + int[] bucketKeyIndexes, + int[] partitionKeyIndexes, + boolean streaming, + FlinkConnectorOptionsUtils.StartupOptions startupOptions, + boolean lookupAsync, + boolean insertIfNotExists, + boolean lakeFallbackEnabled, + @Nullable Duration lookupHotWindow, + String lookupTimeZone, + Duration lakeFallbackTimeout, + int lakeFallbackExecutorThreads, + int lakeFallbackMaxConcurrency, + @Nullable LookupCache cache, + long scanPartitionDiscoveryIntervalMs, + int splitPerAssignmentBatchSize, + boolean isDataLakeEnabled, + @Nullable MergeEngineType mergeEngineType, + Map tableOptions, + LeaseContext leaseContext) { this.tablePath = tablePath; this.flussConfig = flussConfig; this.tableOutputType = tableOutputType; @@ -245,6 +364,12 @@ public FlinkTableSource( this.lookupAsync = lookupAsync; this.insertIfNotExists = insertIfNotExists; + this.lakeFallbackEnabled = lakeFallbackEnabled; + this.lookupHotWindow = lookupHotWindow; + this.lookupTimeZone = ZoneId.of(lookupTimeZone); + this.lakeFallbackTimeout = lakeFallbackTimeout; + this.lakeFallbackExecutorThreads = lakeFallbackExecutorThreads; + this.lakeFallbackMaxConcurrency = lakeFallbackMaxConcurrency; this.cache = cache; this.scanPartitionDiscoveryIntervalMs = scanPartitionDiscoveryIntervalMs; @@ -471,6 +596,27 @@ public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) { partitionKeyIndexes, tableOutputType, projectedFields); + if (lakeFallbackEnabled) { + validateLakeFallbackLookup(lookupNormalizer); + AsyncLookupFunction asyncLookupFunction = + new HybridLakeAsyncLookupFunction( + flussConfig, + tablePath, + tableOutputType, + primaryKeyIndexes, + partitionKeyIndexes, + lookupNormalizer, + projectedFields, + tableOptions, + checkNotNull( + lookupHotWindow, + "lookupHotWindow must not be null when lake fallback is enabled."), + lookupTimeZone, + lakeFallbackTimeout, + lakeFallbackExecutorThreads, + lakeFallbackMaxConcurrency); + return AsyncLookupFunctionProvider.of(asyncLookupFunction); + } if (lookupAsync) { AsyncLookupFunction asyncLookupFunction = new FlinkAsyncLookupFunction( @@ -502,6 +648,51 @@ public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) { } } + private void validateLakeFallbackLookup(LookupNormalizer lookupNormalizer) { + if (!lookupAsync) { + throw new TableException( + "Option 'lookup.lake-fallback.enabled' requires 'lookup.async' to be true."); + } + if (insertIfNotExists) { + throw new TableException( + "Option 'lookup.lake-fallback.enabled' cannot be used with 'lookup.insert-if-not-exists'."); + } + if (cache != null) { + throw new TableException( + "Option 'lookup.lake-fallback.enabled' cannot be used with lookup cache."); + } + if (lookupHotWindow == null) { + throw new TableException( + "Option 'lookup.hot-window' must be configured when 'lookup.lake-fallback.enabled' is true."); + } + if (!tableConfig.getAutoPartitionStrategy().isAutoPartitionEnabled()) { + throw new TableException( + "Option 'lookup.lake-fallback.enabled' requires auto partition to be enabled."); + } + if (!isDataLakeEnabled) { + throw new TableException( + "Option 'lookup.lake-fallback.enabled' requires a datalake-enabled Fluss table."); + } + if (!tableConfig.getDataLakeFormat().isPresent() + || tableConfig.getDataLakeFormat().get() != DataLakeFormat.PAIMON) { + throw new TableException( + "Option 'lookup.lake-fallback.enabled' currently only supports Paimon lake tables."); + } + if (lookupNormalizer.getLookupType() != org.apache.fluss.client.lookup.LookupType.LOOKUP) { + throw new TableException( + "Option 'lookup.lake-fallback.enabled' only supports full primary-key lookup."); + } + if (partitionKeyIndexes.length == 0) { + throw new TableException( + "Option 'lookup.lake-fallback.enabled' requires a partitioned table."); + } + if (lakeFallbackExecutorThreads <= 0 || lakeFallbackMaxConcurrency <= 0) { + throw new TableException( + "Options 'lookup.lake-fallback.executor-threads' and " + + "'lookup.lake-fallback.max-concurrency' must be positive."); + } + } + @Override public DynamicTableSource copy() { FlinkTableSource source = @@ -517,6 +708,12 @@ public DynamicTableSource copy() { startupOptions, lookupAsync, insertIfNotExists, + lakeFallbackEnabled, + lookupHotWindow, + lookupTimeZone.getId(), + lakeFallbackTimeout, + lakeFallbackExecutorThreads, + lakeFallbackMaxConcurrency, cache, scanPartitionDiscoveryIntervalMs, splitPerAssignmentBatchSize, diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/lookup/HybridLakeAsyncLookupFunction.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/lookup/HybridLakeAsyncLookupFunction.java new file mode 100644 index 0000000000..18bba4b9da --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/lookup/HybridLakeAsyncLookupFunction.java @@ -0,0 +1,743 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.source.lookup; + +import org.apache.fluss.client.Connection; +import org.apache.fluss.client.ConnectionFactory; +import org.apache.fluss.client.admin.Admin; +import org.apache.fluss.client.lookup.Lookup; +import org.apache.fluss.client.lookup.LookupResult; +import org.apache.fluss.client.lookup.Lookuper; +import org.apache.fluss.client.metadata.LakeSnapshot; +import org.apache.fluss.client.table.Table; +import org.apache.fluss.config.AutoPartitionTimeUnit; +import org.apache.fluss.config.ConfigOptions; +import org.apache.fluss.config.Configuration; +import org.apache.fluss.exception.LakeTableSnapshotNotExistException; +import org.apache.fluss.flink.row.FlinkAsFlussRow; +import org.apache.fluss.flink.utils.FlinkConversions; +import org.apache.fluss.flink.utils.FlinkUtils; +import org.apache.fluss.flink.utils.FlussRowToFlinkRowConverter; +import org.apache.fluss.lake.source.LakeSource; +import org.apache.fluss.lake.source.LakeSplit; +import org.apache.fluss.lake.source.Planner; +import org.apache.fluss.lake.source.RecordReader; +import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.metrics.MetricNames; +import org.apache.fluss.predicate.Predicate; +import org.apache.fluss.predicate.PredicateBuilder; +import org.apache.fluss.record.LogRecord; +import org.apache.fluss.row.BinaryString; +import org.apache.fluss.row.InternalRow; +import org.apache.fluss.row.ProjectedRow; +import org.apache.fluss.types.DataTypeRoot; +import org.apache.fluss.utils.CloseableIterator; +import org.apache.fluss.utils.ExceptionUtils; +import org.apache.fluss.utils.PartitionUtils; +import org.apache.fluss.utils.concurrent.ExecutorThreadFactory; + +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Histogram; +import org.apache.flink.metrics.HistogramStatistics; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.functions.AsyncLookupFunction; +import org.apache.flink.table.functions.FunctionContext; +import org.apache.flink.table.types.logical.LogicalTypeFamily; +import org.apache.flink.table.types.logical.RowType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.time.Duration; +import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; +import java.time.format.DateTimeParseException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.IntStream; + +import static org.apache.fluss.flink.utils.LakeSourceUtils.createLakeSource; +import static org.apache.fluss.utils.Preconditions.checkNotNull; + +/** An async lookup function that falls back to lake data when cold Fluss lookup misses. */ +public class HybridLakeAsyncLookupFunction extends AsyncLookupFunction { + + private static final Logger LOG = LoggerFactory.getLogger(HybridLakeAsyncLookupFunction.class); + private static final long serialVersionUID = 1L; + + private final Configuration flussConfig; + private final TablePath tablePath; + private final RowType flinkRowType; + private final int[] primaryKeyIndexes; + private final int[] partitionKeyIndexes; + private final LookupNormalizer lookupNormalizer; + @Nullable private int[] projection; + private final Map tableOptions; + private final Duration hotWindow; + private final ZoneId lookupTimeZone; + @Nullable private final String autoPartitionKey; + private final AutoPartitionTimeUnit autoPartitionTimeUnit; + private final Duration lakeFallbackTimeout; + private final int lakeFallbackExecutorThreads; + private final int lakeFallbackMaxConcurrency; + + private transient FlussRowToFlinkRowConverter flussRowToFlinkRowConverter; + private transient Connection connection; + private transient Admin admin; + private transient Table table; + private transient Lookuper lookuper; + private transient FlinkAsFlussRow lookupRow; + private transient InternalRow.FieldGetter[] primaryKeyFieldGetters; + private transient int autoPartitionKeyPositionInPrimaryKey; + private transient org.apache.fluss.types.RowType flussFullRowType; + private transient ThreadPoolExecutor lakeLookupExecutor; + private transient ScheduledExecutorService timeoutExecutor; + private transient AtomicInteger lakeFallbackPendingCount; + private transient Counter lookupHotFlussHitsTotal; + private transient Counter lookupHotFlussMissesTotal; + private transient Counter lookupColdFlussHitsTotal; + private transient Counter lookupColdFlussMissesTotal; + private transient Counter lakeFallbackRequestsTotal; + private transient Counter lakeFallbackHitsTotal; + private transient Counter lakeFallbackMissesTotal; + private transient Counter lakeFallbackFailuresTotal; + private transient Counter lakeFallbackTimeoutsTotal; + private transient Counter lakeFallbackRejectedTotal; + private transient Histogram lakeFallbackLatencyMs; + + public HybridLakeAsyncLookupFunction( + Configuration flussConfig, + TablePath tablePath, + RowType flinkRowType, + int[] primaryKeyIndexes, + int[] partitionKeyIndexes, + LookupNormalizer lookupNormalizer, + @Nullable int[] projection, + Map tableOptions, + Duration hotWindow, + ZoneId lookupTimeZone, + Duration lakeFallbackTimeout, + int lakeFallbackExecutorThreads, + int lakeFallbackMaxConcurrency) { + this.flussConfig = flussConfig; + this.tablePath = tablePath; + this.flinkRowType = flinkRowType; + this.primaryKeyIndexes = primaryKeyIndexes; + this.partitionKeyIndexes = partitionKeyIndexes; + this.lookupNormalizer = lookupNormalizer; + this.projection = projection; + this.tableOptions = tableOptions; + this.hotWindow = hotWindow; + this.lookupTimeZone = lookupTimeZone; + Configuration tableConfig = Configuration.fromMap(tableOptions); + this.autoPartitionKey = tableConfig.getString(ConfigOptions.TABLE_AUTO_PARTITION_KEY); + this.autoPartitionTimeUnit = tableConfig.get(ConfigOptions.TABLE_AUTO_PARTITION_TIME_UNIT); + this.lakeFallbackTimeout = lakeFallbackTimeout; + this.lakeFallbackExecutorThreads = lakeFallbackExecutorThreads; + this.lakeFallbackMaxConcurrency = lakeFallbackMaxConcurrency; + } + + @Override + public void open(FunctionContext context) { + LOG.info("Start opening hybrid lake async lookup function for table {}.", tablePath); + flussFullRowType = FlinkConversions.toFlussRowType(flinkRowType); + validateLookupShape(); + connection = ConnectionFactory.createConnection(flussConfig); + admin = connection.getAdmin(); + table = connection.getTable(tablePath); + lookupRow = new FlinkAsFlussRow(); + + final RowType outputRowType; + if (projection == null) { + outputRowType = flinkRowType; + projection = IntStream.range(0, flinkRowType.getFieldCount()).toArray(); + } else { + outputRowType = FlinkUtils.projectRowType(flinkRowType, projection); + } + flussRowToFlinkRowConverter = + new FlussRowToFlinkRowConverter(FlinkConversions.toFlussRowType(outputRowType)); + + Lookup lookup = table.newLookup(); + lookuper = lookup.createLookuper(); + + org.apache.fluss.types.RowType primaryKeyRowType = + flussFullRowType.project(primaryKeyIndexes); + primaryKeyFieldGetters = new InternalRow.FieldGetter[primaryKeyIndexes.length]; + for (int i = 0; i < primaryKeyIndexes.length; i++) { + primaryKeyFieldGetters[i] = + InternalRow.createFieldGetter(primaryKeyRowType.getTypeAt(i), i); + } + autoPartitionKeyPositionInPrimaryKey = findAutoPartitionKeyPositionInPrimaryKey(); + + lakeLookupExecutor = + new ThreadPoolExecutor( + lakeFallbackExecutorThreads, + lakeFallbackExecutorThreads, + 0L, + TimeUnit.MILLISECONDS, + new ArrayBlockingQueue<>(lakeFallbackMaxConcurrency), + new ExecutorThreadFactory("fluss-lake-fallback-lookup"), + new ThreadPoolExecutor.AbortPolicy()); + timeoutExecutor = + new ScheduledThreadPoolExecutor( + 1, new ExecutorThreadFactory("fluss-lake-fallback-timeout")); + lakeFallbackPendingCount = new AtomicInteger(); + registerMetrics(context); + LOG.info("Finished opening hybrid lake async lookup function for table {}.", tablePath); + } + + @Override + public CompletableFuture> asyncLookup(RowData keyRow) { + RowData normalizedKeyRow = lookupNormalizer.normalizeLookupKey(keyRow); + LookupNormalizer.RemainingFilter remainingFilter = + lookupNormalizer.createRemainingFilter(keyRow); + FlussLookupKey lookupKey = createLookupKey(normalizedKeyRow); + InternalRow flussKeyRow = lookupRow.replace(normalizedKeyRow); + + CompletableFuture> future = new CompletableFuture<>(); + lookuper.lookup(flussKeyRow) + .whenComplete( + (result, throwable) -> { + if (throwable != null) { + LOG.error( + "Fluss async lookup failed for table {}.", + tablePath, + throwable); + future.completeExceptionally( + new RuntimeException( + "Execution of Fluss async lookup failed: " + + throwable.getMessage(), + throwable)); + } else if (!isColdPartition(lookupKey.partitionValue)) { + if (result.getRowList().isEmpty()) { + lookupHotFlussMissesTotal.inc(); + } else { + lookupHotFlussHitsTotal.inc(); + } + handleLookupSuccess(future, result, remainingFilter); + } else if (!result.getRowList().isEmpty()) { + lookupColdFlussHitsTotal.inc(); + handleLookupSuccess(future, result, remainingFilter); + } else { + lookupColdFlussMissesTotal.inc(); + lookupLakeAsync(lookupKey, remainingFilter, future); + } + }); + return future; + } + + private void lookupLakeAsync( + FlussLookupKey lookupKey, + @Nullable LookupNormalizer.RemainingFilter remainingFilter, + CompletableFuture> future) { + lakeFallbackRequestsTotal.inc(); + lakeFallbackPendingCount.incrementAndGet(); + long startMs = System.currentTimeMillis(); + scheduleTimeout(future, startMs); + try { + lakeLookupExecutor.execute( + () -> { + try { + Collection rows = lookupLake(lookupKey, remainingFilter); + completeLakeFallbackSuccessfully(future, rows, startMs); + } catch (Throwable t) { + completeLakeFallbackExceptionally( + future, + new RuntimeException( + "Execution of lake fallback lookup failed: " + + t.getMessage(), + t), + lakeFallbackFailuresTotal, + startMs); + } + }); + } catch (RuntimeException e) { + completeLakeFallbackExceptionally( + future, + new RuntimeException("Lake fallback lookup executor is overloaded.", e), + lakeFallbackRejectedTotal, + startMs); + } + } + + private void scheduleTimeout(CompletableFuture> future, long startMs) { + timeoutExecutor.schedule( + () -> + completeLakeFallbackExceptionally( + future, + new TimeoutException( + "Lake fallback lookup timed out after " + + lakeFallbackTimeout), + lakeFallbackTimeoutsTotal, + startMs), + lakeFallbackTimeout.toMillis(), + TimeUnit.MILLISECONDS); + } + + private void completeLakeFallbackSuccessfully( + CompletableFuture> future, Collection rows, long startMs) { + if (future.complete(rows)) { + if (rows.isEmpty()) { + lakeFallbackMissesTotal.inc(); + } else { + lakeFallbackHitsTotal.inc(); + } + recordLakeFallbackCompletion(startMs); + } + } + + private void completeLakeFallbackExceptionally( + CompletableFuture> future, + Throwable throwable, + Counter failureCounter, + long startMs) { + if (future.completeExceptionally(throwable)) { + failureCounter.inc(); + recordLakeFallbackCompletion(startMs); + } + } + + private void recordLakeFallbackCompletion(long startMs) { + lakeFallbackLatencyMs.update(System.currentTimeMillis() - startMs); + lakeFallbackPendingCount.decrementAndGet(); + } + + private Collection lookupLake( + FlussLookupKey lookupKey, @Nullable LookupNormalizer.RemainingFilter remainingFilter) + throws Exception { + LakeSnapshot lakeSnapshot; + try { + lakeSnapshot = admin.getReadableLakeSnapshot(tablePath).get(); + } catch (Exception e) { + Throwable stripped = ExceptionUtils.stripExecutionException(e); + if (stripped instanceof LakeTableSnapshotNotExistException) { + return Collections.emptyList(); + } + throw e; + } + + LakeSource lakeSource = + checkNotNull( + createLakeSource(tablePath, tableOptions), + "Lake source must not be null for lake fallback lookup."); + if (projection != null) { + lakeSource.withProject(toNestedProjection(projection)); + } + Predicate predicate = createPrimaryKeyPredicate(lookupKey.primaryKeyValues); + LakeSource.FilterPushDownResult pushDownResult = + lakeSource.withFilters(Collections.singletonList(predicate)); + if (!pushDownResult.remainingPredicates().isEmpty()) { + throw new TableException( + "Lake fallback lookup requires primary-key predicates to be pushed down."); + } + + Planner planner = lakeSource.createPlanner(lakeSnapshot::getSnapshotId); + List splits = planner.plan(); + for (LakeSplit split : splits) { + RecordReader reader = + lakeSource.createRecordReader( + (LakeSource.ReaderContext) () -> split); + try (CloseableIterator iterator = reader.read()) { + while (iterator.hasNext()) { + RowData row = + flussRowToFlinkRowConverter.toFlinkRowData( + maybeProject(iterator.next().getRow())); + if (remainingFilter == null || remainingFilter.isMatch(row)) { + return Collections.singletonList(row); + } + } + } + } + return Collections.emptyList(); + } + + private Predicate createPrimaryKeyPredicate(Object[] primaryKeyValues) { + PredicateBuilder builder = new PredicateBuilder(flussFullRowType); + List predicates = new ArrayList<>(); + for (int i = 0; i < primaryKeyIndexes.length; i++) { + predicates.add(builder.equal(primaryKeyIndexes[i], primaryKeyValues[i])); + } + return PredicateBuilder.and(predicates); + } + + private void handleLookupSuccess( + CompletableFuture> resultFuture, + LookupResult lookupResult, + @Nullable LookupNormalizer.RemainingFilter remainingFilter) { + if (lookupResult.getRowList().isEmpty()) { + resultFuture.complete(Collections.emptyList()); + return; + } + + List projectedRows = new ArrayList<>(); + for (InternalRow row : lookupResult.getRowList()) { + if (row != null) { + RowData flinkRow = flussRowToFlinkRowConverter.toFlinkRowData(maybeProject(row)); + if (remainingFilter == null || remainingFilter.isMatch(flinkRow)) { + projectedRows.add(flinkRow); + } + } + } + resultFuture.complete(projectedRows); + } + + private InternalRow maybeProject(InternalRow row) { + if (projection == null) { + return row; + } + return ProjectedRow.from(projection).replaceRow(row); + } + + private FlussLookupKey createLookupKey(RowData normalizedKeyRow) { + InternalRow row = new FlinkAsFlussRow(normalizedKeyRow); + Object[] primaryKeyValues = new Object[primaryKeyIndexes.length]; + for (int i = 0; i < primaryKeyIndexes.length; i++) { + primaryKeyValues[i] = primaryKeyFieldGetters[i].getFieldOrNull(row); + } + String partitionValue = + String.valueOf(primaryKeyValues[autoPartitionKeyPositionInPrimaryKey]); + if (primaryKeyValues[autoPartitionKeyPositionInPrimaryKey] instanceof BinaryString) { + partitionValue = primaryKeyValues[autoPartitionKeyPositionInPrimaryKey].toString(); + } + return new FlussLookupKey(primaryKeyValues, partitionValue); + } + + private boolean isColdPartition(String partitionValue) { + validateAutoPartitionTime(partitionValue); + String hotWindowStartPartition = + PartitionUtils.generateAutoPartitionTime( + ZonedDateTime.now(lookupTimeZone).minus(hotWindow), + 0, + autoPartitionTimeUnit); + return partitionValue.compareTo(hotWindowStartPartition) < 0; + } + + private void validateAutoPartitionTime(String partitionValue) { + try { + DateTimeFormatter.ofPattern(getPartitionTimeFormat()).parse(partitionValue); + } catch (DateTimeParseException e) { + throw new TableException( + "Lake fallback lookup requires the partition value to match auto partition time unit '" + + autoPartitionTimeUnit + + "' with format '" + + getPartitionTimeFormat() + + "', but was: " + + partitionValue, + e); + } + } + + private String getPartitionTimeFormat() { + switch (autoPartitionTimeUnit) { + case YEAR: + return "yyyy"; + case QUARTER: + return "yyyyQ"; + case MONTH: + return "yyyyMM"; + case DAY: + return "yyyyMMdd"; + case HOUR: + return "yyyyMMddHH"; + default: + throw new TableException( + "Unsupported auto partition time unit for lake fallback lookup: " + + autoPartitionTimeUnit); + } + } + + private int findAutoPartitionKeyPositionInPrimaryKey() { + int partitionKeyIndex = findAutoPartitionKeyIndex(); + for (int i = 0; i < primaryKeyIndexes.length; i++) { + if (primaryKeyIndexes[i] == partitionKeyIndex) { + return i; + } + } + throw new TableException( + "Lake fallback lookup requires auto partition key to be part of primary key."); + } + + private int findAutoPartitionKeyIndex() { + if (autoPartitionKey == null) { + return partitionKeyIndexes[0]; + } + + List fieldNames = flinkRowType.getFieldNames(); + for (int partitionKeyIndex : partitionKeyIndexes) { + if (fieldNames.get(partitionKeyIndex).equals(autoPartitionKey)) { + return partitionKeyIndex; + } + } + throw new TableException( + "Lake fallback lookup requires auto partition key '" + + autoPartitionKey + + "' to be one of the partition keys."); + } + + private void validateLookupShape() { + if (partitionKeyIndexes.length == 0) { + throw new TableException("Lake fallback lookup requires a partitioned table."); + } + if (lakeFallbackExecutorThreads <= 0 || lakeFallbackMaxConcurrency <= 0) { + throw new TableException("Lake fallback lookup executor settings must be positive."); + } + if (lakeFallbackExecutorThreads > lakeFallbackMaxConcurrency) { + throw new TableException( + "Option 'lookup.lake-fallback.executor-threads' must not exceed " + + "'lookup.lake-fallback.max-concurrency'."); + } + org.apache.fluss.types.DataType partitionType = + flussFullRowType.getTypeAt(partitionKeyIndexes[0]); + if (partitionType.getTypeRoot() != DataTypeRoot.STRING + && partitionType.getTypeRoot() != DataTypeRoot.CHAR) { + throw new TableException( + "Lake fallback lookup currently requires the partition key to be STRING/CHAR."); + } + if (!flinkRowType + .getTypeAt(partitionKeyIndexes[0]) + .getTypeRoot() + .getFamilies() + .contains(LogicalTypeFamily.CHARACTER_STRING)) { + throw new TableException( + "Lake fallback lookup currently requires the partition key to be a character string."); + } + } + + private int[][] toNestedProjection(int[] projectedFields) { + int[][] nestedProjection = new int[projectedFields.length][1]; + for (int i = 0; i < projectedFields.length; i++) { + nestedProjection[i][0] = projectedFields[i]; + } + return nestedProjection; + } + + private void registerMetrics(@Nullable FunctionContext context) { + if (context == null) { + lookupHotFlussHitsTotal = NoOpCounter.INSTANCE; + lookupHotFlussMissesTotal = NoOpCounter.INSTANCE; + lookupColdFlussHitsTotal = NoOpCounter.INSTANCE; + lookupColdFlussMissesTotal = NoOpCounter.INSTANCE; + lakeFallbackRequestsTotal = NoOpCounter.INSTANCE; + lakeFallbackHitsTotal = NoOpCounter.INSTANCE; + lakeFallbackMissesTotal = NoOpCounter.INSTANCE; + lakeFallbackFailuresTotal = NoOpCounter.INSTANCE; + lakeFallbackTimeoutsTotal = NoOpCounter.INSTANCE; + lakeFallbackRejectedTotal = NoOpCounter.INSTANCE; + lakeFallbackLatencyMs = NoOpHistogram.INSTANCE; + return; + } + + MetricGroup metricGroup = context.getMetricGroup(); + lookupHotFlussHitsTotal = metricGroup.counter(MetricNames.LOOKUP_HOT_FLUSS_HITS_TOTAL); + lookupHotFlussMissesTotal = metricGroup.counter(MetricNames.LOOKUP_HOT_FLUSS_MISSES_TOTAL); + lookupColdFlussHitsTotal = metricGroup.counter(MetricNames.LOOKUP_COLD_FLUSS_HITS_TOTAL); + lookupColdFlussMissesTotal = + metricGroup.counter(MetricNames.LOOKUP_COLD_FLUSS_MISSES_TOTAL); + lakeFallbackRequestsTotal = metricGroup.counter(MetricNames.LAKE_FALLBACK_REQUESTS_TOTAL); + lakeFallbackHitsTotal = metricGroup.counter(MetricNames.LAKE_FALLBACK_HITS_TOTAL); + lakeFallbackMissesTotal = metricGroup.counter(MetricNames.LAKE_FALLBACK_MISSES_TOTAL); + lakeFallbackFailuresTotal = metricGroup.counter(MetricNames.LAKE_FALLBACK_FAILURES_TOTAL); + lakeFallbackTimeoutsTotal = metricGroup.counter(MetricNames.LAKE_FALLBACK_TIMEOUTS_TOTAL); + lakeFallbackRejectedTotal = metricGroup.counter(MetricNames.LAKE_FALLBACK_REJECTED_TOTAL); + lakeFallbackLatencyMs = + metricGroup.histogram( + MetricNames.LAKE_FALLBACK_LATENCY_MS, new SlidingWindowHistogram(1024)); + metricGroup.gauge(MetricNames.LAKE_FALLBACK_PENDING_COUNT, lakeFallbackPendingCount::get); + metricGroup.gauge( + MetricNames.LAKE_FALLBACK_QUEUE_SIZE, + () -> lakeLookupExecutor == null ? 0 : lakeLookupExecutor.getQueue().size()); + } + + @Override + public void close() throws Exception { + LOG.info("Closing hybrid lake async lookup function for table {}.", tablePath); + if (lakeLookupExecutor != null) { + lakeLookupExecutor.shutdownNow(); + } + if (timeoutExecutor != null) { + timeoutExecutor.shutdownNow(); + } + if (table != null) { + table.close(); + } + if (connection != null) { + connection.close(); + } + } + + private enum NoOpCounter implements Counter { + INSTANCE; + + @Override + public void inc() {} + + @Override + public void inc(long n) {} + + @Override + public void dec() {} + + @Override + public void dec(long n) {} + + @Override + public long getCount() { + return 0; + } + } + + private enum NoOpHistogram implements Histogram { + INSTANCE; + + @Override + public void update(long value) {} + + @Override + public long getCount() { + return 0; + } + + @Override + public HistogramStatistics getStatistics() { + return new SlidingWindowHistogramStatistics(new long[0]); + } + } + + private static class SlidingWindowHistogram implements Histogram { + private final long[] values; + private int position; + private long count; + + private SlidingWindowHistogram(int size) { + this.values = new long[size]; + } + + @Override + public synchronized void update(long value) { + values[position] = value; + position = (position + 1) % values.length; + count++; + } + + @Override + public synchronized long getCount() { + return count; + } + + @Override + public synchronized HistogramStatistics getStatistics() { + int size = (int) Math.min(count, values.length); + long[] snapshot = new long[size]; + for (int i = 0; i < size; i++) { + snapshot[i] = values[i]; + } + return new SlidingWindowHistogramStatistics(snapshot); + } + } + + private static class SlidingWindowHistogramStatistics extends HistogramStatistics { + private final long[] values; + + private SlidingWindowHistogramStatistics(long[] values) { + this.values = values; + Arrays.sort(this.values); + } + + @Override + public double getQuantile(double quantile) { + if (values.length == 0) { + return 0.0; + } + int index = (int) Math.ceil(quantile * values.length) - 1; + index = Math.max(0, Math.min(index, values.length - 1)); + return values[index]; + } + + @Override + public long[] getValues() { + return Arrays.copyOf(values, values.length); + } + + @Override + public int size() { + return values.length; + } + + @Override + public double getMean() { + if (values.length == 0) { + return 0.0; + } + long sum = 0; + for (long value : values) { + sum += value; + } + return (double) sum / values.length; + } + + @Override + public double getStdDev() { + if (values.length <= 1) { + return 0.0; + } + double mean = getMean(); + double sum = 0.0; + for (long value : values) { + double delta = value - mean; + sum += delta * delta; + } + return Math.sqrt(sum / values.length); + } + + @Override + public long getMax() { + return values.length == 0 ? 0 : values[values.length - 1]; + } + + @Override + public long getMin() { + return values.length == 0 ? 0 : values[0]; + } + } + + private static class FlussLookupKey { + private final Object[] primaryKeyValues; + private final String partitionValue; + + private FlussLookupKey(Object[] primaryKeyValues, String partitionValue) { + this.primaryKeyValues = primaryKeyValues; + this.partitionValue = partitionValue; + } + } +} diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkTableFactoryTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkTableFactoryTest.java index 551d22ac23..b13f16dd58 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkTableFactoryTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkTableFactoryTest.java @@ -17,17 +17,20 @@ package org.apache.fluss.flink.catalog; +import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.flink.FlinkConnectorOptions; import org.apache.fluss.flink.sink.FlinkTableSink; import org.apache.fluss.flink.source.FlinkTableSource; import org.apache.fluss.flink.source.lookup.FlinkAsyncLookupFunction; import org.apache.fluss.flink.source.lookup.FlinkLookupFunction; +import org.apache.fluss.flink.source.lookup.HybridLakeAsyncLookupFunction; import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ExecutionOptions; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.Schema; +import org.apache.flink.table.api.TableException; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.catalog.CatalogTable; import org.apache.flink.table.catalog.Column; @@ -187,6 +190,125 @@ void testLookupSource() { .hasMessageContaining("Full lookup caching is not supported yet."); } + @Test + void testLakeFallbackLookupSource() { + ResolvedSchema schema = createHourlyPartitionedPkSchema(); + Map properties = getLakeFallbackOptions(); + FlinkTableSource tableSource = + (FlinkTableSource) + createTableSource(schema, properties, Collections.singletonList("pt")); + + int[][] lookupKey = {{0}, {1}, {2}}; + LookupTableSource.LookupRuntimeProvider lookupProvider = + tableSource.getLookupRuntimeProvider(new LookupRuntimeProviderContext(lookupKey)); + assertThat(lookupProvider).isInstanceOf(AsyncLookupFunctionProvider.class); + AsyncLookupFunction asyncLookupFunction = + ((AsyncLookupFunctionProvider) lookupProvider).createAsyncLookupFunction(); + assertThat(asyncLookupFunction).isInstanceOf(HybridLakeAsyncLookupFunction.class); + } + + @Test + void testLakeFallbackLookupSourceValidation() { + ResolvedSchema schema = createHourlyPartitionedPkSchema(); + + Map syncLookupProperties = getLakeFallbackOptions(); + syncLookupProperties.put(FlinkConnectorOptions.LOOKUP_ASYNC.key(), "false"); + assertThatThrownBy( + () -> + ((FlinkTableSource) + createTableSource( + schema, + syncLookupProperties, + Collections.singletonList("pt"))) + .getLookupRuntimeProvider( + new LookupRuntimeProviderContext( + new int[][] {{0}, {1}, {2}}))) + .isInstanceOf(TableException.class) + .hasMessageContaining( + "Option 'lookup.lake-fallback.enabled' requires 'lookup.async' to be true."); + + Map missingHotWindowProperties = getLakeFallbackOptions(); + missingHotWindowProperties.remove(FlinkConnectorOptions.LOOKUP_HOT_WINDOW.key()); + assertThatThrownBy( + () -> + ((FlinkTableSource) + createTableSource( + schema, + missingHotWindowProperties, + Collections.singletonList("pt"))) + .getLookupRuntimeProvider( + new LookupRuntimeProviderContext( + new int[][] {{0}, {1}, {2}}))) + .isInstanceOf(TableException.class) + .hasMessageContaining( + "Option 'lookup.hot-window' must be configured when 'lookup.lake-fallback.enabled' is true."); + + Map cacheProperties = getLakeFallbackOptions(); + cacheProperties.put("lookup.cache", "partial"); + cacheProperties.put(PARTIAL_CACHE_EXPIRE_AFTER_ACCESS.key(), "18000"); + cacheProperties.put(PARTIAL_CACHE_EXPIRE_AFTER_WRITE.key(), "36000"); + cacheProperties.put(PARTIAL_CACHE_MAX_ROWS.key(), "100000"); + assertThatThrownBy( + () -> + ((FlinkTableSource) + createTableSource( + schema, + cacheProperties, + Collections.singletonList("pt"))) + .getLookupRuntimeProvider( + new LookupRuntimeProviderContext( + new int[][] {{0}, {1}, {2}}))) + .isInstanceOf(TableException.class) + .hasMessageContaining( + "Option 'lookup.lake-fallback.enabled' cannot be used with lookup cache."); + + Map nonLakeProperties = getLakeFallbackOptions(); + nonLakeProperties.put("table.datalake.enabled", "false"); + assertThatThrownBy( + () -> + ((FlinkTableSource) + createTableSource( + schema, + nonLakeProperties, + Collections.singletonList("pt"))) + .getLookupRuntimeProvider( + new LookupRuntimeProviderContext( + new int[][] {{0}, {1}, {2}}))) + .isInstanceOf(TableException.class) + .hasMessageContaining( + "Option 'lookup.lake-fallback.enabled' requires a datalake-enabled Fluss table."); + + Map nonAutoPartitionProperties = getLakeFallbackOptions(); + nonAutoPartitionProperties.put(ConfigOptions.TABLE_AUTO_PARTITION_ENABLED.key(), "false"); + assertThatThrownBy( + () -> + ((FlinkTableSource) + createTableSource( + schema, + nonAutoPartitionProperties, + Collections.singletonList("pt"))) + .getLookupRuntimeProvider( + new LookupRuntimeProviderContext( + new int[][] {{0}, {1}, {2}}))) + .isInstanceOf(TableException.class) + .hasMessageContaining( + "Option 'lookup.lake-fallback.enabled' requires auto partition to be enabled."); + + assertThatThrownBy( + () -> + ((FlinkTableSource) + createTableSource( + schema, + getLakeFallbackOptions(), + Collections.singletonList("pt"))) + .getLookupRuntimeProvider( + new LookupRuntimeProviderContext( + new int[][] {{0}, {2}}))) + .isInstanceOf(TableException.class) + .hasMessageContaining( + "Option 'lookup.lake-fallback.enabled' only supports full primary-key lookup."); + } + @Test void testVirtualLogTableSourceDoesNotSupportBatchMode() { ResolvedSchema schema = createBasicSchema(); @@ -242,6 +364,18 @@ private ResolvedSchema createBasicSchema() { UniqueConstraint.primaryKey("PK_first_third", Arrays.asList("first", "third"))); } + private ResolvedSchema createHourlyPartitionedPkSchema() { + return new ResolvedSchema( + Arrays.asList( + Column.physical("id", DataTypes.INT().notNull()), + Column.physical("sub_id", DataTypes.INT().notNull()), + Column.physical("pt", DataTypes.STRING().notNull()), + Column.physical("value", DataTypes.STRING())), + Collections.emptyList(), + UniqueConstraint.primaryKey( + "PK_id_sub_id_pt", Arrays.asList("id", "sub_id", "pt"))); + } + private ResolvedSchema createBinlogSchema() { return new ResolvedSchema( Arrays.asList( @@ -277,11 +411,35 @@ private static Map getBasicOptionsWithBucketKey() { return basicOptions; } + private static Map getLakeFallbackOptions() { + Map options = getBasicOptions(); + options.put(BUCKET_KEY.key(), "id"); + options.put("table.datalake.enabled", "true"); + options.put("table.datalake.format", "paimon"); + options.put(ConfigOptions.TABLE_AUTO_PARTITION_ENABLED.key(), "true"); + options.put(ConfigOptions.TABLE_AUTO_PARTITION_TIME_UNIT.key(), "HOUR"); + options.put(FlinkConnectorOptions.LOOKUP_ASYNC.key(), "true"); + options.put(FlinkConnectorOptions.LOOKUP_LAKE_FALLBACK_ENABLED.key(), "true"); + options.put(FlinkConnectorOptions.LOOKUP_HOT_WINDOW.key(), "12 h"); + return options; + } + private static DynamicTableSource createTableSource( ResolvedSchema schema, Map options) { return createTableSource(schema, options, Collections.emptyMap()); } + private static DynamicTableSource createTableSource( + ResolvedSchema schema, Map options, List partitionKeys) { + return createTableSource( + OBJECT_IDENTIFIER, + schema, + options, + Collections.emptyMap(), + new Configuration(), + partitionKeys); + } + private static DynamicTableSource createTableSource( ResolvedSchema schema, Map options, @@ -296,6 +454,24 @@ private static DynamicTableSource createTableSource( Map options, Map enrichmentOptions, Configuration configuration) { + return createTableSource( + objectIdentifier, + schema, + options, + enrichmentOptions, + configuration, + schema.getPrimaryKey() + .map(UniqueConstraint::getColumns) + .orElse(Collections.emptyList())); + } + + private static DynamicTableSource createTableSource( + ObjectIdentifier objectIdentifier, + ResolvedSchema schema, + Map options, + Map enrichmentOptions, + Configuration configuration, + List partitionKeys) { FlinkTableFactory tableFactory = createFlinkTableFactory(); FactoryUtil.DefaultDynamicTableContext context = new FactoryUtil.DefaultDynamicTableContext( @@ -304,9 +480,7 @@ private static DynamicTableSource createTableSource( CatalogTable.of( Schema.newBuilder().fromResolvedSchema(schema).build(), "mock source", - schema.getPrimaryKey() - .map(UniqueConstraint::getColumns) - .orElse(Collections.emptyList()), + partitionKeys, options), schema), enrichmentOptions, diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/HybridLakeLookupITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/HybridLakeLookupITCase.java new file mode 100644 index 0000000000..236a85b064 --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/HybridLakeLookupITCase.java @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.source; + +import org.apache.fluss.client.Connection; +import org.apache.fluss.client.ConnectionFactory; +import org.apache.fluss.client.admin.Admin; +import org.apache.fluss.config.ConfigOptions; +import org.apache.fluss.config.Configuration; +import org.apache.fluss.lake.values.TestingPaimonLakeStoragePlugin; +import org.apache.fluss.metadata.DataLakeFormat; +import org.apache.fluss.metadata.PartitionInfo; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.rpc.gateway.CoordinatorGateway; +import org.apache.fluss.rpc.messages.CommitLakeTableSnapshotRequest; +import org.apache.fluss.rpc.messages.PbLakeTableOffsetForBucket; +import org.apache.fluss.rpc.messages.PbLakeTableSnapshotInfo; +import org.apache.fluss.server.testutils.FlussClusterExtension; +import org.apache.fluss.utils.clock.ManualClock; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.table.api.config.ExecutionConfigOptions; +import org.apache.flink.test.util.AbstractTestBase; +import org.apache.flink.types.Row; +import org.apache.flink.util.CloseableIterator; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import java.time.Duration; +import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.util.Collection; +import java.util.Collections; +import java.util.Map; + +import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsIgnoreOrder; +import static org.apache.fluss.flink.utils.FlinkTestBase.waitUntilPartitions; +import static org.apache.fluss.server.testutils.FlussClusterExtension.BUILTIN_DATABASE; +import static org.apache.fluss.testutils.DataTestUtils.row; +import static org.assertj.core.api.Assertions.assertThat; + +/** IT case for hybrid Fluss lookup with lake fallback. */ +abstract class HybridLakeLookupITCase extends AbstractTestBase { + + private static final ManualClock CLOCK = new ManualClock(System.currentTimeMillis()); + private static final String CATALOG_NAME = "testcatalog"; + private static final String DEFAULT_DB = "defaultdb"; + + @RegisterExtension + public static final FlussClusterExtension FLUSS_CLUSTER_EXTENSION = + FlussClusterExtension.builder() + .setClusterConf( + new Configuration() + .set(ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS, Integer.MAX_VALUE) + .set(ConfigOptions.DATALAKE_ENABLED, true) + .set(ConfigOptions.DATALAKE_FORMAT, DataLakeFormat.PAIMON)) + .setNumOfTabletServers(3) + .setClock(CLOCK) + .build(); + + private static Connection conn; + private static Admin admin; + private static Configuration clientConf; + + private StreamExecutionEnvironment execEnv; + private StreamTableEnvironment tEnv; + + @BeforeAll + static void beforeAll() { + clientConf = FLUSS_CLUSTER_EXTENSION.getClientConfig(); + conn = ConnectionFactory.createConnection(clientConf); + admin = conn.getAdmin(); + } + + @BeforeEach + void before() { + execEnv = StreamExecutionEnvironment.getExecutionEnvironment(); + tEnv = StreamTableEnvironment.create(execEnv, EnvironmentSettings.inStreamingMode()); + + String bootstrapServers = String.join(",", clientConf.get(ConfigOptions.BOOTSTRAP_SERVERS)); + tEnv.executeSql( + String.format( + "create catalog %s with ('type' = 'fluss', '%s' = '%s')", + CATALOG_NAME, ConfigOptions.BOOTSTRAP_SERVERS.key(), bootstrapServers)); + tEnv.executeSql("use catalog " + CATALOG_NAME); + tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1); + tEnv.executeSql("create database " + DEFAULT_DB); + tEnv.useDatabase(DEFAULT_DB); + TestingPaimonLakeStoragePlugin.clear(); + } + + @AfterEach + void after() { + TestingPaimonLakeStoragePlugin.clear(); + tEnv.useDatabase(BUILTIN_DATABASE); + tEnv.executeSql(String.format("drop database %s cascade", DEFAULT_DB)); + } + + @Test + void testFlinkSqlLookupFallbackToLakeForColdMiss() throws Exception { + String tableName = "hybrid_lake_lookup_cold"; + TablePath tablePath = TablePath.of(DEFAULT_DB, tableName); + String coldPartition = + ZonedDateTime.now(ZoneId.systemDefault()) + .minus(Duration.ofDays(2)) + .format(java.time.format.DateTimeFormatter.ofPattern("yyyyMMddHH")); + + createHybridLookupTable(tableName); + TestingPaimonLakeStoragePlugin.setRows( + tablePath, Collections.singletonList(row(1, coldPartition, "lake-name"))); + commitReadableLakeSnapshot(tablePath); + createLookupSource(Collections.singletonList(Row.of(1, coldPartition))); + + CloseableIterator collected = tEnv.executeSql(lookupJoinSql(tableName)).collect(); + + assertResultsIgnoreOrder( + collected, + Collections.singletonList("+I[1, " + coldPartition + ", lake-name]"), + true); + assertThat(TestingPaimonLakeStoragePlugin.plannedLookups(tablePath)).isEqualTo(1); + } + + @Test + void testFlinkSqlLookupDoesNotFallbackForHotMiss() throws Exception { + String tableName = "hybrid_lake_lookup_hot"; + TablePath tablePath = TablePath.of(DEFAULT_DB, tableName); + String hotPartition = createHybridLookupTable(tableName); + TestingPaimonLakeStoragePlugin.setRows( + tablePath, Collections.singletonList(row(2, hotPartition, "lake-name"))); + commitReadableLakeSnapshot(tablePath); + createLookupSource(Collections.singletonList(Row.of(2, hotPartition))); + + CloseableIterator collected = tEnv.executeSql(lookupJoinSql(tableName)).collect(); + + assertResultsIgnoreOrder( + collected, Collections.singletonList("+I[2, " + hotPartition + ", null]"), true); + assertThat(TestingPaimonLakeStoragePlugin.plannedLookups(tablePath)).isZero(); + } + + private String createHybridLookupTable(String tableName) throws Exception { + tEnv.executeSql( + String.format( + "create table %s (" + + " id int not null," + + " pt varchar not null," + + " name varchar," + + " primary key (id, pt) NOT ENFORCED" + + ") partitioned by (pt) with (" + + " 'bucket.num' = '1'," + + " 'bucket.key' = 'id'," + + " 'lookup.async' = 'true'," + + " 'lookup.lake-fallback.enabled' = 'true'," + + " 'lookup.hot-window' = '12 h'," + + " 'table.datalake.enabled' = 'true'," + + " 'table.datalake.format' = 'paimon'," + + " 'table.auto-partition.enabled' = 'true'," + + " 'table.auto-partition.time-unit' = 'hour'" + + ")", + tableName)) + .await(); + Map partitionNamesById = + waitUntilPartitions( + FLUSS_CLUSTER_EXTENSION.getZooKeeperClient(), + TablePath.of(DEFAULT_DB, tableName), + 1); + long tableId = admin.getTableInfo(TablePath.of(DEFAULT_DB, tableName)).get().getTableId(); + for (Long partitionId : partitionNamesById.keySet()) { + FLUSS_CLUSTER_EXTENSION.waitUntilAllReplicaReady( + new TableBucket(tableId, partitionId, 0)); + } + return partitionNamesById.values().iterator().next(); + } + + private void createLookupSource(Collection rows) { + Schema sourceSchema = + Schema.newBuilder() + .column("id", DataTypes.INT()) + .column("pt", DataTypes.STRING()) + .columnByExpression("proc", "PROCTIME()") + .build(); + RowTypeInfo sourceTypeInfo = + new RowTypeInfo( + new TypeInformation[] {Types.INT, Types.STRING}, new String[] {"id", "pt"}); + DataStream source = execEnv.fromCollection(rows).returns(sourceTypeInfo); + tEnv.createTemporaryView("lookup_src", tEnv.fromDataStream(source, sourceSchema)); + } + + private String lookupJoinSql(String tableName) { + return String.format( + "SELECT src.id, src.pt, dim.name " + + "FROM lookup_src AS src " + + "LEFT JOIN %s FOR SYSTEM_TIME AS OF src.proc AS dim " + + "ON src.id = dim.id AND src.pt = dim.pt", + tableName); + } + + private void commitReadableLakeSnapshot(TablePath tablePath) throws Exception { + long tableId = admin.getTableInfo(tablePath).get().getTableId(); + long maxTimestamp = System.currentTimeMillis(); + CommitLakeTableSnapshotRequest request = new CommitLakeTableSnapshotRequest(); + PbLakeTableSnapshotInfo requestForTable = request.addTablesReq(); + requestForTable.setTableId(tableId); + requestForTable.setSnapshotId(1L); + for (PartitionInfo partitionInfo : admin.listPartitionInfos(tablePath).get()) { + PbLakeTableOffsetForBucket lakeTableOffsetForBucket = requestForTable.addBucketsReq(); + lakeTableOffsetForBucket.setPartitionId(partitionInfo.getPartitionId()); + lakeTableOffsetForBucket.setBucketId(0); + lakeTableOffsetForBucket.setLogEndOffset(0L); + lakeTableOffsetForBucket.setMaxTimestamp(maxTimestamp); + } + CoordinatorGateway coordinatorGateway = FLUSS_CLUSTER_EXTENSION.newCoordinatorClient(); + coordinatorGateway.commitLakeTableSnapshot(request).get(); + } +} diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/lake/values/TestingPaimonLakeStoragePlugin.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/lake/values/TestingPaimonLakeStoragePlugin.java new file mode 100644 index 0000000000..a36b65cc75 --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/lake/values/TestingPaimonLakeStoragePlugin.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may + * obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.lake.values; + +import org.apache.fluss.config.Configuration; +import org.apache.fluss.exception.TableAlreadyExistException; +import org.apache.fluss.exception.TableNotExistException; +import org.apache.fluss.lake.lakestorage.LakeCatalog; +import org.apache.fluss.lake.lakestorage.LakeStorage; +import org.apache.fluss.lake.lakestorage.LakeStoragePlugin; +import org.apache.fluss.lake.serializer.SimpleVersionedSerializer; +import org.apache.fluss.lake.source.LakeSource; +import org.apache.fluss.lake.source.LakeSplit; +import org.apache.fluss.lake.source.Planner; +import org.apache.fluss.lake.source.RecordReader; +import org.apache.fluss.lake.writer.LakeTieringFactory; +import org.apache.fluss.metadata.DataLakeFormat; +import org.apache.fluss.metadata.TableChange; +import org.apache.fluss.metadata.TableDescriptor; +import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.predicate.Predicate; +import org.apache.fluss.record.ChangeType; +import org.apache.fluss.record.GenericRecord; +import org.apache.fluss.record.LogRecord; +import org.apache.fluss.row.InternalRow; +import org.apache.fluss.utils.CloseableIterator; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; + +/** Test-only Paimon lake storage plugin for hybrid lake lookup tests. */ +public class TestingPaimonLakeStoragePlugin implements LakeStoragePlugin { + + private static final Map> RECORDS_BY_TABLE = + new ConcurrentHashMap<>(); + private static final Map PLANNED_LOOKUPS_BY_TABLE = + new ConcurrentHashMap<>(); + + public static void setRows(TablePath tablePath, List rows) { + List records = new ArrayList<>(); + for (int i = 0; i < rows.size(); i++) { + records.add(new GenericRecord(i, 0, ChangeType.APPEND_ONLY, rows.get(i))); + } + RECORDS_BY_TABLE.put(tablePath, records); + PLANNED_LOOKUPS_BY_TABLE.put(tablePath, new AtomicInteger()); + } + + public static int plannedLookups(TablePath tablePath) { + AtomicInteger plannedLookups = PLANNED_LOOKUPS_BY_TABLE.get(tablePath); + return plannedLookups == null ? 0 : plannedLookups.get(); + } + + public static void clear() { + RECORDS_BY_TABLE.clear(); + PLANNED_LOOKUPS_BY_TABLE.clear(); + } + + @Override + public String identifier() { + return DataLakeFormat.PAIMON.toString(); + } + + @Override + public LakeStorage createLakeStorage(Configuration configuration) { + return new TestingPaimonLakeStorage(); + } + + private static class TestingPaimonLakeStorage implements LakeStorage { + @Override + public LakeTieringFactory createLakeTieringFactory() { + throw new UnsupportedOperationException("Not implemented."); + } + + @Override + public LakeCatalog createLakeCatalog() { + return new TestingPaimonLakeCatalog(); + } + + @Override + public LakeSource createLakeSource(TablePath tablePath) { + return new TestingPaimonLakeSource(tablePath); + } + } + + private static class TestingPaimonLakeCatalog implements LakeCatalog { + @Override + public void createTable( + TablePath tablePath, TableDescriptor tableDescriptor, Context context) + throws TableAlreadyExistException {} + + @Override + public void alterTable(TablePath tablePath, List tableChanges, Context context) + throws TableNotExistException {} + } + + private static class TestingPaimonLakeSource implements LakeSource { + private final TablePath tablePath; + + private TestingPaimonLakeSource(TablePath tablePath) { + this.tablePath = tablePath; + } + + @Override + public void withProject(int[][] project) {} + + @Override + public void withLimit(int limit) {} + + @Override + public FilterPushDownResult withFilters(List predicates) { + return FilterPushDownResult.of(predicates, Collections.emptyList()); + } + + @Override + public Planner createPlanner(PlannerContext context) { + return () -> { + PLANNED_LOOKUPS_BY_TABLE + .computeIfAbsent(tablePath, ignored -> new AtomicInteger()) + .incrementAndGet(); + return Collections.singletonList(new TestingPaimonSplit()); + }; + } + + @Override + public RecordReader createRecordReader(ReaderContext context) { + return new RecordReader() { + @Override + public CloseableIterator read() throws IOException { + return CloseableIterator.wrap( + RECORDS_BY_TABLE + .getOrDefault(tablePath, Collections.emptyList()) + .iterator()); + } + }; + } + + @Override + public SimpleVersionedSerializer getSplitSerializer() { + throw new UnsupportedOperationException("Not implemented."); + } + } + + private static class TestingPaimonSplit implements LakeSplit { + @Override + public int bucket() { + return 0; + } + + @Override + public List partition() { + return Collections.emptyList(); + } + } +} diff --git a/fluss-flink/fluss-flink-common/src/test/resources/META-INF/services/org.apache.fluss.lake.lakestorage.LakeStoragePlugin b/fluss-flink/fluss-flink-common/src/test/resources/META-INF/services/org.apache.fluss.lake.lakestorage.LakeStoragePlugin index 3fafafad01..1497369939 100644 --- a/fluss-flink/fluss-flink-common/src/test/resources/META-INF/services/org.apache.fluss.lake.lakestorage.LakeStoragePlugin +++ b/fluss-flink/fluss-flink-common/src/test/resources/META-INF/services/org.apache.fluss.lake.lakestorage.LakeStoragePlugin @@ -16,4 +16,5 @@ # limitations under the License. # -org.apache.fluss.lake.values.TestingValuesLakeStoragePlugin \ No newline at end of file +org.apache.fluss.lake.values.TestingValuesLakeStoragePlugin +org.apache.fluss.lake.values.TestingPaimonLakeStoragePlugin