Skip to content

Commit

Permalink
Merge branch 'master' into HBASE-26867
Browse files Browse the repository at this point in the history
  • Loading branch information
frostruan authored Jul 31, 2023
2 parents 66e9807 + 0bbc8d1 commit 3f10e98
Show file tree
Hide file tree
Showing 135 changed files with 5,150 additions and 2,600 deletions.
10 changes: 5 additions & 5 deletions dev-support/create-release/release-util.sh
Original file line number Diff line number Diff line change
Expand Up @@ -146,12 +146,12 @@ function get_release_info {
if [[ -z "${ASF_REPO}" ]]; then
ASF_REPO="https://gitbox.apache.org/repos/asf/${PROJECT}.git"
fi
if [[ -z "${ASF_REPO_WEBUI}" ]]; then
ASF_REPO_WEBUI="https://gitbox.apache.org/repos/asf?p=${PROJECT}.git"
fi
if [[ -z "${ASF_GITHUB_REPO}" ]]; then
ASF_GITHUB_REPO="https://github.com/apache/${PROJECT}"
fi
if [[ -z "${ASF_GITHUB_WEBUI}" ]] ; then
ASF_GITHUB_WEBUI="https://raw.githubusercontent.com/apache/${PROJECT}"
fi
if [ -z "$GIT_BRANCH" ]; then
# If no branch is specified, find out the latest branch from the repo.
GIT_BRANCH="$(git ls-remote --heads "$ASF_REPO" |
Expand All @@ -167,14 +167,14 @@ function get_release_info {

# Find the current version for the branch.
local version
version="$(curl -s "$ASF_REPO_WEBUI;a=blob_plain;f=pom.xml;hb=refs/heads/$GIT_BRANCH" |
version="$(curl -s "$ASF_GITHUB_WEBUI/refs/heads/$GIT_BRANCH/pom.xml" |
parse_version)"
# We do not want to expand ${revision} here, see https://maven.apache.org/maven-ci-friendly.html
# If we use ${revision} as placeholder, we need to parse the revision property to
# get maven version
# shellcheck disable=SC2016
if [[ "${version}" == '${revision}' ]]; then
version="$(curl -s "$ASF_REPO_WEBUI;a=blob_plain;f=pom.xml;hb=refs/heads/$GIT_BRANCH" |
version="$(curl -s "$ASF_GITHUB_WEBUI/refs/heads/$GIT_BRANCH/pom.xml" |
parse_revision)"
fi
log "Current branch VERSION is $version."
Expand Down
2 changes: 1 addition & 1 deletion dev-support/git-jira-release-audit/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ blessed==1.17.0
certifi==2022.12.07
cffi==1.13.2
chardet==3.0.4
cryptography==39.0.1
cryptography==41.0.0
defusedxml==0.6.0
enlighten==1.4.0
gitdb2==2.0.6
Expand Down
5 changes: 5 additions & 0 deletions hbase-backup/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>javax.ws.rs</groupId>
<artifactId>javax.ws.rs-api</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;

