Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-26766 Introduce custom attributes to threads #4348

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdge;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.customthreadattribute.CustomThreadAttribute;
import org.apache.hadoop.hbase.util.customthreadattribute.CustomThreadAttributeUtil;
import org.apache.htrace.Trace;

/**
Expand Down Expand Up @@ -234,6 +236,8 @@ public String toString() {
new ConcurrentHashMap<ServerName, AtomicInteger>();
// Start configuration settings.
private final int startLogErrorsCnt;
private Configuration conf;
private List<CustomThreadAttribute> customThreadAttributes;

/**
* The number of tasks simultaneously executed on the cluster.
Expand Down Expand Up @@ -318,6 +322,8 @@ public AsyncProcess(ClusterConnection hc, Configuration conf, ExecutorService po
this.connection = hc;
this.pool = pool;
this.globalErrors = useGlobalErrors ? new BatchErrors() : null;
this.conf = conf;
this.customThreadAttributes = CustomThreadAttributeUtil.getAllAttributes(conf);

this.id = COUNTER.incrementAndGet();

Expand Down Expand Up @@ -607,7 +613,7 @@ <CResult> AsyncRequestFuture submitMultiActions(TableName tableName,
/**
* Helper that is used when grouping the actions per region server.
*
* @param loc - the destination. Must not be null.
* @param server - the destination. Must not be null.
* @param action - the action to add to the multiaction
* @param actionsByServer the multiaction per server
* @param nonceGroup Nonce group.
Expand Down Expand Up @@ -711,40 +717,49 @@ public ReplicaCallIssuingRunnable(List<Action<Row>> initialActions, long startTi

@Override
public void run() {
boolean done = false;
if (primaryCallTimeoutMicroseconds > 0) {
try {
done = waitUntilDone(startTime * 1000L + primaryCallTimeoutMicroseconds);
} catch (InterruptedException ex) {
LOG.error("Replica thread was interrupted - no replica calls: " + ex.getMessage());
try {
CustomThreadAttributeUtil.setAttributes(customThreadAttributes, conf);
boolean done = false;
if (primaryCallTimeoutMicroseconds > 0) {
try {
done = waitUntilDone(startTime * 1000L + primaryCallTimeoutMicroseconds);
} catch (InterruptedException ex) {
LOG.error("Replica thread was interrupted - no replica calls: " + ex.getMessage());
return;
}
}
if (done) {
// Done within primary timeout
return;
}
}
if (done) return; // Done within primary timeout
Map<ServerName, MultiAction<Row>> actionsByServer =
Map<ServerName, MultiAction<Row>> actionsByServer =
new HashMap<ServerName, MultiAction<Row>>();
List<Action<Row>> unknownLocActions = new ArrayList<Action<Row>>();
if (replicaGetIndices == null) {
for (int i = 0; i < results.length; ++i) {
addReplicaActions(i, actionsByServer, unknownLocActions);
}
} else {
for (int replicaGetIndice : replicaGetIndices) {
addReplicaActions(replicaGetIndice, actionsByServer, unknownLocActions);
}
}
if (!actionsByServer.isEmpty()) {
sendMultiAction(actionsByServer, 1, null, unknownLocActions.isEmpty());
}
if (!unknownLocActions.isEmpty()) {
actionsByServer = new HashMap<ServerName, MultiAction<Row>>();
for (Action<Row> action : unknownLocActions) {
addReplicaActionsAgain(action, actionsByServer);
List<Action<Row>> unknownLocActions = new ArrayList<Action<Row>>();
if (replicaGetIndices == null) {
for (int i = 0; i < results.length; ++i) {
addReplicaActions(i, actionsByServer, unknownLocActions);
}
} else {
for (int replicaGetIndice : replicaGetIndices) {
addReplicaActions(replicaGetIndice, actionsByServer, unknownLocActions);
}
}
// Some actions may have completely failed, they are handled inside addAgain.
if (!actionsByServer.isEmpty()) {
sendMultiAction(actionsByServer, 1, null, true);
sendMultiAction(actionsByServer, 1, null, unknownLocActions.isEmpty());
}
if (!unknownLocActions.isEmpty()) {
actionsByServer = new HashMap<ServerName, MultiAction<Row>>();
for (Action<Row> action : unknownLocActions) {
addReplicaActionsAgain(action, actionsByServer);
}
// Some actions may have completely failed, they are handled inside addAgain.
if (!actionsByServer.isEmpty()) {
sendMultiAction(actionsByServer, 1, null, true);
}
}
}
finally {
CustomThreadAttributeUtil.clearAttributes(customThreadAttributes, conf);
}
}

Expand Down Expand Up @@ -819,6 +834,7 @@ public void run() {
MultiResponse res = null;
PayloadCarryingServerCallable callable = currentCallable;
try {
CustomThreadAttributeUtil.setAttributes(customThreadAttributes, conf);
// setup the callable based on the actions, if we don't have one already from the request
if (callable == null) {
callable = createCallable(server, tableName, multiAction);
Expand Down Expand Up @@ -858,6 +874,7 @@ public void run() {
if (callsInProgress != null && callable != null && res != null) {
callsInProgress.remove(callable);
}
CustomThreadAttributeUtil.clearAttributes(customThreadAttributes, conf);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@
import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.customthreadattribute.CustomThreadAttribute;
import org.apache.hadoop.hbase.util.customthreadattribute.CustomThreadAttributeUtil;

/**
* HTableMultiplexer provides a thread-safe non blocking PUT API across all the tables.
Expand Down Expand Up @@ -441,6 +443,8 @@ static class FlushWorker implements Runnable {
private final int maxRetryInQueue;
private final AtomicInteger retryInQueue = new AtomicInteger(0);
private final int writeRpcTimeout; // needed to pass in through AsyncProcess constructor
private Configuration conf;
private List<CustomThreadAttribute> customThreadAttributes;

public FlushWorker(Configuration conf, ClusterConnection conn, HRegionLocation addr,
HTableMultiplexer htableMultiplexer, int perRegionServerBufferQueueSize,
Expand All @@ -456,6 +460,8 @@ public FlushWorker(Configuration conf, ClusterConnection conn, HRegionLocation a
this.ap = new AsyncProcess(conn, conf, pool, rpcCallerFactory, false, rpcControllerFactory, writeRpcTimeout);
this.executor = executor;
this.maxRetryInQueue = conf.getInt(TABLE_MULTIPLEXER_MAX_RETRIES_IN_QUEUE, 10000);
this.conf = conf;
this.customThreadAttributes = CustomThreadAttributeUtil.getAllAttributes(conf);
}

protected LinkedBlockingQueue<PutStatus> getQueue() {
Expand Down Expand Up @@ -512,12 +518,14 @@ boolean resubmitFailedPut(PutStatus ps, HRegionLocation oldLoc) throws IOExcepti
public void run() {
// Re-submits a previously failed put through the multiplexer and then updates the
// retry bookkeeping. The custom thread attributes are applied for the duration of
// the call and cleared again before returning.
// Stays false if put() returns false or throws, so the finally block counts it as failed.
boolean succ = false;
try {
// Set inside the try so that the clearAttributes call in the finally block also covers it.
CustomThreadAttributeUtil.setAttributes(customThreadAttributes, conf);
succ = FlushWorker.this.getMultiplexer().put(tableName, failedPut, retryCount);
} finally {
// Decrement the retry-in-queue counter regardless of the outcome.
FlushWorker.this.getRetryInQueue().decrementAndGet();
if (!succ) {
FlushWorker.this.getTotalFailedPutCount().incrementAndGet();
}
// Undo the setAttributes call above.
// NOTE(review): presumably the attributes are thread-scoped and this runs on a pooled
// executor thread, so leftovers would leak into later tasks — confirm.
CustomThreadAttributeUtil.clearAttributes(customThreadAttributes, conf);
}
}
}, delayMs, TimeUnit.MILLISECONDS);
Expand Down Expand Up @@ -559,6 +567,7 @@ ScheduledExecutorService getExecutor() {
public void run() {
int failedCount = 0;
try {
CustomThreadAttributeUtil.setAttributes(customThreadAttributes, conf);
long start = EnvironmentEdgeManager.currentTime();

// drain all the queued puts into the tmp list
Expand Down Expand Up @@ -653,6 +662,7 @@ public void run() {
} finally {
// Update the totalFailedCount
this.totalFailedPutCount.addAndGet(failedCount);
CustomThreadAttributeUtil.clearAttributes(customThreadAttributes, conf);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.io.Closeable;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand All @@ -30,6 +31,8 @@
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.customthreadattribute.CustomThreadAttribute;
import org.apache.hadoop.hbase.util.customthreadattribute.CustomThreadAttributeUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -58,6 +61,8 @@ public class MasterAddressRefresher implements Closeable {
private final long periodicRefreshMs;
private final long timeBetweenRefreshesMs;
private final Object refreshMasters = new Object();
private Configuration conf;
private List<CustomThreadAttribute> customThreadAttributes;

@Override
public void close() {
Expand All @@ -72,31 +77,36 @@ private class RefreshThread implements Runnable {
@Override
public void run() {
long lastRpcTs = 0;
while (!Thread.interrupted()) {
try {
// Spurious wake ups are okay, worst case we make an extra RPC call to refresh. We won't
// have duplicate refreshes because once the thread is past the wait(), notify()s are
// ignored until the thread is back to the waiting state.
synchronized (refreshMasters) {
refreshMasters.wait(periodicRefreshMs);
try {
CustomThreadAttributeUtil.setAttributes(customThreadAttributes, conf);
while (!Thread.interrupted()) {
try {
// Spurious wake ups are okay, worst case we make an extra RPC call to refresh. We won't
// have duplicate refreshes because once the thread is past the wait(), notify()s are
// ignored until the thread is back to the waiting state.
synchronized (refreshMasters) {
refreshMasters.wait(periodicRefreshMs);
}
long currentTs = EnvironmentEdgeManager.currentTime();
if (lastRpcTs != 0 && currentTs - lastRpcTs <= timeBetweenRefreshesMs) {
continue;
}
lastRpcTs = currentTs;
LOG.debug("Attempting to refresh master address end points.");
Set<ServerName> newMasters = new HashSet<>(registry.getMasters());
registry.populateMasterStubs(newMasters);
LOG.debug("Finished refreshing master end points. {}", newMasters);
} catch (InterruptedException e) {
LOG.debug("Interrupted during wait, aborting refresh-masters-thread.", e);
break;
} catch (IOException e) {
LOG.debug("Error populating latest list of masters.", e);
}
long currentTs = EnvironmentEdgeManager.currentTime();
if (lastRpcTs != 0 && currentTs - lastRpcTs <= timeBetweenRefreshesMs) {
continue;
}
lastRpcTs = currentTs;
LOG.debug("Attempting to refresh master address end points.");
Set<ServerName> newMasters = new HashSet<>(registry.getMasters());
registry.populateMasterStubs(newMasters);
LOG.debug("Finished refreshing master end points. {}", newMasters);
} catch (InterruptedException e) {
LOG.debug("Interrupted during wait, aborting refresh-masters-thread.", e);
break;
} catch (IOException e) {
LOG.debug("Error populating latest list of masters.", e);
}
LOG.info("Master end point refresher loop exited.");
} finally{
CustomThreadAttributeUtil.clearAttributes(customThreadAttributes, conf);
}
LOG.info("Master end point refresher loop exited.");
}
}

Expand All @@ -111,6 +121,8 @@ public void run() {
Preconditions.checkArgument(timeBetweenRefreshesMs < periodicRefreshMs);
this.registry = registry;
pool.submit(new RefreshThread());
this.conf = conf;
this.customThreadAttributes = CustomThreadAttributeUtil.getAllAttributes(conf);
}

/**
Expand Down
Loading