Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-25542 Add client detail to scan name so when lease expires, we … #2930

Merged
merged 2 commits into from
Feb 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.BindException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
Expand Down Expand Up @@ -93,6 +94,7 @@
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.PriorityFunction;
import org.apache.hadoop.hbase.ipc.QosPriority;
import org.apache.hadoop.hbase.ipc.RpcCall;
import org.apache.hadoop.hbase.ipc.RpcCallContext;
import org.apache.hadoop.hbase.ipc.RpcCallback;
import org.apache.hadoop.hbase.ipc.RpcScheduler;
Expand Down Expand Up @@ -395,7 +397,6 @@ public void run() throws IOException {
* An Rpc callback for doing shipped() call on a RegionScanner.
*/
private class RegionScannerShippedCallBack implements RpcCallback {

private final String scannerName;
private final Shipper shipper;
private final Lease lease;
Expand Down Expand Up @@ -445,43 +446,48 @@ public void run() {
/**
* Holder class which holds the RegionScanner, nextCallSeq and RpcCallbacks together.
*/
private static final class RegionScannerHolder {

static final class RegionScannerHolder {
private final AtomicLong nextCallSeq = new AtomicLong(0);
private final String scannerName;
private final RegionScanner s;
private final HRegion r;
private final RpcCallback closeCallBack;
private final RpcCallback shippedCallback;
private byte[] rowOfLastPartialResult;
private boolean needCursor;
private boolean fullRegionScan;
private final String clientIPAndPort;

public RegionScannerHolder(String scannerName, RegionScanner s, HRegion r,
RegionScannerHolder(RegionScanner s, HRegion r,
RpcCallback closeCallBack, RpcCallback shippedCallback, boolean needCursor,
boolean fullRegionScan) {
this.scannerName = scannerName;
boolean fullRegionScan, String clientIPAndPort) {
this.s = s;
this.r = r;
this.closeCallBack = closeCallBack;
this.shippedCallback = shippedCallback;
this.needCursor = needCursor;
this.fullRegionScan = fullRegionScan;
this.clientIPAndPort = clientIPAndPort;
}

public long getNextCallSeq() {
long getNextCallSeq() {
return nextCallSeq.get();
}

public boolean incNextCallSeq(long currentSeq) {
boolean incNextCallSeq(long currentSeq) {
// Use CAS to prevent multiple scan request running on the same scanner.
return nextCallSeq.compareAndSet(currentSeq, currentSeq + 1);
}

// Should be called only when we need to print lease expired messages otherwise
// cache the String once made.
@Override
public String toString() {
return this.clientIPAndPort + ", " + this.r.getRegionInfo().getRegionNameAsString();
}
}

/**
* Instantiated as a scanner lease. If the lease times out, the scanner is
* closed
* Instantiated as a scanner lease. If the lease times out, the scanner is closed
*/
private class ScannerListener implements LeaseListener {
private final String scannerName;
Expand All @@ -493,31 +499,32 @@ private class ScannerListener implements LeaseListener {
@Override
public void leaseExpired() {
RegionScannerHolder rsh = scanners.remove(this.scannerName);
if (rsh != null) {
RegionScanner s = rsh.s;
LOG.info("Scanner " + this.scannerName + " lease expired on region "
+ s.getRegionInfo().getRegionNameAsString());
HRegion region = null;
if (rsh == null) {
LOG.warn("Scanner lease {} expired but no outstanding scanner", this.scannerName);
return;
}
LOG.info("Scanner lease {} expired {}, user={}", this.scannerName, rsh,
RpcServer.getRequestUserName().orElse(null));
RegionScanner s = rsh.s;
HRegion region = null;
try {
region = regionServer.getRegion(s.getRegionInfo().getRegionName());
if (region != null && region.getCoprocessorHost() != null) {
region.getCoprocessorHost().preScannerClose(s);
}
} catch (IOException e) {
LOG.error("Closing scanner {} {}, user={}", this.scannerName, rsh, e,
RpcServer.getRequestUserName().orElse(null));
} finally {
try {
region = regionServer.getRegion(s.getRegionInfo().getRegionName());
s.close();
if (region != null && region.getCoprocessorHost() != null) {
region.getCoprocessorHost().preScannerClose(s);
region.getCoprocessorHost().postScannerClose(s);
}
} catch (IOException e) {
LOG.error("Closing scanner for " + s.getRegionInfo().getRegionNameAsString(), e);
} finally {
try {
s.close();
if (region != null && region.getCoprocessorHost() != null) {
region.getCoprocessorHost().postScannerClose(s);
}
} catch (IOException e) {
LOG.error("Closing scanner for " + s.getRegionInfo().getRegionNameAsString(), e);
}
LOG.error("Closing scanner {} {}, user={}", this.scannerName, rsh, e,
RpcServer.getRequestUserName().orElse(null));
}
} else {
LOG.warn("Scanner " + this.scannerName + " lease expired, but no related" +
" scanner found, hence no chance to close that related scanner!");
}
}
}
Expand Down Expand Up @@ -1301,14 +1308,19 @@ public int getScannersCount() {
return scanners.size();
}

public
/**
* @return The outstanding RegionScanner for <code>scannerId</code> or null if none found.
*/
RegionScanner getScanner(long scannerId) {
String scannerIdString = Long.toString(scannerId);
RegionScannerHolder scannerHolder = scanners.get(scannerIdString);
if (scannerHolder != null) {
return scannerHolder.s;
}
return null;
RegionScannerHolder rsh = getRegionScannerHolder(scannerId);
return rsh == null? null: rsh.s;
}

/**
* @return The associated RegionScannerHolder for <code>scannerId</code> or null.
*/
private RegionScannerHolder getRegionScannerHolder(long scannerId) {
return scanners.get(toScannerName(scannerId));
}

public String getScanDetailsWithId(long scannerId) {
Expand Down Expand Up @@ -1342,12 +1354,8 @@ public String getScanDetailsWithRequest(ScanRequest request) {
* Currently the vtime is the number of "next" calls.
*/
long getScannerVirtualTime(long scannerId) {
String scannerIdString = Long.toString(scannerId);
RegionScannerHolder scannerHolder = scanners.get(scannerIdString);
if (scannerHolder != null) {
return scannerHolder.getNextCallSeq();
}
return 0L;
RegionScannerHolder rsh = getRegionScannerHolder(scannerId);
return rsh == null? 0L: rsh.getNextCallSeq();
}

/**
Expand Down Expand Up @@ -1391,24 +1399,36 @@ Object addSize(RpcCallContext context, Result r, Object lastBlock) {
return lastBlock;
}

/**
* @return Remote client's ip and port else null if can't be determined.
*/
static String getRemoteClientIpAndPort() {
RpcCall rpcCall = RpcServer.getCurrentCall().orElse(null);
if (rpcCall == null) {
return HConstants.EMPTY_STRING;
}
InetAddress address = rpcCall.getRemoteAddress();
if (address == null) {
return HConstants.EMPTY_STRING;
}
// Be careful here with InetAddress. Do InetAddress#getHostAddress. It will not do a name
// resolution. Just use the IP. It is generally a smaller amount of info to keep around while
// scanning than a hostname anyways.
return Address.fromParts(address.getHostAddress(), rpcCall.getRemotePort()).toString();
}

private RegionScannerHolder addScanner(String scannerName, RegionScanner s, Shipper shipper,
HRegion r, boolean needCursor, boolean fullRegionScan) throws LeaseStillHeldException {
Lease lease = regionServer.getLeaseManager().createLease(
scannerName, this.scannerLeaseTimeoutPeriod, new ScannerListener(scannerName));
RpcCallback shippedCallback = new RegionScannerShippedCallBack(scannerName, shipper, lease);
RpcCallback closeCallback;
if (s instanceof RpcCallback) {
closeCallback = (RpcCallback) s;
} else {
closeCallback = new RegionScannerCloseCallBack(s);
}

RegionScannerHolder rsh =
new RegionScannerHolder(scannerName, s, r, closeCallback, shippedCallback,
needCursor, fullRegionScan);
RpcCallback closeCallback = s instanceof RpcCallback?
(RpcCallback)s: new RegionScannerCloseCallBack(s);
RegionScannerHolder rsh = new RegionScannerHolder(s, r, closeCallback, shippedCallback,
needCursor, fullRegionScan, getRemoteClientIpAndPort());
RegionScannerHolder existing = scanners.putIfAbsent(scannerName, rsh);
assert existing == null : "scannerId must be unique within regionserver's whole lifecycle! " +
scannerName;
scannerName + ", " + existing;
return rsh;
}

Expand Down Expand Up @@ -3083,8 +3103,8 @@ public synchronized Throwable fillInStackTrace() {
};

private RegionScannerHolder getRegionScanner(ScanRequest request) throws IOException {
String scannerName = Long.toString(request.getScannerId());
RegionScannerHolder rsh = scanners.get(scannerName);
String scannerName = toScannerName(request.getScannerId());
RegionScannerHolder rsh = this.scanners.get(scannerName);
if (rsh == null) {
// just ignore the next or close request if scanner does not exists.
if (closedScanners.getIfPresent(scannerName) != null) {
Expand Down Expand Up @@ -3123,8 +3143,12 @@ private RegionScannerHolder getRegionScanner(ScanRequest request) throws IOExcep
return rsh;
}

private RegionScannerHolder newRegionScanner(ScanRequest request, ScanResponse.Builder builder)
throws IOException {
/**
* @return Pair with scannerName key to use with this new Scanner and its RegionScannerHolder
* value.
*/
private Pair<String, RegionScannerHolder> newRegionScanner(ScanRequest request,
ScanResponse.Builder builder) throws IOException {
HRegion region = getRegion(request.getRegion());
ClientProtos.Scan protoScan = request.getScan();
boolean isLoadingCfsOnDemandSet = protoScan.hasLoadColumnFamiliesOnDemand();
Expand Down Expand Up @@ -3155,13 +3179,24 @@ private RegionScannerHolder newRegionScanner(ScanRequest request, ScanResponse.B
builder.setScannerId(scannerId);
builder.setMvccReadPoint(scanner.getMvccReadPoint());
builder.setTtl(scannerLeaseTimeoutPeriod);
String scannerName = String.valueOf(scannerId);
String scannerName = toScannerName(scannerId);

boolean fullRegionScan = !region.getRegionInfo().getTable().isSystemTable() &&
isFullRegionScan(scan, region);

return addScanner(scannerName, scanner, shipper, region, scan.isNeedCursorResult(),
fullRegionScan);
return new Pair<String, RegionScannerHolder>(scannerName,
addScanner(scannerName, scanner, shipper, region, scan.isNeedCursorResult(),
fullRegionScan));
}

/**
* The returned String is used as key doing look up of outstanding Scanners in this Servers'
* this.scanners, the Map of outstanding scanners and their current state.
* @param scannerId A scanner long id.
* @return The long id as a String.
*/
private static String toScannerName(long scannerId) {
return Long.toString(scannerId);
}

private void checkScanNextCallSeq(ScanRequest request, RegionScannerHolder rsh)
Expand Down Expand Up @@ -3435,7 +3470,7 @@ public ScanResponse scan(final RpcController controller, final ScanRequest reque
checkOpen();
} catch (IOException e) {
if (request.hasScannerId()) {
String scannerName = Long.toString(request.getScannerId());
String scannerName = toScannerName(request.getScannerId());
if (LOG.isDebugEnabled()) {
LOG.debug(
"Server shutting down and client tried to access missing scanner " + scannerName);
Expand All @@ -3458,14 +3493,19 @@ public ScanResponse scan(final RpcController controller, final ScanRequest reque
rpcScanRequestCount.increment();
RegionScannerHolder rsh;
ScanResponse.Builder builder = ScanResponse.newBuilder();
String scannerName;
try {
if (request.hasScannerId()) {
// The downstream projects such as AsyncHBase in OpenTSDB need this value. See HBASE-18000
// for more details.
builder.setScannerId(request.getScannerId());
long scannerId = request.getScannerId();
builder.setScannerId(scannerId);
scannerName = toScannerName(scannerId);
rsh = getRegionScanner(request);
} else {
rsh = newRegionScanner(request, builder);
Pair<String, RegionScannerHolder> scannerNameAndRSH = newRegionScanner(request, builder);
scannerName = scannerNameAndRSH.getFirst();
rsh = scannerNameAndRSH.getSecond();
}
} catch (IOException e) {
if (e == SCANNER_ALREADY_CLOSED) {
Expand All @@ -3479,11 +3519,10 @@ public ScanResponse scan(final RpcController controller, final ScanRequest reque
rpcFullScanRequestCount.increment();
}
HRegion region = rsh.r;
String scannerName = rsh.scannerName;
LeaseManager.Lease lease;
try {
// Remove lease while its being processed in server; protects against case
// where processing of request takes > lease expiration time.
// where processing of request takes > lease expiration time. or null if none found.
lease = regionServer.getLeaseManager().removeLease(scannerName);
} catch (LeaseException e) {
throw new ServiceException(e);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;

import java.net.InetAddress;
import java.net.UnknownHostException;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.ipc.RpcCall;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Test parts of {@link RSRpcServices}
*/
@Category({ RegionServerTests.class, MediumTests.class})
public class TestRSRpcServices {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestRSRpcServices.class);

private static final Logger LOG = LoggerFactory.getLogger(TestRSRpcServices.class);

/**
* Simple test of the toString on RegionScannerHolder works.
* Just creates one and calls #toString on it.
*/
@Test
public void testRegionScannerHolderToString() throws UnknownHostException {
RpcCall call = Mockito.mock(RpcCall.class);
int port = 1234;
Mockito.when(call.getRemotePort()).thenReturn(port);
InetAddress address = InetAddress.getLocalHost();
Mockito.when(call.getRemoteAddress()).thenReturn(address);
RpcServer.setCurrentCall(call);
String clientIpAndPort = RSRpcServices.getRemoteClientIpAndPort();
HRegion region = Mockito.mock(HRegion.class);
Mockito.when(region.getRegionInfo()).thenReturn(RegionInfoBuilder.FIRST_META_REGIONINFO);
RSRpcServices.RegionScannerHolder rsh = new RSRpcServices.RegionScannerHolder(null, region,
null, null, false, false, clientIpAndPort);
LOG.info("rsh={}", rsh);
}
}