diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java index 4c571e486599..0c2247349a24 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java @@ -323,35 +323,60 @@ public AsyncProcess(ClusterConnection hc, Configuration conf, ExecutorService po this.id = COUNTER.incrementAndGet(); - this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE, - HConstants.DEFAULT_HBASE_CLIENT_PAUSE); - long configuredPauseForCQTBE = conf.getLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, pause); - if (configuredPauseForCQTBE < pause) { - LOG.warn("The " + HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE + " setting: " - + configuredPauseForCQTBE + " is smaller than " + HConstants.HBASE_CLIENT_PAUSE - + ", will use " + pause + " instead."); - this.pauseForCQTBE = pause; - } else { - this.pauseForCQTBE = configuredPauseForCQTBE; - } - this.numTries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, - HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); + ConnectionConfiguration connConf = + hc.getConfiguration() == conf + ? hc.getConnectionConfiguration() + // Slow: parse conf in ConnectionConfiguration constructor + : new ConnectionConfiguration(conf); + if (connConf == null) { + // Slow: parse conf in ConnectionConfiguration constructor + connConf = new ConnectionConfiguration(conf); + } + + this.pause = connConf.getPause(); + this.pauseForCQTBE = connConf.getPauseForCQTBE(); + + this.numTries = connConf.getRetriesNumber(); this.rpcTimeout = rpcTimeout; - this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, - HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); - this.primaryCallTimeoutMicroseconds = conf.getInt(PRIMARY_CALL_TIMEOUT_KEY, 10000); - - this.maxTotalConcurrentTasks = conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS, - HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS); - this.maxConcurrentTasksPerServer = conf.getInt(HConstants.HBASE_CLIENT_MAX_PERSERVER_TASKS, - HConstants.DEFAULT_HBASE_CLIENT_MAX_PERSERVER_TASKS); - this.maxConcurrentTasksPerRegion = conf.getInt(HConstants.HBASE_CLIENT_MAX_PERREGION_TASKS, - HConstants.DEFAULT_HBASE_CLIENT_MAX_PERREGION_TASKS); - this.maxHeapSizePerRequest = conf.getLong(HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE, - DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE); - this.maxHeapSizeSubmit = conf.getLong(HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE, DEFAULT_HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE); + this.operationTimeout = connConf.getOperationTimeout(); + + // Parse config once and reuse config values of hc's AsyncProcess in AsyncProcess for put + // Can be null when constructing hc's AsyncProcess or it's not reusable + AsyncProcess globalAsyncProcess = hc.getConfiguration() == conf ? hc.getAsyncProcess() : null; + + this.primaryCallTimeoutMicroseconds = + globalAsyncProcess == null + ? conf.getInt(PRIMARY_CALL_TIMEOUT_KEY, 10000) + : globalAsyncProcess.primaryCallTimeoutMicroseconds; + + this.maxTotalConcurrentTasks = + globalAsyncProcess == null + ? conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS, + HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS) + : globalAsyncProcess.maxTotalConcurrentTasks; + this.maxConcurrentTasksPerServer = + globalAsyncProcess == null + ? conf.getInt(HConstants.HBASE_CLIENT_MAX_PERSERVER_TASKS, + HConstants.DEFAULT_HBASE_CLIENT_MAX_PERSERVER_TASKS) + : globalAsyncProcess.maxConcurrentTasksPerServer; + this.maxConcurrentTasksPerRegion = + globalAsyncProcess == null + ? conf.getInt(HConstants.HBASE_CLIENT_MAX_PERREGION_TASKS, + HConstants.DEFAULT_HBASE_CLIENT_MAX_PERREGION_TASKS) + : globalAsyncProcess.maxConcurrentTasksPerRegion; + this.maxHeapSizePerRequest = + globalAsyncProcess == null + ? conf.getLong(HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE, + DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE) + : globalAsyncProcess.maxHeapSizePerRequest; + this.maxHeapSizeSubmit = + globalAsyncProcess == null + ? conf.getLong(HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE, DEFAULT_HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE) + : globalAsyncProcess.maxHeapSizeSubmit; this.startLogErrorsCnt = - conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, DEFAULT_START_LOG_ERRORS_AFTER_COUNT); + globalAsyncProcess == null + ? conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, DEFAULT_START_LOG_ERRORS_AFTER_COUNT) + : globalAsyncProcess.startLogErrorsCnt; if (this.maxTotalConcurrentTasks <= 0) { throw new IllegalArgumentException("maxTotalConcurrentTasks=" + maxTotalConcurrentTasks); @@ -387,11 +412,16 @@ public AsyncProcess(ClusterConnection hc, Configuration conf, ExecutorService po this.rpcCallerFactory = rpcCaller; this.rpcFactory = rpcFactory; - this.logBatchErrorDetails = conf.getBoolean(LOG_DETAILS_FOR_BATCH_ERROR, false); + this.logBatchErrorDetails = + globalAsyncProcess == null + ? conf.getBoolean(LOG_DETAILS_FOR_BATCH_ERROR, false) + : globalAsyncProcess.logBatchErrorDetails; this.thresholdToLogUndoneTaskDetails = - conf.getInt(THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS, - DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS); + globalAsyncProcess == null + ? conf.getInt(THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS, + DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS) + : globalAsyncProcess.thresholdToLogUndoneTaskDetails; } public void setRpcTimeout(int rpcTimeout) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java index e33bd7ce369f..dafc66fd03a1 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java @@ -34,7 +34,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HConstants; // Needed for write rpc timeout import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; @@ -112,32 +111,32 @@ public class BufferedMutatorImpl implements BufferedMutator { this.pool = params.getPool(); this.listener = params.getListener(); - ConnectionConfiguration tableConf = new ConnectionConfiguration(conf); + ConnectionConfiguration connConf = conn.getConnectionConfiguration(); + if (connConf == null) { + // Slow: parse conf in ConnectionConfiguration constructor + connConf = new ConnectionConfiguration(conf); + } this.writeBufferSize = params.getWriteBufferSize() != BufferedMutatorParams.UNSET ? - params.getWriteBufferSize() : tableConf.getWriteBufferSize(); + params.getWriteBufferSize() : connConf.getWriteBufferSize(); // Set via the setter because it does value validation and starts/stops the TimerTask long newWriteBufferPeriodicFlushTimeoutMs = params.getWriteBufferPeriodicFlushTimeoutMs() != UNSET ? params.getWriteBufferPeriodicFlushTimeoutMs() - : tableConf.getWriteBufferPeriodicFlushTimeoutMs(); + : connConf.getWriteBufferPeriodicFlushTimeoutMs(); long newWriteBufferPeriodicFlushTimerTickMs = params.getWriteBufferPeriodicFlushTimerTickMs() != UNSET ? params.getWriteBufferPeriodicFlushTimerTickMs() - : tableConf.getWriteBufferPeriodicFlushTimerTickMs(); + : connConf.getWriteBufferPeriodicFlushTimerTickMs(); this.setWriteBufferPeriodicFlush( newWriteBufferPeriodicFlushTimeoutMs, newWriteBufferPeriodicFlushTimerTickMs); this.maxKeyValueSize = params.getMaxKeyValueSize() != BufferedMutatorParams.UNSET ? - params.getMaxKeyValueSize() : tableConf.getMaxKeyValueSize(); - - this.writeRpcTimeout = conn.getConfiguration().getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, - conn.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, - HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); - this.operationTimeout = conn.getConfiguration().getInt( - HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, - HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); + params.getMaxKeyValueSize() : connConf.getMaxKeyValueSize(); + + this.writeRpcTimeout = connConf.getWriteRpcTimeout(); + this.operationTimeout = connConf.getOperationTimeout(); // puts need to track errors globally due to how the APIs currently work. ap = new AsyncProcess(connection, conf, pool, rpcCallerFactory, true, rpcFactory, writeRpcTimeout); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java index 0e5164447e28..1189802edf02 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java @@ -12,6 +12,8 @@ package org.apache.hadoop.hbase.client; import com.google.common.annotations.VisibleForTesting; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.classification.InterfaceAudience; @@ -26,6 +28,7 @@ */ @InterfaceAudience.Private public class ConnectionConfiguration { + static final Log LOG = LogFactory.getLog(ConnectionConfiguration.class); public static final String WRITE_BUFFER_SIZE_KEY = "hbase.client.write.buffer"; public static final long WRITE_BUFFER_SIZE_DEFAULT = 2097152; @@ -50,6 +53,10 @@ public class ConnectionConfiguration { private final int metaReplicaCallTimeoutMicroSecondScan; private final int retries; private final int maxKeyValueSize; + private final int readRpcTimeout; + private final int writeRpcTimeout; + private final long pause; + private final long pauseForCQTBE; /** * Constructor @@ -90,9 +97,28 @@ public class ConnectionConfiguration { HConstants.HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT_DEFAULT); this.retries = conf.getInt( - HConstants.HBASE_CLIENT_RETRIES_NUMBER, HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); + HConstants.HBASE_CLIENT_RETRIES_NUMBER, HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); this.maxKeyValueSize = conf.getInt(MAX_KEYVALUE_SIZE_KEY, MAX_KEYVALUE_SIZE_DEFAULT); + + this.readRpcTimeout = conf.getInt(HConstants.HBASE_RPC_READ_TIMEOUT_KEY, + conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, + HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); + + this.writeRpcTimeout = conf.getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, + conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, + HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); + + this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE, HConstants.DEFAULT_HBASE_CLIENT_PAUSE); + long configuredPauseForCQTBE = conf.getLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, pause); + if (configuredPauseForCQTBE < pause) { + LOG.warn("The " + HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE + " setting: " + + configuredPauseForCQTBE + " is smaller than " + HConstants.HBASE_CLIENT_PAUSE + + ", will use " + pause + " instead."); + this.pauseForCQTBE = pause; + } else { + this.pauseForCQTBE = configuredPauseForCQTBE; + } } /** @@ -115,6 +141,10 @@ protected ConnectionConfiguration() { HConstants.HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT_DEFAULT; this.retries = HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER; this.maxKeyValueSize = MAX_KEYVALUE_SIZE_DEFAULT; + this.readRpcTimeout = HConstants.DEFAULT_HBASE_RPC_TIMEOUT; + this.writeRpcTimeout = HConstants.DEFAULT_HBASE_RPC_TIMEOUT; + this.pause = HConstants.DEFAULT_HBASE_CLIENT_PAUSE; + this.pauseForCQTBE = HConstants.DEFAULT_HBASE_CLIENT_PAUSE; } public long getWriteBufferSize() { @@ -164,4 +194,20 @@ public int getMaxKeyValueSize() { public long getScannerMaxResultSize() { return scannerMaxResultSize; } + + public int getReadRpcTimeout() { + return readRpcTimeout; + } + + public int getWriteRpcTimeout() { + return writeRpcTimeout; + } + + public long getPause() { + return pause; + } + + public long getPauseForCQTBE() { + return pauseForCQTBE; + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java index 7d9af647feb6..e8498c4c597b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java @@ -669,17 +669,8 @@ static class HConnectionImplementation implements ClusterConnection, Closeable { this.managed = managed; this.connectionConfig = new ConnectionConfiguration(conf); this.closed = false; - this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE, - HConstants.DEFAULT_HBASE_CLIENT_PAUSE); - long configuredPauseForCQTBE = conf.getLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, pause); - if (configuredPauseForCQTBE < pause) { - LOG.warn("The " + HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE + " setting: " - + configuredPauseForCQTBE + " is smaller than " + HConstants.HBASE_CLIENT_PAUSE - + ", will use " + pause + " instead."); - this.pauseForCQTBE = pause; - } else { - this.pauseForCQTBE = configuredPauseForCQTBE; - } + this.pause = connectionConfig.getPause(); + this.pauseForCQTBE = connectionConfig.getPauseForCQTBE(); this.useMetaReplicas = conf.getBoolean(HConstants.USE_META_REPLICAS, HConstants.DEFAULT_USE_META_REPLICAS); this.metaReplicaCallTimeoutScanInMicroSecond = diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index 91a8f922bfb1..cbb7b01bcee7 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -366,12 +366,8 @@ private void finishSetup() throws IOException { } this.operationTimeout = tableName.isSystemTable() ? connConfiguration.getMetaOperationTimeout() : connConfiguration.getOperationTimeout(); - this.readRpcTimeout = configuration.getInt(HConstants.HBASE_RPC_READ_TIMEOUT_KEY, - configuration.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, - HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); - this.writeRpcTimeout = configuration.getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, - configuration.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, - HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); + this.readRpcTimeout = connConfiguration.getReadRpcTimeout(); + this.writeRpcTimeout = connConfiguration.getWriteRpcTimeout(); this.scannerCaching = connConfiguration.getScannerCaching(); this.scannerMaxResultSize = connConfiguration.getScannerMaxResultSize(); if (this.rpcCallerFactory == null) {