Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-22634 : Improve performance of BufferedMutator #343

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

Expand Down Expand Up @@ -255,12 +257,13 @@ private <CResult> AsyncRequestFuture submit(AsyncProcessTask<CResult> task,
RequestController.Checker checker = requestController.newChecker();
boolean firstIter = true;
do {
// Wait until there is at least one slot for a new task.
requestController.waitForFreeSlot(id, periodToLog, getLogger(tableName, -1));
int posInList = -1;
if (!firstIter) {
checker.reset();
}

Set<ServerName> retainedServers = new TreeSet<>();

Iterator<? extends Row> it = rows.iterator();
while (it.hasNext()) {
Row r = it.next();
Expand Down Expand Up @@ -309,10 +312,16 @@ private <CResult> AsyncRequestFuture submit(AsyncProcessTask<CResult> task,
// TODO: replica-get is not supported on this path
byte[] regionName = loc.getRegionInfo().getRegionName();
addAction(loc.getServerName(), regionName, action, actionsByServer, nonceGroup);
retainedServers.add(loc.getServerName());
it.remove();
}
}
firstIter = false;

// Wait until there is at least one slot per server
requestController.waitForFreeSlot(retainedServers.size(),id, periodToLog,
getLogger(tableName, -1));

} while (retainedActions.isEmpty() && atLeastOne && (locationErrors == null));

if (retainedActions.isEmpty()) return NO_REQS_RESULT;
Expand All @@ -321,6 +330,10 @@ private <CResult> AsyncRequestFuture submit(AsyncProcessTask<CResult> task,
locationErrors, locationErrorRows, actionsByServer);
}

/**
 * Delegates to {@code requestController.waitForAllFreeSlot(id)} for this process id.
 * NOTE(review): presumably blocks until every task slot held by this AsyncProcess is free;
 * confirm against the RequestController implementation.
 *
 * @throws InterruptedIOException propagated from the request controller
 */
public void waitAllSlot() throws InterruptedIOException {
  this.requestController.waitForAllFreeSlot(this.id);
}

<CResult> AsyncRequestFuture submitMultiActions(AsyncProcessTask task,
List<Action> retainedActions, long nonceGroup, List<Exception> locationErrors,
List<Integer> locationErrorRows, Map<ServerName, MultiAction> actionsByServer) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,10 @@ final class SingleServerRequestRunnable implements Runnable {
private final int numAttempt;
private final ServerName server;
private final Set<CancellableRegionServerCallable> callsInProgress;
private long firstStartNanoTime;
private long startNanoTime;
private long elapseNanoTime;
private long numReject = -1;
@VisibleForTesting
SingleServerRequestRunnable(
MultiAction multiAction, int numAttempt, ServerName server,
Expand Down Expand Up @@ -248,6 +252,30 @@ public void run() {
}
}
}

/**
 * Marks the beginning of an execution attempt of this runnable.
 * Bumps {@code numReject} (which starts at -1, so the first call brings it to 0) and stamps
 * {@code startNanoTime}; on that very first call the same timestamp is also kept in
 * {@code firstStartNanoTime}.
 */
public void onStart() {
  final long now = System.nanoTime();
  startNanoTime = now;
  if (++numReject == 0) {
    // First attempt: remember when we originally started.
    firstStartNanoTime = now;
  }
}

/**
 * Marks the end of an execution attempt: stores in {@code elapseNanoTime} the nanoseconds
 * elapsed since the timestamp recorded by the latest {@link #onStart()}.
 */
public void onFinish() {
  final long finishedAt = System.nanoTime();
  elapseNanoTime = finishedAt - startNanoTime;
}

/**
 * @return the duration, in nanoseconds, computed by the last call to {@link #onFinish()}
 */
public long getElapseNanoTime() {
  return this.elapseNanoTime;
}

/**
 * @return the current value of the {@code numReject} counter: -1 before any
 *         {@link #onStart()} call, then incremented by one on each call
 */
public long getNumReject() {
  return this.numReject;
}

