Skip to content

Commit

Permalink
Bigtable: add a separate callable for point reads (#4264)
Browse files Browse the repository at this point in the history
  • Loading branch information
igorbernstein2 authored Dec 28, 2018
1 parent 31971fb commit 36fbd97
Show file tree
Hide file tree
Showing 7 changed files with 355 additions and 178 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,88 @@ public ApiFuture<Row> readRowAsync(String tableId, ByteString rowKey, @Nullable
if (filter != null) {
query = query.filter(filter);
}
return readRowsCallable().first().futureCall(query);
return readRowCallable().futureCall(query);
}

/**
* Reads a single row. The returned callable object allows for customization of api invocation.
*
* <p>Sample code:
*
* <pre>{@code
* InstanceName instanceName = InstanceName.of("[PROJECT]", "[INSTANCE]");
* try (BigtableDataClient bigtableDataClient = BigtableDataClient.create(instanceName)) {
* String tableId = "[TABLE]";
*
* Query query = Query.create(tableId)
* .rowKey("[KEY]")
* .filter(FILTERS.qualifier().regex("[COLUMN PREFIX].*"));
*
* // Synchronous invocation
* try {
* Row row = bigtableDataClient.readRowCallable().call(query);
* if (row == null) {
* System.out.println("Row not found");
* }
* } catch (RuntimeException e) {
* e.printStackTrace();
* }
*
* // Asynchronous invocation
* ApiFuture<Row> rowFuture = bigtableDataClient.readRowCallable().futureCall(query);
*
* ApiFutures.addCallback(rowFuture, new ApiFutureCallback<Row>() {
* public void onFailure(Throwable t) {
* if (t instanceof NotFoundException) {
* System.out.println("Tried to read a non-existent table");
* } else {
* t.printStackTrace();
* }
* }
* public void onSuccess(Row row) {
* if (row == null) {
* System.out.println("Row not found");
* }
* }
* }, MoreExecutors.directExecutor());
* }
* }</pre>
*
* @see UnaryCallable For call styles.
* @see Query For query options.
* @see com.google.cloud.bigtable.data.v2.models.Filters For the filter building DSL.
*/
public UnaryCallable<Query, Row> readRowCallable() {
return stub.readRowCallable();
}

/**
* Reads a single row. This callable allows for customization of the logical representation of a
* row. It's meant for advanced use cases.
*
* <p>Sample code:
*
* <pre>{@code
* InstanceName instanceName = InstanceName.of("[PROJECT]", "[INSTANCE]");
* try (BigtableDataClient bigtableDataClient = BigtableDataClient.create(instanceName)) {
* String tableId = "[TABLE]";
*
* Query query = Query.create(tableId)
* .rowKey("[KEY]")
* .filter(FILTERS.qualifier().regex("[COLUMN PREFIX].*"));
*
* // Synchronous invocation
* CustomRow row = bigtableDataClient.readRowCallable(new CustomRowAdapter()).call(query));
* // Do something with row
* }
* }</pre>
*
* @see ServerStreamingCallable For call styles.
* @see Query For query options.
* @see com.google.cloud.bigtable.data.v2.models.Filters For the filter building DSL.
*/
public <RowT> UnaryCallable<Query, RowT> readRowCallable(RowAdapter<RowT> rowAdapter) {
return stub.createReadRowCallable(rowAdapter);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import com.google.cloud.bigtable.data.v2.internal.RequestContext;
import com.google.cloud.bigtable.data.v2.internal.RowSetUtil;
import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange;
import com.google.common.base.MoreObjects;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSortedSet;
import com.google.common.collect.Lists;
Expand Down Expand Up @@ -264,4 +266,34 @@ private static ByteString wrapKey(String key) {
}
return ByteString.copyFromUtf8(key);
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Query query = (Query) o;
return Objects.equal(tableId, query.tableId)
&& Objects.equal(builder.build(), query.builder.build());
}

@Override
public int hashCode() {
return Objects.hashCode(tableId, builder.build());
}

@Override
public String toString() {
ReadRowsRequest request = builder.build();

return MoreObjects.toStringHelper(this)
.add("tableId", tableId)
.add("keys", request.getRows().getRowKeysList())
.add("ranges", request.getRows().getRowRangesList())
.add("filter", request.getFilter())
.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ public class EnhancedBigtableStub implements AutoCloseable {
private final RequestContext requestContext;

private final ServerStreamingCallable<Query, Row> readRowsCallable;
private final UnaryCallable<Query, Row> readRowCallable;
private final UnaryCallable<String, List<KeyOffset>> sampleRowKeysCallable;
private final UnaryCallable<RowMutation, Void> mutateRowCallable;
private final UnaryCallable<BulkMutation, Void> bulkMutateRowsCallable;
Expand Down Expand Up @@ -151,6 +152,7 @@ public static EnhancedBigtableStub create(EnhancedBigtableStubSettings settings)
RequestContext.create(settings.getInstanceName(), settings.getAppProfileId());

readRowsCallable = createReadRowsCallable(new DefaultRowAdapter());
readRowCallable = createReadRowCallable(new DefaultRowAdapter());
sampleRowKeysCallable = createSampleRowKeysCallable();
mutateRowCallable = createMutateRowCallable();
bulkMutateRowsCallable = createBulkMutateRowsCallable();
Expand All @@ -162,7 +164,7 @@ public static EnhancedBigtableStub create(EnhancedBigtableStubSettings settings)
// <editor-fold desc="Callable creators">

/**
* Creates a callable chain to handle ReadRows RPCs. The chain will:
* Creates a callable chain to handle streaming ReadRows RPCs. The chain will:
*
* <ul>
* <li>Convert a {@link Query} into a {@link com.google.bigtable.v2.ReadRowsRequest} and
Expand All @@ -176,6 +178,48 @@ public static EnhancedBigtableStub create(EnhancedBigtableStubSettings settings)
*/
public <RowT> ServerStreamingCallable<Query, RowT> createReadRowsCallable(
RowAdapter<RowT> rowAdapter) {
return createReadRowsCallable(settings.readRowsSettings(), rowAdapter);
}

/**
* Creates a callable chain to handle point ReadRows RPCs. The chain will:
*
* <ul>
* <li>Convert a {@link Query} into a {@link com.google.bigtable.v2.ReadRowsRequest} and
* dispatch the RPC.
* <li>Upon receiving the response stream, it will merge the {@link
* com.google.bigtable.v2.ReadRowsResponse.CellChunk}s in logical rows. The actual row
* implementation can be configured in by the {@code rowAdapter} parameter.
* <li>Retry/resume on failure.
* <li>Filter out marker rows.
* </ul>
*/
public <RowT> UnaryCallable<Query, RowT> createReadRowCallable(RowAdapter<RowT> rowAdapter) {
return createReadRowsCallable(
ServerStreamingCallSettings.<Query, Row>newBuilder()
.setRetryableCodes(settings.readRowSettings().getRetryableCodes())
.setRetrySettings(settings.readRowSettings().getRetrySettings())
.setIdleTimeout(settings.readRowSettings().getRetrySettings().getTotalTimeout())
.build(),
rowAdapter)
.first();
}

/**
* Creates a callable chain to handle ReadRows RPCs. The chain will:
*
* <ul>
* <li>Convert a {@link Query} into a {@link com.google.bigtable.v2.ReadRowsRequest} and
* dispatch the RPC.
* <li>Upon receiving the response stream, it will merge the {@link
* com.google.bigtable.v2.ReadRowsResponse.CellChunk}s in logical rows. The actual row
* implementation can be configured in by the {@code rowAdapter} parameter.
* <li>Retry/resume on failure.
* <li>Filter out marker rows.
* </ul>
*/
private <RowT> ServerStreamingCallable<Query, RowT> createReadRowsCallable(
ServerStreamingCallSettings<Query, Row> readRowsSettings, RowAdapter<RowT> rowAdapter) {

ServerStreamingCallable<ReadRowsRequest, RowT> merging =
new RowMergingCallable<>(stub.readRowsCallable(), rowAdapter);
Expand All @@ -185,9 +229,9 @@ public <RowT> ServerStreamingCallable<Query, RowT> createReadRowsCallable(
ServerStreamingCallSettings<ReadRowsRequest, RowT> innerSettings =
ServerStreamingCallSettings.<ReadRowsRequest, RowT>newBuilder()
.setResumptionStrategy(new ReadRowsResumptionStrategy<>(rowAdapter))
.setRetryableCodes(settings.readRowsSettings().getRetryableCodes())
.setRetrySettings(settings.readRowsSettings().getRetrySettings())
.setIdleTimeout(settings.readRowsSettings().getIdleTimeout())
.setRetryableCodes(readRowsSettings.getRetryableCodes())
.setRetrySettings(readRowsSettings.getRetrySettings())
.setIdleTimeout(readRowsSettings.getIdleTimeout())
.build();

// Retry logic is split into 2 parts to workaround a rare edge case described in
Expand Down Expand Up @@ -356,10 +400,16 @@ private UnaryCallable<ReadModifyWriteRow, Row> createReadModifyWriteRowCallable(
// </editor-fold>

// <editor-fold desc="Callable accessors">
/** Returns a streaming read rows callable */
public ServerStreamingCallable<Query, Row> readRowsCallable() {
return readRowsCallable;
}

/** Return a point read callable */
public UnaryCallable<Query, Row> readRowCallable() {
return readRowCallable;
}

public UnaryCallable<String, List<KeyOffset>> sampleRowKeysCallable() {
return sampleRowKeysCallable;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ public class EnhancedBigtableStubSettings extends StubSettings<EnhancedBigtableS
private final String appProfileId;

private final ServerStreamingCallSettings<Query, Row> readRowsSettings;
private final UnaryCallSettings<Query, Row> readRowSettings;
private final UnaryCallSettings<String, List<KeyOffset>> sampleRowKeysSettings;
private final UnaryCallSettings<RowMutation, Void> mutateRowSettings;
private final BatchingCallSettings<RowMutation, Void> bulkMutateRowsSettings;
Expand All @@ -120,11 +121,22 @@ public class EnhancedBigtableStubSettings extends StubSettings<EnhancedBigtableS

private EnhancedBigtableStubSettings(Builder builder) {
super(builder);

// Since point reads & streaming reads share the same base callable that converts grpc errors
// into ApiExceptions, they must have the same retry codes.
Preconditions.checkState(
builder
.readRowSettings
.getRetryableCodes()
.equals(builder.readRowsSettings.getRetryableCodes()),
"Single ReadRow retry codes must match ReadRows retry codes");

instanceName = builder.instanceName;
appProfileId = builder.appProfileId;

// Per method settings.
readRowsSettings = builder.readRowsSettings.build();
readRowSettings = builder.readRowSettings.build();
sampleRowKeysSettings = builder.sampleRowKeysSettings.build();
mutateRowSettings = builder.mutateRowSettings.build();
bulkMutateRowsSettings = builder.bulkMutateRowsSettings.build();
Expand Down Expand Up @@ -163,6 +175,11 @@ public UnaryCallSettings<String, List<KeyOffset>> sampleRowKeysSettings() {
return sampleRowKeysSettings;
}

/** Returns the object with the settings used for point reads via ReadRows. */
public UnaryCallSettings<Query, Row> readRowSettings() {
return readRowSettings;
}

/** Returns the object with the settings used for calls to MutateRow. */
public UnaryCallSettings<RowMutation, Void> mutateRowSettings() {
return mutateRowSettings;
Expand Down Expand Up @@ -200,6 +217,7 @@ public static class Builder extends StubSettings.Builder<EnhancedBigtableStubSet
private String appProfileId;

private final ServerStreamingCallSettings.Builder<Query, Row> readRowsSettings;
private final UnaryCallSettings.Builder<Query, Row> readRowSettings;
private final UnaryCallSettings.Builder<String, List<KeyOffset>> sampleRowKeysSettings;
private final UnaryCallSettings.Builder<RowMutation, Void> mutateRowSettings;
private final BatchingCallSettings.Builder<RowMutation, Void> bulkMutateRowsSettings;
Expand Down Expand Up @@ -234,18 +252,27 @@ private Builder() {

// Per-method settings using baseSettings for defaults.
readRowsSettings = ServerStreamingCallSettings.newBuilder();
/* TODO: copy timeouts, retryCodes & retrySettings from baseSettings.readRows once it exists in GAPIC */
readRowsSettings
.setRetryableCodes(DEFAULT_RETRY_CODES)
.setRetrySettings(
DEFAULT_RETRY_SETTINGS.toBuilder().setTotalTimeout(Duration.ofHours(1)).build())
.setRetryableCodes(baseDefaults.readRowsSettings().getRetryableCodes())
.setRetrySettings(baseDefaults.readRowsSettings().getRetrySettings())
.setIdleTimeout(Duration.ofMinutes(5));

// Point reads should use same defaults as streaming reads, but with a shorter timeout
readRowSettings = UnaryCallSettings.newUnaryCallSettingsBuilder();
readRowSettings
.setRetryableCodes(baseDefaults.readRowsSettings().getRetryableCodes())
.setRetrySettings(
baseDefaults
.readRowsSettings()
.getRetrySettings()
.toBuilder()
.setTotalTimeout(DEFAULT_RETRY_SETTINGS.getTotalTimeout())
.build());

sampleRowKeysSettings = UnaryCallSettings.newUnaryCallSettingsBuilder();
/* TODO: copy retryCodes & retrySettings from baseSettings.sampleRowKeysSettings once it exists in GAPIC */
sampleRowKeysSettings
.setRetryableCodes(Code.DEADLINE_EXCEEDED, Code.UNAVAILABLE, Code.ABORTED)
.setRetrySettings(DEFAULT_RETRY_SETTINGS);
.setRetryableCodes(baseDefaults.sampleRowKeysSettings().getRetryableCodes())
.setRetrySettings(baseDefaults.sampleRowKeysSettings().getRetrySettings());

mutateRowSettings = UnaryCallSettings.newUnaryCallSettingsBuilder();
copyRetrySettings(baseDefaults.mutateRowSettings(), mutateRowSettings);
Expand Down Expand Up @@ -282,6 +309,7 @@ private Builder(EnhancedBigtableStubSettings settings) {

// Per method settings.
readRowsSettings = settings.readRowsSettings.toBuilder();
readRowSettings = settings.readRowSettings.toBuilder();
sampleRowKeysSettings = settings.sampleRowKeysSettings.toBuilder();
mutateRowSettings = settings.mutateRowSettings.toBuilder();
bulkMutateRowsSettings = settings.bulkMutateRowsSettings.toBuilder();
Expand Down Expand Up @@ -339,6 +367,11 @@ public ServerStreamingCallSettings.Builder<Query, Row> readRowsSettings() {
return readRowsSettings;
}

/** Returns the builder for the settings used for point reads using readRow. */
public UnaryCallSettings.Builder<Query, Row> readRowSettings() {
return readRowSettings;
}

/** Returns the builder for the settings used for calls to SampleRowKeysSettings. */
public UnaryCallSettings.Builder<String, List<KeyOffset>> sampleRowKeysSettings() {
return sampleRowKeysSettings;
Expand Down
Loading

0 comments on commit 36fbd97

Please sign in to comment.