Skip to content

Commit

Permalink
chore: add session pool options for multiplexed session. (googleapis#…
Browse files Browse the repository at this point in the history
…2960)

* fix: prevent illegal negative timeout values into thread sleep() method while retrying exceptions in unit tests.

* For details on issue see - googleapis#2206

* Fixing lint issues.

* chore: add session pool options for multiplexed session.

* Update google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java

Co-authored-by: Knut Olav Løite <[email protected]>

* Update google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java

Co-authored-by: Knut Olav Løite <[email protected]>

* fix: comments.

* chore: lint fix.

---------

Co-authored-by: Knut Olav Løite <[email protected]>
  • Loading branch information
2 people authored and tlhquynh committed Mar 25, 2024
1 parent a0a2e8d commit eaa05da
Show file tree
Hide file tree
Showing 2 changed files with 130 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ public class SessionPoolOptions {
/** Property for allowing mocking of session maintenance clock. */
private final Clock poolMaintainerClock;

private final Duration waitForMultiplexedSession;
private final boolean useMultiplexedSession;
private final Duration multiplexedSessionMaintenanceDuration;

private SessionPoolOptions(Builder builder) {
// minSessions > maxSessions is only possible if the user has only set a value for maxSessions.
// We allow that to prevent code that only sets a value for maxSessions to break if the
Expand All @@ -93,6 +97,9 @@ private SessionPoolOptions(Builder builder) {
this.randomizePositionQPSThreshold = builder.randomizePositionQPSThreshold;
this.inactiveTransactionRemovalOptions = builder.inactiveTransactionRemovalOptions;
this.poolMaintainerClock = builder.poolMaintainerClock;
this.useMultiplexedSession = builder.useMultiplexedSession;
this.multiplexedSessionMaintenanceDuration = builder.multiplexedSessionMaintenanceDuration;
this.waitForMultiplexedSession = builder.waitForMultiplexedSession;
}

@Override
Expand Down Expand Up @@ -123,7 +130,11 @@ public boolean equals(Object o) {
&& Objects.equals(this.randomizePositionQPSThreshold, other.randomizePositionQPSThreshold)
&& Objects.equals(
this.inactiveTransactionRemovalOptions, other.inactiveTransactionRemovalOptions)
&& Objects.equals(this.poolMaintainerClock, other.poolMaintainerClock);
&& Objects.equals(this.poolMaintainerClock, other.poolMaintainerClock)
&& Objects.equals(this.useMultiplexedSession, other.useMultiplexedSession)
&& Objects.equals(
this.multiplexedSessionMaintenanceDuration, other.multiplexedSessionMaintenanceDuration)
&& Objects.equals(this.waitForMultiplexedSession, other.waitForMultiplexedSession);
}

@Override
Expand All @@ -148,7 +159,10 @@ public int hashCode() {
this.releaseToPosition,
this.randomizePositionQPSThreshold,
this.inactiveTransactionRemovalOptions,
this.poolMaintainerClock);
this.poolMaintainerClock,
this.useMultiplexedSession,
this.multiplexedSessionMaintenanceDuration,
this.waitForMultiplexedSession);
}

