Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-28595: check seq id of scan RPCs for closed scanners (#5910) #5924

Merged
merged 1 commit into from
May 18, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -355,10 +355,11 @@ public class RSRpcServices

private ScannerIdGenerator scannerIdGenerator;
private final ConcurrentMap<String, RegionScannerHolder> scanners = new ConcurrentHashMap<>();
// Hold the name of a closed scanner for a while. This is used to keep compatible for old clients
// which may send next or close request to a region scanner which has already been exhausted. The
// entries will be removed automatically after scannerLeaseTimeoutPeriod.
private final Cache<String, String> closedScanners;
// Hold the name and last sequence number of a closed scanner for a while. This is used
// to keep compatible for old clients which may send next or close request to a region
// scanner which has already been exhausted. The entries will be removed automatically
// after scannerLeaseTimeoutPeriod.
private final Cache<String, Long> closedScanners;
/**
* The lease timeout period for client scanners (milliseconds).
*/
Expand Down Expand Up @@ -3127,8 +3128,18 @@ private RegionScannerHolder getRegionScanner(ScanRequest request) throws IOExcep
RegionScannerHolder rsh = this.scanners.get(scannerName);
if (rsh == null) {
// just ignore the next or close request if scanner does not exists.
if (closedScanners.getIfPresent(scannerName) != null) {
throw SCANNER_ALREADY_CLOSED;
Long lastCallSeq = closedScanners.getIfPresent(scannerName);
if (lastCallSeq != null) {
// Check the sequence number to catch if the last call was incorrectly retried.
// The only allowed scenario is when the scanner is exhausted and one more scan
// request arrives - in this case returning 0 rows is correct.
if (request.hasNextCallSeq() && request.getNextCallSeq() != lastCallSeq + 1) {
throw new OutOfOrderScannerNextException("Expected nextCallSeq for closed request: "
+ (lastCallSeq + 1) + " But the nextCallSeq got from client: "
+ request.getNextCallSeq() + "; request=" + TextFormat.shortDebugString(request));
} else {
throw SCANNER_ALREADY_CLOSED;
}
} else {
LOG.warn("Client tried to access missing scanner " + scannerName);
throw new UnknownScannerException(
Expand Down Expand Up @@ -3699,7 +3710,7 @@ public ScanResponse scan(final RpcController controller, final ScanRequest reque
}
if (!builder.getMoreResults() || !builder.getMoreResultsInRegion() || closeScanner) {
scannerClosed = true;
closeScanner(region, scanner, scannerName, rpcCall);
closeScanner(region, scanner, scannerName, rpcCall, false);
}

// There's no point returning to a timed out client. Throwing ensures scanner is closed
Expand All @@ -3715,7 +3726,7 @@ public ScanResponse scan(final RpcController controller, final ScanRequest reque
// The scanner state might be left in a dirty state, so we will tell the Client to
// fail this RPC and close the scanner while opening up another one from the start of
// row that the client has last seen.
closeScanner(region, scanner, scannerName, rpcCall);
closeScanner(region, scanner, scannerName, rpcCall, true);

// If it is a DoNotRetryIOException already, throw as it is. Unfortunately, DNRIOE is
// used in two different semantics.
Expand Down Expand Up @@ -3779,7 +3790,7 @@ private void runShippedCallback(RegionScannerHolder rsh) throws ServiceException
}

private void closeScanner(HRegion region, RegionScanner scanner, String scannerName,
RpcCallContext context) throws IOException {
RpcCallContext context, boolean isError) throws IOException {
if (region.getCoprocessorHost() != null) {
if (region.getCoprocessorHost().preScannerClose(scanner)) {
// bypass the actual close.
Expand All @@ -3796,7 +3807,9 @@ private void closeScanner(HRegion region, RegionScanner scanner, String scannerN
if (region.getCoprocessorHost() != null) {
region.getCoprocessorHost().postScannerClose(scanner);
}
closedScanners.put(scannerName, scannerName);
if (!isError) {
closedScanners.put(scannerName, rsh.getNextCallSeq());
}
}
}

Expand Down