Skip to content

Commit

Permalink
HBASE-28956 RSMobFileCleanerChore may close the StoreFileReader objec…
Browse files Browse the repository at this point in the history
…t which is being used by Compaction thread (#6464)

Signed-off-by: Wellington Chevreuil <[email protected]>
  • Loading branch information
guluo2016 authored and Apache9 committed Jan 6, 2025
1 parent 43149ef commit 2db2146
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,10 @@ protected void chore() {
// Now clean obsolete files for a table
LOG.info("Cleaning obsolete MOB files from table={}", htd.getTableName());
List<ColumnFamilyDescriptor> list = MobUtils.getMobColumnFamilies(htd);
if (list.isEmpty()) {
// The table is not MOB table, just skip it
continue;
}
List<HRegion> regions = rs.getRegions(htd.getTableName());
for (HRegion region : regions) {
for (ColumnFamilyDescriptor hcd : list) {
Expand All @@ -116,14 +120,27 @@ protected void chore() {
Set<String> regionMobs = new HashSet<String>();
Path currentPath = null;
try {
// collectinng referenced MOBs
// collecting referenced MOBs
for (HStoreFile sf : sfs) {
currentPath = sf.getPath();
sf.initReader();
byte[] mobRefData = sf.getMetadataValue(HStoreFile.MOB_FILE_REFS);
byte[] bulkloadMarkerData = sf.getMetadataValue(HStoreFile.BULKLOAD_TASK_KEY);
// close store file to avoid memory leaks
sf.closeStoreFile(true);
byte[] mobRefData = null;
byte[] bulkloadMarkerData = null;
if (sf.getReader() == null) {
synchronized (sf) {
boolean needCreateReader = sf.getReader() == null;
sf.initReader();
mobRefData = sf.getMetadataValue(HStoreFile.MOB_FILE_REFS);
bulkloadMarkerData = sf.getMetadataValue(HStoreFile.BULKLOAD_TASK_KEY);
if (needCreateReader) {
// close store file to avoid memory leaks
sf.closeStoreFile(true);
}
}
} else {
mobRefData = sf.getMetadataValue(HStoreFile.MOB_FILE_REFS);
bulkloadMarkerData = sf.getMetadataValue(HStoreFile.BULKLOAD_TASK_KEY);
}

if (mobRefData == null) {
if (bulkloadMarkerData == null) {
LOG.warn(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,14 @@
package org.apache.hadoop.hbase.mob;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
Expand All @@ -41,8 +44,12 @@
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
Expand Down Expand Up @@ -124,15 +131,15 @@ private void initConf() {
conf.setLong("hbase.hfile.compaction.discharger.interval", minAgeToArchive / 2);
}

private void loadData(int start, int num) {
private void loadData(Table t, int start, int num) {
try {

for (int i = 0; i < num; i++) {
Put p = new Put(Bytes.toBytes(start + i));
p.addColumn(fam, qualifier, mobVal);
table.put(p);
t.put(p);
}
admin.flush(table.getName());
admin.flush(t.getName());
} catch (Exception e) {
LOG.error("MOB file cleaner chore test FAILED", e);
assertTrue(false);
Expand All @@ -148,8 +155,8 @@ public void tearDown() throws Exception {

@Test
public void testMobFileCleanerChore() throws InterruptedException, IOException {
loadData(0, 10);
loadData(10, 10);
loadData(table, 0, 10);
loadData(table, 10, 10);
// loadData(20, 10);
long num = getNumberOfMobFiles(conf, table.getName(), new String(fam));
assertEquals(2, num);
Expand Down Expand Up @@ -225,6 +232,62 @@ public void testMobFileCleanerChore() throws InterruptedException, IOException {
assertEquals(20, scanned);
}

@Test
public void testCleaningAndStoreFileReaderCreatedByOtherThreads()
throws IOException, InterruptedException {
TableName testTable = TableName.valueOf("testCleaningAndStoreFileReaderCreatedByOtherThreads");
ColumnFamilyDescriptor cfDesc = ColumnFamilyDescriptorBuilder.newBuilder(fam)
.setMobEnabled(true).setMobThreshold(mobLen).setMaxVersions(1).build();
TableDescriptor tDesc =
TableDescriptorBuilder.newBuilder(testTable).setColumnFamily(cfDesc).build();
admin.createTable(tDesc);
assertTrue(admin.tableExists(testTable));

// put some data
loadData(admin.getConnection().getTable(testTable), 0, 10);

HRegion region = HTU.getHBaseCluster().getRegions(testTable).get(0);
HStore store = region.getStore(fam);
Collection<HStoreFile> storeFiles = store.getStorefiles();
assertEquals(1, store.getStorefiles().size());
final HStoreFile sf = storeFiles.iterator().next();
assertNotNull(sf);
long mobFileNum = getNumberOfMobFiles(conf, testTable, new String(fam));
assertEquals(1, mobFileNum);

ServerName serverName = null;
for (ServerName sn : admin.getRegionServers()) {
boolean flag = admin.getRegions(sn).stream().anyMatch(
r -> r.getRegionNameAsString().equals(region.getRegionInfo().getRegionNameAsString()));
if (flag) {
serverName = sn;
break;
}
}
assertNotNull(serverName);
RSMobFileCleanerChore cleanerChore =
HTU.getHBaseCluster().getRegionServer(serverName).getRSMobFileCleanerChore();
CompletableFuture<Boolean> future = CompletableFuture.supplyAsync(() -> {
boolean readerIsNotNull = false;
try {
sf.initReader();
Thread.sleep(1000 * 10);
readerIsNotNull = sf.getReader() != null;
sf.closeStoreFile(true);
} catch (Exception e) {
LOG.error("We occur an exception", e);
}
return readerIsNotNull;
});
Thread.sleep(100);
// The StoreFileReader object was created by another thread
cleanerChore.chore();
Boolean readerIsNotNull = future.join();
assertTrue(readerIsNotNull);
admin.disableTable(testTable);
admin.deleteTable(testTable);
}

private long getNumberOfMobFiles(Configuration conf, TableName tableName, String family)
throws IOException {
FileSystem fs = FileSystem.get(conf);
Expand Down

0 comments on commit 2db2146

Please sign in to comment.