diff --git a/docs/changelog/107970.yaml b/docs/changelog/107970.yaml new file mode 100644 index 0000000000000..d1c16079f4e62 --- /dev/null +++ b/docs/changelog/107970.yaml @@ -0,0 +1,5 @@ +pr: 107970 +summary: "Revert \"[Transform] Allow transforms to use PIT with remote clusters …" +area: Transform +type: bug +issues: [] diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java index 1634f417924c0..d726253e4c997 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java @@ -477,7 +477,8 @@ private void injectPointInTimeIfNeeded( ActionListener> listener ) { SearchRequest searchRequest = namedSearchRequest.v2(); - if (disablePit || searchRequest.indices().length == 0) { + // We explicitly disable PIT in the presence of remote clusters in the source due to huge PIT handles causing performance problems. + if (disablePit || searchRequest.indices().length == 0 || transformConfig.getSource().requiresRemoteCluster()) { listener.onResponse(namedSearchRequest); return; } diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerTests.java index 43a8f35cfeafe..e4aefd91219a7 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerTests.java @@ -301,14 +301,8 @@ public void testPitInjectionIfPitNotSupported() throws InterruptedException { } public void testDisablePit() throws InterruptedException { - TransformConfig.Builder configBuilder = new TransformConfig.Builder(TransformConfigTests.randomTransformConfig()); - if (randomBoolean()) { - // TransformConfigTests.randomTransformConfig never produces remote indices in the source. - // We need to explicitly set the remote index here for coverage. - configBuilder.setSource(new SourceConfig("remote-cluster:remote-index")); - } - TransformConfig config = configBuilder.build(); - + // TransformConfigTests.randomTransformConfig never produces remote indices in the source, hence we are safe here. */ + TransformConfig config = TransformConfigTests.randomTransformConfig(); boolean pitEnabled = config.getSettings().getUsePit() == null || config.getSettings().getUsePit(); try (var threadPool = createThreadPool()) { @@ -371,6 +365,69 @@ public void testDisablePit() throws InterruptedException { } } + public void testDisablePitWhenThereIsRemoteIndexInSource() throws InterruptedException { + TransformConfig config = new TransformConfig.Builder(TransformConfigTests.randomTransformConfig()) + // Remote index is configured within source + .setSource(new SourceConfig("remote-cluster:remote-index")) + .build(); + boolean pitEnabled = config.getSettings().getUsePit() == null || config.getSettings().getUsePit(); + + try (var threadPool = createThreadPool()) { + final var client = new PitMockClient(threadPool, true); + MockClientTransformIndexer indexer = new MockClientTransformIndexer( + mock(ThreadPool.class), + mock(ClusterService.class), + mock(IndexNameExpressionResolver.class), + mock(TransformExtension.class), + new TransformServices( + mock(IndexBasedTransformConfigManager.class), + mock(TransformCheckpointService.class), + mock(TransformAuditor.class), + new TransformScheduler(Clock.systemUTC(), mock(ThreadPool.class), Settings.EMPTY, TimeValue.ZERO) + ), + mock(CheckpointProvider.class), + new AtomicReference<>(IndexerState.STOPPED), + null, + new ParentTaskAssigningClient(client, new TaskId("dummy-node:123456")), + mock(TransformIndexerStats.class), + config, + null, + new TransformCheckpoint( + "transform", + Instant.now().toEpochMilli(), + 0L, + Collections.emptyMap(), + Instant.now().toEpochMilli() + ), + new TransformCheckpoint( + "transform", + Instant.now().toEpochMilli(), + 2L, + Collections.emptyMap(), + Instant.now().toEpochMilli() + ), + new SeqNoPrimaryTermAndIndex(1, 1, TransformInternalIndexConstants.LATEST_INDEX_NAME), + mock(TransformContext.class), + false + ); + + // Because remote index is configured within source, we expect PIT *not* being used regardless the transform settings + this.assertAsync( + listener -> indexer.doNextSearch(0, listener), + response -> assertNull(response.pointInTimeId()) + ); + + // reverse the setting + indexer.applyNewSettings(new SettingsConfig.Builder().setUsePit(pitEnabled == false).build()); + + // Because remote index is configured within source, we expect PIT *not* being used regardless the transform settings + this.assertAsync( + listener -> indexer.doNextSearch(0, listener), + response -> assertNull(response.pointInTimeId()) + ); + } + } + public void testHandlePitIndexNotFound() throws InterruptedException { // simulate a deleted index due to ILM try (var threadPool = createThreadPool()) {