HBASE-24682 Refactor ReplicationSource#addHFileRefs method: move it to ReplicationSourceManager (#2020)

Signed-off-by: Wellington Chevreuil <[email protected]>
infraio committed Jul 21, 2020
1 parent 6cf013d commit bb9cae1
Showing 4 changed files with 49 additions and 60 deletions.
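In brief: addHFileRefs and its javadoc leave ReplicationSource and ReplicationSourceInterface, and the same filtering logic reappears as a private helper on ReplicationSourceManager. To make that possible, the manager now records a MetricsSource per queue in a new sourceMetrics map at source-creation time and looks it up by peer id when queuing HFile references. A minimal, hypothetical sketch of that bookkeeping (stand-in names, not the actual HBase types):

    import java.util.HashMap;
    import java.util.Map;

    // Stand-in for org.apache.hadoop.hbase.replication.regionserver.MetricsSource.
    class MetricsSourceStub {
      private long hfileRefsQueueSize;
      void incrSizeOfHFileRefsQueue(long size) { hfileRefsQueueSize += size; }
    }

    // Hypothetical sketch of the manager-side bookkeeping this commit introduces:
    // metrics are registered when a source is created and looked up by peer id
    // later, relying on peerId == queueId for normal replication sources.
    class SourceMetricsRegistry {
      private final Map<String, MetricsSourceStub> sourceMetrics = new HashMap<>();

      MetricsSourceStub register(String queueId) {
        MetricsSourceStub metrics = new MetricsSourceStub();
        sourceMetrics.put(queueId, metrics); // mirrors the put added in createSource
        return metrics;
      }

      MetricsSourceStub forPeer(String peerId) {
        return sourceMetrics.get(peerId); // mirrors the get added in addHFileRefs
      }
    }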
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -28,14 +28,14 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -45,21 +45,17 @@
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
import org.apache.hadoop.hbase.replication.ClusterMarkingEntryFilter;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL.Entry;
@@ -224,38 +220,6 @@ public void enqueueLog(Path log) {
}
}

@Override
public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
throws ReplicationException {
String peerId = replicationPeer.getId();
Set<String> namespaces = replicationPeer.getNamespaces();
Map<TableName, List<String>> tableCFMap = replicationPeer.getTableCFs();
if (tableCFMap != null) { // All peers with TableCFs
List<String> tableCfs = tableCFMap.get(tableName);
if (tableCFMap.containsKey(tableName)
&& (tableCfs == null || tableCfs.contains(Bytes.toString(family)))) {
this.queueStorage.addHFileRefs(peerId, pairs);
metrics.incrSizeOfHFileRefsQueue(pairs.size());
} else {
LOG.debug("HFiles will not be replicated belonging to the table {} family {} to peer id {}",
tableName, Bytes.toString(family), peerId);
}
} else if (namespaces != null) { // Only for set NAMESPACES peers
if (namespaces.contains(tableName.getNamespaceAsString())) {
this.queueStorage.addHFileRefs(peerId, pairs);
metrics.incrSizeOfHFileRefsQueue(pairs.size());
} else {
LOG.debug("HFiles will not be replicated belonging to the table {} family {} to peer id {}",
tableName, Bytes.toString(family), peerId);
}
} else {
// user has explicitly not defined any table cfs for replication, means replicate all the
// data
this.queueStorage.addHFileRefs(peerId, pairs);
metrics.incrSizeOfHFileRefsQueue(pairs.size());
}
}

private ReplicationEndpoint createReplicationEndpoint()
throws InstantiationException, IllegalAccessException, ClassNotFoundException, IOException {
RegionServerCoprocessorHost rsServerHost = null;
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
@@ -29,12 +29,9 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.yetus.audience.InterfaceAudience;

@@ -62,17 +59,6 @@ void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
*/
void enqueueLog(Path log);

/**
* Add hfile names to the queue to be replicated.
* @param tableName Name of the table these files belongs to
* @param family Name of the family these files belong to
* @param pairs list of pairs of { HFile location in staging dir, HFile path in region dir which
* will be added in the queue for replication}
* @throws ReplicationException If failed to add hfile references
*/
void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
throws ReplicationException;

/**
* Start the replication
*/
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -61,6 +61,7 @@
import org.apache.hadoop.hbase.replication.ReplicationTracker;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider;
@@ -170,6 +171,8 @@ public class ReplicationSourceManager implements ReplicationListener {
// replication peer.
private final int maxRetriesMultiplier;

private final Map<String, MetricsSource> sourceMetrics = new HashMap<>();

/**
* Creates a replication manager and sets the watch on all the other registered region servers
* @param queueStorage the interface for manipulating replication queues
@@ -348,6 +351,7 @@ private ReplicationSourceInterface createSource(String queueId, ReplicationPeer
ReplicationSourceInterface src = ReplicationSourceFactory.create(conf, queueId);

MetricsSource metrics = new MetricsSource(queueId);
sourceMetrics.put(queueId, metrics);
// init replication source
src.init(conf, fs, this, queueStorage, replicationPeer, server, queueId, clusterId,
walFileLengthProvider, metrics);
@@ -1120,7 +1124,49 @@ public String getStats() {
public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
throws IOException {
for (ReplicationSourceInterface source : this.sources.values()) {
throwIOExceptionWhenFail(() -> source.addHFileRefs(tableName, family, pairs));
throwIOExceptionWhenFail(() -> addHFileRefs(source.getPeerId(), tableName, family, pairs));
}
}

/**
* Add hfile names to the queue to be replicated.
* @param peerId the replication peer id
* @param tableName Name of the table these files belong to
* @param family Name of the family these files belong to
* @param pairs list of pairs of { HFile location in staging dir, HFile path in region dir which
* will be added in the queue for replication}
* @throws ReplicationException If failed to add hfile references
*/
private void addHFileRefs(String peerId, TableName tableName, byte[] family,
List<Pair<Path, Path>> pairs) throws ReplicationException {
// Only normal replication sources update the metrics here; for those sources the
// peerId equals the queueId.
MetricsSource metrics = sourceMetrics.get(peerId);
ReplicationPeer replicationPeer = replicationPeers.getPeer(peerId);
Set<String> namespaces = replicationPeer.getNamespaces();
Map<TableName, List<String>> tableCFMap = replicationPeer.getTableCFs();
if (tableCFMap != null) { // All peers with TableCFs
List<String> tableCfs = tableCFMap.get(tableName);
if (tableCFMap.containsKey(tableName)
&& (tableCfs == null || tableCfs.contains(Bytes.toString(family)))) {
this.queueStorage.addHFileRefs(peerId, pairs);
metrics.incrSizeOfHFileRefsQueue(pairs.size());
} else {
LOG.debug("HFiles will not be replicated belonging to the table {} family {} to peer id {}",
tableName, Bytes.toString(family), peerId);
}
} else if (namespaces != null) { // Only for set NAMESPACES peers
if (namespaces.contains(tableName.getNamespaceAsString())) {
this.queueStorage.addHFileRefs(peerId, pairs);
metrics.incrSizeOfHFileRefsQueue(pairs.size());
} else {
LOG.debug("HFiles will not be replicated belonging to the table {} family {} to peer id {}",
tableName, Bytes.toString(family), peerId);
}
} else {
// The user has not defined any table CFs or namespaces for this peer, which means
// replicate all the data
this.queueStorage.addHFileRefs(peerId, pairs);
metrics.incrSizeOfHFileRefsQueue(pairs.size());
}
}

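The scope decision the new helper applies is worth spelling out: an explicit table-CFs map wins (a table absent from the map is skipped; a table mapped to a null list replicates every family), otherwise a namespace set is consulted, and a peer with neither replicates everything. A standalone sketch of just that decision, with plain Strings standing in for TableName and the byte[] family (hypothetical class, not HBase code):

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    // Standalone sketch of the peer-scope decision addHFileRefs applies above.
    public class HFileRefScopeSketch {

      static boolean shouldReplicate(Map<String, List<String>> tableCFs,
          Set<String> namespaces, String namespace, String table, String family) {
        if (tableCFs != null) {
          // Peer scoped to explicit tables; a null CF list means "all families".
          List<String> cfs = tableCFs.get(table);
          return tableCFs.containsKey(table) && (cfs == null || cfs.contains(family));
        } else if (namespaces != null) {
          // Peer scoped to namespaces only.
          return namespaces.contains(namespace);
        }
        // No table CFs and no namespaces configured: replicate all data.
        return true;
      }

      public static void main(String[] args) {
        Map<String, List<String>> tableCFs = new HashMap<>();
        tableCFs.put("t1", null);                  // every family of t1
        tableCFs.put("t2", Arrays.asList("cf1"));  // only cf1 of t2

        System.out.println(shouldReplicate(tableCFs, null, "default", "t1", "cfX")); // true
        System.out.println(shouldReplicate(tableCFs, null, "default", "t2", "cf2")); // false

        Set<String> namespaces = new HashSet<>(Arrays.asList("ns1"));
        System.out.println(shouldReplicate(null, namespaces, "ns1", "t3", "cf"));    // true
        System.out.println(shouldReplicate(null, null, "ns2", "t4", "cf"));          // true
      }
    }

Running main prints true, false, true, true, exercising each branch in turn.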
hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
@@ -21,17 +21,16 @@
import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WAL.Entry;

/**
@@ -113,12 +112,6 @@ public String getStats() {
return "";
}

@Override
public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> files)
throws ReplicationException {
return;
}

@Override
public boolean isPeerEnabled() {
return true;
