Skip to content

Commit

Permalink
HBASE-27519 Another case for FNFE on StoreFileScanner after a flush f…
Browse files Browse the repository at this point in the history
…ollowed by a compaction (#4922)

Signed-off-by: Wellington Chevreuil <[email protected]>
  • Loading branch information
comnetwork authored Dec 10, 2022
1 parent dd60d7f commit 01f74ac
Show file tree
Hide file tree
Showing 4 changed files with 159 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,13 @@ public interface ChangedReadersObserver {
long getReadPoint();

/**
* Notify observers.
* Notify observers. <br/>
* NOTE: Before this method is invoked, {@link HStoreFile#increaseRefCount} is called for every
* {@link HStoreFile} in the 'sfs' input parameter to prevent the {@link HStoreFile} from being
* archived after a concurrent compaction, and after this method returns,
* {@link HStoreFile#decreaseRefCount} is invoked. So if you open the {@link StoreFileReader} or
* {@link StoreFileScanner} asynchronously in this method, you may need to invoke
* {@link HStoreFile#increaseRefCount} or {@link HStoreFile#decreaseRefCount} yourself to
* prevent the {@link HStoreFile}s from being archived.
* @param sfs The new files
* @param memStoreScanners scanner of current memstore
* @throws IOException e
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -885,15 +885,29 @@ private long getTotalSize(Collection<HStoreFile> sfs) {
return sfs.stream().mapToLong(sf -> sf.getReader().length()).sum();
}

private boolean completeFlush(List<HStoreFile> sfs, long snapshotId) throws IOException {
private boolean completeFlush(final List<HStoreFile> sfs, long snapshotId) throws IOException {
// NOTE: we should keep the clearSnapshot call inside the write lock because clearSnapshot may
// close {@link DefaultMemStore#snapshot}, which may be used by
// {@link DefaultMemStore#getScanners}.
storeEngine.addStoreFiles(sfs,
snapshotId > 0 ? () -> this.memstore.clearSnapshot(snapshotId) : () -> {
// NOTE: here we must increase the refCount for storeFiles because we would open the
// storeFiles and get the StoreFileScanners for them in HStore.notifyChangedReadersObservers.
// If we don't increase the refCount here, HStore.closeAndArchiveCompactedFiles called by
// CompactedHFilesDischarger may archive the storeFiles after a concurrent compaction. Because
// HStore.requestCompaction is under the storeEngine lock, we increase the refCount under the
// storeEngine lock too. See HBASE-27519 for more details.
snapshotId > 0 ? () -> {
this.memstore.clearSnapshot(snapshotId);
HStoreFile.increaseStoreFilesRefeCount(sfs);
} : () -> {
HStoreFile.increaseStoreFilesRefeCount(sfs);
});
// notify to be called here - only in case of flushes
notifyChangedReadersObservers(sfs);
try {
notifyChangedReadersObservers(sfs);
} finally {
HStoreFile.decreaseStoreFilesRefeCount(sfs);
}
if (LOG.isTraceEnabled()) {
long totalSize = getTotalSize(sfs);
String traceMessage = "FLUSH time,count,size,store size,store files ["
Expand Down Expand Up @@ -961,7 +975,13 @@ public List<KeyValueScanner> getScanners(boolean cacheBlocks, boolean usePread,
storeFilesToScan = this.storeEngine.getStoreFileManager().getFilesForScan(startRow,
includeStartRow, stopRow, includeStopRow);
memStoreScanners = this.memstore.getScanners(readPt);
storeFilesToScan.stream().forEach(f -> f.getFileInfo().refCount.incrementAndGet());
// NOTE: here we must increase the refCount for storeFiles because we would open the
// storeFiles and get the StoreFileScanners for them. If we don't increase the refCount here,
// HStore.closeAndArchiveCompactedFiles called by CompactedHFilesDischarger may archive the
// storeFiles after a concurrent compaction. Because HStore.requestCompaction is under the
// storeEngine lock, we increase the refCount under the storeEngine lock too. See HBASE-27484
// for more details.
HStoreFile.increaseStoreFilesRefeCount(storeFilesToScan);
} finally {
this.storeEngine.readUnlock();
}
Expand All @@ -982,7 +1002,7 @@ public List<KeyValueScanner> getScanners(boolean cacheBlocks, boolean usePread,
clearAndClose(memStoreScanners);
throw t instanceof IOException ? (IOException) t : new IOException(t);
} finally {
storeFilesToScan.stream().forEach(f -> f.getFileInfo().refCount.decrementAndGet());
HStoreFile.decreaseStoreFilesRefeCount(storeFilesToScan);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
Expand Down Expand Up @@ -48,6 +49,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;

/**
Expand Down Expand Up @@ -648,4 +651,26 @@ public OptionalLong getMaximumTimestamp() {
Set<String> getCompactedStoreFiles() {
return Collections.unmodifiableSet(this.compactedStoreFiles);
}

/**
 * Bumps the reference count on the underlying file info by one.
 * @return the reference count after the increment
 */
long increaseRefCount() {
  final long updated = this.fileInfo.refCount.incrementAndGet();
  return updated;
}

/**
 * Drops the reference count on the underlying file info by one.
 * @return the reference count after the decrement
 */
long decreaseRefCount() {
  final long updated = this.fileInfo.refCount.decrementAndGet();
  return updated;
}

/**
 * Increases the ref count of every store file in the given collection by one. A no-op for a
 * null or empty collection.
 * @param storeFiles the store files whose ref counts should be increased
 */
static void increaseStoreFilesRefeCount(Collection<HStoreFile> storeFiles) {
  if (!CollectionUtils.isEmpty(storeFiles)) {
    storeFiles.forEach(storeFile -> storeFile.increaseRefCount());
  }
}

/**
 * Decreases the ref count of every store file in the given collection by one. A no-op for a
 * null or empty collection.
 * @param storeFiles the store files whose ref counts should be decreased
 */
static void decreaseStoreFilesRefeCount(Collection<HStoreFile> storeFiles) {
  if (!CollectionUtils.isEmpty(storeFiles)) {
    storeFiles.forEach(storeFile -> storeFile.decreaseRefCount());
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.ref.SoftReference;
import java.security.PrivilegedExceptionAction;
Expand All @@ -44,6 +45,7 @@
import java.util.NavigableSet;
import java.util.Optional;
import java.util.TreeSet;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
Expand Down Expand Up @@ -1531,6 +1533,106 @@ public void getScanners(MyStore store) throws IOException {
}
}

/**
 * This test is for HBASE-27519: while the {@link StoreScanner} is scanning, a flush and a
 * compaction execute concurrently and the compaction compacts and archives the flushed
 * {@link HStoreFile} which is used by {@link StoreScanner#updateReaders}. Before HBASE-27519,
 * {@link StoreScanner#updateReaders} would throw {@link FileNotFoundException}.
 */
@Test
public void testStoreScannerUpdateReadersWhenFlushAndCompactConcurrently() throws IOException {
Configuration conf = HBaseConfiguration.create();
conf.setBoolean(WALFactory.WAL_ENABLED, false);
conf.set(DEFAULT_COMPACTION_POLICY_CLASS_KEY, EverythingPolicy.class.getName());
byte[] r0 = Bytes.toBytes("row0");
byte[] r1 = Bytes.toBytes("row1");
final CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
final AtomicBoolean shouldWaitRef = new AtomicBoolean(false);
// Initialize region
final MyStore myStore = initMyStore(name.getMethodName(), conf, new MyStoreHook() {
@Override
public void getScanners(MyStore store) throws IOException {
try {
// Here this method is called by StoreScanner.updateReaders, which is invoked by the
// TestHStore.flushStore call below.
if (shouldWaitRef.get()) {
// Wait for the compaction task below to start.
cyclicBarrier.await();
// Wait for the HStore.closeAndArchiveCompactedFiles below to end.
cyclicBarrier.await();
}
} catch (BrokenBarrierException | InterruptedException e) {
throw new RuntimeException(e);
}
}
});

final AtomicReference<Throwable> compactionExceptionRef = new AtomicReference<Throwable>(null);
Runnable compactionTask = () -> {
try {
// Only when the StoreScanner.updateReaders invoked by TestHStore.flushStore prepares for
// entering MyStore.getScanners may this compactionTask start.
cyclicBarrier.await();
region.compactStore(family, new NoLimitThroughputController());
myStore.closeAndArchiveCompactedFiles();
// Notify StoreScanner.updateReaders that it may now enter MyStore.getScanners.
cyclicBarrier.await();
} catch (Throwable e) {
compactionExceptionRef.set(e);
}
};

long ts = EnvironmentEdgeManager.currentTime();
long seqId = 100;
byte[] value = Bytes.toBytes("value");
// Older data which shouldn't be "seen" by the client.
myStore.add(createCell(r0, qf1, ts, seqId, value), null);
flushStore(myStore, id++);
myStore.add(createCell(r0, qf2, ts, seqId, value), null);
flushStore(myStore, id++);
myStore.add(createCell(r0, qf3, ts, seqId, value), null);
TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
quals.add(qf1);
quals.add(qf2);
quals.add(qf3);

myStore.add(createCell(r1, qf1, ts, seqId, value), null);
myStore.add(createCell(r1, qf2, ts, seqId, value), null);
myStore.add(createCell(r1, qf3, ts, seqId, value), null);

Thread.currentThread()
.setName("testStoreScannerUpdateReadersWhenFlushAndCompactConcurrently thread");
Scan scan = new Scan();
scan.withStartRow(r0, true);
try (InternalScanner scanner = (InternalScanner) myStore.getScanner(scan, quals, seqId)) {
List<Cell> results = new MyList<>(size -> {
switch (size) {
case 1:
// After the first cell is collected, trigger a concurrent flush plus compaction so that
// StoreScanner.updateReaders races with the compaction archiving the flushed storeFile.
shouldWaitRef.set(true);
Thread thread = new Thread(compactionTask);
thread.setName("MyCompacting Thread.");
thread.start();
try {
flushStore(myStore, id++);
thread.join();
} catch (IOException | InterruptedException e) {
throw new RuntimeException(e);
}
shouldWaitRef.set(false);
break;
default:
break;
}
});
// Before HBASE-27519 this would throw java.io.FileNotFoundException, because the storeFile
// used by StoreScanner.updateReaders had been deleted by the compactionTask.
scanner.next(results);
// The results are the cells of row r0.
assertEquals(3, results.size());
assertTrue(compactionExceptionRef.get() == null);
}
}

@Test
public void testReclaimChunkWhenScaning() throws IOException {
init("testReclaimChunkWhenScaning");
Expand Down

0 comments on commit 01f74ac

Please sign in to comment.