Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,11 @@ public abstract class ConsistencyRequest {
@Nullable
public abstract String getConsistencyToken();

protected abstract boolean isFullyQualified();

public static ConsistencyRequest forReplication(String tableId) {
return new AutoValue_ConsistencyRequest(
tableId, CheckConsistencyRequest.ModeCase.STANDARD_READ_REMOTE_WRITES, null);
tableId, CheckConsistencyRequest.ModeCase.STANDARD_READ_REMOTE_WRITES, null, false);
}

/**
Expand All @@ -59,37 +61,110 @@ public static ConsistencyRequest forReplication(String tableId, String consisten
Preconditions.checkNotNull(consistencyToken, "consistencyToken must not be null");

return new AutoValue_ConsistencyRequest(
tableId, CheckConsistencyRequest.ModeCase.STANDARD_READ_REMOTE_WRITES, consistencyToken);
tableId,
CheckConsistencyRequest.ModeCase.STANDARD_READ_REMOTE_WRITES,
consistencyToken,
false);
}

public static ConsistencyRequest forDataBoost(String tableId) {
return new AutoValue_ConsistencyRequest(
tableId, CheckConsistencyRequest.ModeCase.DATA_BOOST_READ_LOCAL_WRITES, null);
tableId, CheckConsistencyRequest.ModeCase.DATA_BOOST_READ_LOCAL_WRITES, null, false);
}

@InternalApi
public CheckConsistencyRequest toCheckConsistencyProto(
TableAdminRequestContext requestContext, String token) {
public static ConsistencyRequest forReplicationFromTableName(String tableName) {
Preconditions.checkArgument(
TableName.isParsableFrom(tableName), "tableName must be a fully qualified table name");
return new AutoValue_ConsistencyRequest(
tableName, CheckConsistencyRequest.ModeCase.STANDARD_READ_REMOTE_WRITES, null, true);
}

@InternalApi
public static ConsistencyRequest forReplicationFromTableName(
String tableName, String consistencyToken) {
Preconditions.checkArgument(
TableName.isParsableFrom(tableName), "tableName must be a fully qualified table name");
Preconditions.checkNotNull(consistencyToken, "consistencyToken must not be null");
Comment thread
jinseopkim0 marked this conversation as resolved.

return new AutoValue_ConsistencyRequest(
tableName,
Comment thread
jinseopkim0 marked this conversation as resolved.
CheckConsistencyRequest.ModeCase.STANDARD_READ_REMOTE_WRITES,
consistencyToken,
true);
}

private CheckConsistencyRequest.Builder buildBaseRequest(String name, String token) {
CheckConsistencyRequest.Builder builder = CheckConsistencyRequest.newBuilder();
TableName tableName =
TableName.of(requestContext.getProjectId(), requestContext.getInstanceId(), getTableId());

if (getMode().equals(CheckConsistencyRequest.ModeCase.STANDARD_READ_REMOTE_WRITES)) {
builder.setStandardReadRemoteWrites(StandardReadRemoteWrites.newBuilder().build());
} else {
builder.setDataBoostReadLocalWrites(DataBoostReadLocalWrites.newBuilder().build());
}

return builder.setName(tableName.toString()).setConsistencyToken(token).build();
return builder.setName(name).setConsistencyToken(token);
}

/**
* Creates a CheckConsistencyRequest proto. This variant is used when the ConsistencyRequest was
* initialized with a short table ID, relying on the TableAdminRequestContext to construct the
* fully qualified table name.
*/
@InternalApi
public CheckConsistencyRequest toCheckConsistencyProto(
TableAdminRequestContext requestContext, String token) {
Preconditions.checkState(
!isFullyQualified(),
"Use toCheckConsistencyProto(String token) for fully qualified table names.");
TableName tableName =
TableName.of(requestContext.getProjectId(), requestContext.getInstanceId(), getTableId());

return buildBaseRequest(tableName.toString(), token).build();
}

/**
* Creates a CheckConsistencyRequest proto. This variant is used when the ConsistencyRequest was
* initialized with a fully qualified table name, eliminating the need for a request context.
*/
@InternalApi
public CheckConsistencyRequest toCheckConsistencyProto(String token) {
Preconditions.checkState(
isFullyQualified(),
"Use toCheckConsistencyProto(TableAdminRequestContext, String) for non-qualified table"
+ " names.");

return buildBaseRequest(getTableId(), token).build();
}

/**
* Creates a GenerateConsistencyTokenRequest proto. This variant is used when the
* ConsistencyRequest was initialized with a short table ID, relying on the
* TableAdminRequestContext to construct the fully qualified table name.
*/
@InternalApi
public GenerateConsistencyTokenRequest toGenerateTokenProto(
TableAdminRequestContext requestContext) {
Preconditions.checkState(
!isFullyQualified(), "Use toGenerateTokenProto() for fully qualified table names.");
GenerateConsistencyTokenRequest.Builder builder = GenerateConsistencyTokenRequest.newBuilder();
TableName tableName =
TableName.of(requestContext.getProjectId(), requestContext.getInstanceId(), getTableId());

return builder.setName(tableName.toString()).build();
}

