Skip to content

Commit

Permalink
HBASE-23185 Fix high cpu usage because getTable()#put() gets config v…
Browse files Browse the repository at this point in the history
…alue every time (#731)

* HBASE-23185 Fix high cpu usage because getTable()#put() gets config value every time

* Fix checkstyle
  • Loading branch information
bitterfox authored and saintstack committed Oct 21, 2019
1 parent e183c90 commit ce65db3
Show file tree
Hide file tree
Showing 5 changed files with 123 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -323,35 +323,60 @@ public AsyncProcess(ClusterConnection hc, Configuration conf, ExecutorService po

this.id = COUNTER.incrementAndGet();

this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
long configuredPauseForCQTBE = conf.getLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, pause);
if (configuredPauseForCQTBE < pause) {
LOG.warn("The " + HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE + " setting: "
+ configuredPauseForCQTBE + " is smaller than " + HConstants.HBASE_CLIENT_PAUSE
+ ", will use " + pause + " instead.");
this.pauseForCQTBE = pause;
} else {
this.pauseForCQTBE = configuredPauseForCQTBE;
}
this.numTries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
ConnectionConfiguration connConf =
hc.getConfiguration() == conf
? hc.getConnectionConfiguration()
// Slow: parse conf in ConnectionConfiguration constructor
: new ConnectionConfiguration(conf);
if (connConf == null) {
// Slow: parse conf in ConnectionConfiguration constructor
connConf = new ConnectionConfiguration(conf);
}

this.pause = connConf.getPause();
this.pauseForCQTBE = connConf.getPauseForCQTBE();

this.numTries = connConf.getRetriesNumber();
this.rpcTimeout = rpcTimeout;
this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
this.primaryCallTimeoutMicroseconds = conf.getInt(PRIMARY_CALL_TIMEOUT_KEY, 10000);

this.maxTotalConcurrentTasks = conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS);
this.maxConcurrentTasksPerServer = conf.getInt(HConstants.HBASE_CLIENT_MAX_PERSERVER_TASKS,
HConstants.DEFAULT_HBASE_CLIENT_MAX_PERSERVER_TASKS);
this.maxConcurrentTasksPerRegion = conf.getInt(HConstants.HBASE_CLIENT_MAX_PERREGION_TASKS,
HConstants.DEFAULT_HBASE_CLIENT_MAX_PERREGION_TASKS);
this.maxHeapSizePerRequest = conf.getLong(HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE,
DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE);
this.maxHeapSizeSubmit = conf.getLong(HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE, DEFAULT_HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE);
this.operationTimeout = connConf.getOperationTimeout();

// Parse config once and reuse config values of hc's AsyncProcess in AsyncProcess for put
// Can be null when constructing hc's AsyncProcess or it's not reusable
AsyncProcess globalAsyncProcess = hc.getConfiguration() == conf ? hc.getAsyncProcess() : null;

this.primaryCallTimeoutMicroseconds =
globalAsyncProcess == null
? conf.getInt(PRIMARY_CALL_TIMEOUT_KEY, 10000)
: globalAsyncProcess.primaryCallTimeoutMicroseconds;

this.maxTotalConcurrentTasks =
globalAsyncProcess == null
? conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS)
: globalAsyncProcess.maxTotalConcurrentTasks;
this.maxConcurrentTasksPerServer =
globalAsyncProcess == null
? conf.getInt(HConstants.HBASE_CLIENT_MAX_PERSERVER_TASKS,
HConstants.DEFAULT_HBASE_CLIENT_MAX_PERSERVER_TASKS)
: globalAsyncProcess.maxConcurrentTasksPerServer;
this.maxConcurrentTasksPerRegion =
globalAsyncProcess == null
? conf.getInt(HConstants.HBASE_CLIENT_MAX_PERREGION_TASKS,
HConstants.DEFAULT_HBASE_CLIENT_MAX_PERREGION_TASKS)
: globalAsyncProcess.maxConcurrentTasksPerRegion;
this.maxHeapSizePerRequest =
globalAsyncProcess == null
? conf.getLong(HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE,
DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE)
: globalAsyncProcess.maxHeapSizePerRequest;
this.maxHeapSizeSubmit =
globalAsyncProcess == null
? conf.getLong(HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE, DEFAULT_HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE)
: globalAsyncProcess.maxHeapSizeSubmit;
this.startLogErrorsCnt =
conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, DEFAULT_START_LOG_ERRORS_AFTER_COUNT);
globalAsyncProcess == null
? conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, DEFAULT_START_LOG_ERRORS_AFTER_COUNT)
: globalAsyncProcess.startLogErrorsCnt;

