Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file.
56 changes: 49 additions & 7 deletions bigtable-dataflow-parent/bigtable-beam-import/pom.xml
Comment thread
tianlei2 marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,33 @@ limitations under the License.
<type>pom</type>
<scope>import</scope>
</dependency>

<!-- Version alignment -->
<!-- Mark all annotations as provided. They don't affect the runtime of the pipeline so
there is no need to try to version align them -->
<dependency>
Comment thread
tianlei2 marked this conversation as resolved.
<groupId>org.checkerframework</groupId>
<artifactId>checker-qual</artifactId>
<version>3.31.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.errorprone</groupId>
<artifactId>error_prone_annotations</artifactId>
<version>2.18.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.codehaus.mojo</groupId>
<artifactId>animal-sniffer-annotations</artifactId>
<version>1.22</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>jakarta.annotation</groupId>
<artifactId>jakarta.annotation-api</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
</dependencyManagement>

Expand Down Expand Up @@ -85,6 +112,7 @@ limitations under the License.
<artifactId>bigtable-hbase-beam</artifactId>
<version>2.18.2-SNAPSHOT</version> <!-- {x-version-update:bigtable-client-parent:current} -->
<exclusions>
<!-- Exclude hbase-shaded-client to prevent reintroducing the dnsjava SPI / LiteralByteString conflict (NoClassDefFoundError) -->
<exclusion>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-shaded-client</artifactId>
Expand All @@ -104,6 +132,7 @@ limitations under the License.
</dependency>


<!-- Use hbase-shaded-mapreduce (instead of hbase-shaded-client) to defeat the dnsjava SPI / LiteralByteString conflict (NoClassDefFoundError on JDK 21+) -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-shaded-mapreduce</artifactId>
Expand All @@ -118,6 +147,10 @@ limitations under the License.
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs-client</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-shaded-client</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
Expand All @@ -134,7 +167,11 @@ limitations under the License.
<artifactId>beam-runners-direct-java</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-bigtable-emulator-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-shaded-testing-util</artifactId>
Expand All @@ -148,11 +185,6 @@ limitations under the License.
</exclusions>
</dependency>

<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-bigtable-emulator-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.cloud.bigtable</groupId>
<artifactId>bigtable-internal-test-helper</artifactId>
Expand Down Expand Up @@ -186,7 +218,7 @@ limitations under the License.
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<artifactId>mockito-inline</artifactId>
<version>${mockito.version}</version>
<scope>test</scope>
</dependency>
Expand Down Expand Up @@ -221,6 +253,14 @@ limitations under the License.


<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<fork>true</fork>
</configuration>
</plugin>

