diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtobufUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtobufUtil.java
index c2e96ead6f73..cfdf0e12c850 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtobufUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtobufUtil.java
@@ -21,6 +21,7 @@
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellScanner;
@@ -28,7 +29,6 @@
 import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
 import org.apache.hadoop.hbase.io.SizedCellScanner;
 import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
-import org.apache.hadoop.hbase.util.FutureUtils;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.wal.WALEdit;
@@ -37,6 +37,7 @@
 import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
 
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
@@ -52,12 +53,12 @@ public class ReplicationProtobufUtil {
    * @param sourceBaseNamespaceDir Path to source cluster base namespace directory
    * @param sourceHFileArchiveDir Path to the source cluster hfile archive directory
    */
-  public static void replicateWALEntry(AsyncRegionServerAdmin admin, Entry[] entries,
-    String replicationClusterId, Path sourceBaseNamespaceDir, Path sourceHFileArchiveDir,
-    int timeout) throws IOException {
+  public static CompletableFuture<ReplicateWALEntryResponse> replicateWALEntry(
+    AsyncRegionServerAdmin admin, Entry[] entries, String replicationClusterId,
+    Path sourceBaseNamespaceDir, Path sourceHFileArchiveDir, int timeout) {
     Pair<ReplicateWALEntryRequest, CellScanner> p = buildReplicateWALEntryRequest(entries, null,
       replicationClusterId, sourceBaseNamespaceDir, sourceHFileArchiveDir);
-    FutureUtils.get(admin.replicateWALEntry(p.getFirst(), p.getSecond(), timeout));
+    return admin.replicateWALEntry(p.getFirst(), p.getSecond(), timeout);
   }
 
   /**
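Note for reviewers: `replicateWALEntry` no longer blocks inside the util; the `CompletableFuture` is handed straight back to the caller. A minimal sketch of the two ways callers consume it after this change (variable names are illustrative; `FutureUtils.addListener` and `FutureUtils.get` are the hbase-common helpers this patch uses elsewhere):

```java
// Sketch only: assumes an in-scope AsyncRegionServerAdmin admin, WAL entries,
// the usual SLF4J LOG, and the endpoint's configuration fields.
CompletableFuture<ReplicateWALEntryResponse> future = ReplicationProtobufUtil
  .replicateWALEntry(admin, entries, replicationClusterId, baseNamespaceDir,
    hfileArchiveDir, timeout);

// Non-blocking: react in a callback, as HBaseInterClusterReplicationEndpoint now does.
FutureUtils.addListener(future, (response, error) -> {
  if (error != null) {
    LOG.warn("replicateWALEntry failed", error);
  }
});

// Blocking: FutureUtils.get waits and rethrows the failure cause as an IOException,
// which is how the tests below keep their original synchronous shape.
ReplicateWALEntryResponse response = FutureUtils.get(future);
```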
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
index cec360a4c97e..39e68bf9eb44 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
@@ -29,18 +29,11 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CompletionService;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.Future;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
@@ -57,7 +50,7 @@
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.util.FutureUtils;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.ipc.RemoteException;
@@ -65,7 +58,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hbase.thirdparty.com.google.common.collect.Iterators;
+import org.apache.hbase.thirdparty.com.google.common.collect.PeekingIterator;
 
 /**
  * A {@link org.apache.hadoop.hbase.replication.ReplicationEndpoint} implementation for replicating
@@ -82,8 +76,6 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
   private static final Logger LOG =
     LoggerFactory.getLogger(HBaseInterClusterReplicationEndpoint.class);
 
-  private static final long DEFAULT_MAX_TERMINATION_WAIT_MULTIPLIER = 2;
-
   /** Drop edits for tables that been deleted from the replication source and target */
   public static final String REPLICATION_DROP_ON_DELETED_TABLE_KEY =
     "hbase.replication.drop.on.deleted.table";
@@ -97,25 +89,22 @@ public class HBaseInterClusterReplicationEndpoi
   private int maxRetriesMultiplier;
   // Socket timeouts require even bolder actions since we don't want to DDOS
   private int socketTimeoutMultiplier;
-  // Amount of time for shutdown to wait for all tasks to complete
-  private long maxTerminationWait;
   // Size limit for replication RPCs, in bytes
   private int replicationRpcLimit;
   // Metrics for this source
   private MetricsSource metrics;
   private boolean peersSelected = false;
   private String replicationClusterId = "";
-  private ThreadPoolExecutor exec;
   private int maxThreads;
   private Path baseNamespaceDir;
   private Path hfileArchiveDir;
   private boolean replicationBulkLoadDataEnabled;
-  private Abortable abortable;
   private boolean dropOnDeletedTables;
   private boolean dropOnDeletedColumnFamilies;
   private boolean isSerial = false;
   // Initialising as 0 to guarantee at least one logging message
   private long lastSinkFetchTime = 0;
+  private volatile boolean stopping = false;
 
   @Override
   public void init(Context context) throws IOException {
@@ -124,20 +113,11 @@ public void init(Context context) throws IOException {
     this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300);
     this.socketTimeoutMultiplier =
       this.conf.getInt("replication.source.socketTimeoutMultiplier", maxRetriesMultiplier);
-    // A Replicator job is bound by the RPC timeout. We will wait this long for all Replicator
-    // tasks to terminate when doStop() is called.
-    long maxTerminationWaitMultiplier = this.conf.getLong(
-      "replication.source.maxterminationmultiplier", DEFAULT_MAX_TERMINATION_WAIT_MULTIPLIER);
-    this.maxTerminationWait = maxTerminationWaitMultiplier
-      * this.conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
     this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000);
     this.metrics = context.getMetrics();
     // per sink thread pool
     this.maxThreads = this.conf.getInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY,
       HConstants.REPLICATION_SOURCE_MAXTHREADS_DEFAULT);
-    this.exec = Threads.getBoundedCachedThreadPool(maxThreads, 60, TimeUnit.SECONDS,
-      new ThreadFactoryBuilder().setDaemon(true).setNameFormat("SinkThread-%d").build());
-    this.abortable = ctx.getAbortable();
     // Set the size limit for replication RPCs to 95% of the max request size.
     // We could do with less slop if we have an accurate estimate of encoded size. Being
     // conservative for now.
@@ -394,30 +374,31 @@ List<List<Entry>> filterNotExistColumnFamilyEdits(final List<List<Entry>> oldEnt
     return entryList;
   }
 
-  private long parallelReplicate(CompletionService<Integer> pool, ReplicateContext replicateContext,
-    List<List<Entry>> batches) throws IOException {
-    int futures = 0;
+  private long parallelReplicate(ReplicateContext replicateContext, List<List<Entry>> batches)
+    throws IOException {
+    List<CompletableFuture<Integer>> futures =
+      new ArrayList<CompletableFuture<Integer>>(batches.size());
     for (int i = 0; i < batches.size(); i++) {
       List<Entry> entries = batches.get(i);
-      if (!entries.isEmpty()) {
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("{} Submitting {} entries of total size {}", logPeerId(), entries.size(),
-            replicateContext.getSize());
-        }
-        // RuntimeExceptions encountered here bubble up and are handled in ReplicationSource
-        pool.submit(createReplicator(entries, i, replicateContext.getTimeout()));
-        futures++;
+      if (entries.isEmpty()) {
+        continue;
       }
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("{} Submitting {} entries of total size {}", logPeerId(), entries.size(),
+          replicateContext.getSize());
+      }
+      // RuntimeExceptions encountered here bubble up and are handled in ReplicationSource
+      futures.add(asyncReplicate(entries, i, replicateContext.getTimeout()));
     }
     IOException iox = null;
     long lastWriteTime = 0;
-    for (int i = 0; i < futures; i++) {
+
+    for (CompletableFuture<Integer> f : futures) {
       try {
         // wait for all futures, remove successful parts
         // (only the remaining parts will be retried)
-        Future<Integer> f = pool.take();
-        int index = f.get();
+        int index = FutureUtils.get(f);
         List<Entry> batch = batches.get(index);
         batches.set(index, Collections.emptyList()); // remove successful batch
         // Find the most recent write time in the batch
@@ -425,12 +406,10 @@ private long parallelReplicate(CompletionService<Integer> pool, ReplicateContext
         if (writeTime > lastWriteTime) {
           lastWriteTime = writeTime;
         }
-      } catch (InterruptedException ie) {
-        iox = new IOException(ie);
-      } catch (ExecutionException ee) {
-        iox = ee.getCause() instanceof IOException
-          ? (IOException) ee.getCause()
-          : new IOException(ee.getCause());
+      } catch (IOException e) {
+        iox = e;
+      } catch (RuntimeException e) {
+        iox = new IOException(e);
       }
     }
     if (iox != null) {
@@ -445,7 +424,6 @@ private long parallelReplicate(CompletionService<Integer> pool, ReplicateContext
    */
   @Override
   public boolean replicate(ReplicateContext replicateContext) {
-    CompletionService<Integer> pool = new ExecutorCompletionService<>(this.exec);
     int sleepMultiplier = 1;
 
     if (!peersSelected && this.isRunning()) {
@@ -468,7 +446,7 @@ public boolean replicate(ReplicateContext replicateContext) {
     }
 
     List<List<Entry>> batches = createBatches(replicateContext.getEntries());
-    while (this.isRunning() && !exec.isShutdown()) {
+    while (this.isRunning() && !this.stopping) {
       if (!isPeerEnabled()) {
         if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
           sleepMultiplier++;
@@ -477,7 +455,7 @@
       }
       try {
         // replicate the batches to sink side.
-        parallelReplicate(pool, replicateContext, batches);
+        parallelReplicate(replicateContext, batches);
         return true;
       } catch (IOException ioe) {
         if (ioe instanceof RemoteException) {
@@ -532,82 +510,117 @@ protected boolean isPeerEnabled() {
 
   @Override
   protected void doStop() {
-    disconnect(); // don't call super.doStop()
     // Allow currently running replication tasks to finish
-    exec.shutdown();
-    try {
-      exec.awaitTermination(maxTerminationWait, TimeUnit.MILLISECONDS);
-    } catch (InterruptedException e) {
-    }
-    // Abort if the tasks did not terminate in time
-    if (!exec.isTerminated()) {
-      String errMsg = "HBaseInterClusterReplicationEndpoint termination failed. The "
-        + "ThreadPoolExecutor failed to finish all tasks within " + maxTerminationWait + "ms. "
-        + "Aborting to prevent Replication from deadlocking. See HBASE-16081.";
-      abortable.abort(errMsg, new IOException(errMsg));
-    }
+    this.stopping = true;
+    disconnect(); // don't call super.doStop()
     notifyStopped();
   }
 
-  protected int replicateEntries(List<Entry> entries, int batchIndex, int timeout)
-    throws IOException {
+  protected CompletableFuture<Integer> replicateEntries(List<Entry> entries, int batchIndex,
+    int timeout) {
+    int entriesHashCode = System.identityHashCode(entries);
+    if (LOG.isTraceEnabled()) {
+      long size = entries.stream().mapToLong(this::getEstimatedEntrySize).sum();
+      LOG.trace("{} Replicating batch {} of {} entries with total size {} bytes to {}", logPeerId(),
+        entriesHashCode, entries.size(), size, replicationClusterId);
+    }
     SinkPeer sinkPeer = null;
+    final CompletableFuture<Integer> resultCompletableFuture = new CompletableFuture<Integer>();
     try {
-      int entriesHashCode = System.identityHashCode(entries);
-      if (LOG.isTraceEnabled()) {
-        long size = entries.stream().mapToLong(this::getEstimatedEntrySize).sum();
-        LOG.trace("{} Replicating batch {} of {} entries with total size {} bytes to {}",
-          logPeerId(), entriesHashCode, entries.size(), size, replicationClusterId);
-      }
       sinkPeer = getReplicationSink();
-      AsyncRegionServerAdmin rsAdmin = sinkPeer.getRegionServer();
-      try {
-        ReplicationProtobufUtil.replicateWALEntry(rsAdmin,
-          entries.toArray(new Entry[entries.size()]), replicationClusterId, baseNamespaceDir,
-          hfileArchiveDir, timeout);
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("{} Completed replicating batch {}", logPeerId(), entriesHashCode);
-        }
-      } catch (IOException e) {
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("{} Failed replicating batch {}", logPeerId(), entriesHashCode, e);
+    } catch (IOException e) {
+      this.onReplicateWALEntryException(entriesHashCode, e, sinkPeer);
+      resultCompletableFuture.completeExceptionally(e);
+      return resultCompletableFuture;
+    }
+    assert sinkPeer != null;
+    AsyncRegionServerAdmin rsAdmin = sinkPeer.getRegionServer();
+    final SinkPeer sinkPeerToUse = sinkPeer;
+    FutureUtils.addListener(
+      ReplicationProtobufUtil.replicateWALEntry(rsAdmin, entries.toArray(new Entry[entries.size()]),
+        replicationClusterId, baseNamespaceDir, hfileArchiveDir, timeout),
+      (response, exception) -> {
+        if (exception != null) {
+          onReplicateWALEntryException(entriesHashCode, exception, sinkPeerToUse);
+          resultCompletableFuture.completeExceptionally(exception);
+          return;
         }
-        throw e;
-      }
-      reportSinkSuccess(sinkPeer);
-    } catch (IOException ioe) {
+        reportSinkSuccess(sinkPeerToUse);
+        resultCompletableFuture.complete(batchIndex);
+      });
+    return resultCompletableFuture;
+  }
+
+  private void onReplicateWALEntryException(int entriesHashCode, Throwable exception,
+    final SinkPeer sinkPeer) {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("{} Failed replicating batch {}", logPeerId(), entriesHashCode, exception);
+    }
+    if (exception instanceof IOException) {
       if (sinkPeer != null) {
         reportBadSink(sinkPeer);
       }
-      throw ioe;
     }
-    return batchIndex;
   }
 
-  private int serialReplicateRegionEntries(List<Entry> entries, int batchIndex, int timeout)
-    throws IOException {
-    int batchSize = 0, index = 0;
+  /**
+   * When {@link HBaseInterClusterReplicationEndpoint#isSerial} is true, we iterate over the WAL
+   * {@link Entry} list; once we reach the batch size limit, we send the batch out, and in the
+   * callback we send the next batch, until all entries are sent.
+   */
+  private CompletableFuture<Integer> serialReplicateRegionEntries(
+    PeekingIterator<Entry> walEntryPeekingIterator, int batchIndex, int timeout) {
+    if (!walEntryPeekingIterator.hasNext()) {
+      return CompletableFuture.completedFuture(batchIndex);
+    }
+    int batchSize = 0;
     List<Entry> batch = new ArrayList<>();
-    for (Entry entry : entries) {
+    while (walEntryPeekingIterator.hasNext()) {
+      Entry entry = walEntryPeekingIterator.peek();
       int entrySize = getEstimatedEntrySize(entry);
       if (batchSize > 0 && batchSize + entrySize > replicationRpcLimit) {
-        replicateEntries(batch, index++, timeout);
-        batch.clear();
-        batchSize = 0;
+        break;
       }
+      walEntryPeekingIterator.next();
       batch.add(entry);
       batchSize += entrySize;
     }
-    if (batchSize > 0) {
-      replicateEntries(batch, index, timeout);
+
+    if (batchSize <= 0) {
+      return CompletableFuture.completedFuture(batchIndex);
     }
-    return batchIndex;
+    final CompletableFuture<Integer> resultCompletableFuture = new CompletableFuture<Integer>();
+    FutureUtils.addListener(replicateEntries(batch, batchIndex, timeout), (response, exception) -> {
+      if (exception != null) {
+        resultCompletableFuture.completeExceptionally(exception);
+        return;
+      }
+      if (!walEntryPeekingIterator.hasNext()) {
+        resultCompletableFuture.complete(batchIndex);
+        return;
+      }
+      FutureUtils.addListener(
+        serialReplicateRegionEntries(walEntryPeekingIterator, batchIndex, timeout),
+        (currentResponse, currentException) -> {
+          if (currentException != null) {
+            resultCompletableFuture.completeExceptionally(currentException);
+            return;
+          }
+          resultCompletableFuture.complete(batchIndex);
+        });
+    });
+    return resultCompletableFuture;
   }
 
-  protected Callable<Integer> createReplicator(List<Entry> entries, int batchIndex, int timeout) {
+  /**
+   * Replicate entries to peer cluster by async API.
+   */
+  protected CompletableFuture<Integer> asyncReplicate(List<Entry> entries, int batchIndex,
+    int timeout) {
     return isSerial
-      ? () -> serialReplicateRegionEntries(entries, batchIndex, timeout)
-      : () -> replicateEntries(entries, batchIndex, timeout);
+      ? serialReplicateRegionEntries(Iterators.peekingIterator(entries.iterator()), batchIndex,
        timeout)
+      : replicateEntries(entries, batchIndex, timeout);
   }
 
   private String logPeerId() {
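Reviewer note on the serial path: ordering is preserved because the next RPC is only issued from the previous one's callback. For reference, the same pattern reduced to plain JDK types (`sendAll` and `send` are hypothetical names, not part of this patch; the production code uses `FutureUtils.addListener` plus an explicit result future rather than `thenCompose`, but the control flow is the same):

```java
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

public class ChainedBatches {
  /**
   * Send batches strictly one after another: the next batch is only submitted
   * once the future for the previous batch has completed successfully.
   */
  static <T> CompletableFuture<Void> sendAll(Iterator<List<T>> batches,
      Function<List<T>, CompletableFuture<Void>> send) {
    if (!batches.hasNext()) {
      return CompletableFuture.completedFuture(null);
    }
    // Recurse from the callback, so ordering holds without blocking any thread.
    return send.apply(batches.next()).thenCompose(v -> sendAll(batches, send));
  }
}
```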
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
index 011f0a19f0c8..e82d69826d89 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
@@ -50,6 +50,7 @@
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FutureUtils;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WALKeyImpl;
@@ -267,14 +268,14 @@ protected final void verifyReplicationRequestRejection(HBaseTestingUtil utility,
         new Entry(new WALKeyImpl(HConstants.EMPTY_BYTE_ARRAY, TABLE_NAME, 0), new WALEdit());
     }
     if (!expectedRejection) {
-      ReplicationProtobufUtil.replicateWALEntry(
+      FutureUtils.get(ReplicationProtobufUtil.replicateWALEntry(
         connection.getRegionServerAdmin(regionServer.getServerName()), entries, null, null, null,
-        HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT);
+        HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT));
     } else {
       try {
-        ReplicationProtobufUtil.replicateWALEntry(
+        FutureUtils.get(ReplicationProtobufUtil.replicateWALEntry(
           connection.getRegionServerAdmin(regionServer.getServerName()), entries, null, null, null,
-          HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT);
+          HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT));
         fail("Should throw IOException when sync-replication state is in A or DA");
       } catch (RemoteException e) {
         assertRejection(e.unwrapRemoteException());
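Why wrap the calls in `FutureUtils.get` here rather than `CompletableFuture#get`: the raw JDK call wraps failures in `ExecutionException`, while the HBase helper (as used in `parallelReplicate` above) rethrows the underlying cause, so the existing `catch (RemoteException e)` keeps firing. A small illustration of the difference, assuming that documented behavior:

```java
// Hypothetical standalone snippet.
CompletableFuture<Void> failed = new CompletableFuture<>();
failed.completeExceptionally(new RemoteException("SomeServerException", "rejected"));

try {
  failed.get(); // JDK: throws ExecutionException wrapping the RemoteException
} catch (ExecutionException | InterruptedException e) {
  // the cause must be unwrapped by hand
}

try {
  FutureUtils.get(failed); // HBase helper: rethrows the RemoteException directly
} catch (RemoteException e) {
  // matches the catch block in verifyReplicationRequestRejection above
}
```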
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
index 53512ec2af88..9bc632e223be 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
@@ -30,7 +30,7 @@
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.UUID;
-import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
@@ -556,15 +556,16 @@ public boolean replicate(ReplicateContext replicateContext) {
     }
 
     @Override
-    protected Callable<Integer> createReplicator(List<Entry> entries, int ordinal, int timeout) {
+    protected CompletableFuture<Integer> asyncReplicate(List<Entry> entries, int ordinal,
+      int timeout) {
       // Fail only once, we don't want to slow down the test.
       if (failedOnce) {
-        return () -> ordinal;
+        return CompletableFuture.completedFuture(ordinal);
       } else {
         failedOnce = true;
-        return () -> {
-          throw new IOException("Sample Exception: Failed to replicate.");
-        };
+        CompletableFuture<Integer> future = new CompletableFuture<Integer>();
+        future.completeExceptionally(new IOException("Sample Exception: Failed to replicate."));
+        return future;
       }
     }
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java
index 803c4278f97e..c48755fb5f0d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java
@@ -21,7 +21,7 @@
 import java.io.IOException;
 import java.util.List;
-import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
@@ -228,15 +228,15 @@ public boolean replicate(ReplicateContext replicateContext) {
     }
 
     @Override
-    protected Callable<Integer> createReplicator(List<Entry> entries, int ordinal, int timeout) {
-      return () -> {
-        int batchIndex = replicateEntries(entries, ordinal, timeout);
+    protected CompletableFuture<Integer> asyncReplicate(List<Entry> entries, int ordinal,
+      int timeout) {
+      return replicateEntries(entries, ordinal, timeout).whenComplete((response, exception) -> {
         entriesCount += entries.size();
         int count = batchCount.incrementAndGet();
         LOG.info(
           "Completed replicating batch " + System.identityHashCode(entries) + " count=" + count);
-        return batchIndex;
-      };
+      });
+    }
   }
 
@@ -245,20 +245,23 @@ public static class FailureInjectingReplicationEndpointForTest
     private final AtomicBoolean failNext = new AtomicBoolean(false);
 
     @Override
-    protected Callable<Integer> createReplicator(List<Entry> entries, int ordinal, int timeout) {
-      return () -> {
-        if (failNext.compareAndSet(false, true)) {
-          int batchIndex = replicateEntries(entries, ordinal, timeout);
+    protected CompletableFuture<Integer> asyncReplicate(List<Entry> entries, int ordinal,
+      int timeout) {
+
+      if (failNext.compareAndSet(false, true)) {
+        return replicateEntries(entries, ordinal, timeout).whenComplete((response, exception) -> {
           entriesCount += entries.size();
           int count = batchCount.incrementAndGet();
           LOG.info(
             "Completed replicating batch " + System.identityHashCode(entries) + " count=" + count);
-          return batchIndex;
-        } else if (failNext.compareAndSet(true, false)) {
-          throw new ServiceException("Injected failure");
-        }
-        return ordinal;
-      };
+        });
+      } else if (failNext.compareAndSet(true, false)) {
+        CompletableFuture<Integer> future = new CompletableFuture<Integer>();
+        future.completeExceptionally(new ServiceException("Injected failure"));
+        return future;
+      }
+      return CompletableFuture.completedFuture(ordinal);
+    }
   }
 }
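One subtlety these test overrides rely on: `whenComplete` attaches side effects but passes the original outcome through unchanged, so `parallelReplicate` still observes the batch index on success and the injected exception on failure. A tiny illustration (values hypothetical):

```java
CompletableFuture<Integer> base = CompletableFuture.completedFuture(7);
CompletableFuture<Integer> counted = base.whenComplete((index, error) -> {
  // side effect only; does not alter the completion value or exception
  System.out.println("batch " + index + " done");
});
assert counted.join() == 7; // the original result flows through
```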
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationEndpoint.java
index c0eace0bbdab..5f99b88e0a45 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationEndpoint.java
@@ -22,7 +22,7 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.LinkedBlockingQueue;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
@@ -165,11 +165,10 @@ public boolean canReplicateToSameCluster() {
     }
 
     @Override
-    protected Callable<Integer> createReplicator(List<Entry> entries, int ordinal, int timeout) {
-      return () -> {
-        entryQueue.addAll(entries);
-        return ordinal;
-      };
+    protected CompletableFuture<Integer> asyncReplicate(List<Entry> entries, int ordinal,
+      int timeout) {
+      entryQueue.addAll(entries);
+      return CompletableFuture.completedFuture(ordinal);
     }
 
     @Override
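For downstream endpoints that extend `HBaseInterClusterReplicationEndpoint`, the migration mirrors these tests: replace the `createReplicator` override with `asyncReplicate` and return a completed or chained future. A hypothetical minimal override (class name and body are illustrative only, not part of this patch):

```java
public static class NoOpReplicationEndpoint extends HBaseInterClusterReplicationEndpoint {
  @Override
  protected CompletableFuture<Integer> asyncReplicate(List<Entry> entries, int batchIndex,
      int timeout) {
    // Complete normally once the batch is handed off; completing exceptionally
    // signals parallelReplicate to keep the batch for retry.
    return CompletableFuture.completedFuture(batchIndex);
  }
}
```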