Skip to content

Commit

Permalink
Bigtable: clean up consistency token
Browse files Browse the repository at this point in the history
  • Loading branch information
igorbernstein2 committed Aug 16, 2018
1 parent 1c5af25 commit 845e134
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -454,10 +454,7 @@ public ApiFuture<Void> dropAllRowsAsync(String tableId) {
* }</pre>
*/
public ConsistencyToken generateConsistencyToken(String tableId) {
return ConsistencyToken.fromProto(
this.stub
.generateConsistencyTokenCallable()
.call(composeGenerateConsistencyTokenRequest(tableId)));
return awaitFuture(generateConsistencyTokenAsync(tableId));
}

/**
Expand All @@ -472,7 +469,7 @@ public ConsistencyToken generateConsistencyToken(String tableId) {
* }
* }</pre>
*/
public ApiFuture<ConsistencyToken> generateConsistencyTokenAsync(String tableId) {
public ApiFuture<ConsistencyToken> generateConsistencyTokenAsync(final String tableId) {
ApiFuture<GenerateConsistencyTokenResponse> tokenResp =
this.stub
.generateConsistencyTokenCallable()
Expand All @@ -482,8 +479,9 @@ public ApiFuture<ConsistencyToken> generateConsistencyTokenAsync(String tableId)
tokenResp,
new ApiFunction<GenerateConsistencyTokenResponse, ConsistencyToken>() {
@Override
public ConsistencyToken apply(GenerateConsistencyTokenResponse input) {
return ConsistencyToken.fromProto(input);
public ConsistencyToken apply(GenerateConsistencyTokenResponse proto) {
TableName tableName = TableName.of(instanceName.getProject(), instanceName.getProject(), tableId);
return ConsistencyToken.of(tableName, proto.getConsistencyToken());
}
});
}
Expand All @@ -495,30 +493,25 @@ public ConsistencyToken apply(GenerateConsistencyTokenResponse input) {
*
* <pre>{@code
* try(BigtableTableAdminClient client = BigtableTableAdminClient.create(InstanceName.of("[PROJECT]", "[INSTANCE]"))) {
* boolean consistent = client.isConsistent("tableId", token);
* }
* }</pre>
*/
public boolean isConsistent(String tableId, ConsistencyToken token) {
return stub.checkConsistencyCallable()
.call(token.toProto(getTableName(tableId)))
.getConsistent();
}

/**
* Checks replication consistency for the specified token consistency token asynchronously
* // Perform some mutations.
*
* <p>Sample code:
* ConsistencyToken token = client.generateConsistencyToken("table-id");
* while(!client.isConsistent(token)) {
* Thread.sleep(100);
* }
*
* <pre>{@code
* try(BigtableTableAdminClient client = BigtableTableAdminClient.create(InstanceName.of("[PROJECT]", "[INSTANCE]"))) {
* boolean consistent = client.isConsistentAsync("tableId", token);
* // Now all clusters are consistent
* }
* }</pre>
*/
public ApiFuture<Boolean> isConsistentAsync(String tableId, ConsistencyToken token) {
ApiFuture<CheckConsistencyResponse> checkConsResp =
stub.checkConsistencyCallable().futureCall(token.toProto(getTableName(tableId)));
public boolean isConsistent(ConsistencyToken token) {
return awaitFuture(isConsistentAsync(token));
}

@VisibleForTesting
ApiFuture<Boolean> isConsistentAsync(ConsistencyToken token) {
ApiFuture<CheckConsistencyResponse> checkConsResp = stub.checkConsistencyCallable()
.futureCall(token.toProto(instanceName));

return ApiFutures.transform(
checkConsResp,
Expand All @@ -530,6 +523,8 @@ public Boolean apply(CheckConsistencyResponse input) {
});
}

// TODO(igorbernstein2): add awaitConsist() & awaitConsistAsync() that generate & poll a token

/**
* Helper method to construct the table name in format:
* projects/{project}/instances/{instance}/tables/{tableId}
Expand Down Expand Up @@ -631,4 +626,13 @@ public Void apply(Empty empty) {
}
});
}

private <T> T awaitFuture(ApiFuture<T> future) {
try {
return future.get();
} catch(Throwable t) {
// TODO(igorbernstein2): figure out a better wrapper exception.
throw new RuntimeException(t);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@
import com.google.api.core.InternalApi;
import com.google.bigtable.admin.v2.CheckConsistencyRequest;
import com.google.bigtable.admin.v2.GenerateConsistencyTokenResponse;
import com.google.bigtable.admin.v2.InstanceName;
import com.google.bigtable.admin.v2.TableName;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;

/**
* Wrapper for {@link GenerateConsistencyTokenResponse#getConsistencyToken()}
Expand All @@ -28,22 +31,26 @@
* com.google.cloud.bigtable.admin.v2.BigtableTableAdminClient#generateConsistencyToken(String)}
*/
public final class ConsistencyToken {
private final TableName tableName;
private final String token;

@InternalApi
public static ConsistencyToken fromProto(GenerateConsistencyTokenResponse proto) {
return new ConsistencyToken(proto.getConsistencyToken());
public static ConsistencyToken of(TableName tableName, String token) {
return new ConsistencyToken(tableName, token);
}

private ConsistencyToken(String token) {
public ConsistencyToken(TableName tableName, String token) {
this.tableName = tableName;
this.token = token;
}

// TODO(igorbernstein): tableName should be part of the token and be parameterized
@InternalApi
public CheckConsistencyRequest toProto(String tableName) {
public CheckConsistencyRequest toProto(InstanceName instanceName) {
Preconditions.checkArgument(
instanceName.equals(InstanceName.of(tableName.getProject(), tableName.getInstance())),
"Consistency tokens are only valid within a single instance.");

return CheckConsistencyRequest.newBuilder()
.setName(tableName)
.setName(tableName.toString())
.setConsistencyToken(token)
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ public void generateAndCheckConsistency() {
Mockito.when(mockCheckConsistencyCallable.call(any(CheckConsistencyRequest.class)))
.thenReturn(consistencyResp);

adminClient.isConsistent("tableId", consistencyToken);
adminClient.isConsistent(consistencyToken);
Mockito.verify(mockCheckConsistencyCallable).call(requestCaptor.capture());
}

Expand All @@ -282,7 +282,7 @@ public void generateAndCheckConsistencyAsync() throws Exception {
Mockito.when(mockCheckConsistencyCallable.futureCall(any(CheckConsistencyRequest.class)))
.thenReturn(consistencyResp);

adminClient.isConsistentAsync("tableId", consistencyTokenFuture.get());
adminClient.isConsistentAsync(consistencyTokenFuture.get());
Mockito.verify(mockCheckConsistencyCallable).futureCall(requestCaptor.capture());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ public void checkConsistency() {
tableAdmin.createTable(CreateTableRequest.of(tableId));
ConsistencyToken consistencyToken = tableAdmin.generateConsistencyToken(tableId);
assertNotNull(consistencyToken);
boolean consistent = tableAdmin.isConsistent(tableId, consistencyToken);
boolean consistent = tableAdmin.isConsistent(consistencyToken);
assertTrue(consistent);
} finally {
tableAdmin.deleteTable(tableId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,47 @@
*/
package com.google.cloud.bigtable.admin.v2.models;

import static org.junit.Assert.assertEquals;
import static com.google.common.truth.Truth.assertThat;

import com.google.bigtable.admin.v2.GenerateConsistencyTokenResponse;
import com.google.bigtable.admin.v2.CheckConsistencyRequest;
import com.google.bigtable.admin.v2.InstanceName;
import com.google.bigtable.admin.v2.TableName;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
public class ConsistencyTokenTest {
private static final InstanceName INSTANCE_NAME = InstanceName.of("my-project", "my-instance");
private static final TableName TABLE_NAME = TableName.of(INSTANCE_NAME.getProject(), INSTANCE_NAME.getInstance(), "my-table");
private static final String TOKEN_VALUE = "87282hgwd8yg";

@Test
public void testToProto() {
ConsistencyToken token = ConsistencyToken.of(TABLE_NAME, TOKEN_VALUE);

assertThat(token.toProto(INSTANCE_NAME)).isEqualTo(
CheckConsistencyRequest.newBuilder()
.setName(TABLE_NAME.toString())
.setConsistencyToken(TOKEN_VALUE)
.build()
);
}

@Test
public void fromJsonTest() {
ConsistencyToken tokenResponse =
ConsistencyToken.fromProto(
GenerateConsistencyTokenResponse.newBuilder()
.setConsistencyToken("87282hgwd8yg")
.build());

assertEquals("87282hgwd8yg", tokenResponse.getToken());
public void testInstanceMismatch() {
ConsistencyToken token = ConsistencyToken.of(TABLE_NAME, TOKEN_VALUE);

InstanceName otherInstanceName = InstanceName.of("my-project", "other-instance");

Exception actualError = null;

try {
token.toProto(otherInstanceName);
} catch (Exception e) {
actualError = e;
}

assertThat(actualError).isInstanceOf(IllegalArgumentException.class);
}
}

0 comments on commit 845e134

Please sign in to comment.