From 4e84a95509de42f671e28bb69b9ab80c32401d3b Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Sun, 27 May 2018 21:59:23 +0200 Subject: [PATCH] renamed ChunkProcessor to ChunkWorker --- .../ccr/action/ShardFollowTasksExecutor.java | 37 +++++++------- .../ccr/action/ChunksCoordinatorTests.java | 49 ++++++++++--------- 2 files changed, 44 insertions(+), 42 deletions(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java index 62b4107573f7e..f4344ba2c0748 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java @@ -134,9 +134,9 @@ void prepare(Client leaderClient, Client followerClient, ShardFollowNodeTask tas fetchGlobalCheckpoint(leaderClient, leaderShard, leaderGlobalCheckPoint -> { logger.debug("{} fetching write operations, leaderGlobalCheckPoint={}, followGlobalCheckPoint={}", followerShard, leaderGlobalCheckPoint, followGlobalCheckPoint); - ChunksCoordinator coordinator = new ChunksCoordinator(followerClient, leaderClient, threadPool, imdVersionChecker, params.getMaxChunkSize(), - params.getNumConcurrentChunks(), params.getProcessorMaxTranslogBytes(), leaderShard, followerShard, task::markAsFailed, - task::isRunning, task::updateProcessedGlobalCheckpoint); + ChunksCoordinator coordinator = new ChunksCoordinator(followerClient, leaderClient, threadPool, imdVersionChecker, + params.getMaxChunkSize(), params.getNumConcurrentChunks(), params.getProcessorMaxTranslogBytes(), leaderShard, + followerShard, task::markAsFailed, task::isRunning, task::updateProcessedGlobalCheckpoint); coordinator.start(followGlobalCheckPoint, leaderGlobalCheckPoint); }, task::markAsFailed); } @@ -165,6 +165,7 @@ static class ChunksCoordinator { private final Client followerClient; private final Client leaderClient; private final ThreadPool threadPool; + private final Executor ccrExecutor; private final IndexMetadataVersionChecker imdVersionChecker; private final long batchSize; @@ -178,7 +179,6 @@ static class ChunksCoordinator { private final AtomicInteger activeWorkers; private final AtomicLong lastPolledGlobalCheckpoint; - private final AtomicLong lastProcessedGlobalCheckPoint; private final Queue chunks = new ConcurrentLinkedQueue<>(); ChunksCoordinator(Client followerClient, @@ -197,6 +197,7 @@ static class ChunksCoordinator { this.leaderClient = leaderClient; this.threadPool = threadPool; this.imdVersionChecker = imdVersionChecker; + this.ccrExecutor = threadPool.executor(Ccr.CCR_THREAD_POOL_NAME); this.batchSize = batchSize; this.maxConcurrentWorker = maxConcurrentWorker; this.processorMaxTranslogBytes = processorMaxTranslogBytes; @@ -207,7 +208,6 @@ static class ChunksCoordinator { this.processedGlobalCheckpointUpdater = processedGlobalCheckpointUpdater; this.activeWorkers = new AtomicInteger(); this.lastPolledGlobalCheckpoint = new AtomicLong(); - this.lastProcessedGlobalCheckPoint = new AtomicLong(); } void createChucks(long from, long to) { @@ -218,8 +218,8 @@ void createChucks(long from, long to) { } } - void update() { - schedule(() -> { + void updateChunksQueue() { + schedule(CHECK_LEADER_GLOBAL_CHECKPOINT_INTERVAL, () -> { if (stateSupplier.get() == false) { return; } @@ -234,7 +234,7 @@ void update() { } else { LOGGER.debug("{} no write operations to fetch", followerShard); } - update(); + updateChunksQueue(); }, failureHandler); }); } @@ -245,7 +245,7 @@ void start(long followerGlobalCheckpoint, long leaderGlobalCheckPoint) { LOGGER.debug("{} Start coordination of [{}] chunks with [{}] concurrent processors", leaderShard, chunks.size(), maxConcurrentWorker); initiateChunkWorkers(); - update(); + updateChunksQueue(); } void initiateChunkWorkers() { @@ -257,7 +257,7 @@ void initiateChunkWorkers() { LOGGER.debug("{} Starting [{}] new chunk workers", followerShard, workersToStart); for (int i = 0; i < workersToStart; i++) { - threadPool.executor(Ccr.CCR_THREAD_POOL_NAME).execute(new AbstractRunnable() { + ccrExecutor.execute(new AbstractRunnable() { @Override public void onFailure(Exception e) { assert e != null; @@ -299,14 +299,13 @@ void processNextChunk() { failureHandler.accept(e); } }; - Executor ccrExecutor = threadPool.executor(Ccr.CCR_THREAD_POOL_NAME); - ChunkProcessor processor = new ChunkProcessor(leaderClient, followerClient, chunks, ccrExecutor, imdVersionChecker, + ChunkWorker worker = new ChunkWorker(leaderClient, followerClient, chunks, ccrExecutor, imdVersionChecker, leaderShard, followerShard, processorHandler); - processor.start(chunk[0], chunk[1], processorMaxTranslogBytes); + worker.start(chunk[0], chunk[1], processorMaxTranslogBytes); } - void schedule(Runnable runnable) { - threadPool.schedule(CHECK_LEADER_GLOBAL_CHECKPOINT_INTERVAL, Ccr.CCR_THREAD_POOL_NAME, new AbstractRunnable() { + void schedule(TimeValue delay, Runnable runnable) { + threadPool.schedule(delay, Ccr.CCR_THREAD_POOL_NAME, new AbstractRunnable() { @Override public void onFailure(Exception e) { failureHandler.accept(e); @@ -325,7 +324,7 @@ Queue getChunks() { } - static class ChunkProcessor { + static class ChunkWorker { private final Client leaderClient; private final Client followerClient; @@ -338,9 +337,9 @@ static class ChunkProcessor { private final Consumer handler; final AtomicInteger retryCounter = new AtomicInteger(0); - ChunkProcessor(Client leaderClient, Client followerClient, Queue chunks, Executor ccrExecutor, - BiConsumer> indexVersionChecker, - ShardId leaderShard, ShardId followerShard, Consumer handler) { + ChunkWorker(Client leaderClient, Client followerClient, Queue chunks, Executor ccrExecutor, + BiConsumer> indexVersionChecker, ShardId leaderShard, ShardId followerShard, + Consumer handler) { this.leaderClient = leaderClient; this.followerClient = followerClient; this.chunks = chunks; diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ChunksCoordinatorTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ChunksCoordinatorTests.java index c0753159e8b33..7c501ced9bb18 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ChunksCoordinatorTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ChunksCoordinatorTests.java @@ -14,7 +14,7 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.ccr.Ccr; -import org.elasticsearch.xpack.ccr.action.ShardFollowTasksExecutor.ChunkProcessor; +import org.elasticsearch.xpack.ccr.action.ShardFollowTasksExecutor.ChunkWorker; import org.elasticsearch.xpack.ccr.action.ShardFollowTasksExecutor.ChunksCoordinator; import org.elasticsearch.xpack.ccr.action.ShardFollowTasksExecutor.IndexMetadataVersionChecker; import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsAction; @@ -32,6 +32,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; +import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.LongConsumer; @@ -122,7 +123,8 @@ public void testCoordinator() throws Exception { IndexMetadataVersionChecker checker = new IndexMetadataVersionChecker(leaderShardId.getIndex(), followShardId.getIndex(), client, client); ChunksCoordinator coordinator = new ChunksCoordinator(client, client, threadPool, checker, batchSize, - concurrentProcessors, Long.MAX_VALUE, leaderShardId, followShardId, handler, () -> true, value -> {}); + concurrentProcessors, Long.MAX_VALUE, leaderShardId, followShardId, handler, + () -> true, value -> {}); int numberOfOps = randomIntBetween(batchSize, batchSize * 20); long from = randomInt(1000); @@ -163,8 +165,9 @@ public void testCoordinator_failure() throws Exception { assertThat(e, sameInstance(expectedException)); }; IndexMetadataVersionChecker checker = new IndexMetadataVersionChecker(leaderShardId.getIndex(), - followShardId.getIndex(), client, client);ChunksCoordinator coordinator = - new ChunksCoordinator(client, client, threadPool, checker,10, 1, Long.MAX_VALUE, leaderShardId, followShardId, handler, () -> true, value -> {}); + followShardId.getIndex(), client, client); + ChunksCoordinator coordinator = new ChunksCoordinator(client, client, threadPool, checker,10, 1, Long.MAX_VALUE, + leaderShardId, followShardId, handler, () -> true, value -> {}); coordinator.start(0, 20); assertThat(coordinator.getChunks().size(), equalTo(1)); @@ -209,7 +212,7 @@ public void testCoordinator_concurrent() throws Exception { assertThat(calledOnceChecker.get(), is(false)); } - public void testChunkProcessor() { + public void testChunkWorker() { Client client = createClientMock(); Queue chunks = new LinkedList<>(); mockShardChangesApiCall(client); @@ -223,14 +226,14 @@ public void testChunkProcessor() { boolean[] invoked = new boolean[1]; Exception[] exception = new Exception[1]; Consumer handler = e -> {invoked[0] = true;exception[0] = e;}; - ChunkProcessor chunkProcessor = new ChunkProcessor(client, client, chunks, ccrExecutor, checker, leaderShardId, + ChunkWorker chunkWorker = new ChunkWorker(client, client, chunks, ccrExecutor, checker, leaderShardId, followShardId, handler); - chunkProcessor.start(0, 10, Long.MAX_VALUE); + chunkWorker.start(0, 10, Long.MAX_VALUE); assertThat(invoked[0], is(true)); assertThat(exception[0], nullValue()); } - public void testChunkProcessorRetry() { + public void testChunkWorkerRetry() { Client client = createClientMock(); Queue chunks = new LinkedList<>(); mockBulkShardOperationsApiCall(client); @@ -246,15 +249,15 @@ public void testChunkProcessorRetry() { boolean[] invoked = new boolean[1]; Exception[] exception = new Exception[1]; Consumer handler = e -> {invoked[0] = true;exception[0] = e;}; - ChunkProcessor chunkProcessor = new ChunkProcessor(client, client, chunks, ccrExecutor, checker, leaderShardId, + ChunkWorker chunkWorker = new ChunkWorker(client, client, chunks, ccrExecutor, checker, leaderShardId, followShardId, handler); - chunkProcessor.start(0, 10, Long.MAX_VALUE); + chunkWorker.start(0, 10, Long.MAX_VALUE); assertThat(invoked[0], is(true)); assertThat(exception[0], nullValue()); - assertThat(chunkProcessor.retryCounter.get(), equalTo(testRetryLimit + 1)); + assertThat(chunkWorker.retryCounter.get(), equalTo(testRetryLimit + 1)); } - public void testChunkProcessorRetryTooManyTimes() { + public void testChunkWorkerRetryTooManyTimes() { Client client = createClientMock(); Queue chunks = new LinkedList<>(); mockBulkShardOperationsApiCall(client); @@ -270,17 +273,17 @@ public void testChunkProcessorRetryTooManyTimes() { boolean[] invoked = new boolean[1]; Exception[] exception = new Exception[1]; Consumer handler = e -> {invoked[0] = true;exception[0] = e;}; - ChunkProcessor chunkProcessor = new ChunkProcessor(client, client, chunks, ccrExecutor, checker, leaderShardId, + ChunkWorker chunkWorker = new ChunkWorker(client, client, chunks, ccrExecutor, checker, leaderShardId, followShardId, handler); - chunkProcessor.start(0, 10, Long.MAX_VALUE); + chunkWorker.start(0, 10, Long.MAX_VALUE); assertThat(invoked[0], is(true)); assertThat(exception[0], notNullValue()); assertThat(exception[0].getMessage(), equalTo("retrying failed [17] times, aborting...")); assertThat(exception[0].getCause().getMessage(), equalTo("connection exception")); - assertThat(chunkProcessor.retryCounter.get(), equalTo(testRetryLimit)); + assertThat(chunkWorker.retryCounter.get(), equalTo(testRetryLimit)); } - public void testChunkProcessorNoneRetryableError() { + public void testChunkWorkerNoneRetryableError() { Client client = createClientMock(); Queue chunks = new LinkedList<>(); mockBulkShardOperationsApiCall(client); @@ -295,16 +298,16 @@ public void testChunkProcessorNoneRetryableError() { boolean[] invoked = new boolean[1]; Exception[] exception = new Exception[1]; Consumer handler = e -> {invoked[0] = true;exception[0] = e;}; - ChunkProcessor chunkProcessor = new ChunkProcessor(client, client, chunks, ccrExecutor, checker, leaderShardId, + ChunkWorker chunkWorker = new ChunkWorker(client, client, chunks, ccrExecutor, checker, leaderShardId, followShardId, handler); - chunkProcessor.start(0, 10, Long.MAX_VALUE); + chunkWorker.start(0, 10, Long.MAX_VALUE); assertThat(invoked[0], is(true)); assertThat(exception[0], notNullValue()); assertThat(exception[0].getMessage(), equalTo("unexpected")); - assertThat(chunkProcessor.retryCounter.get(), equalTo(0)); + assertThat(chunkWorker.retryCounter.get(), equalTo(0)); } - public void testChunkProcessorExceedMaxTranslogsBytes() { + public void testChunkWorkerExceedMaxTranslogsBytes() { long from = 0; long to = 20; long actualTo = 10; @@ -335,9 +338,9 @@ public void testChunkProcessorExceedMaxTranslogsBytes() { Exception[] exception = new Exception[1]; Consumer handler = e -> {invoked[0] = true;exception[0] = e;}; BiConsumer> versionChecker = (indexVersiuon, consumer) -> consumer.accept(null); - ChunkProcessor chunkProcessor = - new ChunkProcessor(client, client, chunks, ccrExecutor, versionChecker, leaderShardId, followShardId, handler); - chunkProcessor.start(from, to, Long.MAX_VALUE); + ChunkWorker chunkWorker = new ChunkWorker(client, client, chunks, ccrExecutor, versionChecker, leaderShardId, + followShardId, handler); + chunkWorker.start(from, to, Long.MAX_VALUE); assertThat(invoked[0], is(true)); assertThat(exception[0], nullValue()); assertThat(chunks.size(), equalTo(1));