From 2db2146133904e95513e3b0af3d32edd47e97ce3 Mon Sep 17 00:00:00 2001 From: Peng Lu Date: Fri, 15 Nov 2024 19:02:08 +0800 Subject: [PATCH] HBASE-28956 RSMobFileCleanerChore may close the StoreFileReader object which is being used by Compaction thread (#6464) Signed-off-by: Wellington Chevreuil --- .../hbase/mob/RSMobFileCleanerChore.java | 29 ++++++-- .../hbase/mob/TestRSMobFileCleanerChore.java | 73 +++++++++++++++++-- 2 files changed, 91 insertions(+), 11 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/RSMobFileCleanerChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/RSMobFileCleanerChore.java index 06e349887337..c791a50bdef5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/RSMobFileCleanerChore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/RSMobFileCleanerChore.java @@ -108,6 +108,10 @@ protected void chore() { // Now clean obsolete files for a table LOG.info("Cleaning obsolete MOB files from table={}", htd.getTableName()); List list = MobUtils.getMobColumnFamilies(htd); + if (list.isEmpty()) { + // The table is not MOB table, just skip it + continue; + } List regions = rs.getRegions(htd.getTableName()); for (HRegion region : regions) { for (ColumnFamilyDescriptor hcd : list) { @@ -116,14 +120,27 @@ protected void chore() { Set regionMobs = new HashSet(); Path currentPath = null; try { - // collectinng referenced MOBs + // collecting referenced MOBs for (HStoreFile sf : sfs) { currentPath = sf.getPath(); - sf.initReader(); - byte[] mobRefData = sf.getMetadataValue(HStoreFile.MOB_FILE_REFS); - byte[] bulkloadMarkerData = sf.getMetadataValue(HStoreFile.BULKLOAD_TASK_KEY); - // close store file to avoid memory leaks - sf.closeStoreFile(true); + byte[] mobRefData = null; + byte[] bulkloadMarkerData = null; + if (sf.getReader() == null) { + synchronized (sf) { + boolean needCreateReader = sf.getReader() == null; + sf.initReader(); + mobRefData = sf.getMetadataValue(HStoreFile.MOB_FILE_REFS); + bulkloadMarkerData = sf.getMetadataValue(HStoreFile.BULKLOAD_TASK_KEY); + if (needCreateReader) { + // close store file to avoid memory leaks + sf.closeStoreFile(true); + } + } + } else { + mobRefData = sf.getMetadataValue(HStoreFile.MOB_FILE_REFS); + bulkloadMarkerData = sf.getMetadataValue(HStoreFile.BULKLOAD_TASK_KEY); + } + if (mobRefData == null) { if (bulkloadMarkerData == null) { LOG.warn( diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestRSMobFileCleanerChore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestRSMobFileCleanerChore.java index 98187631d964..d9470420e995 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestRSMobFileCleanerChore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestRSMobFileCleanerChore.java @@ -18,11 +18,14 @@ package org.apache.hadoop.hbase.mob; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import java.io.IOException; import java.util.Arrays; +import java.util.Collection; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; @@ -41,8 +44,12 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.client.TableDescriptorBuilder; import org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HStore; +import org.apache.hadoop.hbase.regionserver.HStoreFile; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; import org.junit.After; @@ -124,15 +131,15 @@ private void initConf() { conf.setLong("hbase.hfile.compaction.discharger.interval", minAgeToArchive / 2); } - private void loadData(int start, int num) { + private void loadData(Table t, int start, int num) { try { for (int i = 0; i < num; i++) { Put p = new Put(Bytes.toBytes(start + i)); p.addColumn(fam, qualifier, mobVal); - table.put(p); + t.put(p); } - admin.flush(table.getName()); + admin.flush(t.getName()); } catch (Exception e) { LOG.error("MOB file cleaner chore test FAILED", e); assertTrue(false); @@ -148,8 +155,8 @@ public void tearDown() throws Exception { @Test public void testMobFileCleanerChore() throws InterruptedException, IOException { - loadData(0, 10); - loadData(10, 10); + loadData(table, 0, 10); + loadData(table, 10, 10); // loadData(20, 10); long num = getNumberOfMobFiles(conf, table.getName(), new String(fam)); assertEquals(2, num); @@ -225,6 +232,62 @@ public void testMobFileCleanerChore() throws InterruptedException, IOException { assertEquals(20, scanned); } + @Test + public void testCleaningAndStoreFileReaderCreatedByOtherThreads() + throws IOException, InterruptedException { + TableName testTable = TableName.valueOf("testCleaningAndStoreFileReaderCreatedByOtherThreads"); + ColumnFamilyDescriptor cfDesc = ColumnFamilyDescriptorBuilder.newBuilder(fam) + .setMobEnabled(true).setMobThreshold(mobLen).setMaxVersions(1).build(); + TableDescriptor tDesc = + TableDescriptorBuilder.newBuilder(testTable).setColumnFamily(cfDesc).build(); + admin.createTable(tDesc); + assertTrue(admin.tableExists(testTable)); + + // put some data + loadData(admin.getConnection().getTable(testTable), 0, 10); + + HRegion region = HTU.getHBaseCluster().getRegions(testTable).get(0); + HStore store = region.getStore(fam); + Collection storeFiles = store.getStorefiles(); + assertEquals(1, store.getStorefiles().size()); + final HStoreFile sf = storeFiles.iterator().next(); + assertNotNull(sf); + long mobFileNum = getNumberOfMobFiles(conf, testTable, new String(fam)); + assertEquals(1, mobFileNum); + + ServerName serverName = null; + for (ServerName sn : admin.getRegionServers()) { + boolean flag = admin.getRegions(sn).stream().anyMatch( + r -> r.getRegionNameAsString().equals(region.getRegionInfo().getRegionNameAsString())); + if (flag) { + serverName = sn; + break; + } + } + assertNotNull(serverName); + RSMobFileCleanerChore cleanerChore = + HTU.getHBaseCluster().getRegionServer(serverName).getRSMobFileCleanerChore(); + CompletableFuture future = CompletableFuture.supplyAsync(() -> { + boolean readerIsNotNull = false; + try { + sf.initReader(); + Thread.sleep(1000 * 10); + readerIsNotNull = sf.getReader() != null; + sf.closeStoreFile(true); + } catch (Exception e) { + LOG.error("We occur an exception", e); + } + return readerIsNotNull; + }); + Thread.sleep(100); + // The StoreFileReader object was created by another thread + cleanerChore.chore(); + Boolean readerIsNotNull = future.join(); + assertTrue(readerIsNotNull); + admin.disableTable(testTable); + admin.deleteTable(testTable); + } + private long getNumberOfMobFiles(Configuration conf, TableName tableName, String family) throws IOException { FileSystem fs = FileSystem.get(conf);