From 902bc3ed70a332db97d07e41f38fe4bd2686990d Mon Sep 17 00:00:00 2001
From: chenglei
Date: Wed, 22 Jun 2022 19:38:48 +0800
Subject: [PATCH] =?UTF-8?q?Revert=20"HBASE-27062=20ThreadPool=20is=20unnec?=
 =?UTF-8?q?essary=20in=20HBaseInterClusterReplication=E2=80=A6=20(#4463)"?=
 =?UTF-8?q?=20(#4559)?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../protobuf/ReplicationProtobufUtil.java     |  11 +-
 .../HBaseInterClusterReplicationEndpoint.java | 211 ++++++++----------
 .../replication/SyncReplicationTestBase.java  |   9 +-
 .../replication/TestReplicationEndpoint.java  |  13 +-
 .../regionserver/TestReplicator.java          |  35 ++-
 .../TestSerialReplicationEndpoint.java        |  11 +-
 6 files changed, 136 insertions(+), 154 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtobufUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtobufUtil.java
index cfdf0e12c850..c2e96ead6f73 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtobufUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtobufUtil.java
@@ -21,7 +21,6 @@
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
-import java.util.concurrent.CompletableFuture;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellScanner;
@@ -29,6 +28,7 @@
 import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
 import org.apache.hadoop.hbase.io.SizedCellScanner;
 import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
+import org.apache.hadoop.hbase.util.FutureUtils;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.wal.WALEdit;
@@ -37,7 +37,6 @@
 import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
 
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
 
@@ -53,12 +52,12 @@ public class ReplicationProtobufUtil {
    * @param sourceBaseNamespaceDir Path to source cluster base namespace directory
    * @param sourceHFileArchiveDir Path to the source cluster hfile archive directory
    */
-  public static CompletableFuture<ReplicateWALEntryResponse> replicateWALEntry(
-    AsyncRegionServerAdmin admin, Entry[] entries, String replicationClusterId,
-    Path sourceBaseNamespaceDir, Path sourceHFileArchiveDir, int timeout) {
+  public static void replicateWALEntry(AsyncRegionServerAdmin admin, Entry[] entries,
+    String replicationClusterId, Path sourceBaseNamespaceDir, Path sourceHFileArchiveDir,
+    int timeout) throws IOException {
     Pair<ReplicateWALEntryRequest, CellScanner> p = buildReplicateWALEntryRequest(entries, null,
       replicationClusterId, sourceBaseNamespaceDir, sourceHFileArchiveDir);
-    return admin.replicateWALEntry(p.getFirst(), p.getSecond(), timeout);
+    FutureUtils.get(admin.replicateWALEntry(p.getFirst(), p.getSecond(), timeout));
   }
 
   /**
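
Note on the hunk above: the revert turns replicateWALEntry from an async method that
returns a CompletableFuture into a blocking call that drains the future through
FutureUtils.get. A minimal sketch of that sync-over-async idiom in plain JDK terms
(the SyncOverAsync class and its get method are illustrative, not the HBase API):

    import java.io.IOException;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;

    public final class SyncOverAsync {
      // Block on a future and rethrow its failure as IOException, similar in
      // spirit to what the FutureUtils.get call does in the hunk above.
      public static <T> T get(CompletableFuture<T> future) throws IOException {
        try {
          return future.get();
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          throw new IOException("Interrupted while waiting on RPC", e);
        } catch (ExecutionException e) {
          Throwable cause = e.getCause();
          throw cause instanceof IOException ? (IOException) cause : new IOException(cause);
        }
      }
    }
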
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
index 39e68bf9eb44..cec360a4c97e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
@@ -29,11 +29,18 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
-import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
@@ -50,7 +57,7 @@
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.FutureUtils;
+import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.ipc.RemoteException;
@@ -58,8 +65,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.hbase.thirdparty.com.google.common.collect.Iterators;
-import org.apache.hbase.thirdparty.com.google.common.collect.PeekingIterator;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 /**
  * A {@link org.apache.hadoop.hbase.replication.ReplicationEndpoint} implementation for replicating
@@ -76,6 +82,8 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
   private static final Logger LOG =
     LoggerFactory.getLogger(HBaseInterClusterReplicationEndpoint.class);
 
+  private static final long DEFAULT_MAX_TERMINATION_WAIT_MULTIPLIER = 2;
+
   /** Drop edits for tables that been deleted from the replication source and target */
   public static final String REPLICATION_DROP_ON_DELETED_TABLE_KEY =
     "hbase.replication.drop.on.deleted.table";
@@ -89,22 +97,25 @@ public class HBaseInterClusterReplicationEndpoi
   private int maxRetriesMultiplier;
   // Socket timeouts require even bolder actions since we don't want to DDOS
   private int socketTimeoutMultiplier;
+  // Amount of time for shutdown to wait for all tasks to complete
+  private long maxTerminationWait;
   // Size limit for replication RPCs, in bytes
   private int replicationRpcLimit;
   // Metrics for this source
   private MetricsSource metrics;
   private boolean peersSelected = false;
   private String replicationClusterId = "";
+  private ThreadPoolExecutor exec;
   private int maxThreads;
   private Path baseNamespaceDir;
   private Path hfileArchiveDir;
   private boolean replicationBulkLoadDataEnabled;
+  private Abortable abortable;
   private boolean dropOnDeletedTables;
   private boolean dropOnDeletedColumnFamilies;
   private boolean isSerial = false;
   // Initialising as 0 to guarantee at least one logging message
   private long lastSinkFetchTime = 0;
-  private volatile boolean stopping = false;
 
   @Override
   public void init(Context context) throws IOException {
@@ -113,11 +124,20 @@ public void init(Context context) throws IOException {
     this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300);
     this.socketTimeoutMultiplier =
       this.conf.getInt("replication.source.socketTimeoutMultiplier", maxRetriesMultiplier);
+    // A Replicator job is bound by the RPC timeout. We will wait this long for all Replicator
+    // tasks to terminate when doStop() is called.
+    long maxTerminationWaitMultiplier = this.conf.getLong(
+      "replication.source.maxterminationmultiplier", DEFAULT_MAX_TERMINATION_WAIT_MULTIPLIER);
+    this.maxTerminationWait = maxTerminationWaitMultiplier
+      * this.conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
     this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000);
     this.metrics = context.getMetrics();
     // per sink thread pool
    this.maxThreads = this.conf.getInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY,
       HConstants.REPLICATION_SOURCE_MAXTHREADS_DEFAULT);
+    this.exec = Threads.getBoundedCachedThreadPool(maxThreads, 60, TimeUnit.SECONDS,
+      new ThreadFactoryBuilder().setDaemon(true).setNameFormat("SinkThread-%d").build());
+    this.abortable = ctx.getAbortable();
     // Set the size limit for replication RPCs to 95% of the max request size.
     // We could do with less slop if we have an accurate estimate of encoded size. Being
     // conservative for now.
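
Note on the init() hunk above: the restored code builds the per-sink pool with
Threads.getBoundedCachedThreadPool and daemon "SinkThread-%d" threads. Assuming that
helper amounts to a fixed-size pool whose idle threads time out after the 60-second
keep-alive, a rough plain-JDK equivalent (SinkPools is an invented name):

    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadFactory;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;

    public final class SinkPools {
      // core == max == maxThreads, idle threads reaped after 60s, daemon
      // threads named SinkThread-N, mirroring the configuration in the hunk.
      public static ThreadPoolExecutor boundedCachedPool(int maxThreads) {
        AtomicInteger seq = new AtomicInteger();
        ThreadFactory tf = r -> {
          Thread t = new Thread(r, "SinkThread-" + seq.getAndIncrement());
          t.setDaemon(true);
          return t;
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(maxThreads, maxThreads, 60,
          TimeUnit.SECONDS, new LinkedBlockingQueue<>(), tf);
        pool.allowCoreThreadTimeOut(true); // let idle "cached" threads exit
        return pool;
      }
    }
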
@@ -374,31 +394,30 @@ List<List<Entry>> filterNotExistColumnFamilyEdits(final List<List<Entry>> oldEnt
     return entryList;
   }
 
-  private long parallelReplicate(ReplicateContext replicateContext, List<List<Entry>> batches)
-    throws IOException {
-    List<CompletableFuture<Integer>> futures =
-      new ArrayList<CompletableFuture<Integer>>(batches.size());
+  private long parallelReplicate(CompletionService<Integer> pool, ReplicateContext replicateContext,
+    List<List<Entry>> batches) throws IOException {
+    int futures = 0;
     for (int i = 0; i < batches.size(); i++) {
       List<Entry> entries = batches.get(i);
-      if (entries.isEmpty()) {
-        continue;
-      }
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("{} Submitting {} entries of total size {}", logPeerId(), entries.size(),
-          replicateContext.getSize());
+      if (!entries.isEmpty()) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("{} Submitting {} entries of total size {}", logPeerId(), entries.size(),
+            replicateContext.getSize());
+        }
+        // RuntimeExceptions encountered here bubble up and are handled in ReplicationSource
+        pool.submit(createReplicator(entries, i, replicateContext.getTimeout()));
+        futures++;
       }
-      // RuntimeExceptions encountered here bubble up and are handled in ReplicationSource
-      futures.add(asyncReplicate(entries, i, replicateContext.getTimeout()));
     }
     IOException iox = null;
     long lastWriteTime = 0;
-
-    for (CompletableFuture<Integer> f : futures) {
+    for (int i = 0; i < futures; i++) {
       try {
         // wait for all futures, remove successful parts
         // (only the remaining parts will be retried)
-        int index = FutureUtils.get(f);
+        Future<Integer> f = pool.take();
+        int index = f.get();
         List<Entry> batch = batches.get(index);
         batches.set(index, Collections.emptyList()); // remove successful batch
         // Find the most recent write time in the batch
@@ -406,10 +425,12 @@ private long parallelReplicate(ReplicateContext replicateContext, List<List<Ent
         if (writeTime > lastWriteTime) {
           lastWriteTime = writeTime;
         }
-      } catch (IOException e) {
-        iox = e;
-      } catch (RuntimeException e) {
-        iox = new IOException(e);
+      } catch (InterruptedException ie) {
+        iox = new IOException(ie);
+      } catch (ExecutionException ee) {
+        iox = ee.getCause() instanceof IOException
+          ? (IOException) ee.getCause()
+          : new IOException(ee.getCause());
       }
     }
     if (iox != null) {
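
Note on the restored parallelReplicate() above: it fans one task per non-empty batch
out to the pool, then harvests results in completion order. A self-contained sketch of
that submit/take/get pattern (class and method names here are mine, not from the patch):

    import java.io.IOException;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.CompletionService;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorCompletionService;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Future;

    public final class CompletionOrderDemo {
      // Submit every task, then consume results as they finish via take().
      static void runAll(ExecutorService exec, List<Callable<Integer>> tasks) throws IOException {
        CompletionService<Integer> pool = new ExecutorCompletionService<>(exec);
        int submitted = 0;
        for (Callable<Integer> task : tasks) {
          pool.submit(task);
          submitted++;
        }
        IOException iox = null;
        for (int i = 0; i < submitted; i++) {
          try {
            Future<Integer> f = pool.take(); // blocks until some task finishes
            f.get(); // rethrows that task's failure, if any
          } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            iox = new IOException(ie);
          } catch (ExecutionException ee) {
            iox = ee.getCause() instanceof IOException
              ? (IOException) ee.getCause()
              : new IOException(ee.getCause());
          }
        }
        if (iox != null) {
          throw iox; // surface a failure so the caller can retry remaining work
        }
      }
    }
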
@@ -424,6 +445,7 @@ private long parallelReplicate(ReplicateContext replicateContext, List<List<Ent
 
   @Override
   public boolean replicate(ReplicateContext replicateContext) {
+    CompletionService<Integer> pool = new ExecutorCompletionService<>(this.exec);
     int sleepMultiplier = 1;
 
     if (!peersSelected && this.isRunning()) {
@@ -446,7 +468,7 @@ public boolean replicate(ReplicateContext replicateContext) {
     }
 
     List<List<Entry>> batches = createBatches(replicateContext.getEntries());
-    while (this.isRunning() && !this.stopping) {
+    while (this.isRunning() && !exec.isShutdown()) {
       if (!isPeerEnabled()) {
         if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
           sleepMultiplier++;
@@ -455,7 +477,7 @@ public boolean replicate(ReplicateContext replicateContext) {
       }
       try {
         // replicate the batches to sink side.
-        parallelReplicate(replicateContext, batches);
+        parallelReplicate(pool, replicateContext, batches);
         return true;
       } catch (IOException ioe) {
         if (ioe instanceof RemoteException) {
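
Note on the replicate() loop above: it keeps retrying the remaining batches while the
endpoint is running, stretching the sleep between attempts through sleepMultiplier. A
compact sketch of that backoff shape; be aware the real loop only caps the sleep and
retries until stopped, whereas this standalone version also caps the attempt count:

    import java.util.concurrent.TimeUnit;

    public final class RetryLoop {
      interface Attempt {
        boolean run() throws Exception;
      }

      // Retry with a linearly growing, capped sleep between attempts.
      static boolean runWithRetries(Attempt attempt, long sleepForRetriesMs,
        int maxRetriesMultiplier) throws InterruptedException {
        int sleepMultiplier = 1;
        while (sleepMultiplier <= maxRetriesMultiplier) {
          try {
            if (attempt.run()) {
              return true; // batch shipped
            }
          } catch (Exception e) {
            // swallow and retry after a bounded sleep, as the endpoint does
          }
          TimeUnit.MILLISECONDS.sleep(sleepForRetriesMs * sleepMultiplier);
          sleepMultiplier++;
        }
        return false; // caller decides what to do once retries are exhausted
      }
    }
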
@@ -510,117 +532,82 @@ protected boolean isPeerEnabled() {
 
   @Override
   protected void doStop() {
-    // Allow currently running replication tasks to finish
-    this.stopping = true;
     disconnect(); // don't call super.doStop()
+    // Allow currently running replication tasks to finish
+    exec.shutdown();
+    try {
+      exec.awaitTermination(maxTerminationWait, TimeUnit.MILLISECONDS);
+    } catch (InterruptedException e) {
+    }
+    // Abort if the tasks did not terminate in time
+    if (!exec.isTerminated()) {
+      String errMsg = "HBaseInterClusterReplicationEndpoint termination failed. The "
+        + "ThreadPoolExecutor failed to finish all tasks within " + maxTerminationWait + "ms. "
+        + "Aborting to prevent Replication from deadlocking. See HBASE-16081.";
+      abortable.abort(errMsg, new IOException(errMsg));
+    }
     notifyStopped();
   }
 
-  protected CompletableFuture<Integer> replicateEntries(List<Entry> entries, int batchIndex,
-    int timeout) {
-    int entriesHashCode = System.identityHashCode(entries);
-    if (LOG.isTraceEnabled()) {
-      long size = entries.stream().mapToLong(this::getEstimatedEntrySize).sum();
-      LOG.trace("{} Replicating batch {} of {} entries with total size {} bytes to {}", logPeerId(),
-        entriesHashCode, entries.size(), size, replicationClusterId);
-    }
+  protected int replicateEntries(List<Entry> entries, int batchIndex, int timeout)
+    throws IOException {
     SinkPeer sinkPeer = null;
-    final CompletableFuture<Integer> resultCompletableFuture = new CompletableFuture<Integer>();
     try {
+      int entriesHashCode = System.identityHashCode(entries);
+      if (LOG.isTraceEnabled()) {
+        long size = entries.stream().mapToLong(this::getEstimatedEntrySize).sum();
+        LOG.trace("{} Replicating batch {} of {} entries with total size {} bytes to {}",
+          logPeerId(), entriesHashCode, entries.size(), size, replicationClusterId);
+      }
       sinkPeer = getReplicationSink();
-    } catch (IOException e) {
-      this.onReplicateWALEntryException(entriesHashCode, e, sinkPeer);
-      resultCompletableFuture.completeExceptionally(e);
-      return resultCompletableFuture;
-    }
-    assert sinkPeer != null;
-    AsyncRegionServerAdmin rsAdmin = sinkPeer.getRegionServer();
-    final SinkPeer sinkPeerToUse = sinkPeer;
-    FutureUtils.addListener(
-      ReplicationProtobufUtil.replicateWALEntry(rsAdmin, entries.toArray(new Entry[entries.size()]),
-        replicationClusterId, baseNamespaceDir, hfileArchiveDir, timeout),
-      (response, exception) -> {
-        if (exception != null) {
-          onReplicateWALEntryException(entriesHashCode, exception, sinkPeerToUse);
-          resultCompletableFuture.completeExceptionally(exception);
-          return;
+      AsyncRegionServerAdmin rsAdmin = sinkPeer.getRegionServer();
+      try {
+        ReplicationProtobufUtil.replicateWALEntry(rsAdmin,
+          entries.toArray(new Entry[entries.size()]), replicationClusterId, baseNamespaceDir,
+          hfileArchiveDir, timeout);
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("{} Completed replicating batch {}", logPeerId(), entriesHashCode);
         }
-        reportSinkSuccess(sinkPeerToUse);
-        resultCompletableFuture.complete(batchIndex);
-      });
-    return resultCompletableFuture;
-  }
-
-  private void onReplicateWALEntryException(int entriesHashCode, Throwable exception,
-    final SinkPeer sinkPeer) {
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("{} Failed replicating batch {}", logPeerId(), entriesHashCode, exception);
-    }
-    if (exception instanceof IOException) {
+      } catch (IOException e) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("{} Failed replicating batch {}", logPeerId(), entriesHashCode, e);
+        }
+        throw e;
+      }
+      reportSinkSuccess(sinkPeer);
+    } catch (IOException ioe) {
       if (sinkPeer != null) {
         reportBadSink(sinkPeer);
       }
+      throw ioe;
     }
+    return batchIndex;
   }
 
-  /**
-   * Here for {@link HBaseInterClusterReplicationEndpoint#isSerial} is true, we iterator over the
-   * WAL {@link Entry} list, once we reached a batch limit, we send it out, and in the callback, we
-   * send the next batch, until we send all entries out.
-   */
-  private CompletableFuture<Integer> serialReplicateRegionEntries(
-    PeekingIterator<Entry> walEntryPeekingIterator, int batchIndex, int timeout) {
-    if (!walEntryPeekingIterator.hasNext()) {
-      return CompletableFuture.completedFuture(batchIndex);
-    }
-    int batchSize = 0;
+  private int serialReplicateRegionEntries(List<Entry> entries, int batchIndex, int timeout)
+    throws IOException {
+    int batchSize = 0, index = 0;
     List<Entry> batch = new ArrayList<>();
-    while (walEntryPeekingIterator.hasNext()) {
-      Entry entry = walEntryPeekingIterator.peek();
+    for (Entry entry : entries) {
       int entrySize = getEstimatedEntrySize(entry);
       if (batchSize > 0 && batchSize + entrySize > replicationRpcLimit) {
-        break;
+        replicateEntries(batch, index++, timeout);
+        batch.clear();
+        batchSize = 0;
       }
-      walEntryPeekingIterator.next();
       batch.add(entry);
       batchSize += entrySize;
     }
-
-    if (batchSize <= 0) {
-      return CompletableFuture.completedFuture(batchIndex);
+    if (batchSize > 0) {
+      replicateEntries(batch, index, timeout);
    }
-    final CompletableFuture<Integer> resultCompletableFuture = new CompletableFuture<Integer>();
-    FutureUtils.addListener(replicateEntries(batch, batchIndex, timeout), (response, exception) -> {
-      if (exception != null) {
-        resultCompletableFuture.completeExceptionally(exception);
-        return;
-      }
-      if (!walEntryPeekingIterator.hasNext()) {
-        resultCompletableFuture.complete(batchIndex);
-        return;
-      }
-      FutureUtils.addListener(
-        serialReplicateRegionEntries(walEntryPeekingIterator, batchIndex, timeout),
-        (currentResponse, currentException) -> {
-          if (currentException != null) {
-            resultCompletableFuture.completeExceptionally(currentException);
-            return;
-          }
-          resultCompletableFuture.complete(batchIndex);
-        });
-    });
-    return resultCompletableFuture;
+    return batchIndex;
   }
 
-  /**
-   * Replicate entries to peer cluster by async API.
-   */
-  protected CompletableFuture<Integer> asyncReplicate(List<Entry> entries, int batchIndex,
-    int timeout) {
+  protected Callable<Integer> createReplicator(List<Entry> entries, int batchIndex, int timeout) {
     return isSerial
-      ? serialReplicateRegionEntries(Iterators.peekingIterator(entries.iterator()), batchIndex,
-        timeout)
-      : replicateEntries(entries, batchIndex, timeout);
+      ? () -> serialReplicateRegionEntries(entries, batchIndex, timeout)
+      : () -> replicateEntries(entries, batchIndex, timeout);
   }
 
   private String logPeerId() {
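
Note on the restored doStop() above: it shuts the pool down, waits a bounded
maxTerminationWait, and aborts rather than hang (see HBASE-16081). The generic shape
of that graceful-shutdown sequence, with shutdownNow() standing in for the endpoint's
abort (PoolShutdown is an invented name, not HBase code):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.TimeUnit;

    public final class PoolShutdown {
      // Stop accepting work, give in-flight tasks a bounded grace period,
      // then escalate instead of waiting forever.
      static void stop(ExecutorService exec, long maxTerminationWaitMs) {
        exec.shutdown(); // no new tasks; running ones may finish
        try {
          if (!exec.awaitTermination(maxTerminationWaitMs, TimeUnit.MILLISECONDS)) {
            // The endpoint aborts the region server at this point; a plain
            // application might interrupt the stragglers instead.
            exec.shutdownNow();
          }
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          exec.shutdownNow();
        }
      }
    }
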
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
index e82d69826d89..011f0a19f0c8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
@@ -50,7 +50,6 @@
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.FutureUtils;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WALKeyImpl;
@@ -268,14 +267,14 @@ protected final void verifyReplicationRequestRejection(HBaseTestingUtil utility,
         new Entry(new WALKeyImpl(HConstants.EMPTY_BYTE_ARRAY, TABLE_NAME, 0), new WALEdit());
     }
     if (!expectedRejection) {
-      FutureUtils.get(ReplicationProtobufUtil.replicateWALEntry(
+      ReplicationProtobufUtil.replicateWALEntry(
         connection.getRegionServerAdmin(regionServer.getServerName()), entries, null, null, null,
-        HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT));
+        HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT);
     } else {
       try {
-        FutureUtils.get(ReplicationProtobufUtil.replicateWALEntry(
+        ReplicationProtobufUtil.replicateWALEntry(
           connection.getRegionServerAdmin(regionServer.getServerName()), entries, null, null, null,
-          HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT));
+          HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT);
         fail("Should throw IOException when sync-replication state is in A or DA");
       } catch (RemoteException e) {
         assertRejection(e.unwrapRemoteException());
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
index 9bc632e223be..53512ec2af88 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
@@ -30,7 +30,7 @@
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Callable;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
@@ -556,16 +556,15 @@ public boolean replicate(ReplicateContext replicateContext) {
     }
 
     @Override
-    protected CompletableFuture<Integer> asyncReplicate(List<Entry> entries, int ordinal,
-      int timeout) {
+    protected Callable<Integer> createReplicator(List<Entry> entries, int ordinal, int timeout) {
       // Fail only once, we don't want to slow down the test.
       if (failedOnce) {
-        return CompletableFuture.completedFuture(ordinal);
+        return () -> ordinal;
       } else {
         failedOnce = true;
-        CompletableFuture<Integer> future = new CompletableFuture<Integer>();
-        future.completeExceptionally(new IOException("Sample Exception: Failed to replicate."));
-        return future;
+        return () -> {
+          throw new IOException("Sample Exception: Failed to replicate.");
+        };
       }
     }
   }
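
Note on the TestReplicationEndpoint hunk above: the restored override returns a
Callable that throws exactly once, so the endpoint's retry path is exercised a single
time without slowing the test. The same test-double shape in isolation (FailOnceStub
is a made-up name for illustration):

    import java.io.IOException;
    import java.util.concurrent.Callable;

    public class FailOnceStub {
      private boolean failedOnce;

      // First returned task throws; every later one succeeds.
      Callable<Integer> createReplicator(int ordinal) {
        if (failedOnce) {
          return () -> ordinal;
        }
        failedOnce = true;
        return () -> {
          throw new IOException("Sample Exception: Failed to replicate.");
        };
      }
    }
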
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java
index c48755fb5f0d..803c4278f97e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java
@@ -21,7 +21,7 @@
 
 import java.io.IOException;
 import java.util.List;
-import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Callable;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
@@ -228,15 +228,15 @@ public boolean replicate(ReplicateContext replicateContext) {
     }
 
     @Override
-    protected CompletableFuture<Integer> asyncReplicate(List<Entry> entries, int ordinal,
-      int timeout) {
-      return replicateEntries(entries, ordinal, timeout).whenComplete((response, exception) -> {
+    protected Callable<Integer> createReplicator(List<Entry> entries, int ordinal, int timeout) {
+      return () -> {
+        int batchIndex = replicateEntries(entries, ordinal, timeout);
         entriesCount += entries.size();
         int count = batchCount.incrementAndGet();
         LOG.info(
           "Completed replicating batch " + System.identityHashCode(entries) + " count=" + count);
-      });
-
+        return batchIndex;
+      };
     }
   }
 
@@ -245,23 +245,20 @@ public static class FailureInjectingReplicationEndpointForTest
     private final AtomicBoolean failNext = new AtomicBoolean(false);
 
     @Override
-    protected CompletableFuture<Integer> asyncReplicate(List<Entry> entries, int ordinal,
-      int timeout) {
-
-      if (failNext.compareAndSet(false, true)) {
-        return replicateEntries(entries, ordinal, timeout).whenComplete((response, exception) -> {
+    protected Callable<Integer> createReplicator(List<Entry> entries, int ordinal, int timeout) {
+      return () -> {
+        if (failNext.compareAndSet(false, true)) {
+          int batchIndex = replicateEntries(entries, ordinal, timeout);
           entriesCount += entries.size();
           int count = batchCount.incrementAndGet();
           LOG.info(
             "Completed replicating batch " + System.identityHashCode(entries) + " count=" + count);
-        });
-      } else if (failNext.compareAndSet(true, false)) {
-        CompletableFuture<Integer> future = new CompletableFuture<Integer>();
-        future.completeExceptionally(new ServiceException("Injected failure"));
-        return future;
-      }
-      return CompletableFuture.completedFuture(ordinal);
-
+          return batchIndex;
+        } else if (failNext.compareAndSet(true, false)) {
+          throw new ServiceException("Injected failure");
+        }
+        return ordinal;
+      };
    }
   }
 }
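
Note on FailureInjectingReplicationEndpointForTest above: it relies on
AtomicBoolean.compareAndSet so the injected failure alternates deterministically even
when several sink threads race. The flag logic on its own (AlternatingFailure is an
invented class for this sketch):

    import java.util.concurrent.atomic.AtomicBoolean;

    public class AlternatingFailure {
      private final AtomicBoolean failNext = new AtomicBoolean(false);

      // Flips the flag atomically: succeed, fail, succeed, fail, ...
      boolean shouldFail() {
        if (failNext.compareAndSet(false, true)) {
          return false; // this call succeeds, the next one fails
        }
        return failNext.compareAndSet(true, false); // fail once, then reset
      }
    }
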
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationEndpoint.java
index 5f99b88e0a45..c0eace0bbdab 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationEndpoint.java
@@ -22,7 +22,7 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Callable;
 import java.util.concurrent.LinkedBlockingQueue;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
@@ -165,10 +165,11 @@ public boolean canReplicateToSameCluster() {
     }
 
     @Override
-    protected CompletableFuture<Integer> asyncReplicate(List<Entry> entries, int ordinal,
-      int timeout) {
-      entryQueue.addAll(entries);
-      return CompletableFuture.completedFuture(ordinal);
+    protected Callable<Integer> createReplicator(List<Entry> entries, int ordinal, int timeout) {
+      return () -> {
+        entryQueue.addAll(entries);
+        return ordinal;
+      };
     }
 
     @Override
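
Note on the TestSerialReplicationEndpoint hunk above: the stub never touches the
network; its Callable dumps the entries into a BlockingQueue that the test can drain
and assert on. The same capture pattern reduced to plain JDK types (String stands in
for WAL.Entry; CapturingSink is an invented name):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.Callable;
    import java.util.concurrent.LinkedBlockingQueue;

    public class CapturingSink {
      // Thread-safe queue that records everything "replicated".
      private final BlockingQueue<String> entryQueue = new LinkedBlockingQueue<>();

      Callable<Integer> createReplicator(List<String> entries, int ordinal) {
        return () -> {
          entryQueue.addAll(entries);
          return ordinal;
        };
      }

      // Drain captured entries for assertions.
      List<String> drain() {
        List<String> out = new ArrayList<>();
        entryQueue.drainTo(out);
        return out;
      }
    }
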