Skip to content

Commit

Permalink
HBASE-26674 Should modify filesCompacting under storeWriteLock (#4040)
Browse files Browse the repository at this point in the history
Signed-off-by: Josh Elser <[email protected]>
  • Loading branch information
Apache9 authored Jan 19, 2022
1 parent 4a94cfc commit d63ca4f
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1230,13 +1230,14 @@ private void writeCompactionWalRecord(Collection<HStoreFile> filesCompacted,
allowedOnPath = ".*/(HStore|TestHStore).java")
void replaceStoreFiles(Collection<HStoreFile> compactedFiles, Collection<HStoreFile> result,
boolean writeCompactionMarker) throws IOException {
storeEngine.replaceStoreFiles(compactedFiles, result);
storeEngine.replaceStoreFiles(compactedFiles, result, () -> {
synchronized(filesCompacting) {
filesCompacting.removeAll(compactedFiles);
}
});
if (writeCompactionMarker) {
writeCompactionWalRecord(compactedFiles, result);
}
synchronized (filesCompacting) {
filesCompacting.removeAll(compactedFiles);
}
// These may be null when the RS is shutting down. The space quota Chores will fix the Region
// sizes later so it's not super-critical if we miss these.
RegionServerServices rsServices = region.getRegionServerServices();
Expand Down Expand Up @@ -1567,7 +1568,7 @@ public void cancelRequestedCompaction(CompactionContext compaction) {
finishCompactionRequest(compaction.getRequest());
}

protected void finishCompactionRequest(CompactionRequestImpl cr) {
private void finishCompactionRequest(CompactionRequestImpl cr) {
this.region.reportCompactionRequestEnd(cr.isMajor(), cr.getFiles().size(), cr.getSize());
if (cr.isOffPeak()) {
offPeakCompactionTracker.set(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,8 @@ private void refreshStoreFilesInternal(Collection<StoreFileInfo> newFiles) throw
List<HStoreFile> openedFiles = openStoreFiles(toBeAddedFiles, false);

// propogate the file changes to the underlying store file manager
replaceStoreFiles(toBeRemovedStoreFiles, openedFiles); // won't throw an exception
replaceStoreFiles(toBeRemovedStoreFiles, openedFiles, () -> {
}); // won't throw an exception
}

/**
Expand Down Expand Up @@ -493,12 +494,13 @@ public void addStoreFiles(Collection<HStoreFile> storeFiles,
}

public void replaceStoreFiles(Collection<HStoreFile> compactedFiles,
Collection<HStoreFile> newFiles) throws IOException {
Collection<HStoreFile> newFiles, Runnable actionUnderLock) throws IOException {
storeFileTracker.replace(StoreUtils.toStoreFileInfo(compactedFiles),
StoreUtils.toStoreFileInfo(newFiles));
writeLock();
try {
storeFileManager.addCompactionResults(compactedFiles, newFiles);
actionUnderLock.run();
} finally {
writeUnlock();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1033,14 +1033,14 @@ public void testRefreshStoreFilesNotChanged() throws IOException {
// call first time after files changed
spiedStoreEngine.refreshStoreFiles();
assertEquals(2, this.store.getStorefilesCount());
verify(spiedStoreEngine, times(1)).replaceStoreFiles(any(), any());
verify(spiedStoreEngine, times(1)).replaceStoreFiles(any(), any(), any());

// call second time
spiedStoreEngine.refreshStoreFiles();

// ensure that replaceStoreFiles is not called, i.e, the times does not change, if files are not
// refreshed,
verify(spiedStoreEngine, times(1)).replaceStoreFiles(any(), any());
verify(spiedStoreEngine, times(1)).replaceStoreFiles(any(), any(), any());
}

private long countMemStoreScanner(StoreScanner scanner) {
Expand Down

0 comments on commit d63ca4f

Please sign in to comment.