Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-26488 Memory leak when MemStore retry flushing #3899

Merged
merged 5 commits into from
Dec 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2404,7 +2404,6 @@ public void replayFlush(List<String> fileNames, boolean dropMemstoreSnapshot)
long snapshotId = -1; // -1 means do not drop
if (dropMemstoreSnapshot && snapshot != null) {
snapshotId = snapshot.getId();
snapshot.close();
}
HStore.this.updateStorefiles(storeFiles, snapshotId);
}
Expand All @@ -2415,10 +2414,6 @@ public void replayFlush(List<String> fileNames, boolean dropMemstoreSnapshot)
@Override
public void abort() throws IOException {
if (snapshot != null) {
//We need to close the snapshot when aborting, otherwise, the segment scanner
//won't be closed. If we are using MSLAB, the chunk referenced by those scanners
//can't be released, thus memory leak
snapshot.close();
HStore.this.updateStorefiles(Collections.emptyList(), snapshot.getId());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ public String toString() {
return res;
}

/**
* We create a new {@link SnapshotSegmentScanner} to increase the reference count of
* {@link MemStoreLABImpl} used by this segment.
*/
List<KeyValueScanner> getSnapshotScanners() {
return Collections.singletonList(new SnapshotSegmentScanner(this));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,31 +17,38 @@
*/
package org.apache.hadoop.hbase.regionserver;

import java.util.List;
import org.apache.yetus.audience.InterfaceAudience;

import java.io.Closeable;
import java.util.List;

/**
* Holds details of the snapshot taken on a MemStore. Details include the snapshot's identifier,
* count of cells in it and total memory size occupied by all the cells, timestamp information of
* all the cells and a scanner to read all cells in it.
* {@link MemStoreSnapshot} is a Context Object to hold details of the snapshot taken on a MemStore.
* Details include the snapshot's identifier, count of cells in it and total memory size occupied by
* all the cells, timestamp information of all the cells and the snapshot immutableSegment.
* <p>
* NOTE:Every time when {@link MemStoreSnapshot#getScanners} is called, we create new
* {@link SnapshotSegmentScanner}s on the {@link MemStoreSnapshot#snapshotImmutableSegment},and
* {@link Segment#incScannerCount} is invoked in the {@link SnapshotSegmentScanner} ctor to increase
* the reference count of {@link MemStoreLAB} which used by
* {@link MemStoreSnapshot#snapshotImmutableSegment}, so after we finish using these scanners, we
* must call their close method to invoke {@link Segment#decScannerCount}.
*/
@InterfaceAudience.Private
public class MemStoreSnapshot implements Closeable {
public class MemStoreSnapshot {
private final long id;
private final int cellsCount;
private final MemStoreSize memStoreSize;
private final TimeRangeTracker timeRangeTracker;
private final List<KeyValueScanner> scanners;
private final boolean tagsPresent;
private final ImmutableSegment snapshotImmutableSegment;

public MemStoreSnapshot(long id, ImmutableSegment snapshot) {
this.id = id;
this.cellsCount = snapshot.getCellsCount();
this.memStoreSize = snapshot.getMemStoreSize();
this.timeRangeTracker = snapshot.getTimeRangeTracker();
this.scanners = snapshot.getSnapshotScanners();
this.tagsPresent = snapshot.isTagsPresent();
this.snapshotImmutableSegment = snapshot;
}

/**
Expand Down Expand Up @@ -74,10 +81,16 @@ public TimeRangeTracker getTimeRangeTracker() {
}

/**
* @return {@link KeyValueScanner} for iterating over the snapshot
* Create new {@link SnapshotSegmentScanner}s for iterating over the snapshot. <br/>
* NOTE:Here when create new {@link SnapshotSegmentScanner}s, {@link Segment#incScannerCount} is
* invoked in the {@link SnapshotSegmentScanner} ctor,so after we use these
* {@link SnapshotSegmentScanner}s, we must call {@link SnapshotSegmentScanner#close} to invoke
* {@link Segment#decScannerCount}.
* @return {@link KeyValueScanner}s(Which type is {@link SnapshotSegmentScanner}) for iterating
* over the snapshot.
*/
public List<KeyValueScanner> getScanners() {
return scanners;
return snapshotImmutableSegment.getSnapshotScanners();
}

/**
Expand All @@ -86,13 +99,4 @@ public List<KeyValueScanner> getScanners() {
public boolean isTagsPresent() {
return this.tagsPresent;
}

@Override
public void close() {
if (this.scanners != null) {
for (KeyValueScanner scanner : scanners) {
scanner.close();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
Expand Down Expand Up @@ -783,11 +784,12 @@ private void injectFault() throws IOException {
}
}

private static void flushStore(HStore store, long id) throws IOException {
private static StoreFlushContext flushStore(HStore store, long id) throws IOException {
StoreFlushContext storeFlushCtx = store.createFlushContext(id, FlushLifeCycleTracker.DUMMY);
storeFlushCtx.prepare();
storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
return storeFlushCtx;
}

/**
Expand Down Expand Up @@ -2222,7 +2224,7 @@ public void testClearSnapshotGetScannerConcurrently() throws Exception {
flushThread.join();

if (myDefaultMemStore.shouldWait) {
SegmentScanner segmentScanner = getSegmentScanner(storeScanner);
SegmentScanner segmentScanner = getTypeKeyValueScanner(storeScanner, SegmentScanner.class);
MemStoreLABImpl memStoreLAB = (MemStoreLABImpl) (segmentScanner.segment.getMemStoreLAB());
assertTrue(memStoreLAB.isClosed());
assertTrue(!memStoreLAB.chunks.isEmpty());
Expand All @@ -2249,16 +2251,16 @@ public void testClearSnapshotGetScannerConcurrently() throws Exception {
}
}

private SegmentScanner getSegmentScanner(StoreScanner storeScanner) {
List<SegmentScanner> segmentScanners = new ArrayList<SegmentScanner>();
@SuppressWarnings("unchecked")
private <T> T getTypeKeyValueScanner(StoreScanner storeScanner, Class<T> keyValueScannerClass) {
List<T> resultScanners = new ArrayList<T>();
for (KeyValueScanner keyValueScanner : storeScanner.currentScanners) {
if (keyValueScanner instanceof SegmentScanner) {
segmentScanners.add((SegmentScanner) keyValueScanner);
if (keyValueScannerClass.isInstance(keyValueScanner)) {
resultScanners.add((T) keyValueScanner);
}
}

assertTrue(segmentScanners.size() == 1);
return segmentScanners.get(0);
assertTrue(resultScanners.size() == 1);
return resultScanners.get(0);
}

@Test
Expand Down Expand Up @@ -2310,6 +2312,116 @@ public CustomDefaultMemStore(Configuration conf, CellComparator c,

}

/**
* This test is for HBASE-26488
*/
@Test
public void testMemoryLeakWhenFlushMemStoreRetrying() throws Exception {

Configuration conf = HBaseConfiguration.create();

byte[] smallValue = new byte[3];
byte[] largeValue = new byte[9];
final long timestamp = EnvironmentEdgeManager.currentTime();
final long seqId = 100;
final Cell smallCell = createCell(qf1, timestamp, seqId, smallValue);
final Cell largeCell = createCell(qf2, timestamp, seqId, largeValue);
TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
quals.add(qf1);
quals.add(qf2);

conf.set(HStore.MEMSTORE_CLASS_NAME, MyDefaultMemStore1.class.getName());
conf.setBoolean(WALFactory.WAL_ENABLED, false);
conf.set(DefaultStoreEngine.DEFAULT_STORE_FLUSHER_CLASS_KEY,
MyDefaultStoreFlusher.class.getName());

init(name.getMethodName(), conf, ColumnFamilyDescriptorBuilder.newBuilder(family).build());
MyDefaultMemStore1 myDefaultMemStore = (MyDefaultMemStore1) (store.memstore);
assertTrue((store.storeEngine.getStoreFlusher()) instanceof MyDefaultStoreFlusher);

MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
store.add(smallCell, memStoreSizing);
store.add(largeCell, memStoreSizing);
flushStore(store, id++);

MemStoreLABImpl memStoreLAB =
(MemStoreLABImpl) (myDefaultMemStore.snapshotImmutableSegment.getMemStoreLAB());
assertTrue(memStoreLAB.isClosed());
assertTrue(memStoreLAB.getOpenScannerCount() == 0);
assertTrue(memStoreLAB.isReclaimed());
assertTrue(memStoreLAB.chunks.isEmpty());
StoreScanner storeScanner = null;
try {
storeScanner =
(StoreScanner) store.getScanner(new Scan(new Get(row)), quals, seqId + 1);
assertTrue(store.storeEngine.getStoreFileManager().getStorefileCount() == 1);
assertTrue(store.memstore.size().getCellsCount() == 0);
assertTrue(store.memstore.getSnapshotSize().getCellsCount() == 0);
assertTrue(storeScanner.currentScanners.size() == 1);
assertTrue(storeScanner.currentScanners.get(0) instanceof StoreFileScanner);

List<Cell> results = new ArrayList<>();
storeScanner.next(results);
assertEquals(2, results.size());
CellUtil.equals(smallCell, results.get(0));
CellUtil.equals(largeCell, results.get(1));
} finally {
if (storeScanner != null) {
storeScanner.close();
}
}
}


static class MyDefaultMemStore1 extends DefaultMemStore {

private ImmutableSegment snapshotImmutableSegment;

public MyDefaultMemStore1(Configuration conf, CellComparator c,
RegionServicesForStores regionServices) {
super(conf, c, regionServices);
}

@Override
public MemStoreSnapshot snapshot() {
MemStoreSnapshot result = super.snapshot();
this.snapshotImmutableSegment = snapshot;
return result;
}

}

public static class MyDefaultStoreFlusher extends DefaultStoreFlusher {
private static final AtomicInteger failCounter = new AtomicInteger(1);
private static final AtomicInteger counter = new AtomicInteger(0);

public MyDefaultStoreFlusher(Configuration conf, HStore store) {
super(conf, store);
}

@Override
public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
MonitoredTask status, ThroughputController throughputController,
FlushLifeCycleTracker tracker) throws IOException {
counter.incrementAndGet();
return super.flushSnapshot(snapshot, cacheFlushId, status, throughputController, tracker);
}

@Override
protected void performFlush(InternalScanner scanner, final CellSink sink,
ThroughputController throughputController) throws IOException {

final int currentCount = counter.get();
CellSink newCellSink = (cell) -> {
if (currentCount <= failCounter.get()) {
throw new IOException("Simulated exception by tests");
}
sink.append(cell);
};
super.performFlush(scanner, newCellSink, throughputController);
}
}

private HStoreFile mockStoreFileWithLength(long length) {
HStoreFile sf = mock(HStoreFile.class);
StoreFileReader sfr = mock(StoreFileReader.class);
Expand Down Expand Up @@ -3093,7 +3205,5 @@ protected void doClearSnapShot() {
}
}
}


}
}