diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java index 8564c105331e..8a26972c4810 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hbase.mapreduce; import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; import java.util.HashMap; import java.util.Map; import java.util.UUID; @@ -63,6 +65,11 @@ public class CopyTable extends Configured implements Tool { String startRow = null; String stopRow = null; String dstTableName = null; + URI peerUri = null; + /** + * @deprecated Since 3.0.0, will be removed in 4.0.0. Use {@link #peerUri} instead. + */ + @Deprecated String peerAddress = null; String families = null; boolean allCells = false; @@ -89,7 +96,7 @@ private Path generateUniqTempDir(boolean withDirCreated) throws IOException { return newDir; } - private void initCopyTableMapperReducerJob(Job job, Scan scan) throws IOException { + private void initCopyTableMapperJob(Job job, Scan scan) throws IOException { Class mapper = bulkload ? CellImporter.class : Importer.class; if (readingSnapshot) { TableMapReduceUtil.initTableSnapshotMapperJob(snapshot, scan, mapper, null, null, job, true, @@ -166,7 +173,7 @@ public Job createSubmittableJob(String[] args) throws IOException { job.setNumReduceTasks(0); if (bulkload) { - initCopyTableMapperReducerJob(job, scan); + initCopyTableMapperJob(job, scan); // We need to split the inputs by destination tables so that output of Map can be bulk-loaded. TableInputFormat.configureSplitTable(job, TableName.valueOf(dstTableName)); @@ -180,8 +187,15 @@ public Job createSubmittableJob(String[] args) throws IOException { admin.getDescriptor((TableName.valueOf(dstTableName)))); } } else { - initCopyTableMapperReducerJob(job, scan); - TableMapReduceUtil.initTableReducerJob(dstTableName, null, job, null, peerAddress); + initCopyTableMapperJob(job, scan); + if (peerUri != null) { + TableMapReduceUtil.initTableReducerJob(dstTableName, null, job, null, peerUri); + } else if (peerAddress != null) { + TableMapReduceUtil.initTableReducerJob(dstTableName, null, job, null, peerAddress); + } else { + TableMapReduceUtil.initTableReducerJob(dstTableName, null, job); + } + } return job; @@ -195,7 +209,7 @@ private static void printUsage(final String errorMsg) { System.err.println("ERROR: " + errorMsg); } System.err.println("Usage: CopyTable [general options] [--starttime=X] [--endtime=Y] " - + "[--new.name=NEW] [--peer.adr=ADR] "); + + "[--new.name=NEW] [--peer.uri=URI|--peer.adr=ADR] "); System.err.println(); System.err.println("Options:"); System.err.println(" rs.class hbase.regionserver.class of the peer cluster"); @@ -208,9 +222,12 @@ private static void printUsage(final String errorMsg) { System.err.println(" endtime end of the time range. Ignored if no starttime specified."); System.err.println(" versions number of cell versions to copy"); System.err.println(" new.name new table's name"); + System.err.println(" peer.uri The URI of the peer cluster"); System.err.println(" peer.adr Address of the peer cluster given in the format"); System.err.println(" hbase.zookeeper.quorum:hbase.zookeeper.client" + ".port:zookeeper.znode.parent"); + System.err.println(" Do not take effect if peer.uri is specified"); + System.err.println(" Deprecated, please use peer.uri instead"); System.err.println(" families comma-separated list of families to copy"); System.err.println(" To copy from cf1 to cf2, give sourceCfName:destCfName. "); System.err.println(" To keep the same name, just give \"cfName\""); @@ -247,144 +264,149 @@ private boolean doCommandLine(final String[] args) { printUsage(null); return false; } - try { - for (int i = 0; i < args.length; i++) { - String cmd = args[i]; - if (cmd.equals("-h") || cmd.startsWith("--h")) { - printUsage(null); - return false; - } - - final String startRowArgKey = "--startrow="; - if (cmd.startsWith(startRowArgKey)) { - startRow = cmd.substring(startRowArgKey.length()); - continue; - } - - final String stopRowArgKey = "--stoprow="; - if (cmd.startsWith(stopRowArgKey)) { - stopRow = cmd.substring(stopRowArgKey.length()); - continue; - } - - final String startTimeArgKey = "--starttime="; - if (cmd.startsWith(startTimeArgKey)) { - startTime = Long.parseLong(cmd.substring(startTimeArgKey.length())); - continue; - } - - final String endTimeArgKey = "--endtime="; - if (cmd.startsWith(endTimeArgKey)) { - endTime = Long.parseLong(cmd.substring(endTimeArgKey.length())); - continue; - } - - final String batchArgKey = "--batch="; - if (cmd.startsWith(batchArgKey)) { - batch = Integer.parseInt(cmd.substring(batchArgKey.length())); - continue; - } - - final String cacheRowArgKey = "--cacheRow="; - if (cmd.startsWith(cacheRowArgKey)) { - cacheRow = Integer.parseInt(cmd.substring(cacheRowArgKey.length())); - continue; - } + for (int i = 0; i < args.length; i++) { + String cmd = args[i]; + if (cmd.equals("-h") || cmd.startsWith("--h")) { + printUsage(null); + return false; + } - final String versionsArgKey = "--versions="; - if (cmd.startsWith(versionsArgKey)) { - versions = Integer.parseInt(cmd.substring(versionsArgKey.length())); - continue; - } + final String startRowArgKey = "--startrow="; + if (cmd.startsWith(startRowArgKey)) { + startRow = cmd.substring(startRowArgKey.length()); + continue; + } - final String newNameArgKey = "--new.name="; - if (cmd.startsWith(newNameArgKey)) { - dstTableName = cmd.substring(newNameArgKey.length()); - continue; - } + final String stopRowArgKey = "--stoprow="; + if (cmd.startsWith(stopRowArgKey)) { + stopRow = cmd.substring(stopRowArgKey.length()); + continue; + } - final String peerAdrArgKey = "--peer.adr="; - if (cmd.startsWith(peerAdrArgKey)) { - peerAddress = cmd.substring(peerAdrArgKey.length()); - continue; - } + final String startTimeArgKey = "--starttime="; + if (cmd.startsWith(startTimeArgKey)) { + startTime = Long.parseLong(cmd.substring(startTimeArgKey.length())); + continue; + } - final String familiesArgKey = "--families="; - if (cmd.startsWith(familiesArgKey)) { - families = cmd.substring(familiesArgKey.length()); - continue; - } + final String endTimeArgKey = "--endtime="; + if (cmd.startsWith(endTimeArgKey)) { + endTime = Long.parseLong(cmd.substring(endTimeArgKey.length())); + continue; + } - if (cmd.startsWith("--all.cells")) { - allCells = true; - continue; - } + final String batchArgKey = "--batch="; + if (cmd.startsWith(batchArgKey)) { + batch = Integer.parseInt(cmd.substring(batchArgKey.length())); + continue; + } - if (cmd.startsWith("--bulkload")) { - bulkload = true; - continue; - } + final String cacheRowArgKey = "--cacheRow="; + if (cmd.startsWith(cacheRowArgKey)) { + cacheRow = Integer.parseInt(cmd.substring(cacheRowArgKey.length())); + continue; + } - if (cmd.startsWith("--shuffle")) { - shuffle = true; - continue; - } + final String versionsArgKey = "--versions="; + if (cmd.startsWith(versionsArgKey)) { + versions = Integer.parseInt(cmd.substring(versionsArgKey.length())); + continue; + } - if (cmd.startsWith("--snapshot")) { - readingSnapshot = true; - continue; - } + final String newNameArgKey = "--new.name="; + if (cmd.startsWith(newNameArgKey)) { + dstTableName = cmd.substring(newNameArgKey.length()); + continue; + } - if (i == args.length - 1) { - if (readingSnapshot) { - snapshot = cmd; - } else { - tableName = cmd; - } - } else { - printUsage("Invalid argument '" + cmd + "'"); + final String peerUriArgKey = "--peer.uri="; + if (cmd.startsWith(peerUriArgKey)) { + try { + peerUri = new URI(cmd.substring(peerUriArgKey.length())); + } catch (URISyntaxException e) { + LOG.error("Malformed peer uri specified: {}", cmd, e); return false; } + continue; } - if (dstTableName == null && peerAddress == null) { - printUsage("At least a new table name or a peer address must be specified"); - return false; + + final String peerAdrArgKey = "--peer.adr="; + if (cmd.startsWith(peerAdrArgKey)) { + peerAddress = cmd.substring(peerAdrArgKey.length()); + continue; } - if ((endTime != 0) && (startTime > endTime)) { - printUsage("Invalid time range filter: starttime=" + startTime + " > endtime=" + endTime); - return false; + + final String familiesArgKey = "--families="; + if (cmd.startsWith(familiesArgKey)) { + families = cmd.substring(familiesArgKey.length()); + continue; } - if (bulkload && peerAddress != null) { - printUsage("Remote bulkload is not supported!"); - return false; + if (cmd.startsWith("--all.cells")) { + allCells = true; + continue; } - if (readingSnapshot && peerAddress != null) { - printUsage("Loading data from snapshot to remote peer cluster is not supported."); - return false; + if (cmd.startsWith("--bulkload")) { + bulkload = true; + continue; } - if (readingSnapshot && dstTableName == null) { - printUsage("The --new.name= for destination table should be " - + "provided when copying data from snapshot ."); - return false; + if (cmd.startsWith("--shuffle")) { + shuffle = true; + continue; } - if (readingSnapshot && snapshot == null) { - printUsage("Snapshot shouldn't be null when --snapshot is enabled."); - return false; + if (cmd.startsWith("--snapshot")) { + readingSnapshot = true; + continue; } - // set dstTableName if necessary - if (dstTableName == null) { - dstTableName = tableName; + if (i == args.length - 1) { + if (readingSnapshot) { + snapshot = cmd; + } else { + tableName = cmd; + } + } else { + printUsage("Invalid argument '" + cmd + "'"); + return false; } - } catch (Exception e) { - LOG.error("Failed to parse commandLine arguments", e); - printUsage("Can't start because " + e.getMessage()); + } + if (dstTableName == null && peerAddress == null) { + printUsage("At least a new table name or a peer address must be specified"); + return false; + } + if ((endTime != 0) && (startTime > endTime)) { + printUsage("Invalid time range filter: starttime=" + startTime + " > endtime=" + endTime); return false; } + + if (bulkload && (peerUri != null || peerAddress != null)) { + printUsage("Remote bulkload is not supported!"); + return false; + } + + if (readingSnapshot && (peerUri != null || peerAddress != null)) { + printUsage("Loading data from snapshot to remote peer cluster is not supported."); + return false; + } + + if (readingSnapshot && dstTableName == null) { + printUsage("The --new.name=
for destination table should be " + + "provided when copying data from snapshot ."); + return false; + } + + if (readingSnapshot && snapshot == null) { + printUsage("Snapshot shouldn't be null when --snapshot is enabled."); + return false; + } + + // set dstTableName if necessary + if (dstTableName == null) { + dstTableName = tableName; + } return true; } @@ -401,7 +423,9 @@ public static void main(String[] args) throws Exception { @Override public int run(String[] args) throws Exception { Job job = createSubmittableJob(args); - if (job == null) return 1; + if (job == null) { + return 1; + } if (!job.waitForCompletion(true)) { LOG.info("Map-reduce job failed!"); if (bulkload) { diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java index 146f4ec6511f..3b083b33dbdf 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java @@ -18,8 +18,11 @@ package org.apache.hadoop.hbase.mapreduce; import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; import java.util.Collections; import java.util.Iterator; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileStatus; @@ -65,7 +68,17 @@ public class SyncTable extends Configured implements Tool { static final String SOURCE_HASH_DIR_CONF_KEY = "sync.table.source.hash.dir"; static final String SOURCE_TABLE_CONF_KEY = "sync.table.source.table.name"; static final String TARGET_TABLE_CONF_KEY = "sync.table.target.table.name"; + static final String SOURCE_URI_CONF_KEY = "sync.table.source.uri"; + /** + * @deprecated Since 3.0.0, will be removed in 4.0.0 Use {@link #SOURCE_URI_CONF_KEY} instead. + */ + @Deprecated static final String SOURCE_ZK_CLUSTER_CONF_KEY = "sync.table.source.zk.cluster"; + static final String TARGET_URI_CONF_KEY = "sync.table.target.uri"; + /** + * @deprecated Since 3.0.0, will be removed in 4.0.0 Use {@link #TARGET_URI_CONF_KEY} instead. + */ + @Deprecated static final String TARGET_ZK_CLUSTER_CONF_KEY = "sync.table.target.zk.cluster"; static final String DRY_RUN_CONF_KEY = "sync.table.dry.run"; static final String DO_DELETES_CONF_KEY = "sync.table.do.deletes"; @@ -76,7 +89,17 @@ public class SyncTable extends Configured implements Tool { String sourceTableName; String targetTableName; + URI sourceUri; + /** + * @deprecated Since 3.0.0, will be removed in 4.0.0 Use {@link #sourceUri} instead. + */ + @Deprecated String sourceZkCluster; + URI targetUri; + /** + * @deprecated Since 3.0.0, will be removed in 4.0.0 Use {@link #targetUri} instead. + */ + @Deprecated String targetZkCluster; boolean dryRun; boolean doDeletes = true; @@ -89,9 +112,9 @@ public SyncTable(Configuration conf) { super(conf); } - private void initCredentialsForHBase(String zookeeper, Job job) throws IOException { + private void initCredentialsForHBase(String clusterKey, Job job) throws IOException { Configuration peerConf = - HBaseConfiguration.createClusterConf(job.getConfiguration(), zookeeper); + HBaseConfiguration.createClusterConf(job.getConfiguration(), clusterKey); TableMapReduceUtil.initCredentialsForCluster(job, peerConf); } @@ -142,11 +165,17 @@ public Job createSubmittableJob(String[] args) throws IOException { jobConf.set(SOURCE_HASH_DIR_CONF_KEY, sourceHashDir.toString()); jobConf.set(SOURCE_TABLE_CONF_KEY, sourceTableName); jobConf.set(TARGET_TABLE_CONF_KEY, targetTableName); - if (sourceZkCluster != null) { + if (sourceUri != null) { + jobConf.set(SOURCE_URI_CONF_KEY, sourceUri.toString()); + TableMapReduceUtil.initCredentialsForCluster(job, jobConf, sourceUri); + } else if (sourceZkCluster != null) { jobConf.set(SOURCE_ZK_CLUSTER_CONF_KEY, sourceZkCluster); initCredentialsForHBase(sourceZkCluster, job); } - if (targetZkCluster != null) { + if (targetUri != null) { + jobConf.set(TARGET_URI_CONF_KEY, targetUri.toString()); + TableMapReduceUtil.initCredentialsForCluster(job, jobConf, targetUri); + } else if (targetZkCluster != null) { jobConf.set(TARGET_ZK_CLUSTER_CONF_KEY, targetZkCluster); initCredentialsForHBase(targetZkCluster, job); } @@ -165,8 +194,11 @@ public Job createSubmittableJob(String[] args) throws IOException { } else { // No reducers. Just write straight to table. Call initTableReducerJob // because it sets up the TableOutputFormat. - TableMapReduceUtil.initTableReducerJob(targetTableName, null, job, null, targetZkCluster); - + if (targetUri != null) { + TableMapReduceUtil.initTableReducerJob(targetTableName, null, job, null, targetUri); + } else { + TableMapReduceUtil.initTableReducerJob(targetTableName, null, job, null, targetZkCluster); + } // would be nice to add an option for bulk load instead } @@ -214,9 +246,10 @@ public static enum Counter { protected void setup(Context context) throws IOException { Configuration conf = context.getConfiguration(); sourceHashDir = new Path(conf.get(SOURCE_HASH_DIR_CONF_KEY)); - sourceConnection = openConnection(conf, SOURCE_ZK_CLUSTER_CONF_KEY, null); - targetConnection = - openConnection(conf, TARGET_ZK_CLUSTER_CONF_KEY, TableOutputFormat.OUTPUT_CONF_PREFIX); + sourceConnection = + openConnection(conf, SOURCE_URI_CONF_KEY, SOURCE_ZK_CLUSTER_CONF_KEY, null); + targetConnection = openConnection(conf, TARGET_URI_CONF_KEY, TARGET_ZK_CLUSTER_CONF_KEY, + TableOutputFormat.OUTPUT_CONF_PREFIX); sourceTable = openTable(sourceConnection, conf, SOURCE_TABLE_CONF_KEY); targetTable = openTable(targetConnection, conf, TARGET_TABLE_CONF_KEY); dryRun = conf.getBoolean(DRY_RUN_CONF_KEY, false); @@ -241,12 +274,22 @@ protected void setup(Context context) throws IOException { targetHasher.ignoreTimestamps = ignoreTimestamp; } - private static Connection openConnection(Configuration conf, String zkClusterConfKey, - String configPrefix) throws IOException { - String zkCluster = conf.get(zkClusterConfKey); - Configuration clusterConf = - HBaseConfiguration.createClusterConf(conf, zkCluster, configPrefix); - return ConnectionFactory.createConnection(clusterConf); + private static Connection openConnection(Configuration conf, String uriConfKey, + String zkClusterConfKey, String configPrefix) throws IOException { + String uri = conf.get(uriConfKey); + if (!StringUtils.isBlank(uri)) { + try { + return ConnectionFactory.createConnection(new URI(uri), conf); + } catch (URISyntaxException e) { + throw new IOException( + "malformed connection uri: " + uri + ", please check config " + uriConfKey, e); + } + } else { + String zkCluster = conf.get(zkClusterConfKey); + Configuration clusterConf = + HBaseConfiguration.createClusterConf(conf, zkCluster, configPrefix); + return ConnectionFactory.createConnection(clusterConf); + } } private static Table openTable(Connection connection, Configuration conf, @@ -747,10 +790,18 @@ private static void printUsage(final String errorMsg) { System.err.println(); System.err.println("Options:"); + System.err.println(" sourceuri Cluster connection uri of the source table"); + System.err.println(" (defaults to cluster in classpath's config)"); System.err.println(" sourcezkcluster ZK cluster key of the source table"); System.err.println(" (defaults to cluster in classpath's config)"); + System.err.println(" Do not take effect if sourceuri is specified"); + System.err.println(" Deprecated, please use sourceuri instead"); + System.err.println(" targeturi Cluster connection uri of the target table"); + System.err.println(" (defaults to cluster in classpath's config)"); System.err.println(" targetzkcluster ZK cluster key of the target table"); System.err.println(" (defaults to cluster in classpath's config)"); + System.err.println(" Do not take effect if targeturi is specified"); + System.err.println(" Deprecated, please use targeturi instead"); System.err.println(" dryrun if true, output counters but no writes"); System.err.println(" (defaults to false)"); System.err.println(" doDeletes if false, does not perform deletes"); @@ -792,6 +843,11 @@ private boolean doCommandLine(final String[] args) { printUsage(null); return false; } + final String sourceUriKey = "--sourceuri="; + if (cmd.startsWith(sourceUriKey)) { + sourceUri = new URI(cmd.substring(sourceUriKey.length())); + continue; + } final String sourceZkClusterKey = "--sourcezkcluster="; if (cmd.startsWith(sourceZkClusterKey)) { @@ -799,6 +855,12 @@ private boolean doCommandLine(final String[] args) { continue; } + final String targetUriKey = "--targeturi="; + if (cmd.startsWith(targetUriKey)) { + targetUri = new URI(cmd.substring(targetUriKey.length())); + continue; + } + final String targetZkClusterKey = "--targetzkcluster="; if (cmd.startsWith(targetZkClusterKey)) { targetZkCluster = cmd.substring(targetZkClusterKey.length()); diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java index a23393ff804c..3179afd46829 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java @@ -20,6 +20,8 @@ import com.codahale.metrics.MetricRegistry; import java.io.File; import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; import java.net.URL; import java.net.URLDecoder; import java.util.ArrayList; @@ -31,8 +33,10 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; import java.util.zip.ZipEntry; import java.util.zip.ZipFile; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -41,6 +45,7 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.ConnectionRegistryFactory; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.RegionLocator; import org.apache.hadoop.hbase.client.Scan; @@ -49,12 +54,13 @@ import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.security.token.TokenUtil; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.IOExceptionRunnable; +import org.apache.hadoop.hbase.util.IOExceptionSupplier; import org.apache.hadoop.hbase.util.RegionSplitter; import org.apache.hadoop.hbase.zookeeper.ZKConfig; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.util.StringUtils; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -442,6 +448,13 @@ public static void initTableMapperJob(List scans, Class connSupplier, User user, + Job job) throws IOException, InterruptedException { + try (Connection conn = connSupplier.get()) { + TokenUtil.addTokenForJob(conn, user, job); + } + } + public static void initCredentials(Job job) throws IOException { UserProvider userProvider = UserProvider.instantiate(job.getConfiguration()); if (userProvider.isHadoopSecurityEnabled()) { @@ -453,27 +466,32 @@ public static void initCredentials(Job job) throws IOException { } if (userProvider.isHBaseSecurityEnabled()) { + User user = userProvider.getCurrent(); try { // init credentials for remote cluster - String quorumAddress = job.getConfiguration().get(TableOutputFormat.QUORUM_ADDRESS); - User user = userProvider.getCurrent(); - if (quorumAddress != null) { - Configuration peerConf = HBaseConfiguration.createClusterConf(job.getConfiguration(), - quorumAddress, TableOutputFormat.OUTPUT_CONF_PREFIX); - Connection peerConn = ConnectionFactory.createConnection(peerConf); - try { - TokenUtil.addTokenForJob(peerConn, user, job); - } finally { - peerConn.close(); - } + String outputCluster = job.getConfiguration().get(TableOutputFormat.OUTPUT_CLUSTER); + if (!StringUtils.isBlank(outputCluster)) { + addTokenForJob(() -> { + URI uri; + try { + uri = new URI(outputCluster); + } catch (URISyntaxException e) { + throw new IOException("malformed connection uri: " + outputCluster + + ", please check config " + TableOutputFormat.OUTPUT_CLUSTER, e); + } + return ConnectionFactory.createConnection(uri, job.getConfiguration()); + }, user, job); } - - Connection conn = ConnectionFactory.createConnection(job.getConfiguration()); - try { - TokenUtil.addTokenForJob(conn, user, job); - } finally { - conn.close(); + String quorumAddress = job.getConfiguration().get(TableOutputFormat.QUORUM_ADDRESS); + if (!StringUtils.isBlank(quorumAddress)) { + addTokenForJob(() -> { + Configuration peerConf = HBaseConfiguration.createClusterConf(job.getConfiguration(), + quorumAddress, TableOutputFormat.OUTPUT_CONF_PREFIX); + return ConnectionFactory.createConnection(peerConf, user); + }, user, job); } + // init credentials for source cluster + addTokenForJob(() -> ConnectionFactory.createConnection(job.getConfiguration()), user, job); } catch (InterruptedException ie) { LOG.info("Interrupted obtaining user authentication token"); Thread.currentThread().interrupt(); @@ -489,15 +507,24 @@ public static void initCredentials(Job job) throws IOException { * @throws IOException When the authentication token cannot be obtained. */ public static void initCredentialsForCluster(Job job, Configuration conf) throws IOException { + initCredentialsForCluster(job, conf, null); + } + + /** + * Obtain an authentication token, for the specified cluster, on behalf of the current user and + * add it to the credentials for the given map reduce job. + * @param job The job that requires the permission. + * @param conf The configuration to use in connecting to the peer cluster + * @param uri The connection uri for the given peer cluster + * @throws IOException When the authentication token cannot be obtained. + */ + public static void initCredentialsForCluster(Job job, Configuration conf, URI uri) + throws IOException { UserProvider userProvider = UserProvider.instantiate(conf); if (userProvider.isHBaseSecurityEnabled()) { try { - Connection peerConn = ConnectionFactory.createConnection(conf); - try { - TokenUtil.addTokenForJob(peerConn, userProvider.getCurrent(), job); - } finally { - peerConn.close(); - } + addTokenForJob(() -> ConnectionFactory.createConnection(uri, conf), + userProvider.getCurrent(), job); } catch (InterruptedException e) { LOG.info("Interrupted obtaining user authentication token"); Thread.interrupted(); @@ -549,7 +576,7 @@ public static void initTableReducerJob(String table, Class reducer, Job job, Class partitioner) throws IOException { - initTableReducerJob(table, reducer, job, partitioner, null); + initTableReducerJob(table, reducer, job, partitioner, (URI) null); } /** @@ -570,7 +597,11 @@ public static void initTableReducerJob(String table, Class such as server,server2,server3:2181:/hbase. * @throws IOException When determining the region count fails. + * @deprecated Since 3.0.0, will be removed in 4.0.0. Use + * {@link #initTableReducerJob(String, Class, Job, Class, URI)} instead, where we use + * the connection uri to specify the target cluster. */ + @Deprecated public static void initTableReducerJob(String table, Class reducer, Job job, Class partitioner, String quorumAddress) throws IOException { initTableReducerJob(table, reducer, job, partitioner, quorumAddress, true); @@ -596,23 +627,78 @@ public static void initTableReducerJob(String table, Class reducer, Job job, Class partitioner, String quorumAddress, boolean addDependencyJars) throws IOException { + initTableReducerJob(table, reducer, job, partitioner, () -> { + // If passed a quorum/ensemble address, pass it on to TableOutputFormat. + if (quorumAddress != null) { + // Calling this will validate the format + ZKConfig.validateClusterKey(quorumAddress); + job.getConfiguration().set(TableOutputFormat.QUORUM_ADDRESS, quorumAddress); + } + }, addDependencyJars); + } + + /** + * Use this before submitting a TableReduce job. It will appropriately set up the JobConf. + * @param table The output table. + * @param reducer The reducer class to use. + * @param job The current job to adjust. Make sure the passed job is carrying all + * necessary HBase configuration. + * @param partitioner Partitioner to use. Pass null to use default partitioner. + * @param outputCluster The HBase cluster you want to write to. Default is null which means output + * to the same cluster you read from, i.e, the cluster when initializing by + * the job's Configuration instance. + * @throws IOException When determining the region count fails. + */ + public static void initTableReducerJob(String table, Class reducer, + Job job, Class partitioner, URI outputCluster) throws IOException { + initTableReducerJob(table, reducer, job, partitioner, outputCluster, true); + } + + /** + * Use this before submitting a TableReduce job. It will appropriately set up the JobConf. + * @param table The output table. + * @param reducer The reducer class to use. + * @param job The current job to adjust. Make sure the passed job is carrying all + * necessary HBase configuration. + * @param partitioner Partitioner to use. Pass null to use default partitioner. + * @param outputCluster The HBase cluster you want to write to. Default is null which means + * output to the same cluster you read from, i.e, the cluster when + * initializing by the job's Configuration instance. + * @param addDependencyJars upload HBase jars and jars for any of the configured job classes via + * the distributed cache (tmpjars). + * @throws IOException When determining the region count fails. + */ + public static void initTableReducerJob(String table, Class reducer, + Job job, Class partitioner, URI outputCluster, boolean addDependencyJars) throws IOException { + initTableReducerJob(table, reducer, job, partitioner, () -> { + if (outputCluster != null) { + ConnectionRegistryFactory.validate(outputCluster); + job.getConfiguration().set(TableOutputFormat.OUTPUT_CLUSTER, outputCluster.toString()); + } + }, addDependencyJars); + } + + private static void initTableReducerJob(String table, Class reducer, + Job job, Class partitioner, IOExceptionRunnable setOutputCluster, boolean addDependencyJars) + throws IOException { Configuration conf = job.getConfiguration(); HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf)); job.setOutputFormatClass(TableOutputFormat.class); - if (reducer != null) job.setReducerClass(reducer); + if (reducer != null) { + job.setReducerClass(reducer); + } conf.set(TableOutputFormat.OUTPUT_TABLE, table); conf.setStrings("io.serializations", conf.get("io.serializations"), MutationSerialization.class.getName(), ResultSerialization.class.getName()); - // If passed a quorum/ensemble address, pass it on to TableOutputFormat. - if (quorumAddress != null) { - // Calling this will validate the format - ZKConfig.validateClusterKey(quorumAddress); - conf.set(TableOutputFormat.QUORUM_ADDRESS, quorumAddress); - } + setOutputCluster.run(); job.setOutputKeyClass(ImmutableBytesWritable.class); job.setOutputValueClass(Writable.class); if (partitioner == HRegionPartitioner.class) { @@ -853,9 +939,10 @@ public static void addDependencyJarsForClasses(Configuration conf, Class... c } jars.add(path.toString()); } - if (jars.isEmpty()) return; - - conf.set("tmpjars", StringUtils.arrayToString(jars.toArray(new String[jars.size()]))); + if (jars.isEmpty()) { + return; + } + conf.set("tmpjars", jars.stream().collect(Collectors.joining(","))); } /** diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java index a8ec67c9b237..e84d5234b1fc 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java @@ -18,6 +18,9 @@ package org.apache.hadoop.hbase.mapreduce; import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; @@ -53,13 +56,25 @@ public class TableOutputFormat extends OutputFormat implemen /** Job parameter that specifies the output table. */ public static final String OUTPUT_TABLE = "hbase.mapred.outputtable"; + /** + * Optional job parameter to specify a peer cluster. Used specifying remote cluster when copying + * between hbase clusters (the source is picked up from hbase-site.xml). + * @see TableMapReduceUtil#initTableReducerJob(String, Class, org.apache.hadoop.mapreduce.Job, + * Class, java.net.URI) + */ + public static final String OUTPUT_CLUSTER = "hbase.mapred.outputcluster"; + /** * Prefix for configuration property overrides to apply in {@link #setConf(Configuration)}. For * keys matching this prefix, the prefix is stripped, and the value is set in the configuration * with the resulting key, ie. the entry "hbase.mapred.output.key1 = value1" would be set in the * configuration as "key1 = value1". Use this to set properties which should only be applied to * the {@code TableOutputFormat} configuration and not the input configuration. + * @deprecated Since 3.0.0, will be removed in 4.0.0. You do not need to use this way for + * specifying configurations any more, you can specify any configuration with the + * connection uri's queries specified by the {@link #OUTPUT_CLUSTER} parameter. */ + @Deprecated public static final String OUTPUT_CONF_PREFIX = "hbase.mapred.output."; /** @@ -67,10 +82,19 @@ public class TableOutputFormat extends OutputFormat implemen * between hbase clusters (the source is picked up from hbase-site.xml). * @see TableMapReduceUtil#initTableReducerJob(String, Class, org.apache.hadoop.mapreduce.Job, * Class, String) + * @deprecated Since 3.0.0, will be removed in 4.0.0. Use {@link #OUTPUT_CLUSTER} to specify the + * peer cluster instead. */ + @Deprecated public static final String QUORUM_ADDRESS = OUTPUT_CONF_PREFIX + "quorum"; - /** Optional job parameter to specify peer cluster's ZK client port */ + /** + * Optional job parameter to specify peer cluster's ZK client port. + * @deprecated Since 3.0.0, will be removed in 4.0.0. You do not need to use this way for + * specifying configurations any more, you can specify any configuration with the + * connection uri's queries specified by the {@link #OUTPUT_CLUSTER} parameter. + */ + @Deprecated public static final String QUORUM_PORT = OUTPUT_CONF_PREFIX + "quorum.port"; /** @@ -91,6 +115,23 @@ public class TableOutputFormat extends OutputFormat implemen /** The configuration. */ private Configuration conf = null; + private static Connection createConnection(Configuration conf) throws IOException { + String outputCluster = conf.get(OUTPUT_CLUSTER); + if (!StringUtils.isBlank(outputCluster)) { + URI uri; + try { + uri = new URI(outputCluster); + } catch (URISyntaxException e) { + throw new IOException( + "malformed connection uri: " + outputCluster + ", please check config " + OUTPUT_CLUSTER, + e); + } + return ConnectionFactory.createConnection(uri, conf); + } else { + return ConnectionFactory.createConnection(conf); + } + } + /** * Writes the reducer output to an HBase table. */ @@ -99,13 +140,9 @@ protected class TableRecordWriter extends RecordWriter { private Connection connection; private BufferedMutator mutator; - /** - * - * - */ public TableRecordWriter() throws IOException { + this.connection = createConnection(conf); String tableName = conf.get(OUTPUT_TABLE); - this.connection = ConnectionFactory.createConnection(conf); this.mutator = connection.getBufferedMutator(TableName.valueOf(tableName)); LOG.info("Created table instance for " + tableName); } @@ -175,8 +212,7 @@ public void checkOutputSpecs(JobContext context) throws IOException, Interrupted hConf = context.getConfiguration(); } - try (Connection connection = ConnectionFactory.createConnection(hConf); - Admin admin = connection.getAdmin()) { + try (Connection connection = createConnection(hConf); Admin admin = connection.getAdmin()) { TableName tableName = TableName.valueOf(hConf.get(OUTPUT_TABLE)); if (!admin.tableExists(tableName)) { throw new TableNotFoundException( diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java index d83fa1d52522..36422b6e9f4a 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.mapreduce.replication; import java.io.IOException; +import java.net.URI; import java.util.Arrays; import java.util.List; import java.util.UUID; @@ -35,6 +36,7 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.ConnectionRegistryFactory; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; @@ -60,6 +62,7 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.util.Strings; import org.apache.hadoop.hbase.zookeeper.ZKConfig; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.hadoop.mapreduce.InputSplit; @@ -210,13 +213,18 @@ public void map(ImmutableBytesWritable row, final Result value, Context context) final InputSplit tableSplit = context.getInputSplit(); - String zkClusterKey = conf.get(NAME + ".peerQuorumAddress"); - Configuration peerConf = - HBaseConfiguration.createClusterConf(conf, zkClusterKey, PEER_CONFIG_PREFIX); - + String peerQuorumAddress = conf.get(NAME + ".peerQuorumAddress"); + URI connectionUri = ConnectionRegistryFactory.tryParseAsConnectionURI(peerQuorumAddress); + Configuration peerConf; + if (connectionUri != null) { + peerConf = HBaseConfiguration.create(conf); + } else { + peerConf = + HBaseConfiguration.createClusterConf(conf, peerQuorumAddress, PEER_CONFIG_PREFIX); + } String peerName = peerConf.get(NAME + ".peerTableName", tableName.getNameAsString()); TableName peerTableName = TableName.valueOf(peerName); - replicatedConnection = ConnectionFactory.createConnection(peerConf); + replicatedConnection = ConnectionFactory.createConnection(connectionUri, peerConf); replicatedTable = replicatedConnection.getTable(peerTableName); scan.withStartRow(value.getRow()); @@ -408,10 +416,22 @@ public boolean isAborted() { } } + private Configuration applyURIConf(Configuration conf, URI uri) { + Configuration peerConf = HBaseConfiguration.subset(conf, PEER_CONFIG_PREFIX); + HBaseConfiguration.merge(peerConf, conf); + Strings.applyURIQueriesToConf(uri, peerConf); + return peerConf; + } + private void restoreSnapshotForPeerCluster(Configuration conf, String peerQuorumAddress) throws IOException { - Configuration peerConf = - HBaseConfiguration.createClusterConf(conf, peerQuorumAddress, PEER_CONFIG_PREFIX); + URI uri = ConnectionRegistryFactory.tryParseAsConnectionURI(peerQuorumAddress); + Configuration peerConf; + if (uri != null) { + peerConf = applyURIConf(conf, uri); + } else { + peerConf = HBaseConfiguration.createClusterConf(conf, peerQuorumAddress, PEER_CONFIG_PREFIX); + } FileSystem.setDefaultUri(peerConf, peerFSAddress); CommonFSUtils.setRootDir(peerConf, new Path(peerFSAddress, peerHBaseRootAddress)); FileSystem fs = FileSystem.get(peerConf); @@ -526,16 +546,24 @@ public Job createSubmittableJob(Configuration conf, String[] args) throws IOExce TableMapReduceUtil.initTableMapperJob(tableName, scan, Verifier.class, null, null, job); } - Configuration peerClusterConf; + Configuration peerClusterBaseConf; if (peerId != null) { assert peerConfigPair != null; - peerClusterConf = peerConfigPair.getSecond(); + peerClusterBaseConf = peerConfigPair.getSecond(); + } else { + peerClusterBaseConf = conf; + } + Configuration peerClusterConf; + URI uri = ConnectionRegistryFactory.tryParseAsConnectionURI(peerQuorumAddress); + if (uri != null) { + peerClusterConf = new Configuration(peerClusterBaseConf); + applyURIConf(peerClusterConf, uri); } else { - peerClusterConf = - HBaseConfiguration.createClusterConf(conf, peerQuorumAddress, PEER_CONFIG_PREFIX); + peerClusterConf = HBaseConfiguration.createClusterConf(peerClusterBaseConf, peerQuorumAddress, + PEER_CONFIG_PREFIX); } // Obtain the auth token from peer cluster - TableMapReduceUtil.initCredentialsForCluster(job, peerClusterConf); + TableMapReduceUtil.initCredentialsForCluster(job, peerClusterConf, uri); job.setOutputFormatClass(NullOutputFormat.class); job.setNumReduceTasks(0); @@ -775,6 +803,9 @@ public boolean doCommandLine(final String[] args) { } private boolean isPeerQuorumAddress(String cmd) { + if (ConnectionRegistryFactory.tryParseAsConnectionURI(cmd) != null) { + return true; + } try { ZKConfig.validateClusterKey(cmd); } catch (IOException e) { diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestCopyTableToPeerCluster.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/CopyTableToPeerClusterTestBase.java similarity index 76% rename from hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestCopyTableToPeerCluster.java rename to hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/CopyTableToPeerClusterTestBase.java index f483e00c9177..d9219c9420f4 100644 --- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestCopyTableToPeerCluster.java +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/CopyTableToPeerClusterTestBase.java @@ -19,32 +19,23 @@ import static org.junit.Assert.assertFalse; -import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.commons.lang3.ArrayUtils; import org.apache.hadoop.hbase.HBaseTestingUtil; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.TableDescriptor; -import org.apache.hadoop.hbase.testclassification.LargeTests; -import org.apache.hadoop.hbase.testclassification.MapReduceTests; import org.junit.AfterClass; import org.junit.BeforeClass; -import org.junit.ClassRule; import org.junit.Test; -import org.junit.experimental.categories.Category; /** * Test CopyTable between clusters */ -@Category({ MapReduceTests.class, LargeTests.class }) -public class TestCopyTableToPeerCluster extends CopyTableTestBase { +public abstract class CopyTableToPeerClusterTestBase extends CopyTableTestBase { - @ClassRule - public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestCopyTableToPeerCluster.class); + protected static final HBaseTestingUtil UTIL1 = new HBaseTestingUtil(); - private static final HBaseTestingUtil UTIL1 = new HBaseTestingUtil(); - - private static final HBaseTestingUtil UTIL2 = new HBaseTestingUtil(); + protected static final HBaseTestingUtil UTIL2 = new HBaseTestingUtil(); @BeforeClass public static void beforeClass() throws Exception { @@ -80,7 +71,7 @@ protected void dropTargetTable(TableName tableName) throws Exception { @Override protected String[] getPeerClusterOptions() throws Exception { - return new String[] { "--peer.adr=" + UTIL2.getClusterKey() }; + return new String[] { "--peer.uri=" + UTIL2.getRpcConnnectionURI() }; } /** @@ -118,9 +109,9 @@ public void testBulkLoadNotSupported() throws Exception { TableName tableName2 = TableName.valueOf(name.getMethodName() + "2"); try (Table t1 = UTIL1.createTable(tableName1, FAMILY_A); Table t2 = UTIL2.createTable(tableName2, FAMILY_A)) { - assertFalse(runCopy(UTIL1.getConfiguration(), - new String[] { "--new.name=" + tableName2.getNameAsString(), "--bulkload", - "--peer.adr=" + UTIL2.getClusterKey(), tableName1.getNameAsString() })); + String[] args = ArrayUtils.addAll(getPeerClusterOptions(), + "--new.name=" + tableName2.getNameAsString(), "--bulkload", tableName1.getNameAsString()); + assertFalse(runCopy(UTIL1.getConfiguration(), args)); } finally { UTIL1.deleteTable(tableName1); UTIL2.deleteTable(tableName2); @@ -135,14 +126,13 @@ public void testSnapshotNotSupported() throws Exception { try (Table t1 = UTIL1.createTable(tableName1, FAMILY_A); Table t2 = UTIL2.createTable(tableName2, FAMILY_A)) { UTIL1.getAdmin().snapshot(snapshot, tableName1); - assertFalse(runCopy(UTIL1.getConfiguration(), - new String[] { "--new.name=" + tableName2.getNameAsString(), "--snapshot", - "--peer.adr=" + UTIL2.getClusterKey(), snapshot })); + String[] args = ArrayUtils.addAll(getPeerClusterOptions(), + "--new.name=" + tableName2.getNameAsString(), "--snapshot", snapshot); + assertFalse(runCopy(UTIL1.getConfiguration(), args)); } finally { UTIL1.getAdmin().deleteSnapshot(snapshot); UTIL1.deleteTable(tableName1); UTIL2.deleteTable(tableName2); } - } } diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestCopyTableToPeerClusterWithClusterKey.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestCopyTableToPeerClusterWithClusterKey.java new file mode 100644 index 000000000000..6ff9afda5357 --- /dev/null +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestCopyTableToPeerClusterWithClusterKey.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapreduce; + +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.MapReduceTests; +import org.junit.ClassRule; +import org.junit.experimental.categories.Category; + +@Category({ MapReduceTests.class, LargeTests.class }) +public class TestCopyTableToPeerClusterWithClusterKey extends CopyTableToPeerClusterTestBase { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestCopyTableToPeerClusterWithClusterKey.class); + + @Override + protected String[] getPeerClusterOptions() throws Exception { + return new String[] { "--peer.adr=" + UTIL2.getClusterKey() }; + } + +} diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestCopyTableToPeerClusterWithRpcUri.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestCopyTableToPeerClusterWithRpcUri.java new file mode 100644 index 000000000000..4e6293712ec2 --- /dev/null +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestCopyTableToPeerClusterWithRpcUri.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapreduce; + +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.MapReduceTests; +import org.junit.ClassRule; +import org.junit.experimental.categories.Category; + +@Category({ MapReduceTests.class, LargeTests.class }) +public class TestCopyTableToPeerClusterWithRpcUri extends CopyTableToPeerClusterTestBase { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestCopyTableToPeerClusterWithRpcUri.class); + + @Override + protected String[] getPeerClusterOptions() throws Exception { + return new String[] { "--peer.uri=" + UTIL2.getZkConnectionURI() }; + } + +} diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestCopyTableToPeerClusterWithZkUri.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestCopyTableToPeerClusterWithZkUri.java new file mode 100644 index 000000000000..720c367eb739 --- /dev/null +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestCopyTableToPeerClusterWithZkUri.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapreduce; + +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.MapReduceTests; +import org.junit.ClassRule; +import org.junit.experimental.categories.Category; + +@Category({ MapReduceTests.class, LargeTests.class }) +public class TestCopyTableToPeerClusterWithZkUri extends CopyTableToPeerClusterTestBase { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestCopyTableToPeerClusterWithZkUri.class); + + @Override + protected String[] getPeerClusterOptions() throws Exception { + return new String[] { "--peer.uri=" + UTIL2.getRpcConnnectionURI() }; + } + +} diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSyncTable.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSyncTable.java index d775f256ef12..2434df6adf51 100644 --- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSyncTable.java +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSyncTable.java @@ -121,11 +121,17 @@ public void testSyncTable() throws Exception { @Test public void testSyncTableToPeerCluster() throws Exception { - testSyncTable(UTIL1, UTIL2, "--sourcezkcluster=" + UTIL1.getClusterKey()); + testSyncTable(UTIL1, UTIL2, "--sourceuri=" + UTIL1.getRpcConnnectionURI()); } @Test public void testSyncTableFromSourceToPeerCluster() throws Exception { + testSyncTable(UTIL2, UTIL1, "--sourceuri=" + UTIL2.getRpcConnnectionURI(), + "--targeturi=" + UTIL1.getZkConnectionURI()); + } + + @Test + public void testSyncTableFromSourceToPeerClusterWithClusterKey() throws Exception { testSyncTable(UTIL2, UTIL1, "--sourcezkcluster=" + UTIL2.getClusterKey(), "--targetzkcluster=" + UTIL1.getClusterKey()); } @@ -185,7 +191,7 @@ public void testSyncTableIgnoreTimestampsTrue() throws Exception { writeTestData(UTIL1, sourceTableName, UTIL2, targetTableName, current - 1000, current); hashSourceTable(UTIL1, sourceTableName, testDir, "--ignoreTimestamps=true"); Counters syncCounters = syncTables(UTIL2.getConfiguration(), sourceTableName, targetTableName, - testDir, "--ignoreTimestamps=true", "--sourcezkcluster=" + UTIL1.getClusterKey()); + testDir, "--ignoreTimestamps=true", "--sourceuri=" + UTIL1.getRpcConnnectionURI()); assertEqualTables(90, UTIL1, sourceTableName, UTIL2, targetTableName, true); assertEquals(50, syncCounters.findCounter(Counter.ROWSWITHDIFFS).getValue()); diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceUtil.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceUtil.java index 03cf6a441f4d..3b68787c0620 100644 --- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceUtil.java +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceUtil.java @@ -24,6 +24,7 @@ import java.io.Closeable; import java.io.File; +import java.net.URI; import java.util.Collection; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseClassTestRule; @@ -287,4 +288,43 @@ public void testInitCredentialsForCluster4() throws Exception { kdc.stop(); } } + + @Test + @SuppressWarnings("unchecked") + public void testInitCredentialsForClusterUri() throws Exception { + HBaseTestingUtil util1 = new HBaseTestingUtil(); + HBaseTestingUtil util2 = new HBaseTestingUtil(); + + File keytab = new File(util1.getDataTestDir("keytab").toUri().getPath()); + MiniKdc kdc = util1.setupMiniKdc(keytab); + try { + String username = UserGroupInformation.getLoginUser().getShortUserName(); + String userPrincipal = username + "/localhost"; + kdc.createPrincipal(keytab, userPrincipal, HTTP_PRINCIPAL); + loginUserFromKeytab(userPrincipal + '@' + kdc.getRealm(), keytab.getAbsolutePath()); + + try (Closeable util1Closeable = startSecureMiniCluster(util1, kdc, userPrincipal); + Closeable util2Closeable = startSecureMiniCluster(util2, kdc, userPrincipal)) { + Configuration conf1 = util1.getConfiguration(); + Job job = Job.getInstance(conf1); + + // use Configuration from util1 and URI from util2, to make sure that we use the URI instead + // of rely on the Configuration + TableMapReduceUtil.initCredentialsForCluster(job, util1.getConfiguration(), + new URI(util2.getRpcConnnectionURI())); + + Credentials credentials = job.getCredentials(); + Collection> tokens = credentials.getAllTokens(); + assertEquals(1, tokens.size()); + + String clusterId = ZKClusterId.readClusterIdZNode(util2.getZooKeeperWatcher()); + Token tokenForCluster = + (Token) credentials.getToken(new Text(clusterId)); + assertEquals(userPrincipal + '@' + kdc.getRealm(), + tokenForCluster.decodeIdentifier().getUsername()); + } + } finally { + kdc.stop(); + } + } } diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyReplicationAdjunct.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyReplicationAdjunct.java index 7044b002a5eb..db7cead8c5db 100644 --- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyReplicationAdjunct.java +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyReplicationAdjunct.java @@ -62,8 +62,8 @@ import org.apache.hbase.thirdparty.com.google.common.collect.Lists; /** - * We moved some of {@link TestVerifyReplication}'s tests here because it could take too long to - * complete. In here we have miscellaneous. + * We moved some of {@link TestVerifyReplicationZkClusterKey}'s tests here because it could take too + * long to complete. In here we have miscellaneous. */ @Category({ ReplicationTests.class, LargeTests.class }) public class TestVerifyReplicationAdjunct extends TestReplicationBase { @@ -171,7 +171,7 @@ public void testHBase14905() throws Exception { assertEquals(5, res1[0].getColumnCells(famName, qualifierName).size()); String[] args = new String[] { "--versions=100", PEER_ID, tableName.getNameAsString() }; - TestVerifyReplication.runVerifyReplication(args, 0, 1); + TestVerifyReplicationZkClusterKey.runVerifyReplication(args, 0, 1); } // VerifyReplication should honor versions option @@ -237,7 +237,7 @@ public void testVersionMismatchHBase14905() throws Exception { assertEquals(3, res1[0].getColumnCells(famName, qualifierName).size()); String[] args = new String[] { "--versions=100", PEER_ID, tableName.getNameAsString() }; - TestVerifyReplication.runVerifyReplication(args, 0, 1); + TestVerifyReplicationZkClusterKey.runVerifyReplication(args, 0, 1); } finally { hbaseAdmin.enableReplicationPeer(PEER_ID); } @@ -254,7 +254,7 @@ public void testVerifyReplicationPrefixFiltering() throws Exception { waitForReplication(NB_ROWS_IN_BATCH * 4, NB_RETRIES * 4); String[] args = new String[] { "--row-prefixes=prefixrow,secondrow", PEER_ID, tableName.getNameAsString() }; - TestVerifyReplication.runVerifyReplication(args, NB_ROWS_IN_BATCH * 2, 0); + TestVerifyReplicationZkClusterKey.runVerifyReplication(args, NB_ROWS_IN_BATCH * 2, 0); } @Test @@ -317,9 +317,9 @@ public void testVerifyReplicationWithSnapshotSupport() throws Exception { "--peerSnapshotTmpDir=" + temPath2, "--peerFSAddress=" + peerFSAddress, "--peerHBaseRootAddress=" + CommonFSUtils.getRootDir(CONF2), "2", tableName.getNameAsString() }; - TestVerifyReplication.runVerifyReplication(args, NB_ROWS_IN_BATCH, 0); - TestVerifyReplication.checkRestoreTmpDir(CONF1, temPath1, 1); - TestVerifyReplication.checkRestoreTmpDir(CONF2, temPath2, 1); + TestVerifyReplicationZkClusterKey.runVerifyReplication(args, NB_ROWS_IN_BATCH, 0); + TestVerifyReplicationZkClusterKey.checkRestoreTmpDir(CONF1, temPath1, 1); + TestVerifyReplicationZkClusterKey.checkRestoreTmpDir(CONF2, temPath2, 1); Scan scan = new Scan(); ResultScanner rs = htable2.getScanner(scan); @@ -347,9 +347,9 @@ public void testVerifyReplicationWithSnapshotSupport() throws Exception { "--peerSnapshotTmpDir=" + temPath2, "--peerFSAddress=" + peerFSAddress, "--peerHBaseRootAddress=" + CommonFSUtils.getRootDir(CONF2), "2", tableName.getNameAsString() }; - TestVerifyReplication.runVerifyReplication(args, 0, NB_ROWS_IN_BATCH); - TestVerifyReplication.checkRestoreTmpDir(CONF1, temPath1, 2); - TestVerifyReplication.checkRestoreTmpDir(CONF2, temPath2, 2); + TestVerifyReplicationZkClusterKey.runVerifyReplication(args, 0, NB_ROWS_IN_BATCH); + TestVerifyReplicationZkClusterKey.checkRestoreTmpDir(CONF1, temPath1, 2); + TestVerifyReplicationZkClusterKey.checkRestoreTmpDir(CONF2, temPath2, 2); } @AfterClass diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyReplicationRpcConnectionUri.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyReplicationRpcConnectionUri.java new file mode 100644 index 000000000000..3e603ec41ac8 --- /dev/null +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyReplicationRpcConnectionUri.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.ReplicationTests; +import org.junit.ClassRule; +import org.junit.experimental.categories.Category; + +@Category({ ReplicationTests.class, LargeTests.class }) +public class TestVerifyReplicationRpcConnectionUri extends VerifyReplicationTestBase { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestVerifyReplicationRpcConnectionUri.class); + + @Override + protected String getClusterKey(HBaseTestingUtil util) throws Exception { + return util.getRpcConnnectionURI(); + } +} diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyReplicationZkClusterKey.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyReplicationZkClusterKey.java new file mode 100644 index 000000000000..718cba231ff4 --- /dev/null +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyReplicationZkClusterKey.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.ReplicationTests; +import org.junit.ClassRule; +import org.junit.experimental.categories.Category; + +@Category({ ReplicationTests.class, LargeTests.class }) +public class TestVerifyReplicationZkClusterKey extends VerifyReplicationTestBase { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestVerifyReplicationZkClusterKey.class); + + @Override + protected String getClusterKey(HBaseTestingUtil util) throws Exception { + return util.getClusterKey(); + } +} diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyReplicationZkConnectionUri.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyReplicationZkConnectionUri.java new file mode 100644 index 000000000000..046d2d06664c --- /dev/null +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyReplicationZkConnectionUri.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.ReplicationTests; +import org.junit.ClassRule; +import org.junit.experimental.categories.Category; + +@Category({ ReplicationTests.class, LargeTests.class }) +public class TestVerifyReplicationZkConnectionUri extends VerifyReplicationTestBase { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestVerifyReplicationZkConnectionUri.class); + + @Override + protected String getClusterKey(HBaseTestingUtil util) throws Exception { + return util.getZkConnectionURI(); + } +} diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyReplication.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/VerifyReplicationTestBase.java similarity index 94% rename from hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyReplication.java rename to hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/VerifyReplicationTestBase.java index c7b0ed4c4b05..e263076677a5 100644 --- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyReplication.java +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/VerifyReplicationTestBase.java @@ -32,7 +32,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtil; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; @@ -52,8 +51,6 @@ import org.apache.hadoop.hbase.client.TableDescriptorBuilder; import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication; import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils; -import org.apache.hadoop.hbase.testclassification.LargeTests; -import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @@ -62,22 +59,16 @@ import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; -import org.junit.ClassRule; import org.junit.Rule; import org.junit.Test; -import org.junit.experimental.categories.Category; import org.junit.rules.TestName; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -@Category({ ReplicationTests.class, LargeTests.class }) -public class TestVerifyReplication extends TestReplicationBase { +public abstract class VerifyReplicationTestBase extends TestReplicationBase { - @ClassRule - public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestVerifyReplication.class); - - private static final Logger LOG = LoggerFactory.getLogger(TestVerifyReplication.class); + private static final Logger LOG = + LoggerFactory.getLogger(TestVerifyReplicationZkClusterKey.class); private static final String PEER_ID = "2"; private static final TableName peerTableName = TableName.valueOf("peerTest"); @@ -86,14 +77,6 @@ public class TestVerifyReplication extends TestReplicationBase { @Rule public TestName name = new TestName(); - @Override - protected String getClusterKey(HBaseTestingUtil util) throws Exception { - // TODO: VerifyReplication does not support connection uri yet, so here we need to use cluster - // key, as in this test we will pass the cluster key config in peer config directly to - // VerifyReplication job. - return util.getClusterKey(); - } - @Before public void setUp() throws Exception { cleanUp(); @@ -268,7 +251,7 @@ public void testVerifyRepJobWithQuorumAddress() throws Exception { runSmallBatchTest(); // with a quorum address (a cluster key) - String[] args = new String[] { UTIL2.getClusterKey(), tableName.getNameAsString() }; + String[] args = new String[] { getClusterKey(UTIL2), tableName.getNameAsString() }; runVerifyReplication(args, NB_ROWS_IN_BATCH, 0); Scan scan = new Scan(); @@ -313,7 +296,7 @@ public void testVerifyRepJobWithQuorumAddressAndSnapshotSupport() throws Excepti String[] args = new String[] { "--sourceSnapshotName=" + sourceSnapshotName, "--sourceSnapshotTmpDir=" + tmpPath1, "--peerSnapshotName=" + peerSnapshotName, "--peerSnapshotTmpDir=" + tmpPath2, "--peerFSAddress=" + peerFSAddress, - "--peerHBaseRootAddress=" + CommonFSUtils.getRootDir(CONF2), UTIL2.getClusterKey(), + "--peerHBaseRootAddress=" + CommonFSUtils.getRootDir(CONF2), getClusterKey(UTIL2), tableName.getNameAsString() }; runVerifyReplication(args, NB_ROWS_IN_BATCH, 0); checkRestoreTmpDir(CONF1, tmpPath1, 1); @@ -343,7 +326,7 @@ public void testVerifyRepJobWithQuorumAddressAndSnapshotSupport() throws Excepti args = new String[] { "--sourceSnapshotName=" + sourceSnapshotName, "--sourceSnapshotTmpDir=" + tmpPath1, "--peerSnapshotName=" + peerSnapshotName, "--peerSnapshotTmpDir=" + tmpPath2, "--peerFSAddress=" + peerFSAddress, - "--peerHBaseRootAddress=" + CommonFSUtils.getRootDir(CONF2), UTIL2.getClusterKey(), + "--peerHBaseRootAddress=" + CommonFSUtils.getRootDir(CONF2), getClusterKey(UTIL2), tableName.getNameAsString() }; runVerifyReplication(args, 0, NB_ROWS_IN_BATCH); checkRestoreTmpDir(CONF1, tmpPath1, 2); @@ -385,7 +368,7 @@ public void testVerifyRepJobWithPeerTableName() throws Exception { // with a peerTableName along with quorum address (a cluster key) String[] args = new String[] { "--peerTableName=" + peerTableName.getNameAsString(), - UTIL2.getClusterKey(), tableName.getNameAsString() }; + getClusterKey(UTIL2), tableName.getNameAsString() }; runVerifyReplication(args, NB_ROWS_IN_BATCH, 0); UTIL2.deleteTableData(peerTableName); @@ -419,7 +402,7 @@ public void testVerifyRepJobWithPeerTableNameAndSnapshotSupport() throws Excepti "--sourceSnapshotName=" + sourceSnapshotName, "--sourceSnapshotTmpDir=" + tmpPath1, "--peerSnapshotName=" + peerSnapshotName, "--peerSnapshotTmpDir=" + tmpPath2, "--peerFSAddress=" + peerFSAddress, - "--peerHBaseRootAddress=" + CommonFSUtils.getRootDir(CONF2), UTIL2.getClusterKey(), + "--peerHBaseRootAddress=" + CommonFSUtils.getRootDir(CONF2), getClusterKey(UTIL2), tableName.getNameAsString() }; runVerifyReplication(args, NB_ROWS_IN_BATCH, 0); checkRestoreTmpDir(CONF1, tmpPath1, 1); @@ -450,7 +433,7 @@ public void testVerifyRepJobWithPeerTableNameAndSnapshotSupport() throws Excepti "--sourceSnapshotName=" + sourceSnapshotName, "--sourceSnapshotTmpDir=" + tmpPath1, "--peerSnapshotName=" + peerSnapshotName, "--peerSnapshotTmpDir=" + tmpPath2, "--peerFSAddress=" + peerFSAddress, - "--peerHBaseRootAddress=" + CommonFSUtils.getRootDir(CONF2), UTIL2.getClusterKey(), + "--peerHBaseRootAddress=" + CommonFSUtils.getRootDir(CONF2), getClusterKey(UTIL2), tableName.getNameAsString() }; runVerifyReplication(args, 0, NB_ROWS_IN_BATCH); checkRestoreTmpDir(CONF1, tmpPath1, 2); @@ -479,7 +462,7 @@ public void testVerifyReplicationThreadedRecompares() throws Exception { String[] args = new String[] { "--recompareThreads=10", "--recompareTries=3", "--recompareSleep=1", "--peerTableName=" + peerTableName.getNameAsString(), - UTIL2.getClusterKey(), tableName.getNameAsString() }; + getClusterKey(UTIL2), tableName.getNameAsString() }; Counters counters = runVerifyReplication(args, NB_ROWS_IN_BATCH - 1, 3); assertEquals( counters.findCounter(VerifyReplication.Verifier.Counters.FAILED_RECOMPARE).getValue(), 9); @@ -523,7 +506,7 @@ public void testFailsRemainingComparesAfterShutdown() throws Exception { */ String[] args = new String[] { "--recompareThreads=1", "--recompareTries=1", "--recompareSleep=121000", "--peerTableName=" + peerTableName.getNameAsString(), - UTIL2.getClusterKey(), tableName.getNameAsString() }; + getClusterKey(UTIL2), tableName.getNameAsString() }; Counters counters = runVerifyReplication(args, NB_ROWS_IN_BATCH - 1, 3); assertEquals( @@ -561,7 +544,7 @@ public void testVerifyReplicationSynchronousRecompares() throws Exception { htable1.put(put); String[] args = new String[] { "--recompareTries=3", "--recompareSleep=1", - "--peerTableName=" + peerTableName.getNameAsString(), UTIL2.getClusterKey(), + "--peerTableName=" + peerTableName.getNameAsString(), getClusterKey(UTIL2), tableName.getNameAsString() }; Counters counters = runVerifyReplication(args, NB_ROWS_IN_BATCH - 1, 3); assertEquals(