Skip to content

Commit

Permalink
HBASE-26488 Memory leak when MemStore retry flushing (apache#3899)
Browse files Browse the repository at this point in the history
  • Loading branch information
lijinbin committed Jan 13, 2022
1 parent 5b49795 commit f116558
Show file tree
Hide file tree
Showing 4 changed files with 148 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2531,7 +2531,6 @@ public void replayFlush(List<String> fileNames, boolean dropMemstoreSnapshot)
long snapshotId = -1; // -1 means do not drop
if (dropMemstoreSnapshot && snapshot != null) {
snapshotId = snapshot.getId();
snapshot.close();
}
HStore.this.updateStorefiles(storeFiles, snapshotId);
}
Expand All @@ -2542,10 +2541,6 @@ public void replayFlush(List<String> fileNames, boolean dropMemstoreSnapshot)
@Override
public void abort() throws IOException {
if (snapshot != null) {
//We need to close the snapshot when aborting, otherwise, the segment scanner
//won't be closed. If we are using MSLAB, the chunk referenced by those scanners
//can't be released, thus memory leak
snapshot.close();
HStore.this.updateStorefiles(Collections.emptyList(), snapshot.getId());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ public String toString() {
return res;
}

/**
* We create a new {@link SnapshotSegmentScanner} to increase the reference count of
* {@link MemStoreLABImpl} used by this segment.
*/
List<KeyValueScanner> getSnapshotScanners() {
return Collections.singletonList(new SnapshotSegmentScanner(this));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,31 +17,38 @@
*/
package org.apache.hadoop.hbase.regionserver;

import java.util.List;
import org.apache.yetus.audience.InterfaceAudience;

import java.io.Closeable;
import java.util.List;

/**
* Holds details of the snapshot taken on a MemStore. Details include the snapshot's identifier,
* count of cells in it and total memory size occupied by all the cells, timestamp information of
* all the cells and a scanner to read all cells in it.
* {@link MemStoreSnapshot} is a Context Object to hold details of the snapshot taken on a MemStore.
* Details include the snapshot's identifier, count of cells in it and total memory size occupied by
* all the cells, timestamp information of all the cells and the snapshot immutableSegment.
* <p>
* NOTE:Every time when {@link MemStoreSnapshot#getScanners} is called, we create new
* {@link SnapshotSegmentScanner}s on the {@link MemStoreSnapshot#snapshotImmutableSegment},and
* {@link Segment#incScannerCount} is invoked in the {@link SnapshotSegmentScanner} ctor to increase
* the reference count of {@link MemStoreLAB} which used by
* {@link MemStoreSnapshot#snapshotImmutableSegment}, so after we finish using these scanners, we
* must call their close method to invoke {@link Segment#decScannerCount}.
*/
@InterfaceAudience.Private
public class MemStoreSnapshot implements Closeable {
public class MemStoreSnapshot {
private final long id;
private final int cellsCount;
private final MemStoreSize memStoreSize;
private final TimeRangeTracker timeRangeTracker;
private final List<KeyValueScanner> scanners;
private final boolean tagsPresent;
private final ImmutableSegment snapshotImmutableSegment;

public MemStoreSnapshot(long id, ImmutableSegment snapshot) {
this.id = id;
this.cellsCount = snapshot.getCellsCount();
this.memStoreSize = snapshot.getMemStoreSize();
this.timeRangeTracker = snapshot.getTimeRangeTracker();
this.scanners = snapshot.getSnapshotScanners();
this.tagsPresent = snapshot.isTagsPresent();
this.snapshotImmutableSegment = snapshot;
}

/**
Expand Down Expand Up @@ -74,10 +81,16 @@ public TimeRangeTracker getTimeRangeTracker() {
}

/**
* @return {@link KeyValueScanner} for iterating over the snapshot
* Create new {@link SnapshotSegmentScanner}s for iterating over the snapshot. <br/>
* NOTE:Here when create new {@link SnapshotSegmentScanner}s, {@link Segment#incScannerCount} is
* invoked in the {@link SnapshotSegmentScanner} ctor,so after we use these
* {@link SnapshotSegmentScanner}s, we must call {@link SnapshotSegmentScanner#close} to invoke
* {@link Segment#decScannerCount}.
* @return {@link KeyValueScanner}s(Which type is {@link SnapshotSegmentScanner}) for iterating
* over the snapshot.
*/
public List<KeyValueScanner> getScanners() {
return scanners;
return snapshotImmutableSegment.getSnapshotScanners();
}

/**
Expand All @@ -86,13 +99,4 @@ public List<KeyValueScanner> getScanners() {
public boolean isTagsPresent() {
return this.tagsPresent;
}

@Override
public void close() {
if (this.scanners != null) {
for (KeyValueScanner scanner : scanners) {
scanner.close();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
Expand Down Expand Up @@ -780,11 +781,12 @@ private void injectFault() throws IOException {
}
}

private static void flushStore(HStore store, long id) throws IOException {
private static StoreFlushContext flushStore(HStore store, long id) throws IOException {
StoreFlushContext storeFlushCtx = store.createFlushContext(id, FlushLifeCycleTracker.DUMMY);
storeFlushCtx.prepare();
storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
return storeFlushCtx;
}

/**
Expand Down Expand Up @@ -1807,7 +1809,7 @@ public void testClearSnapshotGetScannerConcurrently() throws Exception {
flushThread.join();

if (myDefaultMemStore.shouldWait) {
SegmentScanner segmentScanner = getSegmentScanner(storeScanner);
SegmentScanner segmentScanner = getTypeKeyValueScanner(storeScanner, SegmentScanner.class);
MemStoreLABImpl memStoreLAB = (MemStoreLABImpl) (segmentScanner.segment.getMemStoreLAB());
assertTrue(memStoreLAB.isClosed());
assertTrue(!memStoreLAB.chunks.isEmpty());
Expand All @@ -1834,16 +1836,16 @@ public void testClearSnapshotGetScannerConcurrently() throws Exception {
}
}

private SegmentScanner getSegmentScanner(StoreScanner storeScanner) {
List<SegmentScanner> segmentScanners = new ArrayList<SegmentScanner>();
@SuppressWarnings("unchecked")
private <T> T getTypeKeyValueScanner(StoreScanner storeScanner, Class<T> keyValueScannerClass) {
List<T> resultScanners = new ArrayList<T>();
for (KeyValueScanner keyValueScanner : storeScanner.currentScanners) {
if (keyValueScanner instanceof SegmentScanner) {
segmentScanners.add((SegmentScanner) keyValueScanner);
if (keyValueScannerClass.isInstance(keyValueScanner)) {
resultScanners.add((T) keyValueScanner);
}
}

assertTrue(segmentScanners.size() == 1);
return segmentScanners.get(0);
assertTrue(resultScanners.size() == 1);
return resultScanners.get(0);
}

/**
Expand Down Expand Up @@ -1872,6 +1874,116 @@ public CustomDefaultMemStore(Configuration conf, CellComparator c,

}

/**
* This test is for HBASE-26488
*/
@Test
public void testMemoryLeakWhenFlushMemStoreRetrying() throws Exception {

Configuration conf = HBaseConfiguration.create();

byte[] smallValue = new byte[3];
byte[] largeValue = new byte[9];
final long timestamp = EnvironmentEdgeManager.currentTime();
final long seqId = 100;
final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue);
final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue);
TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
quals.add(qf1);
quals.add(qf2);

conf.set(HStore.MEMSTORE_CLASS_NAME, MyDefaultMemStore1.class.getName());
conf.setBoolean(WALFactory.WAL_ENABLED, false);
conf.set(DefaultStoreEngine.DEFAULT_STORE_FLUSHER_CLASS_KEY,
MyDefaultStoreFlusher.class.getName());

init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family).build());
MyDefaultMemStore1 myDefaultMemStore = (MyDefaultMemStore1) (store.memstore);
assertTrue((store.storeEngine.getStoreFlusher()) instanceof MyDefaultStoreFlusher);

MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
store.add(smallCell, memStoreSizing);
store.add(largeCell, memStoreSizing);
flushStore(store, id++);

MemStoreLABImpl memStoreLAB =
(MemStoreLABImpl) (myDefaultMemStore.snapshotImmutableSegment.getMemStoreLAB());
assertTrue(memStoreLAB.isClosed());
assertTrue(memStoreLAB.getOpenScannerCount() == 0);
assertTrue(memStoreLAB.isReclaimed());
assertTrue(memStoreLAB.chunks.isEmpty());
StoreScanner storeScanner = null;
try {
storeScanner =
(StoreScanner) store.getScanner(new Scan(new Get(row)), quals, seqId + 1);
assertTrue(store.storeEngine.getStoreFileManager().getStorefileCount() == 1);
assertTrue(store.memstore.size().getCellsCount() == 0);
assertTrue(store.memstore.getSnapshotSize().getCellsCount() == 0);
assertTrue(storeScanner.currentScanners.size() == 1);
assertTrue(storeScanner.currentScanners.get(0) instanceof StoreFileScanner);

List<Cell> results = new ArrayList<>();
storeScanner.next(results);
assertEquals(2, results.size());
CellUtil.equals(smallCell, results.get(0));
CellUtil.equals(largeCell, results.get(1));
} finally {
if (storeScanner != null) {
storeScanner.close();
}
}
}


static class MyDefaultMemStore1 extends DefaultMemStore {

private ImmutableSegment snapshotImmutableSegment;

public MyDefaultMemStore1(Configuration conf, CellComparator c,
RegionServicesForStores regionServices) {
super(conf, c, regionServices);
}

@Override
public MemStoreSnapshot snapshot() {
MemStoreSnapshot result = super.snapshot();
this.snapshotImmutableSegment = snapshot;
return result;
}

}

public static class MyDefaultStoreFlusher extends DefaultStoreFlusher {
private static final AtomicInteger failCounter = new AtomicInteger(1);
private static final AtomicInteger counter = new AtomicInteger(0);

public MyDefaultStoreFlusher(Configuration conf, HStore store) {
super(conf, store);
}

@Override
public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
MonitoredTask status, ThroughputController throughputController,
FlushLifeCycleTracker tracker) throws IOException {
counter.incrementAndGet();
return super.flushSnapshot(snapshot, cacheFlushId, status, throughputController, tracker);
}

@Override
protected void performFlush(InternalScanner scanner, final CellSink sink,
ThroughputController throughputController) throws IOException {

final int currentCount = counter.get();
CellSink newCellSink = (cell) -> {
if (currentCount <= failCounter.get()) {
throw new IOException("Simulated exception by tests");
}
sink.append(cell);
};
super.performFlush(scanner, newCellSink, throughputController);
}
}

private HStoreFile mockStoreFileWithLength(long length) {
HStoreFile sf = mock(HStoreFile.class);
StoreFileReader sfr = mock(StoreFileReader.class);
Expand Down Expand Up @@ -2176,7 +2288,5 @@ protected void doClearSnapShot() {
}
}
}


}
}

0 comments on commit f116558

Please sign in to comment.