Skip to content

Commit

Permalink
HBASE-21663 Add replica scan support
Browse files Browse the repository at this point in the history
  • Loading branch information
Apache9 committed Jan 11, 2019
1 parent 7bebdff commit fbf7937
Show file tree
Hide file tree
Showing 17 changed files with 549 additions and 371 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@

import java.io.IOException;
import java.util.concurrent.CompletableFuture;

import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
import org.apache.hadoop.hbase.ServerName;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.io.netty.util.Timer;

import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;

/**
Expand All @@ -40,7 +41,7 @@ public interface Callable<T> {
private final Callable<T> callable;
private ServerName serverName;

public AsyncAdminRequestRetryingCaller(HashedWheelTimer retryTimer, AsyncConnectionImpl conn,
public AsyncAdminRequestRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn,
long pauseNs, int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs,
int startLogErrorsCnt, ServerName serverName, Callable<T> callable) {
super(retryTimer, conn, pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs,
Expand Down Expand Up @@ -69,10 +70,4 @@ protected void doCall() {
future.complete(result);
});
}

@Override
CompletableFuture<T> call() {
doCall();
return future;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
import org.apache.hbase.thirdparty.io.netty.util.Timer;

import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
Expand All @@ -80,7 +80,7 @@ class AsyncBatchRpcRetryingCaller<T> {

private static final Logger LOG = LoggerFactory.getLogger(AsyncBatchRpcRetryingCaller.class);

private final HashedWheelTimer retryTimer;
private final Timer retryTimer;

private final AsyncConnectionImpl conn;

Expand Down Expand Up @@ -130,7 +130,7 @@ public void addAction(HRegionLocation loc, Action action) {
}
}

public AsyncBatchRpcRetryingCaller(HashedWheelTimer retryTimer, AsyncConnectionImpl conn,
public AsyncBatchRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn,
TableName tableName, List<? extends Row> actions, long pauseNs, int maxAttempts,
long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
this.retryTimer = retryTimer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,27 @@

import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW;
import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW;
import static org.apache.hadoop.hbase.client.ConnectionUtils.*;
import static org.apache.hadoop.hbase.client.ConnectionUtils.createScanResultCache;
import static org.apache.hadoop.hbase.client.ConnectionUtils.getLocateType;
import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCCallsMetrics;
import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCRetriesMetrics;
import static org.apache.hadoop.hbase.client.ConnectionUtils.incRegionCountMetrics;
import static org.apache.hadoop.hbase.client.ConnectionUtils.isRemote;
import static org.apache.hadoop.hbase.client.ConnectionUtils.timelineConsistentRead;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.io.netty.util.Timer;

import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService.Interface;
Expand Down Expand Up @@ -59,6 +69,8 @@ class AsyncClientScanner {

private final AsyncConnectionImpl conn;

private final Timer retryTimer;

private final long pauseNs;

private final int maxAttempts;
Expand All @@ -72,7 +84,7 @@ class AsyncClientScanner {
private final ScanResultCache resultCache;

public AsyncClientScanner(Scan scan, AdvancedScanResultConsumer consumer, TableName tableName,
AsyncConnectionImpl conn, long pauseNs, int maxAttempts, long scanTimeoutNs,
AsyncConnectionImpl conn, Timer retryTimer, long pauseNs, int maxAttempts, long scanTimeoutNs,
long rpcTimeoutNs, int startLogErrorsCnt) {
if (scan.getStartRow() == null) {
scan.withStartRow(EMPTY_START_ROW, scan.includeStartRow());
Expand All @@ -84,6 +96,7 @@ public AsyncClientScanner(Scan scan, AdvancedScanResultConsumer consumer, TableN
this.consumer = consumer;
this.tableName = tableName;
this.conn = conn;
this.retryTimer = retryTimer;
this.pauseNs = pauseNs;
this.maxAttempts = maxAttempts;
this.scanTimeoutNs = scanTimeoutNs;
Expand Down Expand Up @@ -120,20 +133,19 @@ public OpenScannerResponse(HRegionLocation loc, boolean isRegionServerRemote, In
}
}

private int openScannerTries;
private final AtomicInteger openScannerTries = new AtomicInteger();

private CompletableFuture<OpenScannerResponse> callOpenScanner(HBaseRpcController controller,
HRegionLocation loc, ClientService.Interface stub) {
boolean isRegionServerRemote = isRemote(loc.getHostname());
incRPCCallsMetrics(scanMetrics, isRegionServerRemote);
if (openScannerTries > 1) {
if (openScannerTries.getAndIncrement() > 1) {
incRPCRetriesMetrics(scanMetrics, isRegionServerRemote);
}
openScannerTries++;
CompletableFuture<OpenScannerResponse> future = new CompletableFuture<>();
try {
ScanRequest request = RequestConverter.buildScanRequest(loc.getRegionInfo().getRegionName(),
scan, scan.getCaching(), false);
ScanRequest request = RequestConverter.buildScanRequest(loc.getRegion().getRegionName(), scan,
scan.getCaching(), false);
stub.scan(controller, request, resp -> {
if (controller.failed()) {
future.completeExceptionally(controller.getFailed());
Expand All @@ -148,40 +160,53 @@ private CompletableFuture<OpenScannerResponse> callOpenScanner(HBaseRpcControlle
}

private void startScan(OpenScannerResponse resp) {
conn.callerFactory.scanSingleRegion().id(resp.resp.getScannerId()).location(resp.loc)
.remote(resp.isRegionServerRemote)
.scannerLeaseTimeoutPeriod(resp.resp.getTtl(), TimeUnit.MILLISECONDS).stub(resp.stub)
.setScan(scan).metrics(scanMetrics).consumer(consumer).resultCache(resultCache)
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
.scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
.maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt)
.start(resp.controller, resp.resp).whenComplete((hasMore, error) -> {
if (error != null) {
consumer.onError(error);
return;
}
if (hasMore) {
openScanner();
} else {
consumer.onComplete();
}
});
addListener(conn.callerFactory.scanSingleRegion().id(resp.resp.getScannerId())
.location(resp.loc).remote(resp.isRegionServerRemote)
.scannerLeaseTimeoutPeriod(resp.resp.getTtl(), TimeUnit.MILLISECONDS).stub(resp.stub)
.setScan(scan).metrics(scanMetrics).consumer(consumer).resultCache(resultCache)
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
.scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
.maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt)
.start(resp.controller, resp.resp), (hasMore, error) -> {
if (error != null) {
consumer.onError(error);
return;
}
if (hasMore) {
openScanner();
} else {
consumer.onComplete();
}
});
}

private CompletableFuture<OpenScannerResponse> openScanner(int replicaId) {
return conn.callerFactory.<OpenScannerResponse> single().table(tableName)
.row(scan.getStartRow()).replicaId(replicaId).locateType(getLocateType(scan))
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
.operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
.maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt).action(this::callOpenScanner)
.call();
}

private long getPrimaryTimeoutNs() {
return TableName.isMetaTableName(tableName) ? conn.connConf.getPrimaryMetaScanTimeoutNs()
: conn.connConf.getPrimaryScanTimeoutNs();
}

private void openScanner() {
incRegionCountMetrics(scanMetrics);
openScannerTries = 1;
conn.callerFactory.<OpenScannerResponse> single().table(tableName).row(scan.getStartRow())
.locateType(getLocateType(scan)).rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
.operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
.maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt).action(this::callOpenScanner)
.call().whenComplete((resp, error) -> {
if (error != null) {
consumer.onError(error);
return;
}
startScan(resp);
});
openScannerTries.set(1);
addListener(
timelineConsistentRead(conn.getLocator(), tableName, scan, scan.getStartRow(),
getLocateType(scan), this::openScanner, rpcTimeoutNs, getPrimaryTimeoutNs(), retryTimer),
(resp, error) -> {
if (error != null) {
consumer.onError(error);
return;
}
startScan(resp);
});
}

public void start() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_META_SCANNER_CACHING;
import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT;
import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT;
import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT_DEFAULT;
import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_OPERATION_TIMEOUT;
import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_PAUSE;
import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_RETRIES_NUMBER;
Expand All @@ -41,6 +43,8 @@
import static org.apache.hadoop.hbase.client.AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY;
import static org.apache.hadoop.hbase.client.ConnectionConfiguration.PRIMARY_CALL_TIMEOUT_MICROSECOND;
import static org.apache.hadoop.hbase.client.ConnectionConfiguration.PRIMARY_CALL_TIMEOUT_MICROSECOND_DEFAULT;
import static org.apache.hadoop.hbase.client.ConnectionConfiguration.PRIMARY_SCAN_TIMEOUT_MICROSECOND;
import static org.apache.hadoop.hbase.client.ConnectionConfiguration.PRIMARY_SCAN_TIMEOUT_MICROSECOND_DEFAULT;
import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS;
import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS_DEFAULT;
import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_SIZE_DEFAULT;
Expand Down Expand Up @@ -100,6 +104,10 @@ class AsyncConnectionConfiguration {
// timeout, we will send request to secondaries.
private final long primaryCallTimeoutNs;

private final long primaryScanTimeoutNs;

private final long primaryMetaScanTimeoutNs;

@SuppressWarnings("deprecation")
AsyncConnectionConfiguration(Configuration conf) {
this.metaOperationTimeoutNs = TimeUnit.MILLISECONDS.toNanos(
Expand Down Expand Up @@ -132,6 +140,11 @@ class AsyncConnectionConfiguration {
WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS_DEFAULT));
this.primaryCallTimeoutNs = TimeUnit.MICROSECONDS.toNanos(
conf.getLong(PRIMARY_CALL_TIMEOUT_MICROSECOND, PRIMARY_CALL_TIMEOUT_MICROSECOND_DEFAULT));
this.primaryScanTimeoutNs = TimeUnit.MICROSECONDS.toNanos(
conf.getLong(PRIMARY_SCAN_TIMEOUT_MICROSECOND, PRIMARY_SCAN_TIMEOUT_MICROSECOND_DEFAULT));
this.primaryMetaScanTimeoutNs =
TimeUnit.MICROSECONDS.toNanos(conf.getLong(HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT,
HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT_DEFAULT));
}

long getMetaOperationTimeoutNs() {
Expand Down Expand Up @@ -193,4 +206,12 @@ long getWriteBufferPeriodicFlushTimeoutNs() {
long getPrimaryCallTimeoutNs() {
return primaryCallTimeoutNs;
}

long getPrimaryScanTimeoutNs() {
return primaryScanTimeoutNs;
}

long getPrimaryMetaScanTimeoutNs() {
return primaryMetaScanTimeoutNs;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ public AsyncTableBuilder<AdvancedScanResultConsumer> getTableBuilder(TableName t

@Override
public AsyncTable<AdvancedScanResultConsumer> build() {
return new RawAsyncTableImpl(AsyncConnectionImpl.this, this);
return new RawAsyncTableImpl(AsyncConnectionImpl.this, RETRY_TIMER, this);
}
};
}
Expand All @@ -287,7 +287,8 @@ public AsyncTableBuilder<ScanResultConsumer> getTableBuilder(TableName tableName

@Override
public AsyncTable<ScanResultConsumer> build() {
RawAsyncTableImpl rawTable = new RawAsyncTableImpl(AsyncConnectionImpl.this, this);
RawAsyncTableImpl rawTable =
new RawAsyncTableImpl(AsyncConnectionImpl.this, RETRY_TIMER, this);
return new AsyncTableImpl(AsyncConnectionImpl.this, rawTable, pool);
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@
*/
package org.apache.hadoop.hbase.client;

