diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/TransformServices.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/TransformServices.java index 15d1ee1e849bc..8592e589af0fb 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/TransformServices.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/TransformServices.java @@ -29,12 +29,12 @@ public final class TransformServices { public TransformServices( TransformConfigManager transformConfigManager, - TransformCheckpointService checkpointProvider, + TransformCheckpointService checkpointService, TransformAuditor transformAuditor, SchedulerEngine schedulerEngine ) { this.configManager = Objects.requireNonNull(transformConfigManager); - this.checkpointService = Objects.requireNonNull(checkpointProvider); + this.checkpointService = Objects.requireNonNull(checkpointService); this.auditor = Objects.requireNonNull(transformAuditor); this.schedulerEngine = Objects.requireNonNull(schedulerEngine); } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java index 2bcca9b3de6ba..72f8f65e30e4a 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java @@ -19,18 +19,26 @@ import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.search.ClosePointInTimeAction; +import org.elasticsearch.action.search.ClosePointInTimeRequest; +import org.elasticsearch.action.search.OpenPointInTimeAction; +import org.elasticsearch.action.search.OpenPointInTimeRequest; import org.elasticsearch.action.search.SearchAction; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.Client; -import org.elasticsearch.core.Nullable; import org.elasticsearch.common.logging.LoggerMessageFormat; +import org.elasticsearch.core.Nullable; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.engine.VersionConflictEngineException; import org.elasticsearch.index.mapper.MapperParsingException; import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.elasticsearch.index.reindex.DeleteByQueryAction; import org.elasticsearch.index.reindex.DeleteByQueryRequest; +import org.elasticsearch.search.SearchContextMissingException; +import org.elasticsearch.search.builder.PointInTimeBuilder; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.ActionNotFoundTransportException; import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.indexing.IndexerState; import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint; @@ -42,10 +50,9 @@ import org.elasticsearch.xpack.core.transform.transforms.TransformStoredDoc; import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState; import org.elasticsearch.xpack.core.transform.utils.ExceptionsHelper; +import org.elasticsearch.xpack.transform.TransformServices; import org.elasticsearch.xpack.transform.checkpoint.CheckpointProvider; -import org.elasticsearch.xpack.transform.notifications.TransformAuditor; import org.elasticsearch.xpack.transform.persistence.SeqNoPrimaryTermAndIndex; -import org.elasticsearch.xpack.transform.persistence.TransformConfigManager; import org.elasticsearch.xpack.transform.utils.ExceptionRootCauseFinder; import java.util.Collection; @@ -57,21 +64,24 @@ class ClientTransformIndexer extends TransformIndexer { + private static final TimeValue PIT_KEEP_ALIVE = TimeValue.timeValueSeconds(30); private static final Logger logger = LogManager.getLogger(ClientTransformIndexer.class); private final Client client; private final AtomicBoolean oldStatsCleanedUp = new AtomicBoolean(false); private final AtomicReference seqNoPrimaryTermAndIndex; + private volatile PointInTimeBuilder pit; + private volatile long pitCheckpoint; + private volatile boolean disablePit = false; ClientTransformIndexer( ThreadPool threadPool, - TransformConfigManager transformsConfigManager, + TransformServices transformServices, CheckpointProvider checkpointProvider, AtomicReference initialState, TransformIndexerPosition initialPosition, Client client, - TransformAuditor auditor, TransformIndexerStats initialStats, TransformConfig transformConfig, Map fieldMappings, @@ -84,9 +94,8 @@ class ClientTransformIndexer extends TransformIndexer { ) { super( ExceptionsHelper.requireNonNull(threadPool, "threadPool"), - transformsConfigManager, + transformServices, checkpointProvider, - auditor, transformConfig, fieldMappings, ExceptionsHelper.requireNonNull(initialState, "initialState"), @@ -111,13 +120,14 @@ protected void doNextSearch(long waitTimeInNanos, ActionListener nextPhase.onFailure(new ElasticsearchException("Attempted to do a search request for failed transform [{}].", getJobId())); return; } - ClientHelper.executeWithHeadersAsync( - transformConfig.getHeaders(), - ClientHelper.TRANSFORM_ORIGIN, - client, - SearchAction.INSTANCE, + + if (getNextCheckpoint().getCheckpoint() != pitCheckpoint) { + closePointInTime(); + } + + injectPointInTimeIfNeeded( buildSearchRequest(), - nextPhase + ActionListener.wrap(pitSearchRequest -> { doSearch(pitSearchRequest, nextPhase); }, nextPhase::onFailure) ); } @@ -440,6 +450,134 @@ SeqNoPrimaryTermAndIndex getSeqNoPrimaryTermAndIndex() { return seqNoPrimaryTermAndIndex.get(); } + @Override + protected void afterFinishOrFailure() { + closePointInTime(); + super.afterFinishOrFailure(); + } + + @Override + protected void onStop() { + closePointInTime(); + super.onStop(); + } + + private void closePointInTime() { + if (pit == null) { + return; + } + + String oldPit = pit.getEncodedId(); + pit = null; + ClosePointInTimeRequest closePitRequest = new ClosePointInTimeRequest(oldPit); + ClientHelper.executeWithHeadersAsync( + transformConfig.getHeaders(), + ClientHelper.TRANSFORM_ORIGIN, + client, + ClosePointInTimeAction.INSTANCE, + closePitRequest, + ActionListener.wrap(response -> { logger.trace("[{}] closed pit search context [{}]", getJobId(), oldPit); }, e -> { + // note: closing the pit should never throw, even if the pit is invalid + logger.error(new ParameterizedMessage("[{}] Failed to close point in time reader", getJobId()), e); + }) + ); + } + + private void injectPointInTimeIfNeeded(SearchRequest searchRequest, ActionListener listener) { + if (disablePit) { + listener.onResponse(searchRequest); + return; + } + + if (pit != null) { + searchRequest.source().pointInTimeBuilder(pit); + listener.onResponse(searchRequest); + return; + } + + // no pit, create a new one + OpenPointInTimeRequest pitRequest = new OpenPointInTimeRequest(transformConfig.getSource().getIndex()).keepAlive(PIT_KEEP_ALIVE); + + ClientHelper.executeWithHeadersAsync( + transformConfig.getHeaders(), + ClientHelper.TRANSFORM_ORIGIN, + client, + OpenPointInTimeAction.INSTANCE, + pitRequest, + ActionListener.wrap(response -> { + pit = new PointInTimeBuilder(response.getPointInTimeId()).setKeepAlive(PIT_KEEP_ALIVE); + searchRequest.source().pointInTimeBuilder(pit); + pitCheckpoint = getNextCheckpoint().getCheckpoint(); + logger.trace("[{}] using pit search context with id [{}]", getJobId(), pit.getEncodedId()); + listener.onResponse(searchRequest); + }, e -> { + Throwable unwrappedException = ExceptionsHelper.findSearchExceptionRootCause(e); + // if point in time is not supported, disable it but do not remember forever (stopping and starting will give it another + // try) + if (unwrappedException instanceof ActionNotFoundTransportException) { + logger.warn( + "[{}] source does not support point in time reader, falling back to normal search (more resource intensive)", + getJobId() + ); + auditor.warning( + getJobId(), + "Source does not support point in time reader, falling back to normal search (more resource intensive)" + ); + disablePit = true; + } else { + logger.warn( + new ParameterizedMessage( + "[{}] Failed to create a point in time reader, falling back to normal search.", + getJobId() + ), + e + ); + } + listener.onResponse(searchRequest); + }) + ); + } + + private void doSearch(SearchRequest searchRequest, ActionListener listener) { + logger.trace("searchRequest: {}", searchRequest); + + ClientHelper.executeWithHeadersAsync( + transformConfig.getHeaders(), + ClientHelper.TRANSFORM_ORIGIN, + client, + SearchAction.INSTANCE, + searchRequest, + ActionListener.wrap(response -> { + // did the pit change? + if (response.pointInTimeId() != null && (pit == null || response.pointInTimeId() != pit.getEncodedId())) { + pit = new PointInTimeBuilder(response.pointInTimeId()).setKeepAlive(PIT_KEEP_ALIVE); + logger.trace("point in time handle has changed"); + } + + listener.onResponse(response); + }, e -> { + // check if the error has been caused by a missing search context, which could be a timed out pit + // re-try this search without pit, if it fails again the normal failure handler is called, if it + // succeeds a new pit gets created at the next run + Throwable unwrappedException = ExceptionsHelper.findSearchExceptionRootCause(e); + if (unwrappedException instanceof SearchContextMissingException) { + logger.warn(new ParameterizedMessage("[{}] Search context missing, falling back to normal search.", getJobId()), e); + pit = null; + searchRequest.source().pointInTimeBuilder(null); + ClientHelper.executeWithHeadersAsync( + transformConfig.getHeaders(), + ClientHelper.TRANSFORM_ORIGIN, + client, + SearchAction.INSTANCE, + searchRequest, + listener + ); + } + listener.onFailure(e); + }) + ); + } + private static String getBulkIndexDetailedFailureMessage(String prefix, Map failures) { if (failures.isEmpty()) { return ""; diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerBuilder.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerBuilder.java index 339d0426483b7..4cb8899cd9fdc 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerBuilder.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerBuilder.java @@ -15,20 +15,16 @@ import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerPosition; import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats; import org.elasticsearch.xpack.core.transform.transforms.TransformProgress; +import org.elasticsearch.xpack.transform.TransformServices; import org.elasticsearch.xpack.transform.checkpoint.CheckpointProvider; -import org.elasticsearch.xpack.transform.checkpoint.TransformCheckpointService; -import org.elasticsearch.xpack.transform.notifications.TransformAuditor; import org.elasticsearch.xpack.transform.persistence.SeqNoPrimaryTermAndIndex; -import org.elasticsearch.xpack.transform.persistence.TransformConfigManager; import java.util.Map; import java.util.concurrent.atomic.AtomicReference; class ClientTransformIndexerBuilder { private ParentTaskAssigningClient parentTaskClient; - private TransformConfigManager transformsConfigManager; - private TransformCheckpointService transformsCheckpointService; - private TransformAuditor auditor; + private TransformServices transformServices; private Map fieldMappings; private TransformConfig transformConfig; private TransformIndexerStats initialStats; @@ -45,16 +41,16 @@ class ClientTransformIndexerBuilder { } ClientTransformIndexer build(ThreadPool threadPool, TransformContext context) { - CheckpointProvider checkpointProvider = transformsCheckpointService.getCheckpointProvider(parentTaskClient, transformConfig); + CheckpointProvider checkpointProvider = transformServices.getCheckpointService() + .getCheckpointProvider(parentTaskClient, transformConfig); return new ClientTransformIndexer( threadPool, - transformsConfigManager, + transformServices, checkpointProvider, new AtomicReference<>(this.indexerState), initialPosition, parentTaskClient, - auditor, initialStats, transformConfig, fieldMappings, @@ -77,18 +73,8 @@ ClientTransformIndexerBuilder setClient(ParentTaskAssigningClient parentTaskClie return this; } - ClientTransformIndexerBuilder setTransformsConfigManager(TransformConfigManager transformsConfigManager) { - this.transformsConfigManager = transformsConfigManager; - return this; - } - - ClientTransformIndexerBuilder setTransformsCheckpointService(TransformCheckpointService transformsCheckpointService) { - this.transformsCheckpointService = transformsCheckpointService; - return this; - } - - ClientTransformIndexerBuilder setAuditor(TransformAuditor auditor) { - this.auditor = auditor; + ClientTransformIndexerBuilder setTransformServices(TransformServices transformServices) { + this.transformServices = transformServices; return this; } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java index eba9b28aab09a..38db0d1bf9120 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java @@ -19,8 +19,8 @@ import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.common.breaker.CircuitBreakingException; -import org.elasticsearch.core.Tuple; import org.elasticsearch.common.util.CollectionUtils; +import org.elasticsearch.core.Tuple; import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; @@ -41,6 +41,7 @@ import org.elasticsearch.xpack.core.transform.transforms.TransformProgress; import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState; import org.elasticsearch.xpack.core.transform.utils.ExceptionsHelper; +import org.elasticsearch.xpack.transform.TransformServices; import org.elasticsearch.xpack.transform.checkpoint.CheckpointProvider; import org.elasticsearch.xpack.transform.notifications.TransformAuditor; import org.elasticsearch.xpack.transform.persistence.TransformConfigManager; @@ -112,8 +113,8 @@ private enum RunState { private volatile Integer initialConfiguredPageSize; private volatile int pageSize = 0; - private long logEvery = 1; - private long logCount = 0; + private volatile long logEvery = 1; + private volatile long logCount = 0; private volatile TransformCheckpoint lastCheckpoint; private volatile TransformCheckpoint nextCheckpoint; @@ -128,9 +129,8 @@ private enum RunState { public TransformIndexer( ThreadPool threadPool, - TransformConfigManager transformsConfigManager, + TransformServices transformServices, CheckpointProvider checkpointProvider, - TransformAuditor auditor, TransformConfig transformConfig, Map fieldMappings, AtomicReference initialState, @@ -142,16 +142,16 @@ public TransformIndexer( TransformContext context ) { super(threadPool, initialState, initialPosition, jobStats); - this.transformsConfigManager = ExceptionsHelper.requireNonNull(transformsConfigManager, "transformsConfigManager"); + ExceptionsHelper.requireNonNull(transformServices, "transformServices"); + this.transformsConfigManager = transformServices.getConfigManager(); this.checkpointProvider = ExceptionsHelper.requireNonNull(checkpointProvider, "checkpointProvider"); - this.auditor = ExceptionsHelper.requireNonNull(auditor, "auditor"); + this.auditor = transformServices.getAuditor(); this.transformConfig = ExceptionsHelper.requireNonNull(transformConfig, "transformConfig"); this.fieldMappings = ExceptionsHelper.requireNonNull(fieldMappings, "fieldMappings"); this.progress = transformProgress; this.lastCheckpoint = ExceptionsHelper.requireNonNull(lastCheckpoint, "lastCheckpoint"); this.nextCheckpoint = ExceptionsHelper.requireNonNull(nextCheckpoint, "nextCheckpoint"); this.context = ExceptionsHelper.requireNonNull(context, "context"); - // give runState a default this.runState = RunState.APPLY_RESULTS; diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java index 0d6930061dcfa..ac2b0f66e8df1 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java @@ -21,10 +21,10 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.core.Nullable; import org.elasticsearch.common.ValidationException; -import org.elasticsearch.core.Tuple; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.Nullable; +import org.elasticsearch.core.Tuple; import org.elasticsearch.persistent.AllocatedPersistentTask; import org.elasticsearch.persistent.PersistentTaskState; import org.elasticsearch.persistent.PersistentTasksCustomMetadata; @@ -94,12 +94,16 @@ public TransformPersistentTasksExecutor( } @Override - public PersistentTasksCustomMetadata.Assignment getAssignment(TransformTaskParams params, - Collection candidateNodes, - ClusterState clusterState) { + public PersistentTasksCustomMetadata.Assignment getAssignment( + TransformTaskParams params, + Collection candidateNodes, + ClusterState clusterState + ) { if (TransformMetadata.getTransformMetadata(clusterState).isResetMode()) { - return new PersistentTasksCustomMetadata.Assignment(null, - "Transform task will not be assigned as a feature reset is in progress."); + return new PersistentTasksCustomMetadata.Assignment( + null, + "Transform task will not be assigned as a feature reset is in progress." + ); } List unavailableIndices = verifyIndicesPrimaryShardsAreActive(clusterState, resolver); if (unavailableIndices.size() != 0) { @@ -113,7 +117,8 @@ public PersistentTasksCustomMetadata.Assignment getAssignment(TransformTaskParam return new PersistentTasksCustomMetadata.Assignment(null, reason); } DiscoveryNode discoveryNode = selectLeastLoadedNode( - clusterState, candidateNodes, + clusterState, + candidateNodes, node -> nodeCanRunThisTransform(node, params.getVersion(), params.requiresRemote(), null) ); @@ -164,10 +169,8 @@ protected void nodeOperation(AllocatedPersistentTask task, @Nullable TransformTa // // We want the rest of the state to be populated in the task when it is loaded on the node so that users can force start it again // later if they want. - final ClientTransformIndexerBuilder indexerBuilder = new ClientTransformIndexerBuilder().setAuditor(auditor) - .setClient(buildTask.getParentTaskClient()) - .setTransformsCheckpointService(transformServices.getCheckpointService()) - .setTransformsConfigManager(transformServices.getConfigManager()); + final ClientTransformIndexerBuilder indexerBuilder = new ClientTransformIndexerBuilder().setClient(buildTask.getParentTaskClient()) + .setTransformServices(transformServices); final SetOnce stateHolder = new SetOnce<>(); @@ -284,14 +287,19 @@ protected void nodeOperation(AllocatedPersistentTask task, @Nullable TransformTa ValidationException validationException = config.validate(null); if (validationException == null) { indexerBuilder.setTransformConfig(config); - SchemaUtil.getDestinationFieldMappings(buildTask.getParentTaskClient(), config.getDestination().getIndex(), - getFieldMappingsListener); + SchemaUtil.getDestinationFieldMappings( + buildTask.getParentTaskClient(), + config.getDestination().getIndex(), + getFieldMappingsListener + ); } else { auditor.error(transformId, validationException.getMessage()); markAsFailed( buildTask, TransformMessages.getMessage( - TransformMessages.TRANSFORM_CONFIGURATION_INVALID, transformId, validationException.getMessage() + TransformMessages.TRANSFORM_CONFIGURATION_INVALID, + transformId, + validationException.getMessage() ) ); } @@ -313,8 +321,11 @@ protected void nodeOperation(AllocatedPersistentTask task, @Nullable TransformTa ); // <1> Check the index templates are installed - TransformInternalIndex.ensureLatestIndexAndTemplateInstalled(clusterService, buildTask.getParentTaskClient(), - templateCheckListener); + TransformInternalIndex.ensureLatestIndexAndTemplateInstalled( + clusterService, + buildTask.getParentTaskClient(), + templateCheckListener + ); } private static IndexerState currentIndexerState(TransformState previousState) { diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerTests.java index bc83a168c9371..7d54ca009d3a3 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerTests.java @@ -7,15 +7,42 @@ package org.elasticsearch.xpack.transform.transforms; +import org.apache.lucene.search.TotalHits; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.LatchedActionListener; +import org.elasticsearch.action.search.ClosePointInTimeRequest; +import org.elasticsearch.action.search.ClosePointInTimeResponse; +import org.elasticsearch.action.search.OpenPointInTimeRequest; +import org.elasticsearch.action.search.OpenPointInTimeResponse; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.client.Client; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.SearchHits; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.search.internal.InternalSearchResponse; +import org.elasticsearch.search.profile.SearchProfileShardResults; +import org.elasticsearch.search.suggest.Suggest; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.client.NoOpClient; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.ActionNotFoundTransportException; import org.elasticsearch.xpack.core.indexing.IndexerState; +import org.elasticsearch.xpack.core.scheduler.SchedulerEngine; import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint; import org.elasticsearch.xpack.core.transform.transforms.TransformConfig; +import org.elasticsearch.xpack.core.transform.transforms.TransformConfigTests; +import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerPosition; import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats; +import org.elasticsearch.xpack.core.transform.transforms.TransformProgress; import org.elasticsearch.xpack.core.transform.transforms.persistence.TransformInternalIndexConstants; +import org.elasticsearch.xpack.transform.TransformServices; import org.elasticsearch.xpack.transform.checkpoint.CheckpointProvider; +import org.elasticsearch.xpack.transform.checkpoint.TransformCheckpointService; import org.elasticsearch.xpack.transform.notifications.TransformAuditor; import org.elasticsearch.xpack.transform.persistence.IndexBasedTransformConfigManager; import org.elasticsearch.xpack.transform.persistence.SeqNoPrimaryTermAndIndex; @@ -23,8 +50,14 @@ import java.time.Instant; import java.util.Collections; import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -39,12 +72,16 @@ public void testAudiOnFinishFrequency() { ClientTransformIndexer indexer = new ClientTransformIndexer( mock(ThreadPool.class), - mock(IndexBasedTransformConfigManager.class), + new TransformServices( + mock(IndexBasedTransformConfigManager.class), + mock(TransformCheckpointService.class), + mock(TransformAuditor.class), + mock(SchedulerEngine.class) + ), mock(CheckpointProvider.class), new AtomicReference<>(IndexerState.STOPPED), null, mock(Client.class), - mock(TransformAuditor.class), mock(TransformIndexerStats.class), mock(TransformConfig.class), Collections.emptyMap(), @@ -87,4 +124,277 @@ public void testAudiOnFinishFrequency() { assertFalse(shouldAudit.get(11_999)); } + public void testPitInjection() throws InterruptedException { + TransformConfig config = TransformConfigTests.randomTransformConfig(); + + try (PitMockClient client = new PitMockClient(getTestName(), true)) { + MockClientTransformIndexer indexer = new MockClientTransformIndexer( + mock(ThreadPool.class), + new TransformServices( + mock(IndexBasedTransformConfigManager.class), + mock(TransformCheckpointService.class), + mock(TransformAuditor.class), + mock(SchedulerEngine.class) + ), + mock(CheckpointProvider.class), + new AtomicReference<>(IndexerState.STOPPED), + null, + client, + mock(TransformIndexerStats.class), + config, + Collections.emptyMap(), + null, + new TransformCheckpoint( + "transform", + Instant.now().toEpochMilli(), + 0L, + Collections.emptyMap(), + Instant.now().toEpochMilli() + ), + new TransformCheckpoint( + "transform", + Instant.now().toEpochMilli(), + 2L, + Collections.emptyMap(), + Instant.now().toEpochMilli() + ), + new SeqNoPrimaryTermAndIndex(1, 1, TransformInternalIndexConstants.LATEST_INDEX_NAME), + mock(TransformContext.class), + false + ); + + this.assertAsync( + listener -> indexer.doNextSearch(0, listener), + response -> { assertEquals("the_pit_id+", response.pointInTimeId()); } + ); + + assertEquals(1L, client.getPitContextCounter()); + + indexer.afterFinishOrFailure(); + assertEquals(0L, client.getPitContextCounter()); + + // check its not called again + indexer.onStop(); + assertEquals(0L, client.getPitContextCounter()); + + this.assertAsync( + listener -> indexer.doNextSearch(0, listener), + response -> { assertEquals("the_pit_id+", response.pointInTimeId()); } + ); + + this.assertAsync( + listener -> indexer.doNextSearch(0, listener), + response -> { assertEquals("the_pit_id++", response.pointInTimeId()); } + ); + + this.assertAsync( + listener -> indexer.doNextSearch(0, listener), + response -> { assertEquals("the_pit_id+++", response.pointInTimeId()); } + ); + + assertEquals(1L, client.getPitContextCounter()); + + indexer.onStop(); + assertEquals(0L, client.getPitContextCounter()); + } + } + + public void testPitInjectionIfPitNotSupported() throws InterruptedException { + TransformConfig config = TransformConfigTests.randomTransformConfig(); + + try (PitMockClient client = new PitMockClient(getTestName(), false)) { + MockClientTransformIndexer indexer = new MockClientTransformIndexer( + mock(ThreadPool.class), + new TransformServices( + mock(IndexBasedTransformConfigManager.class), + mock(TransformCheckpointService.class), + mock(TransformAuditor.class), + mock(SchedulerEngine.class) + ), + mock(CheckpointProvider.class), + new AtomicReference<>(IndexerState.STOPPED), + null, + client, + mock(TransformIndexerStats.class), + config, + Collections.emptyMap(), + null, + new TransformCheckpoint( + "transform", + Instant.now().toEpochMilli(), + 0L, + Collections.emptyMap(), + Instant.now().toEpochMilli() + ), + new TransformCheckpoint( + "transform", + Instant.now().toEpochMilli(), + 2L, + Collections.emptyMap(), + Instant.now().toEpochMilli() + ), + new SeqNoPrimaryTermAndIndex(1, 1, TransformInternalIndexConstants.LATEST_INDEX_NAME), + mock(TransformContext.class), + false + ); + + this.assertAsync( + listener -> indexer.doNextSearch(0, listener), + response -> { assertNull(response.pointInTimeId()); } + ); + + assertEquals(0L, client.getPitContextCounter()); + + indexer.afterFinishOrFailure(); + assertEquals(0L, client.getPitContextCounter()); + + // check its not called again + indexer.onStop(); + assertEquals(0L, client.getPitContextCounter()); + + this.assertAsync( + listener -> indexer.doNextSearch(0, listener), + response -> { assertNull(response.pointInTimeId()); } + ); + + this.assertAsync( + listener -> indexer.doNextSearch(0, listener), + response -> { assertNull(response.pointInTimeId()); } + ); + + this.assertAsync( + listener -> indexer.doNextSearch(0, listener), + response -> { assertNull(response.pointInTimeId()); } + ); + + assertEquals(0L, client.getPitContextCounter()); + + indexer.onStop(); + assertEquals(0L, client.getPitContextCounter()); + } + } + + private static class MockClientTransformIndexer extends ClientTransformIndexer { + + MockClientTransformIndexer( + ThreadPool threadPool, + TransformServices transformServices, + CheckpointProvider checkpointProvider, + AtomicReference initialState, + TransformIndexerPosition initialPosition, + Client client, + TransformIndexerStats initialStats, + TransformConfig transformConfig, + Map fieldMappings, + TransformProgress transformProgress, + TransformCheckpoint lastCheckpoint, + TransformCheckpoint nextCheckpoint, + SeqNoPrimaryTermAndIndex seqNoPrimaryTermAndIndex, + TransformContext context, + boolean shouldStopAtCheckpoint + ) { + super( + threadPool, + transformServices, + checkpointProvider, + initialState, + initialPosition, + client, + initialStats, + transformConfig, + fieldMappings, + transformProgress, + lastCheckpoint, + nextCheckpoint, + seqNoPrimaryTermAndIndex, + context, + shouldStopAtCheckpoint + ); + } + + @Override + protected SearchRequest buildSearchRequest() { + return new SearchRequest().source(new SearchSourceBuilder()); + } + } + + private static class PitMockClient extends NoOpClient { + private final boolean pitSupported; + private AtomicLong pitContextCounter = new AtomicLong(); + + PitMockClient(String testName, boolean pitSupported) { + super(testName); + this.pitSupported = pitSupported; + } + + public long getPitContextCounter() { + return pitContextCounter.get(); + } + + @SuppressWarnings("unchecked") + @Override + protected void doExecute( + ActionType action, + Request request, + ActionListener listener + ) { + if (request instanceof OpenPointInTimeRequest) { + if (pitSupported) { + pitContextCounter.incrementAndGet(); + OpenPointInTimeResponse response = new OpenPointInTimeResponse("the_pit_id"); + listener.onResponse((Response) response); + } else { + listener.onFailure(new ActionNotFoundTransportException("_pit")); + } + return; + } else if (request instanceof ClosePointInTimeRequest) { + ClosePointInTimeResponse response = new ClosePointInTimeResponse(true, 1); + assert pitContextCounter.get() > 0; + pitContextCounter.decrementAndGet(); + listener.onResponse((Response) response); + return; + } else if (request instanceof SearchRequest) { + SearchRequest searchRequest = (SearchRequest) request; + + SearchResponse response = new SearchResponse( + new InternalSearchResponse( + new SearchHits(new SearchHit[] { new SearchHit(1) }, new TotalHits(1L, TotalHits.Relation.EQUAL_TO), 1.0f), + // Simulate completely null aggs + null, + new Suggest(Collections.emptyList()), + new SearchProfileShardResults(Collections.emptyMap()), + false, + false, + 1 + ), + null, + 1, + 1, + 0, + 0, + ShardSearchFailure.EMPTY_ARRAY, + SearchResponse.Clusters.EMPTY, + // copy the pit from the request + searchRequest.pointInTimeBuilder() != null ? searchRequest.pointInTimeBuilder().getEncodedId() + "+" : null + ); + listener.onResponse((Response) response); + return; + } + + super.doExecute(action, request, listener); + } + } + + private void assertAsync(Consumer> function, Consumer furtherTests) throws InterruptedException { + CountDownLatch latch = new CountDownLatch(1); + AtomicBoolean listenerCalled = new AtomicBoolean(false); + + LatchedActionListener listener = new LatchedActionListener<>(ActionListener.wrap(r -> { + assertTrue("listener called more than once", listenerCalled.compareAndSet(false, true)); + furtherTests.accept(r); + }, e -> { fail("got unexpected exception: " + e); }), latch); + + function.accept(listener); + assertTrue("timed out after 5s", latch.await(5, TimeUnit.SECONDS)); + } } diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureHandlingTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureHandlingTests.java index fc0e1170fd5dc..64f9a4b7c551c 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureHandlingTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureHandlingTests.java @@ -38,6 +38,7 @@ import org.elasticsearch.xpack.core.common.notifications.Level; import org.elasticsearch.xpack.core.indexing.IndexerState; import org.elasticsearch.xpack.core.indexing.IterationResult; +import org.elasticsearch.xpack.core.scheduler.SchedulerEngine; import org.elasticsearch.xpack.core.transform.transforms.SettingsConfig; import org.elasticsearch.xpack.core.transform.transforms.TimeRetentionPolicyConfig; import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint; @@ -46,7 +47,9 @@ import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats; import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState; import org.elasticsearch.xpack.transform.Transform; +import org.elasticsearch.xpack.transform.TransformServices; import org.elasticsearch.xpack.transform.checkpoint.CheckpointProvider; +import org.elasticsearch.xpack.transform.checkpoint.TransformCheckpointService; import org.elasticsearch.xpack.transform.notifications.MockTransformAuditor; import org.elasticsearch.xpack.transform.notifications.TransformAuditor; import org.elasticsearch.xpack.transform.persistence.IndexBasedTransformConfigManager; @@ -116,9 +119,13 @@ class MockedTransformIndexer extends TransformIndexer { ) { super( threadPool, - transformsConfigManager, + new TransformServices( + transformsConfigManager, + mock(TransformCheckpointService.class), + auditor, + mock(SchedulerEngine.class) + ), checkpointProvider, - auditor, transformConfig, fieldMappings, initialState, @@ -642,10 +649,7 @@ public void testRetentionPolicyDeleteByQueryThrowsTemporaryProblem() throws Exce throw new SearchPhaseExecutionException( "query", "Partial shards failure", - new ShardSearchFailure[] { - new ShardSearchFailure( - new ElasticsearchTimeoutException("timed out during dbq") - ) } + new ShardSearchFailure[] { new ShardSearchFailure(new ElasticsearchTimeoutException("timed out during dbq")) } ); }; diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerStateTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerStateTests.java index 63be4def6bfd5..3c2a7bb1456d4 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerStateTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerStateTests.java @@ -34,6 +34,7 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.indexing.IndexerState; import org.elasticsearch.xpack.core.indexing.IterationResult; +import org.elasticsearch.xpack.core.scheduler.SchedulerEngine; import org.elasticsearch.xpack.core.transform.transforms.SettingsConfig; import org.elasticsearch.xpack.core.transform.transforms.TimeSyncConfig; import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint; @@ -42,8 +43,10 @@ import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats; import org.elasticsearch.xpack.core.transform.transforms.TransformState; import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState; +import org.elasticsearch.xpack.transform.TransformServices; import org.elasticsearch.xpack.transform.checkpoint.CheckpointProvider; import org.elasticsearch.xpack.transform.checkpoint.MockTimebasedCheckpointProvider; +import org.elasticsearch.xpack.transform.checkpoint.TransformCheckpointService; import org.elasticsearch.xpack.transform.notifications.MockTransformAuditor; import org.elasticsearch.xpack.transform.notifications.TransformAuditor; import org.elasticsearch.xpack.transform.persistence.InMemoryTransformConfigManager; @@ -108,9 +111,8 @@ class MockedTransformIndexer extends TransformIndexer { MockedTransformIndexer( ThreadPool threadPool, - TransformConfigManager transformsConfigManager, + TransformServices transformServices, CheckpointProvider checkpointProvider, - TransformAuditor auditor, TransformConfig transformConfig, Map fieldMappings, AtomicReference initialState, @@ -120,9 +122,8 @@ class MockedTransformIndexer extends TransformIndexer { ) { super( threadPool, - transformsConfigManager, + transformServices, checkpointProvider, - auditor, transformConfig, fieldMappings, initialState, @@ -593,12 +594,17 @@ private MockedTransformIndexer createMockIndexer( ) { CheckpointProvider checkpointProvider = new MockTimebasedCheckpointProvider(config); transformConfigManager.putTransformConfiguration(config, ActionListener.wrap(r -> {}, e -> {})); + TransformServices transformServices = new TransformServices( + transformConfigManager, + mock(TransformCheckpointService.class), + auditor, + mock(SchedulerEngine.class) + ); MockedTransformIndexer indexer = new MockedTransformIndexer( threadPool, - transformConfigManager, + transformServices, checkpointProvider, - auditor, config, Collections.emptyMap(), state, diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerTests.java index 42120130a4032..bc2a488ed0bb1 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerTests.java @@ -34,6 +34,7 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.indexing.IndexerState; import org.elasticsearch.xpack.core.indexing.IterationResult; +import org.elasticsearch.xpack.core.scheduler.SchedulerEngine; import org.elasticsearch.xpack.core.transform.transforms.TimeRetentionPolicyConfigTests; import org.elasticsearch.xpack.core.transform.transforms.TimeSyncConfig; import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint; @@ -41,8 +42,10 @@ import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerPosition; import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats; import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState; +import org.elasticsearch.xpack.transform.TransformServices; import org.elasticsearch.xpack.transform.checkpoint.CheckpointProvider; import org.elasticsearch.xpack.transform.checkpoint.MockTimebasedCheckpointProvider; +import org.elasticsearch.xpack.transform.checkpoint.TransformCheckpointService; import org.elasticsearch.xpack.transform.notifications.MockTransformAuditor; import org.elasticsearch.xpack.transform.notifications.TransformAuditor; import org.elasticsearch.xpack.transform.persistence.InMemoryTransformConfigManager; @@ -110,9 +113,8 @@ class MockedTransformIndexer extends TransformIndexer { MockedTransformIndexer( int numberOfLoops, ThreadPool threadPool, - TransformConfigManager transformsConfigManager, + TransformServices transformServices, CheckpointProvider checkpointProvider, - TransformAuditor auditor, TransformConfig transformConfig, Map fieldMappings, AtomicReference initialState, @@ -122,9 +124,8 @@ class MockedTransformIndexer extends TransformIndexer { ) { super( threadPool, - transformsConfigManager, + transformServices, checkpointProvider, - auditor, transformConfig, fieldMappings, initialState, @@ -432,13 +433,18 @@ private MockedTransformIndexer createMockIndexer( ) { CheckpointProvider checkpointProvider = new MockTimebasedCheckpointProvider(config); transformConfigManager.putTransformConfiguration(config, ActionListener.wrap(r -> {}, e -> {})); + TransformServices transformServices = new TransformServices( + transformConfigManager, + mock(TransformCheckpointService.class), + auditor, + mock(SchedulerEngine.class) + ); MockedTransformIndexer indexer = new MockedTransformIndexer( numberOfLoops, threadPool, - transformConfigManager, + transformServices, checkpointProvider, - auditor, config, Collections.emptyMap(), state, diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformTaskTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformTaskTests.java index 044013bec70bc..b0801d67ef129 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformTaskTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformTaskTests.java @@ -30,6 +30,7 @@ import org.elasticsearch.xpack.core.transform.transforms.TransformState; import org.elasticsearch.xpack.core.transform.transforms.TransformTaskParams; import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState; +import org.elasticsearch.xpack.transform.TransformServices; import org.elasticsearch.xpack.transform.checkpoint.TransformCheckpointService; import org.elasticsearch.xpack.transform.notifications.MockTransformAuditor; import org.elasticsearch.xpack.transform.notifications.TransformAuditor; @@ -80,6 +81,12 @@ public void testStopOnFailedTaskWithStoppedIndexer() { transformsConfigManager, auditor ); + TransformServices transformServices = new TransformServices( + transformsConfigManager, + transformsCheckpointService, + auditor, + mock(SchedulerEngine.class) + ); TransformState transformState = new TransformState( TransformTaskState.FAILED, @@ -113,9 +120,7 @@ public void testStopOnFailedTaskWithStoppedIndexer() { ClientTransformIndexerBuilder indexerBuilder = new ClientTransformIndexerBuilder(); indexerBuilder.setClient(new ParentTaskAssigningClient(client, TaskId.EMPTY_TASK_ID)) .setTransformConfig(transformConfig) - .setAuditor(auditor) - .setTransformsConfigManager(transformsConfigManager) - .setTransformsCheckpointService(transformsCheckpointService) + .setTransformServices(transformServices) .setFieldMappings(Collections.emptyMap()); transformTask.initializeIndexer(indexerBuilder);