HBASE-27529 Provide RS coproc ability to attach WAL extended attributes to mutations at replication sink (#4924)

Signed-off-by: Andrew Purtell <[email protected]>
virajjasani authored Jan 16, 2023
1 parent 7ed2cb9 commit cc54d22
Showing 7 changed files with 394 additions and 4 deletions.
@@ -19,10 +19,13 @@

import java.io.IOException;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;

import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;

/**
* Defines coprocessor hooks for interacting with operations on the
* {@link org.apache.hadoop.hbase.regionserver.HRegionServer} process. Since most implementations
@@ -137,4 +140,33 @@ default void preExecuteProcedures(ObserverContext<RegionServerCoprocessorEnvironment> ctx)
default void postExecuteProcedures(ObserverContext<RegionServerCoprocessorEnvironment> ctx)
throws IOException {
}

/**
* This will be called before replication sink mutations are executed on the sink table as part
* of a batch call.
* @param ctx the environment to interact with the framework and region server.
* @param walEntry the WAL entry from which the mutation is formed.
* @param mutation the mutation to be applied at the sink cluster.
* @throws IOException if something goes wrong.
*/
default void preReplicationSinkBatchMutate(
ObserverContext<RegionServerCoprocessorEnvironment> ctx, AdminProtos.WALEntry walEntry,
Mutation mutation) throws IOException {

}

/**
* This will be called after replication sink mutations are executed on the sink table as part
* of a batch call.
* @param ctx the environment to interact with the framework and region server.
* @param walEntry the WAL entry from which the mutation was formed.
* @param mutation the mutation that was applied at the sink cluster.
* @throws IOException if something goes wrong.
*/
default void postReplicationSinkBatchMutate(
ObserverContext<RegionServerCoprocessorEnvironment> ctx, AdminProtos.WALEntry walEntry,
Mutation mutation) throws IOException {

}

}
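
The two hooks above are the new extension point: a region server coprocessor now sees each sink-side mutation together with the WAL entry it was built from, which is what lets it attach WAL extended attributes to the mutation. A minimal sketch of such an observer follows (not part of this commit; the class name is illustrative, and the extended-attribute accessors are assumed from the WALKey protobuf added in HBASE-22622):

import java.io.IOException;
import java.util.Optional;

import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionServerObserver;

import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;

// Illustrative observer: copies WAL extended attributes onto the sink mutation
// so that region-level coprocessors at the sink cluster can observe them.
public class WALAttributeSinkObserver
  implements RegionServerCoprocessor, RegionServerObserver {

  @Override
  public Optional<RegionServerObserver> getRegionServerObserver() {
    return Optional.of(this);
  }

  @Override
  public void preReplicationSinkBatchMutate(
    ObserverContext<RegionServerCoprocessorEnvironment> ctx, AdminProtos.WALEntry walEntry,
    Mutation mutation) throws IOException {
    // Assumed accessors: WALKey.getExtendedAttributesList() yielding key/value pairs.
    for (WALProtos.Attribute attr : walEntry.getKey().getExtendedAttributesList()) {
      mutation.setAttribute(attr.getKey(), attr.getValue().toByteArray());
    }
  }
}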
@@ -22,6 +22,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.SharedConnection;
import org.apache.hadoop.hbase.coprocessor.BaseEnvironment;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
@@ -40,6 +41,8 @@

import org.apache.hbase.thirdparty.com.google.protobuf.Service;

import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;

@InterfaceAudience.Private
public class RegionServerCoprocessorHost
extends CoprocessorHost<RegionServerCoprocessor, RegionServerCoprocessorEnvironment> {
@@ -166,6 +169,26 @@ public void call(RegionServerObserver observer) throws IOException {
});
}

public void preReplicationSinkBatchMutate(AdminProtos.WALEntry walEntry, Mutation mutation)
throws IOException {
execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() {
@Override
public void call(RegionServerObserver observer) throws IOException {
observer.preReplicationSinkBatchMutate(this, walEntry, mutation);
}
});
}

public void postReplicationSinkBatchMutate(AdminProtos.WALEntry walEntry, Mutation mutation)
throws IOException {
execOperation(coprocEnvironments.isEmpty() ? null : new RegionServerObserverOperation() {
@Override
public void call(RegionServerObserver observer) throws IOException {
observer.postReplicationSinkBatchMutate(this, walEntry, mutation);
}
});
}

public ReplicationEndpoint postCreateReplicationEndPoint(final ReplicationEndpoint endpoint)
throws IOException {
if (this.coprocEnvironments.isEmpty()) {
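
Both wrappers follow the existing execOperation pattern, so they are no-ops when no region server coprocessors are loaded. For the new hooks to fire, an observer such as the sketch above has to be registered on the sink-side region servers; a hedged example of doing that programmatically (the production route is the hbase.coprocessor.regionserver.classes property in hbase-site.xml):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;

public class SinkObserverRegistration {
  // Builds a configuration that loads the illustrative observer on region servers.
  public static Configuration sinkConf() {
    Configuration conf = HBaseConfiguration.create();
    conf.setStrings(CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY,
      WALAttributeSinkObserver.class.getName());
    return conf;
  }
}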
@@ -28,6 +28,8 @@
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSink;
@@ -72,7 +74,11 @@ public void initialize(Server server, FileSystem fs, Path logdir, Path oldLogDir

@Override
public void startReplicationService() throws IOException {
this.replicationSink = new ReplicationSink(this.conf);
RegionServerCoprocessorHost rsServerHost = null;
if (server instanceof HRegionServer) {
rsServerHost = ((HRegionServer) server).getRegionServerCoprocessorHost();
}
this.replicationSink = new ReplicationSink(this.conf, rsServerHost);
this.server.getChoreService().scheduleChore(new ReplicationStatisticsChore(
"ReplicationSinkStatistics", server, (int) TimeUnit.SECONDS.toMillis(statsPeriodInSecond)));
}
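
Note that a coprocessor host is only available when the service runs inside an HRegionServer; any other Server implementation hands ReplicationSink a null host, in which case the new hooks are simply skipped. A minimal sketch of constructing the sink under that assumption:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSink;

public class StandaloneSink {
  public static ReplicationSink create() throws IOException {
    Configuration conf = HBaseConfiguration.create();
    // A null coprocessor host disables the pre/post replication-sink hooks.
    return new ReplicationSink(conf, null);
  }
}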
@@ -58,6 +58,7 @@
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.util.Bytes;
@@ -113,13 +114,17 @@ public class ReplicationSink {
private final int rowSizeWarnThreshold;
private boolean replicationSinkTrackerEnabled;

private final RegionServerCoprocessorHost rsServerHost;

/**
* Create a sink for replication
* @param conf conf object
* @param rsServerHost region server coprocessor host, null if not running inside a region server
* @throws IOException thrown when HDFS goes bad or a bad file name is given
*/
public ReplicationSink(Configuration conf) throws IOException {
public ReplicationSink(Configuration conf, RegionServerCoprocessorHost rsServerHost)
throws IOException {
this.conf = HBaseConfiguration.create(conf);
this.rsServerHost = rsServerHost;
rowSizeWarnThreshold =
conf.getInt(HConstants.BATCH_ROWS_THRESHOLD_NAME, HConstants.BATCH_ROWS_THRESHOLD_DEFAULT);
replicationSinkTrackerEnabled = conf.getBoolean(REPLICATION_SINK_TRACKER_ENABLED_KEY,
@@ -178,6 +183,8 @@ private void decorateConf() {
/**
* Replicate this array of entries directly into the local cluster using the native client. Only
* operates against the raw protobuf types, saving a conversion from pb to pojo.
* @param entries WAL entries to be replicated.
* @param cells cell scanner for iteration.
* @param replicationClusterId Id which will uniquely identify source cluster FS client
* configurations in the replication configuration directory
* @param sourceBaseNamespaceDirPath Path that points to the source cluster base namespace
@@ -200,6 +207,8 @@ public void replicateEntries(List<WALEntry> entries, final CellScanner cells,
Map<TableName, Map<List<UUID>, List<Row>>> rowMap = new TreeMap<>();

Map<List<String>, Map<String, List<Pair<byte[], List<String>>>>> bulkLoadsPerClusters = null;
Pair<List<Mutation>, List<WALEntry>> mutationsToWalEntriesPairs =
new Pair<>(new ArrayList<>(), new ArrayList<>());
for (WALEntry entry : entries) {
TableName table = TableName.valueOf(entry.getKey().getTableName().toByteArray());
if (this.walEntrySinkFilter != null) {
@@ -265,6 +274,11 @@ public void replicateEntries(List<WALEntry> entries, final CellScanner cells,
mutation.setClusterIds(clusterIds);
mutation.setAttribute(ReplicationUtils.REPLICATION_ATTR_NAME,
HConstants.EMPTY_BYTE_ARRAY);
if (rsServerHost != null) {
rsServerHost.preReplicationSinkBatchMutate(entry, mutation);
mutationsToWalEntriesPairs.getFirst().add(mutation);
mutationsToWalEntriesPairs.getSecond().add(entry);
}
addToHashMultiMap(rowMap, table, clusterIds, mutation);
}
if (CellUtil.isDelete(cell)) {
@@ -287,6 +301,14 @@ public void replicateEntries(List<WALEntry> entries, final CellScanner cells,
LOG.debug("Finished replicating mutations.");
}

if (rsServerHost != null) {
List<Mutation> mutations = mutationsToWalEntriesPairs.getFirst();
List<WALEntry> walEntries = mutationsToWalEntriesPairs.getSecond();
for (int i = 0; i < mutations.size(); i++) {
rsServerHost.postReplicationSinkBatchMutate(walEntries.get(i), mutations.get(i));
}
}

if (bulkLoadsPerClusters != null) {
for (Entry<List<String>,
Map<String, List<Pair<byte[], List<String>>>>> entry : bulkLoadsPerClusters.entrySet()) {
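
The ordering in replicateEntries above is deliberate: preReplicationSinkBatchMutate runs while each WAL entry is converted to a Mutation, and the matched entry/mutation pairs are kept so that postReplicationSinkBatchMutate runs only after the whole batch has been applied. That makes the post hook a natural place for success-side accounting; a hedged sketch, meant to live inside the observer sketched earlier (the field and names are illustrative):

import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.LongAdder;

import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;

import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;

// Counts successfully replicated mutations per table.
private final ConcurrentMap<String, LongAdder> replicatedPerTable = new ConcurrentHashMap<>();

@Override
public void postReplicationSinkBatchMutate(
  ObserverContext<RegionServerCoprocessorEnvironment> ctx, AdminProtos.WALEntry walEntry,
  Mutation mutation) throws IOException {
  // Invoked only after the whole batch was applied at the sink.
  String table = walEntry.getKey().getTableName().toStringUtf8();
  replicatedPerTable.computeIfAbsent(table, t -> new LongAdder()).increment();
}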
