Skip to content

Commit

Permalink
HBASE-27795: Define RPC API for cache cleaning
Browse files Browse the repository at this point in the history
  • Loading branch information
Shanmukha Kota committed Nov 17, 2023
1 parent 69d980a commit f721c7c
Show file tree
Hide file tree
Showing 16 changed files with 316 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2615,4 +2615,13 @@ List<LogEntry> getLogEntries(Set<ServerName> serverNames, String logType, Server
* Flush master local region
*/
void flushMasterStore() throws IOException;

/**
* Clean Cache by evicting the blocks of files belonging to regions that are no longer served by
* the RegionServer.
* @param serverName ServerName
* @return A map of filename and number of blocks evicted.
* @throws IOException if a remote or network exception occurs
*/
Map<String, Integer> uncacheStaleBlocks(ServerName serverName) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -1115,4 +1115,9 @@ public List<LogEntry> getLogEntries(Set<ServerName> serverNames, String logType,
public void flushMasterStore() throws IOException {
get(admin.flushMasterStore());
}

@Override
public Map<String, Integer> uncacheStaleBlocks(ServerName serverName) throws IOException {
return get(admin.uncacheStaleBlocks(serverName));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1837,4 +1837,12 @@ CompletableFuture<List<LogEntry>> getLogEntries(Set<ServerName> serverNames, Str
* Flush master local region
*/
CompletableFuture<Void> flushMasterStore();

/**
* Clean Cache by evicting the blocks of files belonging to regions that are no longer served by
* the RegionServer.
* @param serverName ServerName
* @return A map of filename and number of blocks evicted.
*/
CompletableFuture<Map<String, Integer>> uncacheStaleBlocks(ServerName serverName);
}
Original file line number Diff line number Diff line change
Expand Up @@ -990,4 +990,9 @@ public CompletableFuture<List<LogEntry>> getLogEntries(Set<ServerName> serverNam
public CompletableFuture<Void> flushMasterStore() {
return wrap(rawAdmin.flushMasterStore());
}

@Override
public CompletableFuture<Map<String, Integer>> uncacheStaleBlocks(ServerName serverName) {
return wrap(rawAdmin.uncacheStaleBlocks(serverName));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,8 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UncacheStaleBlocksRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UncacheStaleBlocksResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
Expand Down Expand Up @@ -4453,4 +4455,15 @@ Void> call(controller, stub, request.build(),
(s, c, req, done) -> s.flushMasterStore(c, req, done), resp -> null))
.call();
}

@Override
public CompletableFuture<Map<String, Integer>> uncacheStaleBlocks(ServerName serverName) {
UncacheStaleBlocksRequest.Builder request = UncacheStaleBlocksRequest.newBuilder();
return this.<Map<String, Integer>> newAdminCaller()
.action((controller, stub) -> this.<UncacheStaleBlocksRequest, UncacheStaleBlocksResponse,
Map<String, Integer>> adminCall(controller, stub, request.build(),
(s, c, req, done) -> s.uncacheStaleBlocks(c, req, done),
resp -> resp.getUncachedFilesMap()))
.serverName(serverName).call();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,8 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ServerInfo;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UncacheStaleBlocksRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UncacheStaleBlocksResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.CellProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
Expand Down Expand Up @@ -3767,4 +3769,20 @@ public static <T extends Message> T parseDelimitedFrom(InputStream in, Parser<T>
return parser.parseFrom(bytes);
}
}

/**
* Clean Cache by evicting the blocks of files belonging to regions that are no longer served by
* the RegionServer.
*/
public static Map<String, Integer> uncacheStaleBlocks(final RpcController controller,
final AdminService.BlockingInterface admin) throws IOException {
UncacheStaleBlocksRequest request = UncacheStaleBlocksRequest.newBuilder().build();
UncacheStaleBlocksResponse response = null;
try {
response = admin.uncacheStaleBlocks(controller, request);
} catch (ServiceException se) {
throw getRemoteException(se);
}
return response.getUncachedFilesMap();
}
}
10 changes: 10 additions & 0 deletions hbase-protocol-shaded/src/main/protobuf/server/region/Admin.proto
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,13 @@ message ClearSlowLogResponses {
required bool is_cleaned = 1;
}

message UncacheStaleBlocksRequest {
}

message UncacheStaleBlocksResponse {
map<string, int32> uncached_files = 1;
}

service AdminService {
rpc GetRegionInfo(GetRegionInfoRequest)
returns(GetRegionInfoResponse);
Expand Down Expand Up @@ -405,4 +412,7 @@ service AdminService {
rpc GetLogEntries(LogRequest)
returns(LogEntry);

rpc UncacheStaleBlocks(UncacheStaleBlocksRequest)
returns(UncacheStaleBlocksResponse);

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
package org.apache.hadoop.hbase.io.hfile;

import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.yetus.audience.InterfaceAudience;

/**
Expand Down Expand Up @@ -161,4 +164,14 @@ default Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repe
default boolean isMetaBlock(BlockType blockType) {
return blockType != null && blockType.getCategory() != BlockType.BlockCategory.DATA;
}

/**
* Clean Cache by evicting the blocks of files belonging to regions that are no longer served by
* the RegionServer.
* @param server HRegionServer
* @return A map of filename and number of blocks evicted.
*/
default Optional<Map<String, Integer>> uncacheStaleBlocks(HRegionServer server) {
return Optional.empty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,13 @@
*/
package org.apache.hadoop.hbase.io.hfile;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.yetus.audience.InterfaceAudience;

/**
Expand Down Expand Up @@ -400,4 +404,13 @@ public FirstLevelBlockCache getFirstLevelCache() {
public BlockCache getSecondLevelCache() {
return l2Cache;
}

@Override
public Optional<Map<String, Integer>> uncacheStaleBlocks(HRegionServer server) {
Map<String, Integer> uncachedStaleBlocksMap =
l1Cache.uncacheStaleBlocks(server).orElseGet(HashMap::new);
l2Cache.uncacheStaleBlocks(server).ifPresent(
map2 -> map2.forEach((key, value) -> uncachedStaleBlocksMap.merge(key, value, Integer::sum)));
return Optional.of(uncachedStaleBlocksMap);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
Expand Down Expand Up @@ -55,6 +56,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
Expand All @@ -75,6 +77,7 @@
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.RefCnt;
import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.IdReadWriteLock;
Expand Down Expand Up @@ -2002,4 +2005,27 @@ public void fileCacheCompleted(Path filePath, long size) {
regionCachedSizeMap.merge(regionName, size, (oldpf, fileSize) -> oldpf + fileSize);
}

@Override
public Optional<Map<String, Integer>> uncacheStaleBlocks(HRegionServer server) {
Map<String, Integer> evictedFilesWithStaleBlocks = new HashMap<>();

fullyCachedFiles.forEach((fileName, value) -> {
int blocksEvicted;
try {
if (!server.getRegionByEncodedName(value.getFirst()).isAvailable()) {
blocksEvicted = evictBlocksByHfileName(fileName);
} else {
blocksEvicted = 0;
}
} catch (NotServingRegionException nsre) {
LOG.debug(
"Evicting blocks for file {} as the region {} is not served by the Region Server {} anymore.",
fileName, fullyCachedFiles.get(fileName).getFirst(),
server.getServerName().getServerName());
blocksEvicted = evictBlocksByHfileName(fileName);
}
evictedFilesWithStaleBlocks.put(fileName, blocksEvicted);
});
return Optional.of(evictedFilesWithStaleBlocks);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,8 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UncacheStaleBlocksRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UncacheStaleBlocksResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionRequest;
Expand Down Expand Up @@ -3609,4 +3611,10 @@ public FlushTableResponse flushTable(RpcController controller, FlushTableRequest
throw new ServiceException(ioe);
}
}

@Override
public UncacheStaleBlocksResponse uncacheStaleBlocks(RpcController controller,
UncacheStaleBlocksRequest request) throws ServiceException {
throw new ServiceException(new DoNotRetryIOException("Unsupported method on master"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,8 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UncacheStaleBlocksRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UncacheStaleBlocksResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
Expand Down Expand Up @@ -3933,4 +3935,15 @@ public void onConfigurationChange(Configuration conf) {
super.onConfigurationChange(conf);
setReloadableGuardrails(conf);
}

@Override
public UncacheStaleBlocksResponse uncacheStaleBlocks(RpcController controller,
UncacheStaleBlocksRequest request) throws ServiceException {
UncacheStaleBlocksResponse.Builder responseBuilder = UncacheStaleBlocksResponse.newBuilder();
Map<String, Integer> evictedFilesWithStaleBlocks = new HashMap<>();
server.getBlockCache().flatMap(bc -> bc.uncacheStaleBlocks(server))
.ifPresent(evictedFilesWithStaleBlocks::putAll);
responseBuilder.putAllUncachedFiles(evictedFilesWithStaleBlocks);
return responseBuilder.build();
}
}
Loading

0 comments on commit f721c7c

Please sign in to comment.