From 01ae213156c2c5b0e8977451068896ae8512adfa Mon Sep 17 00:00:00 2001 From: Bryan Beaudreault Date: Mon, 7 Aug 2023 08:12:45 -0400 Subject: [PATCH] Revert "VerifyReplication recompare async (#5051)" This reverts commit 2b3d32288b730784c4364fae1133dcdfa1042f48. --- .../replication/VerifyReplication.java | 186 ++++-------------- .../VerifyReplicationRecompareRunnable.java | 162 --------------- .../replication/TestVerifyReplication.java | 125 +----------- ...estVerifyReplicationRecompareRunnable.java | 154 --------------- 4 files changed, 36 insertions(+), 591 deletions(-) delete mode 100644 hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplicationRecompareRunnable.java delete mode 100644 hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyReplicationRecompareRunnable.java diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java index 7ed4cb87194e..c004b23dae21 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java @@ -19,12 +19,7 @@ import java.io.IOException; import java.util.Arrays; -import java.util.List; import java.util.UUID; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy; -import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; @@ -35,6 +30,7 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; @@ -50,7 +46,6 @@ import org.apache.hadoop.hbase.mapreduce.TableMapper; import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat; import org.apache.hadoop.hbase.mapreduce.TableSplit; -import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication.Verifier.Counters; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeerStorage; @@ -60,12 +55,12 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.zookeeper.ZKConfig; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.MRJobConfig; -import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; @@ -89,11 +84,6 @@ public class VerifyReplication extends Configured implements Tool { public final static String NAME = "verifyrep"; private final static String PEER_CONFIG_PREFIX = NAME + ".peer."; - private static ThreadPoolExecutor reCompareExecutor = null; - int reCompareTries = 0; - int reCompareBackoffExponent = 0; - int reCompareThreads = 0; - int sleepMsBeforeReCompare = 0; long startTime = 0; long endTime = Long.MAX_VALUE; int batch = -1; @@ -104,6 +94,7 @@ public class VerifyReplication extends Configured implements Tool { String peerId = null; String peerQuorumAddress = null; String rowPrefixes = null; + int sleepMsBeforeReCompare = 0; boolean verbose = false; boolean includeDeletedCells = false; // Source table snapshot name @@ -133,12 +124,7 @@ public enum Counters { BADROWS, ONLY_IN_SOURCE_TABLE_ROWS, ONLY_IN_PEER_TABLE_ROWS, - CONTENT_DIFFERENT_ROWS, - RECOMPARES, - MAIN_THREAD_RECOMPARES, - SOURCE_ROW_CHANGED, - PEER_ROW_CHANGED, - FAILED_RECOMPARE + CONTENT_DIFFERENT_ROWS } private Connection sourceConnection; @@ -147,9 +133,6 @@ public enum Counters { private Table replicatedTable; private ResultScanner replicatedScanner; private Result currentCompareRowInPeerTable; - private Scan tableScan; - private int reCompareTries; - private int reCompareBackoffExponent; private int sleepMsBeforeReCompare; private String delimiter = ""; private boolean verbose = false; @@ -167,12 +150,7 @@ public void map(ImmutableBytesWritable row, final Result value, Context context) throws IOException { if (replicatedScanner == null) { Configuration conf = context.getConfiguration(); - reCompareTries = conf.getInt(NAME + ".recompareTries", 0); - reCompareBackoffExponent = conf.getInt(NAME + ".recompareBackoffExponent", 1); sleepMsBeforeReCompare = conf.getInt(NAME + ".sleepMsBeforeReCompare", 0); - if (sleepMsBeforeReCompare > 0) { - reCompareTries = Math.max(reCompareTries, 1); - } delimiter = conf.get(NAME + ".delimiter", ""); verbose = conf.getBoolean(NAME + ".verbose", false); batch = conf.getInt(NAME + ".batch", -1); @@ -201,12 +179,9 @@ public void map(ImmutableBytesWritable row, final Result value, Context context) if (versions >= 0) { scan.setMaxVersions(versions); } - int reCompareThreads = conf.getInt(NAME + ".recompareThreads", 0); - reCompareExecutor = buildReCompareExecutor(reCompareThreads, context); TableName tableName = TableName.valueOf(conf.get(NAME + ".tableName")); sourceConnection = ConnectionFactory.createConnection(conf); sourceTable = sourceConnection.getTable(tableName); - tableScan = scan; final InputSplit tableSplit = context.getInputSplit(); @@ -251,7 +226,7 @@ public void map(ImmutableBytesWritable row, final Result value, Context context) while (true) { if (currentCompareRowInPeerTable == null) { // reach the region end of peer table, row only in source table - logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value, null); + logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value); break; } int rowCmpRet = Bytes.compareTo(value.getRow(), currentCompareRowInPeerTable.getRow()); @@ -265,77 +240,55 @@ public void map(ImmutableBytesWritable row, final Result value, Context context) "Good row key: " + delimiter + Bytes.toStringBinary(value.getRow()) + delimiter); } } catch (Exception e) { - logFailRowAndIncreaseCounter(context, Counters.CONTENT_DIFFERENT_ROWS, value, - currentCompareRowInPeerTable); + logFailRowAndIncreaseCounter(context, Counters.CONTENT_DIFFERENT_ROWS, value); } currentCompareRowInPeerTable = replicatedScanner.next(); break; } else if (rowCmpRet < 0) { // row only exists in source table - logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value, null); + logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value); break; } else { // row only exists in peer table - logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS, null, + logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS, currentCompareRowInPeerTable); currentCompareRowInPeerTable = replicatedScanner.next(); } } } - @SuppressWarnings("FutureReturnValueIgnored") - private void logFailRowAndIncreaseCounter(Context context, Counters counter, Result row, - Result replicatedRow) { - byte[] rowKey = getRow(row, replicatedRow); - if (reCompareTries == 0) { - context.getCounter(counter).increment(1); - context.getCounter(Counters.BADROWS).increment(1); - LOG.error("{}, rowkey={}{}{}", counter, delimiter, Bytes.toStringBinary(rowKey), delimiter); - return; - } - - VerifyReplicationRecompareRunnable runnable = new VerifyReplicationRecompareRunnable(context, - row, replicatedRow, counter, delimiter, tableScan, sourceTable, replicatedTable, - reCompareTries, sleepMsBeforeReCompare, reCompareBackoffExponent, verbose); - - if (reCompareExecutor == null) { - runnable.run(); - return; - } - - reCompareExecutor.submit(runnable); - } - - @Override - protected void cleanup(Context context) { - if (reCompareExecutor != null && !reCompareExecutor.isShutdown()) { - reCompareExecutor.shutdown(); + private void logFailRowAndIncreaseCounter(Context context, Counters counter, Result row) { + if (sleepMsBeforeReCompare > 0) { + Threads.sleep(sleepMsBeforeReCompare); try { - boolean terminated = reCompareExecutor.awaitTermination(1, TimeUnit.MINUTES); - if (!terminated) { - List queue = reCompareExecutor.shutdownNow(); - for (Runnable runnable : queue) { - ((VerifyReplicationRecompareRunnable) runnable).fail(); - } - - terminated = reCompareExecutor.awaitTermination(1, TimeUnit.MINUTES); - - if (!terminated) { - int activeCount = Math.max(1, reCompareExecutor.getActiveCount()); - LOG.warn("Found {} possible recompares still running in the executable" - + " incrementing BADROWS and FAILED_RECOMPARE", activeCount); - context.getCounter(Counters.BADROWS).increment(activeCount); - context.getCounter(Counters.FAILED_RECOMPARE).increment(activeCount); + Result sourceResult = sourceTable.get(new Get(row.getRow())); + Result replicatedResult = replicatedTable.get(new Get(row.getRow())); + Result.compareResults(sourceResult, replicatedResult, false); + if (!sourceResult.isEmpty()) { + context.getCounter(Counters.GOODROWS).increment(1); + if (verbose) { + LOG.info("Good row key (with recompare): " + delimiter + + Bytes.toStringBinary(row.getRow()) + delimiter); } } - } catch (InterruptedException e) { - throw new RuntimeException("Failed to await executor termination in cleanup", e); + return; + } catch (Exception e) { + LOG.error("recompare fail after sleep, rowkey=" + delimiter + + Bytes.toStringBinary(row.getRow()) + delimiter); } } + context.getCounter(counter).increment(1); + context.getCounter(Counters.BADROWS).increment(1); + LOG.error(counter.toString() + ", rowkey=" + delimiter + Bytes.toStringBinary(row.getRow()) + + delimiter); + } + + @Override + protected void cleanup(Context context) { if (replicatedScanner != null) { try { while (currentCompareRowInPeerTable != null) { - logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS, null, + logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS, currentCompareRowInPeerTable); currentCompareRowInPeerTable = replicatedScanner.next(); } @@ -471,10 +424,6 @@ public Job createSubmittableJob(Configuration conf, String[] args) throws IOExce conf.setInt(NAME + ".versions", versions); LOG.info("Number of version: " + versions); - conf.setInt(NAME + ".recompareTries", reCompareTries); - conf.setInt(NAME + ".recompareBackoffExponent", reCompareBackoffExponent); - conf.setInt(NAME + ".recompareThreads", reCompareThreads); - // Set Snapshot specific parameters if (peerSnapshotName != null) { conf.set(NAME + ".peerSnapshotName", peerSnapshotName); @@ -542,15 +491,6 @@ public Job createSubmittableJob(Configuration conf, String[] args) throws IOExce return job; } - protected static byte[] getRow(Result sourceResult, Result replicatedResult) { - if (sourceResult != null) { - return sourceResult.getRow(); - } else if (replicatedResult != null) { - return replicatedResult.getRow(); - } - throw new RuntimeException("Both sourceResult and replicatedResult are null!"); - } - private static void setRowPrefixFilter(Scan scan, String rowPrefixes) { if (rowPrefixes != null && !rowPrefixes.isEmpty()) { String[] rowPrefixArray = rowPrefixes.split(","); @@ -635,20 +575,11 @@ public boolean doCommandLine(final String[] args) { continue; } - final String deprecatedSleepToReCompareKey = "--recomparesleep="; - final String sleepToReCompareKey = "--recompareSleep="; - if (cmd.startsWith(deprecatedSleepToReCompareKey)) { - LOG.warn("--recomparesleep is deprecated and will be removed in 4.0.0." - + " Use --recompareSleep instead."); - sleepMsBeforeReCompare = - Integer.parseInt(cmd.substring(deprecatedSleepToReCompareKey.length())); - continue; - } + final String sleepToReCompareKey = "--recomparesleep="; if (cmd.startsWith(sleepToReCompareKey)) { sleepMsBeforeReCompare = Integer.parseInt(cmd.substring(sleepToReCompareKey.length())); continue; } - final String verboseKey = "--verbose"; if (cmd.startsWith(verboseKey)) { verbose = true; @@ -697,25 +628,6 @@ public boolean doCommandLine(final String[] args) { continue; } - final String reCompareThreadArgs = "--recompareThreads="; - if (cmd.startsWith(reCompareThreadArgs)) { - reCompareThreads = Integer.parseInt(cmd.substring(reCompareThreadArgs.length())); - continue; - } - - final String reCompareTriesKey = "--recompareTries="; - if (cmd.startsWith(reCompareTriesKey)) { - reCompareTries = Integer.parseInt(cmd.substring(reCompareTriesKey.length())); - continue; - } - - final String reCompareBackoffExponentKey = "--recompareBackoffExponent="; - if (cmd.startsWith(reCompareBackoffExponentKey)) { - reCompareBackoffExponent = - Integer.parseInt(cmd.substring(reCompareBackoffExponentKey.length())); - continue; - } - if (cmd.startsWith("--")) { printUsage("Invalid argument '" + cmd + "'"); return false; @@ -792,8 +704,7 @@ private static void printUsage(final String errorMsg) { System.err.println("ERROR: " + errorMsg); } System.err.println("Usage: verifyrep [--starttime=X]" - + " [--endtime=Y] [--families=A] [--row-prefixes=B] [--delimiter=] [--recompareSleep=] " - + "[--recompareThreads=] [--recompareTries=] [--recompareBackoffExponent=]" + + " [--endtime=Y] [--families=A] [--row-prefixes=B] [--delimiter=] [--recomparesleep=] " + "[--batch=] [--verbose] [--peerTableName=] [--sourceSnapshotName=P] " + "[--sourceSnapshotTmpDir=Q] [--peerSnapshotName=R] [--peerSnapshotTmpDir=S] " + "[--peerFSAddress=T] [--peerHBaseRootAddress=U] "); @@ -809,14 +720,8 @@ private static void printUsage(final String errorMsg) { System.err.println(" families comma-separated list of families to copy"); System.err.println(" row-prefixes comma-separated list of row key prefixes to filter on "); System.err.println(" delimiter the delimiter used in display around rowkey"); - System.err.println(" recompareSleep milliseconds to sleep before recompare row, " + System.err.println(" recomparesleep milliseconds to sleep before recompare row, " + "default value is 0 which disables the recompare."); - System.err.println(" recompareThreads number of threads to run recompares in"); - System.err.println(" recompareTries number of recompare attempts before incrementing " - + "the BADROWS counter. Defaults to 1 recompare"); - System.out.println(" recompareBackoffExponent exponential multiplier to increase " - + "recompareSleep after each recompare attempt, " - + "default value is 0 which results in a constant sleep time"); System.err.println(" verbose logs row keys of good rows"); System.err.println(" peerTableName Peer Table Name"); System.err.println(" sourceSnapshotName Source Snapshot Name"); @@ -883,27 +788,6 @@ private static void printUsage(final String errorMsg) { + "2181:/cluster-b \\\n" + " TestTable"); } - private static ThreadPoolExecutor buildReCompareExecutor(int maxThreads, Mapper.Context context) { - if (maxThreads == 0) { - return null; - } - - return new ThreadPoolExecutor(0, maxThreads, 1L, TimeUnit.SECONDS, new SynchronousQueue<>(), - buildRejectedReComparePolicy(context)); - } - - private static CallerRunsPolicy buildRejectedReComparePolicy(Mapper.Context context) { - return new CallerRunsPolicy() { - @Override - public void rejectedExecution(Runnable runnable, ThreadPoolExecutor e) { - LOG.debug("Re-comparison execution rejected. Running in main thread."); - context.getCounter(Counters.MAIN_THREAD_RECOMPARES).increment(1); - // will run in the current thread - super.rejectedExecution(runnable, e); - } - }; - } - @Override public int run(String[] args) throws Exception { Configuration conf = this.getConf(); diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplicationRecompareRunnable.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplicationRecompareRunnable.java deleted file mode 100644 index 47f5e606b846..000000000000 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplicationRecompareRunnable.java +++ /dev/null @@ -1,162 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.mapreduce.replication; - -import java.io.IOException; -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.mapreduce.Mapper; -import org.apache.yetus.audience.InterfaceAudience; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -@InterfaceAudience.Private -public class VerifyReplicationRecompareRunnable implements Runnable { - - private static final Logger LOG = - LoggerFactory.getLogger(VerifyReplicationRecompareRunnable.class); - - private final Mapper.Context context; - private final VerifyReplication.Verifier.Counters originalCounter; - private final String delimiter; - private final byte[] row; - private final Scan tableScan; - private final Table sourceTable; - private final Table replicatedTable; - - private final int reCompareTries; - private final int sleepMsBeforeReCompare; - private final int reCompareBackoffExponent; - private final boolean verbose; - - private Result sourceResult; - private Result replicatedResult; - - public VerifyReplicationRecompareRunnable(Mapper.Context context, Result sourceResult, - Result replicatedResult, VerifyReplication.Verifier.Counters originalCounter, String delimiter, - Scan tableScan, Table sourceTable, Table replicatedTable, int reCompareTries, - int sleepMsBeforeReCompare, int reCompareBackoffExponent, boolean verbose) { - this.context = context; - this.sourceResult = sourceResult; - this.replicatedResult = replicatedResult; - this.originalCounter = originalCounter; - this.delimiter = delimiter; - this.tableScan = tableScan; - this.sourceTable = sourceTable; - this.replicatedTable = replicatedTable; - this.reCompareTries = reCompareTries; - this.sleepMsBeforeReCompare = sleepMsBeforeReCompare; - this.reCompareBackoffExponent = reCompareBackoffExponent; - this.verbose = verbose; - this.row = VerifyReplication.getRow(sourceResult, replicatedResult); - } - - @Override - public void run() { - Get get = new Get(row); - get.setCacheBlocks(tableScan.getCacheBlocks()); - get.setFilter(tableScan.getFilter()); - - int sleepMs = sleepMsBeforeReCompare; - int tries = 0; - - while (++tries <= reCompareTries) { - context.getCounter(VerifyReplication.Verifier.Counters.RECOMPARES).increment(1); - - try { - Thread.sleep(sleepMs); - } catch (InterruptedException e) { - LOG.warn("Sleeping interrupted, incrementing bad rows and aborting"); - incrementOriginalAndBadCounter(); - context.getCounter(VerifyReplication.Verifier.Counters.FAILED_RECOMPARE).increment(1); - Thread.currentThread().interrupt(); - return; - } - - try { - if (fetchLatestRows(get) && matches(sourceResult, replicatedResult, null)) { - if (verbose) { - LOG.info("Good row key (with recompare): {}{}{}", delimiter, Bytes.toStringBinary(row), - delimiter); - } - context.getCounter(VerifyReplication.Verifier.Counters.GOODROWS).increment(1); - return; - } else { - context.getCounter(VerifyReplication.Verifier.Counters.FAILED_RECOMPARE).increment(1); - } - } catch (IOException e) { - context.getCounter(VerifyReplication.Verifier.Counters.FAILED_RECOMPARE).increment(1); - if (verbose) { - LOG.info("Got an exception during recompare for rowkey={}", Bytes.toStringBinary(row), e); - } - } - - sleepMs = sleepMs * (2 ^ reCompareBackoffExponent); - } - - LOG.error("{}, rowkey={}{}{}", originalCounter, delimiter, Bytes.toStringBinary(row), - delimiter); - incrementOriginalAndBadCounter(); - } - - public void fail() { - if (LOG.isDebugEnabled()) { - LOG.debug("Called fail on row={}", Bytes.toStringBinary(row)); - } - incrementOriginalAndBadCounter(); - context.getCounter(VerifyReplication.Verifier.Counters.FAILED_RECOMPARE).increment(1); - } - - private boolean fetchLatestRows(Get get) throws IOException { - Result sourceResult = sourceTable.get(get); - Result replicatedResult = replicatedTable.get(get); - - boolean sourceMatches = matches(sourceResult, this.sourceResult, - VerifyReplication.Verifier.Counters.SOURCE_ROW_CHANGED); - boolean replicatedMatches = matches(replicatedResult, this.replicatedResult, - VerifyReplication.Verifier.Counters.PEER_ROW_CHANGED); - - this.sourceResult = sourceResult; - this.replicatedResult = replicatedResult; - return sourceMatches && replicatedMatches; - } - - private boolean matches(Result original, Result updated, - VerifyReplication.Verifier.Counters failCounter) { - try { - Result.compareResults(original, updated); - return true; - } catch (Exception e) { - if (failCounter != null) { - context.getCounter(failCounter).increment(1); - if (LOG.isDebugEnabled()) { - LOG.debug("{} for rowkey={}", failCounter, Bytes.toStringBinary(row)); - } - } - return false; - } - } - - private void incrementOriginalAndBadCounter() { - context.getCounter(originalCounter).increment(1); - context.getCounter(VerifyReplication.Verifier.Counters.BADROWS).increment(1); - } -} diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyReplication.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyReplication.java index bb5396b73309..4ccba1578d9a 100644 --- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyReplication.java +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyReplication.java @@ -57,7 +57,6 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.Job; import org.junit.AfterClass; import org.junit.Before; @@ -109,7 +108,7 @@ public static void setUpBeforeClass() throws Exception { htable3 = connection2.getTable(peerTableName); } - static Counters runVerifyReplication(String[] args, int expectedGoodRows, int expectedBadRows) + static void runVerifyReplication(String[] args, int expectedGoodRows, int expectedBadRows) throws IOException, InterruptedException, ClassNotFoundException { Job job = new VerifyReplication().createSubmittableJob(new Configuration(CONF1), args); if (job == null) { @@ -122,7 +121,6 @@ static Counters runVerifyReplication(String[] args, int expectedGoodRows, int ex job.getCounters().findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue()); assertEquals(expectedBadRows, job.getCounters().findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue()); - return job.getCounters(); } /** @@ -449,127 +447,6 @@ public void testVerifyRepJobWithPeerTableNameAndSnapshotSupport() throws Excepti checkRestoreTmpDir(CONF2, tmpPath2, 2); } - @Test - public void testVerifyReplicationThreadedRecompares() throws Exception { - // Populate the tables with same data - runBatchCopyTest(); - - // ONLY_IN_PEER_TABLE_ROWS - Put put = new Put(Bytes.toBytes(Integer.toString(NB_ROWS_IN_BATCH))); - put.addColumn(noRepfamName, row, row); - htable3.put(put); - - // CONTENT_DIFFERENT_ROWS - put = new Put(Bytes.toBytes(Integer.toString(NB_ROWS_IN_BATCH - 1))); - put.addColumn(noRepfamName, row, Bytes.toBytes("diff value")); - htable3.put(put); - - // ONLY_IN_SOURCE_TABLE_ROWS - put = new Put(Bytes.toBytes(Integer.toString(NB_ROWS_IN_BATCH + 1))); - put.addColumn(noRepfamName, row, row); - htable1.put(put); - - String[] args = new String[] { "--recompareThreads=10", "--recompareTries=3", - "--recompareSleep=1", "--peerTableName=" + peerTableName.getNameAsString(), - UTIL2.getClusterKey(), tableName.getNameAsString() }; - Counters counters = runVerifyReplication(args, NB_ROWS_IN_BATCH - 1, 3); - assertEquals( - counters.findCounter(VerifyReplication.Verifier.Counters.FAILED_RECOMPARE).getValue(), 9); - assertEquals(counters.findCounter(VerifyReplication.Verifier.Counters.RECOMPARES).getValue(), - 9); - assertEquals( - counters.findCounter(VerifyReplication.Verifier.Counters.ONLY_IN_PEER_TABLE_ROWS).getValue(), - 1); - assertEquals( - counters.findCounter(VerifyReplication.Verifier.Counters.CONTENT_DIFFERENT_ROWS).getValue(), - 1); - assertEquals(counters.findCounter(VerifyReplication.Verifier.Counters.ONLY_IN_SOURCE_TABLE_ROWS) - .getValue(), 1); - } - - @Test - public void testFailsRemainingComparesAfterShutdown() throws Exception { - // Populate the tables with same data - runBatchCopyTest(); - - // ONLY_IN_PEER_TABLE_ROWS - Put put = new Put(Bytes.toBytes(Integer.toString(NB_ROWS_IN_BATCH))); - put.addColumn(noRepfamName, row, row); - htable3.put(put); - - // CONTENT_DIFFERENT_ROWS - put = new Put(Bytes.toBytes(Integer.toString(NB_ROWS_IN_BATCH - 1))); - put.addColumn(noRepfamName, row, Bytes.toBytes("diff value")); - htable3.put(put); - - // ONLY_IN_SOURCE_TABLE_ROWS - put = new Put(Bytes.toBytes(Integer.toString(NB_ROWS_IN_BATCH + 1))); - put.addColumn(noRepfamName, row, row); - htable1.put(put); - - /** - * recompareSleep is set to exceed how long we wait on - * {@link VerifyReplication#reCompareExecutor} termination when doing cleanup. this allows us to - * test the counter-incrementing logic if the executor still hasn't terminated after the call to - * shutdown and awaitTermination - */ - String[] args = new String[] { "--recompareThreads=1", "--recompareTries=1", - "--recompareSleep=121000", "--peerTableName=" + peerTableName.getNameAsString(), - UTIL2.getClusterKey(), tableName.getNameAsString() }; - - Counters counters = runVerifyReplication(args, NB_ROWS_IN_BATCH - 1, 3); - assertEquals( - counters.findCounter(VerifyReplication.Verifier.Counters.FAILED_RECOMPARE).getValue(), 3); - assertEquals(counters.findCounter(VerifyReplication.Verifier.Counters.RECOMPARES).getValue(), - 3); - assertEquals( - counters.findCounter(VerifyReplication.Verifier.Counters.ONLY_IN_PEER_TABLE_ROWS).getValue(), - 1); - assertEquals( - counters.findCounter(VerifyReplication.Verifier.Counters.CONTENT_DIFFERENT_ROWS).getValue(), - 1); - assertEquals(counters.findCounter(VerifyReplication.Verifier.Counters.ONLY_IN_SOURCE_TABLE_ROWS) - .getValue(), 1); - } - - @Test - public void testVerifyReplicationSynchronousRecompares() throws Exception { - // Populate the tables with same data - runBatchCopyTest(); - - // ONLY_IN_PEER_TABLE_ROWS - Put put = new Put(Bytes.toBytes(Integer.toString(NB_ROWS_IN_BATCH))); - put.addColumn(noRepfamName, row, row); - htable3.put(put); - - // CONTENT_DIFFERENT_ROWS - put = new Put(Bytes.toBytes(Integer.toString(NB_ROWS_IN_BATCH - 1))); - put.addColumn(noRepfamName, row, Bytes.toBytes("diff value")); - htable3.put(put); - - // ONLY_IN_SOURCE_TABLE_ROWS - put = new Put(Bytes.toBytes(Integer.toString(NB_ROWS_IN_BATCH + 1))); - put.addColumn(noRepfamName, row, row); - htable1.put(put); - - String[] args = new String[] { "--recompareTries=3", "--recompareSleep=1", - "--peerTableName=" + peerTableName.getNameAsString(), UTIL2.getClusterKey(), - tableName.getNameAsString() }; - Counters counters = runVerifyReplication(args, NB_ROWS_IN_BATCH - 1, 3); - assertEquals( - counters.findCounter(VerifyReplication.Verifier.Counters.FAILED_RECOMPARE).getValue(), 9); - assertEquals(counters.findCounter(VerifyReplication.Verifier.Counters.RECOMPARES).getValue(), - 9); - assertEquals( - counters.findCounter(VerifyReplication.Verifier.Counters.ONLY_IN_PEER_TABLE_ROWS).getValue(), - 1); - assertEquals( - counters.findCounter(VerifyReplication.Verifier.Counters.CONTENT_DIFFERENT_ROWS).getValue(), - 1); - assertEquals(counters.findCounter(VerifyReplication.Verifier.Counters.ONLY_IN_SOURCE_TABLE_ROWS) - .getValue(), 1); - } - @AfterClass public static void tearDownAfterClass() throws Exception { htable3.close(); diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyReplicationRecompareRunnable.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyReplicationRecompareRunnable.java deleted file mode 100644 index 49c52fbcc3b3..000000000000 --- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyReplicationRecompareRunnable.java +++ /dev/null @@ -1,154 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.replication; - -import static org.junit.Assert.assertEquals; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.when; - -import java.io.IOException; -import java.util.concurrent.ThreadLocalRandom; -import org.apache.hadoop.hbase.HBaseClassTestRule; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication; -import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplicationRecompareRunnable; -import org.apache.hadoop.hbase.testclassification.ReplicationTests; -import org.apache.hadoop.hbase.testclassification.SmallTests; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.mapreduce.Counter; -import org.apache.hadoop.mapreduce.Mapper; -import org.apache.hadoop.mapreduce.counters.GenericCounter; -import org.junit.Before; -import org.junit.ClassRule; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.runner.RunWith; -import org.mockito.Mock; -import org.mockito.junit.MockitoJUnitRunner; - -@Category({ ReplicationTests.class, SmallTests.class }) -@RunWith(MockitoJUnitRunner.class) -public class TestVerifyReplicationRecompareRunnable { - - @ClassRule - public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestVerifyReplicationRecompareRunnable.class); - - @Mock - private Table sourceTable; - - @Mock - private Table replicatedTable; - - @Mock - private Mapper.Context context; - - static Result genResult(int cols) { - KeyValue[] kvs = new KeyValue[cols]; - - for (int i = 0; i < cols; ++i) { - kvs[i] = - new KeyValue(genBytes(), genBytes(), genBytes(), System.currentTimeMillis(), genBytes()); - } - - return Result.create(kvs); - } - - static byte[] genBytes() { - return Bytes.toBytes(ThreadLocalRandom.current().nextInt()); - } - - @Before - public void setUp() { - for (VerifyReplication.Verifier.Counters counter : VerifyReplication.Verifier.Counters - .values()) { - Counter emptyCounter = new GenericCounter(counter.name(), counter.name()); - when(context.getCounter(counter)).thenReturn(emptyCounter); - } - } - - @Test - public void itRecomparesGoodRow() throws IOException { - Result result = genResult(2); - - when(sourceTable.get(any(Get.class))).thenReturn(result); - when(replicatedTable.get(any(Get.class))).thenReturn(result); - - VerifyReplicationRecompareRunnable runnable = new VerifyReplicationRecompareRunnable(context, - genResult(5), null, VerifyReplication.Verifier.Counters.ONLY_IN_SOURCE_TABLE_ROWS, "", - new Scan(), sourceTable, replicatedTable, 3, 1, 0, true); - - runnable.run(); - - assertEquals(0, context.getCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue()); - assertEquals(0, - context.getCounter(VerifyReplication.Verifier.Counters.ONLY_IN_SOURCE_TABLE_ROWS).getValue()); - assertEquals(1, context.getCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue()); - assertEquals(1, - context.getCounter(VerifyReplication.Verifier.Counters.SOURCE_ROW_CHANGED).getValue()); - assertEquals(1, - context.getCounter(VerifyReplication.Verifier.Counters.PEER_ROW_CHANGED).getValue()); - assertEquals(2, context.getCounter(VerifyReplication.Verifier.Counters.RECOMPARES).getValue()); - } - - @Test - public void itRecomparesBadRow() throws IOException { - Result replicatedResult = genResult(1); - when(sourceTable.get(any(Get.class))).thenReturn(genResult(5)); - when(replicatedTable.get(any(Get.class))).thenReturn(replicatedResult); - - VerifyReplicationRecompareRunnable runnable = new VerifyReplicationRecompareRunnable(context, - genResult(5), replicatedResult, VerifyReplication.Verifier.Counters.ONLY_IN_SOURCE_TABLE_ROWS, - "", new Scan(), sourceTable, replicatedTable, 1, 1, 0, true); - - runnable.run(); - - assertEquals(1, context.getCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue()); - assertEquals(1, - context.getCounter(VerifyReplication.Verifier.Counters.ONLY_IN_SOURCE_TABLE_ROWS).getValue()); - assertEquals(0, context.getCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue()); - assertEquals(1, - context.getCounter(VerifyReplication.Verifier.Counters.SOURCE_ROW_CHANGED).getValue()); - assertEquals(0, - context.getCounter(VerifyReplication.Verifier.Counters.PEER_ROW_CHANGED).getValue()); - assertEquals(1, context.getCounter(VerifyReplication.Verifier.Counters.RECOMPARES).getValue()); - } - - @Test - public void itHandlesExceptionOnRecompare() throws IOException { - when(sourceTable.get(any(Get.class))).thenThrow(new IOException("Error!")); - when(replicatedTable.get(any(Get.class))).thenReturn(genResult(5)); - - VerifyReplicationRecompareRunnable runnable = new VerifyReplicationRecompareRunnable(context, - genResult(5), null, VerifyReplication.Verifier.Counters.ONLY_IN_SOURCE_TABLE_ROWS, "", - new Scan(), sourceTable, replicatedTable, 1, 1, 0, true); - - runnable.run(); - - assertEquals(1, context.getCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue()); - assertEquals(1, - context.getCounter(VerifyReplication.Verifier.Counters.ONLY_IN_SOURCE_TABLE_ROWS).getValue()); - assertEquals(1, - context.getCounter(VerifyReplication.Verifier.Counters.FAILED_RECOMPARE).getValue()); - assertEquals(1, context.getCounter(VerifyReplication.Verifier.Counters.RECOMPARES).getValue()); - } -}