Skip to content

Commit

Permalink
HBASE-27768 Race conditions in BlockingRpcConnection (apache#5154)
Browse files Browse the repository at this point in the history
Signed-off-by: Duo Zhang <[email protected]>
Signed-off-by: Xiaolin Ha <[email protected]>
  • Loading branch information
bbeaudreault committed Apr 10, 2023
1 parent c174379 commit 6b6902d
Showing 1 changed file with 74 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import javax.security.sasl.SaslException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellScanner;
Expand Down Expand Up @@ -98,6 +99,13 @@ class BlockingRpcConnection extends RpcConnection implements Runnable {
justification = "We are always under lock actually")
private Thread thread;

// Used for ensuring two reader threads don't run over each other. Should only be used
// in reader thread run() method, to avoid deadlocks with synchronization on BlockingRpcConnection
private final Object readerThreadLock = new Object();

// Used to suffix the threadName in a way that we can differentiate them in logs/thread dumps.
private final AtomicInteger attempts = new AtomicInteger();

// connected socket. protected for writing UT.
protected Socket socket = null;
private DataInputStream in;
Expand Down Expand Up @@ -325,6 +333,17 @@ private synchronized boolean waitForWork() {
if (thread == null) {
return false;
}

// If closeConn is called while we are in the readResponse method, it's possible that a new
// call to setupIOStreams comes in and creates a new value for "thread" before readResponse
// finishes. Once readResponse finishes, it will come in here and thread will be non-null
// above, but pointing at a new thread. In that case, we should end to avoid a situation
// where two threads are forever competing for the same socket.
if (!isCurrentThreadExpected()) {
LOG.debug("Thread replaced by new connection thread. Ending waitForWork loop.");
return false;
}

if (!calls.isEmpty()) {
return true;
}
Expand All @@ -336,20 +355,50 @@ private synchronized boolean waitForWork() {
try {
wait(Math.min(this.rpcClient.minIdleTimeBeforeClose, 1000));
} catch (InterruptedException e) {
// Restore interrupt status
Thread.currentThread().interrupt();

String msg = "Interrupted while waiting for work";

// If we were interrupted by closeConn, it would have set thread to null.
// We are synchronized here and if we somehow got interrupted without setting thread to
// null, we want to make sure the connection is closed since the read thread would be dead.
// Rather than do a null check here, we check if the current thread is the expected thread.
// This guards against the case where a call to setupIOStreams got the synchronized lock
// first after closeConn, thus changing the thread to a new thread.
if (isCurrentThreadExpected()) {
LOG.debug(msg + ", closing connection");
closeConn(new InterruptedIOException(msg));
} else {
LOG.debug(msg);
}

return false;
}
}
}

@Override
public void run() {
if (LOG.isTraceEnabled()) {
LOG.trace(threadName + ": starting");
LOG.trace("starting");
}
while (waitForWork()) {
readResponse();

// We have a synchronization here because it's possible in error scenarios for a new
// thread to be started while readResponse is still reading on the socket. We don't want
// two threads to be reading from the same socket/inputstream.
// The below calls can synchronize on "BlockingRpcConnection.this".
// We should not synchronize on readerThreadLock anywhere else, to avoid deadlocks
synchronized (readerThreadLock) {
if (LOG.isTraceEnabled()) {
LOG.trace("started");
}
while (waitForWork()) {
readResponse();
}
}
if (LOG.isTraceEnabled()) {
LOG.trace(threadName + ": stopped");
LOG.trace("stopped");
}
}

Expand Down Expand Up @@ -519,7 +568,7 @@ public Boolean run() throws IOException {
}

// start the receiver thread after the socket connection has been set up
thread = new Thread(this, threadName);
thread = new Thread(this, threadName + " (attempt: " + attempts.incrementAndGet() + ")");
thread.setDaemon(true);
thread.start();
}
Expand Down Expand Up @@ -633,7 +682,7 @@ private void writeRequest(Call call) throws IOException {
call.callStats.setRequestSizeBytes(write(this.out, requestHeader, call.param, cellBlock));
} catch (Throwable t) {
if (LOG.isTraceEnabled()) {
LOG.trace("Error while writing {}", call.toShortString());
LOG.trace("Error while writing {}", call.toShortString(), t);
}
IOException e = IPCUtil.toIOE(t);
closeConn(e);
Expand Down Expand Up @@ -720,16 +769,33 @@ private void readResponse() {
// since we expect certain responses to not make it by the specified
// {@link ConnectionId#rpcTimeout}.
if (LOG.isTraceEnabled()) {
LOG.trace("ignored", e);
LOG.trace("ignored ex for call {}", call, e);
}
} else {
synchronized (this) {
closeConn(e);
// The exception we received may have been caused by another thread closing
// this connection. It's possible that before getting to this point, a new connection was
// created. In that case, it doesn't help and can actually hurt to close again here.
if (isCurrentThreadExpected()) {
LOG.debug("Closing connection after error in call {}", call, e);
closeConn(e);
}
}
}
}
}

/**
* For use in the reader thread, tests if the current reader thread is the one expected to be
* running. When closeConn is called, the reader thread is expected to end. setupIOStreams then
* creates a new thread and updates the thread pointer. At that point, the new thread should be
* the only one running. We use this method to guard against cases where the old thread may be
* erroneously running or closing the connection in error states.
*/
private boolean isCurrentThreadExpected() {
return thread == Thread.currentThread();
}

@Override
protected synchronized void callTimeout(Call call) {
// call sender
Expand Down

0 comments on commit 6b6902d

Please sign in to comment.