/**
 * @return the nanoseconds between the first recorded start and the most recent recorded
 *         start (zero when {@link #onStart()} has been called only once)
 */
public long getRejectedElapseNanoTime() {
  final long sinceFirstStart = startNanoTime - firstStartNanoTime;
  return sinceFirstStart;
}
}

private final Batch.Callback<CResult> callback;
Expand Down Expand Up @@ -518,17 +546,23 @@ private RegionLocations findAllLocationsOrFail(Action action, boolean useCache)
* @param numAttempt the attempt number.
* @param actionsForReplicaThread original actions for replica thread; null on non-first call.
*/
void sendMultiAction(Map<ServerName, MultiAction> actionsByServer,
int numAttempt, List<Action> actionsForReplicaThread, boolean reuseThread) {
// Must be synchronized because of the background thread writeBufferPeriodicFlushTimer
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="SWL_SLEEP_WITH_LOCK_HELD",
justification="Slots are freed while in the executor thread, so the thread is not yet"
+ " available for the pool." +
" In that case, we have a RejectedExecutionException. Sleep let some time to threads"
+ " to be available again")
synchronized void sendMultiAction(Map<ServerName, MultiAction> actionsByServer, int numAttempt,
List<Action> actionsForReplicaThread, boolean reuseThread) {
// Run the last item on the same thread if we are already on a send thread.
// We hope most of the time it will be the only item, so we can cut down on threads.
int actionsRemaining = actionsByServer.size();
// This iteration is by server (the HRegionLocation comparator is by server portion only).
for (Map.Entry<ServerName, MultiAction> e : actionsByServer.entrySet()) {
ServerName server = e.getKey();
MultiAction multiAction = e.getValue();
Collection<? extends Runnable> runnables = getNewMultiActionRunnable(server, multiAction,
numAttempt);
Collection<? extends Runnable> runnables =
getNewMultiActionRunnable(server, multiAction, numAttempt);
// make sure we correctly count the number of runnables before we try to reuse the send
// thread, in case we had to split the request into different runnables because of backoff
if (runnables.size() > actionsRemaining) {
Expand All @@ -543,22 +577,39 @@ void sendMultiAction(Map<ServerName, MultiAction> actionsByServer,
&& numAttempt % HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER != 0) {
runnable.run();
} else {
try {
pool.submit(runnable);
} catch (Throwable t) {
if (t instanceof RejectedExecutionException) {
// This should never happen. But as the pool is provided by the end user,
// let's secure this a little.
LOG.warn("id=" + asyncProcess.id + ", task rejected by pool. Unexpected." +
" Server=" + server.getServerName(), t);
} else {
// see #HBASE-14359 for more details
LOG.warn("Caught unexpected exception/error: ", t);
boolean completed = false;
int nbTry = 0;
while (!completed) {
try {
++nbTry;
pool.submit(runnable);
completed = true;
} catch (Throwable t) {
if (t instanceof RejectedExecutionException) {
sbarnoud marked this conversation as resolved.
Show resolved Hide resolved
if ((nbTry % 1000) == 0) {
LOG.warn("#" + asyncProcess.id
+ ", the task was rejected by the pool. This is unexpected." + " Server is "
+ server.getServerName() + " (try " + nbTry + ")", t);
} else if (LOG.isDebugEnabled()) {
LOG.debug("#" + asyncProcess.id
+ ", the task was rejected by the pool. This is unexpected." + " Server is "
+ server.getServerName() + " (try " + nbTry + ")", t);
}
try {
Thread.sleep(10);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
} else {
// see #HBASE-14359 for more details
LOG.warn("Caught unexpected exception/error: ", t);
completed = true;
}
asyncProcess.decTaskCounters(multiAction.getRegions(), server);
// We're likely to fail again, but this will increment the attempt counter,
// so it will finish.
receiveGlobalFailure(multiAction, server, numAttempt, t);
}
asyncProcess.decTaskCounters(multiAction.getRegions(), server);
// We're likely to fail again, but this will increment the attempt counter,
// so it will finish.
receiveGlobalFailure(multiAction, server, numAttempt, t);
}
}
}
Expand Down Expand Up @@ -1167,6 +1218,10 @@ private String buildDetailedErrorMsg(String string, int index) {
return error.toString();
}

/**
 * Non-blocking completion check.
 *
 * @return {@code true} when the {@code actionsInProgress} counter currently reads zero
 */
public boolean isFinished() {
  final long pending = actionsInProgress.get();
  return pending == 0;
}

@Override
public void waitUntilDone() throws InterruptedIOException {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
Expand Down Expand Up @@ -61,6 +63,10 @@
* @see Connection
* @since 1.0.0
*/
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC",
justification="writeBufferPeriodicFlushTimer needs to be synchronized only when not null, "
+ "and in mutual exclusion with close() and flush(true). "
+ "However to needs to synchronize with flush(false) coming from mutate")
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class BufferedMutatorImpl implements BufferedMutator {
Expand Down Expand Up @@ -93,6 +99,9 @@ public class BufferedMutatorImpl implements BufferedMutator {
private final boolean cleanupPoolOnClose;
private volatile boolean closed = false;
private final AsyncProcess ap;
private List<AsyncRequestFuture> asfList;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this field need to be a class member? It seems that we only use it in the doFlush method.

private int maxThreads;
private ReentrantLock lock = new ReentrantLock();

@VisibleForTesting
BufferedMutatorImpl(ClusterConnection conn, BufferedMutatorParams params, AsyncProcess ap) {
Expand All @@ -109,6 +118,14 @@ public class BufferedMutatorImpl implements BufferedMutator {
this.pool = params.getPool();
cleanupPoolOnClose = false;
}
maxThreads = conf.getInt("hbase.htable.threads.max", Integer.MAX_VALUE);
if (maxThreads <= 0 || maxThreads == Integer.MAX_VALUE) {
maxThreads = conf.getInt("hbase.client.max.total.tasks", Integer.MAX_VALUE);
if (maxThreads <= 0 || maxThreads == Integer.MAX_VALUE) {
maxThreads = 1;
}
}
asfList = new ArrayList<AsyncRequestFuture>(maxThreads*4);
ConnectionConfiguration tableConf = new ConnectionConfiguration(conf);
this.writeBufferSize =
params.getWriteBufferSize() != UNSET ?
Expand Down Expand Up @@ -223,6 +240,7 @@ private void timerCallbackForWriteBufferPeriodicFlush() {
try {
executedWriteBufferPeriodicFlushes.incrementAndGet();
flush();
writeBufferPeriodicFlushTimer.notifyAll();
} catch (InterruptedIOException | RetriesExhaustedWithDetailsException e) {
LOG.error("Exception during timerCallbackForWriteBufferPeriodicFlush --> " + e.getMessage());
}
Expand All @@ -235,6 +253,15 @@ public synchronized void close() throws IOException {
}
// Stop any running Periodic Flush timer.
disableWriteBufferPeriodicFlush();
ap.waitAllSlot();
try {
// Let time to the periodic flush thread to exit (task are finished, but not the code after)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the problem if the code after is not finished? I'm a bit nervous that we just set a magic 5ms sleep time here...

if (writeBufferPeriodicFlushTimer != null) {
writeBufferPeriodicFlushTimer.wait(5);
}
} catch (InterruptedException e) {
throw new IOException(e);
}
try {
// As we can have an operation in progress even if the buffer is empty, we call
// doFlush at least one time.
Expand Down Expand Up @@ -275,9 +302,25 @@ public int getOperationTimeout() {
}

/**
 * Sends all buffered writes and waits for them to finish (via {@code doFlush(true)}).
 * <p>
 * When a periodic flush timer is configured ({@code writeBufferPeriodicFlushTimer != null}),
 * the close check and the flush run while holding {@code lock}, so that the periodic flush,
 * explicit {@code flush()} calls and {@code close()} do not run concurrently. The close check
 * and the flush are performed exactly once, inside the guarded section; performing them a
 * second time before taking the lock would bypass that mutual exclusion.
 *
 * @throws InterruptedIOException propagated from {@code checkClose()} / {@code doFlush(true)}
 * @throws RetriesExhaustedWithDetailsException propagated from {@code doFlush(true)}
 */
@Override
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="UL_UNRELEASED_LOCK",
justification="It seems that findBugs parser is wrong")
public void flush() throws InterruptedIOException, RetriesExhaustedWithDetailsException {
  // Avoid concurrency between the periodic flush, flush() and close().
  // mutate() is not synchronized with these because it uses doFlush(false).
  boolean haveLocked = false;
  if (writeBufferPeriodicFlushTimer != null) {
    lock.lock();
    // Remember that we own the lock: the timer field may be set to null before we reach the
    // finally block, so it cannot be re-tested there to decide whether to unlock.
    haveLocked = true;
  }
  try {
    checkClose();
    doFlush(true);
  } finally {
    if (haveLocked) {
      lock.unlock();
    }
  }
}

/**
Expand All @@ -286,6 +329,9 @@ public void flush() throws InterruptedIOException, RetriesExhaustedWithDetailsEx
* @param flushAll - if true, sends all the writes and wait for all of them to finish before
* returning. Otherwise, flush until buffer size is smaller than threshold
*/
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="SWL_SLEEP_WITH_LOCK_HELD",
justification="Backpressure, when we have to many pending Future, "
+ "we wait under the lock to slow down the application")
private void doFlush(boolean flushAll) throws InterruptedIOException,
RetriesExhaustedWithDetailsException {
List<RetriesExhaustedWithDetailsException> errors = new ArrayList<>();
Expand All @@ -302,11 +348,53 @@ private void doFlush(boolean flushAll) throws InterruptedIOException,
}
asf = ap.submit(createTask(access));
}
// DON'T do the wait in the try-with-resources. Otherwise, the undealt mutations won't
// be released.
asf.waitUntilDone();
if (asf.hasError()) {
errors.add(asf.getErrors());

if (flushAll || writeBufferSize == 0) {
// if we have setWriteBufferPeriodicFlushTimeoutMs we may have concurrent update
List<AsyncRequestFuture> waitList;
synchronized(asfList) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I see: BufferedMutator is thread-safe, so multiple threads can enter this method at the same time, and we want them to share the same asfList, so it should be a class member.

waitList = new ArrayList<>(asfList);
}
// DON'T do the wait in the try-with-resources. Otherwise, the undealt mutations won't
// be released.
for(AsyncRequestFuture toWait:waitList) {
toWait.waitUntilDone();
if (toWait.hasError()) {
errors.add(toWait.getErrors());
}
}
synchronized(asfList) {
asfList.removeAll(waitList);
}
asf.waitUntilDone();
if (asf.hasError()) {
errors.add(asf.getErrors());
}
} else {
// Do some cleanup in asfList to decrease memory
int nbRemoved = 0;
while (asfList.size() >= maxThreads*4) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here we read the size outside of the synchronized block?

synchronized(asfList) {
Iterator<AsyncRequestFuture> it = asfList.iterator();
while (it.hasNext()) {
AsyncRequestFutureImpl toCheck = (AsyncRequestFutureImpl) it.next();
if (toCheck.isFinished()) {
it.remove();
nbRemoved++;
}
}
if (nbRemoved == 0) {
try {
Thread.sleep(1);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this means we will busy-wait here? That is not always the best choice; maybe we should make the waiting strategy configurable. The default should be the typical wait/notify, and users who do not care about wasting CPU cycles and only want maximum throughput could opt into busy waiting.

} catch (InterruptedException e) {
throw new InterruptedIOException(e.getMessage());
}
}
}
}
synchronized(asfList) {
asfList.add(asf);
}
}
}

Expand Down
Loading