From 6b6902d2b9a54ad4a194fce701d4aa066301a868 Mon Sep 17 00:00:00 2001 From: Bryan Beaudreault Date: Mon, 10 Apr 2023 15:23:03 -0400 Subject: [PATCH] HBASE-27768 Race conditions in BlockingRpcConnection (#5154) Signed-off-by: Duo Zhang Signed-off-by: Xiaolin Ha --- .../hbase/ipc/BlockingRpcConnection.java | 82 +++++++++++++++++-- 1 file changed, 74 insertions(+), 8 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java index 60e2502524e6..d789417aef7a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java @@ -42,6 +42,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicInteger; import javax.security.sasl.SaslException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CellScanner; @@ -98,6 +99,13 @@ class BlockingRpcConnection extends RpcConnection implements Runnable { justification = "We are always under lock actually") private Thread thread; + // Used for ensuring two reader threads don't run over each other. Should only be used + // in reader thread run() method, to avoid deadlocks with synchronization on BlockingRpcConnection + private final Object readerThreadLock = new Object(); + + // Used to suffix the threadName in a way that we can differentiate them in logs/thread dumps. + private final AtomicInteger attempts = new AtomicInteger(); + // connected socket. protected for writing UT. protected Socket socket = null; private DataInputStream in; @@ -325,6 +333,17 @@ private synchronized boolean waitForWork() { if (thread == null) { return false; } + + // If closeConn is called while we are in the readResponse method, it's possible that a new + // call to setupIOStreams comes in and creates a new value for "thread" before readResponse + // finishes. Once readResponse finishes, it will come in here and thread will be non-null + // above, but pointing at a new thread. In that case, we should end to avoid a situation + // where two threads are forever competing for the same socket. + if (!isCurrentThreadExpected()) { + LOG.debug("Thread replaced by new connection thread. Ending waitForWork loop."); + return false; + } + if (!calls.isEmpty()) { return true; } @@ -336,6 +355,25 @@ private synchronized boolean waitForWork() { try { wait(Math.min(this.rpcClient.minIdleTimeBeforeClose, 1000)); } catch (InterruptedException e) { + // Restore interrupt status + Thread.currentThread().interrupt(); + + String msg = "Interrupted while waiting for work"; + + // If we were interrupted by closeConn, it would have set thread to null. + // We are synchronized here and if we somehow got interrupted without setting thread to + // null, we want to make sure the connection is closed since the read thread would be dead. + // Rather than do a null check here, we check if the current thread is the expected thread. + // This guards against the case where a call to setupIOStreams got the synchronized lock + // first after closeConn, thus changing the thread to a new thread. + if (isCurrentThreadExpected()) { + LOG.debug(msg + ", closing connection"); + closeConn(new InterruptedIOException(msg)); + } else { + LOG.debug(msg); + } + + return false; } } } @@ -343,13 +381,24 @@ private synchronized boolean waitForWork() { @Override public void run() { if (LOG.isTraceEnabled()) { - LOG.trace(threadName + ": starting"); + LOG.trace("starting"); } - while (waitForWork()) { - readResponse(); + + // We have a synchronization here because it's possible in error scenarios for a new + // thread to be started while readResponse is still reading on the socket. We don't want + // two threads to be reading from the same socket/inputstream. + // The below calls can synchronize on "BlockingRpcConnection.this". + // We should not synchronize on readerThreadLock anywhere else, to avoid deadlocks + synchronized (readerThreadLock) { + if (LOG.isTraceEnabled()) { + LOG.trace("started"); + } + while (waitForWork()) { + readResponse(); + } } if (LOG.isTraceEnabled()) { - LOG.trace(threadName + ": stopped"); + LOG.trace("stopped"); } } @@ -519,7 +568,7 @@ public Boolean run() throws IOException { } // start the receiver thread after the socket connection has been set up - thread = new Thread(this, threadName); + thread = new Thread(this, threadName + " (attempt: " + attempts.incrementAndGet() + ")"); thread.setDaemon(true); thread.start(); } @@ -633,7 +682,7 @@ private void writeRequest(Call call) throws IOException { call.callStats.setRequestSizeBytes(write(this.out, requestHeader, call.param, cellBlock)); } catch (Throwable t) { if (LOG.isTraceEnabled()) { - LOG.trace("Error while writing {}", call.toShortString()); + LOG.trace("Error while writing {}", call.toShortString(), t); } IOException e = IPCUtil.toIOE(t); closeConn(e); @@ -720,16 +769,33 @@ private void readResponse() { // since we expect certain responses to not make it by the specified // {@link ConnectionId#rpcTimeout}. if (LOG.isTraceEnabled()) { - LOG.trace("ignored", e); + LOG.trace("ignored ex for call {}", call, e); } } else { synchronized (this) { - closeConn(e); + // The exception we received may have been caused by another thread closing + // this connection. It's possible that before getting to this point, a new connection was + // created. In that case, it doesn't help and can actually hurt to close again here. + if (isCurrentThreadExpected()) { + LOG.debug("Closing connection after error in call {}", call, e); + closeConn(e); + } } } } } + /** + * For use in the reader thread, tests if the current reader thread is the one expected to be + * running. When closeConn is called, the reader thread is expected to end. setupIOStreams then + * creates a new thread and updates the thread pointer. At that point, the new thread should be + * the only one running. We use this method to guard against cases where the old thread may be + * erroneously running or closing the connection in error states. + */ + private boolean isCurrentThreadExpected() { + return thread == Thread.currentThread(); + } + @Override protected synchronized void callTimeout(Call call) { // call sender