Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: add a system property to set session pool ordering #2656

Merged
merged 2 commits into from
Oct 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1976,6 +1976,7 @@ private void removeLongRunningSessions(

enum Position {
FIRST,
LAST,
RANDOM
}

Expand Down Expand Up @@ -2477,16 +2478,18 @@ private void releaseSession(PooledSession session, boolean isNewSession) {
// Do not randomize if there are few other sessions checked out and this session has been
// used. This ensures that this session will be re-used for the next transaction, which is
// more efficient.
session.releaseToPosition = Position.FIRST;
session.releaseToPosition = options.getReleaseToPosition();
}
if (session.releaseToPosition == Position.RANDOM && !sessions.isEmpty()) {
// A session should only be added at a random position the first time it is added to
// the pool or if the pool was deemed unbalanced. All following releases into the pool
// should normally happen at the front of the pool (unless the pool is again deemed to be
// unbalanced).
session.releaseToPosition = Position.FIRST;
// should normally happen at the default release position (unless the pool is again deemed
// to be unbalanced and the insertion would happen at the front of the pool).
session.releaseToPosition = options.getReleaseToPosition();
int pos = random.nextInt(sessions.size() + 1);
sessions.add(pos, session);
} else if (session.releaseToPosition == Position.LAST) {
sessions.addLast(session);
} else {
sessions.addFirst(session);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@
package com.google.cloud.spanner;

import com.google.cloud.spanner.SessionPool.Clock;
import com.google.cloud.spanner.SessionPool.Position;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.util.Locale;
import java.util.Objects;
import org.threeten.bp.Duration;

Expand Down Expand Up @@ -62,6 +64,7 @@ public class SessionPoolOptions {
private final boolean autoDetectDialect;
private final Duration waitForMinSessions;
private final Duration acquireSessionTimeout;
private final Position releaseToPosition;

/** Property for allowing mocking of session maintenance clock. */
private final Clock poolMaintainerClock;
Expand All @@ -86,6 +89,7 @@ private SessionPoolOptions(Builder builder) {
this.autoDetectDialect = builder.autoDetectDialect;
this.waitForMinSessions = builder.waitForMinSessions;
this.acquireSessionTimeout = builder.acquireSessionTimeout;
this.releaseToPosition = builder.releaseToPosition;
this.inactiveTransactionRemovalOptions = builder.inactiveTransactionRemovalOptions;
this.poolMaintainerClock = builder.poolMaintainerClock;
}
Expand Down Expand Up @@ -114,6 +118,7 @@ public boolean equals(Object o) {
&& Objects.equals(this.autoDetectDialect, other.autoDetectDialect)
&& Objects.equals(this.waitForMinSessions, other.waitForMinSessions)
&& Objects.equals(this.acquireSessionTimeout, other.acquireSessionTimeout)
&& Objects.equals(this.releaseToPosition, other.releaseToPosition)
&& Objects.equals(
this.inactiveTransactionRemovalOptions, other.inactiveTransactionRemovalOptions)
&& Objects.equals(this.poolMaintainerClock, other.poolMaintainerClock);
Expand All @@ -138,6 +143,7 @@ public int hashCode() {
this.autoDetectDialect,
this.waitForMinSessions,
this.acquireSessionTimeout,
this.releaseToPosition,
this.inactiveTransactionRemovalOptions,
this.poolMaintainerClock);
}
Expand Down Expand Up @@ -254,6 +260,10 @@ Duration getAcquireSessionTimeout() {
return acquireSessionTimeout;
}

Position getReleaseToPosition() {
return releaseToPosition;
}

public static Builder newBuilder() {
return new Builder();
}
Expand Down Expand Up @@ -440,9 +450,23 @@ public static class Builder {
private boolean autoDetectDialect = false;
private Duration waitForMinSessions = Duration.ZERO;
private Duration acquireSessionTimeout = Duration.ofSeconds(60);
private Position releaseToPosition = getReleaseToPositionFromSystemProperty();

private Clock poolMaintainerClock;

private static Position getReleaseToPositionFromSystemProperty() {
// NOTE: This System property is a beta feature. Support for it can be removed in the future.
String key = "com.google.cloud.spanner.session_pool_release_to_position";
if (System.getProperties().containsKey(key)) {
try {
return Position.valueOf(System.getProperty(key).toUpperCase(Locale.ENGLISH));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be useful to add some logging for the case when this environment variable is set? Just so that if we run into any un-intended cases where a few other customers discover this option and set this, we at-least know if this option got set?

Logging is the minimum we can do, apart from adding to the trace?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We wouldn't be able to see the logging, except if we add a gRPC header, and I don't think it's worth the latter. I don't worry about anyone finding and enabling this option on their own (and if they do, they probably also turn it off if they feel that it does not work well for them).

} catch (Throwable ignore) {
// fallthrough and return the default.
}
}
return Position.FIRST;
}

public Builder() {}

private Builder(SessionPoolOptions options) {
Expand Down Expand Up @@ -735,6 +759,11 @@ public Builder setAcquireSessionTimeout(Duration acquireSessionTimeout) {
return this;
}

Builder setReleaseToPosition(Position releaseToPosition) {
this.releaseToPosition = Preconditions.checkNotNull(releaseToPosition);
return this;
}

/** Build a SessionPoolOption object */
public SessionPoolOptions build() {
validate();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@
import static com.google.cloud.spanner.MetricRegistryConstants.SPANNER_DEFAULT_LABEL_VALUES;
import static com.google.cloud.spanner.MetricRegistryConstants.SPANNER_LABEL_KEYS;
import static com.google.cloud.spanner.MetricRegistryConstants.SPANNER_LABEL_KEYS_WITH_TYPE;
import static com.google.cloud.spanner.SpannerOptionsTest.runWithSystemProperty;
import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
Expand Down Expand Up @@ -60,6 +62,7 @@
import com.google.cloud.spanner.spi.v1.SpannerRpc;
import com.google.cloud.spanner.spi.v1.SpannerRpc.ResultStreamConsumer;
import com.google.cloud.spanner.v1.stub.SpannerStubSettings;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.protobuf.ByteString;
Expand All @@ -82,12 +85,14 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand Down Expand Up @@ -236,6 +241,134 @@ public void poolLifo() {
session4.close();
}

@Test
public void poolFifo() throws Exception {
setupMockSessionCreation();
runWithSystemProperty(
"com.google.cloud.spanner.session_pool_release_to_position",
"LAST",
() -> {
options =
options
.toBuilder()
.setMinSessions(2)
.setWaitForMinSessions(Duration.ofSeconds(10L))
.build();
pool = createPool();
pool.maybeWaitOnMinSessions();
Session session1 = pool.getSession().get();
Session session2 = pool.getSession().get();
assertNotEquals(session1, session2);

session2.close();
session1.close();

// Check the session out and back in once more to finalize their positions.
session1 = pool.getSession().get();
session2 = pool.getSession().get();
session2.close();
session1.close();

// Verify that we get the sessions in FIFO order, so in this order:
// 1. session2
// 2. session1
Session session3 = pool.getSession().get();
Session session4 = pool.getSession().get();
assertEquals(session2, session3);
assertEquals(session1, session4);
session3.close();
session4.close();

return null;
});
}

@Test
public void poolAllPositions() throws Exception {
int maxAttempts = 100;
setupMockSessionCreation();
for (Position position : Position.values()) {
runWithSystemProperty(
"com.google.cloud.spanner.session_pool_release_to_position",
position.name(),
() -> {
int attempt = 0;
while (attempt < maxAttempts) {
int numSessions = 5;
options =
options
.toBuilder()
.setMinSessions(numSessions)
.setMaxSessions(numSessions)
.setWaitForMinSessions(Duration.ofSeconds(10L))
.build();
pool = createPool();
pool.maybeWaitOnMinSessions();
// First check out and release the sessions twice to the pool, so we know that we have
// finalized the position of them.
for (int n = 0; n < 2; n++) {
checkoutAndReleaseAllSessions();
}

// Now verify that if we get all sessions twice, they will be in random order.
List<List<PooledSessionFuture>> allSessions = new ArrayList<>(2);
for (int n = 0; n < 2; n++) {
allSessions.add(checkoutAndReleaseAllSessions());
}
List<Session> firstTime =
allSessions.get(0).stream()
.map(PooledSessionFuture::get)
.collect(Collectors.toList());
List<Session> secondTime =
allSessions.get(1).stream()
.map(PooledSessionFuture::get)
.collect(Collectors.toList());
switch (position) {
case FIRST:
// LIFO:
// First check out all sessions, so we have 1, 2, 3, 4, ..., N
// Then release them all back into the pool in the same order (1, 2, 3, 4, ..., N)
// That will give us the list N, ..., 4, 3, 2, 1 because each session is added at
// the front of the pool.
assertEquals(firstTime, Lists.reverse(secondTime));
break;
case LAST:
// FIFO:
// First check out all sessions, so we have 1, 2, 3, 4, ..., N
// Then release them all back into the pool in the same order (1, 2, 3, 4, ..., N)
// That will give us the list 1, 2, 3, 4, ..., N because each session is added at
// the end of the pool.
assertEquals(firstTime, secondTime);
break;
case RANDOM:
// Random means that we should not get the same order twice (unless the randomizer
// got lucky, and then we retry).
if (attempt < (maxAttempts - 1)) {
if (Objects.equals(firstTime, secondTime)) {
attempt++;
continue;
}
}
assertNotEquals(firstTime, secondTime);
}
break;
}
return null;
});
}
}

private List<PooledSessionFuture> checkoutAndReleaseAllSessions() {
List<PooledSessionFuture> sessions = new ArrayList<>(pool.totalSessions());
for (int i = 0; i < pool.totalSessions(); i++) {
sessions.add(pool.getSession());
}
for (Session session : sessions) {
session.close();
}
return sessions;
}

@Test
public void poolClosure() throws Exception {
setupMockSessionCreation();
Expand Down