public Builder toBuilder() {
Expand Down Expand Up @@ -271,6 +285,18 @@ long getRandomizePositionQPSThreshold() {
return randomizePositionQPSThreshold;
}

boolean getUseMultiplexedSession() {
return useMultiplexedSession;
}

Duration getMultiplexedSessionMaintenanceDuration() {
return multiplexedSessionMaintenanceDuration;
}

Duration getWaitForMultiplexedSession() {
return waitForMultiplexedSession;
}

public static Builder newBuilder() {
return new Builder();
}
Expand Down Expand Up @@ -467,6 +493,9 @@ public static class Builder {
*/
private long randomizePositionQPSThreshold = 0L;

private boolean useMultiplexedSession = false;
private Duration multiplexedSessionMaintenanceDuration = Duration.ofDays(7);
private Duration waitForMultiplexedSession = Duration.ofSeconds(10);
private Clock poolMaintainerClock;

private static Position getReleaseToPositionFromSystemProperty() {
Expand Down Expand Up @@ -669,6 +698,47 @@ Builder setPoolMaintainerClock(Clock poolMaintainerClock) {
return this;
}

/**
* Sets whether the client should use multiplexed session or not. If set to true, the client
* optimises and runs multiple applicable requests concurrently on a single session. A single
* multiplexed session is sufficient to handle all concurrent traffic.
*
* <p>When set to false, the client uses the regular session cached in the session pool for
* running 1 concurrent transaction per session. We require to provision sufficient sessions by
* making use of {@link SessionPoolOptions#minSessions} and {@link
* SessionPoolOptions#maxSessions} based on the traffic load. Failing to do so will result in
* higher latencies.
*/
Builder setUseMultiplexedSession(boolean useMultiplexedSession) {
this.useMultiplexedSession = useMultiplexedSession;
return this;
}

@VisibleForTesting
Builder setMultiplexedSessionMaintenanceDuration(
Duration multiplexedSessionMaintenanceDuration) {
this.multiplexedSessionMaintenanceDuration = multiplexedSessionMaintenanceDuration;
return this;
}

/**
* This option is only used when {@link SessionPoolOptions#useMultiplexedSession} is set to
* true. If greater than zero, calls to {@link Spanner#getDatabaseClient(DatabaseId)} will block
* for up to the given duration while waiting for the multiplexed session to be created. The
* default value for this is 10 seconds.
*
* <p>If this is set to null or zero, the client does not wait for the session to be created,
* which means that the first read requests could see more latency, as they will need to wait
* until the multiplexed session has been created.
*
* <p>Note that we would need to use the option {@link SessionPoolOptions#waitForMinSessions} if
* we want a similar blocking behavior for the other sessions within the session pool.
*/
Builder setWaitForMultiplexedSession(Duration waitForMultiplexedSession) {
this.waitForMultiplexedSession = waitForMultiplexedSession;
return this;
}

/**
* Sets whether the client should automatically execute a background query to detect the dialect
* that is used by the database or not. Set this option to true if you do not know what the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,4 +246,62 @@ public void testRandomizePositionQPSThreshold() {
IllegalArgumentException.class,
() -> SessionPoolOptions.newBuilder().setRandomizePositionQPSThreshold(-1L));
}

@Test
public void testUseMultiplexedSession() {
assertEquals(false, SessionPoolOptions.newBuilder().build().getUseMultiplexedSession());
assertEquals(
true,
SessionPoolOptions.newBuilder()
.setUseMultiplexedSession(true)
.build()
.getUseMultiplexedSession());
assertEquals(
false,
SessionPoolOptions.newBuilder()
.setUseMultiplexedSession(true)
.setUseMultiplexedSession(false)
.build()
.getUseMultiplexedSession());
}

@Test
public void testMultiplexedSessionMaintenanceDuration() {
assertEquals(
Duration.ofDays(7),
SessionPoolOptions.newBuilder().build().getMultiplexedSessionMaintenanceDuration());
assertEquals(
Duration.ofDays(2),
SessionPoolOptions.newBuilder()
.setMultiplexedSessionMaintenanceDuration(Duration.ofDays(2))
.build()
.getMultiplexedSessionMaintenanceDuration());
assertEquals(
Duration.ofDays(10),
SessionPoolOptions.newBuilder()
.setMultiplexedSessionMaintenanceDuration(Duration.ofDays(2))
.setMultiplexedSessionMaintenanceDuration(Duration.ofDays(10))
.build()
.getMultiplexedSessionMaintenanceDuration());
}

@Test
public void testWaitForMultiplexedSession() {
assertEquals(
Duration.ofSeconds(10),
SessionPoolOptions.newBuilder().build().getWaitForMultiplexedSession());
assertEquals(
Duration.ofSeconds(20),
SessionPoolOptions.newBuilder()
.setWaitForMultiplexedSession(Duration.ofSeconds(20))
.build()
.getWaitForMultiplexedSession());
assertEquals(
Duration.ofSeconds(10),
SessionPoolOptions.newBuilder()
.setWaitForMultiplexedSession(Duration.ofSeconds(2))
.setWaitForMultiplexedSession(Duration.ofSeconds(10))
.build()
.getWaitForMultiplexedSession());
}
}

0 comments on commit eaa05da

Please sign in to comment.