From f5ee958eadcf1e7bc247e59c4d439744f7ef508e Mon Sep 17 00:00:00 2001 From: Duo Zhang Date: Thu, 20 Apr 2023 15:11:22 +0800 Subject: [PATCH] HBASE-27783 Addendum forward port the test improvement when backporting to branch-2 --- .../TestDisablePeerModification.java | 44 +++++++++++++++++-- 1 file changed, 40 insertions(+), 4 deletions(-) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestDisablePeerModification.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestDisablePeerModification.java index 2c038b6798ba..7b9fa7001559 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestDisablePeerModification.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestDisablePeerModification.java @@ -23,8 +23,11 @@ import static org.junit.Assert.assertTrue; import java.io.IOException; +import java.util.Arrays; +import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ForkJoinPool; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hbase.HBaseClassTestRule; @@ -40,11 +43,17 @@ import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.MasterTests; import org.junit.AfterClass; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; +@RunWith(Parameterized.class) @Category({ MasterTests.class, LargeTests.class }) public class TestDisablePeerModification { @@ -54,9 +63,9 @@ public class TestDisablePeerModification { private static final HBaseTestingUtil UTIL = new HBaseTestingUtil(); - private static CountDownLatch ARRIVE = new CountDownLatch(1); + private static volatile CountDownLatch ARRIVE; - private static CountDownLatch RESUME = new CountDownLatch(1); + private static volatile CountDownLatch RESUME; public static final class MockPeerStorage extends FSReplicationPeerStorage { @@ -77,6 +86,14 @@ public void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean ena } } + @Parameter + public boolean async; + + @Parameters(name = "{index}: async={0}") + public static List params() { + return Arrays.asList(new Object[] { true }, new Object[] { false }); + } + @BeforeClass public static void setUp() throws Exception { UTIL.getConfiguration().setClass(ReplicationStorageFactory.REPLICATION_PEER_STORAGE_IMPL, @@ -89,18 +106,37 @@ public static void tearDown() throws IOException { UTIL.shutdownMiniCluster(); } + @Before + public void setUpBeforeTest() throws IOException { + UTIL.getAdmin().replicationPeerModificationSwitch(true, true); + } + @Test public void testDrainProcs() throws Exception { + ARRIVE = new CountDownLatch(1); + RESUME = new CountDownLatch(1); AsyncAdmin admin = UTIL.getAsyncConnection().getAdmin(); ReplicationPeerConfig rpc = ReplicationPeerConfig.newBuilder().setClusterKey(UTIL.getClusterKey() + "-test") .setReplicationEndpointImpl(DummyReplicationEndpoint.class.getName()).build(); - CompletableFuture addFuture = admin.addReplicationPeer("test_peer", rpc); + CompletableFuture addFuture = admin.addReplicationPeer("test_peer_" + async, rpc); ARRIVE.await(); // we have a pending add peer procedure which has already passed the first state, let's issue a // peer modification switch request to disable peer modification and set drainProcs to true - CompletableFuture switchFuture = admin.replicationPeerModificationSwitch(false, true); + CompletableFuture switchFuture; + if (async) { + switchFuture = admin.replicationPeerModificationSwitch(false, true); + } else { + switchFuture = new CompletableFuture<>(); + ForkJoinPool.commonPool().submit(() -> { + try { + switchFuture.complete(UTIL.getAdmin().replicationPeerModificationSwitch(false, true)); + } catch (IOException e) { + switchFuture.completeExceptionally(e); + } + }); + } // sleep a while, the switchFuture should not finish yet // the sleep is necessary as we can not join on the switchFuture, so there is no stable way to