From 312b10218847ab9ca295e99f464a75d0c1983365 Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Tue, 22 Jun 2021 13:03:49 +0200 Subject: [PATCH 1/8] use pit for search --- .../xpack/transform/TransformServices.java | 4 +- .../transforms/ClientTransformIndexer.java | 124 ++++++++++++++++-- .../ClientTransformIndexerBuilder.java | 28 +--- .../transforms/TransformIndexer.java | 10 +- .../TransformPersistentTasksExecutor.java | 5 +- .../ClientTransformIndexerTests.java | 11 +- .../TransformIndexerFailureHandlingTests.java | 7 +- .../TransformIndexerStateTests.java | 18 ++- .../transforms/TransformIndexerTests.java | 18 ++- .../transforms/TransformTaskTests.java | 11 +- 10 files changed, 173 insertions(+), 63 deletions(-) diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/TransformServices.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/TransformServices.java index 15d1ee1e849bc..8592e589af0fb 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/TransformServices.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/TransformServices.java @@ -29,12 +29,12 @@ public final class TransformServices { public TransformServices( TransformConfigManager transformConfigManager, - TransformCheckpointService checkpointProvider, + TransformCheckpointService checkpointService, TransformAuditor transformAuditor, SchedulerEngine schedulerEngine ) { this.configManager = Objects.requireNonNull(transformConfigManager); - this.checkpointService = Objects.requireNonNull(checkpointProvider); + this.checkpointService = Objects.requireNonNull(checkpointService); this.auditor = Objects.requireNonNull(transformAuditor); this.schedulerEngine = Objects.requireNonNull(schedulerEngine); } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java index 2bcca9b3de6ba..3f2c1370b55b8 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java @@ -19,17 +19,23 @@ import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.search.ClosePointInTimeAction; +import org.elasticsearch.action.search.ClosePointInTimeRequest; +import org.elasticsearch.action.search.OpenPointInTimeAction; +import org.elasticsearch.action.search.OpenPointInTimeRequest; import org.elasticsearch.action.search.SearchAction; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.Client; -import org.elasticsearch.core.Nullable; import org.elasticsearch.common.logging.LoggerMessageFormat; +import org.elasticsearch.core.Nullable; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.engine.VersionConflictEngineException; import org.elasticsearch.index.mapper.MapperParsingException; import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.elasticsearch.index.reindex.DeleteByQueryAction; import org.elasticsearch.index.reindex.DeleteByQueryRequest; +import org.elasticsearch.search.builder.PointInTimeBuilder; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.indexing.IndexerState; @@ -42,10 +48,9 @@ import org.elasticsearch.xpack.core.transform.transforms.TransformStoredDoc; import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState; import org.elasticsearch.xpack.core.transform.utils.ExceptionsHelper; +import org.elasticsearch.xpack.transform.TransformServices; import org.elasticsearch.xpack.transform.checkpoint.CheckpointProvider; -import org.elasticsearch.xpack.transform.notifications.TransformAuditor; import org.elasticsearch.xpack.transform.persistence.SeqNoPrimaryTermAndIndex; -import org.elasticsearch.xpack.transform.persistence.TransformConfigManager; import org.elasticsearch.xpack.transform.utils.ExceptionRootCauseFinder; import java.util.Collection; @@ -57,21 +62,23 @@ class ClientTransformIndexer extends TransformIndexer { + private static final TimeValue PIT_KEEP_ALIVE = TimeValue.timeValueSeconds(30); private static final Logger logger = LogManager.getLogger(ClientTransformIndexer.class); private final Client client; private final AtomicBoolean oldStatsCleanedUp = new AtomicBoolean(false); private final AtomicReference seqNoPrimaryTermAndIndex; + private PointInTimeBuilder pit; + private long pitCheckpoint; ClientTransformIndexer( ThreadPool threadPool, - TransformConfigManager transformsConfigManager, + TransformServices transformServices, CheckpointProvider checkpointProvider, AtomicReference initialState, TransformIndexerPosition initialPosition, Client client, - TransformAuditor auditor, TransformIndexerStats initialStats, TransformConfig transformConfig, Map fieldMappings, @@ -84,9 +91,8 @@ class ClientTransformIndexer extends TransformIndexer { ) { super( ExceptionsHelper.requireNonNull(threadPool, "threadPool"), - transformsConfigManager, + transformServices, checkpointProvider, - auditor, transformConfig, fieldMappings, ExceptionsHelper.requireNonNull(initialState, "initialState"), @@ -111,13 +117,14 @@ protected void doNextSearch(long waitTimeInNanos, ActionListener nextPhase.onFailure(new ElasticsearchException("Attempted to do a search request for failed transform [{}].", getJobId())); return; } - ClientHelper.executeWithHeadersAsync( - transformConfig.getHeaders(), - ClientHelper.TRANSFORM_ORIGIN, - client, - SearchAction.INSTANCE, + + if (getNextCheckpoint().getCheckpoint() != pitCheckpoint) { + closePointInTime(); + } + + injectPointInTimeIfNeeded( buildSearchRequest(), - nextPhase + ActionListener.wrap(pitSearchRequest -> { doSearch(pitSearchRequest, nextPhase); }, nextPhase::onFailure) ); } @@ -440,6 +447,97 @@ SeqNoPrimaryTermAndIndex getSeqNoPrimaryTermAndIndex() { return seqNoPrimaryTermAndIndex.get(); } + @Override + protected void afterFinishOrFailure() { + closePointInTime(); + super.afterFinishOrFailure(); + } + + private void closePointInTime() { + if (pit != null) { + String oldPit = pit.getEncodedId(); + pit = null; + ClosePointInTimeRequest closePitRequest = new ClosePointInTimeRequest(oldPit); + ClientHelper.executeWithHeadersAsync( + transformConfig.getHeaders(), + ClientHelper.TRANSFORM_ORIGIN, + client, + ClosePointInTimeAction.INSTANCE, + closePitRequest, + ActionListener.wrap(response -> { + logger.trace("closed pit"); + }, e -> { + // todo: what if the pit already timed out + logger.warn("failed to close point in time handle"); + }) + ); + } + } + + private void injectPointInTimeIfNeeded(SearchRequest searchRequest, ActionListener listener) { + if (pit != null) { + searchRequest.source().pointInTimeBuilder(pit); + listener.onResponse(searchRequest); + return; + } + + // todo: check pit is supported on all nodes + + // no pit, create a new one + OpenPointInTimeRequest pitRequest = new OpenPointInTimeRequest(transformConfig.getSource().getIndex()).keepAlive(PIT_KEEP_ALIVE); + + ClientHelper.executeWithHeadersAsync( + transformConfig.getHeaders(), + ClientHelper.TRANSFORM_ORIGIN, + client, + OpenPointInTimeAction.INSTANCE, + pitRequest, + ActionListener.wrap(response -> { + pit = new PointInTimeBuilder(response.getPointInTimeId()).setKeepAlive(PIT_KEEP_ALIVE); + searchRequest.source().pointInTimeBuilder(pit); + pitCheckpoint = getNextCheckpoint().getCheckpoint(); + logger.info("using pit searcher"); + + listener.onResponse(searchRequest); + }, + e -> { + // todo: failed to open pit, warn and continue with ordinary search + logger.error( + new ParameterizedMessage("[{}] Failed to create a point in time reader, falling back to normal search.", getJobId()), + e + ); + listener.onResponse(searchRequest); + } + ) + ); + } + + private void doSearch(SearchRequest searchRequest, ActionListener listener) { + logger.trace("searchRequest: {}", searchRequest); + + + ClientHelper.executeWithHeadersAsync( + transformConfig.getHeaders(), + ClientHelper.TRANSFORM_ORIGIN, + client, + SearchAction.INSTANCE, + searchRequest, + ActionListener.wrap(response -> { + // did the pit change? + if (response.pointInTimeId() != null && (pit == null || response.pointInTimeId() != pit.getEncodedId())) { + pit = new PointInTimeBuilder(response.pointInTimeId()).setKeepAlive(PIT_KEEP_ALIVE); + + logger.trace("updated point in time"); + } + + listener.onResponse(response); + }, e -> { + // todo: handle search phase execution exception due to pit timeout, retry once + listener.onFailure(e); + }) + ); + } + private static String getBulkIndexDetailedFailureMessage(String prefix, Map failures) { if (failures.isEmpty()) { return ""; diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerBuilder.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerBuilder.java index 339d0426483b7..f1cf4e12f7e87 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerBuilder.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerBuilder.java @@ -15,20 +15,16 @@ import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerPosition; import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats; import org.elasticsearch.xpack.core.transform.transforms.TransformProgress; +import org.elasticsearch.xpack.transform.TransformServices; import org.elasticsearch.xpack.transform.checkpoint.CheckpointProvider; -import org.elasticsearch.xpack.transform.checkpoint.TransformCheckpointService; -import org.elasticsearch.xpack.transform.notifications.TransformAuditor; import org.elasticsearch.xpack.transform.persistence.SeqNoPrimaryTermAndIndex; -import org.elasticsearch.xpack.transform.persistence.TransformConfigManager; import java.util.Map; import java.util.concurrent.atomic.AtomicReference; class ClientTransformIndexerBuilder { private ParentTaskAssigningClient parentTaskClient; - private TransformConfigManager transformsConfigManager; - private TransformCheckpointService transformsCheckpointService; - private TransformAuditor auditor; + private TransformServices transformServices; private Map fieldMappings; private TransformConfig transformConfig; private TransformIndexerStats initialStats; @@ -45,16 +41,16 @@ class ClientTransformIndexerBuilder { } ClientTransformIndexer build(ThreadPool threadPool, TransformContext context) { - CheckpointProvider checkpointProvider = transformsCheckpointService.getCheckpointProvider(parentTaskClient, transformConfig); + CheckpointProvider checkpointProvider = + transformServices.getCheckpointService().getCheckpointProvider(parentTaskClient, transformConfig); return new ClientTransformIndexer( threadPool, - transformsConfigManager, + transformServices, checkpointProvider, new AtomicReference<>(this.indexerState), initialPosition, parentTaskClient, - auditor, initialStats, transformConfig, fieldMappings, @@ -77,18 +73,8 @@ ClientTransformIndexerBuilder setClient(ParentTaskAssigningClient parentTaskClie return this; } - ClientTransformIndexerBuilder setTransformsConfigManager(TransformConfigManager transformsConfigManager) { - this.transformsConfigManager = transformsConfigManager; - return this; - } - - ClientTransformIndexerBuilder setTransformsCheckpointService(TransformCheckpointService transformsCheckpointService) { - this.transformsCheckpointService = transformsCheckpointService; - return this; - } - - ClientTransformIndexerBuilder setAuditor(TransformAuditor auditor) { - this.auditor = auditor; + ClientTransformIndexerBuilder setTransformServices(TransformServices transformServices) { + this.transformServices = transformServices; return this; } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java index eba9b28aab09a..8e9a945357be7 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java @@ -41,6 +41,7 @@ import org.elasticsearch.xpack.core.transform.transforms.TransformProgress; import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState; import org.elasticsearch.xpack.core.transform.utils.ExceptionsHelper; +import org.elasticsearch.xpack.transform.TransformServices; import org.elasticsearch.xpack.transform.checkpoint.CheckpointProvider; import org.elasticsearch.xpack.transform.notifications.TransformAuditor; import org.elasticsearch.xpack.transform.persistence.TransformConfigManager; @@ -128,9 +129,8 @@ private enum RunState { public TransformIndexer( ThreadPool threadPool, - TransformConfigManager transformsConfigManager, + TransformServices transformServices, CheckpointProvider checkpointProvider, - TransformAuditor auditor, TransformConfig transformConfig, Map fieldMappings, AtomicReference initialState, @@ -142,16 +142,16 @@ public TransformIndexer( TransformContext context ) { super(threadPool, initialState, initialPosition, jobStats); - this.transformsConfigManager = ExceptionsHelper.requireNonNull(transformsConfigManager, "transformsConfigManager"); + ExceptionsHelper.requireNonNull(transformServices, "transformServices"); + this.transformsConfigManager = transformServices.getConfigManager(); this.checkpointProvider = ExceptionsHelper.requireNonNull(checkpointProvider, "checkpointProvider"); - this.auditor = ExceptionsHelper.requireNonNull(auditor, "auditor"); + this.auditor = transformServices.getAuditor(); this.transformConfig = ExceptionsHelper.requireNonNull(transformConfig, "transformConfig"); this.fieldMappings = ExceptionsHelper.requireNonNull(fieldMappings, "fieldMappings"); this.progress = transformProgress; this.lastCheckpoint = ExceptionsHelper.requireNonNull(lastCheckpoint, "lastCheckpoint"); this.nextCheckpoint = ExceptionsHelper.requireNonNull(nextCheckpoint, "nextCheckpoint"); this.context = ExceptionsHelper.requireNonNull(context, "context"); - // give runState a default this.runState = RunState.APPLY_RESULTS; diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java index 0d6930061dcfa..42042968cee8d 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java @@ -164,10 +164,9 @@ protected void nodeOperation(AllocatedPersistentTask task, @Nullable TransformTa // // We want the rest of the state to be populated in the task when it is loaded on the node so that users can force start it again // later if they want. - final ClientTransformIndexerBuilder indexerBuilder = new ClientTransformIndexerBuilder().setAuditor(auditor) + final ClientTransformIndexerBuilder indexerBuilder = new ClientTransformIndexerBuilder() .setClient(buildTask.getParentTaskClient()) - .setTransformsCheckpointService(transformServices.getCheckpointService()) - .setTransformsConfigManager(transformServices.getConfigManager()); + .setTransformServices(transformServices); final SetOnce stateHolder = new SetOnce<>(); diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerTests.java index bc83a168c9371..ad416e2c626d5 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerTests.java @@ -11,11 +11,14 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.indexing.IndexerState; +import org.elasticsearch.xpack.core.scheduler.SchedulerEngine; import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint; import org.elasticsearch.xpack.core.transform.transforms.TransformConfig; import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats; import org.elasticsearch.xpack.core.transform.transforms.persistence.TransformInternalIndexConstants; +import org.elasticsearch.xpack.transform.TransformServices; import org.elasticsearch.xpack.transform.checkpoint.CheckpointProvider; +import org.elasticsearch.xpack.transform.checkpoint.TransformCheckpointService; import org.elasticsearch.xpack.transform.notifications.TransformAuditor; import org.elasticsearch.xpack.transform.persistence.IndexBasedTransformConfigManager; import org.elasticsearch.xpack.transform.persistence.SeqNoPrimaryTermAndIndex; @@ -39,12 +42,16 @@ public void testAudiOnFinishFrequency() { ClientTransformIndexer indexer = new ClientTransformIndexer( mock(ThreadPool.class), - mock(IndexBasedTransformConfigManager.class), + new TransformServices( + mock(IndexBasedTransformConfigManager.class), + mock(TransformCheckpointService.class), + mock(TransformAuditor.class), + mock(SchedulerEngine.class) + ), mock(CheckpointProvider.class), new AtomicReference<>(IndexerState.STOPPED), null, mock(Client.class), - mock(TransformAuditor.class), mock(TransformIndexerStats.class), mock(TransformConfig.class), Collections.emptyMap(), diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureHandlingTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureHandlingTests.java index fc0e1170fd5dc..c03fd7cbcb39b 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureHandlingTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureHandlingTests.java @@ -38,6 +38,7 @@ import org.elasticsearch.xpack.core.common.notifications.Level; import org.elasticsearch.xpack.core.indexing.IndexerState; import org.elasticsearch.xpack.core.indexing.IterationResult; +import org.elasticsearch.xpack.core.scheduler.SchedulerEngine; import org.elasticsearch.xpack.core.transform.transforms.SettingsConfig; import org.elasticsearch.xpack.core.transform.transforms.TimeRetentionPolicyConfig; import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint; @@ -46,7 +47,9 @@ import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats; import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState; import org.elasticsearch.xpack.transform.Transform; +import org.elasticsearch.xpack.transform.TransformServices; import org.elasticsearch.xpack.transform.checkpoint.CheckpointProvider; +import org.elasticsearch.xpack.transform.checkpoint.TransformCheckpointService; import org.elasticsearch.xpack.transform.notifications.MockTransformAuditor; import org.elasticsearch.xpack.transform.notifications.TransformAuditor; import org.elasticsearch.xpack.transform.persistence.IndexBasedTransformConfigManager; @@ -116,9 +119,9 @@ class MockedTransformIndexer extends TransformIndexer { ) { super( threadPool, - transformsConfigManager, + new TransformServices( + transformsConfigManager, mock(TransformCheckpointService.class), auditor, mock(SchedulerEngine.class)), checkpointProvider, - auditor, transformConfig, fieldMappings, initialState, diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerStateTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerStateTests.java index 63be4def6bfd5..3c2a7bb1456d4 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerStateTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerStateTests.java @@ -34,6 +34,7 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.indexing.IndexerState; import org.elasticsearch.xpack.core.indexing.IterationResult; +import org.elasticsearch.xpack.core.scheduler.SchedulerEngine; import org.elasticsearch.xpack.core.transform.transforms.SettingsConfig; import org.elasticsearch.xpack.core.transform.transforms.TimeSyncConfig; import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint; @@ -42,8 +43,10 @@ import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats; import org.elasticsearch.xpack.core.transform.transforms.TransformState; import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState; +import org.elasticsearch.xpack.transform.TransformServices; import org.elasticsearch.xpack.transform.checkpoint.CheckpointProvider; import org.elasticsearch.xpack.transform.checkpoint.MockTimebasedCheckpointProvider; +import org.elasticsearch.xpack.transform.checkpoint.TransformCheckpointService; import org.elasticsearch.xpack.transform.notifications.MockTransformAuditor; import org.elasticsearch.xpack.transform.notifications.TransformAuditor; import org.elasticsearch.xpack.transform.persistence.InMemoryTransformConfigManager; @@ -108,9 +111,8 @@ class MockedTransformIndexer extends TransformIndexer { MockedTransformIndexer( ThreadPool threadPool, - TransformConfigManager transformsConfigManager, + TransformServices transformServices, CheckpointProvider checkpointProvider, - TransformAuditor auditor, TransformConfig transformConfig, Map fieldMappings, AtomicReference initialState, @@ -120,9 +122,8 @@ class MockedTransformIndexer extends TransformIndexer { ) { super( threadPool, - transformsConfigManager, + transformServices, checkpointProvider, - auditor, transformConfig, fieldMappings, initialState, @@ -593,12 +594,17 @@ private MockedTransformIndexer createMockIndexer( ) { CheckpointProvider checkpointProvider = new MockTimebasedCheckpointProvider(config); transformConfigManager.putTransformConfiguration(config, ActionListener.wrap(r -> {}, e -> {})); + TransformServices transformServices = new TransformServices( + transformConfigManager, + mock(TransformCheckpointService.class), + auditor, + mock(SchedulerEngine.class) + ); MockedTransformIndexer indexer = new MockedTransformIndexer( threadPool, - transformConfigManager, + transformServices, checkpointProvider, - auditor, config, Collections.emptyMap(), state, diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerTests.java index 42120130a4032..bc2a488ed0bb1 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerTests.java @@ -34,6 +34,7 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.indexing.IndexerState; import org.elasticsearch.xpack.core.indexing.IterationResult; +import org.elasticsearch.xpack.core.scheduler.SchedulerEngine; import org.elasticsearch.xpack.core.transform.transforms.TimeRetentionPolicyConfigTests; import org.elasticsearch.xpack.core.transform.transforms.TimeSyncConfig; import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint; @@ -41,8 +42,10 @@ import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerPosition; import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats; import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState; +import org.elasticsearch.xpack.transform.TransformServices; import org.elasticsearch.xpack.transform.checkpoint.CheckpointProvider; import org.elasticsearch.xpack.transform.checkpoint.MockTimebasedCheckpointProvider; +import org.elasticsearch.xpack.transform.checkpoint.TransformCheckpointService; import org.elasticsearch.xpack.transform.notifications.MockTransformAuditor; import org.elasticsearch.xpack.transform.notifications.TransformAuditor; import org.elasticsearch.xpack.transform.persistence.InMemoryTransformConfigManager; @@ -110,9 +113,8 @@ class MockedTransformIndexer extends TransformIndexer { MockedTransformIndexer( int numberOfLoops, ThreadPool threadPool, - TransformConfigManager transformsConfigManager, + TransformServices transformServices, CheckpointProvider checkpointProvider, - TransformAuditor auditor, TransformConfig transformConfig, Map fieldMappings, AtomicReference initialState, @@ -122,9 +124,8 @@ class MockedTransformIndexer extends TransformIndexer { ) { super( threadPool, - transformsConfigManager, + transformServices, checkpointProvider, - auditor, transformConfig, fieldMappings, initialState, @@ -432,13 +433,18 @@ private MockedTransformIndexer createMockIndexer( ) { CheckpointProvider checkpointProvider = new MockTimebasedCheckpointProvider(config); transformConfigManager.putTransformConfiguration(config, ActionListener.wrap(r -> {}, e -> {})); + TransformServices transformServices = new TransformServices( + transformConfigManager, + mock(TransformCheckpointService.class), + auditor, + mock(SchedulerEngine.class) + ); MockedTransformIndexer indexer = new MockedTransformIndexer( numberOfLoops, threadPool, - transformConfigManager, + transformServices, checkpointProvider, - auditor, config, Collections.emptyMap(), state, diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformTaskTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformTaskTests.java index 044013bec70bc..b0801d67ef129 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformTaskTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformTaskTests.java @@ -30,6 +30,7 @@ import org.elasticsearch.xpack.core.transform.transforms.TransformState; import org.elasticsearch.xpack.core.transform.transforms.TransformTaskParams; import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState; +import org.elasticsearch.xpack.transform.TransformServices; import org.elasticsearch.xpack.transform.checkpoint.TransformCheckpointService; import org.elasticsearch.xpack.transform.notifications.MockTransformAuditor; import org.elasticsearch.xpack.transform.notifications.TransformAuditor; @@ -80,6 +81,12 @@ public void testStopOnFailedTaskWithStoppedIndexer() { transformsConfigManager, auditor ); + TransformServices transformServices = new TransformServices( + transformsConfigManager, + transformsCheckpointService, + auditor, + mock(SchedulerEngine.class) + ); TransformState transformState = new TransformState( TransformTaskState.FAILED, @@ -113,9 +120,7 @@ public void testStopOnFailedTaskWithStoppedIndexer() { ClientTransformIndexerBuilder indexerBuilder = new ClientTransformIndexerBuilder(); indexerBuilder.setClient(new ParentTaskAssigningClient(client, TaskId.EMPTY_TASK_ID)) .setTransformConfig(transformConfig) - .setAuditor(auditor) - .setTransformsConfigManager(transformsConfigManager) - .setTransformsCheckpointService(transformsCheckpointService) + .setTransformServices(transformServices) .setFieldMappings(Collections.emptyMap()); transformTask.initializeIndexer(indexerBuilder); From a46bc7c38caa7deb04aa04b75c27ba5d68866057 Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Wed, 30 Jun 2021 08:24:29 +0200 Subject: [PATCH 2/8] implement pit error handling --- .../transforms/ClientTransformIndexer.java | 72 +++++++++++++------ 1 file changed, 52 insertions(+), 20 deletions(-) diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java index 3f2c1370b55b8..8f79600bcab6f 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java @@ -35,8 +35,10 @@ import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.elasticsearch.index.reindex.DeleteByQueryAction; import org.elasticsearch.index.reindex.DeleteByQueryRequest; +import org.elasticsearch.search.SearchContextMissingException; import org.elasticsearch.search.builder.PointInTimeBuilder; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.ActionNotFoundTransportException; import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.indexing.IndexerState; import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint; @@ -71,6 +73,7 @@ class ClientTransformIndexer extends TransformIndexer { private final AtomicReference seqNoPrimaryTermAndIndex; private PointInTimeBuilder pit; private long pitCheckpoint; + private boolean disablePit = false; ClientTransformIndexer( ThreadPool threadPool, @@ -464,25 +467,26 @@ private void closePointInTime() { client, ClosePointInTimeAction.INSTANCE, closePitRequest, - ActionListener.wrap(response -> { - logger.trace("closed pit"); - }, e -> { - // todo: what if the pit already timed out - logger.warn("failed to close point in time handle"); + ActionListener.wrap(response -> { logger.trace("[{}] closed pit search context [{}]", getJobId(), oldPit); }, e -> { + // note: closing the pit should never throw, even if the pit is invalid + logger.error(new ParameterizedMessage("[{}] Failed to close point in time reader", getJobId()), e); }) ); } } private void injectPointInTimeIfNeeded(SearchRequest searchRequest, ActionListener listener) { + if (disablePit == true) { + listener.onResponse(searchRequest); + return; + } + if (pit != null) { searchRequest.source().pointInTimeBuilder(pit); listener.onResponse(searchRequest); return; } - // todo: check pit is supported on all nodes - // no pit, create a new one OpenPointInTimeRequest pitRequest = new OpenPointInTimeRequest(transformConfig.getSource().getIndex()).keepAlive(PIT_KEEP_ALIVE); @@ -496,26 +500,39 @@ private void injectPointInTimeIfNeeded(SearchRequest searchRequest, ActionListen pit = new PointInTimeBuilder(response.getPointInTimeId()).setKeepAlive(PIT_KEEP_ALIVE); searchRequest.source().pointInTimeBuilder(pit); pitCheckpoint = getNextCheckpoint().getCheckpoint(); - logger.info("using pit searcher"); - + logger.trace("[{}] using pit search context [{}]", getJobId(), pit); listener.onResponse(searchRequest); - }, - e -> { - // todo: failed to open pit, warn and continue with ordinary search - logger.error( - new ParameterizedMessage("[{}] Failed to create a point in time reader, falling back to normal search.", getJobId()), + }, e -> { + Throwable unwrappedException = ExceptionsHelper.findSearchExceptionRootCause(e); + // if point in time is not supported, disable it but do not remember forever (stopping and starting will give it another + // try) + if (unwrappedException instanceof ActionNotFoundTransportException) { + logger.warn( + "[{}] source does not support point in time reader, falling back to normal search (more resource intensive)", + getJobId() + ); + auditor.warning( + getJobId(), + "Source does not support point in time reader, falling back to normal search (more resource intensive)" + ); + disablePit = true; + } else { + logger.warn( + new ParameterizedMessage( + "[{}] Failed to create a point in time reader, falling back to normal search.", + getJobId() + ), e ); - listener.onResponse(searchRequest); } - ) + listener.onResponse(searchRequest); + }) ); } private void doSearch(SearchRequest searchRequest, ActionListener listener) { logger.trace("searchRequest: {}", searchRequest); - ClientHelper.executeWithHeadersAsync( transformConfig.getHeaders(), ClientHelper.TRANSFORM_ORIGIN, @@ -526,13 +543,28 @@ private void doSearch(SearchRequest searchRequest, ActionListener { - // todo: handle search phase execution exception due to pit timeout, retry once + // check if the error has been caused by a missing search context, which could be a timed out pit + // re-try this search without pit, if it fails again the normal failure handler is called, if it + // succeeds a new pit gets created at the next run + Throwable unwrappedException = ExceptionsHelper.findSearchExceptionRootCause(e); + if (unwrappedException instanceof SearchContextMissingException) { + logger.warn(new ParameterizedMessage("[{}] Search context missing, falling back to normal search.", getJobId()), e); + pit = null; + searchRequest.source().pointInTimeBuilder(null); + ClientHelper.executeWithHeadersAsync( + transformConfig.getHeaders(), + ClientHelper.TRANSFORM_ORIGIN, + client, + SearchAction.INSTANCE, + searchRequest, + listener + ); + } listener.onFailure(e); }) ); From 6ea5bb2122fe3026bd4705d6d63595f1aea7362d Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Tue, 6 Jul 2021 15:47:37 +0200 Subject: [PATCH 3/8] log pit id --- .../xpack/transform/transforms/ClientTransformIndexer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java index 8f79600bcab6f..27cd247317cf9 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java @@ -500,7 +500,7 @@ private void injectPointInTimeIfNeeded(SearchRequest searchRequest, ActionListen pit = new PointInTimeBuilder(response.getPointInTimeId()).setKeepAlive(PIT_KEEP_ALIVE); searchRequest.source().pointInTimeBuilder(pit); pitCheckpoint = getNextCheckpoint().getCheckpoint(); - logger.trace("[{}] using pit search context [{}]", getJobId(), pit); + logger.trace("[{}] using pit search context with id [{}]", getJobId(), pit.getEncodedId()); listener.onResponse(searchRequest); }, e -> { Throwable unwrappedException = ExceptionsHelper.findSearchExceptionRootCause(e); From 2883c03172e34ff601d387aa1801473ef441ac7b Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Tue, 6 Jul 2021 16:01:40 +0200 Subject: [PATCH 4/8] close pit on stop --- .../xpack/transform/transforms/ClientTransformIndexer.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java index 27cd247317cf9..caf60ba7f3270 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java @@ -456,6 +456,12 @@ protected void afterFinishOrFailure() { super.afterFinishOrFailure(); } + @Override + protected void onStop() { + closePointInTime(); + super.onStop(); + } + private void closePointInTime() { if (pit != null) { String oldPit = pit.getEncodedId(); From 3295719fa9c69c6003ff14e4f7cfcd821c5cb60a Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Tue, 6 Jul 2021 17:01:14 +0200 Subject: [PATCH 5/8] checkstyle --- .../xpack/transform/transforms/ClientTransformIndexer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java index caf60ba7f3270..f2f74f778ecb8 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java @@ -482,7 +482,7 @@ private void closePointInTime() { } private void injectPointInTimeIfNeeded(SearchRequest searchRequest, ActionListener listener) { - if (disablePit == true) { + if (disablePit) { listener.onResponse(searchRequest); return; } From d501166124ed01d7e66d095080e4ea554b41e470 Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Thu, 8 Jul 2021 17:49:44 +0200 Subject: [PATCH 6/8] add tests for pit handling --- .../ClientTransformIndexerTests.java | 303 ++++++++++++++++++ 1 file changed, 303 insertions(+) diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerTests.java index ad416e2c626d5..7d54ca009d3a3 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerTests.java @@ -7,14 +7,38 @@ package org.elasticsearch.xpack.transform.transforms; +import org.apache.lucene.search.TotalHits; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.LatchedActionListener; +import org.elasticsearch.action.search.ClosePointInTimeRequest; +import org.elasticsearch.action.search.ClosePointInTimeResponse; +import org.elasticsearch.action.search.OpenPointInTimeRequest; +import org.elasticsearch.action.search.OpenPointInTimeResponse; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.client.Client; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.SearchHits; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.search.internal.InternalSearchResponse; +import org.elasticsearch.search.profile.SearchProfileShardResults; +import org.elasticsearch.search.suggest.Suggest; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.client.NoOpClient; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.ActionNotFoundTransportException; import org.elasticsearch.xpack.core.indexing.IndexerState; import org.elasticsearch.xpack.core.scheduler.SchedulerEngine; import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint; import org.elasticsearch.xpack.core.transform.transforms.TransformConfig; +import org.elasticsearch.xpack.core.transform.transforms.TransformConfigTests; +import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerPosition; import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats; +import org.elasticsearch.xpack.core.transform.transforms.TransformProgress; import org.elasticsearch.xpack.core.transform.transforms.persistence.TransformInternalIndexConstants; import org.elasticsearch.xpack.transform.TransformServices; import org.elasticsearch.xpack.transform.checkpoint.CheckpointProvider; @@ -26,8 +50,14 @@ import java.time.Instant; import java.util.Collections; import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -94,4 +124,277 @@ public void testAudiOnFinishFrequency() { assertFalse(shouldAudit.get(11_999)); } + public void testPitInjection() throws InterruptedException { + TransformConfig config = TransformConfigTests.randomTransformConfig(); + + try (PitMockClient client = new PitMockClient(getTestName(), true)) { + MockClientTransformIndexer indexer = new MockClientTransformIndexer( + mock(ThreadPool.class), + new TransformServices( + mock(IndexBasedTransformConfigManager.class), + mock(TransformCheckpointService.class), + mock(TransformAuditor.class), + mock(SchedulerEngine.class) + ), + mock(CheckpointProvider.class), + new AtomicReference<>(IndexerState.STOPPED), + null, + client, + mock(TransformIndexerStats.class), + config, + Collections.emptyMap(), + null, + new TransformCheckpoint( + "transform", + Instant.now().toEpochMilli(), + 0L, + Collections.emptyMap(), + Instant.now().toEpochMilli() + ), + new TransformCheckpoint( + "transform", + Instant.now().toEpochMilli(), + 2L, + Collections.emptyMap(), + Instant.now().toEpochMilli() + ), + new SeqNoPrimaryTermAndIndex(1, 1, TransformInternalIndexConstants.LATEST_INDEX_NAME), + mock(TransformContext.class), + false + ); + + this.assertAsync( + listener -> indexer.doNextSearch(0, listener), + response -> { assertEquals("the_pit_id+", response.pointInTimeId()); } + ); + + assertEquals(1L, client.getPitContextCounter()); + + indexer.afterFinishOrFailure(); + assertEquals(0L, client.getPitContextCounter()); + + // check its not called again + indexer.onStop(); + assertEquals(0L, client.getPitContextCounter()); + + this.assertAsync( + listener -> indexer.doNextSearch(0, listener), + response -> { assertEquals("the_pit_id+", response.pointInTimeId()); } + ); + + this.assertAsync( + listener -> indexer.doNextSearch(0, listener), + response -> { assertEquals("the_pit_id++", response.pointInTimeId()); } + ); + + this.assertAsync( + listener -> indexer.doNextSearch(0, listener), + response -> { assertEquals("the_pit_id+++", response.pointInTimeId()); } + ); + + assertEquals(1L, client.getPitContextCounter()); + + indexer.onStop(); + assertEquals(0L, client.getPitContextCounter()); + } + } + + public void testPitInjectionIfPitNotSupported() throws InterruptedException { + TransformConfig config = TransformConfigTests.randomTransformConfig(); + + try (PitMockClient client = new PitMockClient(getTestName(), false)) { + MockClientTransformIndexer indexer = new MockClientTransformIndexer( + mock(ThreadPool.class), + new TransformServices( + mock(IndexBasedTransformConfigManager.class), + mock(TransformCheckpointService.class), + mock(TransformAuditor.class), + mock(SchedulerEngine.class) + ), + mock(CheckpointProvider.class), + new AtomicReference<>(IndexerState.STOPPED), + null, + client, + mock(TransformIndexerStats.class), + config, + Collections.emptyMap(), + null, + new TransformCheckpoint( + "transform", + Instant.now().toEpochMilli(), + 0L, + Collections.emptyMap(), + Instant.now().toEpochMilli() + ), + new TransformCheckpoint( + "transform", + Instant.now().toEpochMilli(), + 2L, + Collections.emptyMap(), + Instant.now().toEpochMilli() + ), + new SeqNoPrimaryTermAndIndex(1, 1, TransformInternalIndexConstants.LATEST_INDEX_NAME), + mock(TransformContext.class), + false + ); + + this.assertAsync( + listener -> indexer.doNextSearch(0, listener), + response -> { assertNull(response.pointInTimeId()); } + ); + + assertEquals(0L, client.getPitContextCounter()); + + indexer.afterFinishOrFailure(); + assertEquals(0L, client.getPitContextCounter()); + + // check its not called again + indexer.onStop(); + assertEquals(0L, client.getPitContextCounter()); + + this.assertAsync( + listener -> indexer.doNextSearch(0, listener), + response -> { assertNull(response.pointInTimeId()); } + ); + + this.assertAsync( + listener -> indexer.doNextSearch(0, listener), + response -> { assertNull(response.pointInTimeId()); } + ); + + this.assertAsync( + listener -> indexer.doNextSearch(0, listener), + response -> { assertNull(response.pointInTimeId()); } + ); + + assertEquals(0L, client.getPitContextCounter()); + + indexer.onStop(); + assertEquals(0L, client.getPitContextCounter()); + } + } + + private static class MockClientTransformIndexer extends ClientTransformIndexer { + + MockClientTransformIndexer( + ThreadPool threadPool, + TransformServices transformServices, + CheckpointProvider checkpointProvider, + AtomicReference initialState, + TransformIndexerPosition initialPosition, + Client client, + TransformIndexerStats initialStats, + TransformConfig transformConfig, + Map fieldMappings, + TransformProgress transformProgress, + TransformCheckpoint lastCheckpoint, + TransformCheckpoint nextCheckpoint, + SeqNoPrimaryTermAndIndex seqNoPrimaryTermAndIndex, + TransformContext context, + boolean shouldStopAtCheckpoint + ) { + super( + threadPool, + transformServices, + checkpointProvider, + initialState, + initialPosition, + client, + initialStats, + transformConfig, + fieldMappings, + transformProgress, + lastCheckpoint, + nextCheckpoint, + seqNoPrimaryTermAndIndex, + context, + shouldStopAtCheckpoint + ); + } + + @Override + protected SearchRequest buildSearchRequest() { + return new SearchRequest().source(new SearchSourceBuilder()); + } + } + + private static class PitMockClient extends NoOpClient { + private final boolean pitSupported; + private AtomicLong pitContextCounter = new AtomicLong(); + + PitMockClient(String testName, boolean pitSupported) { + super(testName); + this.pitSupported = pitSupported; + } + + public long getPitContextCounter() { + return pitContextCounter.get(); + } + + @SuppressWarnings("unchecked") + @Override + protected void doExecute( + ActionType action, + Request request, + ActionListener listener + ) { + if (request instanceof OpenPointInTimeRequest) { + if (pitSupported) { + pitContextCounter.incrementAndGet(); + OpenPointInTimeResponse response = new OpenPointInTimeResponse("the_pit_id"); + listener.onResponse((Response) response); + } else { + listener.onFailure(new ActionNotFoundTransportException("_pit")); + } + return; + } else if (request instanceof ClosePointInTimeRequest) { + ClosePointInTimeResponse response = new ClosePointInTimeResponse(true, 1); + assert pitContextCounter.get() > 0; + pitContextCounter.decrementAndGet(); + listener.onResponse((Response) response); + return; + } else if (request instanceof SearchRequest) { + SearchRequest searchRequest = (SearchRequest) request; + + SearchResponse response = new SearchResponse( + new InternalSearchResponse( + new SearchHits(new SearchHit[] { new SearchHit(1) }, new TotalHits(1L, TotalHits.Relation.EQUAL_TO), 1.0f), + // Simulate completely null aggs + null, + new Suggest(Collections.emptyList()), + new SearchProfileShardResults(Collections.emptyMap()), + false, + false, + 1 + ), + null, + 1, + 1, + 0, + 0, + ShardSearchFailure.EMPTY_ARRAY, + SearchResponse.Clusters.EMPTY, + // copy the pit from the request + searchRequest.pointInTimeBuilder() != null ? searchRequest.pointInTimeBuilder().getEncodedId() + "+" : null + ); + listener.onResponse((Response) response); + return; + } + + super.doExecute(action, request, listener); + } + } + + private void assertAsync(Consumer> function, Consumer furtherTests) throws InterruptedException { + CountDownLatch latch = new CountDownLatch(1); + AtomicBoolean listenerCalled = new AtomicBoolean(false); + + LatchedActionListener listener = new LatchedActionListener<>(ActionListener.wrap(r -> { + assertTrue("listener called more than once", listenerCalled.compareAndSet(false, true)); + furtherTests.accept(r); + }, e -> { fail("got unexpected exception: " + e); }), latch); + + function.accept(listener); + assertTrue("timed out after 5s", latch.await(5, TimeUnit.SECONDS)); + } } From 439988a4a7918774a33c81723d7c2ec62f4c4ea2 Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Mon, 12 Jul 2021 17:46:17 +0200 Subject: [PATCH 7/8] address review comments and apply spotless --- .../transforms/ClientTransformIndexer.java | 32 +++++++------- .../ClientTransformIndexerBuilder.java | 4 +- .../transforms/TransformIndexer.java | 2 +- .../TransformPersistentTasksExecutor.java | 42 ++++++++++++------- .../TransformIndexerFailureHandlingTests.java | 11 ++--- 5 files changed, 53 insertions(+), 38 deletions(-) diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java index f2f74f778ecb8..b799b92217b6f 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java @@ -463,22 +463,24 @@ protected void onStop() { } private void closePointInTime() { - if (pit != null) { - String oldPit = pit.getEncodedId(); - pit = null; - ClosePointInTimeRequest closePitRequest = new ClosePointInTimeRequest(oldPit); - ClientHelper.executeWithHeadersAsync( - transformConfig.getHeaders(), - ClientHelper.TRANSFORM_ORIGIN, - client, - ClosePointInTimeAction.INSTANCE, - closePitRequest, - ActionListener.wrap(response -> { logger.trace("[{}] closed pit search context [{}]", getJobId(), oldPit); }, e -> { - // note: closing the pit should never throw, even if the pit is invalid - logger.error(new ParameterizedMessage("[{}] Failed to close point in time reader", getJobId()), e); - }) - ); + if (pit == null) { + return; } + + String oldPit = pit.getEncodedId(); + pit = null; + ClosePointInTimeRequest closePitRequest = new ClosePointInTimeRequest(oldPit); + ClientHelper.executeWithHeadersAsync( + transformConfig.getHeaders(), + ClientHelper.TRANSFORM_ORIGIN, + client, + ClosePointInTimeAction.INSTANCE, + closePitRequest, + ActionListener.wrap(response -> { logger.trace("[{}] closed pit search context [{}]", getJobId(), oldPit); }, e -> { + // note: closing the pit should never throw, even if the pit is invalid + logger.error(new ParameterizedMessage("[{}] Failed to close point in time reader", getJobId()), e); + }) + ); } private void injectPointInTimeIfNeeded(SearchRequest searchRequest, ActionListener listener) { diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerBuilder.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerBuilder.java index f1cf4e12f7e87..4cb8899cd9fdc 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerBuilder.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerBuilder.java @@ -41,8 +41,8 @@ class ClientTransformIndexerBuilder { } ClientTransformIndexer build(ThreadPool threadPool, TransformContext context) { - CheckpointProvider checkpointProvider = - transformServices.getCheckpointService().getCheckpointProvider(parentTaskClient, transformConfig); + CheckpointProvider checkpointProvider = transformServices.getCheckpointService() + .getCheckpointProvider(parentTaskClient, transformConfig); return new ClientTransformIndexer( threadPool, diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java index 8e9a945357be7..57ad5e006c150 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java @@ -19,8 +19,8 @@ import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.common.breaker.CircuitBreakingException; -import org.elasticsearch.core.Tuple; import org.elasticsearch.common.util.CollectionUtils; +import org.elasticsearch.core.Tuple; import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java index 42042968cee8d..ac2b0f66e8df1 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java @@ -21,10 +21,10 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.core.Nullable; import org.elasticsearch.common.ValidationException; -import org.elasticsearch.core.Tuple; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.Nullable; +import org.elasticsearch.core.Tuple; import org.elasticsearch.persistent.AllocatedPersistentTask; import org.elasticsearch.persistent.PersistentTaskState; import org.elasticsearch.persistent.PersistentTasksCustomMetadata; @@ -94,12 +94,16 @@ public TransformPersistentTasksExecutor( } @Override - public PersistentTasksCustomMetadata.Assignment getAssignment(TransformTaskParams params, - Collection candidateNodes, - ClusterState clusterState) { + public PersistentTasksCustomMetadata.Assignment getAssignment( + TransformTaskParams params, + Collection candidateNodes, + ClusterState clusterState + ) { if (TransformMetadata.getTransformMetadata(clusterState).isResetMode()) { - return new PersistentTasksCustomMetadata.Assignment(null, - "Transform task will not be assigned as a feature reset is in progress."); + return new PersistentTasksCustomMetadata.Assignment( + null, + "Transform task will not be assigned as a feature reset is in progress." + ); } List unavailableIndices = verifyIndicesPrimaryShardsAreActive(clusterState, resolver); if (unavailableIndices.size() != 0) { @@ -113,7 +117,8 @@ public PersistentTasksCustomMetadata.Assignment getAssignment(TransformTaskParam return new PersistentTasksCustomMetadata.Assignment(null, reason); } DiscoveryNode discoveryNode = selectLeastLoadedNode( - clusterState, candidateNodes, + clusterState, + candidateNodes, node -> nodeCanRunThisTransform(node, params.getVersion(), params.requiresRemote(), null) ); @@ -164,8 +169,7 @@ protected void nodeOperation(AllocatedPersistentTask task, @Nullable TransformTa // // We want the rest of the state to be populated in the task when it is loaded on the node so that users can force start it again // later if they want. - final ClientTransformIndexerBuilder indexerBuilder = new ClientTransformIndexerBuilder() - .setClient(buildTask.getParentTaskClient()) + final ClientTransformIndexerBuilder indexerBuilder = new ClientTransformIndexerBuilder().setClient(buildTask.getParentTaskClient()) .setTransformServices(transformServices); final SetOnce stateHolder = new SetOnce<>(); @@ -283,14 +287,19 @@ protected void nodeOperation(AllocatedPersistentTask task, @Nullable TransformTa ValidationException validationException = config.validate(null); if (validationException == null) { indexerBuilder.setTransformConfig(config); - SchemaUtil.getDestinationFieldMappings(buildTask.getParentTaskClient(), config.getDestination().getIndex(), - getFieldMappingsListener); + SchemaUtil.getDestinationFieldMappings( + buildTask.getParentTaskClient(), + config.getDestination().getIndex(), + getFieldMappingsListener + ); } else { auditor.error(transformId, validationException.getMessage()); markAsFailed( buildTask, TransformMessages.getMessage( - TransformMessages.TRANSFORM_CONFIGURATION_INVALID, transformId, validationException.getMessage() + TransformMessages.TRANSFORM_CONFIGURATION_INVALID, + transformId, + validationException.getMessage() ) ); } @@ -312,8 +321,11 @@ protected void nodeOperation(AllocatedPersistentTask task, @Nullable TransformTa ); // <1> Check the index templates are installed - TransformInternalIndex.ensureLatestIndexAndTemplateInstalled(clusterService, buildTask.getParentTaskClient(), - templateCheckListener); + TransformInternalIndex.ensureLatestIndexAndTemplateInstalled( + clusterService, + buildTask.getParentTaskClient(), + templateCheckListener + ); } private static IndexerState currentIndexerState(TransformState previousState) { diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureHandlingTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureHandlingTests.java index c03fd7cbcb39b..64f9a4b7c551c 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureHandlingTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureHandlingTests.java @@ -120,7 +120,11 @@ class MockedTransformIndexer extends TransformIndexer { super( threadPool, new TransformServices( - transformsConfigManager, mock(TransformCheckpointService.class), auditor, mock(SchedulerEngine.class)), + transformsConfigManager, + mock(TransformCheckpointService.class), + auditor, + mock(SchedulerEngine.class) + ), checkpointProvider, transformConfig, fieldMappings, @@ -645,10 +649,7 @@ public void testRetentionPolicyDeleteByQueryThrowsTemporaryProblem() throws Exce throw new SearchPhaseExecutionException( "query", "Partial shards failure", - new ShardSearchFailure[] { - new ShardSearchFailure( - new ElasticsearchTimeoutException("timed out during dbq") - ) } + new ShardSearchFailure[] { new ShardSearchFailure(new ElasticsearchTimeoutException("timed out during dbq")) } ); }; From e5f2ecea791f576714f20ac355e72d7960eed334 Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Tue, 13 Jul 2021 09:40:16 +0200 Subject: [PATCH 8/8] make members volatile --- .../xpack/transform/transforms/ClientTransformIndexer.java | 6 +++--- .../xpack/transform/transforms/TransformIndexer.java | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java index b799b92217b6f..72f8f65e30e4a 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java @@ -71,9 +71,9 @@ class ClientTransformIndexer extends TransformIndexer { private final AtomicBoolean oldStatsCleanedUp = new AtomicBoolean(false); private final AtomicReference seqNoPrimaryTermAndIndex; - private PointInTimeBuilder pit; - private long pitCheckpoint; - private boolean disablePit = false; + private volatile PointInTimeBuilder pit; + private volatile long pitCheckpoint; + private volatile boolean disablePit = false; ClientTransformIndexer( ThreadPool threadPool, diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java index 57ad5e006c150..38db0d1bf9120 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java @@ -113,8 +113,8 @@ private enum RunState { private volatile Integer initialConfiguredPageSize; private volatile int pageSize = 0; - private long logEvery = 1; - private long logCount = 0; + private volatile long logEvery = 1; + private volatile long logCount = 0; private volatile TransformCheckpoint lastCheckpoint; private volatile TransformCheckpoint nextCheckpoint;