diff --git a/google-cloud-clients/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/RowSetUtil.java b/google-cloud-clients/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/RowSetUtil.java new file mode 100644 index 000000000000..e37bff9d3d2a --- /dev/null +++ b/google-cloud-clients/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/RowSetUtil.java @@ -0,0 +1,388 @@ +/* + * Copyright 2018 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.internal; + +import com.google.api.core.InternalApi; +import com.google.auto.value.AutoValue; +import com.google.bigtable.v2.ReadRowsRequest; +import com.google.bigtable.v2.RowRange; +import com.google.bigtable.v2.RowSet; +import com.google.cloud.bigtable.data.v2.models.Query; +import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange; +import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsResumptionStrategy; +import com.google.common.base.Preconditions; +import com.google.common.collect.ComparisonChain; +import com.google.common.collect.ImmutableSortedSet; +import com.google.common.collect.Lists; +import com.google.protobuf.ByteString; +import java.util.Arrays; +import java.util.Comparator; +import java.util.List; +import java.util.SortedSet; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +/** + * Internal helper to split a {@link RowSet} into segments based on keys. + * + *

This class is considered an internal implementation detail and not meant to be used by + * applications. + * + * @see Query#shard(List) + * @see Query#getBound() + * @see ReadRowsResumptionStrategy#getResumeRequest(ReadRowsRequest) + */ +@InternalApi +public final class RowSetUtil { + private RowSetUtil() {} + + /** + * Splits the provided {@link RowSet} along the provided splitPoint into 2 segments. + * The right segment will contain all keys that are strictly greater than the splitPoint and all + * {@link RowRange}s truncated to start right after the splitPoint. + */ + @Nonnull + public static Split split(@Nonnull RowSet rowSet, @Nonnull ByteString splitPoint) { + ImmutableSortedSet splitPoints = + ImmutableSortedSet.orderedBy(ByteStringComparator.INSTANCE).add(splitPoint).build(); + + List splits = split(rowSet, splitPoints, true); + + return Split.of(splits.get(0), splits.get(1)); + } + + /** + * Splits the provided {@link RowSet} into segments partitioned by the provided {@code + * splitPoints}. Each split point represents the last row of the corresponding segment. The row + * keys contained in the provided {@link RowSet} will be distributed across the segments. Each + * range in the {@link RowSet} will be split up across each segment. + * + * @see #split(RowSet, SortedSet, boolean) for more details. + */ + @Nonnull + public static List shard( + @Nonnull RowSet rowSet, @Nonnull SortedSet splitPoints) { + return split(rowSet, splitPoints, false); + } + + /** + * Split a {@link RowSet} into segments. + * + *

Each segment is defined by a split point. The split point identifies the segment's inclusive + * end. This means that the first segment will start at the beginning of the table and extend to + * include the first split point. The last segment will start just after the last split point and + * extend until the end of the table. The maximum number of segments that can be returned is the + * number of split points + 1. + * + *

Each segment is represented by a RowSet in the returned List. Each of the returned RowSets + * will contain all of the {@link RowRange}s and keys that fall between the previous segment and + * this segment's split point. If there are no {@link RowRange}s or keys that belong to a segment, + * then that segment will either be omitted or if {@code preserveNullSegments} is true, then it + * will be represented by a null value in the returned list. + * + *

The segments in the returned list are guaranteed to be sorted. If {@code + * preserveNullSegments} is true, then it will have exactly {@code splitPoints.size() + 1} items. + * The extra segment will contain keys and {@link RowRange}s between the last splitPoint and the + * end of the table. + * + *

Please note that an empty {@link RowSet} is treated like a full table scan and each segment + * will contain a {@link RowRange} that covers the full extent of the segment. + */ + @Nonnull + static List split( + @Nonnull RowSet rowSet, + @Nonnull SortedSet splitPoints, + boolean preserveNullSegments) { + // An empty RowSet represents a full table scan. Make that explicit so that there is RowRange to + // split. + if (RowSet.getDefaultInstance().equals(rowSet)) { + rowSet = RowSet.newBuilder().addRowRanges(RowRange.getDefaultInstance()).build(); + } + + // Create sorted copies of the ranges and keys in the RowSet + ByteString[] rowKeys = + rowSet.getRowKeysList().toArray(new ByteString[rowSet.getRowKeysCount()]); + RowRange[] rowRanges = + rowSet.getRowRangesList().toArray(new RowRange[rowSet.getRowRangesCount()]); + + Arrays.sort(rowKeys, ByteStringComparator.INSTANCE); + Arrays.sort(rowRanges, RANGE_START_COMPARATOR); + + List results = Lists.newArrayList(); + + // Track consumption of input ranges & keys. + int rowKeysStart = 0; + int rowRangesStart = 0; + + // Keys and ranges that lie before the current split point, + RowSet.Builder segment = RowSet.newBuilder(); + boolean isSegmentEmpty = true; + + for (ByteString splitPoint : splitPoints) { + Preconditions.checkState(!splitPoint.isEmpty(), "Split point can't be empty"); + + // Consume all of the row keys that lie on and to the left of the split point. Consumption is + // designated by advancing rowKeysStart. + for (int i = rowKeysStart; i < rowKeys.length; i++) { + ByteString rowKey = rowKeys[i]; + if (ByteStringComparator.INSTANCE.compare(rowKey, splitPoint) <= 0) { + segment.addRowKeys(rowKey); + isSegmentEmpty = false; + rowKeysStart++; + } else { + // This key and all following keys belong to a later segment. + break; + } + } + + // Consume all of the ranges that lie before the split point (splitting the range if + // necessary). Consumption is designated by advancing rowRangesStart. + for (int i = rowRangesStart; i < rowRanges.length; i++) { + RowRange rowRange = rowRanges[i]; + + // Break early when encountering the first start point that is past the split point. + // (The split point is the inclusive end of of the segment) + int startCmp = StartPoint.extract(rowRange).compareTo(new StartPoint(splitPoint, true)); + if (startCmp > 0) { + break; + } + + // Some part of this range will be in the segment. + isSegmentEmpty = false; + + // Figure out the endpoint and remainder. + int endCmp = EndPoint.extract(rowRange).compareTo(new EndPoint(splitPoint, true)); + if (endCmp <= 0) { + // The range is fully contained in the segment. + segment.addRowRanges(rowRange); + + // Consume the range, but take care to shift partially consumed ranges to fill the gap + // created by consuming the current range. For example if the list contained the following + // ranges: [a-z], [b-d], [f-z] and the split point was 'e'. Then after processing the + // split point, the list would contain: (d-z], GAP, [f-z]. So we fill the gap by shifting + // (d-z] over by one and advancing rowRangesStart. + // Partially consumed ranges will only exist if the original RowSet had overlapping + // ranges, this should be a rare occurrence. + System.arraycopy( + rowRanges, rowRangesStart, rowRanges, rowRangesStart + 1, i - rowRangesStart); + rowRangesStart++; + } else { + // The range is split: + // Add the left part to the segment + RowRange leftSubRange = rowRange.toBuilder().setEndKeyClosed(splitPoint).build(); + segment.addRowRanges(leftSubRange); + // Save the remainder for the next segment. This is done by replacing the current rowRange + // with the remainder and not advancing rowRangesStart. + RowRange rightSubRange = rowRange.toBuilder().setStartKeyOpen(splitPoint).build(); + rowRanges[i] = rightSubRange; + } + } + + // Build the current segment + if (!isSegmentEmpty) { + results.add(segment.build()); + isSegmentEmpty = true; + segment = RowSet.newBuilder(); + } else if (preserveNullSegments) { + results.add(null); + } + } + + // Create the last segment (from the last splitKey to the end of the table) + for (int i = rowKeysStart; i < rowKeys.length; i++) { + isSegmentEmpty = false; + segment.addRowKeys(rowKeys[i]); + } + for (int i = rowRangesStart; i < rowRanges.length; i++) { + isSegmentEmpty = false; + segment.addRowRanges(rowRanges[i]); + } + if (!isSegmentEmpty) { + results.add(segment.build()); + } else if (preserveNullSegments) { + results.add(null); + } + + return results; + } + + /** Get the bounding range of a {@link RowSet}. */ + public static ByteStringRange getBound(RowSet rowSet) { + // Find min & max keys + ByteString minKey = null; + ByteString maxKey = null; + + for (ByteString key : rowSet.getRowKeysList()) { + if (minKey == null || ByteStringComparator.INSTANCE.compare(minKey, key) > 0) { + minKey = key; + } + if (maxKey == null || ByteStringComparator.INSTANCE.compare(maxKey, key) < 0) { + maxKey = key; + } + } + + // Convert min & max keys in start & end points for a range + StartPoint minStartPoint = null; + EndPoint maxEndPoint = null; + if (minKey != null) { + minStartPoint = new StartPoint(minKey, true); + } + if (maxKey != null) { + maxEndPoint = new EndPoint(maxKey, true); + } + + // Expand the range using the RowSet ranges + for (RowRange rowRange : rowSet.getRowRangesList()) { + StartPoint currentStartPoint = StartPoint.extract(rowRange); + if (minStartPoint == null || minStartPoint.compareTo(currentStartPoint) > 0) { + minStartPoint = currentStartPoint; + } + + EndPoint currentEndpoint = EndPoint.extract(rowRange); + if (maxEndPoint == null || maxEndPoint.compareTo(currentEndpoint) < 0) { + maxEndPoint = currentEndpoint; + } + } + + // Build a range using the endpoints + ByteStringRange boundingRange = ByteStringRange.unbounded(); + if (minStartPoint != null) { + if (minStartPoint.isClosed) { + boundingRange.startClosed(minStartPoint.value); + } else { + boundingRange.startOpen(minStartPoint.value); + } + } + if (maxEndPoint != null) { + if (maxEndPoint.isClosed) { + boundingRange.endClosed(maxEndPoint.value); + } else { + boundingRange.endOpen(maxEndPoint.value); + } + } + + return boundingRange; + } + + /** + * Represents a RowSet split into 2 non-overlapping parts. + * + *

This class is considered an internal implementation detail and not meant to be used by + * applications. + */ + @InternalApi + @AutoValue + public static abstract class Split { + @Nullable + public abstract RowSet getLeft(); + @Nullable + public abstract RowSet getRight(); + + public static Split of(RowSet left, RowSet right) { + return new AutoValue_RowSetUtil_Split(left, right); + } + } + + private static final Comparator RANGE_START_COMPARATOR = + new Comparator() { + @Override + public int compare(@Nonnull RowRange o1, @Nonnull RowRange o2) { + return StartPoint.extract(o1).compareTo(StartPoint.extract(o2)); + } + }; + + /** Helper class to ease comparison of RowRange start points. */ + private static final class StartPoint implements Comparable { + private final ByteString value; + private final boolean isClosed; + + @Nonnull + static StartPoint extract(@Nonnull RowRange rowRange) { + switch (rowRange.getStartKeyCase()) { + case STARTKEY_NOT_SET: + return new StartPoint(ByteString.EMPTY, true); + case START_KEY_CLOSED: + return new StartPoint(rowRange.getStartKeyClosed(), true); + case START_KEY_OPEN: + if (rowRange.getStartKeyOpen().isEmpty()) { + // Take care to normalize an open empty start key to be closed. + return new StartPoint(ByteString.EMPTY, true); + } else { + return new StartPoint(rowRange.getStartKeyOpen(), false); + } + default: + throw new IllegalArgumentException("Unknown startKeyCase: " + rowRange.getStartKeyCase()); + } + } + + StartPoint(@Nonnull ByteString value, boolean isClosed) { + this.value = value; + this.isClosed = isClosed; + } + + @Override + public int compareTo(@Nonnull StartPoint o) { + return ComparisonChain.start() + // Empty string comes first + .compareTrueFirst(value.isEmpty(), o.value.isEmpty()) + .compare(value, o.value, ByteStringComparator.INSTANCE) + // Closed start point comes before an open start point: [x,y] starts before (x,y]. + .compareTrueFirst(isClosed, o.isClosed) + .result(); + } + } + + /** Helper class to ease comparison of RowRange endpoints. */ + private static final class EndPoint implements Comparable { + private final ByteString value; + private final boolean isClosed; + + @Nonnull + static EndPoint extract(@Nonnull RowRange rowRange) { + switch (rowRange.getEndKeyCase()) { + case ENDKEY_NOT_SET: + return new EndPoint(ByteString.EMPTY, true); + case END_KEY_CLOSED: + return new EndPoint(rowRange.getEndKeyClosed(), true); + case END_KEY_OPEN: + if (rowRange.getEndKeyOpen().isEmpty()) { + // Take care to normalize an open empty end key to be closed. + return new EndPoint(ByteString.EMPTY, true); + } else { + return new EndPoint(rowRange.getEndKeyOpen(), false); + } + default: + throw new IllegalArgumentException("Unknown endKeyCase: " + rowRange.getEndKeyCase()); + } + } + + EndPoint(@Nonnull ByteString value, boolean isClosed) { + this.value = value; + this.isClosed = isClosed; + } + + @Override + public int compareTo(@Nonnull EndPoint o) { + return ComparisonChain.start() + // Empty string comes last + .compareFalseFirst(value.isEmpty(), o.value.isEmpty()) + .compare(value, o.value, ByteStringComparator.INSTANCE) + // Open end point comes before a closed end point: [x,y) ends before [x,y]. + .compareFalseFirst(isClosed, o.isClosed) + .result(); + } + } +} diff --git a/google-cloud-clients/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/package-info.java b/google-cloud-clients/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/package-info.java new file mode 100644 index 000000000000..bd02f2a63f20 --- /dev/null +++ b/google-cloud-clients/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/package-info.java @@ -0,0 +1,25 @@ +/* + * Copyright 2018 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Internal implementation helpers used by the client. + * + *

Classes defined here do not have a stable API and change without deprecation. + */ +@InternalApi +package com.google.cloud.bigtable.data.v2.internal; + +import com.google.api.core.InternalApi; diff --git a/google-cloud-clients/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/Query.java b/google-cloud-clients/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/Query.java index f7b734f167e1..41091c894bad 100644 --- a/google-cloud-clients/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/Query.java +++ b/google-cloud-clients/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/Query.java @@ -18,15 +18,22 @@ import com.google.api.core.InternalApi; import com.google.bigtable.v2.ReadRowsRequest; import com.google.bigtable.v2.RowRange; +import com.google.bigtable.v2.RowSet; import com.google.bigtable.v2.TableName; +import com.google.cloud.bigtable.data.v2.internal.ByteStringComparator; import com.google.cloud.bigtable.data.v2.internal.RequestContext; +import com.google.cloud.bigtable.data.v2.internal.RowSetUtil; import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableSortedSet; +import com.google.common.collect.Lists; import com.google.protobuf.ByteString; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.io.Serializable; +import java.util.List; +import java.util.SortedSet; /** A simple wrapper to construct a query for the ReadRows RPC. */ public final class Query implements Serializable { @@ -163,6 +170,76 @@ public Query limit(long limit) { return this; } + /** + * Split this query into multiple queries that can be evenly distributed across Bigtable nodes and + * be run in parallel. This method takes the results from {@link + * com.google.cloud.bigtable.data.v2.BigtableDataClient#sampleRowKeysAsync(String)} to divide this + * query into a set of disjoint queries that logically combine into form this query. + * + *

Expected Usage: + * + *

{@code
+   * List keyOffsets = dataClient.sampleRowKeysAsync("my-table").get();
+   * List queryShards = myQuery.shard(keyOffsets);
+   * List>> futures = new ArrayList();
+   * for (Query subQuery : queryShards) {
+   *   futures.add(dataClient.readRowsCallable().all().futureCall(subQuery));
+   * }
+   * List> results = ApiFutures.allAsList(futures).get();
+   * }
+ */ + public List shard(List sampledRowKeys) { + Preconditions.checkState(builder.getRowsLimit() == 0, "Can't shard query with row limits"); + + ImmutableSortedSet.Builder splitPoints = + ImmutableSortedSet.orderedBy(ByteStringComparator.INSTANCE); + + for (KeyOffset keyOffset : sampledRowKeys) { + if (!keyOffset.geyKey().isEmpty()) { + splitPoints.add(keyOffset.geyKey()); + } + } + + return shard(splitPoints.build()); + } + + /** + * Split this query into multiple queries that logically combine into this query. This is intended + * to be used by map reduce style frameworks like Beam to split a query across multiple workers. + * + *

Expected Usage: + * + *

{@code
+   * List splitPoints = ...;
+   * List queryShards = myQuery.shard(splitPoints);
+   * List>> futures = new ArrayList();
+   * for (Query subQuery : queryShards) {
+   *   futures.add(dataClient.readRowsCallable().all().futureCall(subQuery));
+   * }
+   * List> results = ApiFutures.allAsList(futures).get();
+   * }
+ */ + public List shard(SortedSet splitPoints) { + Preconditions.checkState(builder.getRowsLimit() == 0, "Can't shard a query with a row limit"); + + List shardedRowSets = RowSetUtil.shard(builder.getRows(), splitPoints); + List shards = Lists.newArrayListWithCapacity(shardedRowSets.size()); + + for (RowSet rowSet : shardedRowSets) { + Query queryShard = new Query(tableId); + queryShard.builder.mergeFrom(this.builder.build()); + queryShard.builder.setRows(rowSet); + shards.add(queryShard); + } + + return shards; + } + + /** Get the minimal range that encloses all of the row keys and ranges in this Query. */ + public ByteStringRange getBound() { + return RowSetUtil.getBound(builder.getRows()); + } + /** * Creates the request protobuf. This method is considered an internal implementation detail and * not meant to be used by applications. diff --git a/google-cloud-clients/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsResumptionStrategy.java b/google-cloud-clients/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsResumptionStrategy.java index 5ec34504cb87..ab312ec41c29 100644 --- a/google-cloud-clients/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsResumptionStrategy.java +++ b/google-cloud-clients/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsResumptionStrategy.java @@ -19,9 +19,8 @@ import com.google.api.gax.retrying.StreamResumptionStrategy; import com.google.bigtable.v2.ReadRowsRequest; import com.google.bigtable.v2.ReadRowsRequest.Builder; -import com.google.bigtable.v2.RowRange; import com.google.bigtable.v2.RowSet; -import com.google.cloud.bigtable.data.v2.internal.ByteStringComparator; +import com.google.cloud.bigtable.data.v2.internal.RowSetUtil; import com.google.cloud.bigtable.data.v2.models.RowAdapter; import com.google.common.base.Preconditions; import com.google.protobuf.ByteString; @@ -79,98 +78,27 @@ public RowT processResponse(RowT response) { * received rows. */ @Override - public ReadRowsRequest getResumeRequest(ReadRowsRequest request) { + public ReadRowsRequest getResumeRequest(ReadRowsRequest originalRequest) { // An empty lastKey means that we have not successfully read the first row, // so resume with the original request object. if (lastKey.isEmpty()) { - return request; + return originalRequest; } - ReadRowsRequest originalRequest = request; - - // Special case: empty query implies full table scan, so make this explicit by adding an - // unbounded range to the request - if (request.getRows().getRowKeysList().isEmpty() - && request.getRows().getRowRangesList().isEmpty()) { - - originalRequest = - request - .toBuilder() - .setRows(RowSet.newBuilder().addRowRanges(RowRange.getDefaultInstance())) - .build(); - } - - // Start building the resume request. The keys & ranges are cleared and will be recomputed. - Builder builder = originalRequest.toBuilder(); - builder.clearRows(); - - RowSet.Builder rowSetBuilder = RowSet.newBuilder(); - - for (ByteString key : originalRequest.getRows().getRowKeysList()) { - if (ByteStringComparator.INSTANCE.compare(key, lastKey) > 0) { - rowSetBuilder.addRowKeys(key); - } - } - - for (RowRange rowRange : originalRequest.getRows().getRowRangesList()) { - RowRange.Builder rowRangeBuilder = RowRange.newBuilder(); - - switch (rowRange.getEndKeyCase()) { - case END_KEY_CLOSED: - if (ByteStringComparator.INSTANCE.compare(rowRange.getEndKeyClosed(), lastKey) > 0) { - rowRangeBuilder.setEndKeyClosed(rowRange.getEndKeyClosed()); - } else { - continue; - } - break; - case END_KEY_OPEN: - if (ByteStringComparator.INSTANCE.compare(rowRange.getEndKeyOpen(), lastKey) > 0) { - rowRangeBuilder.setEndKeyOpen(rowRange.getEndKeyOpen()); - } else { - continue; - } - break; - case ENDKEY_NOT_SET: - rowRangeBuilder.clearEndKey(); - break; - default: - throw new IllegalArgumentException("Unknown endKeyCase: " + rowRange.getEndKeyCase()); - } - - switch (rowRange.getStartKeyCase()) { - case STARTKEY_NOT_SET: - rowRangeBuilder.setStartKeyOpen(lastKey); - break; - case START_KEY_OPEN: - if (ByteStringComparator.INSTANCE.compare(rowRange.getStartKeyOpen(), lastKey) < 0) { - rowRangeBuilder.setStartKeyOpen(lastKey); - } else { - rowRangeBuilder.setStartKeyOpen(rowRange.getStartKeyOpen()); - } - break; - case START_KEY_CLOSED: - if (ByteStringComparator.INSTANCE.compare(rowRange.getStartKeyClosed(), lastKey) <= 0) { - rowRangeBuilder.setStartKeyOpen(lastKey); - } else { - rowRangeBuilder.setStartKeyClosed(rowRange.getStartKeyClosed()); - } - break; - default: - throw new IllegalArgumentException("Unknown startKeyCase: " + rowRange.getStartKeyCase()); - } - rowSetBuilder.addRowRanges(rowRangeBuilder.build()); - } + RowSet remaining = RowSetUtil.split(originalRequest.getRows(), lastKey).getRight(); // Edge case: retrying a fulfilled request. // A fulfilled request is one that has had all of its row keys and ranges fulfilled, or if it // had a row limit, has seen enough rows. These requests are replaced with a marker request that // will be handled by ReadRowsRetryCompletedCallable. See docs in ReadRowsRetryCompletedCallable // for more details. - if ((rowSetBuilder.getRowRangesCount() == 0 && rowSetBuilder.getRowKeysCount() == 0) + if (remaining == null || (originalRequest.getRowsLimit() > 0 && originalRequest.getRowsLimit() == numProcessed)) { return ReadRowsRetryCompletedCallable.FULFILLED_REQUEST_MARKER; } + Builder builder = originalRequest.toBuilder().setRows(remaining); + if (originalRequest.getRowsLimit() > 0) { Preconditions.checkState( originalRequest.getRowsLimit() > numProcessed, @@ -178,7 +106,6 @@ public ReadRowsRequest getResumeRequest(ReadRowsRequest request) { builder.setRowsLimit(originalRequest.getRowsLimit() - numProcessed); } - builder.setRows(rowSetBuilder.build()); return builder.build(); } } diff --git a/google-cloud-clients/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/internal/RowSetUtilTest.java b/google-cloud-clients/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/internal/RowSetUtilTest.java new file mode 100644 index 000000000000..21d1dd7b0c74 --- /dev/null +++ b/google-cloud-clients/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/internal/RowSetUtilTest.java @@ -0,0 +1,585 @@ +/* + * Copyright 2018 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.internal; + +import static com.google.common.truth.Truth.assertThat; + +import com.google.bigtable.v2.RowRange; +import com.google.bigtable.v2.RowSet; +import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange; +import com.google.common.collect.ImmutableSortedSet; +import com.google.common.collect.Lists; +import com.google.protobuf.ByteString; +import java.util.List; +import java.util.SortedSet; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class RowSetUtilTest { + @Test + public void noSplitTest() { + RowSet rowSet = + RowSet.newBuilder() + .addRowKeys(ByteString.copyFromUtf8("a")) + .addRowRanges( + RowRange.newBuilder() + .setStartKeyClosed(ByteString.copyFromUtf8("p")) + .setEndKeyOpen(ByteString.copyFromUtf8("q"))) + .build(); + + SortedSet splitPoints = + ImmutableSortedSet.orderedBy(ByteStringComparator.INSTANCE).build(); + + verifySplit(rowSet, splitPoints, rowSet); + } + + @Test + public void splitEmptyTest() { + RowSet rowSet = RowSet.newBuilder().build(); + SortedSet splitPoints = + ImmutableSortedSet.orderedBy(ByteStringComparator.INSTANCE) + .add(ByteString.copyFromUtf8("a")) + .build(); + + verifySplit( + rowSet, + splitPoints, + RowSet.newBuilder() + .addRowRanges(RowRange.newBuilder().setEndKeyClosed(ByteString.copyFromUtf8("a"))) + .build(), + RowSet.newBuilder() + .addRowRanges(RowRange.newBuilder().setStartKeyOpen(ByteString.copyFromUtf8("a"))) + .build()); + } + + @Test + public void splitMultipleKeysTest() { + RowSet rowSet = + RowSet.newBuilder() + .addRowKeys(ByteString.copyFromUtf8("1-beforeSplit")) + .addRowKeys(ByteString.copyFromUtf8("2-onSplit")) + .addRowKeys(ByteString.copyFromUtf8("3-afterSplit")) + .build(); + + SortedSet splitPoints = + ImmutableSortedSet.orderedBy(ByteStringComparator.INSTANCE) + .add(ByteString.copyFromUtf8("2-onSplit")) + .build(); + + verifySplit( + rowSet, + splitPoints, + RowSet.newBuilder() + .addRowKeys(ByteString.copyFromUtf8("1-beforeSplit")) + .addRowKeys(ByteString.copyFromUtf8("2-onSplit")) + .build(), + RowSet.newBuilder().addRowKeys(ByteString.copyFromUtf8("3-afterSplit")).build()); + } + + @Test + public void splitKeysEmptyLeft() { + RowSet rowSet = + RowSet.newBuilder() + .addRowKeys(ByteString.copyFromUtf8("5-test")) + .addRowKeys(ByteString.copyFromUtf8("8-test")) + .build(); + + SortedSet splitPoints = + ImmutableSortedSet.orderedBy(ByteStringComparator.INSTANCE) + .add(ByteString.copyFromUtf8("0-split")) + .add(ByteString.copyFromUtf8("6-split")) + .build(); + + verifySplit( + rowSet, + splitPoints, + null, + RowSet.newBuilder().addRowKeys(ByteString.copyFromUtf8("5-test")).build(), + RowSet.newBuilder().addRowKeys(ByteString.copyFromUtf8("8-test")).build()); + } + + @Test + public void splitKeysEmptyRight() { + RowSet rowSet = + RowSet.newBuilder() + .addRowKeys(ByteString.copyFromUtf8("0-test")) + .addRowKeys(ByteString.copyFromUtf8("2-test")) + .build(); + + SortedSet splitPoints = + ImmutableSortedSet.orderedBy(ByteStringComparator.INSTANCE) + .add(ByteString.copyFromUtf8("1-split")) + .add(ByteString.copyFromUtf8("5-split")) + .build(); + + verifySplit( + rowSet, + splitPoints, + RowSet.newBuilder().addRowKeys(ByteString.copyFromUtf8("0-test")).build(), + RowSet.newBuilder().addRowKeys(ByteString.copyFromUtf8("2-test")).build(), + null); + } + + @Test + public void rangeLeftOfSplitTest() { + RowSet rowSet = + RowSet.newBuilder() + .addRowKeys(ByteString.copyFromUtf8("0-key")) + .addRowRanges( + RowRange.newBuilder() + .setStartKeyClosed(ByteString.copyFromUtf8("1-range-start")) + .setEndKeyOpen(ByteString.copyFromUtf8("2-range-end"))) + .build(); + + SortedSet splitPoints = + ImmutableSortedSet.orderedBy(ByteStringComparator.INSTANCE) + .add(ByteString.copyFromUtf8("3-split")) + .build(); + + verifySplit( + rowSet, + splitPoints, + RowSet.newBuilder() + .addRowKeys(ByteString.copyFromUtf8("0-key")) + .addRowRanges( + RowRange.newBuilder() + .setStartKeyClosed(ByteString.copyFromUtf8("1-range-start")) + .setEndKeyOpen(ByteString.copyFromUtf8("2-range-end"))) + .build(), + null); + } + + @Test + public void unboundedRangeLeftOfSplitTest() { + RowSet rowSet = + RowSet.newBuilder() + .addRowRanges( + RowRange.newBuilder().setEndKeyOpen(ByteString.copyFromUtf8("1-range-end"))) + .build(); + SortedSet splitPoints = + ImmutableSortedSet.orderedBy(ByteStringComparator.INSTANCE) + .add(ByteString.copyFromUtf8("5-split")) + .build(); + + verifySplit( + rowSet, + splitPoints, + RowSet.newBuilder() + .addRowRanges( + RowRange.newBuilder().setEndKeyOpen(ByteString.copyFromUtf8("1-range-end"))) + .build(), + null); + } + + @Test + public void rangeImmediatelyLeftOfSplitTest() { + RowSet rowSet = + RowSet.newBuilder() + .addRowKeys(ByteString.copyFromUtf8("0-key")) + .addRowRanges( + RowRange.newBuilder() + .setStartKeyClosed(ByteString.copyFromUtf8("1-range-start")) + .setEndKeyOpen(ByteString.copyFromUtf8("2-range-end"))) + .build(); + + SortedSet splitPoints = + ImmutableSortedSet.orderedBy(ByteStringComparator.INSTANCE) + .add(ByteString.copyFromUtf8("2-range-end")) + .build(); + + verifySplit( + rowSet, + splitPoints, + RowSet.newBuilder() + .addRowKeys(ByteString.copyFromUtf8("0-key")) + .addRowRanges( + RowRange.newBuilder() + .setStartKeyClosed(ByteString.copyFromUtf8("1-range-start")) + .setEndKeyOpen(ByteString.copyFromUtf8("2-range-end"))) + .build(), + null); + } + + @Test + public void rangeRightOfSplitTest() { + RowSet rowSet = + RowSet.newBuilder() + .addRowKeys(ByteString.copyFromUtf8("9-row-key")) + .addRowRanges( + RowRange.newBuilder() + .setStartKeyOpen(ByteString.copyFromUtf8("5-range-start")) + .setEndKeyOpen(ByteString.copyFromUtf8("7-range-end"))) + .build(); + SortedSet splitPoints = + ImmutableSortedSet.orderedBy(ByteStringComparator.INSTANCE) + .add(ByteString.copyFromUtf8("3-split")) + .build(); + + verifySplit( + rowSet, + splitPoints, + null, + RowSet.newBuilder() + .addRowKeys(ByteString.copyFromUtf8("9-row-key")) + .addRowRanges( + RowRange.newBuilder() + .setStartKeyOpen(ByteString.copyFromUtf8("5-range-start")) + .setEndKeyOpen(ByteString.copyFromUtf8("7-range-end"))) + .build()); + } + + @Test + public void unboundedRightOfSplitTest() { + RowSet rowSet = + RowSet.newBuilder() + .addRowRanges( + RowRange.newBuilder().setStartKeyOpen(ByteString.copyFromUtf8("5-range-start"))) + .build(); + SortedSet splitPoints = + ImmutableSortedSet.orderedBy(ByteStringComparator.INSTANCE) + .add(ByteString.copyFromUtf8("3-split")) + .build(); + + verifySplit( + rowSet, + splitPoints, + null, + RowSet.newBuilder() + .addRowRanges( + RowRange.newBuilder().setStartKeyOpen(ByteString.copyFromUtf8("5-range-start"))) + .build()); + } + + @Test + public void rangeExactlyFitsSplitTest() { + RowSet rowSet = + RowSet.newBuilder() + .addRowKeys(ByteString.copyFromUtf8("5-split")) + .addRowRanges( + RowRange.newBuilder() + .setStartKeyOpen(ByteString.copyFromUtf8("3-split")) + .setEndKeyClosed(ByteString.copyFromUtf8("5-split"))) + .build(); + SortedSet splitPoints = + ImmutableSortedSet.orderedBy(ByteStringComparator.INSTANCE) + .add(ByteString.copyFromUtf8("3-split")) + .add(ByteString.copyFromUtf8("5-split")) + .build(); + + verifySplit( + rowSet, + splitPoints, + null, + RowSet.newBuilder() + .addRowKeys(ByteString.copyFromUtf8("5-split")) + .addRowRanges( + RowRange.newBuilder() + .setStartKeyOpen(ByteString.copyFromUtf8("3-split")) + .setEndKeyClosed(ByteString.copyFromUtf8("5-split"))) + .build(), + null); + } + + @Test + public void startOnSplitPointTest() { + RowSet rowSet = + RowSet.newBuilder() + .addRowRanges( + RowRange.newBuilder().setStartKeyClosed(ByteString.copyFromUtf8("3-split"))) + .build(); + + // Inclusive start on a split point should generate 2 segments + SortedSet splitPoints = + ImmutableSortedSet.orderedBy(ByteStringComparator.INSTANCE) + .add(ByteString.copyFromUtf8("3-split")) + .build(); + + verifySplit( + rowSet, + splitPoints, + RowSet.newBuilder() + .addRowRanges( + RowRange.newBuilder() + .setStartKeyClosed(ByteString.copyFromUtf8("3-split")) + .setEndKeyClosed(ByteString.copyFromUtf8("3-split"))) + .build(), + RowSet.newBuilder() + .addRowRanges(RowRange.newBuilder().setStartKeyOpen(ByteString.copyFromUtf8("3-split"))) + .build()); + } + + @Test + public void mixedSplitTest() { + RowSet rowSet = + RowSet.newBuilder() + .addRowKeys(ByteString.copyFromUtf8("0")) + .addRowKeys(ByteString.copyFromUtf8("a")) + .addRowKeys(ByteString.copyFromUtf8("c")) + // Range 1: fully in "a" segment + .addRowRanges(RowRange.newBuilder().setEndKeyClosed(ByteString.copyFromUtf8("a"))) + // Range 2: split between segment "a" & "d" + .addRowRanges(RowRange.newBuilder().setEndKeyClosed(ByteString.copyFromUtf8("b"))) + // Range 3: split between segment "d" & "j" + .addRowRanges( + RowRange.newBuilder() + .setStartKeyOpen(ByteString.copyFromUtf8("c")) + .setEndKeyClosed(ByteString.copyFromUtf8("e"))) + // Range 4: fully in "j" + .addRowRanges( + RowRange.newBuilder() + .setStartKeyOpen(ByteString.copyFromUtf8("d")) + .setEndKeyClosed(ByteString.copyFromUtf8("f"))) + // Range 5: fully in "j" + .addRowRanges(RowRange.newBuilder().setStartKeyOpen(ByteString.copyFromUtf8("m"))) + .build(); + + SortedSet splitPoints = + ImmutableSortedSet.orderedBy(ByteStringComparator.INSTANCE) + // Split the unbounded + .add(ByteString.copyFromUtf8("a")) + .add(ByteString.copyFromUtf8("d")) + .add(ByteString.copyFromUtf8("j")) + .add(ByteString.copyFromUtf8("o")) + .build(); + + verifySplit( + rowSet, + splitPoints, + // Split "a" + RowSet.newBuilder() + .addRowKeys(ByteString.copyFromUtf8("0")) + .addRowKeys(ByteString.copyFromUtf8("a")) + // Range 1 + .addRowRanges(RowRange.newBuilder().setEndKeyClosed(ByteString.copyFromUtf8("a"))) + // Range 2: part1 + .addRowRanges(RowRange.newBuilder().setEndKeyClosed(ByteString.copyFromUtf8("a"))) + .build(), + // Split "d" + RowSet.newBuilder() + .addRowKeys(ByteString.copyFromUtf8("c")) + // Range 2: part 2 + .addRowRanges( + RowRange.newBuilder() + .setStartKeyOpen(ByteString.copyFromUtf8("a")) + .setEndKeyClosed(ByteString.copyFromUtf8("b"))) + // Range 3: part 1 + .addRowRanges( + RowRange.newBuilder() + .setStartKeyOpen(ByteString.copyFromUtf8("c")) + .setEndKeyClosed(ByteString.copyFromUtf8("d"))) + .build(), + // Split "j" + RowSet.newBuilder() + // Range 3: part 2 + .addRowRanges( + RowRange.newBuilder() + .setStartKeyOpen(ByteString.copyFromUtf8("d")) + .setEndKeyClosed(ByteString.copyFromUtf8("e"))) + // Range 4 + .addRowRanges( + RowRange.newBuilder() + .setStartKeyOpen(ByteString.copyFromUtf8("d")) + .setEndKeyClosed(ByteString.copyFromUtf8("f"))) + .build(), + // Split "o" + RowSet.newBuilder() + // Range 5: part1 + .addRowRanges( + RowRange.newBuilder() + .setStartKeyOpen(ByteString.copyFromUtf8("m")) + .setEndKeyClosed(ByteString.copyFromUtf8("o"))) + .build(), + // Remainder + RowSet.newBuilder() + // Range 5: part2 + .addRowRanges(RowRange.newBuilder().setStartKeyOpen(ByteString.copyFromUtf8("o"))) + .build()); + } + + @Test + public void unsortedRequestTest() { + RowSet rowSet = + RowSet.newBuilder() + .addRowKeys(ByteString.copyFromUtf8("7-row-key-1")) + .addRowKeys(ByteString.copyFromUtf8("2-row-key-2")) + .addRowRanges( + RowRange.newBuilder() + .setStartKeyClosed(ByteString.copyFromUtf8("8-range-1-start")) + .setEndKeyOpen(ByteString.copyFromUtf8("9-range-1-end"))) + .addRowRanges( + RowRange.newBuilder() + .setStartKeyClosed(ByteString.copyFromUtf8("3-range-2-start")) + .setEndKeyOpen(ByteString.copyFromUtf8("4-range-2-end"))) + .build(); + + SortedSet splitPoints = + ImmutableSortedSet.orderedBy(ByteStringComparator.INSTANCE) + .add(ByteString.copyFromUtf8("5-split")) + .build(); + + verifySplit( + rowSet, + splitPoints, + RowSet.newBuilder() + .addRowKeys(ByteString.copyFromUtf8("2-row-key-2")) + .addRowRanges( + RowRange.newBuilder() + .setStartKeyClosed(ByteString.copyFromUtf8("3-range-2-start")) + .setEndKeyOpen(ByteString.copyFromUtf8("4-range-2-end"))) + .build(), + RowSet.newBuilder() + .addRowKeys(ByteString.copyFromUtf8("7-row-key-1")) + .addRowRanges( + RowRange.newBuilder() + .setStartKeyClosed(ByteString.copyFromUtf8("8-range-1-start")) + .setEndKeyOpen(ByteString.copyFromUtf8("9-range-1-end"))) + .build()); + } + + @Test + public void emptyBoundTest() { + RowSet rowSet = RowSet.getDefaultInstance(); + + ByteStringRange actual = RowSetUtil.getBound(rowSet); + assertThat(actual).isEqualTo(ByteStringRange.unbounded()); + } + + @Test + public void singleKeyBoundTest() { + RowSet rowSet = RowSet.newBuilder().addRowKeys(ByteString.copyFromUtf8("a")).build(); + + ByteStringRange actual = RowSetUtil.getBound(rowSet); + assertThat(actual).isEqualTo(ByteStringRange.unbounded().startClosed("a").endClosed("a")); + } + + @Test + public void multiKeyBoundTest() { + RowSet rowSet = + RowSet.newBuilder() + .addRowKeys(ByteString.copyFromUtf8("a")) + .addRowKeys(ByteString.copyFromUtf8("d")) + .build(); + + ByteStringRange actual = RowSetUtil.getBound(rowSet); + assertThat(actual).isEqualTo(ByteStringRange.unbounded().startClosed("a").endClosed("d")); + } + + @Test + public void singleClosedClosedRangeBoundTest() { + RowSet rowSet = + RowSet.newBuilder() + .addRowRanges( + RowRange.newBuilder() + .setStartKeyClosed(ByteString.copyFromUtf8("a")) + .setEndKeyClosed(ByteString.copyFromUtf8("b"))) + .build(); + ByteStringRange actual = RowSetUtil.getBound(rowSet); + assertThat(actual).isEqualTo(ByteStringRange.unbounded().startClosed("a").endClosed("b")); + } + + @Test + public void singleClosedOpenRangeBoundTest() { + RowSet rowSet = + RowSet.newBuilder() + .addRowRanges( + RowRange.newBuilder() + .setStartKeyClosed(ByteString.copyFromUtf8("a")) + .setEndKeyOpen(ByteString.copyFromUtf8("b"))) + .build(); + ByteStringRange actual = RowSetUtil.getBound(rowSet); + assertThat(actual).isEqualTo(ByteStringRange.unbounded().startClosed("a").endOpen("b")); + } + + @Test + public void singleOpenOpenRangeBoundTest() { + RowSet rowSet = + RowSet.newBuilder() + .addRowRanges( + RowRange.newBuilder() + .setStartKeyOpen(ByteString.copyFromUtf8("a")) + .setEndKeyOpen(ByteString.copyFromUtf8("b"))) + .build(); + ByteStringRange actual = RowSetUtil.getBound(rowSet); + assertThat(actual).isEqualTo(ByteStringRange.unbounded().startOpen("a").endOpen("b")); + } + + @Test + public void singleRangeOpenClosedBoundTest() { + RowSet rowSet = + RowSet.newBuilder() + .addRowRanges( + RowRange.newBuilder() + .setStartKeyOpen(ByteString.copyFromUtf8("a")) + .setEndKeyClosed(ByteString.copyFromUtf8("b"))) + .build(); + ByteStringRange actual = RowSetUtil.getBound(rowSet); + assertThat(actual).isEqualTo(ByteStringRange.unbounded().startOpen("a").endClosed("b")); + } + + @Test + public void singleRangeUnbounded1BoundTest() { + RowSet rowSet = + RowSet.newBuilder() + .addRowRanges(RowRange.newBuilder().setStartKeyClosed(ByteString.copyFromUtf8("a"))) + .build(); + ByteStringRange actual = RowSetUtil.getBound(rowSet); + assertThat(actual).isEqualTo(ByteStringRange.unbounded().startClosed("a")); + } + + @Test + public void singleRangeUnbounded2BoundTest() { + RowSet rowSet = + RowSet.newBuilder() + .addRowRanges(RowRange.newBuilder().setEndKeyClosed(ByteString.copyFromUtf8("z"))) + .build(); + ByteStringRange actual = RowSetUtil.getBound(rowSet); + assertThat(actual).isEqualTo(ByteStringRange.unbounded().endClosed("z")); + } + + @Test + public void multipleRangeBoundTest() { + RowSet rowSet = + RowSet.newBuilder() + .addRowRanges( + RowRange.newBuilder() + .setStartKeyClosed(ByteString.copyFromUtf8("a")) + .setEndKeyOpen(ByteString.copyFromUtf8("m"))) + .addRowRanges( + RowRange.newBuilder() + .setStartKeyClosed(ByteString.copyFromUtf8("q")) + .setEndKeyOpen(ByteString.copyFromUtf8("z"))) + .build(); + ByteStringRange actual = RowSetUtil.getBound(rowSet); + assertThat(actual).isEqualTo(ByteStringRange.create("a", "z")); + } + + // Helpers + private static void verifySplit(RowSet input, SortedSet splits, RowSet... expected) { + List actualWithNull = RowSetUtil.split(input, splits, true); + assertThat(actualWithNull).containsExactly(expected).inOrder(); + + List actualNonnull = RowSetUtil.split(input, splits, false); + List expectedNonnull = Lists.newArrayList(); + for (RowSet rowSet : expected) { + if (rowSet != null) { + expectedNonnull.add(rowSet); + } + } + assertThat(actualNonnull).containsExactlyElementsIn(expectedNonnull).inOrder(); + } +} diff --git a/google-cloud-clients/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/QueryTest.java b/google-cloud-clients/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/QueryTest.java index c9e79d06af3f..85521bc2dbe9 100644 --- a/google-cloud-clients/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/QueryTest.java +++ b/google-cloud-clients/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/QueryTest.java @@ -22,15 +22,21 @@ import com.google.bigtable.v2.ReadRowsRequest.Builder; import com.google.bigtable.v2.RowFilter; import com.google.bigtable.v2.RowRange; +import com.google.bigtable.v2.RowSet; import com.google.bigtable.v2.TableName; +import com.google.cloud.bigtable.data.v2.internal.ByteStringComparator; import com.google.cloud.bigtable.data.v2.internal.RequestContext; import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSortedSet; import com.google.protobuf.ByteString; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; +import java.util.List; +import java.util.SortedSet; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -140,6 +146,82 @@ public void serializationTest() throws IOException, ClassNotFoundException { assertThat(actual.toProto(requestContext)).isEqualTo(expected.toProto(requestContext)); } + @Test + public void shardTestSplitPoints() { + Query query = Query.create(TABLE_NAME.getTable()).range("a", "z"); + + SortedSet splitPoints = + ImmutableSortedSet.orderedBy(ByteStringComparator.INSTANCE) + .add(ByteString.copyFromUtf8("j")) + .build(); + + List subQueries = query.shard(splitPoints); + + assertThat(subQueries).hasSize(2); + assertThat(subQueries.get(0).toProto(requestContext)) + .isEqualTo( + ReadRowsRequest.newBuilder() + .setTableName(TABLE_NAME.toString()) + .setAppProfileId(APP_PROFILE_ID) + .setRows( + RowSet.newBuilder() + .addRowRanges( + RowRange.newBuilder() + .setStartKeyClosed(ByteString.copyFromUtf8("a")) + .setEndKeyClosed(ByteString.copyFromUtf8("j")))) + .build()); + assertThat(subQueries.get(1).toProto(requestContext)) + .isEqualTo( + ReadRowsRequest.newBuilder() + .setTableName(TABLE_NAME.toString()) + .setAppProfileId(APP_PROFILE_ID) + .setRows( + RowSet.newBuilder() + .addRowRanges( + RowRange.newBuilder() + .setStartKeyOpen(ByteString.copyFromUtf8("j")) + .setEndKeyOpen(ByteString.copyFromUtf8("z")))) + .build()); + } + + @Test + public void shardTestKeyOffsets() { + Query query = Query.create(TABLE_NAME.getTable()).range("a", "z"); + + List keyOffsets = + ImmutableList.of( + KeyOffset.create(ByteString.copyFromUtf8("j"), 10), + KeyOffset.create(ByteString.EMPTY, 100)); + + List subQueries = query.shard(keyOffsets); + + assertThat(subQueries).hasSize(2); + assertThat(subQueries.get(0).toProto(requestContext)) + .isEqualTo( + ReadRowsRequest.newBuilder() + .setTableName(TABLE_NAME.toString()) + .setAppProfileId(APP_PROFILE_ID) + .setRows( + RowSet.newBuilder() + .addRowRanges( + RowRange.newBuilder() + .setStartKeyClosed(ByteString.copyFromUtf8("a")) + .setEndKeyClosed(ByteString.copyFromUtf8("j")))) + .build()); + assertThat(subQueries.get(1).toProto(requestContext)) + .isEqualTo( + ReadRowsRequest.newBuilder() + .setTableName(TABLE_NAME.toString()) + .setAppProfileId(APP_PROFILE_ID) + .setRows( + RowSet.newBuilder() + .addRowRanges( + RowRange.newBuilder() + .setStartKeyOpen(ByteString.copyFromUtf8("j")) + .setEndKeyOpen(ByteString.copyFromUtf8("z")))) + .build()); + } + private static ReadRowsRequest.Builder expectedProtoBuilder() { return ReadRowsRequest.newBuilder() .setTableName(TABLE_NAME.toString())