diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java index f1fd8f8d6b3a..2fb79786e1d1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.replication.master; +import java.io.IOException; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -28,6 +29,7 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.master.cleaner.BaseLogCleanerDelegate; @@ -39,6 +41,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; import org.apache.hadoop.hbase.replication.ReplicationQueueData; import org.apache.hadoop.hbase.replication.ReplicationQueueId; +import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; @@ -66,6 +69,7 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate { // ReplicationQueueId. private Map>> replicationOffsets; private ReplicationPeerManager rpm; + private MasterServices masterServices; private Supplier> getNotFullyDeadServers; private boolean canFilter; @@ -76,6 +80,20 @@ public void preClean() { if (this.getConf() == null) { return; } + // because the replication table is actually created when the peer is added (lazy creation), + // here we need to ensure that the replication table already exists. + try { + TableName replicationQueueTableName = + TableName.valueOf(getConf().get(ReplicationStorageFactory.REPLICATION_QUEUE_TABLE_NAME, + ReplicationStorageFactory.REPLICATION_QUEUE_TABLE_NAME_DEFAULT.getNameAsString())); + if (!masterServices.getTableDescriptors().exists(replicationQueueTableName)) { + return; + } + } catch (IOException e) { + LOG.error("Error occurred when pulling replication table", e); + return; + } + canFilter = rpm.getReplicationLogCleanerBarrier().start(); if (canFilter) { notFullyDeadServers = getNotFullyDeadServers.get(); @@ -235,9 +253,9 @@ public void init(Map params) { if (MapUtils.isNotEmpty(params)) { Object master = params.get(HMaster.MASTER); if (master != null && master instanceof MasterServices) { - MasterServices m = (MasterServices) master; - rpm = m.getReplicationPeerManager(); - getNotFullyDeadServers = () -> getNotFullyDeadServers(m); + masterServices = (MasterServices) master; + rpm = masterServices.getReplicationPeerManager(); + getNotFullyDeadServers = () -> getNotFullyDeadServers(masterServices); return; } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java index d7ba6c227c6d..75040f2ce910 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java @@ -58,6 +58,7 @@ import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.FSTableDescriptors; import org.apache.hadoop.hbase.util.MockServer; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.junit.AfterClass; @@ -124,6 +125,8 @@ public void beforeTest() throws Exception { fs.mkdirs(OLD_WALS_DIR); TableName tableName = tableNameRule.getTableName(); + conf.set(ReplicationStorageFactory.REPLICATION_QUEUE_TABLE_NAME, tableName.getNameAsString()); + TableDescriptor td = ReplicationStorageFactory.createReplicationQueueTableDescriptor(tableName); TEST_UTIL.getAdmin().createTable(td); TEST_UTIL.waitTableAvailable(tableName); @@ -134,6 +137,7 @@ public void beforeTest() throws Exception { when(masterServices.getConnection()).thenReturn(TEST_UTIL.getConnection()); ReplicationPeerManager rpm = mock(ReplicationPeerManager.class); when(masterServices.getReplicationPeerManager()).thenReturn(rpm); + when(masterServices.getTableDescriptors()).thenReturn(new FSTableDescriptorsTest(conf)); when(rpm.getQueueStorage()).thenReturn(queueStorage); when(rpm.getReplicationLogCleanerBarrier()).thenReturn(new ReplicationLogCleanerBarrier()); when(rpm.listPeers(null)).thenReturn(new ArrayList<>()); @@ -314,4 +318,24 @@ public ZKWatcher getZooKeeper() { return null; } } + + private static class FSTableDescriptorsTest extends FSTableDescriptors { + private TableName replicationQueueTableName; + + public FSTableDescriptorsTest(final Configuration conf) throws IOException { + super(conf); + replicationQueueTableName = + TableName.valueOf(conf.get(ReplicationStorageFactory.REPLICATION_QUEUE_TABLE_NAME, + ReplicationStorageFactory.REPLICATION_QUEUE_TABLE_NAME_DEFAULT.getNameAsString())); + } + + @Override + public boolean exists(TableName tablename) throws IOException { + if (tablename.equals(replicationQueueTableName)) { + return true; + } else { + return super.exists(tablename); + } + } + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestReplicationLogCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestReplicationLogCleaner.java index 7a227fb0603d..2623b9a78dad 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestReplicationLogCleaner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestReplicationLogCleaner.java @@ -25,6 +25,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -39,6 +40,7 @@ import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.master.ServerManager; @@ -52,9 +54,11 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueData; import org.apache.hadoop.hbase.replication.ReplicationQueueId; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; +import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; import org.apache.hadoop.hbase.testclassification.MasterTests; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.FSTableDescriptors; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.junit.After; import org.junit.Before; @@ -78,11 +82,12 @@ public class TestReplicationLogCleaner { private ReplicationLogCleaner cleaner; @Before - public void setUp() throws ReplicationException { + public void setUp() throws ReplicationException, IOException { services = mock(MasterServices.class); ReplicationPeerManager rpm = mock(ReplicationPeerManager.class); when(rpm.getReplicationLogCleanerBarrier()).thenReturn(new ReplicationLogCleanerBarrier()); when(services.getReplicationPeerManager()).thenReturn(rpm); + when(services.getTableDescriptors()).thenReturn(new FSTableDescriptorsTest(CONF)); when(rpm.listPeers(null)).thenReturn(new ArrayList<>()); ReplicationQueueStorage rqs = mock(ReplicationQueueStorage.class); when(rpm.getQueueStorage()).thenReturn(rqs); @@ -382,4 +387,24 @@ public void testDeadRegionServerShouldDeleteTwoPeers() throws ReplicationExcepti assertSame(file, iter.next()); assertFalse(iter.hasNext()); } + + private static class FSTableDescriptorsTest extends FSTableDescriptors { + private TableName replicationQueueTableName; + + public FSTableDescriptorsTest(final Configuration conf) throws IOException { + super(conf); + replicationQueueTableName = + TableName.valueOf(conf.get(ReplicationStorageFactory.REPLICATION_QUEUE_TABLE_NAME, + ReplicationStorageFactory.REPLICATION_QUEUE_TABLE_NAME_DEFAULT.getNameAsString())); + } + + @Override + public boolean exists(TableName tablename) throws IOException { + if (tablename.equals(replicationQueueTableName)) { + return true; + } else { + return super.exists(tablename); + } + } + } }