snapshots = null;
+ if (snapshotsProperty != null) {
+ snapshots =
+ (sourceDir != null && SnapshotUtils.isRegex(snapshotsProperty))
+ ? SnapshotUtils.getSnapshotsFromSnapshotPath(
+ sourceDir, gcsOptions.getGcsUtil(), snapshotsProperty)
+ : SnapshotUtils.getSnapshotsFromString(snapshotsProperty);
+ }
+
+ ImportConfig importConfig = new ImportConfig();
+ importConfig.setSourcepath(sourceDir);
+ if (snapshots != null) {
+ importConfig.setSnapshotsFromMap(snapshots);
+ }
+ importConfig.validate();
+ SnapshotUtils.setRestorePath(System.getProperty(RESTORE_PATH_PROPERTY), importConfig);
+
+ return importConfig;
+ }
+
+ /**
+  * Loads an {@link ImportConfig} from the JSON file at the given path.
+  *
+  * <p>The parsed object must be non-null; it is then validated and its restore path is set
+  * through {@code SnapshotUtils.setRestorePath} using the restore path read from the file.
+  *
+  * @param configFilePath location of the JSON configuration file
+  * @return the parsed configuration
+  * @throws Exception if reading, parsing or validation fails
+  */
+ @VisibleForTesting
+ static ImportConfig buildImportConfigFromConfigFile(String configFilePath) throws Exception {
+   Gson jsonParser = new GsonBuilder().create();
+   String rawJson = SnapshotUtils.readFileContents(configFilePath);
+   ImportConfig parsedConfig = jsonParser.fromJson(rawJson, ImportConfig.class);
+   Preconditions.checkNotNull(parsedConfig, "ImportConfig parsed from file cannot be null.");
+
+   parsedConfig.validate();
+   SnapshotUtils.setRestorePath(parsedConfig.getRestorepath(), parsedConfig);
+   return parsedConfig;
+ }
+
+ /**
+  * Creates a copy of the snapshot from the source path into the restore path.
+  *
+  * <p>If the restore path already exists it is deleted recursively before the copy, so that a
+  * repeated invocation starts from a clean directory.
+  *
+  * @param snapshotConfig snapshot configuration supplying the source path, restore path, Hadoop
+  *     configuration and snapshot name
+  * @throws IOException if a file system operation or the snapshot copy fails
+  */
+ // The Javadoc must precede the annotation; placed after it, the comment is a dangling doc
+ // comment that is not attached to the method.
+ @VisibleForTesting
+ static void restoreSnapshot(SnapshotConfig snapshotConfig) throws IOException {
+   Path sourcePath = snapshotConfig.getSourcePath();
+   Path restorePath = snapshotConfig.getRestorePath();
+   Configuration configuration = snapshotConfig.getConfiguration();
+   LOG.info(
+       String.format("RestoreSnapshot - sourcePath:%s restorePath: %s", sourcePath, restorePath));
+   // The file system is resolved from the source path and is also used for the restore path.
+   FileSystem fileSystem = sourcePath.getFileSystem(configuration);
+   if (fileSystem.exists(restorePath)) {
+     LOG.info(
+         String.format(
+             "Restore path %s already exists, deleting it for idempotency", restorePath));
+     fileSystem.delete(restorePath, true);
+   }
+   RestoreSnapshotHelper.copySnapshotForScanner(
+       configuration, fileSystem, sourcePath, restorePath, snapshotConfig.getSnapshotName());
+ }
+}
diff --git a/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/ImportJobFromHbaseSnapshot.java b/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/ImportJobFromHbaseSnapshot.java
index 064736a04b..40747862b7 100644
--- a/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/ImportJobFromHbaseSnapshot.java
+++ b/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/ImportJobFromHbaseSnapshot.java
@@ -15,21 +15,38 @@
*/
package com.google.cloud.bigtable.beam.hbasesnapshots;
-import com.google.bigtable.repackaged.com.google.api.core.InternalExtensionOnly;
+import com.google.api.core.InternalExtensionOnly;
import com.google.cloud.bigtable.beam.CloudBigtableIO;
+import com.google.cloud.bigtable.beam.CloudBigtableTableConfiguration;
import com.google.cloud.bigtable.beam.TemplateUtils;
+import com.google.cloud.bigtable.beam.hbasesnapshots.conf.HBaseSnapshotInputConfigBuilder;
+import com.google.cloud.bigtable.beam.hbasesnapshots.conf.ImportConfig;
+import com.google.cloud.bigtable.beam.hbasesnapshots.conf.SnapshotConfig;
+import com.google.cloud.bigtable.beam.hbasesnapshots.dofn.CleanupHBaseSnapshotRestoreFilesFn;
+import com.google.cloud.bigtable.beam.hbasesnapshots.dofn.CleanupRestoredSnapshotsFn;
+import com.google.cloud.bigtable.beam.hbasesnapshots.dofn.RestoreSnapshotFn;
+import com.google.cloud.bigtable.beam.hbasesnapshots.transforms.ListRegions;
+import com.google.cloud.bigtable.beam.hbasesnapshots.transforms.ReadRegions;
import com.google.cloud.bigtable.beam.sequencefiles.HBaseResultToMutationFn;
import com.google.cloud.bigtable.beam.sequencefiles.ImportJob;
import com.google.cloud.bigtable.beam.sequencefiles.Utils;
import com.google.common.annotations.VisibleForTesting;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import java.io.IOException;
import java.util.Arrays;
import java.util.List;
+import java.util.Map;
+import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
+import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.hadoop.format.HadoopFormatIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Wait;
@@ -37,6 +54,7 @@
import org.apache.beam.sdk.values.PCollection;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -63,11 +81,32 @@
* Note that in the case of job failures, the temp files generated in the .restore-$JOB_NAME
* directory under the snapshot export bucket will not get deleted. Hence one need to either launch
* a replacement job with the same jobName to re-run the job or manually delete this directory.
+ * Additionally, it is highly recommended to set a GCS Lifecycle TTL (e.g., 7 days) on the bucket
+ * used for the restore path to automatically clean up any orphaned files.
+ *
+ * Running Parallel Sharded Jobs: To import a snapshot in parallel shards (using {@code
+ * --numShards} and {@code --shardIndex}), you must first run a single restore step to copy the
+ * files to the restore path (e.g., using this job with {@code --performOnlyRestoreStep=true} or
+ * using {@link HBaseSnapshotRestoreTool}). Once the restore is complete, launch the parallel
+ * sharded import jobs concurrently, making sure to set {@code --skipRestoreStep=true} and {@code
+ * --deleteRestoredSnapshots=false} on all shards. This prevents concurrent shards from deleting the
+ * restore path while other shards are still reading.
*/
@InternalExtensionOnly
public class ImportJobFromHbaseSnapshot {
private static final Log LOG = LogFactory.getLog(ImportJobFromHbaseSnapshot.class);
+ @VisibleForTesting
+ static final String MISSING_SNAPSHOT_SOURCEPATH =
+ "Source Path containing hbase snapshots must be specified.";
+
+ @VisibleForTesting
+ static final String MISSING_SNAPSHOT_NAMES =
+ "Snapshots must be specified. Allowed values are '*' (indicating all snapshots under source"
+ + " path) or 'prefix*' (snapshots matching certain prefix) or"
+ + " 'snapshotname1:tablename1,snapshotname2:tablename2' (comma seperated list of"
+ + " snapshots)";
+
public interface ImportOptions extends ImportJob.ImportOptions {
@Description("The HBase root dir where HBase snapshot files resides.")
String getHbaseSnapshotSourceDir();
@@ -87,24 +126,298 @@ public interface ImportOptions extends ImportJob.ImportOptions {
@SuppressWarnings("unused")
void setEnableSnappy(Boolean enableSnappy);
+
+ @Description("Path to config file containing snapshot source path/snapshot names.")
+ String getImportConfigFilePath();
+
+ void setImportConfigFilePath(String value);
+
+ @Description(
+ "Snapshots to be imported. Can be '*', 'prefix*' or 'snap1,snap2' or"
+ + " 'snap1:table1,snap2:table2'.")
+ String getSnapshots();
+
+ void setSnapshots(String value);
+
+ @Description("Specifies whether to use dynamic splitting while reading hbase region.")
+ @Default.Boolean(true)
+ boolean getUseDynamicSplitting();
+
+ void setUseDynamicSplitting(boolean value);
+
+ @Description("Specifies the threshold for number of cells per mutation written.")
+ @Default.Integer(100_000 - 1)
+ int getMaxMutationsPerRequestThreshold();
+
+ void setMaxMutationsPerRequestThreshold(int value);
+
+ @Description(
+ "Specifies whether to filter large rows that exceed FilterLargeRowsThresholdBytes should be"
+ + " logged and dropped.")
+ @Default.Boolean(false)
+ boolean getFilterLargeRows();
+
+ void setFilterLargeRows(boolean value);
+
+ @Description(
+ "Specifies the size in bytes of a row that should be logged and dropped before loading to"
+ + " Bigtable.")
+ @Default.Long(256 * 1024 * 1024)
+ long getFilterLargeRowsThresholdBytes();
+
+ void setFilterLargeRowsThresholdBytes(long value);
+
+ @Description(
+ "Specifies whether to filter large cells that exceed FilterLargeCellsThresholdBytes should"
+ + " be logged and dropped.")
+ @Default.Boolean(true)
+ boolean getFilterLargeCells();
+
+ void setFilterLargeCells(boolean value);
+
+ @Description(
+ "Specifies the size in bytes of a cell that should be logged and dropped before loading to"
+ + " Bigtable.")
+ @Default.Integer(100 * 1024 * 1024)
+ int getFilterLargeCellsThresholdBytes();
+
+ void setFilterLargeCellsThresholdBytes(int value);
+
+ @Description(
+ "Specifies whether to filter large row keys that exceed FilterLargeRowKeysThresholdBytes"
+ + " should be logged and dropped.")
+ @Default.Boolean(false)
+ boolean getFilterLargeRowKeys();
+
+ void setFilterLargeRowKeys(boolean value);
+
+ @Description(
+ "Drops wide rows exceeding MaxMutationsPerRequestThreshold to prevent atomicity loss from"
+ + " splitting (losing data), otherwise splits them to preserve data.")
+ @Default.Boolean(false)
+ boolean getFilterWideRows();
+
+ void setFilterWideRows(boolean value);
+
+ @Description(
+ "Specifies the size in bytes of a row key that should be logged and dropped before loading"
+ + " to Bigtable.")
+ @Default.Integer(4 * 1024)
+ int getFilterLargeRowKeysThresholdBytes();
+
+ void setFilterLargeRowKeysThresholdBytes(int value);
+
+ @Description(
+ "Specifies the number of shards to use when loading the snapshot. "
+ + "If set, shardIndex must also be set.")
+ Integer getNumShards();
+
+ void setNumShards(Integer value);
+
+ @Description("Specifies the shard index from [0, numShards) that this load represents.")
+ Integer getShardIndex();
+
+ void setShardIndex(Integer value);
+
+ @Description("Specifies the path to the restored Snapshot files.")
+ String getRestorePath();
+
+ void setRestorePath(String value);
+
+ @Description(
+ "Specifies whether the snapshots restored should be deleted. Note: Cleanup is best-effort"
+ + " and will not fail the job if deletion fails after retries. When running parallel"
+ + " sharded jobs concurrently, this must be set to false to prevent a shard from"
+ + " deleting the files while other shards are still reading.")
+ @Default.Boolean(false)
+ Boolean getDeleteRestoredSnapshots();
+
+ void setDeleteRestoredSnapshots(Boolean value);
+
+ @Description(
+ "Specifies whether the restore step should be skipped. When running parallel sharded jobs"
+ + " concurrently, this must be set to true (after running a single separate restore"
+ + " step beforehand) to prevent concurrent restore steps from deleting and corrupting"
+ + " active files.")
+ @Default.Boolean(false)
+ Boolean getSkipRestoreStep();
+
+ void setSkipRestoreStep(Boolean value);
+
+ @Description("Specifies whether to perform only restore step.")
+ @Default.Boolean(false)
+ Boolean getPerformOnlyRestoreStep();
+
+ void setPerformOnlyRestoreStep(Boolean value);
}
public static void main(String[] args) throws Exception {
PipelineOptionsFactory.register(ImportOptions.class);
- ImportOptions opts =
+ ImportOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(ImportOptions.class);
+ // To determine the Google Cloud Storage file scheme (gs://)
+ FileSystems.setDefaultPipelineOptions(options);
+
LOG.info("Building Pipeline");
- Pipeline pipeline = buildPipeline(opts);
+ Pipeline pipeline = null;
+ ImportConfig importConfig = null;
+ // Maintain Backward compatibility until deprecation
+ if (options.getSnapshotName() != null && !options.getSnapshotName().isEmpty()) {
+ pipeline = buildPipeline(options);
+ } else {
+ importConfig =
+ options.getImportConfigFilePath() != null
+ ? buildImportConfigFromConfigFile(options.getImportConfigFilePath())
+ : buildImportConfigFromPipelineOptions(options, options.as(GcsOptions.class));
+
+ LOG.info(
+ String.format(
+ "SourcePath:%s, RestorePath:%s",
+ importConfig.getSourcepath(), importConfig.getRestorepath()));
+ pipeline = buildPipelineWithMultipleSnapshots(options, importConfig);
+ }
+
LOG.info("Running Pipeline");
PipelineResult result = pipeline.run();
-
- if (opts.getWait()) {
+ if (options.getWait()) {
Utils.waitForPipelineToFinish(result);
}
}
+ /**
+  * Builds an {@link ImportConfig} from the JSON configuration file at the given path.
+  *
+  * <p>The parsed configuration is validated and its restore path is set through
+  * {@code SnapshotUtils.setRestorePath}, using the restore path read from the file.
+  *
+  * @param configFilePath location of the JSON configuration file
+  * @return the parsed configuration
+  * @throws NullPointerException if parsing the file yields no configuration object
+  * @throws Exception if reading, parsing or validation fails
+  */
+ @VisibleForTesting
+ static ImportConfig buildImportConfigFromConfigFile(String configFilePath) throws Exception {
+   Gson gson = new GsonBuilder().create();
+   ImportConfig importConfig =
+       gson.fromJson(SnapshotUtils.readFileContents(configFilePath), ImportConfig.class);
+   // Gson returns null for an empty document. Fail here with a descriptive message rather than
+   // with a bare NullPointerException from the validate() call below.
+   if (importConfig == null) {
+     throw new NullPointerException(
+         "ImportConfig parsed from file cannot be null. File: " + configFilePath);
+   }
+   importConfig.validate();
+   SnapshotUtils.setRestorePath(importConfig.getRestorepath(), importConfig);
+   return importConfig;
+ }
+
+ /**
+ * Builds an {@link ImportConfig} from pipeline options instead of a config file.
+ *
+ * <p>When the {@code snapshots} option is set it is resolved in one of two ways: if a source
+ * directory is present and {@code SnapshotUtils.isRegex} reports a wildcard character in the
+ * value, the snapshot names are listed from the source directory; otherwise the value is parsed
+ * as an explicit comma separated list.
+ *
+ * @param options import options supplying the source directory, snapshots and restore path
+ * @param gcsOptions options used to obtain the {@code GcsUtil} passed to the snapshot listing
+ * @return the configuration, after {@code validate()} was called and the restore path was set
+ * @throws IOException declared for the snapshot listing call
+ */
+ @VisibleForTesting
+ static ImportConfig buildImportConfigFromPipelineOptions(
+ ImportOptions options, GcsOptions gcsOptions) throws IOException {
+ String sourceDir = options.getHbaseSnapshotSourceDir();
+ String snapshotsProperty = options.getSnapshots();
+ Map snapshots = null;
+ if (snapshotsProperty != null) {
+ snapshots =
+ (sourceDir != null && SnapshotUtils.isRegex(snapshotsProperty))
+ ? SnapshotUtils.getSnapshotsFromSnapshotPath(
+ sourceDir, gcsOptions.getGcsUtil(), snapshotsProperty)
+ : SnapshotUtils.getSnapshotsFromString(snapshotsProperty);
+ }
+
+ ImportConfig importConfig = new ImportConfig();
+ importConfig.setSourcepath(sourceDir);
+ // Snapshots are only set when the option was provided; validate() runs either way.
+ if (snapshots != null) {
+ importConfig.setSnapshotsFromMap(snapshots);
+ }
+ importConfig.validate();
+ SnapshotUtils.setRestorePath(options.getRestorePath(), importConfig);
+ return importConfig;
+ }
+
+ /**
+ * Builds the pipeline that supports loading multiple snapshots to Bigtable.
+ *
+ * <p>Stages, in order: create one element per snapshot config; restore each snapshot unless
+ * {@code skipRestoreStep} is set; return early if {@code performOnlyRestoreStep} is set;
+ * otherwise list and read the regions, write the records to Bigtable and, if
+ * {@code deleteRestoredSnapshots} is set, run the cleanup step gated by {@code Wait.on} the
+ * records read from the regions.
+ *
+ * @param options - Pipeline options
+ * @param importConfig - Configuration representing snapshot source path, list of snapshots etc
+ * @return the assembled pipeline; this method does not run it
+ * @throws Exception propagated from the helpers called while assembling the pipeline
+ */
+ static Pipeline buildPipelineWithMultipleSnapshots(
+ ImportOptions options, ImportConfig importConfig) throws Exception {
+ Map configurations =
+ SnapshotUtils.getConfiguration(
+ options.getRunner().getSimpleName(),
+ options.getProject(),
+ importConfig.getSourcepath(),
+ importConfig.getHbaseConfiguration());
+
+ List snapshotConfigs =
+ SnapshotUtils.buildSnapshotConfigs(
+ importConfig.getSnapshots(),
+ configurations,
+ options.getProject(),
+ importConfig.getSourcepath(),
+ importConfig.getRestorepath());
+ DataflowPipelineDebugOptions debugOptions = options.as(DataflowPipelineDebugOptions.class);
+ // Disable GC thrashing detection to prevent Dataflow from prematurely killing
+ // workers during memory-intensive HBase snapshot scans.
+ debugOptions.setGCThrashingPercentagePerPeriod(100.00);
+
+ Pipeline pipeline = Pipeline.create(debugOptions);
+
+ PCollection restoredSnapshots =
+ pipeline.apply("Read Snapshot Configs", Create.of(snapshotConfigs));
+ if (!options.getSkipRestoreStep()) {
+ restoredSnapshots =
+ restoredSnapshots.apply("Restore Snapshots", ParDo.of(new RestoreSnapshotFn()));
+ }
+ if (options.getPerformOnlyRestoreStep()) {
+ return pipeline;
+ }
+ // Read records from hbase region files and write to Bigtable
+ PCollection>> hbaseRecords =
+ restoredSnapshots
+ .apply("List Regions", new ListRegions())
+ .apply(
+ "Read Regions",
+ new ReadRegions(
+ options.getUseDynamicSplitting(),
+ options.getMaxMutationsPerRequestThreshold(),
+ options.getFilterLargeRows(),
+ options.getFilterLargeRowsThresholdBytes(),
+ options.getFilterLargeCells(),
+ options.getFilterLargeCellsThresholdBytes(),
+ options.getFilterLargeRowKeys(),
+ options.getFilterLargeRowKeysThresholdBytes(),
+ options.getFilterWideRows(),
+ options.getNumShards(),
+ options.getShardIndex()));
+
+ // NOTE(review): "NA" looks like a placeholder table id set so that buildImportConfig can be
+ // reused; presumably writeToMultipleTables selects the table per record - confirm.
+ options.setBigtableTableId(ValueProvider.StaticValueProvider.of("NA"));
+ CloudBigtableTableConfiguration bigtableConfiguration =
+ TemplateUtils.buildImportConfig(options, "HBaseSnapshotImportJob");
+ // Entries from the import config are layered on top of the configuration built from options.
+ if (importConfig.getBigtableConfiguration() != null) {
+ CloudBigtableTableConfiguration.Builder builder = bigtableConfiguration.toBuilder();
+ for (Map.Entry entry : importConfig.getBigtableConfiguration().entrySet()) {
+ builder = builder.withConfiguration(entry.getKey(), entry.getValue());
+ }
+ bigtableConfiguration = builder.build();
+ }
+
+ hbaseRecords.apply(
+ "Write to BigTable", CloudBigtableIO.writeToMultipleTables(bigtableConfiguration));
+
+ // Clean up all the temporary restored snapshot HLinks after reading all the data
+ if (options.getDeleteRestoredSnapshots()) {
+ restoredSnapshots
+ .apply(Wait.on(hbaseRecords))
+ .apply(
+ "Clean restored files",
+ ParDo.of(
+ new CleanupRestoredSnapshotsFn(
+ importConfig.getBackoffInitialIntervalInMillis(),
+ importConfig.getBackoffMaxIntervalInMillis(),
+ importConfig.getBackoffMaxretries())));
+ }
+
+ return pipeline;
+ }
+
+ /**
+ * Builds the pipeline that supports loading a single snapshot to Bigtable. Maintained for
+ * backward compatibility; it will be deprecated once its functionality is merged into the
+ * buildPipelineWithMultipleSnapshots method.
+ *
+ * @param opts - Pipeline options
+ * @return
+ * @throws Exception
+ */
@VisibleForTesting
static Pipeline buildPipeline(ImportOptions opts) throws Exception {
Pipeline pipeline = Pipeline.create(Utils.tweakOptions(opts));
diff --git a/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/SnapshotUtils.java b/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/SnapshotUtils.java
new file mode 100644
index 0000000000..e2f4e2ecf9
--- /dev/null
+++ b/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/SnapshotUtils.java
@@ -0,0 +1,436 @@
+/*
+ * Copyright 2026 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigtable.beam.hbasesnapshots;
+
+import com.google.api.core.InternalApi;
+import com.google.api.services.storage.model.Objects;
+import com.google.api.services.storage.model.StorageObject;
+import com.google.cloud.bigtable.beam.hbasesnapshots.conf.ImportConfig;
+import com.google.cloud.bigtable.beam.hbasesnapshots.conf.SnapshotConfig;
+import com.google.common.base.Joiner;
+import com.google.common.io.CharStreams;
+import java.io.File;
+import java.io.IOException;
+import java.io.Reader;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.channels.Channels;
+import java.nio.charset.StandardCharsets;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.extensions.gcp.util.GcsUtil;
+import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.util.BackOff;
+import org.apache.beam.sdk.util.FluentBackoff;
+import org.apache.beam.sdk.util.Sleeper;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Contains various helper methods to handle different tasks associated with importing of hbase
+ * snapshots
+ */
+@InternalApi("For internal usage only")
+public class SnapshotUtils {
+ private static final Logger LOG = LoggerFactory.getLogger(SnapshotUtils.class);
+ private static final String DIRECTRUNNER = "DirectRunner";
+ private static final String SNAPSHOT_MANIFEST_DIRECTORY = ".hbase-snapshot";
+ private static final String GCS_SCHEME = "gs";
+ private static final Sleeper sleeper = Sleeper.DEFAULT;
+
+ private SnapshotUtils() {}
+
+ /**
+  * Returns the parent directory of the given snapshot source directory.
+  *
+  * <p>Paths with the {@code gs} scheme are resolved through {@link GcsPath}; every other path is
+  * resolved through {@link File}.
+  *
+  * @param hbaseSnapshotSourceDirectory snapshot source directory, expected to be a valid URI
+  * @return the parent directory, or {@code null} when the path has no parent
+  * @throws RuntimeException if the directory is not a syntactically valid URI; the
+  *     {@link URISyntaxException} is attached as the cause
+  */
+ private static String getParentDirectory(String hbaseSnapshotSourceDirectory) {
+   URI hbaseSnapshotSourceUri;
+   try {
+     hbaseSnapshotSourceUri = new URI(hbaseSnapshotSourceDirectory);
+   } catch (URISyntaxException e) {
+     // Keep the parse failure as the cause so the offending character/index is not lost.
+     throw new RuntimeException(
+         String.format(
+             "Invalid file path format for snapshot source directory: %s. Valid paths should have"
+                 + " file scheme (gs://, file://)",
+             hbaseSnapshotSourceDirectory),
+         e);
+   }
+
+   // Constant-first equals also covers a missing (null) scheme.
+   if (GCS_SCHEME.equals(hbaseSnapshotSourceUri.getScheme())) { // i.e Cloud Storage file system
+     GcsPath gcsParent = GcsPath.fromUri(hbaseSnapshotSourceUri).getParent();
+     // Report a missing parent as null, like the File branch, instead of failing on toString().
+     return gcsParent == null ? null : gcsParent.toString();
+   }
+
+   return new File(hbaseSnapshotSourceDirectory).getParent();
+ }
+
+ /** Returns {@code directory} without its last character when that character is a slash. */
+ static String removeSuffixSlashIfExists(String directory) {
+   if (!directory.endsWith("/")) {
+     return directory;
+   }
+   return directory.substring(0, directory.length() - 1);
+ }
+
+ /**
+  * Appends a sub directory named {@code <UTC time as yyyyMMddHHmm>-<random UUID>} to the given
+  * directory, after removing a trailing slash from the directory.
+  */
+ static String appendCurrentTimestamp(String directory) {
+   String utcMinuteStamp =
+       DateTimeFormatter.ofPattern("yyyyMMddHHmm")
+           .withZone(ZoneId.of("UTC"))
+           .format(Instant.now());
+   String uniqueSuffix = UUID.randomUUID().toString();
+   return removeSuffixSlashIfExists(directory) + "/" + utcMinuteStamp + "-" + uniqueSuffix;
+ }
+
+ /**
+  * Builds a timestamped directory named {@code subFoldername} next to the source directory, i.e.
+  * under the source directory's parent.
+  *
+  * @throws IllegalArgumentException if the source directory has no parent directory
+  */
+ static String getNamedDirectory(String sourceDirectory, String subFoldername) {
+   String parent = getParentDirectory(sourceDirectory);
+   if (parent != null) {
+     String namedDirectory = removeSuffixSlashIfExists(parent) + "/" + subFoldername;
+     return appendCurrentTimestamp(namedDirectory);
+   }
+   throw new IllegalArgumentException(
+       "Source directory has no parent directory: " + sourceDirectory);
+ }
+
+ /**
+ * Builds the configuration combining default and user provided values.
+ *
+ * <p>Defaults set here: the HBase root directory ({@code HConstants.HBASE_DIR}) pointing at
+ * {@code sourcedir}, the GCS {@code AbstractFileSystem} implementation, the GCS project id and
+ * service account authentication. When {@code runner} is null or "DirectRunner", the auth type
+ * is additionally set to {@code APPLICATION_DEFAULT}. User supplied entries are applied last and
+ * therefore override any default with the same key.
+ *
+ * @param runner simple name of the pipeline runner, may be null
+ * @param project Google Cloud project id
+ * @param sourcedir snapshot source directory
+ * @param hbaseConfiguration optional user supplied entries, may be null
+ * @return a new map holding the merged settings
+ */
+ static Map getConfiguration(
+ String runner,
+ String project,
+ String sourcedir,
+ @Nullable Map hbaseConfiguration) {
+ Map configurations = new HashMap<>();
+
+ configurations.put(HConstants.HBASE_DIR, sourcedir);
+ configurations.put(
+ "fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS");
+ configurations.put("fs.gs.project.id", project);
+ configurations.put("google.cloud.auth.service.account.enable", "true");
+
+ if (runner == null || runner.equals(DIRECTRUNNER)) {
+ // https://github.com/GoogleCloudDataproc/hadoop-connectors/blob/master/gcs/CONFIGURATION.md#authentication
+ configurations.put("fs.gs.auth.type", "APPLICATION_DEFAULT");
+ }
+
+ // Update the default configurations with user supplied configuration values
+ if (hbaseConfiguration != null) {
+ configurations.putAll(hbaseConfiguration);
+ }
+ return configurations;
+ }
+
+ /**
+  * Creates an HBase {@link Configuration} from the given key/value settings.
+  *
+  * @param configurations settings to apply; {@code null} is treated as an empty map
+  * @return a new configuration with the given settings applied
+  */
+ public static Configuration getHBaseConfiguration(Map configurations) {
+   return createHbaseConfiguration(
+       configurations == null ? new java.util.HashMap<>() : configurations);
+ }
+
+ /**
+ * Creates a configuration through {@code HBaseConfiguration.create()} and sets every entry of
+ * the given map on it.
+ *
+ * @param configurations settings to apply, must not be null
+ * @return the new configuration
+ */
+ private static Configuration createHbaseConfiguration(Map configurations) {
+ LOG.info("Create HBase Configuration instance");
+ Configuration hbaseConfiguration = HBaseConfiguration.create();
+ for (Map.Entry entry : configurations.entrySet())
+ hbaseConfiguration.set(entry.getKey(), entry.getValue());
+ return hbaseConfiguration;
+ }
+
+ /**
+ * Builds one {@link SnapshotConfig} per entry of the given snapshot details.
+ *
+ * <p>Each snapshot is given its own restore location, {@code restorePath + "/" + snapshotName}.
+ *
+ * @param snapshotdetails - Snapshot details representing hbase snapshot name and corresponding
+ * bigtable table name.
+ * @param configurations - Configuration key/value pairs attached to every snapshot config.
+ * NOTE(review): the caller in ImportJobFromHbaseSnapshot passes the map built by
+ * getConfiguration, i.e. HBase/GCS settings rather than Bigtable settings - confirm.
+ * @param projectId - Google Cloud Project Id
+ * @param sourcePath - Source path containing snapshot files
+ * @param restorePath - Base path under which each snapshot's restore directory is placed.
+ * @return the snapshot configs, in the iteration order of {@code snapshotdetails}
+ */
+ static List buildSnapshotConfigs(
+ List snapshotdetails,
+ Map configurations,
+ String projectId,
+ String sourcePath,
+ String restorePath) {
+
+ return snapshotdetails.stream()
+ .map(
+ snapshotInfo ->
+ SnapshotConfig.builder()
+ .setProjectId(projectId)
+ .setSourceLocation(sourcePath)
+ .setRestoreLocation(restorePath + "/" + snapshotInfo.getSnapshotName())
+ .setSnapshotName(snapshotInfo.getSnapshotName())
+ .setTableName(snapshotInfo.getbigtableTableName())
+ .setConfigurationDetails(configurations)
+ .build())
+ .collect(Collectors.toList());
+ }
+
+ /**
+  * Creates a {@link BackOff} from {@code FluentBackoff.DEFAULT} with the given initial interval,
+  * maximum interval and maximum number of retries.
+  *
+  * @param backoffInitialIntervalInMillis initial backoff in milliseconds
+  * @param backoffMaxIntervalInMillis maximum backoff in milliseconds
+  * @param maxRetries maximum number of retries
+  * @return a new backoff instance
+  */
+ public static BackOff createBackOff(
+     long backoffInitialIntervalInMillis, long backoffMaxIntervalInMillis, int maxRetries) {
+   Duration initialDelay = Duration.millis(backoffInitialIntervalInMillis);
+   Duration delayCeiling = Duration.millis(backoffMaxIntervalInMillis);
+   FluentBackoff policy =
+       FluentBackoff.DEFAULT
+           .withInitialBackoff(initialDelay)
+           .withMaxRetries(maxRetries)
+           .withMaxBackoff(delayCeiling);
+   return policy.backoff();
+ }
+
+ /**
+ * Creates restore path based on the input configuration
+ *
+ * @param importConfig - Job Configuration
+ */
+ public static void setRestorePath(ImportConfig importConfig) {
+ importConfig.setRestorepath(
+ formatRestorePath(importConfig.getRestorepath(), importConfig.getSourcepath()));
+ }
+
+ /**
+ * Creates restore path based on the input configuration
+ *
+ * @param restorePath - Restore path of the job.
+ * @param importConfig - Import config where we will set the restorePath property.
+ */
+ public static void setRestorePath(String restorePath, ImportConfig importConfig) {
+ if (restorePath != null) {
+ importConfig.setRestorepath(restorePath);
+ return;
+ }
+ importConfig.setRestorepath(
+ formatRestorePath(importConfig.getRestorepath(), importConfig.getSourcepath()));
+ }
+
+ /**
+  * Parses the provided input to generate snapshot names and corresponding bigtable names.
+  *
+  * <p>Accepted forms, separated by commas (blank items are skipped):
+  *
+  * <ul>
+  *   <li>{@code snapshotname} - the table name is the same as the snapshot name
+  *   <li>{@code snapshotname:tablename} - the table name differs from the snapshot name
+  * </ul>
+  *
+  * <p>Both forms can be mixed, e.g. {@code snapshot1:table1,snapshot2}.
+  *
+  * @param snapshotNames - Snapshot names and corresponding bigtable table names.
+  * @return map from snapshot name to table name
+  * @throws IllegalArgumentException if an item has an empty snapshot or table name (including a
+  *     trailing colon such as {@code snap:}) or contains more than one colon
+  */
+ public static Map getSnapshotsFromString(String snapshotNames) {
+   Map snapshots = new HashMap<>();
+   for (String snapshotInfo : snapshotNames.split(",")) {
+     snapshotInfo = snapshotInfo.trim();
+     if (snapshotInfo.isEmpty()) {
+       continue;
+     }
+     // A negative limit keeps trailing empty fields. Without it "snap:" would split into a single
+     // field and be accepted silently, and "a:b:" would be read as "a:b".
+     String[] snapshotWithTableName = snapshotInfo.split(":", -1);
+     if (snapshotWithTableName.length == 2) {
+       String snap = snapshotWithTableName[0].trim();
+       String table = snapshotWithTableName[1].trim();
+       if (snap.isEmpty() || table.isEmpty()) {
+         throw new IllegalArgumentException(
+             "Snapshot name and table name cannot be empty: " + snapshotInfo);
+       }
+       snapshots.put(snap, table);
+     } else if (snapshotWithTableName.length == 1) {
+       String snap = snapshotWithTableName[0].trim();
+       if (snap.isEmpty()) {
+         throw new IllegalArgumentException("Snapshot name cannot be empty: " + snapshotInfo);
+       }
+       snapshots.put(snap, snap);
+     } else {
+       throw new IllegalArgumentException(
+           "Invalid specification format for snapshots. Expected format is"
+               + " snapshot1:table1,snapshot2:table2");
+     }
+   }
+   return snapshots;
+ }
+
+ /**
+  * Produces the restore path for a job run.
+  *
+  * @param providedPath restore path chosen by the user, may be null
+  * @param hbaseSnapshotsPath snapshot source path, used only when no path was provided
+  * @return {@code providedPath} with a timestamped sub directory appended, or, when no path was
+  *     provided, a timestamped "restore" directory next to the snapshot source path
+  */
+ public static String formatRestorePath(String providedPath, String hbaseSnapshotsPath) {
+   if (providedPath != null) {
+     return appendCurrentTimestamp(providedPath);
+   }
+   return getNamedDirectory(hbaseSnapshotsPath, "restore");
+ }
+
+ /**
+ * Read list of Snapshot names from Snapshot Source Path
+ *
+ * <p>The names are the entries found directly under the {@code .hbase-snapshot} directory of the
+ * source path. Paths with the {@code gs} scheme are listed through {@code gcsUtil}; all other
+ * paths are listed through the local file system.
+ *
+ * @param importSnapshotpath - Path representing the snapshot source directory
+ * @param gcsUtil - GCS Instance, only used for {@code gs} paths
+ * @param prefix - '*' for all snapshots, a simple 'prefix*' glob, a literal name, or any other
+ * value containing '*', '+' or '?', which is compiled as a regular expression and matched with
+ * {@code find()}.
+ * @return map whose keys and values are both the matching snapshot names; empty when the path
+ * does not exist or contains no snapshots
+ * @throws IOException declared for the GCS listing call
+ */
+ public static Map getSnapshotsFromSnapshotPath(
+ String importSnapshotpath, GcsUtil gcsUtil, String prefix) throws IOException {
+
+ importSnapshotpath =
+ Joiner.on("/")
+ .join(removeSuffixSlashIfExists(importSnapshotpath), SNAPSHOT_MANIFEST_DIRECTORY);
+
+ // Determine the filesystem scheme. The import path can be a GCS path (gs://) or a local
+ // filesystem path (file:// or raw absolute/relative path) to support running the offline
+ // HBaseSnapshotRestoreTool and running local tests.
+ URI uri;
+ try {
+ uri = new URI(importSnapshotpath);
+ } catch (URISyntaxException e) {
+ // Not a parsable URI: fall through to the local file system branch with the raw path.
+ uri = null;
+ }
+ boolean isGcs = uri != null && GCS_SCHEME.equalsIgnoreCase(uri.getScheme());
+ Map snapshots = new HashMap<>();
+
+ if (isGcs) {
+ // Build GCS path from given string e.g:
+ // gs://sym-bucket/snapshots/20220309230526/.hbase-snapshot
+ GcsPath gcsPath = GcsPath.fromUri(importSnapshotpath);
+ String gcsPrefix = gcsPath.getObject();
+ Pattern prefixPattern = compilePrefixPattern(prefix);
+
+ // Optimize prefix matching by passing simple globs or literals to GCS API directly,
+ // reducing the number of objects fetched and processed in memory.
+ if (prefix.equals("*")) {
+ // List all
+ } else if (prefix.endsWith("*")
+ && !prefix.substring(0, prefix.length() - 1).contains("*")
+ && !prefix.contains("+")
+ && !prefix.contains("?")) {
+ // Simple glob like "prefix*"
+ String literal = prefix.substring(0, prefix.length() - 1);
+ gcsPrefix = gcsPrefix + "/" + literal;
+ } else if (!isRegex(prefix)) {
+ // Literal match
+ gcsPrefix = gcsPrefix + "/" + prefix;
+ }
+
+ // Collect the objects of every result page before extracting the snapshot names.
+ List allObjects = new ArrayList<>();
+ String pageToken = null;
+ do {
+ Objects objects = gcsUtil.listObjects(gcsPath.getBucket(), gcsPrefix, pageToken);
+ if (objects == null || objects.getItems() == null) {
+ break;
+ }
+ allObjects.addAll(objects.getItems());
+ pageToken = objects.getNextPageToken();
+ } while (pageToken != null);
+
+ if (allObjects.isEmpty()) {
+ LOG.warn("Snapshot path {} does not contain any snapshots", importSnapshotpath);
+ return snapshots;
+ }
+
+ // Build a pattern for object portion e.g if path is
+ // gs://sym-bucket/snapshots/20220309230526/.hbase-snapshot
+ // the object portion would be snapshots/20220309230526/.hbase-snapshot
+ Pattern pathPattern =
+ Pattern.compile(String.format("%s/(.+?/)", Pattern.quote(gcsPath.getObject())));
+ Matcher pathMatcher = null;
+ String snapshotName = null;
+ for (StorageObject object : allObjects) {
+ if (object == null || object.getName() == null) {
+ continue;
+ }
+ pathMatcher = pathPattern.matcher(object.getName());
+ if (pathMatcher.find()) {
+ // Group 1 represents the snapshot directory name along with suffix slash (e.g:
+ // snapshot1/)
+ snapshotName = pathMatcher.group(1).replace("/", "");
+ // prefixPattern is null for "*", so the equals check has to come first.
+ if (prefix.equals("*") || prefixPattern.matcher(snapshotName).find()) {
+ snapshots.put(snapshotName, snapshotName);
+ }
+ }
+ }
+ } else {
+ // Use local file system to find snapshots
+ File manifestDir =
+ (uri != null && uri.getScheme() != null && uri.getScheme().equalsIgnoreCase("file"))
+ ? new File(uri.getPath())
+ : new File(importSnapshotpath);
+
+ if (!manifestDir.exists() || !manifestDir.isDirectory()) {
+ LOG.warn("Snapshot path {} does not exist or is not a directory", importSnapshotpath);
+ return snapshots;
+ }
+ File[] files = manifestDir.listFiles();
+ if (files == null || files.length == 0) {
+ LOG.warn("Snapshot path {} does not contain any snapshots", importSnapshotpath);
+ return snapshots;
+ }
+
+ Pattern prefixPattern = compilePrefixPattern(prefix);
+ for (File file : files) {
+ // Only directories are treated as snapshots.
+ if (file.isDirectory()) {
+ String snapshotName = file.getName();
+ if (prefix.equals("*") || prefixPattern.matcher(snapshotName).find()) {
+ snapshots.put(snapshotName, snapshotName);
+ }
+ }
+ }
+ }
+
+ return snapshots;
+ }
+
+ /**
+  * Compiles the snapshot selector into a pattern.
+  *
+  * @param prefix "*", a simple "prefix*" glob, a literal name, or a regular expression
+  * @return {@code null} for "*" (callers treat it as match-all); a start-anchored literal
+  *     pattern for a simple glob; a fully anchored literal pattern for a value without '*', '+'
+  *     or '?'; otherwise the value compiled as a regular expression
+  */
+ private static Pattern compilePrefixPattern(String prefix) {
+   if (prefix.equals("*")) {
+     return null;
+   }
+
+   // Simple glob like "prefix*": the only '*' is the last character and no '+' or '?' occurs.
+   boolean onlyTrailingStar =
+       prefix.endsWith("*") && prefix.indexOf('*') == prefix.length() - 1;
+   if (onlyTrailingStar && !prefix.contains("+") && !prefix.contains("?")) {
+     String globHead = prefix.substring(0, prefix.length() - 1);
+     return Pattern.compile("^" + Pattern.quote(globHead) + ".*");
+   }
+
+   if (isRegex(prefix)) {
+     // Complex regex
+     return Pattern.compile(prefix);
+   }
+
+   // Literal match
+   return Pattern.compile("^" + Pattern.quote(prefix) + "$");
+ }
+
+ /**
+ * Reads the whole content of a file and returns it as a string decoded as UTF-8.
+ *
+ * <p>The file is located with {@code FileSystems.matchSingleFileSpec} and opened with
+ * {@code FileSystems.open}; the reader is closed before returning.
+ *
+ * @param filePath - Path of the file.
+ * @return the file content
+ * @throws IOException if the file cannot be matched, opened or read
+ */
+ public static String readFileContents(String filePath) throws IOException {
+ try (Reader reader =
+ Channels.newReader(
+ FileSystems.open(FileSystems.matchSingleFileSpec(filePath).resourceId()),
+ StandardCharsets.UTF_8.name())) {
+ return CharStreams.toString(reader);
+ }
+ }
+
+ /**
+  * Tells whether the given text contains one of the characters '*', '+' or '?'.
+  *
+  * <p>Only these three characters are checked; no attempt is made to parse the text as a
+  * regular expression.
+  *
+  * @param data - text value
+  * @return {@code true} if at least one of the three characters occurs in {@code data}
+  */
+ public static boolean isRegex(String data) {
+   for (String metaChar : new String[] {"*", "+", "?"}) {
+     if (data.contains(metaChar)) {
+       return true;
+     }
+   }
+   return false;
+ }
+
+ /** Returns the shared {@link Sleeper} held by this class, which is {@code Sleeper.DEFAULT}. */
+ public static Sleeper getSleeper() {
+ return sleeper;
+ }
+}
diff --git a/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/coders/RegionConfigCoder.java b/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/coders/RegionConfigCoder.java
new file mode 100644
index 0000000000..17886d4ee1
--- /dev/null
+++ b/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/coders/RegionConfigCoder.java
@@ -0,0 +1,100 @@
+/*
+ * Copyright 2026 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigtable.beam.hbasesnapshots.coders;
+
+import com.google.api.core.InternalApi;
+import com.google.cloud.bigtable.beam.hbasesnapshots.conf.RegionConfig;
+import com.google.cloud.bigtable.beam.hbasesnapshots.conf.SnapshotConfig;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TableSchema;
+
+/**
+ * Implementation of {@link Coder} for encoding and decoding of {@link RegionConfig}.
+ *
+ * <p>The encoded form is, in order: the {@link SnapshotConfig}, the region info as HBase protobuf
+ * bytes, the table descriptor as HBase protobuf bytes, and the region size. {@code decode} reads
+ * the fields back in the same order, so the two methods must be changed together.
+ *
+ * <p>NOTE(review): {@code RegionConfig#getName()} is not written by {@code encode}, so a decoded
+ * value is built without a name. Confirm this is intended.
+ */
+@InternalApi("For internal usage only")
+public class RegionConfigCoder extends Coder {
+ // Coder for the region size field.
+ private static final VarLongCoder longCoder = VarLongCoder.of();
+
+ // Coder for the SnapshotConfig field.
+ private static final Coder snapshotConfigCoder =
+ SerializableCoder.of(SnapshotConfig.class);
+ // Coder for the two protobuf payloads (region info and table schema).
+ private static final Coder byteArrayCoder = ByteArrayCoder.of();
+
+ /**
+ * Writes the four encoded fields of {@code value} to {@code outStream}.
+ *
+ * @param value region configuration to encode
+ * @param outStream destination stream
+ * @throws IOException if any of the delegate coders fails to write
+ */
+ @Override
+ public void encode(RegionConfig value, OutputStream outStream) throws IOException {
+ // 1. Encode SnapshotConfig using standard SerializableCoder
+ snapshotConfigCoder.encode(value.getSnapshotConfig(), outStream);
+
+ // 2. Encode RegionInfo by converting to HBase protobuf and then to byte array
+ HBaseProtos.RegionInfo regionInfo = ProtobufUtil.toRegionInfo(value.getRegionInfo());
+ byteArrayCoder.encode(regionInfo.toByteArray(), outStream);
+
+ // 3. Encode TableDescriptor by converting to HBase protobuf and then to byte array
+ HBaseProtos.TableSchema tableSchema = ProtobufUtil.toTableSchema(value.getTableDescriptor());
+ byteArrayCoder.encode(tableSchema.toByteArray(), outStream);
+
+ // 4. Encode region size using variable-length long coder
+ longCoder.encode(value.getRegionSize(), outStream);
+ }
+
+ /**
+ * Reads the four fields in the order {@code encode} wrote them and rebuilds the value.
+ *
+ * @param inStream stream positioned at the start of an encoded {@link RegionConfig}
+ * @return a {@link RegionConfig} built from the decoded fields; its name is never set here
+ * @throws IOException if any of the delegate coders or protobuf parsers fails
+ */
+ @Override
+ public RegionConfig decode(InputStream inStream) throws IOException {
+ // 1. Decode SnapshotConfig
+ SnapshotConfig snapshotConfig = snapshotConfigCoder.decode(inStream);
+
+ // 2. Decode RegionInfo from bytes via HBase protobuf
+ byte[] regionInfoBytes = byteArrayCoder.decode(inStream);
+ RegionInfo regionInfo =
+ ProtobufUtil.toRegionInfo(HBaseProtos.RegionInfo.parseFrom(regionInfoBytes));
+
+ // 3. Decode TableDescriptor from bytes via HBase protobuf
+ byte[] tableSchemaBytes = byteArrayCoder.decode(inStream);
+ TableDescriptor tableDescriptor =
+ ProtobufUtil.toTableDescriptor(TableSchema.parseFrom(tableSchemaBytes));
+
+ // 4. Decode region size
+ Long regionSize = longCoder.decode(inStream);
+
+ return RegionConfig.builder()
+ .setSnapshotConfig(snapshotConfig)
+ .setRegionInfo(regionInfo)
+ .setTableDescriptor(tableDescriptor)
+ .setRegionSize(regionSize)
+ .build();
+ }
+
+ /** Returns an empty list: this coder declares no coder arguments. */
+ @Override
+ public List extends Coder>> getCoderArguments() {
+ return Collections.emptyList();
+ }
+
+ /**
+ * Always throws: this coder reports itself as non-deterministic.
+ *
+ * @throws Coder.NonDeterministicException on every call
+ */
+ @Override
+ public void verifyDeterministic() throws Coder.NonDeterministicException {
+ throw new Coder.NonDeterministicException(
+ this,
+ "RegionConfigCoder is non-deterministic because it encodes SnapshotConfig using"
+ + " SerializableCoder.");
+ }
+}
diff --git a/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/coders/package-info.java b/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/coders/package-info.java
new file mode 100644
index 0000000000..c24c1f5d50
--- /dev/null
+++ b/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/coders/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2026 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Contains coders to handle serialization and deserialization for different classes used in the
+ * pipeline.
+ */
+package com.google.cloud.bigtable.beam.hbasesnapshots.coders;
diff --git a/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/HBaseSnapshotInputConfigBuilder.java b/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/conf/HBaseSnapshotInputConfigBuilder.java
similarity index 96%
rename from bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/HBaseSnapshotInputConfigBuilder.java
rename to bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/conf/HBaseSnapshotInputConfigBuilder.java
index 62b7a81ad5..a243f93edc 100644
--- a/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/HBaseSnapshotInputConfigBuilder.java
+++ b/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/conf/HBaseSnapshotInputConfigBuilder.java
@@ -13,8 +13,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package com.google.cloud.bigtable.beam.hbasesnapshots;
+package com.google.cloud.bigtable.beam.hbasesnapshots.conf;
+import com.google.api.core.InternalApi;
import com.google.common.base.Preconditions;
import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath;
import org.apache.commons.logging.Log;
@@ -37,7 +38,8 @@
* hosted in Google Cloud Storage(GCS) bucket via GCS connector. It uses {@link
* TableSnapshotInputFormat} for reading HBase snapshots.
*/
-class HBaseSnapshotInputConfigBuilder {
+@InternalApi("For internal usage only")
+public class HBaseSnapshotInputConfigBuilder {
private static final Log LOG = LogFactory.getLog(HBaseSnapshotInputConfigBuilder.class);
// Batch size used for HBase snapshot scans
diff --git a/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/conf/ImportConfig.java b/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/conf/ImportConfig.java
new file mode 100644
index 0000000000..9b52579795
--- /dev/null
+++ b/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/conf/ImportConfig.java
@@ -0,0 +1,319 @@
+/*
+ * Copyright 2026 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigtable.beam.hbasesnapshots.conf;
+
+import com.google.api.core.InternalApi;
+import com.google.common.base.Preconditions;
+import com.google.gson.TypeAdapter;
+import com.google.gson.annotations.JsonAdapter;
+import com.google.gson.annotations.SerializedName;
+import com.google.gson.stream.JsonReader;
+import com.google.gson.stream.JsonWriter;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Class representing the job configuration loading the different input values and combinations of
+ * snapshot names (such as all snapshots or matching prefix or explicit names) provided.
+ */
+@InternalApi("For internal usage only")
+public final class ImportConfig implements Serializable {
+ // Backoff defaults. These are static so that they are class-level constants rather than
+ // per-instance state: Gson skips static fields, so they are neither written to nor read from
+ // the JSON form of this class.
+ private static final long DEFAULT_BACKOFF_INITIAL_INTERVAL_MILLIS = 5000; // 5 seconds
+ private static final long DEFAULT_BACKOFF_MAX_INTERVAL_MILLIS = 3 * 60 * 1000; // 180 seconds
+ private static final int DEFAULT_BACKOFF_MAX_RETRIES = 3;
+
+ // Serialized under the JSON name "snapshots" through SnapshotInfoJsonAdapter.
+ @JsonAdapter(SnapshotInfoJsonAdapter.class)
+ @SerializedName("snapshots")
+ public List snapshotInfos;
+
+ private String sourcepath;
+ private String restorepath;
+ private String runstatuspath;
+ private long backoffInitialIntervalInMillis =
+ DEFAULT_BACKOFF_INITIAL_INTERVAL_MILLIS; // Defaults to 5 seconds
+ private long backoffMaxIntervalInMillis = DEFAULT_BACKOFF_MAX_INTERVAL_MILLIS; // 180 seconds
+ private int backoffMaxretries = DEFAULT_BACKOFF_MAX_RETRIES;
+ private Map hbaseConfiguration;
+ private Map bigtableConfiguration;
+
+ /**
+ * Replaces {@link #snapshotInfos} with one {@link SnapshotInfo} per entry of the given map.
+ *
+ * @param snapshots map whose keys are passed as the snapshot name and whose values are passed as
+ * the table name of each {@link SnapshotInfo}
+ */
+ public void setSnapshotsFromMap(Map snapshots) {
+ this.snapshotInfos =
+ snapshots.entrySet().stream()
+ .map(entry -> new SnapshotInfo(entry.getKey(), entry.getValue()))
+ .collect(Collectors.toList());
+ }
+
+ /** Returns the source path, or {@code null} if none has been set. */
+ public String getSourcepath() {
+ return this.sourcepath;
+ }
+
+ /** Sets the source path. The value is stored as given; {@link #validate()} checks it. */
+ public void setSourcepath(String sourcepath) {
+ this.sourcepath = sourcepath;
+ }
+
+ /** Returns the restore path, or {@code null} if none has been set. */
+ public String getRestorepath() {
+ return restorepath;
+ }
+
+ /** Sets the restore path. The value is stored as given; {@link #validate()} checks it. */
+ public void setRestorepath(String restorepath) {
+ this.restorepath = restorepath;
+ }
+
+ /** Returns the run-status path, or {@code null} if none has been set. */
+ public String getRunstatuspath() {
+ return runstatuspath;
+ }
+
+ /** Sets the run-status path. The value is stored as given; {@link #validate()} checks it. */
+ public void setRunstatuspath(String runstatuspath) {
+ this.runstatuspath = runstatuspath;
+ }
+
+ /** Returns the initial backoff interval, in milliseconds. */
+ public long getBackoffInitialIntervalInMillis() {
+   return this.backoffInitialIntervalInMillis;
+ }
+
+ /**
+  * Sets the initial backoff interval.
+  *
+  * @param backoffInitialIntervalInMillis new interval in milliseconds; must be positive and must
+  *     not exceed the maximum interval currently held by this object
+  * @throws IllegalArgumentException if either condition is violated
+  */
+ public void setBackoffInitialIntervalInMillis(long backoffInitialIntervalInMillis) {
+   final boolean isPositive = backoffInitialIntervalInMillis > 0;
+   Preconditions.checkArgument(
+       isPositive,
+       "backoffInitialIntervalInMillis must be greater than 0: %s",
+       backoffInitialIntervalInMillis);
+
+   // Compared against the maximum as it stands right now, so call order matters when both
+   // values are being changed.
+   final long currentMax = this.backoffMaxIntervalInMillis;
+   Preconditions.checkArgument(
+       backoffInitialIntervalInMillis <= currentMax,
+       "backoffInitialIntervalInMillis (%s) must be less than or equal to"
+           + " backoffMaxIntervalInMillis (%s)",
+       backoffInitialIntervalInMillis,
+       currentMax);
+   this.backoffInitialIntervalInMillis = backoffInitialIntervalInMillis;
+ }
+
+ /** Returns the maximum backoff interval, in milliseconds. */
+ public long getBackoffMaxIntervalInMillis() {
+   return backoffMaxIntervalInMillis;
+ }
+
+ /**
+  * Sets the maximum backoff interval.
+  *
+  * @param backoffMaxIntervalInMillis new maximum in milliseconds; must be positive and must not
+  *     be smaller than the initial interval currently held by this object
+  * @throws IllegalArgumentException if either condition is violated
+  */
+ public void setBackoffMaxIntervalInMillis(long backoffMaxIntervalInMillis) {
+   final boolean isPositive = backoffMaxIntervalInMillis > 0;
+   Preconditions.checkArgument(
+       isPositive,
+       "backoffMaxIntervalInMillis must be greater than 0: %s",
+       backoffMaxIntervalInMillis);
+
+   // Compared against the initial interval as it stands right now.
+   final long currentInitial = this.backoffInitialIntervalInMillis;
+   Preconditions.checkArgument(
+       currentInitial <= backoffMaxIntervalInMillis,
+       "backoffInitialIntervalInMillis (%s) must be less than or equal to"
+           + " backoffMaxIntervalInMillis (%s)",
+       currentInitial,
+       backoffMaxIntervalInMillis);
+   this.backoffMaxIntervalInMillis = backoffMaxIntervalInMillis;
+ }
+
+ /** Returns the maximum number of backoff retries. */
+ public int getBackoffMaxretries() {
+   return backoffMaxretries;
+ }
+
+ /**
+  * Sets the maximum number of backoff retries.
+  *
+  * @param backoffMaxretries new retry limit; zero is accepted, negative values are not
+  * @throws IllegalArgumentException if {@code backoffMaxretries} is negative
+  */
+ public void setBackoffMaxretries(int backoffMaxretries) {
+   final boolean nonNegative = backoffMaxretries >= 0;
+   Preconditions.checkArgument(
+       nonNegative, "backoffMaxretries must be non-negative: %s", backoffMaxretries);
+   this.backoffMaxretries = backoffMaxretries;
+ }
+
+ /**
+ * Checks that this configuration is usable and throws on the first violation found.
+ *
+ * <p>Checks run in this order: source path present and non-blank; restore and run-status paths
+ * non-blank when set; all three paths parse as {@link java.net.URI}; when no restore path is
+ * set, the source path has a parent; at least one snapshot is listed, with non-blank and unique
+ * snapshot names and non-blank table names; backoff settings are in range; configuration map
+ * entries have non-blank keys and non-null values.
+ *
+ * @throws NullPointerException if the source path or the snapshot list is {@code null}
+ * @throws IllegalArgumentException if any other check fails
+ */
+ public void validate() {
+ Preconditions.checkNotNull(
+ sourcepath, "Source Path containing hbase snapshots must be specified.");
+ Preconditions.checkArgument(
+ !sourcepath.trim().isEmpty(), "Source Path containing hbase snapshots must be specified.");
+ // restorepath and runstatuspath are optional, but must not be blank when present.
+ Preconditions.checkArgument(
+ restorepath == null || !restorepath.trim().isEmpty(),
+ "restorepath cannot be empty if specified");
+ Preconditions.checkArgument(
+ runstatuspath == null || !runstatuspath.trim().isEmpty(),
+ "runstatuspath cannot be empty if specified");
+
+ // Syntax check of the paths; sourceUri is kept because its scheme and path are needed below.
+ java.net.URI sourceUri;
+ try {
+ sourceUri = new java.net.URI(sourcepath);
+ } catch (java.net.URISyntaxException e) {
+ throw new IllegalArgumentException("sourcepath is not a valid URI: " + sourcepath, e);
+ }
+
+ if (restorepath != null) {
+ try {
+ new java.net.URI(restorepath);
+ } catch (java.net.URISyntaxException e) {
+ throw new IllegalArgumentException("restorepath is not a valid URI: " + restorepath, e);
+ }
+ }
+
+ if (runstatuspath != null) {
+ try {
+ new java.net.URI(runstatuspath);
+ } catch (java.net.URISyntaxException e) {
+ throw new IllegalArgumentException("runstatuspath is not a valid URI: " + runstatuspath, e);
+ }
+ }
+
+ // Without an explicit restore path, the source path must have a parent directory (needed "to
+ // auto-generate restorepath", per the messages below).
+ if (restorepath == null) {
+ if (sourceUri.getScheme() != null && sourceUri.getScheme().equalsIgnoreCase("gs")) {
+ // gs:// URIs: a path that is missing, empty or just "/" is rejected.
+ String path = sourceUri.getPath();
+ Preconditions.checkArgument(
+ path != null && !path.isEmpty() && !path.equals("/"),
+ "sourcepath must have a parent directory to auto-generate restorepath: %s",
+ sourcepath);
+ } else {
+ // Any other source path is checked with java.io.File#getParent.
+ Preconditions.checkArgument(
+ new java.io.File(sourcepath).getParent() != null,
+ "sourcepath must have a parent directory to auto-generate restorepath: %s",
+ sourcepath);
+ }
+ }
+
+ // NOTE(review): "seperated" in the two messages below is a typo for "separated"; it is left
+ // untouched here because the text is emitted at runtime.
+ Preconditions.checkNotNull(
+ snapshotInfos,
+ "Snapshots must be specified. Allowed values are '*' (indicating all snapshots under source"
+ + " path) or 'prefix*' (snapshots matching certain prefix) or"
+ + " 'snapshotname1:tablename1,snapshotname2:tablename2' (comma seperated list of"
+ + " snapshots)");
+ Preconditions.checkArgument(
+ !snapshotInfos.isEmpty(),
+ "Snapshots must be specified. Allowed values are '*' (indicating all snapshots under source"
+ + " path) or 'prefix*' (snapshots matching certain prefix) or"
+ + " 'snapshotname1:tablename1,snapshotname2:tablename2' (comma seperated list of"
+ + " snapshots)");
+ // Set#add returns false for a name seen earlier, which trips the duplicate check below.
+ java.util.Set snapshotNames = new java.util.HashSet<>();
+ for (SnapshotInfo snapshotInfo : snapshotInfos) {
+ Preconditions.checkArgument(
+ snapshotInfo.getSnapshotName() != null
+ && !snapshotInfo.getSnapshotName().trim().isEmpty(),
+ "snapshotName inside snapshots cannot be null or empty");
+ Preconditions.checkArgument(
+ snapshotNames.add(snapshotInfo.getSnapshotName()),
+ "Duplicate snapshot name detected: %s",
+ snapshotInfo.getSnapshotName());
+ Preconditions.checkArgument(
+ snapshotInfo.getbigtableTableName() != null
+ && !snapshotInfo.getbigtableTableName().trim().isEmpty(),
+ "bigtableTableName inside snapshots cannot be null or empty");
+ }
+ // Same range checks as the backoff setters. NOTE(review): presumably repeated because values
+ // loaded from a JSON config file do not pass through the setters; confirm.
+ Preconditions.checkArgument(
+ backoffInitialIntervalInMillis > 0,
+ "backoffInitialIntervalInMillis must be greater than 0: %s",
+ backoffInitialIntervalInMillis);
+ Preconditions.checkArgument(
+ backoffMaxIntervalInMillis > 0,
+ "backoffMaxIntervalInMillis must be greater than 0: %s",
+ backoffMaxIntervalInMillis);
+ Preconditions.checkArgument(
+ backoffInitialIntervalInMillis <= backoffMaxIntervalInMillis,
+ "backoffInitialIntervalInMillis (%s) must be less than or equal to"
+ + " backoffMaxIntervalInMillis (%s)",
+ backoffInitialIntervalInMillis,
+ backoffMaxIntervalInMillis);
+ Preconditions.checkArgument(
+ backoffMaxretries >= 0, "backoffMaxretries must be non-negative: %s", backoffMaxretries);
+
+ // Both configuration maps are optional; when present, every entry is checked.
+ if (hbaseConfiguration != null) {
+ for (Map.Entry entry : hbaseConfiguration.entrySet()) {
+ Preconditions.checkArgument(
+ entry.getKey() != null && !entry.getKey().trim().isEmpty(),
+ "hbaseConfiguration keys cannot be null or empty");
+ Preconditions.checkArgument(
+ entry.getValue() != null, "hbaseConfiguration values cannot be null");
+ }
+ }
+ if (bigtableConfiguration != null) {
+ for (Map.Entry entry : bigtableConfiguration.entrySet()) {
+ Preconditions.checkArgument(
+ entry.getKey() != null && !entry.getKey().trim().isEmpty(),
+ "bigtableConfiguration keys cannot be null or empty");
+ Preconditions.checkArgument(
+ entry.getValue() != null, "bigtableConfiguration values cannot be null");
+ }
+ }
+ }
+
+ /** Returns the snapshot list exactly as stored (no copy is made); may be {@code null}. */
+ public List getSnapshots() {
+ return this.snapshotInfos;
+ }
+
+ /** Replaces the snapshot list with the given one; the reference is stored as given. */
+ public void setSnapshots(List snapshots) {
+ this.snapshotInfos = snapshots;
+ }
+
+ /** Returns the HBase configuration map as stored, or {@code null} if none has been set. */
+ public Map getHbaseConfiguration() {
+ return this.hbaseConfiguration;
+ }
+
+ /** Sets the HBase configuration map; entries are checked later by {@link #validate()}. */
+ public void setHbaseConfiguration(Map hbaseConfiguration) {
+ this.hbaseConfiguration = hbaseConfiguration;
+ }
+
+ /** Returns the Bigtable configuration map as stored, or {@code null} if none has been set. */
+ public Map getBigtableConfiguration() {
+ return bigtableConfiguration;
+ }
+
+ /** Sets the Bigtable configuration map; entries are checked later by {@link #validate()}. */
+ public void setBigtableConfiguration(Map bigtableConfiguration) {
+ this.bigtableConfiguration = bigtableConfiguration;
+ }
+
+ /** Immutable pairing of a snapshot name with the Bigtable table name it maps to. */
+ public static class SnapshotInfo implements Serializable {
+   private final String snapshotName;
+   private final String bigtableTableName;
+
+   /**
+    * @param snapshotName value returned by {@link #getSnapshotName()}
+    * @param tableName value returned by {@link #getbigtableTableName()}
+    */
+   public SnapshotInfo(String snapshotName, String tableName) {
+     this.bigtableTableName = tableName;
+     this.snapshotName = snapshotName;
+   }
+
+   /** Returns the snapshot name given at construction. */
+   public String getSnapshotName() {
+     return this.snapshotName;
+   }
+
+   /** Returns the table name given at construction. */
+   public String getbigtableTableName() {
+     return this.bigtableTableName;
+   }
+ }
+
+ /**
+  * Gson adapter that maps the snapshot list to and from a single JSON object whose member names
+  * are snapshot names and whose member values are Bigtable table names.
+  */
+ static class SnapshotInfoJsonAdapter extends TypeAdapter> {
+
+   /**
+    * Writes one {@code "snapshotName": "bigtableTableName"} member per list entry; a {@code null}
+    * list is written as an empty object.
+    *
+    * @param jsonWriter destination writer
+    * @param snapshotInfos entries to write, may be {@code null}
+    * @throws IOException if the writer fails
+    */
+   @Override
+   public void write(JsonWriter jsonWriter, List snapshotInfos) throws IOException {
+     jsonWriter.beginObject();
+     if (snapshotInfos != null) {
+       // A plain loop lets an IOException from the writer propagate as declared, instead of
+       // being wrapped in a RuntimeException inside a lambda.
+       for (SnapshotInfo snapshotInfo : snapshotInfos) {
+         jsonWriter.name(snapshotInfo.getSnapshotName());
+         jsonWriter.value(snapshotInfo.getbigtableTableName());
+       }
+     }
+     jsonWriter.endObject();
+   }
+
+   /**
+    * Reads a JSON object and turns every member into a {@link SnapshotInfo}, using the member
+    * name as the snapshot name and the member's string value as the table name.
+    *
+    * @param jsonReader reader positioned at the start of the object
+    * @return the entries in the order they were read; empty for an empty object
+    * @throws IOException if the reader fails
+    */
+   @Override
+   public List read(JsonReader jsonReader) throws IOException {
+     List snapshotInfoList = new ArrayList<>();
+     jsonReader.beginObject();
+     while (jsonReader.hasNext()) {
+       snapshotInfoList.add(new SnapshotInfo(jsonReader.nextName(), jsonReader.nextString()));
+     }
+     jsonReader.endObject();
+     return snapshotInfoList;
+   }
+ }
+}
diff --git a/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/conf/RegionConfig.java b/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/conf/RegionConfig.java
new file mode 100644
index 0000000000..8117772a4c
--- /dev/null
+++ b/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/conf/RegionConfig.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2026 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigtable.beam.hbasesnapshots.conf;
+
+import com.google.api.core.InternalApi;
+import com.google.auto.value.AutoValue;
+import com.google.cloud.bigtable.beam.hbasesnapshots.coders.RegionConfigCoder;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.DefaultCoder;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+
+/**
+ * A {@link AutoValue} class representing the region configuration enclosing {@link SnapshotConfig},
+ * hbase region info and hbase table descriptor.
+ */
+@DefaultCoder(RegionConfigCoder.class)
+@AutoValue
+@InternalApi("For internal usage only")
+public abstract class RegionConfig {
+  /** Creates a new, empty builder. */
+  public static Builder builder() {
+    return new AutoValue_RegionConfig.Builder();
+  }
+
+  /**
+   * Returns an optional label for this configuration, typically used for testing or debugging. This
+   * is not the official HBase region name.
+   */
+  @Nullable
+  public abstract String getName();
+
+  /** Returns the configuration of the snapshot enclosed by this region configuration. */
+  public abstract SnapshotConfig getSnapshotConfig();
+
+  /** Returns the HBase region info. */
+  public abstract RegionInfo getRegionInfo();
+
+  /** Returns the HBase table descriptor. */
+  public abstract TableDescriptor getTableDescriptor();
+
+  /** Returns the region size. NOTE(review): the unit is not visible here; confirm. */
+  public abstract Long getRegionSize();
+
+  /** Builder for {@link RegionConfig}. */
+  @AutoValue.Builder
+  public abstract static class Builder {
+
+    /**
+     * Sets the optional label. The parameter is {@code @Nullable} to match the nullable property,
+     * so a {@code null} name (for example one copied from another instance) is accepted.
+     */
+    public abstract Builder setName(@Nullable String value);
+
+    public abstract Builder setSnapshotConfig(SnapshotConfig value);
+
+    public abstract Builder setRegionInfo(RegionInfo value);
+
+    public abstract Builder setTableDescriptor(TableDescriptor value);
+
+    public abstract Builder setRegionSize(Long value);
+
+    public abstract RegionConfig build();
+  }
+}
diff --git a/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/conf/SnapshotConfig.java b/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/conf/SnapshotConfig.java
new file mode 100644
index 0000000000..ce2bc0b1da
--- /dev/null
+++ b/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/conf/SnapshotConfig.java
@@ -0,0 +1,100 @@
+/*
+ * Copyright 2026 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigtable.beam.hbasesnapshots.conf;
+
+import com.google.api.core.InternalApi;
+import com.google.auto.value.AutoValue;
+import com.google.cloud.bigtable.beam.hbasesnapshots.SnapshotUtils;
+import java.io.Serializable;
+import java.util.Map;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * A {@link AutoValue} class representing the configuration associated with each snapshot.
+ *
+ * <p>Besides the abstract properties, three convenience accessors build a new object on every
+ * call from those properties: {@link #getSourcePath()}, {@link #getRestorePath()} and {@link
+ * #getConfiguration()}.
+ */
+@InternalApi("For internal usage only")
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+public abstract class SnapshotConfig implements Serializable {
+
+ /** Creates a new builder for {@link SnapshotConfig}. */
+ public static Builder builder() {
+ return new AutoValue_SnapshotConfig.Builder();
+ }
+
+ /** Returns the Google Cloud project ID. */
+ public abstract String getProjectId();
+
+ /** Returns the GCS source location of the HBase snapshot. */
+ public abstract String getSourceLocation();
+
+ /** Returns the GCS source path as a Hadoop {@link Path}, built from {@link #getSourceLocation()}. */
+ public Path getSourcePath() {
+ return new Path(getSourceLocation());
+ }
+
+ /** Returns the GCS restore path as a Hadoop {@link Path}, built from {@link #getRestoreLocation()}. */
+ public Path getRestorePath() {
+ return new Path(getRestoreLocation());
+ }
+
+ /** Returns the name of the HBase snapshot. */
+ public abstract String getSnapshotName();
+
+ /** Returns the name of the Bigtable table to import into. */
+ public abstract String getTableName();
+
+ /** Returns the GCS location where the snapshot is restored temporarily. */
+ public abstract String getRestoreLocation();
+
+ /** Returns the additional configuration details as a map. */
+ public abstract Map getConfigurationDetails();
+
+ /**
+ * Returns the Hadoop {@link Configuration} derived from configuration details, obtained by
+ * passing {@link #getConfigurationDetails()} to {@code SnapshotUtils.getHBaseConfiguration}.
+ */
+ public Configuration getConfiguration() {
+ return SnapshotUtils.getHBaseConfiguration(getConfigurationDetails());
+ }
+
+ /** Returns a builder initialized with the current values of this instance. */
+ public abstract Builder toBuilder();
+
+ /** Builder for {@link SnapshotConfig}. */
+ @AutoValue.Builder
+ public abstract static class Builder {
+
+ /** Sets the Google Cloud project ID. */
+ public abstract Builder setProjectId(String projectId);
+
+ /** Sets the GCS source location of the HBase snapshot. */
+ public abstract Builder setSourceLocation(String value);
+
+ /** Sets the name of the HBase snapshot. */
+ public abstract Builder setSnapshotName(String value);
+
+ /** Sets the name of the Bigtable table to import into. */
+ public abstract Builder setTableName(String value);
+
+ /** Sets the GCS location where the snapshot is restored temporarily. */
+ public abstract Builder setRestoreLocation(String value);
+
+ /** Sets the additional configuration details. */
+ public abstract Builder setConfigurationDetails(Map configuration);
+
+ /** Builds the {@link SnapshotConfig} instance. */
+ public abstract SnapshotConfig build();
+ }
+}
diff --git a/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/conf/SnapshotKey.java b/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/conf/SnapshotKey.java
new file mode 100644
index 0000000000..64b7059d17
--- /dev/null
+++ b/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/conf/SnapshotKey.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2026 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigtable.beam.hbasesnapshots.conf;
+
+import com.google.api.core.InternalApi;
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaCreate;
+
+/**
+ * A lightweight key representing a snapshot and its target table, used to avoid serializing full
+ * configuration details for every row.
+ */
+@InternalApi("For internal usage only")
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+public abstract class SnapshotKey implements Serializable {
+ /**
+ * Creates a key from its two components.
+ *
+ * @param snapshotName value returned by {@link #getSnapshotName()}
+ * @param tableName value returned by {@link #getTableName()}
+ */
+ @SchemaCreate
+ public static SnapshotKey create(String snapshotName, String tableName) {
+ return new AutoValue_SnapshotKey(snapshotName, tableName);
+ }
+
+ /** Returns the snapshot name this key was created with. */
+ public abstract String getSnapshotName();
+
+ /** Returns the table name this key was created with. */
+ public abstract String getTableName();
+}
diff --git a/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/conf/package-info.java b/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/conf/package-info.java
new file mode 100644
index 0000000000..5a7bb988eb
--- /dev/null
+++ b/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/conf/package-info.java
@@ -0,0 +1,17 @@
+/*
+ * Copyright 2026 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/** Package contains configuration classes used in the pipeline. */
+package com.google.cloud.bigtable.beam.hbasesnapshots.conf;
diff --git a/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/CleanupHBaseSnapshotRestoreFilesFn.java b/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/dofn/CleanupHBaseSnapshotRestoreFilesFn.java
similarity index 93%
rename from bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/CleanupHBaseSnapshotRestoreFilesFn.java
rename to bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/dofn/CleanupHBaseSnapshotRestoreFilesFn.java
index e0bdca69d5..ea65781bdf 100644
--- a/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/CleanupHBaseSnapshotRestoreFilesFn.java
+++ b/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/dofn/CleanupHBaseSnapshotRestoreFilesFn.java
@@ -13,8 +13,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package com.google.cloud.bigtable.beam.hbasesnapshots;
+package com.google.cloud.bigtable.beam.hbasesnapshots.dofn;
+import com.google.api.core.InternalApi;
import com.google.api.services.storage.model.Objects;
import com.google.common.base.Preconditions;
import java.io.IOException;
@@ -32,9 +33,27 @@
* A {@link DoFn} that could be used for cleaning up temp files generated during HBase snapshot
* scans in Google Cloud Storage(GCS) bucket via GCS connector.
*/
-class CleanupHBaseSnapshotRestoreFilesFn extends DoFn, Boolean> {
+@InternalApi("For internal usage only")
+public class CleanupHBaseSnapshotRestoreFilesFn extends DoFn, Boolean> {
private static final Log LOG = LogFactory.getLog(CleanupHBaseSnapshotRestoreFilesFn.class);
+ /**
+  * Extracts the bucket name from a snapshot directory URI.
+  *
+  * @param hbaseSnapshotDir snapshot directory; must start with {@code GcsPath.SCHEME}
+  * @return the bucket component of {@code hbaseSnapshotDir}
+  * @throws IllegalArgumentException if the scheme prefix is missing
+  */
+ public static String getWorkingBucketName(String hbaseSnapshotDir) {
+   final boolean hostedOnGcs = hbaseSnapshotDir.startsWith(GcsPath.SCHEME);
+   Preconditions.checkArgument(hostedOnGcs, "snapshot folder must be hosted in a GCS bucket ");
+
+   final GcsPath snapshotDir = GcsPath.fromUri(hbaseSnapshotDir);
+   return snapshotDir.getBucket();
+ }
+
+ /**
+  * Converts an absolute restore path in a Hadoop filesystem to a match prefix in a GCS bucket by
+  * dropping the leading slash.
+  *
+  * @param restorePath absolute path, i.e. one starting with {@code /}
+  * @return {@code restorePath} without its first character
+  * @throws IllegalArgumentException if {@code restorePath} does not start with {@code /}
+  */
+ public static String getListPrefix(String restorePath) {
+   final boolean isAbsolute = restorePath.startsWith("/");
+   Preconditions.checkArgument(
+       isAbsolute, "restore folder must be an absolute path in current filesystem");
+   return restorePath.substring(1);
+ }
+
@ProcessElement
public void processElement(ProcessContext context) throws IOException {
KV elem = context.element();
@@ -65,20 +84,4 @@ public void processElement(ProcessContext context) throws IOException {
gcsUtil.remove(results);
context.output(true);
}
-
- public static String getWorkingBucketName(String hbaseSnapshotDir) {
- Preconditions.checkArgument(
- hbaseSnapshotDir.startsWith(GcsPath.SCHEME),
- "snapshot folder must be hosted in a GCS bucket ");
-
- return GcsPath.fromUri(hbaseSnapshotDir).getBucket();
- }
- // getListPrefix convert absolute restorePath in a Hadoop filesystem
- // to a match prefix in a GCS bucket
- public static String getListPrefix(String restorePath) {
- Preconditions.checkArgument(
- restorePath.startsWith("/"),
- "restore folder must be an absolute path in current filesystem");
- return restorePath.substring(1);
- }
}
diff --git a/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/dofn/CleanupRestoredSnapshotsFn.java b/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/dofn/CleanupRestoredSnapshotsFn.java
new file mode 100644
index 0000000000..35332114e2
--- /dev/null
+++ b/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/dofn/CleanupRestoredSnapshotsFn.java
@@ -0,0 +1,109 @@
+/*
+ * Copyright 2026 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigtable.beam.hbasesnapshots.dofn;
+
+import com.google.api.core.InternalApi;
+import com.google.cloud.bigtable.beam.hbasesnapshots.conf.SnapshotConfig;
+import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.util.BackOff;
+import org.apache.beam.sdk.util.FluentBackoff;
+import org.apache.beam.sdk.util.Sleeper;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** A {@link DoFn} that recursively deletes each snapshot's restore path, retrying on failure. */
+@InternalApi("For internal usage only")
+public class CleanupRestoredSnapshotsFn extends DoFn { // NOTE(review): type args look stripped
+
+ private static final Logger LOG = LoggerFactory.getLogger(CleanupRestoredSnapshotsFn.class);
+ private final Counter failedCleanups =
+ Metrics.counter(CleanupRestoredSnapshotsFn.class, "failed_cleanups"); // one per abandoned path
+
+ private final long initialBackoffMillis; // passed to FluentBackoff.withInitialBackoff
+ private final long maxBackoffMillis; // passed to FluentBackoff.withMaxBackoff
+ private final int maxRetries; // passed to FluentBackoff.withMaxRetries
+
+ public CleanupRestoredSnapshotsFn(
+ long initialBackoffMillis, long maxBackoffMillis, int maxRetries) {
+ this.initialBackoffMillis = initialBackoffMillis;
+ this.maxBackoffMillis = maxBackoffMillis;
+ this.maxRetries = maxRetries;
+ }
+
+ @ProcessElement
+ public void processElement(
+ @Element SnapshotConfig snapshotConfig, OutputReceiver outputReceiver)
+ throws IOException {
+
+ Path restorePath = snapshotConfig.getRestorePath();
+ Configuration configuration = snapshotConfig.getConfiguration();
+ FileSystem fileSystem = restorePath.getFileSystem(configuration); // outside the retry loop
+
+ FluentBackoff backoff =
+ FluentBackoff.DEFAULT
+ .withInitialBackoff(Duration.millis(initialBackoffMillis))
+ .withMaxBackoff(Duration.millis(maxBackoffMillis))
+ .withMaxRetries(maxRetries);
+
+ Sleeper sleeper = getSleeper();
+ BackOff executionBackoff = backoff.backoff(); // backoff state created per element
+
+ while (true) {
+ try {
+ cleanupSnapshot(snapshotConfig, fileSystem, restorePath);
+ return; // Success; nothing is written to outputReceiver
+ } catch (IOException ex) {
+ long nextSleep = executionBackoff.nextBackOffMillis();
+
+ if (nextSleep == BackOff.STOP) {
+ LOG.error(
+ "Failed to cleanup snapshot after retries. Manual cleanup required for path: {}",
+ restorePath,
+ ex);
+ failedCleanups.inc();
+ return; // Give up without rethrowing, so the job is not failed by this path
+ }
+
+ LOG.warn("Cleanup failed, retrying in {} ms. Error: {}", nextSleep, ex.getMessage());
+ try {
+ sleeper.sleep(nextSleep);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt(); // restore the interrupt flag before propagating
+ throw new IOException("Interrupted during retry sleep", e);
+ }
+ }
+ }
+ }
+
+ @VisibleForTesting
+ Sleeper getSleeper() {
+ return Sleeper.DEFAULT; // package-private seam; presumably overridden in tests
+ }
+
+ @VisibleForTesting
+ void cleanupSnapshot(SnapshotConfig snapshotConfig, FileSystem fileSystem, Path restorePath)
+ throws IOException {
+ fileSystem.delete(restorePath, true); // recursive delete; the boolean result is ignored
+ }
+}
diff --git a/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/dofn/CreateBigtableMutationsFn.java b/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/dofn/CreateBigtableMutationsFn.java
new file mode 100644
index 0000000000..9e9cf36e82
--- /dev/null
+++ b/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/dofn/CreateBigtableMutationsFn.java
@@ -0,0 +1,232 @@
+/*
+ * Copyright 2026 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigtable.beam.hbasesnapshots.dofn;
+
+import com.google.api.core.InternalApi;
+import com.google.cloud.bigtable.beam.hbasesnapshots.conf.SnapshotKey;
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.values.KV;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** A {@link DoFn} class for converting Hbase {@link Result} to list of Hbase {@link Mutation}s. */
+@InternalApi("For internal usage only")
+public class CreateBigtableMutationsFn
+ extends DoFn<KV<SnapshotKey, Result>, KV<String, List<Mutation>>> {
+ private static final Logger LOG = LoggerFactory.getLogger(CreateBigtableMutationsFn.class);
+
+ private final Counter droppedRows =
+ Metrics.counter(CreateBigtableMutationsFn.class, "droppedRows");
+ private final Counter droppedCells =
+ Metrics.counter(CreateBigtableMutationsFn.class, "droppedCells");
+ private final Counter droppedRowKeys =
+ Metrics.counter(CreateBigtableMutationsFn.class, "droppedRowKeys");
+ private final int maxMutationsPerRequestThreshold; // max cells placed in a single Put
+
+ private final boolean filterLargeRows;
+ private final long filterLargeRowThresholdBytes;
+
+ private final boolean filterLargeCells;
+ private final int filterLargeCellThresholdBytes;
+
+ private final boolean filterLargeRowKeys;
+ private final int filterLargeRowKeysThresholdBytes;
+ private final boolean filterWideRows; // drop rows that would need more than one Put
+
+ public CreateBigtableMutationsFn(
+ int maxMutationsPerRequestThreshold,
+ boolean filterLargeRows,
+ long filterLargeRowThresholdBytes,
+ boolean filterLargeCells,
+ int filterLargeCellThresholdBytes,
+ boolean filterLargeRowKeys,
+ int filterLargeRowKeysThresholdBytes,
+ boolean filterWideRows) {
+
+ Preconditions.checkArgument(
+ maxMutationsPerRequestThreshold > 0,
+ "maxMutationsPerRequestThreshold must be greater than 0");
+ this.maxMutationsPerRequestThreshold = maxMutationsPerRequestThreshold;
+
+ this.filterLargeRows = filterLargeRows;
+ this.filterLargeRowThresholdBytes = filterLargeRowThresholdBytes;
+
+ this.filterLargeCells = filterLargeCells;
+ this.filterLargeCellThresholdBytes = filterLargeCellThresholdBytes;
+
+ this.filterLargeRowKeys = filterLargeRowKeys;
+ this.filterLargeRowKeysThresholdBytes = filterLargeRowKeysThresholdBytes;
+ this.filterWideRows = filterWideRows;
+ }
+
+ @ProcessElement
+ public void processElement(
+ @Element KV<SnapshotKey, Result> element,
+ OutputReceiver<KV<String, List<Mutation>>> outputReceiver)
+ throws IOException {
+ if (element == null
+ || element.getKey() == null
+ || element.getValue() == null
+ || element.getValue().isEmpty()) {
+ return;
+ }
+
+ // Extract metadata for routing and logging.
+ String targetTable = element.getKey().getTableName();
+ String snapshotName = element.getKey().getSnapshotName();
+ byte[] rowKey = element.getValue().getRow();
+
+ // Apply filters and chunk cells into Mutations.
+ // Returns null if the entire row should be dropped.
+ List<Mutation> mutations =
+ convertAndValidateThresholds(rowKey, element.getValue().listCells(), snapshotName);
+
+ // Output the mutations mapped to the target table name.
+ if (mutations != null && !mutations.isEmpty()) {
+ outputReceiver.output(KV.of(targetTable, mutations));
+ }
+ }
+
+ /**
+ * Converts a list of HBase cells for a specific row into a list of Bigtable mutations. It also
+ * applies various filtering rules based on row key size, cell size, total row size, and wide row
+ * thresholds.
+ *
+ * <p>The process follows these steps: 1. Validate row key size. 2. Iterate over cells and filter
+ * large cells if enabled. 3. Accumulate total row size and filter large rows if enabled. 4. Chunk
+ * cells into multiple Put requests if they exceed the mutation count threshold.
+ *
+ * <p>Note on limits: When specific filters (e.g., large row/cell/row key filtering) are disabled
+ * or not met, this method does not fail-fast locally. The decision to reject requests that
+ * violate Bigtable constraints (such as payload size or mutation limits) is deferred to the
+ * underlying Cloud Bigtable client and server to avoid duplicating validation logic.
+ *
+ * @param rowKey the row key of the row being processed
+ * @param cells the list of cells for this row
+ * @param snapshotName the name of the snapshot for logging purposes
+ * @return a list of Mutations (Puts), or null if the row is dropped or every cell was filtered
+ * @throws IOException if a cell cannot be added to a {@link Put}
+ */
+ private List<Mutation> convertAndValidateThresholds(
+ byte[] rowKey, List<Cell> cells, String snapshotName) throws IOException {
+
+ // 1. Check row key size. We do this first to fail-fast and avoid allocations,
+ // since the row key size is constant and independent of cell processing.
+ if (filterLargeRowKeys && rowKey.length > filterLargeRowKeysThresholdBytes) {
+ LOG.warn(
+ "For snapshot "
+ + snapshotName
+ + ": Dropping row, row key length, "
+ + rowKey.length
+ + ", exceeds filter length threshold, "
+ + filterLargeRowKeysThresholdBytes
+ + ", row key: "
+ + Bytes.toStringBinary(rowKey));
+ droppedRowKeys.inc();
+ return null; // Signal skip
+ }
+
+ List<Mutation> mutations = new ArrayList<>();
+ Put put = null;
+ int chunkCellCount = 0; // Tracks cells in the current Put
+ int totalCellCount = 0; // Tracks total cells in the row
+ long totalByteSize = 0L; // Tracks estimated serialized size of cells in the current row
+ boolean loggedLargeCellForRow = false;
+
+ // Iterate over all cells in the row to build mutations. We apply cell-level and
+ // row-level filters during iteration, and split the row into multiple Put requests
+ // if the number of cells exceeds maxMutationsPerRequestThreshold.
+ for (Cell cell : cells) {
+
+ // 2. Handle large cells first. We do this before accumulating row size so that
+ // cells we intend to drop anyway don't unfairly cause the entire row to exceed
+ // the row size threshold.
+ if (filterLargeCells && cell.getValueLength() > filterLargeCellThresholdBytes) {
+ if (!loggedLargeCellForRow) {
+ LOG.warn(
+ "For snapshot "
+ + snapshotName
+ + ": Dropping large cells for row "
+ + Bytes.toStringBinary(rowKey)
+ + ". At least one cell exceeds threshold "
+ + filterLargeCellThresholdBytes);
+ loggedLargeCellForRow = true;
+ }
+ droppedCells.inc();
+ continue; // Skip this cell, do NOT add to totalByteSize or mutations
+ }
+
+ // 3. Accumulate size and check row size.
+ // Drops the entire row if it exceeds the row size threshold.
+ totalByteSize += CellUtil.estimatedSerializedSizeOf(cell);
+ if (filterLargeRows && totalByteSize > filterLargeRowThresholdBytes) {
+ LOG.warn(
+ "For snapshot "
+ + snapshotName
+ + ": Dropping row, row length, "
+ + totalByteSize
+ + ", exceeds filter length threshold, "
+ + filterLargeRowThresholdBytes
+ + ", row key: "
+ + Bytes.toStringBinary(rowKey));
+ droppedRows.inc();
+ return null; // Signal skip for the entire row
+ }
+
+ // 4. Chunk cells into multiple Put requests to avoid exceeding Bigtable's limit
+ // of 100,000 mutations per request. If filterWideRows is enabled and the total
+ // cells in the row exceed the threshold, we drop the entire row to avoid loss
+ // of atomicity.
+ if (chunkCellCount == maxMutationsPerRequestThreshold || chunkCellCount == 0) {
+ if (totalCellCount > 0 && filterWideRows) { // a second Put would be needed
+ LOG.warn(
+ "For snapshot "
+ + snapshotName
+ + ": Dropping wide row, cell count exceeds threshold "
+ + maxMutationsPerRequestThreshold
+ + ", row key: "
+ + Bytes.toStringBinary(rowKey));
+ droppedRows.inc();
+ return null; // Signal skip
+ }
+ chunkCellCount = 0;
+ put = new Put(rowKey);
+ mutations.add(put);
+ }
+ put.add(cell);
+ chunkCellCount++;
+ totalCellCount++;
+ }
+
+ if (mutations.isEmpty()) { // every cell was dropped by the large-cell filter
+ droppedRows.inc();
+ return null;
+ }
+ return mutations;
+ }
+}
diff --git a/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/dofn/HBaseRegionScanner.java b/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/dofn/HBaseRegionScanner.java
new file mode 100644
index 0000000000..63ba782a02
--- /dev/null
+++ b/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/dofn/HBaseRegionScanner.java
@@ -0,0 +1,158 @@
+/*
+ * Copyright 2026 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigtable.beam.hbasesnapshots.dofn;
+
+import com.google.api.core.InternalApi;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.client.IsolationLevel;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A workalike for {@link org.apache.hadoop.hbase.client.ClientSideRegionScanner}.
+ *
+ * <p>It serves the same purpose, but skips block and mobFile cache initialization. Those caches
+ * don't appear to be useful for the import job and leak threads on shutdown.
+ */
+@InternalApi("For internal usage only")
+public class HBaseRegionScanner implements AutoCloseable {
+ private static final Logger LOG = LoggerFactory.getLogger(HBaseRegionScanner.class);
+
+ private static final String DEFAULT_HFILE_BLOCK_CACHE_POLICY = "IndexOnlyLRU";
+ private static final long DEFAULT_HFILE_BLOCK_CACHE_SIZE = 33554432L; // 32 MB
+ private static final int DEFAULT_COMPACTION_THRESHOLD = 10000;
+ private static final long DEFAULT_CACHE_FLUSH_INTERVAL = 0;
+ private static final int DEFAULT_CLIENT_RETRIES = 3;
+
+ private HRegion region;
+ private RegionScanner scanner;
+ private final List<Cell> cells; // scratch buffer, cleared and refilled on every next() call
+ private boolean regionOperationStarted = false; // true once startRegionOperation() succeeded
+ private boolean hasMore = true; // last value returned by scanner.nextRaw()
+
+ public HBaseRegionScanner(
+ Configuration originalConf,
+ FileSystem fs,
+ Path rootDir,
+ TableDescriptor htd,
+ RegionInfo hri,
+ Scan scan)
+ throws IOException {
+ Configuration conf = new Configuration(originalConf); // copy, so the caller's conf is untouched
+ conf.set("hfile.block.cache.policy", DEFAULT_HFILE_BLOCK_CACHE_POLICY);
+ // Set a small default block cache size (32MB). Since we are performing a strictly
+ // sequential scan of the snapshot, we read each block once and don't need a large
+ // cache for data blocks. 32MB is sufficient to hold HFile index blocks while avoiding
+ // OutOfMemory errors on memory-constrained Dataflow workers.
+ conf.setIfUnset(
+ "hfile.onheap.block.cache.fixed.size", String.valueOf(DEFAULT_HFILE_BLOCK_CACHE_SIZE));
+ conf.unset("hbase.bucketcache.ioengine");
+ // Setting a huge compaction threshold (10000) effectively disables compactions.
+ // Since snapshots are read-only, compactions are useless, and background threads
+ // can fail to shut down cleanly during DoFn teardown, causing thread leaks.
+ conf.setInt("hbase.hstore.compactionThreshold", DEFAULT_COMPACTION_THRESHOLD);
+ // Set flush interval to 0 to disable periodic flushes.
+ conf.setLong("hbase.regionserver.optionalcacheflushinterval", DEFAULT_CACHE_FLUSH_INTERVAL);
+ conf.setInt("hbase.client.retries.number", DEFAULT_CLIENT_RETRIES);
+
+ scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED); // note: mutates the caller's Scan
+ htd = TableDescriptorBuilder.newBuilder(htd).setReadOnly(true).build();
+ this.region =
+ HRegion.newHRegion(
+ CommonFSUtils.getTableDir(rootDir, htd.getTableName()),
+ (WAL) null,
+ fs,
+ conf,
+ hri,
+ htd,
+ (RegionServerServices) null);
+ this.region.setRestoredRegion(true);
+
+ // Wrap in try-catch to ensure close() is called on failure, avoiding leaks.
+ try {
+ this.region.initialize();
+ this.scanner = this.region.getScanner(scan);
+ this.cells = new ArrayList<>();
+
+ this.region.startRegionOperation();
+ this.regionOperationStarted = true;
+ } catch (Throwable t) {
+ LOG.error("Failed to initialize HBaseRegionScanner", t);
+ close();
+ if (t instanceof IOException) {
+ throw (IOException) t;
+ }
+ throw new IOException("Failed to initialize HBaseRegionScanner", t);
+ }
+ }
+
+ @Override
+ public void close() {
+ if (this.scanner != null) {
+ try {
+ this.scanner.close();
+ this.scanner = null;
+ } catch (IOException var3) {
+ LOG.warn("Exception while closing scanner", var3);
+ }
+ }
+
+ if (this.region != null) {
+ try {
+ if (this.regionOperationStarted) {
+ try {
+ this.region.closeRegionOperation();
+ } catch (IOException e) {
+ LOG.warn("Exception while closing region operation", e);
+ }
+ }
+ this.region.close(true);
+ this.region = null;
+ } catch (IOException var2) {
+ LOG.warn("Exception while closing region", var2);
+ }
+ }
+ }
+
+ public Result next() throws IOException {
+ while (this.hasMore) { // keep pulling until a non-empty batch arrives or the scan ends
+ this.cells.clear();
+ this.hasMore = this.scanner.nextRaw(this.cells);
+
+ if (!this.cells.isEmpty()) {
+ return Result.create(this.cells);
+ }
+ }
+
+ return null; // scan exhausted
+ }
+}
diff --git a/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/dofn/ReadSnapshotRegionFn.java b/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/dofn/ReadSnapshotRegionFn.java
new file mode 100644
index 0000000000..83a212e776
--- /dev/null
+++ b/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/dofn/ReadSnapshotRegionFn.java
@@ -0,0 +1,219 @@
+/*
+ * Copyright 2026 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigtable.beam.hbasesnapshots.dofn;
+
+import com.google.api.core.InternalApi;
+import com.google.cloud.bigtable.beam.hbasesnapshots.SnapshotUtils;
+import com.google.cloud.bigtable.beam.hbasesnapshots.conf.RegionConfig;
+import com.google.cloud.bigtable.beam.hbasesnapshots.conf.SnapshotConfig;
+import com.google.cloud.bigtable.beam.hbasesnapshots.conf.SnapshotKey;
+import com.google.cloud.bigtable.beam.hbasesnapshots.transforms.HbaseRegionSplitTracker;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.beam.sdk.io.range.ByteKey;
+import org.apache.beam.sdk.io.range.ByteKeyRange;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.values.KV;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.IsolationLevel;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.RegionSplitter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** A Splittable {@link DoFn} for reading the records from each region. */
+@InternalApi("For internal usage only")
+public class ReadSnapshotRegionFn extends DoFn> {
+ private static final Logger LOG = LoggerFactory.getLogger(ReadSnapshotRegionFn.class);
+
+ private static final long BYTES_PER_SPLIT = 512 * 1024 * 1024; // 512 MB
+ private static final long BYTES_PER_GB = 1024 * 1024 * 1024;
+
+ private final boolean useDynamicSplitting;
+ private transient Map | |