diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index ce3c43020bc4..7949b981624c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -23,6 +23,7 @@
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.BindException;
+import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -93,6 +94,7 @@
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.PriorityFunction;
import org.apache.hadoop.hbase.ipc.QosPriority;
+import org.apache.hadoop.hbase.ipc.RpcCall;
import org.apache.hadoop.hbase.ipc.RpcCallContext;
import org.apache.hadoop.hbase.ipc.RpcCallback;
import org.apache.hadoop.hbase.ipc.RpcScheduler;
@@ -395,7 +397,6 @@ public void run() throws IOException {
* An Rpc callback for doing shipped() call on a RegionScanner.
*/
private class RegionScannerShippedCallBack implements RpcCallback {
-
private final String scannerName;
private final Shipper shipper;
private final Lease lease;
@@ -445,10 +446,8 @@ public void run() {
/**
* Holder class which holds the RegionScanner, nextCallSeq and RpcCallbacks together.
*/
- private static final class RegionScannerHolder {
-
+ static final class RegionScannerHolder {
private final AtomicLong nextCallSeq = new AtomicLong(0);
- private final String scannerName;
private final RegionScanner s;
private final HRegion r;
private final RpcCallback closeCallBack;
@@ -456,32 +455,39 @@ private static final class RegionScannerHolder {
private byte[] rowOfLastPartialResult;
private boolean needCursor;
private boolean fullRegionScan;
+ private final String clientIPAndPort;
- public RegionScannerHolder(String scannerName, RegionScanner s, HRegion r,
+ RegionScannerHolder(RegionScanner s, HRegion r,
RpcCallback closeCallBack, RpcCallback shippedCallback, boolean needCursor,
- boolean fullRegionScan) {
- this.scannerName = scannerName;
+ boolean fullRegionScan, String clientIPAndPort) {
this.s = s;
this.r = r;
this.closeCallBack = closeCallBack;
this.shippedCallback = shippedCallback;
this.needCursor = needCursor;
this.fullRegionScan = fullRegionScan;
+ this.clientIPAndPort = clientIPAndPort;
}
- public long getNextCallSeq() {
+ long getNextCallSeq() {
return nextCallSeq.get();
}
- public boolean incNextCallSeq(long currentSeq) {
+ boolean incNextCallSeq(long currentSeq) {
// Use CAS to prevent multiple scan request running on the same scanner.
return nextCallSeq.compareAndSet(currentSeq, currentSeq + 1);
}
+
+ // Should be called only when we need to print lease expired messages otherwise
+ // cache the String once made.
+ @Override
+ public String toString() {
+ return this.clientIPAndPort + ", " + this.r.getRegionInfo().getRegionNameAsString();
+ }
}
/**
- * Instantiated as a scanner lease. If the lease times out, the scanner is
- * closed
+ * Instantiated as a scanner lease. If the lease times out, the scanner is closed
*/
private class ScannerListener implements LeaseListener {
private final String scannerName;
@@ -493,31 +499,32 @@ private class ScannerListener implements LeaseListener {
@Override
public void leaseExpired() {
RegionScannerHolder rsh = scanners.remove(this.scannerName);
- if (rsh != null) {
- RegionScanner s = rsh.s;
- LOG.info("Scanner " + this.scannerName + " lease expired on region "
- + s.getRegionInfo().getRegionNameAsString());
- HRegion region = null;
+ if (rsh == null) {
+ LOG.warn("Scanner lease {} expired but no outstanding scanner", this.scannerName);
+ return;
+ }
+ LOG.info("Scanner lease {} expired {}, user={}", this.scannerName, rsh,
+ RpcServer.getRequestUserName().orElse(null));
+ RegionScanner s = rsh.s;
+ HRegion region = null;
+ try {
+ region = regionServer.getRegion(s.getRegionInfo().getRegionName());
+ if (region != null && region.getCoprocessorHost() != null) {
+ region.getCoprocessorHost().preScannerClose(s);
+ }
+ } catch (IOException e) {
+ LOG.error("Closing scanner {} {}, user={}", this.scannerName, rsh, e,
+ RpcServer.getRequestUserName().orElse(null));
+ } finally {
try {
- region = regionServer.getRegion(s.getRegionInfo().getRegionName());
+ s.close();
if (region != null && region.getCoprocessorHost() != null) {
- region.getCoprocessorHost().preScannerClose(s);
+ region.getCoprocessorHost().postScannerClose(s);
}
} catch (IOException e) {
- LOG.error("Closing scanner for " + s.getRegionInfo().getRegionNameAsString(), e);
- } finally {
- try {
- s.close();
- if (region != null && region.getCoprocessorHost() != null) {
- region.getCoprocessorHost().postScannerClose(s);
- }
- } catch (IOException e) {
- LOG.error("Closing scanner for " + s.getRegionInfo().getRegionNameAsString(), e);
- }
+ LOG.error("Closing scanner {} {}, user={}", this.scannerName, rsh, e,
+ RpcServer.getRequestUserName().orElse(null));
}
- } else {
- LOG.warn("Scanner " + this.scannerName + " lease expired, but no related" +
- " scanner found, hence no chance to close that related scanner!");
}
}
}
@@ -1301,14 +1308,19 @@ public int getScannersCount() {
return scanners.size();
}
- public
+ /**
+ * @return The outstanding RegionScanner for scannerId
or null if none found.
+ */
RegionScanner getScanner(long scannerId) {
- String scannerIdString = Long.toString(scannerId);
- RegionScannerHolder scannerHolder = scanners.get(scannerIdString);
- if (scannerHolder != null) {
- return scannerHolder.s;
- }
- return null;
+ RegionScannerHolder rsh = getRegionScannerHolder(scannerId);
+ return rsh == null? null: rsh.s;
+ }
+
+ /**
+ * @return The associated RegionScannerHolder for scannerId
or null.
+ */
+ private RegionScannerHolder getRegionScannerHolder(long scannerId) {
+ return scanners.get(toScannerName(scannerId));
}
public String getScanDetailsWithId(long scannerId) {
@@ -1342,12 +1354,8 @@ public String getScanDetailsWithRequest(ScanRequest request) {
* Currently the vtime is the number of "next" calls.
*/
long getScannerVirtualTime(long scannerId) {
- String scannerIdString = Long.toString(scannerId);
- RegionScannerHolder scannerHolder = scanners.get(scannerIdString);
- if (scannerHolder != null) {
- return scannerHolder.getNextCallSeq();
- }
- return 0L;
+ RegionScannerHolder rsh = getRegionScannerHolder(scannerId);
+ return rsh == null? 0L: rsh.getNextCallSeq();
}
/**
@@ -1391,24 +1399,36 @@ Object addSize(RpcCallContext context, Result r, Object lastBlock) {
return lastBlock;
}
+ /**
+ * @return Remote client's ip and port else null if can't be determined.
+ */
+ static String getRemoteClientIpAndPort() {
+ RpcCall rpcCall = RpcServer.getCurrentCall().orElse(null);
+ if (rpcCall == null) {
+ return HConstants.EMPTY_STRING;
+ }
+ InetAddress address = rpcCall.getRemoteAddress();
+ if (address == null) {
+ return HConstants.EMPTY_STRING;
+ }
+ // Be careful here with InetAddress. Do InetAddress#getHostAddress. It will not do a name
+ // resolution. Just use the IP. It is generally a smaller amount of info to keep around while
+ // scanning than a hostname anyways.
+ return Address.fromParts(address.getHostAddress(), rpcCall.getRemotePort()).toString();
+ }
+
private RegionScannerHolder addScanner(String scannerName, RegionScanner s, Shipper shipper,
HRegion r, boolean needCursor, boolean fullRegionScan) throws LeaseStillHeldException {
Lease lease = regionServer.getLeaseManager().createLease(
scannerName, this.scannerLeaseTimeoutPeriod, new ScannerListener(scannerName));
RpcCallback shippedCallback = new RegionScannerShippedCallBack(scannerName, shipper, lease);
- RpcCallback closeCallback;
- if (s instanceof RpcCallback) {
- closeCallback = (RpcCallback) s;
- } else {
- closeCallback = new RegionScannerCloseCallBack(s);
- }
-
- RegionScannerHolder rsh =
- new RegionScannerHolder(scannerName, s, r, closeCallback, shippedCallback,
- needCursor, fullRegionScan);
+ RpcCallback closeCallback = s instanceof RpcCallback?
+ (RpcCallback)s: new RegionScannerCloseCallBack(s);
+ RegionScannerHolder rsh = new RegionScannerHolder(s, r, closeCallback, shippedCallback,
+ needCursor, fullRegionScan, getRemoteClientIpAndPort());
RegionScannerHolder existing = scanners.putIfAbsent(scannerName, rsh);
assert existing == null : "scannerId must be unique within regionserver's whole lifecycle! " +
- scannerName;
+ scannerName + ", " + existing;
return rsh;
}
@@ -3083,8 +3103,8 @@ public synchronized Throwable fillInStackTrace() {
};
private RegionScannerHolder getRegionScanner(ScanRequest request) throws IOException {
- String scannerName = Long.toString(request.getScannerId());
- RegionScannerHolder rsh = scanners.get(scannerName);
+ String scannerName = toScannerName(request.getScannerId());
+ RegionScannerHolder rsh = this.scanners.get(scannerName);
if (rsh == null) {
// just ignore the next or close request if scanner does not exists.
if (closedScanners.getIfPresent(scannerName) != null) {
@@ -3123,8 +3143,12 @@ private RegionScannerHolder getRegionScanner(ScanRequest request) throws IOExcep
return rsh;
}
- private RegionScannerHolder newRegionScanner(ScanRequest request, ScanResponse.Builder builder)
- throws IOException {
+ /**
+ * @return Pair with scannerName key to use with this new Scanner and its RegionScannerHolder
+ * value.
+ */
+ private Pair newRegionScanner(ScanRequest request,
+ ScanResponse.Builder builder) throws IOException {
HRegion region = getRegion(request.getRegion());
ClientProtos.Scan protoScan = request.getScan();
boolean isLoadingCfsOnDemandSet = protoScan.hasLoadColumnFamiliesOnDemand();
@@ -3155,13 +3179,24 @@ private RegionScannerHolder newRegionScanner(ScanRequest request, ScanResponse.B
builder.setScannerId(scannerId);
builder.setMvccReadPoint(scanner.getMvccReadPoint());
builder.setTtl(scannerLeaseTimeoutPeriod);
- String scannerName = String.valueOf(scannerId);
+ String scannerName = toScannerName(scannerId);
boolean fullRegionScan = !region.getRegionInfo().getTable().isSystemTable() &&
isFullRegionScan(scan, region);
- return addScanner(scannerName, scanner, shipper, region, scan.isNeedCursorResult(),
- fullRegionScan);
+ return new Pair(scannerName,
+ addScanner(scannerName, scanner, shipper, region, scan.isNeedCursorResult(),
+ fullRegionScan));
+ }
+
+ /**
+ * The returned String is used as key doing look up of outstanding Scanners in this Servers'
+ * this.scanners, the Map of outstanding scanners and their current state.
+ * @param scannerId A scanner long id.
+ * @return The long id as a String.
+ */
+ private static String toScannerName(long scannerId) {
+ return Long.toString(scannerId);
}
private void checkScanNextCallSeq(ScanRequest request, RegionScannerHolder rsh)
@@ -3435,7 +3470,7 @@ public ScanResponse scan(final RpcController controller, final ScanRequest reque
checkOpen();
} catch (IOException e) {
if (request.hasScannerId()) {
- String scannerName = Long.toString(request.getScannerId());
+ String scannerName = toScannerName(request.getScannerId());
if (LOG.isDebugEnabled()) {
LOG.debug(
"Server shutting down and client tried to access missing scanner " + scannerName);
@@ -3458,14 +3493,19 @@ public ScanResponse scan(final RpcController controller, final ScanRequest reque
rpcScanRequestCount.increment();
RegionScannerHolder rsh;
ScanResponse.Builder builder = ScanResponse.newBuilder();
+ String scannerName;
try {
if (request.hasScannerId()) {
// The downstream projects such as AsyncHBase in OpenTSDB need this value. See HBASE-18000
// for more details.
- builder.setScannerId(request.getScannerId());
+ long scannerId = request.getScannerId();
+ builder.setScannerId(scannerId);
+ scannerName = toScannerName(scannerId);
rsh = getRegionScanner(request);
} else {
- rsh = newRegionScanner(request, builder);
+ Pair scannerNameAndRSH = newRegionScanner(request, builder);
+ scannerName = scannerNameAndRSH.getFirst();
+ rsh = scannerNameAndRSH.getSecond();
}
} catch (IOException e) {
if (e == SCANNER_ALREADY_CLOSED) {
@@ -3479,11 +3519,10 @@ public ScanResponse scan(final RpcController controller, final ScanRequest reque
rpcFullScanRequestCount.increment();
}
HRegion region = rsh.r;
- String scannerName = rsh.scannerName;
LeaseManager.Lease lease;
try {
// Remove lease while its being processed in server; protects against case
- // where processing of request takes > lease expiration time.
+ // where processing of request takes > lease expiration time. or null if none found.
lease = regionServer.getLeaseManager().removeLease(scannerName);
} catch (LeaseException e) {
throw new ServiceException(e);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRSRpcServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRSRpcServices.java
new file mode 100644
index 000000000000..f242ddbb6b57
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRSRpcServices.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.ipc.RpcCall;
+import org.apache.hadoop.hbase.ipc.RpcServer;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test parts of {@link RSRpcServices}
+ */
+@Category({ RegionServerTests.class, MediumTests.class})
+public class TestRSRpcServices {
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestRSRpcServices.class);
+
+ private static final Logger LOG = LoggerFactory.getLogger(TestRSRpcServices.class);
+
+ /**
+ * Simple test of the toString on RegionScannerHolder works.
+ * Just creates one and calls #toString on it.
+ */
+ @Test
+ public void testRegionScannerHolderToString() throws UnknownHostException {
+ RpcCall call = Mockito.mock(RpcCall.class);
+ int port = 1234;
+ Mockito.when(call.getRemotePort()).thenReturn(port);
+ InetAddress address = InetAddress.getLocalHost();
+ Mockito.when(call.getRemoteAddress()).thenReturn(address);
+ RpcServer.setCurrentCall(call);
+ String clientIpAndPort = RSRpcServices.getRemoteClientIpAndPort();
+ HRegion region = Mockito.mock(HRegion.class);
+ Mockito.when(region.getRegionInfo()).thenReturn(RegionInfoBuilder.FIRST_META_REGIONINFO);
+ RSRpcServices.RegionScannerHolder rsh = new RSRpcServices.RegionScannerHolder(null, region,
+ null, null, false, false, clientIpAndPort);
+ LOG.info("rsh={}", rsh);
+ }
+}