Skip to content

Commit

Permalink
feat(spanner): support multiplexed session for Partitioned operations (
Browse files Browse the repository at this point in the history
…#3231)

* feat(spanner): support multiplexed session for Partitioned read or query.

* chore(spanner): lint fixes

* feat(spanner): support multiplexed session for Partitioned DML operations.

* lint(spanner): javadoc fixes.

* feat(spanner): Updated unit tests of Partitioned operations for Multiplexed Session.

* feat(spanner): Updated unit tests of Partitioned operations for Multiplexed Session.

* lint(spanner): Apply suggestions from code review

Co-authored-by: Knut Olav Løite <[email protected]>

* lint(spanner): Apply suggestions from code review

Co-authored-by: Knut Olav Løite <[email protected]>

* feat(spanner): Modified BatchClientImpl to store multiplexed session and create fresh session after expiration date.

* feat(spanner): Removed env variable for Partitioned Ops ensuring that Multiplexed Session for Partitioned Ops is not available to customers.

* lint(spanner): Removed unused variables.

---------

Co-authored-by: Knut Olav Løite <[email protected]>
  • Loading branch information
pratickchokhani and olavloite authored Dec 9, 2024
1 parent 134be9b commit 4501a3e
Show file tree
Hide file tree
Showing 16 changed files with 230 additions and 49 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ jobs:
env:
JOB_TYPE: test
GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS: true
GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS_PARTITIONED_OPS: true
units-java8:
# Building using Java 17 and run the tests with Java 8 runtime
name: "units (8)"
Expand Down Expand Up @@ -92,6 +93,7 @@ jobs:
env:
JOB_TYPE: test
GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS: true
GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS_PARTITIONED_OPS: true
windows:
runs-on: windows-latest
steps:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,8 @@ env_vars: {
key: "GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS"
value: "true"
}

env_vars: {
key: "GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS_PARTITIONED_OPS"
value: "true"
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,12 @@ public static SessionPoolOptions.Builder setUseMultiplexedSessionForRW(
SessionPoolOptions.Builder sessionPoolOptionsBuilder, boolean useMultiplexedSessionForRW) {
return sessionPoolOptionsBuilder.setUseMultiplexedSessionForRW(useMultiplexedSessionForRW);
}

// TODO: Remove when multiplexed session for partitioned operations are released.
public static SessionPoolOptions.Builder setUseMultiplexedSessionForPartitionedOperations(
SessionPoolOptions.Builder sessionPoolOptionsBuilder,
boolean useMultiplexedSessionForPartitionedOps) {
return sessionPoolOptionsBuilder.setUseMultiplexedSessionPartitionedOps(
useMultiplexedSessionForPartitionedOps);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import com.google.api.gax.rpc.ServerStream;
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.Options.TransactionOption;
import com.google.cloud.spanner.Options.UpdateOption;
import com.google.spanner.v1.BatchWriteResponse;

/**
Expand Down Expand Up @@ -51,9 +50,4 @@ public ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
throws SpannerException {
throw new UnsupportedOperationException();
}

@Override
public long executePartitionedUpdate(Statement stmt, UpdateOption... options) {
throw new UnsupportedOperationException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,50 @@
import com.google.spanner.v1.PartitionReadRequest;
import com.google.spanner.v1.PartitionResponse;
import com.google.spanner.v1.TransactionSelector;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;

/** Default implementation for Batch Client interface. */
public class BatchClientImpl implements BatchClient {
private final SessionClient sessionClient;

BatchClientImpl(SessionClient sessionClient) {
private final boolean isMultiplexedSessionEnabled;

/** Lock to protect the multiplexed session. */
private final ReentrantLock multiplexedSessionLock = new ReentrantLock();

/** The duration before we try to replace the multiplexed session. The default is 7 days. */
private final Duration sessionExpirationDuration;

/** The expiration date/time of the current multiplexed session. */
@GuardedBy("multiplexedSessionLock")
private final AtomicReference<Instant> expirationDate;

@GuardedBy("multiplexedSessionLock")
private final AtomicReference<SessionImpl> multiplexedSessionReference;

BatchClientImpl(SessionClient sessionClient, boolean isMultiplexedSessionEnabled) {
this.sessionClient = checkNotNull(sessionClient);
this.isMultiplexedSessionEnabled = isMultiplexedSessionEnabled;
this.sessionExpirationDuration =
Duration.ofMillis(
sessionClient
.getSpanner()
.getOptions()
.getSessionPoolOptions()
.getMultiplexedSessionMaintenanceDuration()
.toMillis());
// Initialize the expiration date to the start of time to avoid unnecessary null checks.
// This also ensured that a new session is created on first request.
this.expirationDate = new AtomicReference<>(Instant.MIN);
this.multiplexedSessionReference = new AtomicReference<>();
}

@Override
Expand All @@ -50,7 +84,12 @@ public String getDatabaseRole() {

@Override
public BatchReadOnlyTransaction batchReadOnlyTransaction(TimestampBound bound) {
SessionImpl session = sessionClient.createSession();
SessionImpl session;
if (isMultiplexedSessionEnabled) {
session = getMultiplexedSession();
} else {
session = sessionClient.createSession();
}
return new BatchReadOnlyTransactionImpl(
MultiUseReadOnlyTransaction.newBuilder()
.setSession(session)
Expand Down Expand Up @@ -92,6 +131,20 @@ public BatchReadOnlyTransaction batchReadOnlyTransaction(BatchTransactionId batc
batchTransactionId);
}

private SessionImpl getMultiplexedSession() {
this.multiplexedSessionLock.lock();
try {
if (Clock.systemUTC().instant().isAfter(this.expirationDate.get())
|| this.multiplexedSessionReference.get() == null) {
this.multiplexedSessionReference.set(this.sessionClient.createMultiplexedSession());
this.expirationDate.set(Clock.systemUTC().instant().plus(this.sessionExpirationDuration));
}
return this.multiplexedSessionReference.get();
} finally {
this.multiplexedSessionLock.unlock();
}
}

private static class BatchReadOnlyTransactionImpl extends MultiUseReadOnlyTransaction
implements BatchReadOnlyTransaction {
private final String sessionName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class DatabaseClientImpl implements DatabaseClient {
@VisibleForTesting final String clientId;
@VisibleForTesting final SessionPool pool;
@VisibleForTesting final MultiplexedSessionDatabaseClient multiplexedSessionDatabaseClient;
@VisibleForTesting final boolean useMultiplexedSessionPartitionedOps;
@VisibleForTesting final boolean useMultiplexedSessionForRW;

final boolean useMultiplexedSessionBlindWrite;
Expand All @@ -47,6 +48,7 @@ class DatabaseClientImpl implements DatabaseClient {
pool,
/* useMultiplexedSessionBlindWrite = */ false,
/* multiplexedSessionDatabaseClient = */ null,
/* useMultiplexedSessionPartitionedOps= */ false,
tracer,
/* useMultiplexedSessionForRW = */ false);
}
Expand All @@ -58,6 +60,7 @@ class DatabaseClientImpl implements DatabaseClient {
pool,
/* useMultiplexedSessionBlindWrite = */ false,
/* multiplexedSessionDatabaseClient = */ null,
/* useMultiplexedSessionPartitionedOps= */ false,
tracer,
/* useMultiplexedSessionForRW = */ false);
}
Expand All @@ -67,12 +70,14 @@ class DatabaseClientImpl implements DatabaseClient {
SessionPool pool,
boolean useMultiplexedSessionBlindWrite,
@Nullable MultiplexedSessionDatabaseClient multiplexedSessionDatabaseClient,
boolean useMultiplexedSessionPartitionedOps,
TraceWrapper tracer,
boolean useMultiplexedSessionForRW) {
this.clientId = clientId;
this.pool = pool;
this.useMultiplexedSessionBlindWrite = useMultiplexedSessionBlindWrite;
this.multiplexedSessionDatabaseClient = multiplexedSessionDatabaseClient;
this.useMultiplexedSessionPartitionedOps = useMultiplexedSessionPartitionedOps;
this.tracer = tracer;
this.useMultiplexedSessionForRW = useMultiplexedSessionForRW;
}
Expand Down Expand Up @@ -309,6 +314,14 @@ public AsyncTransactionManager transactionManagerAsync(TransactionOption... opti

@Override
public long executePartitionedUpdate(final Statement stmt, final UpdateOption... options) {
if (useMultiplexedSessionPartitionedOps) {
return getMultiplexedSession().executePartitionedUpdate(stmt, options);
}
return executePartitionedUpdateWithPooledSession(stmt, options);
}

private long executePartitionedUpdateWithPooledSession(
final Statement stmt, final UpdateOption... options) {
ISpan span = tracer.spanBuilder(PARTITION_DML_TRANSACTION);
try (IScope s = tracer.withSpan(span)) {
return runWithSessionRetry(session -> session.executePartitionedUpdate(stmt, options));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.cloud.spanner.DelayedReadContext.DelayedReadOnlyTransaction;
import com.google.cloud.spanner.MultiplexedSessionDatabaseClient.MultiplexedSessionTransaction;
import com.google.cloud.spanner.Options.TransactionOption;
import com.google.cloud.spanner.Options.UpdateOption;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.concurrent.ExecutionException;

Expand Down Expand Up @@ -224,4 +225,16 @@ private SessionReference getSessionReference() {
throw SpannerExceptionFactory.propagateInterrupt(interruptedException);
}
}

/**
* Execute `stmt` within PARTITIONED_DML transaction using multiplexed session. This method is a
* blocking call as the interface expects to return the output of the `stmt`.
*/
@Override
public long executePartitionedUpdate(Statement stmt, UpdateOption... options) {
SessionReference sessionReference = getSessionReference();
return new MultiplexedSessionTransaction(
client, span, sessionReference, NO_CHANNEL_HINT, /* singleUse = */ true)
.executePartitionedUpdate(stmt, options);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.api.core.SettableApiFuture;
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.Options.TransactionOption;
import com.google.cloud.spanner.Options.UpdateOption;
import com.google.cloud.spanner.SessionClient.SessionConsumer;
import com.google.cloud.spanner.SpannerException.ResourceNotFoundException;
import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -553,6 +554,12 @@ public AsyncTransactionManager transactionManagerAsync(TransactionOption... opti
.transactionManagerAsync(options);
}

@Override
public long executePartitionedUpdate(Statement stmt, UpdateOption... options) {
return createMultiplexedSessionTransaction(/* singleUse = */ true)
.executePartitionedUpdate(stmt, options);
}

/**
* It is enough with one executor to maintain the multiplexed sessions in all the clients, as they
* do not need to be updated often, and the maintenance task is light. The core pool size is set
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,19 @@ SessionImpl createSession() {
* @param consumer The {@link SessionConsumer} to use for callbacks when sessions are available.
*/
void createMultiplexedSession(SessionConsumer consumer) {
try {
SessionImpl sessionImpl = createMultiplexedSession();
consumer.onSessionReady(sessionImpl);
} catch (Throwable t) {
consumer.onSessionCreateFailure(t, 1);
}
}

/**
* Creates a multiplexed session and returns it. A multiplexed session is not affiliated with any
* GRPC channel. In case of an error during the gRPC calls, an exception will be thrown.
*/
SessionImpl createMultiplexedSession() {
ISpan span = spanner.getTracer().spanBuilder(SpannerImpl.CREATE_MULTIPLEXED_SESSION);
try (IScope s = spanner.getTracer().withSpan(span)) {
com.google.spanner.v1.Session session =
Expand All @@ -253,10 +266,12 @@ void createMultiplexedSession(SessionConsumer consumer) {
spanner,
new SessionReference(
session.getName(), session.getCreateTime(), session.getMultiplexed(), null));
consumer.onSessionReady(sessionImpl);
span.addAnnotation(
String.format("Request for %d multiplexed session returned %d session", 1, 1));
return sessionImpl;
} catch (Throwable t) {
span.setStatus(t);
consumer.onSessionCreateFailure(t, 1);
throw t;
} finally {
span.end();
}
Expand Down Expand Up @@ -289,31 +304,7 @@ private CreateMultiplexedSessionsRunnable(SessionConsumer consumer) {

@Override
public void run() {
ISpan span = spanner.getTracer().spanBuilder(SpannerImpl.CREATE_MULTIPLEXED_SESSION);
try (IScope s = spanner.getTracer().withSpan(span)) {
com.google.spanner.v1.Session session =
spanner
.getRpc()
.createSession(
db.getName(),
spanner.getOptions().getDatabaseRole(),
spanner.getOptions().getSessionLabels(),
null,
true);
SessionImpl sessionImpl =
new SessionImpl(
spanner,
new SessionReference(
session.getName(), session.getCreateTime(), session.getMultiplexed(), null));
span.addAnnotation(
String.format("Request for %d multiplexed session returned %d session", 1, 1));
consumer.onSessionReady(sessionImpl);
} catch (Throwable t) {
span.setStatus(t);
consumer.onSessionCreateFailure(t, 1);
} finally {
span.end();
}
createMultiplexedSession(consumer);
}
}

Expand Down
Loading

0 comments on commit 4501a3e

Please sign in to comment.