Skip to content

Commit

Permalink
HBASE-27405 Fix the replication hfile/log cleaner report that the rep…
Browse files Browse the repository at this point in the history
…lication table does not exist
  • Loading branch information
2005hithlj committed Oct 8, 2022
1 parent 59dd1be commit 91ba77f
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.replication.master;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand All @@ -28,6 +29,7 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.cleaner.BaseLogCleanerDelegate;
Expand All @@ -39,6 +41,7 @@
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.ReplicationQueueData;
import org.apache.hadoop.hbase.replication.ReplicationQueueId;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
Expand Down Expand Up @@ -66,6 +69,7 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate {
// ReplicationQueueId.
private Map<ServerName, Map<String, Map<String, ReplicationGroupOffset>>> replicationOffsets;
private ReplicationPeerManager rpm;
private MasterServices masterServices;
private Supplier<Set<ServerName>> getNotFullyDeadServers;

private boolean canFilter;
Expand All @@ -76,6 +80,20 @@ public void preClean() {
if (this.getConf() == null) {
return;
}
// because the replication table is actually created when the peer is added (lazy creation),
// here we need to ensure that the replication table already exists.
try {
TableName replicationQueueTableName =
TableName.valueOf(getConf().get(ReplicationStorageFactory.REPLICATION_QUEUE_TABLE_NAME,
ReplicationStorageFactory.REPLICATION_QUEUE_TABLE_NAME_DEFAULT.getNameAsString()));
if (!masterServices.getTableDescriptors().exists(replicationQueueTableName)) {
return;
}
} catch (IOException e) {
LOG.error("Error occurred when pulling replication table", e);
return;
}

canFilter = rpm.getReplicationLogCleanerBarrier().start();
if (canFilter) {
notFullyDeadServers = getNotFullyDeadServers.get();
Expand Down Expand Up @@ -235,9 +253,9 @@ public void init(Map<String, Object> params) {
if (MapUtils.isNotEmpty(params)) {
Object master = params.get(HMaster.MASTER);
if (master != null && master instanceof MasterServices) {
MasterServices m = (MasterServices) master;
rpm = m.getReplicationPeerManager();
getNotFullyDeadServers = () -> getNotFullyDeadServers(m);
masterServices = (MasterServices) master;
rpm = masterServices.getReplicationPeerManager();
getNotFullyDeadServers = () -> getNotFullyDeadServers(masterServices);
return;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.MockServer;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.junit.AfterClass;
Expand Down Expand Up @@ -124,6 +125,8 @@ public void beforeTest() throws Exception {
fs.mkdirs(OLD_WALS_DIR);

TableName tableName = tableNameRule.getTableName();
conf.set(ReplicationStorageFactory.REPLICATION_QUEUE_TABLE_NAME, tableName.getNameAsString());

TableDescriptor td = ReplicationStorageFactory.createReplicationQueueTableDescriptor(tableName);
TEST_UTIL.getAdmin().createTable(td);
TEST_UTIL.waitTableAvailable(tableName);
Expand All @@ -134,6 +137,7 @@ public void beforeTest() throws Exception {
when(masterServices.getConnection()).thenReturn(TEST_UTIL.getConnection());
ReplicationPeerManager rpm = mock(ReplicationPeerManager.class);
when(masterServices.getReplicationPeerManager()).thenReturn(rpm);
when(masterServices.getTableDescriptors()).thenReturn(new FSTableDescriptorsTest(conf));
when(rpm.getQueueStorage()).thenReturn(queueStorage);
when(rpm.getReplicationLogCleanerBarrier()).thenReturn(new ReplicationLogCleanerBarrier());
when(rpm.listPeers(null)).thenReturn(new ArrayList<>());
Expand Down Expand Up @@ -314,4 +318,24 @@ public ZKWatcher getZooKeeper() {
return null;
}
}

private static class FSTableDescriptorsTest extends FSTableDescriptors {
private TableName replicationQueueTableName;

public FSTableDescriptorsTest(final Configuration conf) throws IOException {
super(conf);
replicationQueueTableName =
TableName.valueOf(conf.get(ReplicationStorageFactory.REPLICATION_QUEUE_TABLE_NAME,
ReplicationStorageFactory.REPLICATION_QUEUE_TABLE_NAME_DEFAULT.getNameAsString()));
}

@Override
public boolean exists(TableName tablename) throws IOException {
if (tablename.equals(replicationQueueTableName)) {
return true;
} else {
return super.exists(tablename);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand All @@ -39,6 +40,7 @@
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.ServerManager;
Expand All @@ -52,9 +54,11 @@
import org.apache.hadoop.hbase.replication.ReplicationQueueData;
import org.apache.hadoop.hbase.replication.ReplicationQueueId;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.junit.After;
import org.junit.Before;
Expand All @@ -78,11 +82,12 @@ public class TestReplicationLogCleaner {
private ReplicationLogCleaner cleaner;

@Before
public void setUp() throws ReplicationException {
public void setUp() throws ReplicationException, IOException {
services = mock(MasterServices.class);
ReplicationPeerManager rpm = mock(ReplicationPeerManager.class);
when(rpm.getReplicationLogCleanerBarrier()).thenReturn(new ReplicationLogCleanerBarrier());
when(services.getReplicationPeerManager()).thenReturn(rpm);
when(services.getTableDescriptors()).thenReturn(new FSTableDescriptorsTest(CONF));
when(rpm.listPeers(null)).thenReturn(new ArrayList<>());
ReplicationQueueStorage rqs = mock(ReplicationQueueStorage.class);
when(rpm.getQueueStorage()).thenReturn(rqs);
Expand Down Expand Up @@ -382,4 +387,24 @@ public void testDeadRegionServerShouldDeleteTwoPeers() throws ReplicationExcepti
assertSame(file, iter.next());
assertFalse(iter.hasNext());
}

private static class FSTableDescriptorsTest extends FSTableDescriptors {
private TableName replicationQueueTableName;

public FSTableDescriptorsTest(final Configuration conf) throws IOException {
super(conf);
replicationQueueTableName =
TableName.valueOf(conf.get(ReplicationStorageFactory.REPLICATION_QUEUE_TABLE_NAME,
ReplicationStorageFactory.REPLICATION_QUEUE_TABLE_NAME_DEFAULT.getNameAsString()));
}

@Override
public boolean exists(TableName tablename) throws IOException {
if (tablename.equals(replicationQueueTableName)) {
return true;
} else {
return super.exists(tablename);
}
}
}
}

0 comments on commit 91ba77f

Please sign in to comment.