From e043de8a79491fe6d0a1118183fbbcc7998398f3 Mon Sep 17 00:00:00 2001 From: Arpan Mishra Date: Mon, 25 Mar 2024 20:16:31 +0530 Subject: [PATCH] chore: generalise session pool class for multiplexed session. (#2964) * chore: generalise session pool class for multiplexed session. * chore: add back previous code. * chore: address comments. --- .../cloud/spanner/DatabaseClientImpl.java | 4 +- .../com/google/cloud/spanner/SessionPool.java | 225 ++++++++++++------ .../SessionPoolAsyncTransactionManager.java | 19 +- 3 files changed, 172 insertions(+), 76 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java index b63ad379305..e9c9818f451 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java @@ -257,7 +257,9 @@ private T runWithSessionRetry(Function callable) { try { return callable.apply(session); } catch (SessionNotFoundException e) { - session = pool.replaceSession(e, session); + session = + (PooledSessionFuture) + pool.getPooledSessionReplacementHandler().replaceSession(e, session); } } } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java index 8058802a8fc..92ecbc3d55e 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java @@ -166,7 +166,8 @@ public ResultSet get() { * Wrapper around {@code ReadContext} that releases the session to the pool once the call is * finished, if it is a single use context. */ - private static class AutoClosingReadContext implements ReadContext { + private static class AutoClosingReadContext + implements ReadContext { /** * {@link AsyncResultSet} implementation that keeps track of the async operations that are still * running for this {@link ReadContext} and that should finish before the {@link ReadContext} @@ -201,9 +202,10 @@ public ApiFuture setCallback(Executor exec, ReadyCallback cb) { } } - private final Function readContextDelegateSupplier; + private final Function readContextDelegateSupplier; private T readContextDelegate; private final SessionPool sessionPool; + private final SessionReplacementHandler sessionReplacementHandler; private final boolean isSingleUse; private final AtomicInteger asyncOperationsCount = new AtomicInteger(); @@ -213,7 +215,7 @@ public ApiFuture setCallback(Executor exec, ReadyCallback cb) { private boolean sessionUsedForQuery = false; @GuardedBy("lock") - private PooledSessionFuture session; + private I session; @GuardedBy("lock") private boolean closed; @@ -222,12 +224,14 @@ public ApiFuture setCallback(Executor exec, ReadyCallback cb) { private boolean delegateClosed; private AutoClosingReadContext( - Function delegateSupplier, + Function delegateSupplier, SessionPool sessionPool, - PooledSessionFuture session, + SessionReplacementHandler sessionReplacementHandler, + I session, boolean isSingleUse) { this.readContextDelegateSupplier = delegateSupplier; this.sessionPool = sessionPool; + this.sessionReplacementHandler = sessionReplacementHandler; this.session = session; this.isSingleUse = isSingleUse; } @@ -293,7 +297,7 @@ private boolean internalNext() { } catch (SpannerException e) { synchronized (lock) { if (!closed && isSingleUse) { - session.get().lastException = e; + session.get().setLastException(e); AutoClosingReadContext.this.close(); } } @@ -319,7 +323,7 @@ private void replaceSessionIfPossible(SessionNotFoundException notFound) { if (isSingleUse || !sessionUsedForQuery) { // This class is only used by read-only transactions, so we know that we only need a // read-only session. - session = sessionPool.replaceSession(notFound, session); + session = sessionReplacementHandler.replaceSession(notFound, session); readContextDelegate = readContextDelegateSupplier.apply(session); } else { throw notFound; @@ -529,15 +533,16 @@ public void close() { } } - private static class AutoClosingReadTransaction - extends AutoClosingReadContext implements ReadOnlyTransaction { + private static class AutoClosingReadTransaction + extends AutoClosingReadContext implements ReadOnlyTransaction { AutoClosingReadTransaction( - Function txnSupplier, + Function txnSupplier, SessionPool sessionPool, - PooledSessionFuture session, + SessionReplacementHandler sessionReplacementHandler, + I session, boolean isSingleUse) { - super(txnSupplier, sessionPool, session, isSingleUse); + super(txnSupplier, sessionPool, sessionReplacementHandler, session, isSingleUse); } @Override @@ -546,6 +551,29 @@ public Timestamp getReadTimestamp() { } } + interface SessionReplacementHandler { + T replaceSession(SessionNotFoundException notFound, T sessionFuture); + } + + class PooledSessionReplacementHandler implements SessionReplacementHandler { + @Override + public PooledSessionFuture replaceSession( + SessionNotFoundException e, PooledSessionFuture session) { + if (!options.isFailIfSessionNotFound() && session.get().isAllowReplacing()) { + synchronized (lock) { + numSessionsInUse--; + numSessionsReleased++; + checkedOutSessions.remove(session); + } + session.leakedException = null; + invalidateSession(session.get()); + return getSession(); + } else { + throw e; + } + } + } + interface SessionNotFoundHandler { /** * Handles the given {@link SessionNotFoundException} by possibly converting it to a different @@ -798,20 +826,22 @@ public void close() { } } - private static class AutoClosingTransactionManager + private static class AutoClosingTransactionManager implements TransactionManager, SessionNotFoundHandler { private TransactionManager delegate; - private final SessionPool sessionPool; - private PooledSessionFuture session; + private T session; + private final SessionReplacementHandler sessionReplacementHandler; private final TransactionOption[] options; private boolean closed; private boolean restartedAfterSessionNotFound; AutoClosingTransactionManager( - SessionPool sessionPool, PooledSessionFuture session, TransactionOption... options) { - this.sessionPool = sessionPool; + T session, + SessionReplacementHandler sessionReplacementHandler, + TransactionOption... options) { this.session = session; this.options = options; + this.sessionReplacementHandler = sessionReplacementHandler; } @Override @@ -830,9 +860,9 @@ private TransactionContext internalBegin() { @Override public SpannerException handleSessionNotFound(SessionNotFoundException notFoundException) { - session = sessionPool.replaceSession(notFoundException, session); - PooledSession pooledSession = session.get(); - delegate = pooledSession.delegate.transactionManager(options); + session = sessionReplacementHandler.replaceSession(notFoundException, session); + CachedSession cachedSession = session.get(); + delegate = cachedSession.getDelegate().transactionManager(options); restartedAfterSessionNotFound = true; return createAbortedExceptionWithMinimalRetryDelay(notFoundException); } @@ -880,9 +910,9 @@ public TransactionContext resetForRetry() { return new SessionPoolTransactionContext(this, delegate.resetForRetry()); } } catch (SessionNotFoundException e) { - session = sessionPool.replaceSession(e, session); - PooledSession pooledSession = session.get(); - delegate = pooledSession.delegate.transactionManager(options); + session = sessionReplacementHandler.replaceSession(e, session); + CachedSession cachedSession = session.get(); + delegate = cachedSession.getDelegate().transactionManager(options); restartedAfterSessionNotFound = true; } } @@ -927,17 +957,21 @@ public TransactionState getState() { * {@link TransactionRunner} that automatically handles {@link SessionNotFoundException}s by * replacing the underlying session and then restarts the transaction. */ - private static final class SessionPoolTransactionRunner implements TransactionRunner { - private final SessionPool sessionPool; - private PooledSessionFuture session; + private static final class SessionPoolTransactionRunner + implements TransactionRunner { + + private I session; + private final SessionReplacementHandler sessionReplacementHandler; private final TransactionOption[] options; private TransactionRunner runner; private SessionPoolTransactionRunner( - SessionPool sessionPool, PooledSessionFuture session, TransactionOption... options) { - this.sessionPool = sessionPool; + I session, + SessionReplacementHandler sessionReplacementHandler, + TransactionOption... options) { this.session = session; this.options = options; + this.sessionReplacementHandler = sessionReplacementHandler; } private TransactionRunner getRunner() { @@ -957,15 +991,16 @@ public T run(TransactionCallable callable) { result = getRunner().run(callable); break; } catch (SessionNotFoundException e) { - session = sessionPool.replaceSession(e, session); - PooledSession ps = session.get(); - runner = ps.delegate.readWriteTransaction(); + session = sessionReplacementHandler.replaceSession(e, session); + CachedSession cachedSession = session.get(); + runner = cachedSession.getDelegate().readWriteTransaction(); } } session.get().markUsed(); return result; } catch (SpannerException e) { - throw session.get().lastException = e; + session.get().setLastException(e); + throw e; } finally { session.close(); } @@ -988,17 +1023,19 @@ public TransactionRunner allowNestedTransaction() { } } - private static class SessionPoolAsyncRunner implements AsyncRunner { - private final SessionPool sessionPool; - private volatile PooledSessionFuture session; + private static class SessionPoolAsyncRunner implements AsyncRunner { + private volatile I session; + private final SessionReplacementHandler sessionReplacementHandler; private final TransactionOption[] options; private SettableApiFuture commitResponse; private SessionPoolAsyncRunner( - SessionPool sessionPool, PooledSessionFuture session, TransactionOption... options) { - this.sessionPool = sessionPool; + I session, + SessionReplacementHandler sessionReplacementHandler, + TransactionOption... options) { this.session = session; this.options = options; + this.sessionReplacementHandler = sessionReplacementHandler; } @Override @@ -1027,7 +1064,9 @@ public ApiFuture runAsync(final AsyncWork work, Executor executor) { try { // The replaceSession method will re-throw the SessionNotFoundException if the // session cannot be replaced with a new one. - session = sessionPool.replaceSession((SessionNotFoundException) se, session); + session = + sessionReplacementHandler.replaceSession( + (SessionNotFoundException) se, session); se = null; } catch (SessionNotFoundException e) { exception = e; @@ -1098,8 +1137,24 @@ private PooledSessionFuture createPooledSessionFuture( return new PooledSessionFuture(future, span); } + interface SessionFuture extends Session { + + /** + * We need to do this because every implementation of {@link SessionFuture} today extends {@link + * SimpleForwardingListenableFuture}. The get() method in parent {@link + * java.util.concurrent.Future} classes specifies checked exceptions in method signature. + * + *

This method is a workaround we don't have to handle checked exceptions specified by other + * interfaces. + */ + CachedSession get(); + + default void addListener(Runnable listener, Executor exec) {} + } + class PooledSessionFuture extends SimpleForwardingListenableFuture - implements Session { + implements SessionFuture { + private volatile LeakedSessionException leakedException; private volatile AtomicBoolean inUse = new AtomicBoolean(); private volatile CountDownLatch initialized = new CountDownLatch(1); @@ -1172,6 +1227,7 @@ public ReadContext singleUse() { return ps.delegate.singleUse(); }, SessionPool.this, + pooledSessionReplacementHandler, this, true); } catch (Exception e) { @@ -1189,6 +1245,7 @@ public ReadContext singleUse(final TimestampBound bound) { return ps.delegate.singleUse(bound); }, SessionPool.this, + pooledSessionReplacementHandler, this, true); } catch (Exception e) { @@ -1241,8 +1298,12 @@ private ReadOnlyTransaction internalReadOnlyTransaction( Function transactionSupplier, boolean isSingleUse) { try { - return new AutoClosingReadTransaction( - transactionSupplier, SessionPool.this, this, isSingleUse); + return new AutoClosingReadTransaction<>( + transactionSupplier, + SessionPool.this, + pooledSessionReplacementHandler, + this, + isSingleUse); } catch (Exception e) { close(); throw e; @@ -1251,22 +1312,23 @@ private ReadOnlyTransaction internalReadOnlyTransaction( @Override public TransactionRunner readWriteTransaction(TransactionOption... options) { - return new SessionPoolTransactionRunner(SessionPool.this, this, options); + return new SessionPoolTransactionRunner<>(this, pooledSessionReplacementHandler, options); } @Override public TransactionManager transactionManager(TransactionOption... options) { - return new AutoClosingTransactionManager(SessionPool.this, this, options); + return new AutoClosingTransactionManager<>(this, pooledSessionReplacementHandler, options); } @Override public AsyncRunner runAsync(TransactionOption... options) { - return new SessionPoolAsyncRunner(SessionPool.this, this, options); + return new SessionPoolAsyncRunner(this, pooledSessionReplacementHandler, options); } @Override public AsyncTransactionManager transactionManagerAsync(TransactionOption... options) { - return new SessionPoolAsyncTransactionManager(SessionPool.this, this, options); + return new SessionPoolAsyncTransactionManager<>( + pooledSessionReplacementHandler, this, options); } @Override @@ -1358,7 +1420,25 @@ PooledSession get(final boolean eligibleForLongRunning) { } } - class PooledSession implements Session { + interface CachedSession extends Session { + + SessionImpl getDelegate(); + + void markBusy(ISpan span); + + void markUsed(); + + SpannerException setLastException(SpannerException exception); + + boolean isAllowReplacing(); + + AsyncTransactionManagerImpl transactionManagerAsync(TransactionOption... options); + + void setAllowReplacing(boolean b); + } + + class PooledSession implements CachedSession { + @VisibleForTesting SessionImpl delegate; private volatile SpannerException lastException; private volatile boolean allowReplacing = true; @@ -1416,7 +1496,8 @@ public String toString() { } @VisibleForTesting - void setAllowReplacing(boolean allowReplacing) { + @Override + public void setAllowReplacing(boolean allowReplacing) { this.allowReplacing = allowReplacing; } @@ -1612,7 +1693,13 @@ private Dialect determineDialect() { } } - private void markBusy(ISpan span) { + @Override + public SessionImpl getDelegate() { + return this.delegate; + } + + @Override + public void markBusy(ISpan span) { this.delegate.setCurrentSpan(span); this.state = SessionState.BUSY; } @@ -1621,10 +1708,22 @@ private void markClosing() { this.state = SessionState.CLOSING; } - void markUsed() { + @Override + public void markUsed() { delegate.markUsed(clock.instant()); } + @Override + public SpannerException setLastException(SpannerException exception) { + this.lastException = exception; + return exception; + } + + @Override + public boolean isAllowReplacing() { + return this.allowReplacing; + } + @Override public TransactionManager transactionManager(TransactionOption... options) { return delegate.transactionManager(options); @@ -1728,6 +1827,7 @@ private PooledSession pollUninterruptiblyWithTimeout( * */ final class PoolMaintainer { + // Length of the window in millis over which we keep track of maximum number of concurrent // sessions in use. private final Duration windowLength = Duration.ofMillis(TimeUnit.MINUTES.toMillis(10)); @@ -1932,9 +2032,9 @@ private void removeLongRunningSessions( // the below get() call on future object is non-blocking since checkedOutSessions // collection is populated only when the get() method in {@code PooledSessionFuture} is // called. - final PooledSession session = sessionFuture.get(); + final PooledSession session = (PooledSession) sessionFuture.get(); final Duration durationFromLastUse = - Duration.between(session.delegate.getLastUseTime(), currentTime); + Duration.between(session.getDelegate().getLastUseTime(), currentTime); if (!session.eligibleForLongRunning && durationFromLastUse.compareTo( inactiveTransactionRemovalOptions.getIdleTimeThreshold()) @@ -2090,6 +2190,8 @@ enum Position { @VisibleForTesting Function longRunningSessionRemovedListener; private final CountDownLatch waitOnMinSessionsLatch; + private final SessionReplacementHandler pooledSessionReplacementHandler = + new PooledSessionReplacementHandler(); /** * Create a session pool with the given options and for the given database. It will also start @@ -2237,7 +2339,7 @@ Dialect getDialect() { } if (mustDetectDialect) { try (PooledSessionFuture session = getSession()) { - dialect.set(session.get().determineDialect()); + dialect.set(((PooledSession) session.get()).determineDialect()); } } try { @@ -2251,6 +2353,10 @@ Dialect getDialect() { } } + SessionReplacementHandler getPooledSessionReplacementHandler() { + return pooledSessionReplacementHandler; + } + @Nullable public String getDatabaseRole() { return databaseRole; @@ -2451,21 +2557,6 @@ private PooledSessionFuture checkoutSession( return res; } - PooledSessionFuture replaceSession(SessionNotFoundException e, PooledSessionFuture session) { - if (!options.isFailIfSessionNotFound() && session.get().allowReplacing) { - synchronized (lock) { - numSessionsInUse--; - numSessionsReleased++; - checkedOutSessions.remove(session); - } - session.leakedException = null; - invalidateSession(session.get()); - return getSession(); - } else { - throw e; - } - } - private void incrementNumSessionsInUse() { synchronized (lock) { if (maxSessionsInUse < ++numSessionsInUse) { diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolAsyncTransactionManager.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolAsyncTransactionManager.java index b6442fd2182..90f5317e88d 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolAsyncTransactionManager.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolAsyncTransactionManager.java @@ -22,15 +22,16 @@ import com.google.api.core.SettableApiFuture; import com.google.cloud.Timestamp; import com.google.cloud.spanner.Options.TransactionOption; -import com.google.cloud.spanner.SessionPool.PooledSessionFuture; +import com.google.cloud.spanner.SessionPool.SessionFuture; import com.google.cloud.spanner.SessionPool.SessionNotFoundHandler; +import com.google.cloud.spanner.SessionPool.SessionReplacementHandler; import com.google.cloud.spanner.TransactionContextFutureImpl.CommittableAsyncTransactionManager; import com.google.cloud.spanner.TransactionManager.TransactionState; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.MoreExecutors; import javax.annotation.concurrent.GuardedBy; -class SessionPoolAsyncTransactionManager +class SessionPoolAsyncTransactionManager implements CommittableAsyncTransactionManager, SessionNotFoundHandler { private final Object lock = new Object(); @@ -40,20 +41,22 @@ class SessionPoolAsyncTransactionManager @GuardedBy("lock") private AbortedException abortedException; - private final SessionPool pool; + private final SessionReplacementHandler sessionReplacementHandler; private final TransactionOption[] options; - private volatile PooledSessionFuture session; + private volatile I session; private volatile SettableApiFuture delegate; private boolean restartedAfterSessionNotFound; SessionPoolAsyncTransactionManager( - SessionPool pool, PooledSessionFuture session, TransactionOption... options) { - this.pool = Preconditions.checkNotNull(pool); + SessionReplacementHandler sessionReplacementHandler, + I session, + TransactionOption... options) { this.options = options; + this.sessionReplacementHandler = sessionReplacementHandler; createTransaction(session); } - private void createTransaction(PooledSessionFuture session) { + private void createTransaction(I session) { this.session = session; this.delegate = SettableApiFuture.create(); this.session.addListener( @@ -75,7 +78,7 @@ private void createTransaction(PooledSessionFuture session) { public SpannerException handleSessionNotFound(SessionNotFoundException notFound) { // Restart the entire transaction with a new session and throw an AbortedException to force the // client application to retry. - createTransaction(pool.replaceSession(notFound, session)); + createTransaction(sessionReplacementHandler.replaceSession(notFound, session)); restartedAfterSessionNotFound = true; return SpannerExceptionFactory.newSpannerException( ErrorCode.ABORTED, notFound.getMessage(), notFound);