From 676e941432a374e55000aecc2078ed1e9ee3281a Mon Sep 17 00:00:00 2001 From: Zachary Tong Date: Thu, 29 Aug 2019 13:29:16 -0400 Subject: [PATCH] Address review comments --- .../action/bulk/TransportBulkAction.java | 34 ++++++++----------- 1 file changed, 14 insertions(+), 20 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index a9da7d70066c4..e9a7c5efe3fea 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -158,31 +158,27 @@ private final class BulkExecutor extends ActionRunnable { private final ClusterStateObserver recoveredObserver; private final BulkRequest bulkRequest; private final Task task; - long startTime = -1; + long startTime; BulkExecutor(Task task, BulkRequest bulkRequest, ActionListener listener) { super(listener); this.recoveredObserver = new ClusterStateObserver(clusterService, bulkRequest.timeout(), logger, threadPool.getThreadContext()); this.bulkRequest = bulkRequest; this.task = task; + startTime = relativeTime(); } @Override protected void doRun() { - if (startTime == -1) { - startTime = relativeTime(); - } - ClusterBlockException blockException = clusterService.state().blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ); + ClusterState currentState = clusterService.state(); + ClusterBlockException blockException = currentState.blocks().globalBlockedException(ClusterBlockLevel.WRITE); + if (blockException != null) { - if (recoveredObserver.isTimedOut()) { - // we running as a last attempt after a timeout has happened. don't retry - listener.onFailure(blockException); - return; - } recoveredObserver.waitForNextChange(new ClusterStateObserver.Listener() { @Override - public void onNewClusterState(ClusterState state) { - run(); + public void onNewClusterState(ClusterState newState) { + // predicate passed, begin preparing for the bulk + prepForBulk(newState); } @Override @@ -192,22 +188,21 @@ public void onClusterServiceClose() { @Override public void onTimeout(TimeValue timeout) { - // Try one more time... - run(); + listener.onFailure(blockException); } - }); + }, newState -> newState.blocks().global(ClusterBlockLevel.WRITE).isEmpty()); return; } // All good, begin preparing for the bulk request - prepForBulk(); + prepForBulk(currentState); } - private void prepForBulk() { + private void prepForBulk(ClusterState clusterState) { final AtomicArray responses = new AtomicArray<>(bulkRequest.requests.size()); boolean hasIndexRequestsWithPipelines = false; - final MetaData metaData = clusterService.state().getMetaData(); + final MetaData metaData = clusterState.getMetaData(); ImmutableOpenMap indicesMetaData = metaData.indices(); for (DocWriteRequest actionRequest : bulkRequest.requests) { IndexRequest indexRequest = getIndexWriteRequest(actionRequest); @@ -296,11 +291,10 @@ private void prepForBulk() { indices we can't create that we'll use when we try to run the requests. */ final Map indicesThatCannotBeCreated = new HashMap<>(); Set autoCreateIndices = new HashSet<>(); - ClusterState state = clusterService.state(); for (String index : indices) { boolean shouldAutoCreate; try { - shouldAutoCreate = shouldAutoCreate(index, state); + shouldAutoCreate = shouldAutoCreate(index, clusterState); } catch (IndexNotFoundException e) { shouldAutoCreate = false; indicesThatCannotBeCreated.put(index, e);