Skip to content

Commit

Permalink
HBASE-28401 Introduce a close method for memstore for release active …
Browse files Browse the repository at this point in the history
…segment
  • Loading branch information
Apache9 committed Feb 26, 2024
1 parent c4a02f7 commit d0a029d
Show file tree
Hide file tree
Showing 10 changed files with 118 additions and 126 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,15 @@ ImmutableSegment getSnapshot() {
return snapshot;
}

@Override
public void close() {
// active should never be null
active.close();
// for snapshot, either it is empty, where we do not reference any real segment which contains a
// memstore lab, or it is during snapshot, where we will clear it when calling clearSnapshot, so
// we do not need to close it here
}

/** Returns an ordered list of segments from most recent to oldest in memstore */
protected abstract List<Segment> getSegments() throws IOException;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -726,6 +726,7 @@ private void bulkLoadHFile(HStoreFile sf) throws IOException {
}

private ImmutableCollection<HStoreFile> closeWithoutLock() throws IOException {
memstore.close();
// Clear so metrics doesn't find them.
ImmutableCollection<HStoreFile> result = storeEngine.getStoreFileManager().clearFiles();
Collection<HStoreFile> compactedfiles = storeEngine.getStoreFileManager().clearCompactedFiles();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.regionserver;

import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hbase.Cell;
Expand All @@ -31,7 +32,7 @@
* </p>
*/
@InterfaceAudience.Private
public interface MemStore {
public interface MemStore extends Closeable {

/**
* Creates a snapshot of the current memstore. Snapshot must be cleared by call to
Expand Down Expand Up @@ -131,4 +132,15 @@ default void startReplayingFromWAL() {
default void stopReplayingFromWAL() {
return;
}

/**
* Close the memstore.
* <p>
* Usually this should only be called when there is nothing in the memstore, unless we are going
* to abort ourselves.
* <p>
* For normal cases, this method is only used to fix the reference counting, see HBASE-27941.
*/
@Override
void close();
}
Original file line number Diff line number Diff line change
Expand Up @@ -1699,6 +1699,8 @@ public HRegion createLocalHRegion(RegionInfo info, TableDescriptor desc) throws
*/
public HRegion createLocalHRegion(RegionInfo info, Configuration conf, TableDescriptor desc,
WAL wal) throws IOException {
ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null,
MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
return HRegion.createHRegion(info, getDataTestDir(), conf, desc, wal);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.ChunkCreator;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
Expand Down Expand Up @@ -72,6 +74,8 @@ public void before() throws IOException {
TableDescriptor td = TableDescriptorBuilder.newBuilder(tn).setColumnFamily(cfd).build();
RegionInfo ri = RegionInfoBuilder.newBuilder(tn).build();
this.rss = new MockRegionServerServices(HTU.getConfiguration());
ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null,
MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
this.region = HRegion.openHRegion(ri, td, null, HTU.getConfiguration(), this.rss, null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
Expand Down Expand Up @@ -134,7 +133,6 @@ public ColumnFamilyDescriptorBuilder modifyFamilySchema(ColumnFamilyDescriptorBu
private final String testDescription;
private HRegion region;
private HStore store;
private WALFactory walFactory;
private FileSystem fs;

public TestCacheOnWriteInSchema(CacheOnWriteType cowType) {
Expand Down Expand Up @@ -179,24 +177,17 @@ public void setUp() throws IOException {
fs.delete(logdir, true);

RegionInfo info = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
walFactory = new WALFactory(conf, id);

region = TEST_UTIL.createLocalHRegion(info, conf, htd, walFactory.getWAL(info));
region.setBlockCache(BlockCacheFactory.createBlockCache(conf));
store = new HStore(region, hcd, conf, false);
region = HBaseTestingUtil.createRegionAndWAL(info, logdir, conf, htd,
BlockCacheFactory.createBlockCache(conf));
store = region.getStore(hcd.getName());
}

@After
public void tearDown() throws IOException {
IOException ex = null;
try {
region.close();
} catch (IOException e) {
LOG.warn("Caught Exception", e);
ex = e;
}
try {
walFactory.close();
HBaseTestingUtil.closeRegionAndWAL(region);
} catch (IOException e) {
LOG.warn("Caught Exception", e);
ex = e;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -447,32 +447,24 @@ public void testMemstoreSizeAccountingWithFailedPostBatchMutate() throws IOExcep

/**
* A test case of HBASE-21041
* @throws Exception Exception
*/
@Test
public void testFlushAndMemstoreSizeCounting() throws Exception {
byte[] family = Bytes.toBytes("family");
this.region = initHRegion(tableName, method, CONF, family);
final WALFactory wals = new WALFactory(CONF, method);
try {
for (byte[] row : HBaseTestingUtil.ROWS) {
Put put = new Put(row);
put.addColumn(family, family, row);
region.put(put);
}
region.flush(true);
// After flush, data size should be zero
assertEquals(0, region.getMemStoreDataSize());
// After flush, a new active mutable segment is created, so the heap size
// should equal to MutableSegment.DEEP_OVERHEAD
assertEquals(MutableSegment.DEEP_OVERHEAD, region.getMemStoreHeapSize());
// After flush, offheap should be zero
assertEquals(0, region.getMemStoreOffHeapSize());
} finally {
HBaseTestingUtil.closeRegionAndWAL(this.region);
this.region = null;
wals.close();
for (byte[] row : HBaseTestingUtil.ROWS) {
Put put = new Put(row);
put.addColumn(family, family, row);
region.put(put);
}
region.flush(true);
// After flush, data size should be zero
assertEquals(0, region.getMemStoreDataSize());
// After flush, a new active mutable segment is created, so the heap size
// should equal to MutableSegment.DEEP_OVERHEAD
assertEquals(MutableSegment.DEEP_OVERHEAD, region.getMemStoreHeapSize());
// After flush, offheap should be zero
assertEquals(0, region.getMemStoreOffHeapSize());
}

/**
Expand Down Expand Up @@ -1283,6 +1275,12 @@ public long getSyncedLength() {
// throwing a DroppedSnapshotException to force an abort. Just clean up the mess.
region.close(true);
wal.close();
// release the snapshot and active segment, so netty will not report memory leak
for (HStore store : region.getStores()) {
AbstractMemStore memstore = (AbstractMemStore) store.memstore;
memstore.doClearSnapShot();
memstore.close();
}

// 2. Test case where START_FLUSH succeeds but COMMIT_FLUSH will throw exception
wal.flushActions = new FlushAction[] { FlushAction.COMMIT_FLUSH };
Expand All @@ -1297,15 +1295,18 @@ public long getSyncedLength() {
// DroppedSnapshotException. Below COMMIT_FLUSH will cause flush to abort
wal.flushActions = new FlushAction[] { FlushAction.COMMIT_FLUSH, FlushAction.ABORT_FLUSH };

try {
region.flush(true);
fail("This should have thrown exception");
} catch (DroppedSnapshotException expected) {
// we expect this exception, since we were able to write the snapshot, but failed to
// write the flush marker to WAL
} catch (IOException unexpected) {
throw unexpected;
// we expect this exception, since we were able to write the snapshot, but failed to
// write the flush marker to WAL
assertThrows(DroppedSnapshotException.class, () -> region.flush(true));

region.close(true);
// release the snapshot and active segment, so netty will not report memory leak
for (HStore store : region.getStores()) {
AbstractMemStore memstore = (AbstractMemStore) store.memstore;
memstore.doClearSnapShot();
memstore.close();
}
region = null;
}

@Test
Expand Down Expand Up @@ -3735,14 +3736,14 @@ public void testGetScanner_WithRegionClosed() throws IOException {
byte[][] families = { fam1, fam2 };

// Setting up region
region = initHRegion(tableName, method, CONF, families);
region.closed.set(true);
try {
this.region = initHRegion(tableName, method, CONF, families);
} catch (IOException e) {
e.printStackTrace();
fail("Got IOException during initHRegion, " + e.getMessage());
assertThrows(NotServingRegionException.class, () -> region.getScanner(null));
} finally {
// so we can close the region in tearDown
region.closed.set(false);
}
region.closed.set(true);
assertThrows(NotServingRegionException.class, () -> region.getScanner(null));
}

@Test
Expand Down Expand Up @@ -4543,14 +4544,14 @@ public void flush() {
/**
* So can be overridden in subclasses.
*/
int getNumQualifiersForTestWritesWhileScanning() {
protected int getNumQualifiersForTestWritesWhileScanning() {
return 100;
}

/**
* So can be overridden in subclasses.
*/
int getTestCountForTestWritesWhileScanning() {
protected int getTestCountForTestWritesWhileScanning() {
return 100;
}

Expand Down Expand Up @@ -5829,12 +5830,12 @@ protected HRegion initHRegion(TableName tableName, String callingMethod, Configu
* @return A region on which you must call {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)}
* when done.
*/
protected HRegion initHRegion(TableName tableName, String callingMethod, Configuration conf,
private HRegion initHRegion(TableName tableName, String callingMethod, Configuration conf,
boolean isReadOnly, byte[]... families) throws IOException {
return initHRegion(tableName, null, null, callingMethod, conf, isReadOnly, families);
}

protected HRegion initHRegion(TableName tableName, byte[] startKey, byte[] stopKey,
private HRegion initHRegion(TableName tableName, byte[] startKey, byte[] stopKey,
String callingMethod, Configuration conf, boolean isReadOnly, byte[]... families)
throws IOException {
Path logDir = TEST_UTIL.getDataTestDirOnTestFS(callingMethod + ".log");
Expand All @@ -5849,7 +5850,7 @@ protected HRegion initHRegion(TableName tableName, byte[] startKey, byte[] stopK
* @return A region on which you must call {@link HBaseTestingUtil#closeRegionAndWAL(HRegion)}
* when done.
*/
public HRegion initHRegion(TableName tableName, byte[] startKey, byte[] stopKey,
protected HRegion initHRegion(TableName tableName, byte[] startKey, byte[] stopKey,
Configuration conf, boolean isReadOnly, Durability durability, WAL wal, byte[]... families)
throws IOException {
ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null,
Expand Down Expand Up @@ -6708,14 +6709,12 @@ public void testCloseRegionWrittenToWAL() throws Exception {
WAL wal = mockWAL();
when(rss.getWAL(any(RegionInfo.class))).thenReturn(wal);

// create and then open a region first so that it can be closed later
region =
HRegion.createHRegion(hri, rootDir, TEST_UTIL.getConfiguration(), htd, rss.getWAL(hri));
region =
HRegion.openHRegion(hri, htd, rss.getWAL(hri), TEST_UTIL.getConfiguration(), rss, null);

// close the region
region.close(false);
// create the region
region = HBaseTestingUtil.createRegionAndWAL(hri, rootDir, CONF, htd);
HBaseTestingUtil.closeRegionAndWAL(region);
region = null;
// open the region first and then close it
HRegion.openHRegion(hri, htd, rss.getWAL(hri), TEST_UTIL.getConfiguration(), rss, null).close();

// 2 times, one for region open, the other close region
verify(wal, times(2)).appendMarker(any(RegionInfo.class), (WALKeyImpl) any(WALKeyImpl.class),
Expand Down Expand Up @@ -7249,7 +7248,7 @@ public void testCheckAndRowMutateTimestampsAreMonotonic() throws IOException {
qual2.length));
}

HRegion initHRegion(TableName tableName, String callingMethod, byte[]... families)
private HRegion initHRegion(TableName tableName, String callingMethod, byte[]... families)
throws IOException {
return initHRegion(tableName, callingMethod, HBaseConfiguration.create(), families);
}
Expand Down Expand Up @@ -7727,12 +7726,7 @@ public void run() {

holder.start();
latch.await();
try {
region.close();
} catch (IOException e) {
LOG.info("Caught expected exception", e);
}
region = null;
assertThrows(IOException.class, () -> region.close());
holder.join();

// Verify the region tried to abort the server
Expand Down
Loading

0 comments on commit d0a029d

Please sign in to comment.