From 68e6d1adcbfd449f2399a3f30cc0cec64ffab47f Mon Sep 17 00:00:00 2001 From: elisheva-qlogic <43041115+elisheva-qlogic@users.noreply.github.com> Date: Wed, 7 Nov 2018 14:10:47 -0500 Subject: [PATCH] Bigtable: add sync methods (#3856) --- .../bigtable/data/v2/BigtableDataClient.java | 203 +++++++++++++++++- .../data/v2/BigtableDataClientTest.java | 123 ++++++++++- 2 files changed, 321 insertions(+), 5 deletions(-) diff --git a/google-cloud-clients/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataClient.java b/google-cloud-clients/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataClient.java index ca4a3d169ac7..e36e7900057b 100644 --- a/google-cloud-clients/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataClient.java +++ b/google-cloud-clients/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataClient.java @@ -18,6 +18,7 @@ import com.google.api.core.ApiFuture; import com.google.api.core.BetaApi; import com.google.api.core.InternalApi; +import com.google.api.gax.rpc.ApiExceptions; import com.google.api.gax.rpc.ResponseObserver; import com.google.api.gax.rpc.ServerStream; import com.google.api.gax.rpc.ServerStreamingCallable; @@ -136,6 +137,66 @@ public static BigtableDataClient create(BigtableDataSettings settings) throws IO this.stub = stub; } + /** + * Convenience method for synchronously reading a single row. If the row does not exist, the + * value will be null. + * + *

Sample code: + * + *

{code
+   * InstanceName instanceName = InstanceName.of("[PROJECT]", "[INSTANCE]");
+   * try (BigtableDataClient bigtableDataClient = BigtableDataClient.create(instanceName)) {
+   *   String tableId = "[TABLE]";
+   *
+   *   Row row = bigtableDataClient.readRow(tableId, ByteString.copyFromUtf8("key"));
+   *   // Do something with row, for example, display all cells
+   *   if(row != null) {
+   *     System.out.println(row.getKey().toStringUtf8());
+   *     for(RowCell cell : row.getCells()) {
+   *       System.out.println("Family: " + cell.getFamily() + "   Qualifier: " + cell.getQualifier().toStringUtf8() + "   Value: " + cell.getValue().toStringUtf8());
+   *     }
+   *   }
+   * } catch(ApiException e) {
+   *   e.printStackTrace();
+   * }
+   * }
+ * + * @throws com.google.api.gax.rpc.ApiException when a serverside error occurs + */ + public Row readRow(String tableId, ByteString rowKey) { + return ApiExceptions.callAndTranslateApiException(readRowAsync(tableId, rowKey)); + } + + /** + * Convenience method for synchronously reading a single row. If the row does not exist, the + * value will be null. + * + *

Sample code: + * + *

{code
+   * InstanceName instanceName = InstanceName.of("[PROJECT]", "[INSTANCE]");
+   * try (BigtableDataClient bigtableDataClient = BigtableDataClient.create(instanceName)) {
+   *   String tableId = "[TABLE]";
+   *
+   *   Row row = bigtableDataClient.readRow(tableId, "key");
+   *   // Do something with row, for example, display all cells
+   *   if(row != null) {
+   *     System.out.println(row.getKey().toStringUtf8());
+   *      for(RowCell cell : row.getCells()) {
+   *        System.out.println("Family: " + cell.getFamily() + "   Qualifier: " + cell.getQualifier().toStringUtf8() + "   Value: " + cell.getValue().toStringUtf8());
+   *      }
+   *   }
+   * } catch(ApiException e) {
+   *   e.printStackTrace();
+   * }
+   * }
+ * + * @throws com.google.api.gax.rpc.ApiException when a serverside error occurs + */ + public Row readRow(String tableId, String rowKey) { + return ApiExceptions.callAndTranslateApiException(readRowAsync(tableId, rowKey)); + } + /** * Convenience method for asynchronously reading a single row. If the row does not exist, the * future's value will be null. @@ -158,7 +219,9 @@ public static BigtableDataClient create(BigtableDataSettings settings) throws IO * } * } * public void onSuccess(Row result) { - * System.out.println("Got row: " + result); + * if (result != null) { + * System.out.println("Got row: " + result); + * } * } * }, MoreExecutors.directExecutor()); * } @@ -190,7 +253,9 @@ public ApiFuture readRowAsync(String tableId, String rowKey) { * } * } * public void onSuccess(Row row) { - * System.out.println("Got row: " + row); + * if (result != null) { + * System.out.println("Got row: " + result); + * } * } * }, MoreExecutors.directExecutor()); * } @@ -374,6 +439,33 @@ public ServerStreamingCallable readRowsCallable(RowAdapterSample code: + * + *
{@code
+   * InstanceName instanceName = InstanceName.of("[PROJECT]", "[INSTANCE]");
+   * try (BigtableDataClient bigtableDataClient = BigtableDataClient.create(instanceName)) {
+   *   String tableId = "[TABLE_ID]";
+   *
+   *   List keyOffsets = bigtableDataClient.sampleRowKeys(tableId);
+   *   for(KeyOffset keyOffset : keyOffsets) {
+   *   // Do something with keyOffset
+   *   }
+   * } catch(ApiException e) {
+   *   e.printStackTrace();
+   * }
+   * }
+ * + * @throws com.google.api.gax.rpc.ApiException when a serverside error occurs + */ + public List sampleRowKeys(String tableId) { + return ApiExceptions.callAndTranslateApiException(sampleRowKeysAsync(tableId)); + } + /** * Convenience method to asynchronously return a sample of row keys in the table. The returned row * keys will delimit contiguous sections of the table of approximately equal size, which can be @@ -447,6 +539,30 @@ public UnaryCallable> sampleRowKeysCallable() { return stub.sampleRowKeysCallable(); } + /** + * Convenience method to synchronously mutate a single row atomically. Cells already present in + * the row are left unchanged unless explicitly changed by the {@link RowMutation}. + * + *

Sample code: + * + *

{@code
+   * InstanceName instanceName = InstanceName.of("[PROJECT]", "[INSTANCE]");
+   * try (BigtableDataClient bigtableDataClient = BigtableDataClient.create(instanceName)) {
+   *   RowMutation mutation = RowMutation.create("[TABLE]", "[ROW KEY]")
+   *     .setCell("[FAMILY NAME]", "[QUALIFIER]", "[VALUE]");
+   *
+   *   bigtableDataClient.mutateRow(mutation);
+   * } catch(ApiException e) {
+   *   e.printStackTrace();
+   * }
+   * }
+ * + * @throws com.google.api.gax.rpc.ApiException when a serverside error occurs + */ + public void mutateRow(RowMutation rowMutation) { + ApiExceptions.callAndTranslateApiException(mutateRowAsync(rowMutation)); + } + /** * Convenience method to asynchronously mutate a single row atomically. Cells already present in * the row are left unchanged unless explicitly changed by the {@link RowMutation}. @@ -547,6 +663,35 @@ public BulkMutationBatcher newBulkMutationBatcher() { * try (BigtableDataClient bigtableDataClient = BigtableDataClient.create(instanceName)) { * BulkMutation batch = BulkMutation.create("[TABLE]"); * for (String someValue : someCollection) { + * batch.add("[ROW KEY]", Mutation.create().setCell("[FAMILY NAME]", "[QUALIFIER]", "[VALUE]")); + * } + * bigtableDataClient.bulkMutateRows(batch); + * } catch(ApiException e) { + * e.printStackTrace(); + * } catch(MutateRowsException e) { + * e.printStackTrace(); + * } + * } + * + * @throws com.google.api.gax.rpc.ApiException when a serverside error occurs + * @throws com.google.cloud.bigtable.data.v2.models.MutateRowsException if any of the entries failed to be applied + */ + public void bulkMutateRows(BulkMutation mutation) { + ApiExceptions.callAndTranslateApiException(bulkMutateRowsAsync(mutation)); + } + + /** + * Convenience method to mutate multiple rows in a batch. Each individual row is mutated + * atomically as in MutateRow, but the entire batch is not executed atomically. Unlike {@link + * #newBulkMutationBatcher()}, this method expects the mutations to be pre-batched. + * + *

Sample code: + * + *

{@code
+   * InstanceName instanceName = InstanceName.of("[PROJECT]", "[INSTANCE]");
+   * try (BigtableClient bigtableClient = BigtableClient.create(instanceName)) {
+   *   BulkMutation batch = BulkMutation.create("[TABLE]");
+   *   for (String someValue : someCollection) {
    *     ApiFuture entryFuture = batch.add("[ROW KEY]",
    *       Mutation.create().setCell("[FAMILY NAME]", "[QUALIFIER]", "[VALUE]"));
    *   }
@@ -594,6 +739,33 @@ public UnaryCallable bulkMutationCallable() {
     return stub.bulkMutateRowsCallable();
   }
 
+  /**
+   * Convenience method to synchronously mutate a row atomically based on the output of a filter.
+   *
+   * 

Sample code: + * + *

{@code
+   * InstanceName instanceName = InstanceName.of("[PROJECT]", "[INSTANCE]");
+   * try (BigtableDataClient bigtableDataClient = BigtableDataClient.create(instanceName)) {
+   *   ConditionalRowMutation mutation = ConditionalRowMutation.create("[TABLE]", "[KEY]")
+   *     .condition(FILTERS.value().regex("old-value"))
+   *     .then(
+   *       Mutation.create()
+   *         .setCell("[FAMILY]", "[QUALIFIER]", "[VALUE]")
+   *       );
+   *
+   *   Boolean result = bigtableDataClient.checkAndMutateRow(mutation);
+   * } catch(ApiException e) {
+   *   e.printStackTrace();
+   * }
+   * }
+ * + * @throws com.google.api.gax.rpc.ApiException when a serverside error occurs + */ + public Boolean checkAndMutateRow(ConditionalRowMutation mutation) { + return ApiExceptions.callAndTranslateApiException(checkAndMutateRowAsync(mutation)); + } + /** * Convenience method to asynchronously mutate a row atomically based on the output of a filter. * @@ -663,6 +835,33 @@ public UnaryCallable checkAndMutateRowCallable( return stub.checkAndMutateRowCallable(); } + /** + * Convenience method that synchronously modifies a row atomically on the server. The method + * reads the latest existing timestamp and value from the specified columns and writes a new + * entry. The new value for the timestamp is the greater of the existing timestamp or the current + * server time. The method returns the new contents of all modified cells. + * + *

Sample code: + * + *

{@code
+   * InstanceName instanceName = InstanceName.of("[PROJECT]", "[INSTANCE]");
+   * try (BigtableDataClient bigtableDataClient = BigtableDataClient.create(instanceName)) {
+   *   ReadModifyWriteRow mutation = ReadModifyWriteRow.create("[TABLE]", "[KEY]")
+   *     .increment("[FAMILY]", "[QUALIFIER]", 1)
+   *     .append("[FAMILY2]", "[QUALIFIER2]", "suffix");
+   *
+   *   Row success = bigtableDataClient.readModifyWriteRow(mutation);
+   * } catch(ApiException e) {
+   *   e.printStackTrace();
+   * }
+   * }
+ * + * @throws com.google.api.gax.rpc.ApiException when a serverside error occurs + */ + public Row readModifyWriteRow(ReadModifyWriteRow mutation) { + return ApiExceptions.callAndTranslateApiException(readModifyWriteRowAsync(mutation)); + } + /** * Convenience method that asynchronously modifies a row atomically on the server. The method * reads the latest existing timestamp and value from the specified columns and writes a new diff --git a/google-cloud-clients/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/BigtableDataClientTest.java b/google-cloud-clients/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/BigtableDataClientTest.java index 9aee30878d9c..2fdfe1c37401 100644 --- a/google-cloud-clients/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/BigtableDataClientTest.java +++ b/google-cloud-clients/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/BigtableDataClientTest.java @@ -16,8 +16,10 @@ package com.google.cloud.bigtable.data.v2; import static com.google.common.truth.Truth.assertThat; +import static org.mockito.Matchers.any; import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutures; import com.google.api.core.SettableApiFuture; import com.google.api.gax.grpc.GrpcStatusCode; import com.google.api.gax.rpc.ApiException; @@ -37,10 +39,13 @@ import com.google.cloud.bigtable.data.v2.models.Query; import com.google.cloud.bigtable.data.v2.models.ReadModifyWriteRow; import com.google.cloud.bigtable.data.v2.models.Row; +import com.google.cloud.bigtable.data.v2.models.RowCell; import com.google.cloud.bigtable.data.v2.models.RowMutation; import com.google.cloud.bigtable.data.v2.stub.EnhancedBigtableStub; import com.google.protobuf.ByteString; +import com.google.protobuf.Empty; import io.grpc.Status.Code; +import java.util.Collections; import java.util.List; import java.util.concurrent.TimeoutException; import org.junit.Before; @@ -50,7 +55,9 @@ import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; import org.mockito.runners.MockitoJUnitRunner; +import org.mockito.stubbing.Answer; import org.threeten.bp.Duration; @RunWith(MockitoJUnitRunner.class) @@ -115,7 +122,6 @@ public void proxyReadRowAsyncTest() { @Test public void proxyReadRowStrAsyncTest() { bigtableDataClient.readRowAsync("fake-table", "fake-row-key"); - ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(Query.class); Mockito.verify(mockReadRowsCallable.first()).futureCall(requestCaptor.capture()); @@ -131,6 +137,50 @@ public void proxyReadRowStrAsyncTest() { .build()); } + @Test + public void readRowTest() { + Mockito.when(mockReadRowsCallable.first().futureCall(any(Query.class))) + .thenReturn(ApiFutures.immediateFuture( + Row.create(ByteString.copyFromUtf8("fake-row-key"), Collections.emptyList()))); + bigtableDataClient.readRow("fake-table", ByteString.copyFromUtf8("fake-row-key")); + + ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(Query.class); + Mockito.verify(mockReadRowsCallable.first()).futureCall(requestCaptor.capture()); + + RequestContext ctx = + RequestContext.create(InstanceName.of("fake-project", "fake-instance"), "fake-profile"); + // NOTE: limit(1) is added by the mocked first() call, so it's not tested here + assertThat(requestCaptor.getValue().toProto(ctx)) + .isEqualTo( + ReadRowsRequest.newBuilder() + .setTableName("projects/fake-project/instances/fake-instance/tables/fake-table") + .setAppProfileId("fake-profile") + .setRows(RowSet.newBuilder().addRowKeys(ByteString.copyFromUtf8("fake-row-key"))) + .build()); + } + + @Test + public void readRowStrTest() { + Mockito.when(mockReadRowsCallable.first().futureCall(any(Query.class))) + .thenReturn(ApiFutures.immediateFuture( + Row.create(ByteString.copyFromUtf8("fake-row-key"), Collections.emptyList()))); + bigtableDataClient.readRow("fake-table", "fake-row-key"); + + ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(Query.class); + Mockito.verify(mockReadRowsCallable.first()).futureCall(requestCaptor.capture()); + + RequestContext ctx = + RequestContext.create(InstanceName.of("fake-project", "fake-instance"), "fake-profile"); + // NOTE: limit(1) is added by the mocked first() call, so it's not tested here + assertThat(requestCaptor.getValue().toProto(ctx)) + .isEqualTo( + ReadRowsRequest.newBuilder() + .setTableName("projects/fake-project/instances/fake-instance/tables/fake-table") + .setAppProfileId("fake-profile") + .setRows(RowSet.newBuilder().addRowKeys(ByteString.copyFromUtf8("fake-row-key"))) + .build()); + } + @Test public void proxyReadRowsSyncTest() { Query query = Query.create("fake-table"); @@ -160,6 +210,14 @@ public void proxySampleRowKeysTest() { Mockito.verify(mockSampleRowKeysCallable).futureCall("fake-table"); } + @Test + public void sampleRowKeysTest() { + Mockito.when(mockSampleRowKeysCallable.futureCall(any(String.class))) + .thenReturn(ApiFutures.immediateFuture(Collections.emptyList())); + bigtableDataClient.sampleRowKeys("fake-table"); + Mockito.verify(mockSampleRowKeysCallable).futureCall("fake-table"); + } + @Test public void proxyMutateRowCallableTest() { assertThat(bigtableDataClient.mutateRowCallable()).isSameAs(mockMutateRowCallable); @@ -168,13 +226,31 @@ public void proxyMutateRowCallableTest() { @Test public void proxyMutateRowTest() { RowMutation request = - RowMutation.create("fake-table", "some-key") - .setCell("some-family", "fake-qualifier", "fake-value"); + RowMutation.create("fake-table", "some-key") + .setCell("some-family", "fake-qualifier", "fake-value"); bigtableDataClient.mutateRowAsync(request); Mockito.verify(mockMutateRowCallable).futureCall(request); } + @Test + public void mutateRowTest() { + Mockito.when(mockMutateRowCallable.futureCall(any(RowMutation.class))) + .thenAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocationOnMock) throws Throwable { + return ApiFutures.immediateFuture(Empty.getDefaultInstance()); + } + }); + + RowMutation request = + RowMutation.create("fake-table", "some-key") + .setCell("some-family", "fake-qualifier", "fake-value"); + + bigtableDataClient.mutateRow(request); + Mockito.verify(mockMutateRowCallable).futureCall(request); + } + @Test public void proxyBulkMutatesRowTest() { BulkMutation request = @@ -187,6 +263,26 @@ public void proxyBulkMutatesRowTest() { Mockito.verify(mockBulkMutateRowsCallable).futureCall(request); } + @Test + public void bulkMutatesRowTest() { + Mockito.when(mockBulkMutateRowsCallable.futureCall(any(BulkMutation.class))) + .thenAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocationOnMock) throws Throwable { + return ApiFutures.immediateFuture(Empty.getDefaultInstance()); + } + }); + + BulkMutation request = + BulkMutation.create("fake-table") + .add( + "fake-key", + Mutation.create().setCell("fake-family", "fake-qualifier", "fake-value")); + + bigtableDataClient.bulkMutateRows(request); + Mockito.verify(mockBulkMutateRowsCallable).futureCall(request); + } + @Test public void proxyBulkMutationsBatchingSendTest() { BulkMutationBatcher batcher = bigtableDataClient.newBulkMutationBatcher(); @@ -291,6 +387,17 @@ public void proxyCheckAndMutateRowTest() { Mockito.verify(mockCheckAndMutateRowCallable).futureCall(mutation); } + @Test + public void checkAndMutateRowTest() { + Mockito.when(mockCheckAndMutateRowCallable.futureCall(any(ConditionalRowMutation.class))).thenReturn(ApiFutures.immediateFuture(Boolean.TRUE)); + ConditionalRowMutation mutation = + ConditionalRowMutation.create("fake-table", "fake-key") + .then(Mutation.create().setCell("fake-family", "fake-qualifier", "fake-value")); + bigtableDataClient.checkAndMutateRow(mutation); + + Mockito.verify(mockCheckAndMutateRowCallable).futureCall(mutation); + } + @Test public void proxyReadModifyWriteRowTest() { ReadModifyWriteRow request = @@ -300,6 +407,16 @@ public void proxyReadModifyWriteRowTest() { Mockito.verify(mockReadModifyWriteRowCallable).futureCall(request); } + @Test + public void readModifyWriteRowTest() { + Mockito.when(mockReadModifyWriteRowCallable.futureCall(any(ReadModifyWriteRow.class))).thenReturn(ApiFutures.immediateFuture(Row.create(ByteString.copyFromUtf8("fake-row-key"), Collections.emptyList()))); + ReadModifyWriteRow request = + ReadModifyWriteRow.create("fake-table", "some-key") + .append("fake-family", "fake-qualifier", "suffix"); + bigtableDataClient.readModifyWriteRow(request); + Mockito.verify(mockReadModifyWriteRowCallable).futureCall(request); + } + @Test public void proxyReadModifyWriterRowCallableTest() { assertThat(bigtableDataClient.readModifyWriteRowCallable())