/**
* Additional Asynchronous Admin capabilities for clients.
*/
@InterfaceAudience.Public
public final class AsyncAdminClientUtils {

private AsyncAdminClientUtils() {
}

/**
* Execute the given coprocessor call on all region servers.
* <p>
* The {@code stubMaker} is just a delegation to the {@code newStub} call. Usually it is only a
* one line lambda expression, like:
*
* <pre>
* channel -&gt; xxxService.newStub(channel)
* </pre>
*
* @param asyncAdmin the asynchronous administrative API for HBase.
* @param stubMaker a delegation to the actual {@code newStub} call.
* @param callable a delegation to the actual protobuf rpc call. See the comment of
* {@link ServiceCaller} for more details.
* @param <S> the type of the asynchronous stub
* @param <R> the type of the return value
* @return Map of each region server to its result of the protobuf rpc call, wrapped by a
* {@link CompletableFuture}.
* @see ServiceCaller
*/
public static <S, R> CompletableFuture<Map<ServerName, Object>>
coprocessorServiceOnAllRegionServers(AsyncAdmin asyncAdmin, Function<RpcChannel, S> stubMaker,
ServiceCaller<S, R> callable) {
CompletableFuture<Map<ServerName, Object>> future = new CompletableFuture<>();
FutureUtils.addListener(asyncAdmin.getRegionServers(), (regionServers, error) -> {
if (error != null) {
future.completeExceptionally(error);
return;
}
Map<ServerName, Object> resultMap = new ConcurrentHashMap<>();
for (ServerName regionServer : regionServers) {
FutureUtils.addListener(asyncAdmin.coprocessorService(stubMaker, callable, regionServer),
(server, err) -> {
if (err != null) {
resultMap.put(regionServer, err);
} else {
resultMap.put(regionServer, server);
}
if (resultMap.size() == regionServers.size()) {
future.complete(Collections.unmodifiableMap(resultMap));
}
});
}
});
return future;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;

import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
Expand All @@ -44,7 +45,7 @@ public AsyncAdminRequestRetryingCaller(Timer retryTimer, AsyncConnectionImpl con
long pauseNs, long pauseNsForServerOverloaded, int maxAttempts, long operationTimeoutNs,
long rpcTimeoutNs, int startLogErrorsCnt, ServerName serverName, Callable<T> callable) {
super(retryTimer, conn, priority, pauseNs, pauseNsForServerOverloaded, maxAttempts,
operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt, Collections.emptyMap());
this.serverName = serverName;
this.callable = callable;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.CellUtil.createCellScanner;
import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS;
import static org.apache.hadoop.hbase.client.ConnectionUtils.calcPriority;
import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController;
import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException;
import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
Expand All @@ -35,6 +33,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
Expand All @@ -56,6 +55,7 @@
import org.apache.hadoop.hbase.client.MultiResponse.RegionResult;
import org.apache.hadoop.hbase.client.RetriesExhaustedException.ThrowableWithExtraContext;
import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
import org.apache.hadoop.hbase.client.backoff.HBaseServerExceptionPauseManager;
import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.util.Bytes;
Expand Down Expand Up @@ -102,10 +102,6 @@ class AsyncBatchRpcRetryingCaller<T> {

private final IdentityHashMap<Action, List<ThrowableWithExtraContext>> action2Errors;

private final long pauseNs;

private final long pauseNsForServerOverloaded;

private final int maxAttempts;

private final long operationTimeoutNs;
Expand All @@ -116,6 +112,10 @@ class AsyncBatchRpcRetryingCaller<T> {

private final long startNs;

private final HBaseServerExceptionPauseManager pauseManager;

private final Map<String, byte[]> requestAttributes;

// we can not use HRegionLocation as the map key because the hashCode and equals method of
// HRegionLocation only consider serverName.
private static final class RegionRequest {
Expand Down Expand Up @@ -151,12 +151,11 @@ public int getPriority() {

public AsyncBatchRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn,
TableName tableName, List<? extends Row> actions, long pauseNs, long pauseNsForServerOverloaded,
int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt,
Map<String, byte[]> requestAttributes) {
this.retryTimer = retryTimer;
this.conn = conn;
this.tableName = tableName;
this.pauseNs = pauseNs;
this.pauseNsForServerOverloaded = pauseNsForServerOverloaded;
this.maxAttempts = maxAttempts;
this.operationTimeoutNs = operationTimeoutNs;
this.rpcTimeoutNs = rpcTimeoutNs;
Expand All @@ -182,6 +181,9 @@ public AsyncBatchRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn,
}
this.action2Errors = new IdentityHashMap<>();
this.startNs = System.nanoTime();
this.pauseManager =
new HBaseServerExceptionPauseManager(pauseNs, pauseNsForServerOverloaded, operationTimeoutNs);
this.requestAttributes = requestAttributes;
}

private static boolean hasIncrementOrAppend(Row action) {
Expand All @@ -204,10 +206,6 @@ private static boolean hasIncrementOrAppend(RowMutations mutations) {
return false;
}

private long remainingTimeNs() {
return operationTimeoutNs - (System.nanoTime() - startNs);
}

private List<ThrowableWithExtraContext> removeErrors(Action action) {
synchronized (action2Errors) {
return action2Errors.remove(action);
Expand Down Expand Up @@ -360,14 +358,14 @@ private void onComplete(Map<byte[], RegionRequest> actionsByRegion, int tries,
}
});
if (!failedActions.isEmpty()) {
tryResubmit(failedActions.stream(), tries, retryImmediately.booleanValue(), false);
tryResubmit(failedActions.stream(), tries, retryImmediately.booleanValue(), null);
}
}

private void sendToServer(ServerName serverName, ServerRequest serverReq, int tries) {
long remainingNs;
if (operationTimeoutNs > 0) {
remainingNs = remainingTimeNs();
remainingNs = pauseManager.remainingTimeNs(startNs);
if (remainingNs <= 0) {
failAll(serverReq.actionsByRegion.values().stream().flatMap(r -> r.actions.stream()),
tries);
Expand Down Expand Up @@ -398,6 +396,7 @@ private void sendToServer(ServerName serverName, ServerRequest serverReq, int tr
HBaseRpcController controller = conn.rpcControllerFactory.newController();
resetController(controller, Math.min(rpcTimeoutNs, remainingNs),
calcPriority(serverReq.getPriority(), tableName));
controller.setRequestAttributes(requestAttributes);
if (!cells.isEmpty()) {
controller.setCellScanner(createCellScanner(cells));
}
Expand Down Expand Up @@ -465,30 +464,23 @@ private void onError(Map<byte[], RegionRequest> actionsByRegion, int tries, Thro
List<Action> copiedActions = actionsByRegion.values().stream().flatMap(r -> r.actions.stream())
.collect(Collectors.toList());
addError(copiedActions, error, serverName);
tryResubmit(copiedActions.stream(), tries, error instanceof RetryImmediatelyException,
HBaseServerException.isServerOverloaded(error));
tryResubmit(copiedActions.stream(), tries, error instanceof RetryImmediatelyException, error);
}

private void tryResubmit(Stream<Action> actions, int tries, boolean immediately,
boolean isServerOverloaded) {
Throwable error) {
if (immediately) {
groupAndSend(actions, tries);
return;
}
long delayNs;
long pauseNsToUse = isServerOverloaded ? pauseNsForServerOverloaded : pauseNs;
if (operationTimeoutNs > 0) {
long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS;
if (maxDelayNs <= 0) {
failAll(actions, tries);
return;
}
delayNs = Math.min(maxDelayNs, getPauseTime(pauseNsToUse, tries - 1));
} else {
delayNs = getPauseTime(pauseNsToUse, tries - 1);
}

if (isServerOverloaded) {
OptionalLong maybePauseNsToUse = pauseManager.getPauseNsFromException(error, tries, startNs);
if (!maybePauseNsToUse.isPresent()) {
failAll(actions, tries);
return;
}
long delayNs = maybePauseNsToUse.getAsLong();
if (HBaseServerException.isServerOverloaded(error)) {
Optional<MetricsConnection> metrics = conn.getConnectionMetrics();
metrics.ifPresent(m -> m.incrementServerOverloadedBackoffTime(delayNs, TimeUnit.NANOSECONDS));
}
Expand All @@ -498,7 +490,7 @@ private void tryResubmit(Stream<Action> actions, int tries, boolean immediately,
private void groupAndSend(Stream<Action> actions, int tries) {
long locateTimeoutNs;
if (operationTimeoutNs > 0) {
locateTimeoutNs = remainingTimeNs();
locateTimeoutNs = pauseManager.remainingTimeNs(startNs);
if (locateTimeoutNs <= 0) {
failAll(actions, tries);
return;
Expand Down Expand Up @@ -529,7 +521,7 @@ private void groupAndSend(Stream<Action> actions, int tries) {
sendOrDelay(actionsByServer, tries);
}
if (!locateFailed.isEmpty()) {
tryResubmit(locateFailed.stream(), tries, false, false);
tryResubmit(locateFailed.stream(), tries, false, null);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.context.Scope;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -92,9 +93,12 @@ class AsyncClientScanner {

private final Span span;

private final Map<String, byte[]> requestAttributes;

public AsyncClientScanner(Scan scan, AdvancedScanResultConsumer consumer, TableName tableName,
AsyncConnectionImpl conn, Timer retryTimer, long pauseNs, long pauseNsForServerOverloaded,
int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt,
Map<String, byte[]> requestAttributes) {
if (scan.getStartRow() == null) {
scan.withStartRow(EMPTY_START_ROW, scan.includeStartRow());
}
Expand All @@ -113,6 +117,7 @@ public AsyncClientScanner(Scan scan, AdvancedScanResultConsumer consumer, TableN
this.rpcTimeoutNs = rpcTimeoutNs;
this.startLogErrorsCnt = startLogErrorsCnt;
this.resultCache = createScanResultCache(scan);
this.requestAttributes = requestAttributes;
if (scan.isScanMetricsEnabled()) {
this.scanMetrics = new ScanMetrics();
consumer.onScanMetricsCreated(scanMetrics);
Expand Down Expand Up @@ -191,15 +196,17 @@ private CompletableFuture<OpenScannerResponse> callOpenScanner(HBaseRpcControlle
}

private void startScan(OpenScannerResponse resp) {
addListener(conn.callerFactory.scanSingleRegion().id(resp.resp.getScannerId())
.location(resp.loc).remote(resp.isRegionServerRemote)
.scannerLeaseTimeoutPeriod(resp.resp.getTtl(), TimeUnit.MILLISECONDS).stub(resp.stub)
.setScan(scan).metrics(scanMetrics).consumer(consumer).resultCache(resultCache)
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
.scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
.pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS)
.maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt)
.start(resp.controller, resp.resp), (hasMore, error) -> {
addListener(
conn.callerFactory.scanSingleRegion().id(resp.resp.getScannerId()).location(resp.loc)
.remote(resp.isRegionServerRemote)
.scannerLeaseTimeoutPeriod(resp.resp.getTtl(), TimeUnit.MILLISECONDS).stub(resp.stub)
.setScan(scan).metrics(scanMetrics).consumer(consumer).resultCache(resultCache)
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
.scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
.pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS)
.maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt)
.setRequestAttributes(requestAttributes).start(resp.controller, resp.resp),
(hasMore, error) -> {
try (Scope ignored = span.makeCurrent()) {
if (error != null) {
try {
Expand Down Expand Up @@ -231,8 +238,8 @@ private CompletableFuture<OpenScannerResponse> openScanner(int replicaId) {
.priority(scan.getPriority()).rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
.operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
.pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS)
.maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt).action(this::callOpenScanner)
.call();
.maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt)
.setRequestAttributes(requestAttributes).action(this::callOpenScanner).call();
}
}

Expand Down
Loading

0 comments on commit 3f10e98

Please sign in to comment.