if (this.maxTotalConcurrentTasks <= 0) {
throw new IllegalArgumentException("maxTotalConcurrentTasks=" + maxTotalConcurrentTasks);
Expand Down Expand Up @@ -387,11 +412,16 @@ public AsyncProcess(ClusterConnection hc, Configuration conf, ExecutorService po

this.rpcCallerFactory = rpcCaller;
this.rpcFactory = rpcFactory;
this.logBatchErrorDetails = conf.getBoolean(LOG_DETAILS_FOR_BATCH_ERROR, false);
this.logBatchErrorDetails =
globalAsyncProcess == null
? conf.getBoolean(LOG_DETAILS_FOR_BATCH_ERROR, false)
: globalAsyncProcess.logBatchErrorDetails;

this.thresholdToLogUndoneTaskDetails =
conf.getInt(THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS,
DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS);
globalAsyncProcess == null
? conf.getInt(THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS,
DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS)
: globalAsyncProcess.thresholdToLogUndoneTaskDetails;
}

public void setRpcTimeout(int rpcTimeout) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants; // Needed for write rpc timeout
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
Expand Down Expand Up @@ -112,32 +111,32 @@ public class BufferedMutatorImpl implements BufferedMutator {
this.pool = params.getPool();
this.listener = params.getListener();

ConnectionConfiguration tableConf = new ConnectionConfiguration(conf);
ConnectionConfiguration connConf = conn.getConnectionConfiguration();
if (connConf == null) {
// Slow: parse conf in ConnectionConfiguration constructor
connConf = new ConnectionConfiguration(conf);
}
this.writeBufferSize = params.getWriteBufferSize() != BufferedMutatorParams.UNSET ?
params.getWriteBufferSize() : tableConf.getWriteBufferSize();
params.getWriteBufferSize() : connConf.getWriteBufferSize();

// Set via the setter because it does value validation and starts/stops the TimerTask
long newWriteBufferPeriodicFlushTimeoutMs =
params.getWriteBufferPeriodicFlushTimeoutMs() != UNSET
? params.getWriteBufferPeriodicFlushTimeoutMs()
: tableConf.getWriteBufferPeriodicFlushTimeoutMs();
: connConf.getWriteBufferPeriodicFlushTimeoutMs();
long newWriteBufferPeriodicFlushTimerTickMs =
params.getWriteBufferPeriodicFlushTimerTickMs() != UNSET
? params.getWriteBufferPeriodicFlushTimerTickMs()
: tableConf.getWriteBufferPeriodicFlushTimerTickMs();
: connConf.getWriteBufferPeriodicFlushTimerTickMs();
this.setWriteBufferPeriodicFlush(
newWriteBufferPeriodicFlushTimeoutMs,
newWriteBufferPeriodicFlushTimerTickMs);

this.maxKeyValueSize = params.getMaxKeyValueSize() != BufferedMutatorParams.UNSET ?
params.getMaxKeyValueSize() : tableConf.getMaxKeyValueSize();

this.writeRpcTimeout = conn.getConfiguration().getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY,
conn.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
this.operationTimeout = conn.getConfiguration().getInt(
HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
params.getMaxKeyValueSize() : connConf.getMaxKeyValueSize();

this.writeRpcTimeout = connConf.getWriteRpcTimeout();
this.operationTimeout = connConf.getOperationTimeout();
// puts need to track errors globally due to how the APIs currently work.
ap = new AsyncProcess(connection, conf, pool, rpcCallerFactory, true, rpcFactory, writeRpcTimeout);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
package org.apache.hadoop.hbase.client;

import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
Expand All @@ -26,6 +28,7 @@
*/
@InterfaceAudience.Private
public class ConnectionConfiguration {
static final Log LOG = LogFactory.getLog(ConnectionConfiguration.class);

public static final String WRITE_BUFFER_SIZE_KEY = "hbase.client.write.buffer";
public static final long WRITE_BUFFER_SIZE_DEFAULT = 2097152;
Expand All @@ -50,6 +53,10 @@ public class ConnectionConfiguration {
private final int metaReplicaCallTimeoutMicroSecondScan;
private final int retries;
private final int maxKeyValueSize;
private final int readRpcTimeout;
private final int writeRpcTimeout;
private final long pause;
private final long pauseForCQTBE;

/**
* Constructor
Expand Down Expand Up @@ -90,9 +97,28 @@ public class ConnectionConfiguration {
HConstants.HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT_DEFAULT);

this.retries = conf.getInt(
HConstants.HBASE_CLIENT_RETRIES_NUMBER, HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
HConstants.HBASE_CLIENT_RETRIES_NUMBER, HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);

this.maxKeyValueSize = conf.getInt(MAX_KEYVALUE_SIZE_KEY, MAX_KEYVALUE_SIZE_DEFAULT);

this.readRpcTimeout = conf.getInt(HConstants.HBASE_RPC_READ_TIMEOUT_KEY,
conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
HConstants.DEFAULT_HBASE_RPC_TIMEOUT));

this.writeRpcTimeout = conf.getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY,
conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
HConstants.DEFAULT_HBASE_RPC_TIMEOUT));

this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE, HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
long configuredPauseForCQTBE = conf.getLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, pause);
if (configuredPauseForCQTBE < pause) {
LOG.warn("The " + HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE + " setting: "
+ configuredPauseForCQTBE + " is smaller than " + HConstants.HBASE_CLIENT_PAUSE
+ ", will use " + pause + " instead.");
this.pauseForCQTBE = pause;
} else {
this.pauseForCQTBE = configuredPauseForCQTBE;
}
}

/**
Expand All @@ -115,6 +141,10 @@ protected ConnectionConfiguration() {
HConstants.HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT_DEFAULT;
this.retries = HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER;
this.maxKeyValueSize = MAX_KEYVALUE_SIZE_DEFAULT;
this.readRpcTimeout = HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
this.writeRpcTimeout = HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
this.pause = HConstants.DEFAULT_HBASE_CLIENT_PAUSE;
this.pauseForCQTBE = HConstants.DEFAULT_HBASE_CLIENT_PAUSE;
}

public long getWriteBufferSize() {
Expand Down Expand Up @@ -164,4 +194,20 @@ public int getMaxKeyValueSize() {
public long getScannerMaxResultSize() {
return scannerMaxResultSize;
}

public int getReadRpcTimeout() {
return readRpcTimeout;
}

public int getWriteRpcTimeout() {
return writeRpcTimeout;
}

public long getPause() {
return pause;
}

public long getPauseForCQTBE() {
return pauseForCQTBE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -669,17 +669,8 @@ static class HConnectionImplementation implements ClusterConnection, Closeable {
this.managed = managed;
this.connectionConfig = new ConnectionConfiguration(conf);
this.closed = false;
this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
long configuredPauseForCQTBE = conf.getLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, pause);
if (configuredPauseForCQTBE < pause) {
LOG.warn("The " + HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE + " setting: "
+ configuredPauseForCQTBE + " is smaller than " + HConstants.HBASE_CLIENT_PAUSE
+ ", will use " + pause + " instead.");
this.pauseForCQTBE = pause;
} else {
this.pauseForCQTBE = configuredPauseForCQTBE;
}
this.pause = connectionConfig.getPause();
this.pauseForCQTBE = connectionConfig.getPauseForCQTBE();
this.useMetaReplicas = conf.getBoolean(HConstants.USE_META_REPLICAS,
HConstants.DEFAULT_USE_META_REPLICAS);
this.metaReplicaCallTimeoutScanInMicroSecond =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -366,12 +366,8 @@ private void finishSetup() throws IOException {
}
this.operationTimeout = tableName.isSystemTable() ?
connConfiguration.getMetaOperationTimeout() : connConfiguration.getOperationTimeout();
this.readRpcTimeout = configuration.getInt(HConstants.HBASE_RPC_READ_TIMEOUT_KEY,
configuration.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
this.writeRpcTimeout = configuration.getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY,
configuration.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
this.readRpcTimeout = connConfiguration.getReadRpcTimeout();
this.writeRpcTimeout = connConfiguration.getWriteRpcTimeout();
this.scannerCaching = connConfiguration.getScannerCaching();
this.scannerMaxResultSize = connConfiguration.getScannerMaxResultSize();
if (this.rpcCallerFactory == null) {
Expand Down

0 comments on commit ce65db3

Please sign in to comment.