From f314465cd98733f1ce2ff322aa01714d80ee03cb Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Wed, 14 Jul 2021 04:34:01 -0700 Subject: [PATCH] [7.x][Transform] improve performance by using point in time API for search (#75333) Use point in time API for every checkpoint in transform. Using point in time reduces pressure on the source indexes, e.g. less refreshes. In case, pit isn't supported (e.g. when searching remote clusters) it falls back to ordinary search requests as before. closes #73481 backport #74984 --- .../xpack/transform/TransformServices.java | 4 +- .../transforms/ClientTransformIndexer.java | 164 ++++++++- .../ClientTransformIndexerBuilder.java | 28 +- .../transforms/TransformIndexer.java | 16 +- .../TransformPersistentTasksExecutor.java | 47 ++- .../ClientTransformIndexerTests.java | 314 +++++++++++++++++- .../TransformIndexerFailureHandlingTests.java | 16 +- .../TransformIndexerStateTests.java | 18 +- .../transforms/TransformIndexerTests.java | 18 +- .../transforms/TransformTaskTests.java | 11 +- 10 files changed, 552 insertions(+), 84 deletions(-) diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/TransformServices.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/TransformServices.java index 15d1ee1e849bc..8592e589af0fb 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/TransformServices.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/TransformServices.java @@ -29,12 +29,12 @@ public final class TransformServices { public TransformServices( TransformConfigManager transformConfigManager, - TransformCheckpointService checkpointProvider, + TransformCheckpointService checkpointService, TransformAuditor transformAuditor, SchedulerEngine schedulerEngine ) { this.configManager = Objects.requireNonNull(transformConfigManager); - this.checkpointService = Objects.requireNonNull(checkpointProvider); + this.checkpointService = Objects.requireNonNull(checkpointService); this.auditor = Objects.requireNonNull(transformAuditor); this.schedulerEngine = Objects.requireNonNull(schedulerEngine); } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java index 2bcca9b3de6ba..72f8f65e30e4a 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java @@ -19,18 +19,26 @@ import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.search.ClosePointInTimeAction; +import org.elasticsearch.action.search.ClosePointInTimeRequest; +import org.elasticsearch.action.search.OpenPointInTimeAction; +import org.elasticsearch.action.search.OpenPointInTimeRequest; import org.elasticsearch.action.search.SearchAction; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.Client; -import org.elasticsearch.core.Nullable; import org.elasticsearch.common.logging.LoggerMessageFormat; +import org.elasticsearch.core.Nullable; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.engine.VersionConflictEngineException; import org.elasticsearch.index.mapper.MapperParsingException; import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.elasticsearch.index.reindex.DeleteByQueryAction; import org.elasticsearch.index.reindex.DeleteByQueryRequest; +import org.elasticsearch.search.SearchContextMissingException; +import org.elasticsearch.search.builder.PointInTimeBuilder; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.ActionNotFoundTransportException; import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.indexing.IndexerState; import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint; @@ -42,10 +50,9 @@ import org.elasticsearch.xpack.core.transform.transforms.TransformStoredDoc; import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState; import org.elasticsearch.xpack.core.transform.utils.ExceptionsHelper; +import org.elasticsearch.xpack.transform.TransformServices; import org.elasticsearch.xpack.transform.checkpoint.CheckpointProvider; -import org.elasticsearch.xpack.transform.notifications.TransformAuditor; import org.elasticsearch.xpack.transform.persistence.SeqNoPrimaryTermAndIndex; -import org.elasticsearch.xpack.transform.persistence.TransformConfigManager; import org.elasticsearch.xpack.transform.utils.ExceptionRootCauseFinder; import java.util.Collection; @@ -57,21 +64,24 @@ class ClientTransformIndexer extends TransformIndexer { + private static final TimeValue PIT_KEEP_ALIVE = TimeValue.timeValueSeconds(30); private static final Logger logger = LogManager.getLogger(ClientTransformIndexer.class); private final Client client; private final AtomicBoolean oldStatsCleanedUp = new AtomicBoolean(false); private final AtomicReference seqNoPrimaryTermAndIndex; + private volatile PointInTimeBuilder pit; + private volatile long pitCheckpoint; + private volatile boolean disablePit = false; ClientTransformIndexer( ThreadPool threadPool, - TransformConfigManager transformsConfigManager, + TransformServices transformServices, CheckpointProvider checkpointProvider, AtomicReference initialState, TransformIndexerPosition initialPosition, Client client, - TransformAuditor auditor, TransformIndexerStats initialStats, TransformConfig transformConfig, Map fieldMappings, @@ -84,9 +94,8 @@ class ClientTransformIndexer extends TransformIndexer { ) { super( ExceptionsHelper.requireNonNull(threadPool, "threadPool"), - transformsConfigManager, + transformServices, checkpointProvider, - auditor, transformConfig, fieldMappings, ExceptionsHelper.requireNonNull(initialState, "initialState"), @@ -111,13 +120,14 @@ protected void doNextSearch(long waitTimeInNanos, ActionListener nextPhase.onFailure(new ElasticsearchException("Attempted to do a search request for failed transform [{}].", getJobId())); return; } - ClientHelper.executeWithHeadersAsync( - transformConfig.getHeaders(), - ClientHelper.TRANSFORM_ORIGIN, - client, - SearchAction.INSTANCE, + + if (getNextCheckpoint().getCheckpoint() != pitCheckpoint) { + closePointInTime(); + } + + injectPointInTimeIfNeeded( buildSearchRequest(), - nextPhase + ActionListener.wrap(pitSearchRequest -> { doSearch(pitSearchRequest, nextPhase); }, nextPhase::onFailure) ); } @@ -440,6 +450,134 @@ SeqNoPrimaryTermAndIndex getSeqNoPrimaryTermAndIndex() { return seqNoPrimaryTermAndIndex.get(); } + @Override + protected void afterFinishOrFailure() { + closePointInTime(); + super.afterFinishOrFailure(); + } + + @Override + protected void onStop() { + closePointInTime(); + super.onStop(); + } + + private void closePointInTime() { + if (pit == null) { + return; + } + + String oldPit = pit.getEncodedId(); + pit = null; + ClosePointInTimeRequest closePitRequest = new ClosePointInTimeRequest(oldPit); + ClientHelper.executeWithHeadersAsync( + transformConfig.getHeaders(), + ClientHelper.TRANSFORM_ORIGIN, + client, + ClosePointInTimeAction.INSTANCE, + closePitRequest, + ActionListener.wrap(response -> { logger.trace("[{}] closed pit search context [{}]", getJobId(), oldPit); }, e -> { + // note: closing the pit should never throw, even if the pit is invalid + logger.error(new ParameterizedMessage("[{}] Failed to close point in time reader", getJobId()), e); + }) + ); + } + + private void injectPointInTimeIfNeeded(SearchRequest searchRequest, ActionListener listener) { + if (disablePit) { + listener.onResponse(searchRequest); + return; + } + + if (pit != null) { + searchRequest.source().pointInTimeBuilder(pit); + listener.onResponse(searchRequest); + return; + } + + // no pit, create a new one + OpenPointInTimeRequest pitRequest = new OpenPointInTimeRequest(transformConfig.getSource().getIndex()).keepAlive(PIT_KEEP_ALIVE); + + ClientHelper.executeWithHeadersAsync( + transformConfig.getHeaders(), + ClientHelper.TRANSFORM_ORIGIN, + client, + OpenPointInTimeAction.INSTANCE, + pitRequest, + ActionListener.wrap(response -> { + pit = new PointInTimeBuilder(response.getPointInTimeId()).setKeepAlive(PIT_KEEP_ALIVE); + searchRequest.source().pointInTimeBuilder(pit); + pitCheckpoint = getNextCheckpoint().getCheckpoint(); + logger.trace("[{}] using pit search context with id [{}]", getJobId(), pit.getEncodedId()); + listener.onResponse(searchRequest); + }, e -> { + Throwable unwrappedException = ExceptionsHelper.findSearchExceptionRootCause(e); + // if point in time is not supported, disable it but do not remember forever (stopping and starting will give it another + // try) + if (unwrappedException instanceof ActionNotFoundTransportException) { + logger.warn( + "[{}] source does not support point in time reader, falling back to normal search (more resource intensive)", + getJobId() + ); + auditor.warning( + getJobId(), + "Source does not support point in time reader, falling back to normal search (more resource intensive)" + ); + disablePit = true; + } else { + logger.warn( + new ParameterizedMessage( + "[{}] Failed to create a point in time reader, falling back to normal search.", + getJobId() + ), + e + ); + } + listener.onResponse(searchRequest); + }) + ); + } + + private void doSearch(SearchRequest searchRequest, ActionListener listener) { + logger.trace("searchRequest: {}", searchRequest); + + ClientHelper.executeWithHeadersAsync( + transformConfig.getHeaders(), + ClientHelper.TRANSFORM_ORIGIN, + client, + SearchAction.INSTANCE, + searchRequest, + ActionListener.wrap(response -> { + // did the pit change? + if (response.pointInTimeId() != null && (pit == null || response.pointInTimeId() != pit.getEncodedId())) { + pit = new PointInTimeBuilder(response.pointInTimeId()).setKeepAlive(PIT_KEEP_ALIVE); + logger.trace("point in time handle has changed"); + } + + listener.onResponse(response); + }, e -> { + // check if the error has been caused by a missing search context, which could be a timed out pit + // re-try this search without pit, if it fails again the normal failure handler is called, if it + // succeeds a new pit gets created at the next run + Throwable unwrappedException = ExceptionsHelper.findSearchExceptionRootCause(e); + if (unwrappedException instanceof SearchContextMissingException) { + logger.warn(new ParameterizedMessage("[{}] Search context missing, falling back to normal search.", getJobId()), e); + pit = null; + searchRequest.source().pointInTimeBuilder(null); + ClientHelper.executeWithHeadersAsync( + transformConfig.getHeaders(), + ClientHelper.TRANSFORM_ORIGIN, + client, + SearchAction.INSTANCE, + searchRequest, + listener + ); + } + listener.onFailure(e); + }) + ); + } + private static String getBulkIndexDetailedFailureMessage(String prefix, Map failures) { if (failures.isEmpty()) { return ""; diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerBuilder.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerBuilder.java index 339d0426483b7..4cb8899cd9fdc 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerBuilder.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerBuilder.java @@ -15,20 +15,16 @@ import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerPosition; import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats; import org.elasticsearch.xpack.core.transform.transforms.TransformProgress; +import org.elasticsearch.xpack.transform.TransformServices; import org.elasticsearch.xpack.transform.checkpoint.CheckpointProvider; -import org.elasticsearch.xpack.transform.checkpoint.TransformCheckpointService; -import org.elasticsearch.xpack.transform.notifications.TransformAuditor; import org.elasticsearch.xpack.transform.persistence.SeqNoPrimaryTermAndIndex; -import org.elasticsearch.xpack.transform.persistence.TransformConfigManager; import java.util.Map; import java.util.concurrent.atomic.AtomicReference; class ClientTransformIndexerBuilder { private ParentTaskAssigningClient parentTaskClient; - private TransformConfigManager transformsConfigManager; - private TransformCheckpointService transformsCheckpointService; - private TransformAuditor auditor; + private TransformServices transformServices; private Map fieldMappings; private TransformConfig transformConfig; private TransformIndexerStats initialStats; @@ -45,16 +41,16 @@ class ClientTransformIndexerBuilder { } ClientTransformIndexer build(ThreadPool threadPool, TransformContext context) { - CheckpointProvider checkpointProvider = transformsCheckpointService.getCheckpointProvider(parentTaskClient, transformConfig); + CheckpointProvider checkpointProvider = transformServices.getCheckpointService() + .getCheckpointProvider(parentTaskClient, transformConfig); return new ClientTransformIndexer( threadPool, - transformsConfigManager, + transformServices, checkpointProvider, new AtomicReference<>(this.indexerState), initialPosition, parentTaskClient, - auditor, initialStats, transformConfig, fieldMappings, @@ -77,18 +73,8 @@ ClientTransformIndexerBuilder setClient(ParentTaskAssigningClient parentTaskClie return this; } - ClientTransformIndexerBuilder setTransformsConfigManager(TransformConfigManager transformsConfigManager) { - this.transformsConfigManager = transformsConfigManager; - return this; - } - - ClientTransformIndexerBuilder setTransformsCheckpointService(TransformCheckpointService transformsCheckpointService) { - this.transformsCheckpointService = transformsCheckpointService; - return this; - } - - ClientTransformIndexerBuilder setAuditor(TransformAuditor auditor) { - this.auditor = auditor; + ClientTransformIndexerBuilder setTransformServices(TransformServices transformServices) { + this.transformServices = transformServices; return this; } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java index eba9b28aab09a..38db0d1bf9120 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java @@ -19,8 +19,8 @@ import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.common.breaker.CircuitBreakingException; -import org.elasticsearch.core.Tuple; import org.elasticsearch.common.util.CollectionUtils; +import org.elasticsearch.core.Tuple; import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; @@ -41,6 +41,7 @@ import org.elasticsearch.xpack.core.transform.transforms.TransformProgress; import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState; import org.elasticsearch.xpack.core.transform.utils.ExceptionsHelper; +import org.elasticsearch.xpack.transform.TransformServices; import org.elasticsearch.xpack.transform.checkpoint.CheckpointProvider; import org.elasticsearch.xpack.transform.notifications.TransformAuditor; import org.elasticsearch.xpack.transform.persistence.TransformConfigManager; @@ -112,8 +113,8 @@ private enum RunState { private volatile Integer initialConfiguredPageSize; private volatile int pageSize = 0; - private long logEvery = 1; - private long logCount = 0; + private volatile long logEvery = 1; + private volatile long logCount = 0; private volatile TransformCheckpoint lastCheckpoint; private volatile TransformCheckpoint nextCheckpoint; @@ -128,9 +129,8 @@ private enum RunState { public TransformIndexer( ThreadPool threadPool, - TransformConfigManager transformsConfigManager, + TransformServices transformServices, CheckpointProvider checkpointProvider, - TransformAuditor auditor, TransformConfig transformConfig, Map fieldMappings, AtomicReference initialState, @@ -142,16 +142,16 @@ public TransformIndexer( TransformContext context ) { super(threadPool, initialState, initialPosition, jobStats); - this.transformsConfigManager = ExceptionsHelper.requireNonNull(transformsConfigManager, "transformsConfigManager"); + ExceptionsHelper.requireNonNull(transformServices, "transformServices"); + this.transformsConfigManager = transformServices.getConfigManager(); this.checkpointProvider = ExceptionsHelper.requireNonNull(checkpointProvider, "checkpointProvider"); - this.auditor = ExceptionsHelper.requireNonNull(auditor, "auditor"); + this.auditor = transformServices.getAuditor(); this.transformConfig = ExceptionsHelper.requireNonNull(transformConfig, "transformConfig"); this.fieldMappings = ExceptionsHelper.requireNonNull(fieldMappings, "fieldMappings"); this.progress = transformProgress; this.lastCheckpoint = ExceptionsHelper.requireNonNull(lastCheckpoint, "lastCheckpoint"); this.nextCheckpoint = ExceptionsHelper.requireNonNull(nextCheckpoint, "nextCheckpoint"); this.context = ExceptionsHelper.requireNonNull(context, "context"); - // give runState a default this.runState = RunState.APPLY_RESULTS; diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java index 8a557c75e4cd4..d747b0a3b3d56 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java @@ -22,10 +22,12 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.core.Nullable; + import org.elasticsearch.common.ValidationException; -import org.elasticsearch.core.Tuple; + import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.Nullable; +import org.elasticsearch.core.Tuple; import org.elasticsearch.persistent.AllocatedPersistentTask; import org.elasticsearch.persistent.PersistentTaskState; import org.elasticsearch.persistent.PersistentTasksCustomMetadata; @@ -96,12 +98,16 @@ public TransformPersistentTasksExecutor( } @Override - public PersistentTasksCustomMetadata.Assignment getAssignment(TransformTaskParams params, - Collection candidateNodes, - ClusterState clusterState) { + public PersistentTasksCustomMetadata.Assignment getAssignment( + TransformTaskParams params, + Collection candidateNodes, + ClusterState clusterState + ) { if (TransformMetadata.getTransformMetadata(clusterState).isResetMode()) { - return new PersistentTasksCustomMetadata.Assignment(null, - "Transform task will not be assigned as a feature reset is in progress."); + return new PersistentTasksCustomMetadata.Assignment( + null, + "Transform task will not be assigned as a feature reset is in progress." + ); } List unavailableIndices = verifyIndicesPrimaryShardsAreActive(clusterState, resolver); if (unavailableIndices.size() != 0) { @@ -184,10 +190,9 @@ protected void nodeOperation(AllocatedPersistentTask task, @Nullable TransformTa // // We want the rest of the state to be populated in the task when it is loaded on the node so that users can force start it again // later if they want. - final ClientTransformIndexerBuilder indexerBuilder = new ClientTransformIndexerBuilder().setAuditor(auditor) - .setClient(buildTask.getParentTaskClient()) - .setTransformsCheckpointService(transformServices.getCheckpointService()) - .setTransformsConfigManager(transformServices.getConfigManager()); + final ClientTransformIndexerBuilder indexerBuilder = new ClientTransformIndexerBuilder().setClient(buildTask.getParentTaskClient()) + .setTransformServices(transformServices); + final SetOnce stateHolder = new SetOnce<>(); @@ -304,14 +309,19 @@ protected void nodeOperation(AllocatedPersistentTask task, @Nullable TransformTa ValidationException validationException = config.validate(null); if (validationException == null) { indexerBuilder.setTransformConfig(config); - SchemaUtil.getDestinationFieldMappings(buildTask.getParentTaskClient(), config.getDestination().getIndex(), - getFieldMappingsListener); + SchemaUtil.getDestinationFieldMappings( + buildTask.getParentTaskClient(), + config.getDestination().getIndex(), + getFieldMappingsListener + ); } else { auditor.error(transformId, validationException.getMessage()); markAsFailed( buildTask, TransformMessages.getMessage( - TransformMessages.TRANSFORM_CONFIGURATION_INVALID, transformId, validationException.getMessage() + TransformMessages.TRANSFORM_CONFIGURATION_INVALID, + transformId, + validationException.getMessage() ) ); } @@ -333,8 +343,11 @@ protected void nodeOperation(AllocatedPersistentTask task, @Nullable TransformTa ); // <1> Check the index templates are installed - TransformInternalIndex.ensureLatestIndexAndTemplateInstalled(clusterService, buildTask.getParentTaskClient(), - templateCheckListener); + TransformInternalIndex.ensureLatestIndexAndTemplateInstalled( + clusterService, + buildTask.getParentTaskClient(), + templateCheckListener + ); } private static IndexerState currentIndexerState(TransformState previousState) { @@ -415,4 +428,4 @@ protected AllocatedPersistentTask createTask( headers ); } -} +} \ No newline at end of file diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerTests.java index bc83a168c9371..7d54ca009d3a3 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerTests.java @@ -7,15 +7,42 @@ package org.elasticsearch.xpack.transform.transforms; +import org.apache.lucene.search.TotalHits; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.LatchedActionListener; +import org.elasticsearch.action.search.ClosePointInTimeRequest; +import org.elasticsearch.action.search.ClosePointInTimeResponse; +import org.elasticsearch.action.search.OpenPointInTimeRequest; +import org.elasticsearch.action.search.OpenPointInTimeResponse; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.client.Client; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.SearchHits; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.search.internal.InternalSearchResponse; +import org.elasticsearch.search.profile.SearchProfileShardResults; +import org.elasticsearch.search.suggest.Suggest; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.client.NoOpClient; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.ActionNotFoundTransportException; import org.elasticsearch.xpack.core.indexing.IndexerState; +import org.elasticsearch.xpack.core.scheduler.SchedulerEngine; import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint; import org.elasticsearch.xpack.core.transform.transforms.TransformConfig; +import org.elasticsearch.xpack.core.transform.transforms.TransformConfigTests; +import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerPosition; import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats; +import org.elasticsearch.xpack.core.transform.transforms.TransformProgress; import org.elasticsearch.xpack.core.transform.transforms.persistence.TransformInternalIndexConstants; +import org.elasticsearch.xpack.transform.TransformServices; import org.elasticsearch.xpack.transform.checkpoint.CheckpointProvider; +import org.elasticsearch.xpack.transform.checkpoint.TransformCheckpointService; import org.elasticsearch.xpack.transform.notifications.TransformAuditor; import org.elasticsearch.xpack.transform.persistence.IndexBasedTransformConfigManager; import org.elasticsearch.xpack.transform.persistence.SeqNoPrimaryTermAndIndex; @@ -23,8 +50,14 @@ import java.time.Instant; import java.util.Collections; import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -39,12 +72,16 @@ public void testAudiOnFinishFrequency() { ClientTransformIndexer indexer = new ClientTransformIndexer( mock(ThreadPool.class), - mock(IndexBasedTransformConfigManager.class), + new TransformServices( + mock(IndexBasedTransformConfigManager.class), + mock(TransformCheckpointService.class), + mock(TransformAuditor.class), + mock(SchedulerEngine.class) + ), mock(CheckpointProvider.class), new AtomicReference<>(IndexerState.STOPPED), null, mock(Client.class), - mock(TransformAuditor.class), mock(TransformIndexerStats.class), mock(TransformConfig.class), Collections.emptyMap(), @@ -87,4 +124,277 @@ public void testAudiOnFinishFrequency() { assertFalse(shouldAudit.get(11_999)); } + public void testPitInjection() throws InterruptedException { + TransformConfig config = TransformConfigTests.randomTransformConfig(); + + try (PitMockClient client = new PitMockClient(getTestName(), true)) { + MockClientTransformIndexer indexer = new MockClientTransformIndexer( + mock(ThreadPool.class), + new TransformServices( + mock(IndexBasedTransformConfigManager.class), + mock(TransformCheckpointService.class), + mock(TransformAuditor.class), + mock(SchedulerEngine.class) + ), + mock(CheckpointProvider.class), + new AtomicReference<>(IndexerState.STOPPED), + null, + client, + mock(TransformIndexerStats.class), + config, + Collections.emptyMap(), + null, + new TransformCheckpoint( + "transform", + Instant.now().toEpochMilli(), + 0L, + Collections.emptyMap(), + Instant.now().toEpochMilli() + ), + new TransformCheckpoint( + "transform", + Instant.now().toEpochMilli(), + 2L, + Collections.emptyMap(), + Instant.now().toEpochMilli() + ), + new SeqNoPrimaryTermAndIndex(1, 1, TransformInternalIndexConstants.LATEST_INDEX_NAME), + mock(TransformContext.class), + false + ); + + this.assertAsync( + listener -> indexer.doNextSearch(0, listener), + response -> { assertEquals("the_pit_id+", response.pointInTimeId()); } + ); + + assertEquals(1L, client.getPitContextCounter()); + + indexer.afterFinishOrFailure(); + assertEquals(0L, client.getPitContextCounter()); + + // check its not called again + indexer.onStop(); + assertEquals(0L, client.getPitContextCounter()); + + this.assertAsync( + listener -> indexer.doNextSearch(0, listener), + response -> { assertEquals("the_pit_id+", response.pointInTimeId()); } + ); + + this.assertAsync( + listener -> indexer.doNextSearch(0, listener), + response -> { assertEquals("the_pit_id++", response.pointInTimeId()); } + ); + + this.assertAsync( + listener -> indexer.doNextSearch(0, listener), + response -> { assertEquals("the_pit_id+++", response.pointInTimeId()); } + ); + + assertEquals(1L, client.getPitContextCounter()); + + indexer.onStop(); + assertEquals(0L, client.getPitContextCounter()); + } + } + + public void testPitInjectionIfPitNotSupported() throws InterruptedException { + TransformConfig config = TransformConfigTests.randomTransformConfig(); + + try (PitMockClient client = new PitMockClient(getTestName(), false)) { + MockClientTransformIndexer indexer = new MockClientTransformIndexer( + mock(ThreadPool.class), + new TransformServices( + mock(IndexBasedTransformConfigManager.class), + mock(TransformCheckpointService.class), + mock(TransformAuditor.class), + mock(SchedulerEngine.class) + ), + mock(CheckpointProvider.class), + new AtomicReference<>(IndexerState.STOPPED), + null, + client, + mock(TransformIndexerStats.class), + config, + Collections.emptyMap(), + null, + new TransformCheckpoint( + "transform", + Instant.now().toEpochMilli(), + 0L, + Collections.emptyMap(), + Instant.now().toEpochMilli() + ), + new TransformCheckpoint( + "transform", + Instant.now().toEpochMilli(), + 2L, + Collections.emptyMap(), + Instant.now().toEpochMilli() + ), + new SeqNoPrimaryTermAndIndex(1, 1, TransformInternalIndexConstants.LATEST_INDEX_NAME), + mock(TransformContext.class), + false + ); + + this.assertAsync( + listener -> indexer.doNextSearch(0, listener), + response -> { assertNull(response.pointInTimeId()); } + ); + + assertEquals(0L, client.getPitContextCounter()); + + indexer.afterFinishOrFailure(); + assertEquals(0L, client.getPitContextCounter()); + + // check its not called again + indexer.onStop(); + assertEquals(0L, client.getPitContextCounter()); + + this.assertAsync( + listener -> indexer.doNextSearch(0, listener), + response -> { assertNull(response.pointInTimeId()); } + ); + + this.assertAsync( + listener -> indexer.doNextSearch(0, listener), + response -> { assertNull(response.pointInTimeId()); } + ); + + this.assertAsync( + listener -> indexer.doNextSearch(0, listener), + response -> { assertNull(response.pointInTimeId()); } + ); + + assertEquals(0L, client.getPitContextCounter()); + + indexer.onStop(); + assertEquals(0L, client.getPitContextCounter()); + } + } + + private static class MockClientTransformIndexer extends ClientTransformIndexer { + + MockClientTransformIndexer( + ThreadPool threadPool, + TransformServices transformServices, + CheckpointProvider checkpointProvider, + AtomicReference initialState, + TransformIndexerPosition initialPosition, + Client client, + TransformIndexerStats initialStats, + TransformConfig transformConfig, + Map fieldMappings, + TransformProgress transformProgress, + TransformCheckpoint lastCheckpoint, + TransformCheckpoint nextCheckpoint, + SeqNoPrimaryTermAndIndex seqNoPrimaryTermAndIndex, + TransformContext context, + boolean shouldStopAtCheckpoint + ) { + super( + threadPool, + transformServices, + checkpointProvider, + initialState, + initialPosition, + client, + initialStats, + transformConfig, + fieldMappings, + transformProgress, + lastCheckpoint, + nextCheckpoint, + seqNoPrimaryTermAndIndex, + context, + shouldStopAtCheckpoint + ); + } + + @Override + protected SearchRequest buildSearchRequest() { + return new SearchRequest().source(new SearchSourceBuilder()); + } + } + + private static class PitMockClient extends NoOpClient { + private final boolean pitSupported; + private AtomicLong pitContextCounter = new AtomicLong(); + + PitMockClient(String testName, boolean pitSupported) { + super(testName); + this.pitSupported = pitSupported; + } + + public long getPitContextCounter() { + return pitContextCounter.get(); + } + + @SuppressWarnings("unchecked") + @Override + protected void doExecute( + ActionType action, + Request request, + ActionListener listener + ) { + if (request instanceof OpenPointInTimeRequest) { + if (pitSupported) { + pitContextCounter.incrementAndGet(); + OpenPointInTimeResponse response = new OpenPointInTimeResponse("the_pit_id"); + listener.onResponse((Response) response); + } else { + listener.onFailure(new ActionNotFoundTransportException("_pit")); + } + return; + } else if (request instanceof ClosePointInTimeRequest) { + ClosePointInTimeResponse response = new ClosePointInTimeResponse(true, 1); + assert pitContextCounter.get() > 0; + pitContextCounter.decrementAndGet(); + listener.onResponse((Response) response); + return; + } else if (request instanceof SearchRequest) { + SearchRequest searchRequest = (SearchRequest) request; + + SearchResponse response = new SearchResponse( + new InternalSearchResponse( + new SearchHits(new SearchHit[] { new SearchHit(1) }, new TotalHits(1L, TotalHits.Relation.EQUAL_TO), 1.0f), + // Simulate completely null aggs + null, + new Suggest(Collections.emptyList()), + new SearchProfileShardResults(Collections.emptyMap()), + false, + false, + 1 + ), + null, + 1, + 1, + 0, + 0, + ShardSearchFailure.EMPTY_ARRAY, + SearchResponse.Clusters.EMPTY, + // copy the pit from the request + searchRequest.pointInTimeBuilder() != null ? searchRequest.pointInTimeBuilder().getEncodedId() + "+" : null + ); + listener.onResponse((Response) response); + return; + } + + super.doExecute(action, request, listener); + } + } + + private void assertAsync(Consumer> function, Consumer furtherTests) throws InterruptedException { + CountDownLatch latch = new CountDownLatch(1); + AtomicBoolean listenerCalled = new AtomicBoolean(false); + + LatchedActionListener listener = new LatchedActionListener<>(ActionListener.wrap(r -> { + assertTrue("listener called more than once", listenerCalled.compareAndSet(false, true)); + furtherTests.accept(r); + }, e -> { fail("got unexpected exception: " + e); }), latch); + + function.accept(listener); + assertTrue("timed out after 5s", latch.await(5, TimeUnit.SECONDS)); + } } diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureHandlingTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureHandlingTests.java index 4c72ad8a3b444..24fdcd22c4449 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureHandlingTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureHandlingTests.java @@ -38,6 +38,7 @@ import org.elasticsearch.xpack.core.common.notifications.Level; import org.elasticsearch.xpack.core.indexing.IndexerState; import org.elasticsearch.xpack.core.indexing.IterationResult; +import org.elasticsearch.xpack.core.scheduler.SchedulerEngine; import org.elasticsearch.xpack.core.transform.transforms.SettingsConfig; import org.elasticsearch.xpack.core.transform.transforms.TimeRetentionPolicyConfig; import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint; @@ -46,7 +47,9 @@ import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats; import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState; import org.elasticsearch.xpack.transform.Transform; +import org.elasticsearch.xpack.transform.TransformServices; import org.elasticsearch.xpack.transform.checkpoint.CheckpointProvider; +import org.elasticsearch.xpack.transform.checkpoint.TransformCheckpointService; import org.elasticsearch.xpack.transform.notifications.MockTransformAuditor; import org.elasticsearch.xpack.transform.notifications.TransformAuditor; import org.elasticsearch.xpack.transform.persistence.IndexBasedTransformConfigManager; @@ -116,9 +119,13 @@ class MockedTransformIndexer extends TransformIndexer { ) { super( threadPool, - transformsConfigManager, + new TransformServices( + transformsConfigManager, + mock(TransformCheckpointService.class), + auditor, + mock(SchedulerEngine.class) + ), checkpointProvider, - auditor, transformConfig, fieldMappings, initialState, @@ -642,10 +649,7 @@ public void testRetentionPolicyDeleteByQueryThrowsTemporaryProblem() throws Exce throw new SearchPhaseExecutionException( "query", "Partial shards failure", - new ShardSearchFailure[] { - new ShardSearchFailure( - new ElasticsearchTimeoutException("timed out during dbq") - ) } + new ShardSearchFailure[] { new ShardSearchFailure(new ElasticsearchTimeoutException("timed out during dbq")) } ); }; diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerStateTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerStateTests.java index 63be4def6bfd5..3c2a7bb1456d4 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerStateTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerStateTests.java @@ -34,6 +34,7 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.indexing.IndexerState; import org.elasticsearch.xpack.core.indexing.IterationResult; +import org.elasticsearch.xpack.core.scheduler.SchedulerEngine; import org.elasticsearch.xpack.core.transform.transforms.SettingsConfig; import org.elasticsearch.xpack.core.transform.transforms.TimeSyncConfig; import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint; @@ -42,8 +43,10 @@ import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats; import org.elasticsearch.xpack.core.transform.transforms.TransformState; import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState; +import org.elasticsearch.xpack.transform.TransformServices; import org.elasticsearch.xpack.transform.checkpoint.CheckpointProvider; import org.elasticsearch.xpack.transform.checkpoint.MockTimebasedCheckpointProvider; +import org.elasticsearch.xpack.transform.checkpoint.TransformCheckpointService; import org.elasticsearch.xpack.transform.notifications.MockTransformAuditor; import org.elasticsearch.xpack.transform.notifications.TransformAuditor; import org.elasticsearch.xpack.transform.persistence.InMemoryTransformConfigManager; @@ -108,9 +111,8 @@ class MockedTransformIndexer extends TransformIndexer { MockedTransformIndexer( ThreadPool threadPool, - TransformConfigManager transformsConfigManager, + TransformServices transformServices, CheckpointProvider checkpointProvider, - TransformAuditor auditor, TransformConfig transformConfig, Map fieldMappings, AtomicReference initialState, @@ -120,9 +122,8 @@ class MockedTransformIndexer extends TransformIndexer { ) { super( threadPool, - transformsConfigManager, + transformServices, checkpointProvider, - auditor, transformConfig, fieldMappings, initialState, @@ -593,12 +594,17 @@ private MockedTransformIndexer createMockIndexer( ) { CheckpointProvider checkpointProvider = new MockTimebasedCheckpointProvider(config); transformConfigManager.putTransformConfiguration(config, ActionListener.wrap(r -> {}, e -> {})); + TransformServices transformServices = new TransformServices( + transformConfigManager, + mock(TransformCheckpointService.class), + auditor, + mock(SchedulerEngine.class) + ); MockedTransformIndexer indexer = new MockedTransformIndexer( threadPool, - transformConfigManager, + transformServices, checkpointProvider, - auditor, config, Collections.emptyMap(), state, diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerTests.java index 42120130a4032..bc2a488ed0bb1 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerTests.java @@ -34,6 +34,7 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.indexing.IndexerState; import org.elasticsearch.xpack.core.indexing.IterationResult; +import org.elasticsearch.xpack.core.scheduler.SchedulerEngine; import org.elasticsearch.xpack.core.transform.transforms.TimeRetentionPolicyConfigTests; import org.elasticsearch.xpack.core.transform.transforms.TimeSyncConfig; import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint; @@ -41,8 +42,10 @@ import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerPosition; import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats; import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState; +import org.elasticsearch.xpack.transform.TransformServices; import org.elasticsearch.xpack.transform.checkpoint.CheckpointProvider; import org.elasticsearch.xpack.transform.checkpoint.MockTimebasedCheckpointProvider; +import org.elasticsearch.xpack.transform.checkpoint.TransformCheckpointService; import org.elasticsearch.xpack.transform.notifications.MockTransformAuditor; import org.elasticsearch.xpack.transform.notifications.TransformAuditor; import org.elasticsearch.xpack.transform.persistence.InMemoryTransformConfigManager; @@ -110,9 +113,8 @@ class MockedTransformIndexer extends TransformIndexer { MockedTransformIndexer( int numberOfLoops, ThreadPool threadPool, - TransformConfigManager transformsConfigManager, + TransformServices transformServices, CheckpointProvider checkpointProvider, - TransformAuditor auditor, TransformConfig transformConfig, Map fieldMappings, AtomicReference initialState, @@ -122,9 +124,8 @@ class MockedTransformIndexer extends TransformIndexer { ) { super( threadPool, - transformsConfigManager, + transformServices, checkpointProvider, - auditor, transformConfig, fieldMappings, initialState, @@ -432,13 +433,18 @@ private MockedTransformIndexer createMockIndexer( ) { CheckpointProvider checkpointProvider = new MockTimebasedCheckpointProvider(config); transformConfigManager.putTransformConfiguration(config, ActionListener.wrap(r -> {}, e -> {})); + TransformServices transformServices = new TransformServices( + transformConfigManager, + mock(TransformCheckpointService.class), + auditor, + mock(SchedulerEngine.class) + ); MockedTransformIndexer indexer = new MockedTransformIndexer( numberOfLoops, threadPool, - transformConfigManager, + transformServices, checkpointProvider, - auditor, config, Collections.emptyMap(), state, diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformTaskTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformTaskTests.java index 044013bec70bc..b0801d67ef129 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformTaskTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformTaskTests.java @@ -30,6 +30,7 @@ import org.elasticsearch.xpack.core.transform.transforms.TransformState; import org.elasticsearch.xpack.core.transform.transforms.TransformTaskParams; import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState; +import org.elasticsearch.xpack.transform.TransformServices; import org.elasticsearch.xpack.transform.checkpoint.TransformCheckpointService; import org.elasticsearch.xpack.transform.notifications.MockTransformAuditor; import org.elasticsearch.xpack.transform.notifications.TransformAuditor; @@ -80,6 +81,12 @@ public void testStopOnFailedTaskWithStoppedIndexer() { transformsConfigManager, auditor ); + TransformServices transformServices = new TransformServices( + transformsConfigManager, + transformsCheckpointService, + auditor, + mock(SchedulerEngine.class) + ); TransformState transformState = new TransformState( TransformTaskState.FAILED, @@ -113,9 +120,7 @@ public void testStopOnFailedTaskWithStoppedIndexer() { ClientTransformIndexerBuilder indexerBuilder = new ClientTransformIndexerBuilder(); indexerBuilder.setClient(new ParentTaskAssigningClient(client, TaskId.EMPTY_TASK_ID)) .setTransformConfig(transformConfig) - .setAuditor(auditor) - .setTransformsConfigManager(transformsConfigManager) - .setTransformsCheckpointService(transformsCheckpointService) + .setTransformServices(transformServices) .setFieldMappings(Collections.emptyMap()); transformTask.initializeIndexer(indexerBuilder);