diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index bc1754904b89..5f8f2dba7e7e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -28,7 +28,6 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
@@ -36,6 +35,7 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -45,21 +45,17 @@
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableDescriptors;
-import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.RSRpcServices;
 import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
 import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
 import org.apache.hadoop.hbase.replication.ClusterMarkingEntryFilter;
 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
-import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationPeer;
 import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
 import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
 import org.apache.hadoop.hbase.replication.WALEntryFilter;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
@@ -224,38 +220,6 @@ public void enqueueLog(Path log) {
     }
   }
 
-  @Override
-  public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
-      throws ReplicationException {
-    String peerId = replicationPeer.getId();
-    Set<String> namespaces = replicationPeer.getNamespaces();
-    Map<TableName, List<String>> tableCFMap = replicationPeer.getTableCFs();
-    if (tableCFMap != null) { // All peers with TableCFs
-      List<String> tableCfs = tableCFMap.get(tableName);
-      if (tableCFMap.containsKey(tableName)
-          && (tableCfs == null || tableCfs.contains(Bytes.toString(family)))) {
-        this.queueStorage.addHFileRefs(peerId, pairs);
-        metrics.incrSizeOfHFileRefsQueue(pairs.size());
-      } else {
-        LOG.debug("HFiles will not be replicated belonging to the table {} family {} to peer id {}",
-          tableName, Bytes.toString(family), peerId);
-      }
-    } else if (namespaces != null) { // Only for set NAMESPACES peers
-      if (namespaces.contains(tableName.getNamespaceAsString())) {
-        this.queueStorage.addHFileRefs(peerId, pairs);
-        metrics.incrSizeOfHFileRefsQueue(pairs.size());
-      } else {
-        LOG.debug("HFiles will not be replicated belonging to the table {} family {} to peer id {}",
-          tableName, Bytes.toString(family), peerId);
-      }
-    } else {
-      // user has explicitly not defined any table cfs for replication, means replicate all the
-      // data
-      this.queueStorage.addHFileRefs(peerId, pairs);
-      metrics.incrSizeOfHFileRefsQueue(pairs.size());
-    }
-  }
-
   private ReplicationEndpoint createReplicationEndpoint()
       throws InstantiationException, IllegalAccessException, ClassNotFoundException, IOException {
     RegionServerCoprocessorHost rsServerHost = null;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
index 0bd90cf1ee89..33a413f73a0a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
@@ -29,12 +29,9 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
-import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationPeer;
 import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
-import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.yetus.audience.InterfaceAudience;
 
@@ -62,17 +59,6 @@ void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
    */
   void enqueueLog(Path log);
 
-  /**
-   * Add hfile names to the queue to be replicated.
-   * @param tableName Name of the table these files belongs to
-   * @param family Name of the family these files belong to
-   * @param pairs list of pairs of { HFile location in staging dir, HFile path in region dir which
-   *          will be added in the queue for replication}
-   * @throws ReplicationException If failed to add hfile references
-   */
-  void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
-      throws ReplicationException;
-
   /**
    * Start the replication
    */
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 1a012bd5db42..a559b3d3dec6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -61,6 +61,7 @@
 import org.apache.hadoop.hbase.replication.ReplicationTracker;
 import org.apache.hadoop.hbase.replication.ReplicationUtils;
 import org.apache.hadoop.hbase.replication.SyncReplicationState;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider;
@@ -170,6 +171,8 @@ public class ReplicationSourceManager implements ReplicationListener {
   // replication peer.
   private final int maxRetriesMultiplier;
 
+  private final Map<String, MetricsSource> sourceMetrics = new HashMap<>();
+
   /**
    * Creates a replication manager and sets the watch on all the other registered region servers
    * @param queueStorage the interface for manipulating replication queues
@@ -348,6 +351,7 @@ private ReplicationSourceInterface createSource(String queueId, ReplicationPeer
     ReplicationSourceInterface src = ReplicationSourceFactory.create(conf, queueId);
 
     MetricsSource metrics = new MetricsSource(queueId);
+    sourceMetrics.put(queueId, metrics);
     // init replication source
     src.init(conf, fs, this, queueStorage, replicationPeer, server, queueId, clusterId,
       walFileLengthProvider, metrics);
@@ -1120,7 +1124,49 @@ public String getStats() {
   public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
       throws IOException {
     for (ReplicationSourceInterface source : this.sources.values()) {
-      throwIOExceptionWhenFail(() -> source.addHFileRefs(tableName, family, pairs));
+      throwIOExceptionWhenFail(() -> addHFileRefs(source.getPeerId(), tableName, family, pairs));
+    }
+  }
+
+  /**
+   * Add hfile names to the queue to be replicated.
+   * @param peerId the replication peer id
+   * @param tableName Name of the table these files belong to
+   * @param family Name of the family these files belong to
+   * @param pairs list of pairs of { HFile location in staging dir, HFile path in region dir which
+   *          will be added in the queue for replication}
+   * @throws ReplicationException If failed to add hfile references
+   */
+  private void addHFileRefs(String peerId, TableName tableName, byte[] family,
+      List<Pair<Path, Path>> pairs) throws ReplicationException {
+    // Only normal replication sources update here, since their peerId equals their queueId.
+    MetricsSource metrics = sourceMetrics.get(peerId);
+    ReplicationPeer replicationPeer = replicationPeers.getPeer(peerId);
+    Set<String> namespaces = replicationPeer.getNamespaces();
+    Map<TableName, List<String>> tableCFMap = replicationPeer.getTableCFs();
+    if (tableCFMap != null) { // All peers with TableCFs
+      List<String> tableCfs = tableCFMap.get(tableName);
+      if (tableCFMap.containsKey(tableName)
+          && (tableCfs == null || tableCfs.contains(Bytes.toString(family)))) {
+        this.queueStorage.addHFileRefs(peerId, pairs);
+        metrics.incrSizeOfHFileRefsQueue(pairs.size());
+      } else {
+        LOG.debug("HFiles will not be replicated belonging to the table {} family {} to peer id {}",
+          tableName, Bytes.toString(family), peerId);
+      }
+    } else if (namespaces != null) { // Only for set NAMESPACES peers
+      if (namespaces.contains(tableName.getNamespaceAsString())) {
+        this.queueStorage.addHFileRefs(peerId, pairs);
+        metrics.incrSizeOfHFileRefsQueue(pairs.size());
+      } else {
+        LOG.debug("HFiles will not be replicated belonging to the table {} family {} to peer id {}",
+          tableName, Bytes.toString(family), peerId);
+      }
+    } else {
+      // The user has not explicitly defined any table CFs for replication, which means
+      // replicate all the data
+      this.queueStorage.addHFileRefs(peerId, pairs);
+      metrics.incrSizeOfHFileRefsQueue(pairs.size());
     }
   }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
index a361c4470604..781a1da16242 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
@@ -21,17 +21,16 @@
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
 import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider;
-import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 
 /**
@@ -113,12 +112,6 @@ public String getStats() {
     return "";
   }
 
-  @Override
-  public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> files)
-      throws ReplicationException {
-    return;
-  }
-
   @Override
   public boolean isPeerEnabled() {
     return true;
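
Reviewer note, appended for context and not part of the patch: the refactor moves the peer-scoping decision verbatim from ReplicationSource into a private ReplicationSourceManager#addHFileRefs. The sketch below is a minimal, self-contained model of that three-way decision (table-CFs map first, then namespaces, then replicate-all). It uses plain String/Map stand-ins for TableName and ReplicationPeer; the class and method names here are illustrative, not HBase API.

// HFileRefScopingSketch.java -- standalone model of the decision order in the
// private ReplicationSourceManager#addHFileRefs added above. Illustrative only.
import java.util.List;
import java.util.Map;
import java.util.Set;

public class HFileRefScopingSketch {

  /**
   * Decision order mirroring the patch:
   * 1. Peer declares table-CFs: replicate only listed tables; a null CF list
   *    for a table means "all families of that table".
   * 2. Otherwise, peer declares namespaces: replicate any table in them.
   * 3. Otherwise: no scoping configured, replicate everything.
   */
  static boolean shouldReplicate(Map<String, List<String>> tableCFs, Set<String> namespaces,
      String namespace, String table, String family) {
    if (tableCFs != null) { // All peers with TableCFs
      List<String> cfs = tableCFs.get(table);
      return tableCFs.containsKey(table) && (cfs == null || cfs.contains(family));
    }
    if (namespaces != null) { // Only for set NAMESPACES peers
      return namespaces.contains(namespace);
    }
    return true; // no scoping configured: replicate all data
  }

  public static void main(String[] args) {
    Map<String, List<String>> tableCFs = Map.of("t1", List.of("cf1"));
    System.out.println(shouldReplicate(tableCFs, null, "default", "t1", "cf1")); // true
    System.out.println(shouldReplicate(tableCFs, null, "default", "t1", "cf2")); // false
    System.out.println(shouldReplicate(null, Set.of("ns1"), "ns1", "t9", "cf1")); // true
    System.out.println(shouldReplicate(null, null, "default", "t9", "cf1"));      // true
  }
}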