Skip to content

Commit

Permalink
HBASE-27216 Revisit the ReplicationSyncUp tool
Browse files Browse the repository at this point in the history
  • Loading branch information
Apache9 committed Jan 31, 2023
1 parent 069d1ca commit 0a85981
Show file tree
Hide file tree
Showing 8 changed files with 496 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.replication;

import java.io.IOException;
import java.lang.reflect.Constructor;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.NamespaceDescriptor;
Expand All @@ -27,20 +28,27 @@
import org.apache.hadoop.hbase.client.CoprocessorDescriptorBuilder;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Used to create replication storage(peer, queue) classes.
*/
@InterfaceAudience.Private
public final class ReplicationStorageFactory {

private static final Logger LOG = LoggerFactory.getLogger(ReplicationStorageFactory.class);

public static final String REPLICATION_QUEUE_TABLE_NAME = "hbase.replication.queue.table.name";

public static final TableName REPLICATION_QUEUE_TABLE_NAME_DEFAULT =
TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "replication");

public static final String REPLICATION_QUEUE_IMPL = "hbase.replication.queue.impl";

public static TableDescriptor createReplicationQueueTableDescriptor(TableName tableName)
throws IOException {
return TableDescriptorBuilder.newBuilder(tableName)
Expand Down Expand Up @@ -72,15 +80,26 @@ public static ReplicationPeerStorage getReplicationPeerStorage(ZKWatcher zk, Con
*/
public static ReplicationQueueStorage getReplicationQueueStorage(Connection conn,
Configuration conf) {
return getReplicationQueueStorage(conn, TableName.valueOf(conf.get(REPLICATION_QUEUE_TABLE_NAME,
REPLICATION_QUEUE_TABLE_NAME_DEFAULT.getNameAsString())));
return getReplicationQueueStorage(conn, conf, TableName.valueOf(conf
.get(REPLICATION_QUEUE_TABLE_NAME, REPLICATION_QUEUE_TABLE_NAME_DEFAULT.getNameAsString())));
}

/**
* Create a new {@link ReplicationQueueStorage}.
*/
public static ReplicationQueueStorage getReplicationQueueStorage(Connection conn,
TableName tableName) {
return new TableReplicationQueueStorage(conn, tableName);
Configuration conf, TableName tableName) {
Class<? extends ReplicationQueueStorage> clazz = conf.getClass(REPLICATION_QUEUE_IMPL,
TableReplicationQueueStorage.class, ReplicationQueueStorage.class);
try {
Constructor<? extends ReplicationQueueStorage> c =
clazz.getConstructor(Connection.class, TableName.class);
return c.newInstance(conn, tableName);
} catch (Exception e) {
LOG.debug(
"failed to create ReplicationQueueStorage with Connection, try creating with Configuration",
e);
return ReflectionUtils.newInstance(clazz, conf, tableName);
}
}
}
Loading

0 comments on commit 0a85981

Please sign in to comment.