<plugin>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
Expand Down Expand Up @@ -287,6 +327,8 @@ limitations under the License.
<filter>
<artifact>*:*</artifact>
<excludes>
<!-- Exclude InetAddressResolverProvider to prevent the dnsjava SPI / LiteralByteString conflict (NoClassDefFoundError on JDK 21+) -->
<exclude>META-INF/services/java.net.spi.InetAddressResolverProvider</exclude>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import com.google.bigtable.repackaged.com.google.api.core.InternalApi;
import com.google.bigtable.repackaged.com.google.api.core.InternalExtensionOnly;
import com.google.cloud.bigtable.beam.hbasesnapshots.HBaseSnapshotRestoreTool;
import com.google.cloud.bigtable.beam.hbasesnapshots.ImportJobFromHbaseSnapshot;
import com.google.cloud.bigtable.beam.sequencefiles.CreateTableHelper;
import com.google.cloud.bigtable.beam.sequencefiles.ExportJob;
Expand Down Expand Up @@ -51,6 +52,9 @@ public static void main(String[] args) throws Exception {
case "importsnapshot":
ImportJobFromHbaseSnapshot.main(subArgs);
break;
case "restoresnapshot":
HBaseSnapshotRestoreTool.main(subArgs);
break;
case "create-table":
CreateTableHelper.main(subArgs);
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import com.google.cloud.bigtable.beam.sequencefiles.ImportJob.ImportOptions;
import com.google.cloud.bigtable.beam.validation.SyncTableJob.SyncTableOptions;
import com.google.cloud.bigtable.hbase.BigtableOptionsFactory;
import com.google.cloud.bigtable.hbase.wrappers.BigtableHBaseSettings;
import java.util.concurrent.TimeUnit;
import org.apache.beam.sdk.options.ValueProvider;

/**
Expand All @@ -44,7 +46,16 @@ public static CloudBigtableTableConfiguration buildImportConfig(
.withProjectId(opts.getBigtableProject())
.withInstanceId(opts.getBigtableInstanceId())
.withTableId(opts.getBigtableTableId())
.withConfiguration(BigtableOptionsFactory.CUSTOM_USER_AGENT_KEY, customUserAgent);
.withConfiguration(BigtableOptionsFactory.CUSTOM_USER_AGENT_KEY, customUserAgent)
.withConfiguration(
BigtableOptionsFactory.MAX_INFLIGHT_RPCS_KEY,
ValueProvider.NestedValueProvider.of(opts.getMaxInflightRpcs(), String::valueOf))
.withConfiguration(
BigtableHBaseSettings.BULK_MUTATION_CLOSE_TIMEOUT_MILLISECONDS,
ValueProvider.NestedValueProvider.of(
opts.getBulkMutationCloseTimeoutMinutes(),
(Integer minutes) ->
String.valueOf(TimeUnit.MINUTES.toMillis(minutes == null ? 30 : minutes))));
if (opts.getBigtableAppProfileId() != null) {
builder.withAppProfileId(opts.getBigtableAppProfileId());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
/*
* Copyright 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.beam.hbasesnapshots;

import com.google.api.core.InternalExtensionOnly;
import com.google.cloud.bigtable.beam.hbasesnapshots.conf.ImportConfig;
import com.google.cloud.bigtable.beam.hbasesnapshots.conf.SnapshotConfig;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;

/**
* Tool to restore HBase snapshots in GCS for scanning. This tool runs locally (without Dataflow)
* and copies snapshot files to a restore path, resolving HLinks and References so that they can be
* read by a scanner.
*
Comment thread
tianlei2 marked this conversation as resolved.
* <p>Execute the following command to run the tool directly using system properties:
*
* <pre>
* {@code mvn compile exec:java \
* -Dexec.mainClass=com.google.cloud.bigtable.beam.hbasesnapshots.HBaseSnapshotRestoreTool \
* -Dproject=[PROJECT_ID] \
* -DhbaseSnapshotSourceDir=gs://[BUCKET]/[HBASE_EXPORT_ROOT_PATH]/data \
* -Dsnapshots=[SNAPSHOT_NAMES] \
* -DrestorePath=gs://[BUCKET]/[HBASE_EXPORT_ROOT_PATH]/restore
* }
* </pre>
*
* <p>Alternatively, you can provide a path to a JSON configuration file:
*
* <pre>
* {@code mvn compile exec:java \
* -Dexec.mainClass=com.google.cloud.bigtable.beam.hbasesnapshots.HBaseSnapshotRestoreTool \
* -Dproject=[PROJECT_ID] \
* -DimportConfigFilePath=[PATH_TO_JSON_CONFIG]
* }
* </pre>
*
* <p>The JSON configuration file should have the following format:
*
* <pre>
* {
* "sourcepath": "gs://[BUCKET]/[HBASE_EXPORT_ROOT_PATH]/data",
* "restorepath": "gs://[BUCKET]/[HBASE_EXPORT_ROOT_PATH]/restore",
* "snapshots": {
* "snapshot1": "table1",
* "snapshot2": "table2"
* }
* }
* </pre>
*/
@InternalExtensionOnly
public class HBaseSnapshotRestoreTool {
private static final Log LOG = LogFactory.getLog(HBaseSnapshotRestoreTool.class);

private static final String PROJECT_PROPERTY = "project";
private static final String IMPORT_CONFIG_FILE_PATH_PROPERTY = "importConfigFilePath";
private static final String HBASE_SNAPSHOT_SOURCE_DIR_PROPERTY = "hbaseSnapshotSourceDir";
private static final String SNAPSHOTS_PROPERTY = "snapshots";
private static final String RESTORE_PATH_PROPERTY = "restorePath";

public static void main(String[] args) throws Exception {
GcsOptions options = PipelineOptionsFactory.create().as(GcsOptions.class);
String project = System.getProperty(PROJECT_PROPERTY);
if (project != null) {
options.setProject(project);
}

ImportConfig importConfig =
System.getProperty(IMPORT_CONFIG_FILE_PATH_PROPERTY) != null
? buildImportConfigFromConfigFile(System.getProperty(IMPORT_CONFIG_FILE_PATH_PROPERTY))
: buildImportConfigFromArgs(options);

LOG.info(
String.format(
"SourcePath:%s, RestorePath:%s",
importConfig.getSourcepath(), importConfig.getRestorepath()));

Map<String, String> configurations =
SnapshotUtils.getConfiguration(
null, // invoke from a DirectRunner without using dataflow
options.getProject(),
importConfig.getSourcepath(),
importConfig.getHbaseConfiguration());

List<SnapshotConfig> snapshotConfigs =
SnapshotUtils.buildSnapshotConfigs(
importConfig.getSnapshots(),
configurations,
options.getProject(),
importConfig.getSourcepath(),
importConfig.getRestorepath());

for (SnapshotConfig config : snapshotConfigs) {
restoreSnapshot(config);
}
}

@VisibleForTesting
static ImportConfig buildImportConfigFromArgs(GcsOptions gcsOptions) throws IOException {
String sourceDir = System.getProperty(HBASE_SNAPSHOT_SOURCE_DIR_PROPERTY);
String snapshotsProperty = System.getProperty(SNAPSHOTS_PROPERTY);
Map<String, String> snapshots = null;
if (snapshotsProperty != null) {
snapshots =
(sourceDir != null && SnapshotUtils.isRegex(snapshotsProperty))
? SnapshotUtils.getSnapshotsFromSnapshotPath(
sourceDir, gcsOptions.getGcsUtil(), snapshotsProperty)
: SnapshotUtils.getSnapshotsFromString(snapshotsProperty);
}

ImportConfig importConfig = new ImportConfig();
importConfig.setSourcepath(sourceDir);
if (snapshots != null) {
importConfig.setSnapshotsFromMap(snapshots);
}
importConfig.validate();
SnapshotUtils.setRestorePath(System.getProperty(RESTORE_PATH_PROPERTY), importConfig);

return importConfig;
}

@VisibleForTesting
static ImportConfig buildImportConfigFromConfigFile(String configFilePath) throws Exception {
Gson gson = new GsonBuilder().create();
ImportConfig importConfig =
gson.fromJson(SnapshotUtils.readFileContents(configFilePath), ImportConfig.class);
Preconditions.checkNotNull(importConfig, "ImportConfig parsed from file cannot be null.");
importConfig.validate();
SnapshotUtils.setRestorePath(importConfig.getRestorepath(), importConfig);
return importConfig;
}

@VisibleForTesting
/**
* Creates a copy of Snasphsot from the source path into restore path.
*
* @param snapshotConfig - Snapshot Configuration
* @throws IOException
*/
static void restoreSnapshot(SnapshotConfig snapshotConfig) throws IOException {
Path sourcePath = snapshotConfig.getSourcePath();
Path restorePath = snapshotConfig.getRestorePath();
Configuration configuration = snapshotConfig.getConfiguration();
LOG.info(
String.format("RestoreSnapshot - sourcePath:%s restorePath: %s", sourcePath, restorePath));
FileSystem fileSystem = sourcePath.getFileSystem(configuration);
if (fileSystem.exists(restorePath)) {
LOG.info(
String.format(
"Restore path %s already exists, deleting it for idempotency", restorePath));
fileSystem.delete(restorePath, true);
Comment thread
tianlei2 marked this conversation as resolved.
}
RestoreSnapshotHelper.copySnapshotForScanner(
configuration, fileSystem, sourcePath, restorePath, snapshotConfig.getSnapshotName());
Comment thread
tianlei2 marked this conversation as resolved.
}
}
Loading
Loading