diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStorageFactory.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStorageFactory.java index dc4317feaa4b..4d5fcb45634e 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStorageFactory.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStorageFactory.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.replication; import java.io.IOException; +import java.lang.reflect.Constructor; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Coprocessor; import org.apache.hadoop.hbase.NamespaceDescriptor; @@ -27,8 +28,11 @@ import org.apache.hadoop.hbase.client.CoprocessorDescriptorBuilder; import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Used to create replication storage(peer, queue) classes. @@ -36,11 +40,15 @@ @InterfaceAudience.Private public final class ReplicationStorageFactory { + private static final Logger LOG = LoggerFactory.getLogger(ReplicationStorageFactory.class); + public static final String REPLICATION_QUEUE_TABLE_NAME = "hbase.replication.queue.table.name"; public static final TableName REPLICATION_QUEUE_TABLE_NAME_DEFAULT = TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "replication"); + public static final String REPLICATION_QUEUE_IMPL = "hbase.replication.queue.impl"; + public static TableDescriptor createReplicationQueueTableDescriptor(TableName tableName) throws IOException { return TableDescriptorBuilder.newBuilder(tableName) @@ -72,15 +80,26 @@ public static ReplicationPeerStorage getReplicationPeerStorage(ZKWatcher zk, Con */ public static ReplicationQueueStorage getReplicationQueueStorage(Connection conn, Configuration conf) { - return getReplicationQueueStorage(conn, TableName.valueOf(conf.get(REPLICATION_QUEUE_TABLE_NAME, - REPLICATION_QUEUE_TABLE_NAME_DEFAULT.getNameAsString()))); + return getReplicationQueueStorage(conn, conf, TableName.valueOf(conf + .get(REPLICATION_QUEUE_TABLE_NAME, REPLICATION_QUEUE_TABLE_NAME_DEFAULT.getNameAsString()))); } /** * Create a new {@link ReplicationQueueStorage}. */ public static ReplicationQueueStorage getReplicationQueueStorage(Connection conn, - TableName tableName) { - return new TableReplicationQueueStorage(conn, tableName); + Configuration conf, TableName tableName) { + Class clazz = conf.getClass(REPLICATION_QUEUE_IMPL, + TableReplicationQueueStorage.class, ReplicationQueueStorage.class); + try { + Constructor c = + clazz.getConstructor(Connection.class, TableName.class); + return c.newInstance(conn, tableName); + } catch (Exception e) { + LOG.debug( + "failed to create ReplicationQueueStorage with Connection, try creating with Configuration", + e); + return ReflectionUtils.newInstance(clazz, conf, tableName); + } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/OfflineTableReplicationQueueStorage.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/OfflineTableReplicationQueueStorage.java new file mode 100644 index 000000000000..04553e8e5808 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/OfflineTableReplicationQueueStorage.java @@ -0,0 +1,377 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.replication; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.Set; +import java.util.TreeMap; +import java.util.stream.Collectors; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.ClientSideRegionScanner; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.regionserver.HRegionFileSystem; +import org.apache.hadoop.hbase.replication.ReplicationException; +import org.apache.hadoop.hbase.replication.ReplicationGroupOffset; +import org.apache.hadoop.hbase.replication.ReplicationQueueData; +import org.apache.hadoop.hbase.replication.ReplicationQueueId; +import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; +import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; +import org.apache.hadoop.hbase.replication.TableReplicationQueueStorage; +import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.CommonFSUtils; +import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.yetus.audience.InterfaceAudience; + +import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList; +import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; + +@InterfaceAudience.Private +public class OfflineTableReplicationQueueStorage implements ReplicationQueueStorage { + + private final Map> offsets = + new HashMap<>(); + + private final Map> lastSequenceIds = new HashMap<>(); + + private final Map> hfileRefs = new HashMap<>(); + + private void loadRegionInfo(FileSystem fs, Path regionDir, + NavigableMap startKey2RegionInfo) throws IOException { + RegionInfo hri = HRegionFileSystem.loadRegionInfoFileContent(fs, regionDir); + // TODO: we consider that the there will not be too many regions for hbase:replication table, so + // here we just iterate over all the regions to find out the overlapped ones. Can be optimized + // later. + Iterator> iter = startKey2RegionInfo.entrySet().iterator(); + while (iter.hasNext()) { + Map.Entry entry = iter.next(); + if (hri.isOverlap(entry.getValue())) { + if (hri.getRegionId() > entry.getValue().getRegionId()) { + // we are newer, remove the old hri, we can not break here as if hri is a merged region, + // we need to remove all its parent regions. + iter.remove(); + } else { + // we are older, just return, skip the below add + return; + } + } + + } + startKey2RegionInfo.put(hri.getStartKey(), hri); + } + + private void loadOffsets(Result result) { + NavigableMap map = + result.getFamilyMap(TableReplicationQueueStorage.QUEUE_FAMILY); + if (map == null || map.isEmpty()) { + return; + } + Map offsetMap = new HashMap<>(); + map.forEach((k, v) -> { + String walGroup = Bytes.toString(k); + ReplicationGroupOffset offset = ReplicationGroupOffset.parse(Bytes.toString(v)); + offsetMap.put(walGroup, offset); + }); + ReplicationQueueId queueId = ReplicationQueueId.parse(Bytes.toString(result.getRow())); + offsets.put(queueId, offsetMap); + } + + private void loadLastSequenceIds(Result result) { + NavigableMap map = + result.getFamilyMap(TableReplicationQueueStorage.LAST_SEQUENCE_ID_FAMILY); + if (map == null || map.isEmpty()) { + return; + } + Map lastSeqIdMap = new HashMap<>(); + map.forEach((k, v) -> { + String encodedRegionName = Bytes.toString(k); + long lastSeqId = Bytes.toLong(v); + lastSeqIdMap.put(encodedRegionName, lastSeqId); + }); + String peerId = Bytes.toString(result.getRow()); + lastSequenceIds.put(peerId, lastSeqIdMap); + } + + private void loadHFileRefs(Result result) { + NavigableMap map = + result.getFamilyMap(TableReplicationQueueStorage.HFILE_REF_FAMILY); + if (map == null || map.isEmpty()) { + return; + } + Set refs = new HashSet<>(); + map.keySet().forEach(ref -> refs.add(Bytes.toString(ref))); + String peerId = Bytes.toString(result.getRow()); + hfileRefs.put(peerId, refs); + } + + private void loadReplicationQueueData(Configuration conf, TableName tableName) + throws IOException { + Path rootDir = CommonFSUtils.getRootDir(conf); + Path tableDir = CommonFSUtils.getTableDir(rootDir, tableName); + FileSystem fs = tableDir.getFileSystem(conf); + FileStatus[] regionDirs = + CommonFSUtils.listStatus(fs, tableDir, new FSUtils.RegionDirFilter(fs)); + if (regionDirs == null) { + return; + } + NavigableMap startKey2RegionInfo = new TreeMap<>(Bytes.BYTES_COMPARATOR); + for (FileStatus regionDir : regionDirs) { + loadRegionInfo(fs, regionDir.getPath(), startKey2RegionInfo); + } + TableDescriptor td = ReplicationStorageFactory.createReplicationQueueTableDescriptor(tableName); + for (RegionInfo hri : startKey2RegionInfo.values()) { + try (ClientSideRegionScanner scanner = + new ClientSideRegionScanner(conf, fs, rootDir, td, hri, new Scan(), null)) { + for (;;) { + Result result = scanner.next(); + if (result == null) { + break; + } + loadOffsets(result); + loadLastSequenceIds(result); + loadHFileRefs(result); + } + } + } + } + + public OfflineTableReplicationQueueStorage(Configuration conf, TableName tableName) + throws IOException { + loadReplicationQueueData(conf, tableName); + } + + @Override + public synchronized void setOffset(ReplicationQueueId queueId, String walGroup, + ReplicationGroupOffset offset, Map lastSeqIds) throws ReplicationException { + Map offsetMap = offsets.get(queueId); + if (offsetMap == null) { + offsetMap = new HashMap<>(); + offsets.put(queueId, offsetMap); + } + offsetMap.put(walGroup, offset); + Map lastSeqIdsMap = lastSequenceIds.get(queueId.getPeerId()); + if (lastSequenceIds == null) { + lastSeqIdsMap = new HashMap<>(); + lastSequenceIds.put(queueId.getPeerId(), lastSeqIdsMap); + } + for (Map.Entry entry : lastSeqIds.entrySet()) { + Long oldSeqId = lastSeqIdsMap.get(entry.getKey()); + if (oldSeqId == null || oldSeqId < entry.getValue()) { + lastSeqIdsMap.put(entry.getKey(), entry.getValue()); + } + } + } + + @Override + public synchronized Map getOffsets(ReplicationQueueId queueId) + throws ReplicationException { + Map offsetMap = offsets.get(queueId); + if (offsetMap == null) { + return Collections.emptyMap(); + } + return ImmutableMap.copyOf(offsetMap); + } + + @Override + public synchronized List listAllQueueIds(String peerId) + throws ReplicationException { + return offsets.keySet().stream().filter(rqi -> rqi.getPeerId().equals(peerId)) + .collect(Collectors.toList()); + } + + @Override + public synchronized List listAllQueueIds(ServerName serverName) + throws ReplicationException { + return offsets.keySet().stream().filter(rqi -> rqi.getServerName().equals(serverName)) + .collect(Collectors.toList()); + } + + @Override + public synchronized List listAllQueueIds(String peerId, ServerName serverName) + throws ReplicationException { + return offsets.keySet().stream() + .filter(rqi -> rqi.getPeerId().equals(peerId) && rqi.getServerName().equals(serverName)) + .collect(Collectors.toList()); + } + + @Override + public synchronized List listAllQueues() throws ReplicationException { + return offsets.entrySet().stream() + .map(e -> new ReplicationQueueData(e.getKey(), ImmutableMap.copyOf(e.getValue()))) + .collect(Collectors.toList()); + } + + @Override + public synchronized List listAllReplicators() throws ReplicationException { + return offsets.keySet().stream().map(ReplicationQueueId::getServerName).distinct() + .collect(Collectors.toList()); + } + + @Override + public synchronized Map claimQueue(ReplicationQueueId queueId, + ServerName targetServerName) throws ReplicationException { + Map offsetMap = offsets.remove(queueId); + if (offsetMap == null) { + return Collections.emptyMap(); + } + offsets.put(queueId.claim(targetServerName), offsetMap); + return ImmutableMap.copyOf(offsetMap); + } + + @Override + public synchronized void removeQueue(ReplicationQueueId queueId) throws ReplicationException { + offsets.remove(queueId); + } + + @Override + public synchronized void removeAllQueues(String peerId) throws ReplicationException { + Iterator iter = offsets.keySet().iterator(); + while (iter.hasNext()) { + if (iter.next().getPeerId().equals(peerId)) { + iter.remove(); + } + } + } + + @Override + public synchronized long getLastSequenceId(String encodedRegionName, String peerId) + throws ReplicationException { + Map lastSeqIdMap = lastSequenceIds.get(peerId); + if (lastSeqIdMap == null) { + return HConstants.NO_SEQNUM; + } + Long lastSeqId = lastSeqIdMap.get(encodedRegionName); + return lastSeqId != null ? lastSeqId.longValue() : HConstants.NO_SEQNUM; + } + + @Override + public synchronized void setLastSequenceIds(String peerId, Map lastSeqIds) + throws ReplicationException { + Map lastSeqIdMap = lastSequenceIds.get(peerId); + if (lastSeqIdMap == null) { + lastSeqIdMap = new HashMap<>(); + lastSequenceIds.put(peerId, lastSeqIdMap); + } + lastSeqIdMap.putAll(lastSeqIds); + } + + @Override + public synchronized void removeLastSequenceIds(String peerId) throws ReplicationException { + lastSequenceIds.remove(peerId); + } + + @Override + public synchronized void removeLastSequenceIds(String peerId, List encodedRegionNames) + throws ReplicationException { + Map lastSeqIdMap = lastSequenceIds.get(peerId); + if (lastSeqIdMap == null) { + return; + } + for (String encodedRegionName : encodedRegionNames) { + lastSeqIdMap.remove(encodedRegionName); + } + } + + @Override + public synchronized void removePeerFromHFileRefs(String peerId) throws ReplicationException { + hfileRefs.remove(peerId); + } + + @Override + public synchronized void addHFileRefs(String peerId, List> pairs) + throws ReplicationException { + Set refs = hfileRefs.get(peerId); + if (refs == null) { + refs = new HashSet<>(); + hfileRefs.put(peerId, refs); + } + for (Pair pair : pairs) { + refs.add(pair.getSecond().getName()); + } + } + + @Override + public synchronized void removeHFileRefs(String peerId, List files) + throws ReplicationException { + Set refs = hfileRefs.get(peerId); + if (refs == null) { + return; + } + refs.removeAll(files); + } + + @Override + public synchronized List getAllPeersFromHFileRefsQueue() throws ReplicationException { + return ImmutableList.copyOf(hfileRefs.keySet()); + } + + @Override + public synchronized List getReplicableHFiles(String peerId) throws ReplicationException { + Set refs = hfileRefs.get(peerId); + if (refs == null) { + return Collections.emptyList(); + } + return ImmutableList.copyOf(refs); + } + + @Override + public synchronized Set getAllHFileRefs() throws ReplicationException { + return hfileRefs.values().stream().flatMap(Set::stream).collect(Collectors.toSet()); + } + + @Override + public boolean hasData() throws ReplicationException { + return true; + } + + @Override + public void batchUpdateQueues(ServerName serverName, List datas) + throws ReplicationException { + throw new UnsupportedOperationException(); + } + + @Override + public void batchUpdateLastSequenceIds( + List lastPushedSeqIds) + throws ReplicationException { + throw new UnsupportedOperationException(); + } + + @Override + public void batchUpdateHFileRefs(String peerId, List hfileRefs) + throws ReplicationException { + throw new UnsupportedOperationException(); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java index d8c1b5c64c52..bb170be64aff 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java @@ -641,7 +641,7 @@ public void initialize() throws IOException { }; } return Pair.newPair(ReplicationStorageFactory.getReplicationQueueStorage( - services.getConnection(), replicationQueueTableName), initializer); + services.getConnection(), conf, replicationQueueTableName), initializer); } public static ReplicationPeerManager create(MasterServices services, String clusterId) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java index b63ad473719c..a4e16dd42ea6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java @@ -17,13 +17,16 @@ */ package org.apache.hadoop.hbase.replication.regionserver; +import java.io.FileNotFoundException; import java.io.IOException; import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Set; -import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Abortable; @@ -35,11 +38,16 @@ import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.client.AsyncClusterConnection; import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.master.replication.OfflineTableReplicationQueueStorage; import org.apache.hadoop.hbase.replication.ReplicationException; +import org.apache.hadoop.hbase.replication.ReplicationGroupOffset; +import org.apache.hadoop.hbase.replication.ReplicationQueueId; +import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; +import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.hadoop.hbase.wal.WALFactory; -import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; @@ -69,29 +77,77 @@ public static void main(String[] args) throws Exception { System.exit(ret); } - private Set getLiveRegionServers(ZKWatcher zkw) throws KeeperException { - List rsZNodes = ZKUtil.listChildrenNoWatch(zkw, zkw.getZNodePaths().rsZNode); - return rsZNodes == null - ? Collections.emptySet() - : rsZNodes.stream().map(ServerName::parseServerName).collect(Collectors.toSet()); + // Find region servers under wal directory + // Here we only care about the region servers which may still be alive, as we need to add + // replications for them if missing. The dead region servers which have already been processed + // fully do not need to add their replication queues again, as the operation has already been done + // in SCP. + private Set listRegionServers(FileSystem walFs, Path walDir) throws IOException { + FileStatus[] statuses; + try { + statuses = walFs.listStatus(walDir); + } catch (FileNotFoundException e) { + System.out.println("WAL directory " + walDir + " does not exists, ignore"); + return Collections.emptySet(); + } + Set regionServers = new HashSet<>(); + for (FileStatus status : statuses) { + // All wal files under the walDir is within its region server's directory + if (!status.isDirectory()) { + continue; + } + ServerName sn = AbstractFSWALProvider.getServerNameFromWALDirectoryName(status.getPath()); + if (sn != null) { + regionServers.add(sn); + } + } + return regionServers; + } + + private void addMissingReplicationQueues(ReplicationQueueStorage storage, ServerName regionServer, + Set peerIds) throws ReplicationException { + Set existingQueuePeerIds = new HashSet<>(); + List queueIds = storage.listAllQueueIds(regionServer); + for (Iterator iter = queueIds.iterator(); iter.hasNext();) { + ReplicationQueueId queueId = iter.next(); + if (!queueId.isRecovered()) { + existingQueuePeerIds.add(queueId.getPeerId()); + } + } + + for (String peerId : peerIds) { + if (!existingQueuePeerIds.contains(peerId)) { + ReplicationQueueId queueId = new ReplicationQueueId(regionServer, peerId); + System.out.println("Add replication queue " + queueId + " for claiming"); + storage.setOffset(queueId, regionServer.toString(), ReplicationGroupOffset.BEGIN, + Collections.emptyMap()); + } + } + } + + private void addMissingReplicationQueues(ReplicationQueueStorage storage, + Set regionServers, Set peerIds) throws ReplicationException { + for (ServerName regionServer : regionServers) { + addMissingReplicationQueues(storage, regionServer, peerIds); + } } // When using this tool, usually the source cluster is unhealthy, so we should try to claim the // replication queues for the dead region servers first and then replicate the data out. - private void claimReplicationQueues(ZKWatcher zkw, ReplicationSourceManager mgr) + private void claimReplicationQueues(ReplicationSourceManager mgr, Set regionServers) throws ReplicationException, KeeperException { - // TODO: reimplement this tool - // List replicators = mgr.getQueueStorage().getListOfReplicators(); - // Set liveRegionServers = getLiveRegionServers(zkw); - // for (ServerName sn : replicators) { - // if (!liveRegionServers.contains(sn)) { - // List replicationQueues = mgr.getQueueStorage().getAllQueues(sn); - // System.out.println(sn + " is dead, claim its replication queues: " + replicationQueues); - // for (String queue : replicationQueues) { - // mgr.claimQueue(sn, queue); - // } - // } - // } + // union the region servers from both places, i.e, from the wal directory, and the records in + // replication queue storage. + Set replicators = new HashSet<>(regionServers); + ReplicationQueueStorage queueStorage = mgr.getQueueStorage(); + replicators.addAll(queueStorage.listAllReplicators()); + for (ServerName sn : replicators) { + List replicationQueues = queueStorage.listAllQueueIds(sn); + System.out.println(sn + " is dead, claim its replication queues: " + replicationQueues); + for (ReplicationQueueId queueId : replicationQueues) { + mgr.claimQueue(queueId); + } + } } @Override @@ -116,14 +172,20 @@ public boolean isAborted() { System.out.println("Start Replication Server start"); Replication replication = new Replication(); - replication.initialize(new DummyServer(zkw), fs, logDir, oldLogDir, + // use offline table replication queue storage + getConf().setClass(ReplicationStorageFactory.REPLICATION_QUEUE_IMPL, + OfflineTableReplicationQueueStorage.class, ReplicationQueueStorage.class); + replication.initialize(new DummyServer(getConf(), zkw), fs, logDir, oldLogDir, new WALFactory(conf, ServerName .valueOf(getClass().getSimpleName() + ",16010," + EnvironmentEdgeManager.currentTime()), null, false)); ReplicationSourceManager manager = replication.getReplicationManager(); manager.init(); - claimReplicationQueues(zkw, manager); + Set regionServers = listRegionServers(fs, logDir); + addMissingReplicationQueues(manager.getQueueStorage(), regionServers, + manager.getReplicationPeers().getAllPeerIds()); + claimReplicationQueues(manager, regionServers); while (manager.activeFailoverTaskCount() > 0) { Thread.sleep(SLEEP_TIME); } @@ -138,23 +200,21 @@ public boolean isAborted() { return 0; } - class DummyServer implements Server { - String hostname; - ZKWatcher zkw; + private static final class DummyServer implements Server { + private final Configuration conf; + private final String hostname; + private final ZKWatcher zkw; - DummyServer(ZKWatcher zkw) { + DummyServer(Configuration conf, ZKWatcher zkw) { // a unique name in case the first run fails hostname = EnvironmentEdgeManager.currentTime() + ".SyncUpTool.replication.org"; + this.conf = conf; this.zkw = zkw; } - DummyServer(String hostname) { - this.hostname = hostname; - } - @Override public Configuration getConfiguration() { - return getConf(); + return conf; } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java index d7ba6c227c6d..5d474bc21640 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java @@ -127,8 +127,8 @@ public void beforeTest() throws Exception { TableDescriptor td = ReplicationStorageFactory.createReplicationQueueTableDescriptor(tableName); TEST_UTIL.getAdmin().createTable(td); TEST_UTIL.waitTableAvailable(tableName); - queueStorage = - ReplicationStorageFactory.getReplicationQueueStorage(TEST_UTIL.getConnection(), tableName); + queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(TEST_UTIL.getConnection(), + conf, tableName); masterServices = mock(MasterServices.class); when(masterServices.getConnection()).thenReturn(TEST_UTIL.getConnection()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java index 7a89af15902e..bf65d4db82e2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java @@ -32,14 +32,11 @@ import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.util.Bytes; import org.junit.ClassRule; -import org.junit.Ignore; import org.junit.Test; import org.junit.experimental.categories.Category; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -// revisit later when we implement the new ReplicationSyncUpTool -@Ignore @Category({ ReplicationTests.class, LargeTests.class }) public class TestReplicationSyncUpTool extends TestReplicationSyncUpToolBase { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationChecker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationChecker.java index 1544265435c7..8731adbe4c2b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationChecker.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationChecker.java @@ -99,8 +99,8 @@ public static void setUpBeforeClass() throws Exception { TableName repTable = TableName.valueOf("test_serial_rep"); UTIL.getAdmin() .createTable(ReplicationStorageFactory.createReplicationQueueTableDescriptor(repTable)); - QUEUE_STORAGE = - ReplicationStorageFactory.getReplicationQueueStorage(UTIL.getConnection(), repTable); + QUEUE_STORAGE = ReplicationStorageFactory.getReplicationQueueStorage(UTIL.getConnection(), + UTIL.getConfiguration(), repTable); } @AfterClass