diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminRequestRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminRequestRetryingCaller.java index 2d634b93c887..cf31d79d8e9b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminRequestRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminRequestRetryingCaller.java @@ -19,11 +19,12 @@ import java.io.IOException; import java.util.concurrent.CompletableFuture; - -import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer; import org.apache.hadoop.hbase.ServerName; -import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.ipc.HBaseRpcController; +import org.apache.yetus.audience.InterfaceAudience; + +import org.apache.hbase.thirdparty.io.netty.util.Timer; + import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; /** @@ -40,7 +41,7 @@ public interface Callable { private final Callable callable; private ServerName serverName; - public AsyncAdminRequestRetryingCaller(HashedWheelTimer retryTimer, AsyncConnectionImpl conn, + public AsyncAdminRequestRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, long pauseNs, int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt, ServerName serverName, Callable callable) { super(retryTimer, conn, pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs, @@ -69,10 +70,4 @@ protected void doCall() { future.complete(result); }); } - - @Override - CompletableFuture call() { - doCall(); - return future; - } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java index e268b2e88b65..55590bd42008 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java @@ -56,7 +56,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer; +import org.apache.hbase.thirdparty.io.netty.util.Timer; import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter; @@ -80,7 +80,7 @@ class AsyncBatchRpcRetryingCaller { private static final Logger LOG = LoggerFactory.getLogger(AsyncBatchRpcRetryingCaller.class); - private final HashedWheelTimer retryTimer; + private final Timer retryTimer; private final AsyncConnectionImpl conn; @@ -130,7 +130,7 @@ public void addAction(HRegionLocation loc, Action action) { } } - public AsyncBatchRpcRetryingCaller(HashedWheelTimer retryTimer, AsyncConnectionImpl conn, + public AsyncBatchRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, TableName tableName, List actions, long pauseNs, int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) { this.retryTimer = retryTimer; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java index ac2d3d768563..6d4aefde2a4d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java @@ -19,17 +19,27 @@ import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW; import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW; -import static org.apache.hadoop.hbase.client.ConnectionUtils.*; +import static org.apache.hadoop.hbase.client.ConnectionUtils.createScanResultCache; +import static org.apache.hadoop.hbase.client.ConnectionUtils.getLocateType; +import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCCallsMetrics; +import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCRetriesMetrics; +import static org.apache.hadoop.hbase.client.ConnectionUtils.incRegionCountMetrics; +import static org.apache.hadoop.hbase.client.ConnectionUtils.isRemote; +import static org.apache.hadoop.hbase.client.ConnectionUtils.timelineConsistentRead; +import static org.apache.hadoop.hbase.util.FutureUtils.addListener; import java.io.IOException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; - +import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.TableName; -import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.client.metrics.ScanMetrics; import org.apache.hadoop.hbase.ipc.HBaseRpcController; +import org.apache.yetus.audience.InterfaceAudience; + +import org.apache.hbase.thirdparty.io.netty.util.Timer; + import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService.Interface; @@ -59,6 +69,8 @@ class AsyncClientScanner { private final AsyncConnectionImpl conn; + private final Timer retryTimer; + private final long pauseNs; private final int maxAttempts; @@ -72,7 +84,7 @@ class AsyncClientScanner { private final ScanResultCache resultCache; public AsyncClientScanner(Scan scan, AdvancedScanResultConsumer consumer, TableName tableName, - AsyncConnectionImpl conn, long pauseNs, int maxAttempts, long scanTimeoutNs, + AsyncConnectionImpl conn, Timer retryTimer, long pauseNs, int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) { if (scan.getStartRow() == null) { scan.withStartRow(EMPTY_START_ROW, scan.includeStartRow()); @@ -84,6 +96,7 @@ public AsyncClientScanner(Scan scan, AdvancedScanResultConsumer consumer, TableN this.consumer = consumer; this.tableName = tableName; this.conn = conn; + this.retryTimer = retryTimer; this.pauseNs = pauseNs; this.maxAttempts = maxAttempts; this.scanTimeoutNs = scanTimeoutNs; @@ -120,20 +133,19 @@ public OpenScannerResponse(HRegionLocation loc, boolean isRegionServerRemote, In } } - private int openScannerTries; + private final AtomicInteger openScannerTries = new AtomicInteger(); private CompletableFuture callOpenScanner(HBaseRpcController controller, HRegionLocation loc, ClientService.Interface stub) { boolean isRegionServerRemote = isRemote(loc.getHostname()); incRPCCallsMetrics(scanMetrics, isRegionServerRemote); - if (openScannerTries > 1) { + if (openScannerTries.getAndIncrement() > 1) { incRPCRetriesMetrics(scanMetrics, isRegionServerRemote); } - openScannerTries++; CompletableFuture future = new CompletableFuture<>(); try { - ScanRequest request = RequestConverter.buildScanRequest(loc.getRegionInfo().getRegionName(), - scan, scan.getCaching(), false); + ScanRequest request = RequestConverter.buildScanRequest(loc.getRegion().getRegionName(), scan, + scan.getCaching(), false); stub.scan(controller, request, resp -> { if (controller.failed()) { future.completeExceptionally(controller.getFailed()); @@ -148,40 +160,53 @@ private CompletableFuture callOpenScanner(HBaseRpcControlle } private void startScan(OpenScannerResponse resp) { - conn.callerFactory.scanSingleRegion().id(resp.resp.getScannerId()).location(resp.loc) - .remote(resp.isRegionServerRemote) - .scannerLeaseTimeoutPeriod(resp.resp.getTtl(), TimeUnit.MILLISECONDS).stub(resp.stub) - .setScan(scan).metrics(scanMetrics).consumer(consumer).resultCache(resultCache) - .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) - .scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS) - .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt) - .start(resp.controller, resp.resp).whenComplete((hasMore, error) -> { - if (error != null) { - consumer.onError(error); - return; - } - if (hasMore) { - openScanner(); - } else { - consumer.onComplete(); - } - }); + addListener(conn.callerFactory.scanSingleRegion().id(resp.resp.getScannerId()) + .location(resp.loc).remote(resp.isRegionServerRemote) + .scannerLeaseTimeoutPeriod(resp.resp.getTtl(), TimeUnit.MILLISECONDS).stub(resp.stub) + .setScan(scan).metrics(scanMetrics).consumer(consumer).resultCache(resultCache) + .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) + .scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS) + .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt) + .start(resp.controller, resp.resp), (hasMore, error) -> { + if (error != null) { + consumer.onError(error); + return; + } + if (hasMore) { + openScanner(); + } else { + consumer.onComplete(); + } + }); + } + + private CompletableFuture openScanner(int replicaId) { + return conn.callerFactory. single().table(tableName) + .row(scan.getStartRow()).replicaId(replicaId).locateType(getLocateType(scan)) + .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) + .operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS) + .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt).action(this::callOpenScanner) + .call(); + } + + private long getPrimaryTimeoutNs() { + return TableName.isMetaTableName(tableName) ? conn.connConf.getPrimaryMetaScanTimeoutNs() + : conn.connConf.getPrimaryScanTimeoutNs(); } private void openScanner() { incRegionCountMetrics(scanMetrics); - openScannerTries = 1; - conn.callerFactory. single().table(tableName).row(scan.getStartRow()) - .locateType(getLocateType(scan)).rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) - .operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS) - .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt).action(this::callOpenScanner) - .call().whenComplete((resp, error) -> { - if (error != null) { - consumer.onError(error); - return; - } - startScan(resp); - }); + openScannerTries.set(1); + addListener( + timelineConsistentRead(conn.getLocator(), tableName, scan, scan.getStartRow(), + getLocateType(scan), this::openScanner, rpcTimeoutNs, getPrimaryTimeoutNs(), retryTimer), + (resp, error) -> { + if (error != null) { + consumer.onError(error); + return; + } + startScan(resp); + }); } public void start() { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java index fa051a5d412a..84a51504d4a8 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java @@ -26,6 +26,8 @@ import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_META_SCANNER_CACHING; import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_RPC_TIMEOUT; import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT; +import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT; +import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT_DEFAULT; import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_OPERATION_TIMEOUT; import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_PAUSE; import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_RETRIES_NUMBER; @@ -41,6 +43,8 @@ import static org.apache.hadoop.hbase.client.AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY; import static org.apache.hadoop.hbase.client.ConnectionConfiguration.PRIMARY_CALL_TIMEOUT_MICROSECOND; import static org.apache.hadoop.hbase.client.ConnectionConfiguration.PRIMARY_CALL_TIMEOUT_MICROSECOND_DEFAULT; +import static org.apache.hadoop.hbase.client.ConnectionConfiguration.PRIMARY_SCAN_TIMEOUT_MICROSECOND; +import static org.apache.hadoop.hbase.client.ConnectionConfiguration.PRIMARY_SCAN_TIMEOUT_MICROSECOND_DEFAULT; import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS; import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS_DEFAULT; import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_SIZE_DEFAULT; @@ -100,6 +104,10 @@ class AsyncConnectionConfiguration { // timeout, we will send request to secondaries. private final long primaryCallTimeoutNs; + private final long primaryScanTimeoutNs; + + private final long primaryMetaScanTimeoutNs; + @SuppressWarnings("deprecation") AsyncConnectionConfiguration(Configuration conf) { this.metaOperationTimeoutNs = TimeUnit.MILLISECONDS.toNanos( @@ -132,6 +140,11 @@ class AsyncConnectionConfiguration { WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS_DEFAULT)); this.primaryCallTimeoutNs = TimeUnit.MICROSECONDS.toNanos( conf.getLong(PRIMARY_CALL_TIMEOUT_MICROSECOND, PRIMARY_CALL_TIMEOUT_MICROSECOND_DEFAULT)); + this.primaryScanTimeoutNs = TimeUnit.MICROSECONDS.toNanos( + conf.getLong(PRIMARY_SCAN_TIMEOUT_MICROSECOND, PRIMARY_SCAN_TIMEOUT_MICROSECOND_DEFAULT)); + this.primaryMetaScanTimeoutNs = + TimeUnit.MICROSECONDS.toNanos(conf.getLong(HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT, + HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT_DEFAULT)); } long getMetaOperationTimeoutNs() { @@ -193,4 +206,12 @@ long getWriteBufferPeriodicFlushTimeoutNs() { long getPrimaryCallTimeoutNs() { return primaryCallTimeoutNs; } + + long getPrimaryScanTimeoutNs() { + return primaryScanTimeoutNs; + } + + long getPrimaryMetaScanTimeoutNs() { + return primaryMetaScanTimeoutNs; + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java index 1b99f84ea207..3cbd9503702d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java @@ -275,7 +275,7 @@ public AsyncTableBuilder getTableBuilder(TableName t @Override public AsyncTable build() { - return new RawAsyncTableImpl(AsyncConnectionImpl.this, this); + return new RawAsyncTableImpl(AsyncConnectionImpl.this, RETRY_TIMER, this); } }; } @@ -287,7 +287,8 @@ public AsyncTableBuilder getTableBuilder(TableName tableName @Override public AsyncTable build() { - RawAsyncTableImpl rawTable = new RawAsyncTableImpl(AsyncConnectionImpl.this, this); + RawAsyncTableImpl rawTable = + new RawAsyncTableImpl(AsyncConnectionImpl.this, RETRY_TIMER, this); return new AsyncTableImpl(AsyncConnectionImpl.this, rawTable, pool); } }; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMasterRequestRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMasterRequestRpcRetryingCaller.java index 1c8a0e1370f2..a52e79979740 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMasterRequestRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMasterRequestRpcRetryingCaller.java @@ -17,12 +17,12 @@ */ package org.apache.hadoop.hbase.client; -import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer; - import java.util.concurrent.CompletableFuture; - -import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.ipc.HBaseRpcController; +import org.apache.yetus.audience.InterfaceAudience; + +import org.apache.hbase.thirdparty.io.netty.util.Timer; + import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService; /** @@ -39,7 +39,7 @@ public interface Callable { private final Callable callable; - public AsyncMasterRequestRpcRetryingCaller(HashedWheelTimer retryTimer, AsyncConnectionImpl conn, + public AsyncMasterRequestRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, Callable callable, long pauseNs, int maxRetries, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) { super(retryTimer, conn, pauseNs, maxRetries, operationTimeoutNs, rpcTimeoutNs, @@ -66,10 +66,4 @@ protected void doCall() { }); }); } - - @Override - public CompletableFuture call() { - doCall(); - return future; - } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java index e03049a52bd9..5383ff8b84ee 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java @@ -23,7 +23,6 @@ import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime; import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController; import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException; -import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer; import java.util.ArrayList; import java.util.List; @@ -31,20 +30,21 @@ import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.function.Supplier; - import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.ipc.HBaseRpcController; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hadoop.hbase.ipc.HBaseRpcController; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; + +import org.apache.hbase.thirdparty.io.netty.util.Timer; @InterfaceAudience.Private public abstract class AsyncRpcRetryingCaller { private static final Logger LOG = LoggerFactory.getLogger(AsyncRpcRetryingCaller.class); - private final HashedWheelTimer retryTimer; + private final Timer retryTimer; private final long startNs; @@ -68,9 +68,8 @@ public abstract class AsyncRpcRetryingCaller { protected final HBaseRpcController controller; - public AsyncRpcRetryingCaller(HashedWheelTimer retryTimer, AsyncConnectionImpl conn, - long pauseNs, int maxAttempts, long operationTimeoutNs, - long rpcTimeoutNs, int startLogErrorsCnt) { + public AsyncRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, long pauseNs, + int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) { this.retryTimer = retryTimer; this.conn = conn; this.pauseNs = pauseNs; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java index a660e7457e70..f019fc460a91 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java @@ -31,7 +31,7 @@ import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.yetus.audience.InterfaceAudience; -import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer; +import org.apache.hbase.thirdparty.io.netty.util.Timer; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; @@ -45,9 +45,9 @@ class AsyncRpcRetryingCallerFactory { private final AsyncConnectionImpl conn; - private final HashedWheelTimer retryTimer; + private final Timer retryTimer; - public AsyncRpcRetryingCallerFactory(AsyncConnectionImpl conn, HashedWheelTimer retryTimer) { + public AsyncRpcRetryingCallerFactory(AsyncConnectionImpl conn, Timer retryTimer) { this.conn = conn; this.retryTimer = retryTimer; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java index 9fdb284c36ed..584bfacf7643 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java @@ -34,7 +34,6 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; - import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.NotServingRegionException; @@ -49,9 +48,11 @@ import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; -import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer; import org.apache.hbase.thirdparty.io.netty.util.Timeout; +import org.apache.hbase.thirdparty.io.netty.util.Timer; + import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter; @@ -72,7 +73,7 @@ class AsyncScanSingleRegionRpcRetryingCaller { private static final Logger LOG = LoggerFactory.getLogger(AsyncScanSingleRegionRpcRetryingCaller.class); - private final HashedWheelTimer retryTimer; + private final Timer retryTimer; private final Scan scan; @@ -297,7 +298,7 @@ synchronized boolean prepare(ScanResponse resp, int numberOfCompleteRows) { } } - public AsyncScanSingleRegionRpcRetryingCaller(HashedWheelTimer retryTimer, + public AsyncScanSingleRegionRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, Scan scan, ScanMetrics scanMetrics, long scannerId, ScanResultCache resultCache, AdvancedScanResultConsumer consumer, Interface stub, HRegionLocation loc, boolean isRegionServerRemote, long scannerLeaseTimeoutPeriodNs, diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncServerRequestRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncServerRequestRpcRetryingCaller.java index 20b7c311aae6..54b055a9ed4d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncServerRequestRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncServerRequestRpcRetryingCaller.java @@ -17,14 +17,14 @@ */ package org.apache.hadoop.hbase.client; -import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer; - import java.io.IOException; import java.util.concurrent.CompletableFuture; - import org.apache.hadoop.hbase.ServerName; -import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.ipc.HBaseRpcController; +import org.apache.yetus.audience.InterfaceAudience; + +import org.apache.hbase.thirdparty.io.netty.util.Timer; + import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; /** @@ -42,7 +42,7 @@ public interface Callable { private final Callable callable; private ServerName serverName; - public AsyncServerRequestRpcRetryingCaller(HashedWheelTimer retryTimer, AsyncConnectionImpl conn, + public AsyncServerRequestRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, long pauseNs, int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt, ServerName serverName, Callable callable) { super(retryTimer, conn, pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs, @@ -71,10 +71,4 @@ protected void doCall() { future.complete(result); }); } - - @Override - CompletableFuture call() { - doCall(); - return future; - } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java index 1a52e5cc6adc..4b60b1802257 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java @@ -27,7 +27,7 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.yetus.audience.InterfaceAudience; -import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer; +import org.apache.hbase.thirdparty.io.netty.util.Timer; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; @@ -53,7 +53,7 @@ CompletableFuture call(HBaseRpcController controller, HRegionLocation loc, private final Callable callable; - public AsyncSingleRequestRpcRetryingCaller(HashedWheelTimer retryTimer, AsyncConnectionImpl conn, + public AsyncSingleRequestRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, TableName tableName, byte[] row, int replicaId, RegionLocateType locateType, Callable callable, long pauseNs, int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) { @@ -114,10 +114,4 @@ protected void doCall() { call(loc); }); } - - @Override - public CompletableFuture call() { - doCall(); - return future; - } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java index 55c62e70c1ba..53859c2ac6a2 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java @@ -41,6 +41,9 @@ public class ConnectionConfiguration { public static final String PRIMARY_CALL_TIMEOUT_MICROSECOND = "hbase.client.primaryCallTimeout.get"; public static final int PRIMARY_CALL_TIMEOUT_MICROSECOND_DEFAULT = 10000; // 10ms + public static final String PRIMARY_SCAN_TIMEOUT_MICROSECOND = + "hbase.client.replicaCallTimeout.scan"; + public static final int PRIMARY_SCAN_TIMEOUT_MICROSECOND_DEFAULT = 1000000; // 1s private final long writeBufferSize; private final long writeBufferPeriodicFlushTimeoutMs; @@ -92,11 +95,11 @@ public class ConnectionConfiguration { conf.getInt(PRIMARY_CALL_TIMEOUT_MICROSECOND, PRIMARY_CALL_TIMEOUT_MICROSECOND_DEFAULT); this.replicaCallTimeoutMicroSecondScan = - conf.getInt("hbase.client.replicaCallTimeout.scan", 1000000); // 1000 ms + conf.getInt(PRIMARY_SCAN_TIMEOUT_MICROSECOND, PRIMARY_SCAN_TIMEOUT_MICROSECOND_DEFAULT); this.metaReplicaCallTimeoutMicroSecondScan = - conf.getInt(HConstants.HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT, - HConstants.HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT_DEFAULT); + conf.getInt(HConstants.HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT, + HConstants.HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT_DEFAULT); this.retries = conf.getInt( HConstants.HBASE_CLIENT_RETRIES_NUMBER, HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java index 63ef865ed42a..122d75422f6b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java @@ -20,6 +20,7 @@ import static java.util.stream.Collectors.toList; import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW; import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW; +import static org.apache.hadoop.hbase.util.FutureUtils.addListener; import java.io.IOException; import java.lang.reflect.UndeclaredThrowableException; @@ -31,11 +32,13 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; +import java.util.function.Function; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.PrivateCellUtil; +import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.metrics.ScanMetrics; @@ -53,6 +56,7 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; +import org.apache.hbase.thirdparty.io.netty.util.Timer; import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; @@ -123,9 +127,8 @@ public static void setServerSideHConnectionRetriesConfig(final Configuration c, } /** - * A ClusterConnection that will short-circuit RPC making direct invocations against the - * localhost if the invocation target is 'this' server; save on network and protobuf - * invocations. + * A ClusterConnection that will short-circuit RPC making direct invocations against the localhost + * if the invocation target is 'this' server; save on network and protobuf invocations. */ // TODO This has to still do PB marshalling/unmarshalling stuff. Check how/whether we can avoid. @VisibleForTesting // Class is visible so can assert we are short-circuiting when expected. @@ -136,8 +139,7 @@ public static class ShortCircuitingClusterConnection extends ConnectionImplement private ShortCircuitingClusterConnection(Configuration conf, ExecutorService pool, User user, ServerName serverName, AdminService.BlockingInterface admin, - ClientService.BlockingInterface client) - throws IOException { + ClientService.BlockingInterface client) throws IOException { super(conf, pool, user); this.serverName = serverName; this.localHostAdmin = admin; @@ -157,7 +159,8 @@ public ClientService.BlockingInterface getClient(ServerName sn) throws IOExcepti @Override public MasterKeepAliveConnection getMaster() throws IOException { if (this.localHostClient instanceof MasterService.BlockingInterface) { - return new ShortCircuitMasterConnection((MasterService.BlockingInterface)this.localHostClient); + return new ShortCircuitMasterConnection( + (MasterService.BlockingInterface) this.localHostClient); } return super.getMaster(); } @@ -335,8 +338,8 @@ static Result filterCells(Result result, Cell keepCellsAfter) { return result; } Cell[] rawCells = result.rawCells(); - int index = - Arrays.binarySearch(rawCells, keepCellsAfter, CellComparator.getInstance()::compareWithoutRow); + int index = Arrays.binarySearch(rawCells, keepCellsAfter, + CellComparator.getInstance()::compareWithoutRow); if (index < 0) { index = -index - 1; } else { @@ -406,7 +409,7 @@ static boolean noMoreResultsForReverseScan(Scan scan, RegionInfo info) { static CompletableFuture> allOf(List> futures) { return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])) - .thenApply(v -> futures.stream().map(f -> f.getNow(null)).collect(toList())); + .thenApply(v -> futures.stream().map(f -> f.getNow(null)).collect(toList())); } public static ScanResultCache createScanResultCache(Scan scan) { @@ -489,4 +492,84 @@ static void incRegionCountMetrics(ScanMetrics scanMetrics) { } scanMetrics.countOfRegions.incrementAndGet(); } + + /** + * Connect the two futures, if the src future is done, then mark the dst future as done. And if + * the dst future is done, then cancel the src future. This is used for timeline consistent read. + */ + private static void connect(CompletableFuture srcFuture, CompletableFuture dstFuture) { + addListener(srcFuture, (r, e) -> { + if (e != null) { + dstFuture.completeExceptionally(e); + } else { + dstFuture.complete(r); + } + }); + // The cancellation may be a dummy one as the dstFuture may be completed by this srcFuture. + // Notice that this is a bit tricky, as the execution chain maybe 'complete src -> complete dst + // -> cancel src', for now it seems to be fine, as the will use CAS to set the result first in + // CompletableFuture. If later this causes problems, we could use whenCompleteAsync to break the + // tie. + addListener(dstFuture, (r, e) -> srcFuture.cancel(false)); + } + + private static void sendRequestsToSecondaryReplicas( + Function> requestReplica, RegionLocations locs, + CompletableFuture future) { + if (future.isDone()) { + // do not send requests to secondary replicas if the future is done, i.e, the primary request + // has already been finished. + return; + } + for (int replicaId = 1, n = locs.size(); replicaId < n; replicaId++) { + CompletableFuture secondaryFuture = requestReplica.apply(replicaId); + connect(secondaryFuture, future); + } + } + + static CompletableFuture timelineConsistentRead(AsyncRegionLocator locator, + TableName tableName, Query query, byte[] row, RegionLocateType locateType, + Function> requestReplica, long rpcTimeoutNs, + long primaryCallTimeoutNs, Timer retryTimer) { + if (query.getConsistency() == Consistency.STRONG) { + return requestReplica.apply(RegionReplicaUtil.DEFAULT_REPLICA_ID); + } + // user specifies a replica id explicitly, just send request to the specific replica + if (query.getReplicaId() >= 0) { + return requestReplica.apply(query.getReplicaId()); + } + // Timeline consistent read, where we may send requests to other region replicas + CompletableFuture primaryFuture = requestReplica.apply(RegionReplicaUtil.DEFAULT_REPLICA_ID); + CompletableFuture future = new CompletableFuture<>(); + connect(primaryFuture, future); + long startNs = System.nanoTime(); + // after the getRegionLocations, all the locations for the replicas of this region should have + // been cached, so it is not big deal to locate them again when actually sending requests to + // these replicas. + addListener(locator.getRegionLocations(tableName, row, locateType, false, rpcTimeoutNs), + (locs, error) -> { + if (error != null) { + LOG.warn( + "Failed to locate all the replicas for table={}, row='{}', locateType={}" + + " give up timeline consistent read", + tableName, Bytes.toStringBinary(row), locateType, error); + return; + } + if (locs.size() <= 1) { + LOG.warn( + "There are no secondary replicas for region {}, give up timeline consistent read", + locs.getDefaultRegionLocation().getRegion()); + return; + } + long delayNs = primaryCallTimeoutNs - (System.nanoTime() - startNs); + if (delayNs <= 0) { + sendRequestsToSecondaryReplicas(requestReplica, locs, future); + } else { + retryTimer.newTimeout( + timeout -> sendRequestsToSecondaryReplicas(requestReplica, locs, future), delayNs, + TimeUnit.NANOSECONDS); + } + }); + return future; + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java index 2ab9f6ac940b..3a9456678802 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java @@ -20,6 +20,7 @@ import static java.util.stream.Collectors.toList; import static org.apache.hadoop.hbase.client.ConnectionUtils.checkHasFamilies; import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow; +import static org.apache.hadoop.hbase.client.ConnectionUtils.timelineConsistentRead; import static org.apache.hadoop.hbase.util.FutureUtils.addListener; import com.google.protobuf.RpcChannel; @@ -36,7 +37,6 @@ import org.apache.hadoop.hbase.CompareOperator; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionLocation; -import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.SingleRequestCallerBuilder; import org.apache.hadoop.hbase.filter.BinaryComparator; @@ -45,11 +45,10 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.yetus.audience.InterfaceAudience; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; +import org.apache.hbase.thirdparty.io.netty.util.Timer; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; @@ -77,10 +76,10 @@ @InterfaceAudience.Private class RawAsyncTableImpl implements AsyncTable { - private static final Logger LOG = LoggerFactory.getLogger(RawAsyncTableImpl.class); - private final AsyncConnectionImpl conn; + private final Timer retryTimer; + private final TableName tableName; private final int defaultScannerCaching; @@ -103,8 +102,9 @@ class RawAsyncTableImpl implements AsyncTable { private final int startLogErrorsCnt; - RawAsyncTableImpl(AsyncConnectionImpl conn, AsyncTableBuilderBase builder) { + RawAsyncTableImpl(AsyncConnectionImpl conn, Timer retryTimer, AsyncTableBuilderBase builder) { this.conn = conn; + this.retryTimer = retryTimer; this.tableName = builder.tableName; this.rpcTimeoutNs = builder.rpcTimeoutNs; this.readRpcTimeoutNs = builder.readRpcTimeoutNs; @@ -219,8 +219,8 @@ private SingleRequestCallerBuilder newCaller(Row row, long rpcTimeoutNs) return newCaller(row.getRow(), rpcTimeoutNs); } - private CompletableFuture get(Get get, int replicaId, long timeoutNs) { - return this. newCaller(get, timeoutNs) + private CompletableFuture get(Get get, int replicaId) { + return this. newCaller(get, readRpcTimeoutNs) .action((controller, loc, stub) -> RawAsyncTableImpl . call(controller, loc, stub, get, RequestConverter::buildGetRequest, (s, c, req, done) -> s.get(c, req, done), @@ -228,78 +228,11 @@ private CompletableFuture get(Get get, int replicaId, long timeoutNs) { .replicaId(replicaId).call(); } - // Connect the two futures, if the src future is done, then mark the dst future as done. And if - // the dst future is done, then cancel the src future. This is used for timeline consistent read. - private void connect(CompletableFuture srcFuture, CompletableFuture dstFuture) { - addListener(srcFuture, (r, e) -> { - if (e != null) { - dstFuture.completeExceptionally(e); - } else { - dstFuture.complete(r); - } - }); - // The cancellation may be a dummy one as the dstFuture may be completed by this srcFuture. - // Notice that this is a bit tricky, as the execution chain maybe 'complete src -> complete dst - // -> cancel src', for now it seems to be fine, as the will use CAS to set the result first in - // CompletableFuture. If later this causes problems, we could use whenCompleteAsync to break the - // tie. - addListener(dstFuture, (r, e) -> srcFuture.cancel(false)); - } - - private void timelineConsistentGet(Get get, RegionLocations locs, - CompletableFuture future) { - if (future.isDone()) { - // do not send requests to secondary replicas if the future is done, i.e, the primary request - // has already been finished. - return; - } - for (int replicaId = 1, n = locs.size(); replicaId < n; replicaId++) { - CompletableFuture secondaryFuture = get(get, replicaId, readRpcTimeoutNs); - connect(secondaryFuture, future); - } - } - @Override public CompletableFuture get(Get get) { - if (get.getConsistency() == Consistency.STRONG) { - return get(get, RegionReplicaUtil.DEFAULT_REPLICA_ID, readRpcTimeoutNs); - } - // user specifies a replica id explicitly, just send request to the specific replica - if (get.getReplicaId() >= 0) { - return get(get, get.getReplicaId(), readRpcTimeoutNs); - } - - // Timeline consistent read, where we may send requests to other region replicas - CompletableFuture primaryFuture = - get(get, RegionReplicaUtil.DEFAULT_REPLICA_ID, readRpcTimeoutNs); - CompletableFuture future = new CompletableFuture<>(); - connect(primaryFuture, future); - long primaryCallTimeoutNs = conn.connConf.getPrimaryCallTimeoutNs(); - long startNs = System.nanoTime(); - addListener(conn.getLocator().getRegionLocations(tableName, get.getRow(), - RegionLocateType.CURRENT, false, readRpcTimeoutNs), (locs, error) -> { - if (error != null) { - LOG.warn( - "Failed to locate all the replicas for table={}, row='{}'," + - " give up timeline consistent read", - tableName, Bytes.toStringBinary(get.getRow()), error); - return; - } - if (locs.size() <= 1) { - LOG.warn( - "There are no secondary replicas for region {}," + " give up timeline consistent read", - locs.getDefaultRegionLocation().getRegion()); - return; - } - long delayNs = primaryCallTimeoutNs - (System.nanoTime() - startNs); - if (delayNs <= 0) { - timelineConsistentGet(get, locs, future); - } else { - AsyncConnectionImpl.RETRY_TIMER.newTimeout( - timeout -> timelineConsistentGet(get, locs, future), delayNs, TimeUnit.NANOSECONDS); - } - }); - return future; + return timelineConsistentRead(conn.getLocator(), tableName, get, get.getRow(), + RegionLocateType.CURRENT, replicaId -> get(get, replicaId), readRpcTimeoutNs, + conn.connConf.getPrimaryCallTimeoutNs(), retryTimer); } @Override @@ -494,8 +427,8 @@ private Scan setDefaultScanConfig(Scan scan) { } public void scan(Scan scan, AdvancedScanResultConsumer consumer) { - new AsyncClientScanner(setDefaultScanConfig(scan), consumer, tableName, conn, pauseNs, - maxAttempts, scanTimeoutNs, readRpcTimeoutNs, startLogErrorsCnt).start(); + new AsyncClientScanner(setDefaultScanConfig(scan), consumer, tableName, conn, retryTimer, + pauseNs, maxAttempts, scanTimeoutNs, readRpcTimeoutNs, startLogErrorsCnt).start(); } private long resultSize2CacheSize(long maxResultSize) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableRegionReplicasRead.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableRegionReplicasRead.java new file mode 100644 index 000000000000..c70af5123289 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableRegionReplicasRead.java @@ -0,0 +1,232 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.coprocessor.RegionObserver; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; +import org.junit.AfterClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; + +public abstract class AbstractTestAsyncTableRegionReplicasRead { + + protected static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + protected static TableName TABLE_NAME = TableName.valueOf("async"); + + protected static byte[] FAMILY = Bytes.toBytes("cf"); + + protected static byte[] QUALIFIER = Bytes.toBytes("cq"); + + protected static byte[] ROW = Bytes.toBytes("row"); + + protected static byte[] VALUE = Bytes.toBytes("value"); + + protected static int REPLICA_COUNT = 3; + + protected static AsyncConnection ASYNC_CONN; + + @Rule + public TestName testName = new TestName(); + + @Parameter + public Supplier> getTable; + + private static AsyncTable getRawTable() { + return ASYNC_CONN.getTable(TABLE_NAME); + } + + private static AsyncTable getTable() { + return ASYNC_CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool()); + } + + @Parameters + public static List params() { + return Arrays.asList( + new Supplier[] { AbstractTestAsyncTableRegionReplicasRead::getRawTable }, + new Supplier[] { AbstractTestAsyncTableRegionReplicasRead::getTable }); + } + + protected static volatile boolean FAIL_PRIMARY_GET = false; + + protected static ConcurrentMap REPLICA_ID_TO_COUNT = + new ConcurrentHashMap<>(); + + public static final class FailPrimaryGetCP implements RegionObserver, RegionCoprocessor { + + @Override + public Optional getRegionObserver() { + return Optional.of(this); + } + + private void recordAndTryFail(ObserverContext c) + throws IOException { + RegionInfo region = c.getEnvironment().getRegionInfo(); + if (!region.getTable().equals(TABLE_NAME)) { + return; + } + REPLICA_ID_TO_COUNT.computeIfAbsent(region.getReplicaId(), k -> new AtomicInteger()) + .incrementAndGet(); + if (region.getRegionId() == RegionReplicaUtil.DEFAULT_REPLICA_ID && FAIL_PRIMARY_GET) { + throw new IOException("Inject error"); + } + } + + @Override + public void preGetOp(ObserverContext c, Get get, + List result) throws IOException { + recordAndTryFail(c); + } + + @Override + public void preScannerOpen(ObserverContext c, Scan scan) + throws IOException { + recordAndTryFail(c); + } + } + + private static boolean allReplicasHaveRow(byte[] row) throws IOException { + for (RegionServerThread t : TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads()) { + for (HRegion region : t.getRegionServer().getRegions(TABLE_NAME)) { + if (region.get(new Get(row), false).isEmpty()) { + return false; + } + } + } + return true; + } + + protected static void startClusterAndCreateTable() throws Exception { + // 10 mins + TEST_UTIL.getConfiguration().setLong(HConstants.HBASE_RPC_READ_TIMEOUT_KEY, + TimeUnit.MINUTES.toMillis(10)); + TEST_UTIL.getConfiguration().setLong(HConstants.HBASE_RPC_TIMEOUT_KEY, + TimeUnit.MINUTES.toMillis(10)); + TEST_UTIL.getConfiguration().setLong(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, + TimeUnit.MINUTES.toMillis(10)); + + // 1 second + TEST_UTIL.getConfiguration().setLong(ConnectionConfiguration.PRIMARY_CALL_TIMEOUT_MICROSECOND, + TimeUnit.SECONDS.toMicros(1)); + TEST_UTIL.getConfiguration().setLong(ConnectionConfiguration.PRIMARY_SCAN_TIMEOUT_MICROSECOND, + TimeUnit.SECONDS.toMicros(1)); + + // set a small pause so we will retry very quickly + TEST_UTIL.getConfiguration().setLong(HConstants.HBASE_CLIENT_PAUSE, 10); + + // infinite retry + TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, Integer.MAX_VALUE); + + TEST_UTIL.startMiniCluster(3); + TEST_UTIL.getAdmin().createTable(TableDescriptorBuilder.newBuilder(TABLE_NAME) + .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).setRegionReplication(REPLICA_COUNT) + .setCoprocessor(FailPrimaryGetCP.class.getName()).build()); + TEST_UTIL.waitUntilAllRegionsAssigned(TABLE_NAME); + ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); + } + + protected static void waitUntilAllReplicasHaveRow(byte[] row) throws IOException { + // this is the fastest way to let all replicas have the row + TEST_UTIL.getAdmin().disableTable(TABLE_NAME); + TEST_UTIL.getAdmin().enableTable(TABLE_NAME); + TEST_UTIL.waitFor(30000, () -> allReplicasHaveRow(row)); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + IOUtils.closeQuietly(ASYNC_CONN); + TEST_UTIL.shutdownMiniCluster(); + } + + protected static int getSecondaryGetCount() { + return REPLICA_ID_TO_COUNT.entrySet().stream() + .filter(e -> e.getKey().intValue() != RegionReplicaUtil.DEFAULT_REPLICA_ID) + .mapToInt(e -> e.getValue().get()).sum(); + } + + protected static int getPrimaryGetCount() { + AtomicInteger primaryGetCount = REPLICA_ID_TO_COUNT.get(RegionReplicaUtil.DEFAULT_REPLICA_ID); + return primaryGetCount != null ? primaryGetCount.get() : 0; + } + + // replicaId = -1 means do not set replica + protected abstract void readAndCheck(AsyncTable table, int replicaId) throws Exception; + + @Test + public void testNoReplicaRead() throws Exception { + FAIL_PRIMARY_GET = false; + REPLICA_ID_TO_COUNT.clear(); + AsyncTable table = getTable.get(); + readAndCheck(table, -1); + // the primary region is fine and the primary timeout is 1 second which is long enough, so we + // should not send any requests to secondary replicas even if the consistency is timeline. + Thread.sleep(5000); + assertEquals(0, getSecondaryGetCount()); + } + + @Test + public void testReplicaRead() throws Exception { + // fail the primary get request + FAIL_PRIMARY_GET = true; + REPLICA_ID_TO_COUNT.clear(); + // make sure that we could still get the value from secondary replicas + AsyncTable table = getTable.get(); + readAndCheck(table, -1); + // make sure that the primary request has been canceled + Thread.sleep(5000); + int count = getPrimaryGetCount(); + Thread.sleep(10000); + assertEquals(count, getPrimaryGetCount()); + } + + @Test + public void testReadSpecificReplica() throws Exception { + FAIL_PRIMARY_GET = false; + REPLICA_ID_TO_COUNT.clear(); + AsyncTable table = getTable.get(); + for (int replicaId = 0; replicaId < REPLICA_COUNT; replicaId++) { + readAndCheck(table, replicaId); + assertEquals(1, REPLICA_ID_TO_COUNT.get(replicaId).get()); + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRegionReplicasGet.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRegionReplicasGet.java index 2117116d7457..3e1d994cc81c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRegionReplicasGet.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRegionReplicasGet.java @@ -18,211 +18,38 @@ package org.apache.hadoop.hbase.client; import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import java.io.IOException; -import java.util.Arrays; -import java.util.List; -import java.util.Optional; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ForkJoinPool; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Supplier; -import org.apache.commons.io.IOUtils; -import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HBaseClassTestRule; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.coprocessor.ObserverContext; -import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; -import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; -import org.apache.hadoop.hbase.coprocessor.RegionObserver; -import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.MediumTests; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; -import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.ClassRule; -import org.junit.Rule; -import org.junit.Test; import org.junit.experimental.categories.Category; -import org.junit.rules.TestName; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; -import org.junit.runners.Parameterized.Parameter; -import org.junit.runners.Parameterized.Parameters; @RunWith(Parameterized.class) @Category({ MediumTests.class, ClientTests.class }) -public class TestAsyncTableRegionReplicasGet { +public class TestAsyncTableRegionReplicasGet extends AbstractTestAsyncTableRegionReplicasRead { @ClassRule public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestAsyncTableRegionReplicasGet.class); - private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); - - private static TableName TABLE_NAME = TableName.valueOf("async"); - - private static byte[] FAMILY = Bytes.toBytes("cf"); - - private static byte[] QUALIFIER = Bytes.toBytes("cq"); - - private static byte[] ROW = Bytes.toBytes("row"); - - private static byte[] VALUE = Bytes.toBytes("value"); - - private static int REPLICA_COUNT = 3; - - private static AsyncConnection ASYNC_CONN; - - @Rule - public TestName testName = new TestName(); - - @Parameter - public Supplier> getTable; - - private static AsyncTable getRawTable() { - return ASYNC_CONN.getTable(TABLE_NAME); - } - - private static AsyncTable getTable() { - return ASYNC_CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool()); - } - - @Parameters - public static List params() { - return Arrays.asList(new Supplier[] { TestAsyncTableRegionReplicasGet::getRawTable }, - new Supplier[] { TestAsyncTableRegionReplicasGet::getTable }); - } - - private static volatile boolean FAIL_PRIMARY_GET = false; - - private static ConcurrentMap REPLICA_ID_TO_COUNT = - new ConcurrentHashMap<>(); - - public static final class FailPrimaryGetCP implements RegionObserver, RegionCoprocessor { - - @Override - public Optional getRegionObserver() { - return Optional.of(this); - } - - @Override - public void preGetOp(ObserverContext c, Get get, - List result) throws IOException { - RegionInfo region = c.getEnvironment().getRegionInfo(); - if (!region.getTable().equals(TABLE_NAME)) { - return; - } - REPLICA_ID_TO_COUNT.computeIfAbsent(region.getReplicaId(), k -> new AtomicInteger()) - .incrementAndGet(); - if (region.getRegionId() == RegionReplicaUtil.DEFAULT_REPLICA_ID && FAIL_PRIMARY_GET) { - throw new IOException("Inject error"); - } - } - } - - private static boolean allReplicasHaveRow() throws IOException { - for (RegionServerThread t : TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads()) { - for (HRegion region : t.getRegionServer().getRegions(TABLE_NAME)) { - if (region.get(new Get(ROW), false).isEmpty()) { - return false; - } - } - } - return true; - } - @BeforeClass public static void setUpBeforeClass() throws Exception { - // 10 mins - TEST_UTIL.getConfiguration().setLong(HConstants.HBASE_RPC_READ_TIMEOUT_KEY, - TimeUnit.MINUTES.toMillis(10)); - // 1 second - TEST_UTIL.getConfiguration().setLong(ConnectionConfiguration.PRIMARY_CALL_TIMEOUT_MICROSECOND, - TimeUnit.SECONDS.toMicros(1)); - // set a small pause so we will retry very quickly - TEST_UTIL.getConfiguration().setLong(HConstants.HBASE_CLIENT_PAUSE, 10); - // infinite retry - TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, Integer.MAX_VALUE); - TEST_UTIL.startMiniCluster(3); - TEST_UTIL.getAdmin().createTable(TableDescriptorBuilder.newBuilder(TABLE_NAME) - .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).setRegionReplication(REPLICA_COUNT) - .setCoprocessor(FailPrimaryGetCP.class.getName()).build()); - TEST_UTIL.waitUntilAllRegionsAssigned(TABLE_NAME); - ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); + startClusterAndCreateTable(); AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME); table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE)).get(); - // this is the fastest way to let all replicas have the row - TEST_UTIL.getAdmin().disableTable(TABLE_NAME); - TEST_UTIL.getAdmin().enableTable(TABLE_NAME); - TEST_UTIL.waitFor(30000, () -> allReplicasHaveRow()); + waitUntilAllReplicasHaveRow(ROW); } - @AfterClass - public static void tearDownAfterClass() throws Exception { - IOUtils.closeQuietly(ASYNC_CONN); - TEST_UTIL.shutdownMiniCluster(); - } - - private static int getSecondaryGetCount() { - return REPLICA_ID_TO_COUNT.entrySet().stream() - .filter(e -> e.getKey().intValue() != RegionReplicaUtil.DEFAULT_REPLICA_ID) - .mapToInt(e -> e.getValue().get()).sum(); - } - - private static int getPrimaryGetCount() { - AtomicInteger primaryGetCount = REPLICA_ID_TO_COUNT.get(RegionReplicaUtil.DEFAULT_REPLICA_ID); - return primaryGetCount != null ? primaryGetCount.get() : 0; - } - - @Test - public void testNoReplicaRead() throws Exception { - FAIL_PRIMARY_GET = false; - REPLICA_ID_TO_COUNT.clear(); - AsyncTable table = getTable.get(); - Get get = new Get(ROW).setConsistency(Consistency.TIMELINE); - for (int i = 0; i < 1000; i++) { - assertArrayEquals(VALUE, table.get(get).get().getValue(FAMILY, QUALIFIER)); - } - // the primary region is fine and the primary timeout is 1 second which is long enough, so we - // should not send any requests to secondary replicas even if the consistency is timeline. - Thread.sleep(5000); - assertEquals(0, getSecondaryGetCount()); - } - - @Test - public void testReplicaRead() throws Exception { - // fail the primary get request - FAIL_PRIMARY_GET = true; - REPLICA_ID_TO_COUNT.clear(); + @Override + protected void readAndCheck(AsyncTable table, int replicaId) throws Exception { Get get = new Get(ROW).setConsistency(Consistency.TIMELINE); - // make sure that we could still get the value from secondary replicas - AsyncTable table = getTable.get(); - assertArrayEquals(VALUE, table.get(get).get().getValue(FAMILY, QUALIFIER)); - // make sure that the primary request has been canceled - Thread.sleep(5000); - int count = getPrimaryGetCount(); - Thread.sleep(10000); - assertEquals(count, getPrimaryGetCount()); - } - - @Test - public void testReadSpecificReplica() throws Exception { - FAIL_PRIMARY_GET = false; - REPLICA_ID_TO_COUNT.clear(); - Get get = new Get(ROW).setConsistency(Consistency.TIMELINE); - AsyncTable table = getTable.get(); - for (int replicaId = 0; replicaId < REPLICA_COUNT; replicaId++) { + if (replicaId >= 0) { get.setReplicaId(replicaId); - assertArrayEquals(VALUE, table.get(get).get().getValue(FAMILY, QUALIFIER)); - assertEquals(1, REPLICA_ID_TO_COUNT.get(replicaId).get()); } + assertArrayEquals(VALUE, table.get(get).get().getValue(FAMILY, QUALIFIER)); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRegionReplicasScan.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRegionReplicasScan.java new file mode 100644 index 000000000000..dd5c8e57f602 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRegionReplicasScan.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertNotNull; + +import java.io.IOException; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +@Category({ MediumTests.class, ClientTests.class }) +public class TestAsyncTableRegionReplicasScan extends AbstractTestAsyncTableRegionReplicasRead { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestAsyncTableRegionReplicasScan.class); + + private static int ROW_COUNT = 1000; + + private static byte[] getRow(int i) { + return Bytes.toBytes(String.format("%s-%03d", Bytes.toString(ROW), i)); + } + + private static byte[] getValue(int i) { + return Bytes.toBytes(String.format("%s-%03d", Bytes.toString(VALUE), i)); + } + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + startClusterAndCreateTable(); + AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME); + for (int i = 0; i < ROW_COUNT; i++) { + table.put(new Put(getRow(i)).addColumn(FAMILY, QUALIFIER, getValue(i))).get(); + } + waitUntilAllReplicasHaveRow(getRow(ROW_COUNT - 1)); + } + + @Override + protected void readAndCheck(AsyncTable table, int replicaId) throws IOException { + Scan scan = new Scan().setConsistency(Consistency.TIMELINE).setCaching(1); + if (replicaId >= 0) { + scan.setReplicaId(replicaId); + } + try (ResultScanner scanner = table.getScanner(scan)) { + for (int i = 0; i < 1000; i++) { + Result result = scanner.next(); + assertNotNull(result); + assertArrayEquals(getValue(i), result.getValue(FAMILY, QUALIFIER)); + } + } + } +}