Skip to content

Commit

Permalink
feat: support lazy decoding of query results (#2847)
Browse files Browse the repository at this point in the history
* feat: support lazy decoding of query results

Adds an option for lazy decoding of query results. Currently, all values
in a query result row are decoded from protobuf values to plain Java
objects at the moment that the result set is advanced to the next row.
This means that all values are decoded, regardless whether the
application actually fetches these or not.

Lazy decoding also enables the possibility for (internal) consumers
of a result set to access the protobuf value before it is converted
to a plain Java object. This for example allows ChecksumResultSet to
calculate the checksum based on the protobuf value, instead of a
Java object, which can be more efficient.

* fix: add null check

* perf: calculate checksum using protobuf values (#2848)

* perf: calculate checksum using protobuf values

* chore: cleanup

* test: remove unrelated test

* fix: undo change to public API

* chore: cleanup|
  • Loading branch information
olavloite authored Feb 9, 2024
1 parent b37881b commit 6fc57e4
Show file tree
Hide file tree
Showing 23 changed files with 910 additions and 446 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ abstract static class Builder<B extends Builder<?, T>, T extends AbstractReadCon
private TraceWrapper tracer;
private int defaultPrefetchChunks = SpannerOptions.Builder.DEFAULT_PREFETCH_CHUNKS;
private QueryOptions defaultQueryOptions = SpannerOptions.Builder.DEFAULT_QUERY_OPTIONS;
private DecodeMode defaultDecodeMode = SpannerOptions.Builder.DEFAULT_DECODE_MODE;
private DirectedReadOptions defaultDirectedReadOption;
private ExecutorProvider executorProvider;
private Clock clock = new Clock();
Expand Down Expand Up @@ -111,6 +112,11 @@ B setDefaultQueryOptions(QueryOptions defaultQueryOptions) {
return self();
}

B setDefaultDecodeMode(DecodeMode defaultDecodeMode) {
this.defaultDecodeMode = defaultDecodeMode;
return self();
}

B setExecutorProvider(ExecutorProvider executorProvider) {
this.executorProvider = executorProvider;
return self();
Expand Down Expand Up @@ -411,8 +417,8 @@ void initTransaction() {
TraceWrapper tracer;
private final int defaultPrefetchChunks;
private final QueryOptions defaultQueryOptions;

private final DirectedReadOptions defaultDirectedReadOptions;
private final DecodeMode defaultDecodeMode;
private final Clock clock;

@GuardedBy("lock")
Expand All @@ -438,6 +444,7 @@ void initTransaction() {
this.defaultPrefetchChunks = builder.defaultPrefetchChunks;
this.defaultQueryOptions = builder.defaultQueryOptions;
this.defaultDirectedReadOptions = builder.defaultDirectedReadOption;
this.defaultDecodeMode = builder.defaultDecodeMode;
this.span = builder.span;
this.executorProvider = builder.executorProvider;
this.clock = builder.clock;
Expand Down Expand Up @@ -727,7 +734,8 @@ CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken
return stream;
}
};
return new GrpcResultSet(stream, this);
return new GrpcResultSet(
stream, this, options.hasDecodeMode() ? options.decodeMode() : defaultDecodeMode);
}

/**
Expand Down Expand Up @@ -871,7 +879,8 @@ CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken
return stream;
}
};
return new GrpcResultSet(stream, this);
return new GrpcResultSet(
stream, this, readOptions.hasDecodeMode() ? readOptions.decodeMode() : defaultDecodeMode);
}

private Struct consumeSingleRow(ResultSet resultSet) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public BatchReadOnlyTransaction batchReadOnlyTransaction(TimestampBound bound) {
sessionClient.getSpanner().getDefaultQueryOptions(sessionClient.getDatabaseId()))
.setExecutorProvider(sessionClient.getSpanner().getAsyncExecutorProvider())
.setDefaultPrefetchChunks(sessionClient.getSpanner().getDefaultPrefetchChunks())
.setDefaultDecodeMode(sessionClient.getSpanner().getDefaultDecodeMode())
.setDefaultDirectedReadOptions(
sessionClient.getSpanner().getOptions().getDirectedReadOptions())
.setSpan(sessionClient.getSpanner().getTracer().getCurrentSpan())
Expand All @@ -81,6 +82,7 @@ public BatchReadOnlyTransaction batchReadOnlyTransaction(BatchTransactionId batc
sessionClient.getSpanner().getDefaultQueryOptions(sessionClient.getDatabaseId()))
.setExecutorProvider(sessionClient.getSpanner().getAsyncExecutorProvider())
.setDefaultPrefetchChunks(sessionClient.getSpanner().getDefaultPrefetchChunks())
.setDefaultDecodeMode(sessionClient.getSpanner().getDefaultDecodeMode())
.setDefaultDirectedReadOptions(
sessionClient.getSpanner().getOptions().getDirectedReadOptions())
.setSpan(sessionClient.getSpanner().getTracer().getCurrentSpan())
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.spanner;

/** Specifies how and when to decode a value from protobuf to a plain Java object. */
public enum DecodeMode {
/**
* Decodes all columns of a row directly when a {@link ResultSet} is advanced to the next row with
* {@link ResultSet#next()}
*/
DIRECT,
/**
* Decodes all columns of a row the first time a {@link ResultSet} value is retrieved from the
* row.
*/
LAZY_PER_ROW,
/**
* Decodes a columns of a row the first time the value of that column is retrieved from the row.
*/
LAZY_PER_COL,
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import com.google.spanner.v1.ResultSetStats;

/** Forwarding implementation of ResultSet that forwards all calls to a delegate. */
public class ForwardingResultSet extends ForwardingStructReader implements ResultSet {
public class ForwardingResultSet extends ForwardingStructReader implements ProtobufResultSet {

private Supplier<ResultSet> delegate;

Expand Down Expand Up @@ -55,6 +55,22 @@ public boolean next() throws SpannerException {
return delegate.get().next();
}

@Override
public boolean canGetProtobufValue(int columnIndex) {
ResultSet resultSetDelegate = delegate.get();
return (resultSetDelegate instanceof ProtobufResultSet)
&& ((ProtobufResultSet) resultSetDelegate).canGetProtobufValue(columnIndex);
}

@Override
public com.google.protobuf.Value getProtobufValue(int columnIndex) {
ResultSet resultSetDelegate = delegate.get();
Preconditions.checkState(
resultSetDelegate instanceof ProtobufResultSet,
"The result set does not support protobuf values");
return ((ProtobufResultSet) resultSetDelegate).getProtobufValue(columnIndex);
}

@Override
public Struct getCurrentRowAsStruct() {
return delegate.get().getCurrentRowAsStruct();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static com.google.common.base.Preconditions.checkState;

import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.Value;
import com.google.spanner.v1.PartialResultSet;
import com.google.spanner.v1.ResultSetMetadata;
import com.google.spanner.v1.ResultSetStats;
Expand All @@ -28,18 +29,37 @@
import javax.annotation.Nullable;

@VisibleForTesting
class GrpcResultSet extends AbstractResultSet<List<Object>> {
class GrpcResultSet extends AbstractResultSet<List<Object>> implements ProtobufResultSet {
private final GrpcValueIterator iterator;
private final Listener listener;
private final DecodeMode decodeMode;
private ResultSetMetadata metadata;
private GrpcStruct currRow;
private SpannerException error;
private ResultSetStats statistics;
private boolean closed;

GrpcResultSet(CloseableIterator<PartialResultSet> iterator, Listener listener) {
this(iterator, listener, DecodeMode.DIRECT);
}

GrpcResultSet(
CloseableIterator<PartialResultSet> iterator, Listener listener, DecodeMode decodeMode) {
this.iterator = new GrpcValueIterator(iterator);
this.listener = listener;
this.decodeMode = decodeMode;
}

@Override
public boolean canGetProtobufValue(int columnIndex) {
return !closed && currRow != null && currRow.canGetProtoValue(columnIndex);
}

@Override
public Value getProtobufValue(int columnIndex) {
checkState(!closed, "ResultSet is closed");
checkState(currRow != null, "next() call required");
return currRow.getProtoValueInternal(columnIndex);
}

@Override
Expand All @@ -65,7 +85,7 @@ public boolean next() throws SpannerException {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.FAILED_PRECONDITION, AbstractReadContext.NO_TRANSACTION_RETURNED_MSG);
}
currRow = new GrpcStruct(iterator.type(), new ArrayList<>());
currRow = new GrpcStruct(iterator.type(), new ArrayList<>(), decodeMode);
}
boolean hasNext = currRow.consumeRow(iterator);
if (!hasNext) {
Expand Down
Loading

0 comments on commit 6fc57e4

Please sign in to comment.