Skip to content

Commit

Permalink
HBASE-28595: check seq id of scan RPCs for closed scanners (#5910) (#…
Browse files Browse the repository at this point in the history
…5924)

Signed-off-by: Duo Zhang <[email protected]>
Signed-off-by: Tak Lon (Stephen) Wu <[email protected]>
Co-authored-by: csringhofer <[email protected]>
  • Loading branch information
taklwu and csringhofer authored May 18, 2024
1 parent e7d0bea commit 8acfc07
Showing 1 changed file with 23 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -355,10 +355,11 @@ public class RSRpcServices

private ScannerIdGenerator scannerIdGenerator;
private final ConcurrentMap<String, RegionScannerHolder> scanners = new ConcurrentHashMap<>();
// Hold the name of a closed scanner for a while. This is used to keep compatible for old clients
// which may send next or close request to a region scanner which has already been exhausted. The
// entries will be removed automatically after scannerLeaseTimeoutPeriod.
private final Cache<String, String> closedScanners;
// Hold the name and last sequence number of a closed scanner for a while. This is used
// to keep compatible for old clients which may send next or close request to a region
// scanner which has already been exhausted. The entries will be removed automatically
// after scannerLeaseTimeoutPeriod.
private final Cache<String, Long> closedScanners;
/**
* The lease timeout period for client scanners (milliseconds).
*/
Expand Down Expand Up @@ -3127,8 +3128,18 @@ private RegionScannerHolder getRegionScanner(ScanRequest request) throws IOExcep
RegionScannerHolder rsh = this.scanners.get(scannerName);
if (rsh == null) {
// just ignore the next or close request if scanner does not exists.
if (closedScanners.getIfPresent(scannerName) != null) {
throw SCANNER_ALREADY_CLOSED;
Long lastCallSeq = closedScanners.getIfPresent(scannerName);
if (lastCallSeq != null) {
// Check the sequence number to catch if the last call was incorrectly retried.
// The only allowed scenario is when the scanner is exhausted and one more scan
// request arrives - in this case returning 0 rows is correct.
if (request.hasNextCallSeq() && request.getNextCallSeq() != lastCallSeq + 1) {
throw new OutOfOrderScannerNextException("Expected nextCallSeq for closed request: "
+ (lastCallSeq + 1) + " But the nextCallSeq got from client: "
+ request.getNextCallSeq() + "; request=" + TextFormat.shortDebugString(request));
} else {
throw SCANNER_ALREADY_CLOSED;
}
} else {
LOG.warn("Client tried to access missing scanner " + scannerName);
throw new UnknownScannerException(
Expand Down Expand Up @@ -3699,7 +3710,7 @@ public ScanResponse scan(final RpcController controller, final ScanRequest reque
}
if (!builder.getMoreResults() || !builder.getMoreResultsInRegion() || closeScanner) {
scannerClosed = true;
closeScanner(region, scanner, scannerName, rpcCall);
closeScanner(region, scanner, scannerName, rpcCall, false);
}

// There's no point returning to a timed out client. Throwing ensures scanner is closed
Expand All @@ -3715,7 +3726,7 @@ public ScanResponse scan(final RpcController controller, final ScanRequest reque
// The scanner state might be left in a dirty state, so we will tell the Client to
// fail this RPC and close the scanner while opening up another one from the start of
// row that the client has last seen.
closeScanner(region, scanner, scannerName, rpcCall);
closeScanner(region, scanner, scannerName, rpcCall, true);

// If it is a DoNotRetryIOException already, throw as it is. Unfortunately, DNRIOE is
// used in two different semantics.
Expand Down Expand Up @@ -3779,7 +3790,7 @@ private void runShippedCallback(RegionScannerHolder rsh) throws ServiceException
}

private void closeScanner(HRegion region, RegionScanner scanner, String scannerName,
RpcCallContext context) throws IOException {
RpcCallContext context, boolean isError) throws IOException {
if (region.getCoprocessorHost() != null) {
if (region.getCoprocessorHost().preScannerClose(scanner)) {
// bypass the actual close.
Expand All @@ -3796,7 +3807,9 @@ private void closeScanner(HRegion region, RegionScanner scanner, String scannerN
if (region.getCoprocessorHost() != null) {
region.getCoprocessorHost().postScannerClose(scanner);
}
closedScanners.put(scannerName, scannerName);
if (!isError) {
closedScanners.put(scannerName, rsh.getNextCallSeq());
}
}
}

Expand Down

0 comments on commit 8acfc07

Please sign in to comment.