/**
* Creates a GenerateConsistencyTokenRequest proto. This variant is used when the
* ConsistencyRequest was initialized with a fully qualified table name, eliminating the need for
* a request context.
*/
@InternalApi
public GenerateConsistencyTokenRequest toGenerateTokenProto() {
Preconditions.checkState(
isFullyQualified(),
"Use toGenerateTokenProto(TableAdminRequestContext) for non-qualified table names.");
GenerateConsistencyTokenRequest.Builder builder = GenerateConsistencyTokenRequest.newBuilder();
return builder.setName(getTableId()).build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import com.google.common.util.concurrent.MoreExecutors;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import javax.annotation.Nullable;

/**
* Callable that waits until either replication or Data Boost has caught up to the point it was
Expand All @@ -56,15 +57,15 @@ class AwaitConsistencyCallable extends UnaryCallable<ConsistencyRequest, Void> {
private final UnaryCallable<CheckConsistencyRequest, CheckConsistencyResponse> checkCallable;
private final RetryingExecutor<CheckConsistencyResponse> executor;

private final TableAdminRequestContext requestContext;
@Nullable private final TableAdminRequestContext requestContext;

static AwaitConsistencyCallable create(
UnaryCallable<GenerateConsistencyTokenRequest, GenerateConsistencyTokenResponse>
generateCallable,
UnaryCallable<CheckConsistencyRequest, CheckConsistencyResponse> checkCallable,
ClientContext clientContext,
RetrySettings pollingSettings,
TableAdminRequestContext requestContext) {
@Nullable TableAdminRequestContext requestContext) {

RetryAlgorithm<CheckConsistencyResponse> retryAlgorithm =
new RetryAlgorithm<>(
Expand All @@ -78,13 +79,22 @@ static AwaitConsistencyCallable create(
generateCallable, checkCallable, retryingExecutor, requestContext);
}

static AwaitConsistencyCallable create(
UnaryCallable<GenerateConsistencyTokenRequest, GenerateConsistencyTokenResponse>
generateCallable,
UnaryCallable<CheckConsistencyRequest, CheckConsistencyResponse> checkCallable,
ClientContext clientContext,
RetrySettings pollingSettings) {
return create(generateCallable, checkCallable, clientContext, pollingSettings, null);
}

@VisibleForTesting
AwaitConsistencyCallable(
UnaryCallable<GenerateConsistencyTokenRequest, GenerateConsistencyTokenResponse>
generateCallable,
UnaryCallable<CheckConsistencyRequest, CheckConsistencyResponse> checkCallable,
RetryingExecutor<CheckConsistencyResponse> executor,
TableAdminRequestContext requestContext) {
@Nullable TableAdminRequestContext requestContext) {
this.generateCallable = generateCallable;
this.checkCallable = checkCallable;
this.executor = executor;
Expand All @@ -98,22 +108,30 @@ public ApiFuture<Void> futureCall(
// If the token is already provided, skip generation and poll directly.
if (consistencyRequest.getConsistencyToken() != null) {
CheckConsistencyRequest request =
consistencyRequest.toCheckConsistencyProto(
requestContext, consistencyRequest.getConsistencyToken());
requestContext == null
? consistencyRequest.toCheckConsistencyProto(consistencyRequest.getConsistencyToken())
: consistencyRequest.toCheckConsistencyProto(
requestContext, consistencyRequest.getConsistencyToken());
return pollToken(request, apiCallContext);
}

ApiFuture<GenerateConsistencyTokenResponse> tokenFuture =
generateToken(consistencyRequest.toGenerateTokenProto(requestContext), apiCallContext);
generateToken(
requestContext == null
? consistencyRequest.toGenerateTokenProto()
: consistencyRequest.toGenerateTokenProto(requestContext),
apiCallContext);

return ApiFutures.transformAsync(
tokenFuture,
new ApiAsyncFunction<GenerateConsistencyTokenResponse, Void>() {
@Override
public ApiFuture<Void> apply(GenerateConsistencyTokenResponse input) {
CheckConsistencyRequest request =
consistencyRequest.toCheckConsistencyProto(
requestContext, input.getConsistencyToken());
requestContext == null
? consistencyRequest.toCheckConsistencyProto(input.getConsistencyToken())
: consistencyRequest.toCheckConsistencyProto(
requestContext, input.getConsistencyToken());
return pollToken(request, apiCallContext);
}
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,38 @@ public class EnhancedBigtableTableAdminStub extends GrpcBigtableTableAdminStub {
private final BigtableTableAdminStubSettings settings;
private final ClientContext clientContext;

private final TableAdminRequestContext requestContext;
@javax.annotation.Nullable private final TableAdminRequestContext requestContext;

@Deprecated private final AwaitReplicationCallable awaitReplicationCallable;

private final AwaitConsistencyCallable awaitConsistencyCallable;
private final OperationCallable<Void, Empty, OptimizeRestoredTableMetadata>
optimizeRestoredTableOperationBaseCallable;

/**
* Creates an instance of {@link EnhancedBigtableTableAdminStub} using the provided settings. This
* variant is used by the V2 client stack which relies on fully qualified table names and
* therefore does not require a {@link TableAdminRequestContext}.
*
* @param settings The settings used to configure the stub.
* @return A new instance of {@code EnhancedBigtableTableAdminStub}.
* @throws IOException If there are errors creating the underlying client context.
*/
public static EnhancedBigtableTableAdminStub createEnhanced(
BigtableTableAdminStubSettings settings) throws IOException {
return new EnhancedBigtableTableAdminStub(settings, ClientContext.create(settings), null);
}
Comment thread
jinseopkim0 marked this conversation as resolved.

Comment thread
jinseopkim0 marked this conversation as resolved.
/**
* Creates an instance of {@link EnhancedBigtableTableAdminStub} using the provided settings. This
* variant is used by the legacy client stack which relies on short table IDs and requires a
* {@link TableAdminRequestContext} to construct fully qualified table names.
*
* @param settings The settings used to configure the stub.
* @param requestContext The context used to format short table IDs.
* @return A new instance of {@code EnhancedBigtableTableAdminStub}.
* @throws IOException If there are errors creating the underlying client context.
*/
public static EnhancedBigtableTableAdminStub createEnhanced(
BigtableTableAdminStubSettings settings, TableAdminRequestContext requestContext)
throws IOException {
Expand All @@ -72,7 +96,7 @@ public static EnhancedBigtableTableAdminStub createEnhanced(
private EnhancedBigtableTableAdminStub(
BigtableTableAdminStubSettings settings,
ClientContext clientContext,
TableAdminRequestContext requestContext)
@javax.annotation.Nullable TableAdminRequestContext requestContext)
throws IOException {
super(settings, clientContext);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,4 +97,55 @@ public void testToCheckConsistencyProtoWithToken() {
assertThat(checkConsistencyRequest.getModeCase())
.isEqualTo(CheckConsistencyRequest.ModeCase.STANDARD_READ_REMOTE_WRITES);
}

@Test
public void testToCheckConsistencyProtoFromTableName() {
String fullTableName = NameUtil.formatTableName(PROJECT_ID, INSTANCE_ID, TABLE_ID);
ConsistencyRequest consistencyRequest =
ConsistencyRequest.forReplicationFromTableName(fullTableName);

CheckConsistencyRequest checkConsistencyRequest =
consistencyRequest.toCheckConsistencyProto(CONSISTENCY_TOKEN);

assertThat(checkConsistencyRequest.getName()).isEqualTo(fullTableName);
assertThat(checkConsistencyRequest.getConsistencyToken()).isEqualTo(CONSISTENCY_TOKEN);
assertThat(checkConsistencyRequest.getModeCase())
.isEqualTo(CheckConsistencyRequest.ModeCase.STANDARD_READ_REMOTE_WRITES);
}

@Test
public void testToCheckConsistencyProtoFromTableNameWithToken() {
String fullTableName = NameUtil.formatTableName(PROJECT_ID, INSTANCE_ID, TABLE_ID);
ConsistencyRequest consistencyRequest =
ConsistencyRequest.forReplicationFromTableName(fullTableName, CONSISTENCY_TOKEN);

CheckConsistencyRequest checkConsistencyRequest =
consistencyRequest.toCheckConsistencyProto(CONSISTENCY_TOKEN);

assertThat(checkConsistencyRequest.getName()).isEqualTo(fullTableName);
assertThat(checkConsistencyRequest.getConsistencyToken()).isEqualTo(CONSISTENCY_TOKEN);
assertThat(checkConsistencyRequest.getModeCase())
.isEqualTo(CheckConsistencyRequest.ModeCase.STANDARD_READ_REMOTE_WRITES);
}

@Test
public void testToGenerateTokenProtoFromTableName() {
String fullTableName = NameUtil.formatTableName(PROJECT_ID, INSTANCE_ID, TABLE_ID);
ConsistencyRequest consistencyRequest =
ConsistencyRequest.forReplicationFromTableName(fullTableName);

GenerateConsistencyTokenRequest generateRequest = consistencyRequest.toGenerateTokenProto();

assertThat(generateRequest.getName()).isEqualTo(fullTableName);
}

@Test(expected = IllegalArgumentException.class)
public void testForReplicationFromTableNameInvalid() {
ConsistencyRequest.forReplicationFromTableName(TABLE_ID);
}

@Test(expected = IllegalArgumentException.class)
public void testForReplicationFromTableNameWithTokenInvalid() {
ConsistencyRequest.forReplicationFromTableName(TABLE_ID, CONSISTENCY_TOKEN);
}
}
Loading