import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;

import java.util.concurrent.CompletableFuture;

import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.io.netty.util.Timer;

import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;

/**
Expand All @@ -39,7 +39,7 @@ public interface Callable<T> {

private final Callable<T> callable;

public AsyncMasterRequestRpcRetryingCaller(HashedWheelTimer retryTimer, AsyncConnectionImpl conn,
public AsyncMasterRequestRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn,
Callable<T> callable, long pauseNs, int maxRetries, long operationTimeoutNs,
long rpcTimeoutNs, int startLogErrorsCnt) {
super(retryTimer, conn, pauseNs, maxRetries, operationTimeoutNs, rpcTimeoutNs,
Expand All @@ -66,10 +66,4 @@ protected void doCall() {
});
});
}

@Override
public CompletableFuture<T> call() {
doCall();
return future;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,28 +23,28 @@
import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController;
import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException;
import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;

import org.apache.hbase.thirdparty.io.netty.util.Timer;

@InterfaceAudience.Private
public abstract class AsyncRpcRetryingCaller<T> {

private static final Logger LOG = LoggerFactory.getLogger(AsyncRpcRetryingCaller.class);

private final HashedWheelTimer retryTimer;
private final Timer retryTimer;

private final long startNs;

Expand All @@ -68,9 +68,8 @@ public abstract class AsyncRpcRetryingCaller<T> {

protected final HBaseRpcController controller;

public AsyncRpcRetryingCaller(HashedWheelTimer retryTimer, AsyncConnectionImpl conn,
long pauseNs, int maxAttempts, long operationTimeoutNs,
long rpcTimeoutNs, int startLogErrorsCnt) {
public AsyncRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, long pauseNs,
int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
this.retryTimer = retryTimer;
this.conn = conn;
this.pauseNs = pauseNs;
Expand Down
Loading

0 comments on commit fbf7937

Please sign in to comment.