Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-27519 Another case for FNFE on StoreFileScanner after a flush f… #4922

Merged
merged 1 commit into from
Dec 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,13 @@ public interface ChangedReadersObserver {
long getReadPoint();

/**
* Notify observers.
* Notify observers. <br/>
* NOTE: Before this method is invoked, {@link HStoreFile#increaseRefCount} is called for every
* {@link HStoreFile} in the 'sfs' input parameter to prevent the {@link HStoreFile} from being
* archived after a concurrent compaction, and after this method returns,
* {@link HStoreFile#decreaseRefCount} is called. So if you open the {@link StoreFileReader} or
* {@link StoreFileScanner} asynchronously in this method, you may need to invoke
* {@link HStoreFile#increaseRefCount} and {@link HStoreFile#decreaseRefCount} yourself to
* prevent the {@link HStoreFile}s from being archived.
* @param sfs The new files
* @param memStoreScanners scanner of current memstore
* @throws IOException e
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -885,15 +885,29 @@ private long getTotalSize(Collection<HStoreFile> sfs) {
return sfs.stream().mapToLong(sf -> sf.getReader().length()).sum();
}

private boolean completeFlush(List<HStoreFile> sfs, long snapshotId) throws IOException {
private boolean completeFlush(final List<HStoreFile> sfs, long snapshotId) throws IOException {
// NOTE:we should keep clearSnapshot method inside the write lock because clearSnapshot may
// close {@link DefaultMemStore#snapshot}, which may be used by
// {@link DefaultMemStore#getScanners}.
storeEngine.addStoreFiles(sfs,
snapshotId > 0 ? () -> this.memstore.clearSnapshot(snapshotId) : () -> {
// NOTE: here we must increase the refCount for the storeFiles because we open the
// storeFiles and get the StoreFileScanners for them in HStore.notifyChangedReadersObservers.
// If we don't increase the refCount here, HStore.closeAndArchiveCompactedFiles called by
// CompactedHFilesDischarger may archive the storeFiles after a concurrent compaction. Because
// HStore.requestCompaction runs under the storeEngine lock, we increase the refCount under
// the storeEngine lock as well. See HBASE-27519 for more details.
snapshotId > 0 ? () -> {
this.memstore.clearSnapshot(snapshotId);
HStoreFile.increaseStoreFilesRefeCount(sfs);
} : () -> {
HStoreFile.increaseStoreFilesRefeCount(sfs);
});
// notify to be called here - only in case of flushes
notifyChangedReadersObservers(sfs);
try {
notifyChangedReadersObservers(sfs);
} finally {
HStoreFile.decreaseStoreFilesRefeCount(sfs);
}
if (LOG.isTraceEnabled()) {
long totalSize = getTotalSize(sfs);
String traceMessage = "FLUSH time,count,size,store size,store files ["
Expand Down Expand Up @@ -961,7 +975,13 @@ public List<KeyValueScanner> getScanners(boolean cacheBlocks, boolean usePread,
storeFilesToScan = this.storeEngine.getStoreFileManager().getFilesForScan(startRow,
includeStartRow, stopRow, includeStopRow);
memStoreScanners = this.memstore.getScanners(readPt);
storeFilesToScan.stream().forEach(f -> f.getFileInfo().refCount.incrementAndGet());
// NOTE: here we must increase the refCount for the storeFiles because we open the
// storeFiles and get the StoreFileScanners for them. If we don't increase the refCount here,
// HStore.closeAndArchiveCompactedFiles called by CompactedHFilesDischarger may archive the
// storeFiles after a concurrent compaction. Because HStore.requestCompaction runs under the
// storeEngine lock, we increase the refCount under the storeEngine lock as well. See
// HBASE-27484 for more details.
HStoreFile.increaseStoreFilesRefeCount(storeFilesToScan);
} finally {
this.storeEngine.readUnlock();
}
Expand All @@ -982,7 +1002,7 @@ public List<KeyValueScanner> getScanners(boolean cacheBlocks, boolean usePread,
clearAndClose(memStoreScanners);
throw t instanceof IOException ? (IOException) t : new IOException(t);
} finally {
storeFilesToScan.stream().forEach(f -> f.getFileInfo().refCount.decrementAndGet());
HStoreFile.decreaseStoreFilesRefeCount(storeFilesToScan);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
Expand Down Expand Up @@ -48,6 +49,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;

/**
Expand Down Expand Up @@ -648,4 +651,26 @@ public OptionalLong getMaximumTimestamp() {
/**
 * Exposes {@code compactedStoreFiles} through an unmodifiable wrapper so callers cannot mutate
 * the underlying set.
 * @return an unmodifiable view of {@code compactedStoreFiles}
 */
Set<String> getCompactedStoreFiles() {
  final Set<String> readOnlyView = Collections.unmodifiableSet(this.compactedStoreFiles);
  return readOnlyView;
}

/**
 * Increments the reference count kept on this store file's {@code fileInfo}.
 * @return the reference count after the increment
 */
long increaseRefCount() {
  final long updatedRefCount = this.fileInfo.refCount.incrementAndGet();
  return updatedRefCount;
}

/**
 * Decrements the reference count kept on this store file's {@code fileInfo}.
 * @return the reference count after the decrement
 */
long decreaseRefCount() {
  final long updatedRefCount = this.fileInfo.refCount.decrementAndGet();
  return updatedRefCount;
}

/**
 * Calls {@link #increaseRefCount()} on every store file in the given collection. A
 * {@code null} or empty collection is a no-op.
 * @param storeFiles the store files whose reference counts should be increased; may be
 *                   {@code null} or empty
 */
static void increaseStoreFilesRefeCount(Collection<HStoreFile> storeFiles) {
  if (!CollectionUtils.isEmpty(storeFiles)) {
    storeFiles.forEach(storeFile -> storeFile.increaseRefCount());
  }
}

/**
 * Calls {@link #decreaseRefCount()} on every store file in the given collection. A
 * {@code null} or empty collection is a no-op.
 * @param storeFiles the store files whose reference counts should be decreased; may be
 *                   {@code null} or empty
 */
static void decreaseStoreFilesRefeCount(Collection<HStoreFile> storeFiles) {
  if (!CollectionUtils.isEmpty(storeFiles)) {
    storeFiles.forEach(storeFile -> storeFile.decreaseRefCount());
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.ref.SoftReference;
import java.security.PrivilegedExceptionAction;
Expand All @@ -44,6 +45,7 @@
import java.util.NavigableSet;
import java.util.Optional;
import java.util.TreeSet;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
Expand Down Expand Up @@ -1531,6 +1533,106 @@ public void getScanners(MyStore store) throws IOException {
}
}

/**
* This test is for HBASE-27519: while the {@link StoreScanner} is scanning, a flush and a
* compaction execute concurrently, and the compaction compacts and archives the flushed
* {@link HStoreFile} which is used by {@link StoreScanner#updateReaders}. Before HBASE-27519,
* {@link StoreScanner#updateReaders} would throw {@link FileNotFoundException}.
* <p>
* The flushing thread (inside the {@code getScanners} hook) and the compaction thread are
* sequenced with a two-party {@link CyclicBarrier}: the first await lets the compaction start
* only once the flush has reached the hook, the second await releases the hook only after the
* compacted files have been closed and archived.
*/
@Test
public void testStoreScannerUpdateReadersWhenFlushAndCompactConcurrently() throws IOException {
Configuration conf = HBaseConfiguration.create();
conf.setBoolean(WALFactory.WAL_ENABLED, false);
conf.set(DEFAULT_COMPACTION_POLICY_CLASS_KEY, EverythingPolicy.class.getName());
byte[] r0 = Bytes.toBytes("row0");
byte[] r1 = Bytes.toBytes("row1");
// Two parties: the thread running the getScanners hook and the compaction thread.
final CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
// The hook only blocks on the barrier while this flag is set, so the earlier flushes and the
// initial scanner creation below pass through the hook without waiting.
final AtomicBoolean shouldWaitRef = new AtomicBoolean(false);
// Initialize region
final MyStore myStore = initMyStore(name.getMethodName(), conf, new MyStoreHook() {
@Override
public void getScanners(MyStore store) throws IOException {
try {
// Here this method is called by StoreScanner.updateReaders which is invoked by the
// following TestHStore.flushStore
if (shouldWaitRef.get()) {
// Wait for the following compaction task to start.
cyclicBarrier.await();
// Wait for the following HStore.closeAndArchiveCompactedFiles to finish.
cyclicBarrier.await();
}
} catch (BrokenBarrierException | InterruptedException e) {
throw new RuntimeException(e);
}
}
});

// Holds any failure raised inside the compaction thread so the main thread can assert on it.
final AtomicReference<Throwable> compactionExceptionRef = new AtomicReference<Throwable>(null);
Runnable compactionTask = () -> {
try {
// Only when the StoreScanner.updateReaders invoked by TestHStore.flushStore prepares for
// entering the MyStore.getScanners, compactionTask could start.
cyclicBarrier.await();
region.compactStore(family, new NoLimitThroughputController());
myStore.closeAndArchiveCompactedFiles();
// Notify StoreScanner.updateReaders could enter MyStore.getScanners.
cyclicBarrier.await();
} catch (Throwable e) {
// NOTE(review): if the failure happens before the second await above, nothing releases the
// flushing thread that is blocked on its second await in the hook — looks like the test
// would then hang rather than fail fast; confirm whether a test timeout covers this.
compactionExceptionRef.set(e);
}
};

long ts = EnvironmentEdgeManager.currentTime();
long seqId = 100;
byte[] value = Bytes.toBytes("value");
// older data which shouldn't be "seen" by client
myStore.add(createCell(r0, qf1, ts, seqId, value), null);
flushStore(myStore, id++);
myStore.add(createCell(r0, qf2, ts, seqId, value), null);
flushStore(myStore, id++);
// The third qualifier of row0 and all of row1 are added without flushing here; the flush for
// them is triggered later from inside the results-list hook.
myStore.add(createCell(r0, qf3, ts, seqId, value), null);
TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
quals.add(qf1);
quals.add(qf2);
quals.add(qf3);

myStore.add(createCell(r1, qf1, ts, seqId, value), null);
myStore.add(createCell(r1, qf2, ts, seqId, value), null);
myStore.add(createCell(r1, qf3, ts, seqId, value), null);

Thread.currentThread()
.setName("testStoreScannerUpdateReadersWhenFlushAndCompactConcurrently thread");
Scan scan = new Scan();
scan.withStartRow(r0, true);
try (InternalScanner scanner = (InternalScanner) myStore.getScanner(scan, quals, seqId)) {
// Presumably MyList invokes this hook with the current list size as cells are added, so the
// flush and the compaction are kicked off in the middle of scanner.next — confirm against
// MyList.
List<Cell> results = new MyList<>(size -> {
switch (size) {
case 1:
// From here on the getScanners hook blocks on the barrier.
shouldWaitRef.set(true);
Thread thread = new Thread(compactionTask);
thread.setName("MyCompacting Thread.");
thread.start();
try {
// The flush runs on this thread; per the hook comments it reaches MyStore.getScanners
// through StoreScanner.updateReaders and handshakes with the compaction thread.
flushStore(myStore, id++);
thread.join();
} catch (IOException | InterruptedException e) {
throw new RuntimeException(e);
}
shouldWaitRef.set(false);
break;
default:
break;
}
});
// Before HBASE-27519, here would throw java.io.FileNotFoundException because the storeFile
// which used by StoreScanner.updateReaders is deleted by compactionTask.
scanner.next(results);
// The results are the cells of row r0.
assertEquals(3, results.size());
// The compaction thread must have completed without throwing.
assertTrue(compactionExceptionRef.get() == null);
}
}

@Test
public void testReclaimChunkWhenScaning() throws IOException {
init("testReclaimChunkWhenScaning");
Expand Down