Skip to content

Commit

Permalink
HBASE-28504 Implement eviction logic for scanners in Rest APIs to pre…
Browse files Browse the repository at this point in the history
…vent scanner leakage (#5802)


Signed-off-by: Peter Somogyi <[email protected]>
  • Loading branch information
stoty authored Apr 15, 2024
1 parent 5a8ae13 commit 4b7a39a
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 8 deletions.
4 changes: 4 additions & 0 deletions hbase-rest/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,10 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,12 @@ public interface Constants {
String REST_DNS_NAMESERVER = "hbase.rest.dns.nameserver";
String REST_DNS_INTERFACE = "hbase.rest.dns.interface";

String REST_SCANNERCACHE_SIZE = "hbase.rest.scannercache.size";
final int DEFAULT_REST_SCANNERCACHE_SIZE = 10000;

String REST_SCANNERCACHE_EXPIRE_TIME = "hbase.rest.scannercache.expire.time";
final long DEFAULT_REST_SCANNERCACHE_EXPIRE_TIME_MS = 60 * 60 * 1000;

String FILTER_CLASSES = "hbase.rest.filter.classes";
String SCAN_START_ROW = "startrow";
String SCAN_END_ROW = "endrow";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@

import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.RemovalCause;
import java.io.IOException;
import java.net.URI;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.rest.model.ScannerModel;
Expand All @@ -46,9 +49,7 @@ public class ScannerResource extends ResourceBase {

private static final Logger LOG = LoggerFactory.getLogger(ScannerResource.class);

static final Map<String, ScannerInstanceResource> scanners =
Collections.synchronizedMap(new HashMap<String, ScannerInstanceResource>());

private static final Cache<String, ScannerInstanceResource> scanners = setupScanners();
TableResource tableResource;

/**
Expand All @@ -59,8 +60,23 @@ public ScannerResource(TableResource tableResource) throws IOException {
this.tableResource = tableResource;
}

private static Cache<String, ScannerInstanceResource> setupScanners() {
final Configuration conf = HBaseConfiguration.create();

int size = conf.getInt(REST_SCANNERCACHE_SIZE, DEFAULT_REST_SCANNERCACHE_SIZE);
long evictTimeoutMs = conf.getTimeDuration(REST_SCANNERCACHE_EXPIRE_TIME,
DEFAULT_REST_SCANNERCACHE_EXPIRE_TIME_MS, TimeUnit.MILLISECONDS);

Cache<String, ScannerInstanceResource> cache =
Caffeine.newBuilder().removalListener(ScannerResource::removalListener).maximumSize(size)
.expireAfterAccess(evictTimeoutMs, TimeUnit.MILLISECONDS)
.<String, ScannerInstanceResource> build();

return cache;
}

static boolean delete(final String id) {
ScannerInstanceResource instance = scanners.remove(id);
ScannerInstanceResource instance = scanners.asMap().remove(id);
if (instance != null) {
instance.generator.close();
return true;
Expand All @@ -69,6 +85,12 @@ static boolean delete(final String id) {
}
}

static void removalListener(String key, ScannerInstanceResource value, RemovalCause cause) {
if (cause.wasEvicted()) {
delete(key);
}
}

Response update(final ScannerModel model, final boolean replace, final UriInfo uriInfo) {
servlet.getMetrics().incrementRequests(1);
if (servlet.isReadOnly()) {
Expand Down Expand Up @@ -140,7 +162,7 @@ public Response post(final ScannerModel model, final @Context UriInfo uriInfo) {
@Path("{scanner: .+}")
public ScannerInstanceResource getScannerInstanceResource(final @PathParam("scanner") String id)
throws IOException {
ScannerInstanceResource instance = scanners.get(id);
ScannerInstanceResource instance = scanners.getIfPresent(id);
if (instance == null) {
servlet.getMetrics().incrementFailedGetRequests(1);
return new ScannerInstanceResource();
Expand Down

0 comments on commit 4b7a39a

Please sign in to comment.