From ba0c09bb895280b92779a0d1f4fc6145f74217f9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aur=C3=A9lien=20FOUCRET?= Date: Wed, 3 May 2023 13:02:30 +0200 Subject: [PATCH] [8.8] Check if an analytics event data stream exists before installing pipeline. (#95621) (#95775) --- docs/changelog/95621.yaml | 5 ++ .../AnalyticsIngestPipelineRegistry.java | 18 ++++++ .../utils/ingest/PipelineRegistry.java | 23 ++++--- .../AnalyticsIngestPipelineRegistryTests.java | 64 +++++++++++++++++-- .../upgrades/GeoIpUpgradeIT.java | 7 +- 5 files changed, 102 insertions(+), 15 deletions(-) create mode 100644 docs/changelog/95621.yaml diff --git a/docs/changelog/95621.yaml b/docs/changelog/95621.yaml new file mode 100644 index 0000000000000..57dd56da37dc9 --- /dev/null +++ b/docs/changelog/95621.yaml @@ -0,0 +1,5 @@ +pr: 95621 +summary: Check if an analytics event data stream exists before installing pipeline +area: Application +type: bug +issues: [] diff --git a/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/analytics/AnalyticsIngestPipelineRegistry.java b/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/analytics/AnalyticsIngestPipelineRegistry.java index 83f58ca24b527..26c7792d3bee9 100644 --- a/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/analytics/AnalyticsIngestPipelineRegistry.java +++ b/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/analytics/AnalyticsIngestPipelineRegistry.java @@ -7,6 +7,8 @@ package org.elasticsearch.xpack.application.analytics; import org.elasticsearch.client.internal.Client; +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.application.utils.ingest.PipelineRegistry; @@ -14,6 +16,7 @@ import java.util.Collections; import java.util.List; +import java.util.Set; import static org.elasticsearch.xpack.application.analytics.AnalyticsConstants.EVENT_DATA_STREAM_INDEX_PREFIX; import static org.elasticsearch.xpack.application.analytics.AnalyticsConstants.ROOT_RESOURCE_PATH; @@ -50,4 +53,19 @@ protected String getOrigin() { protected List getIngestPipelineConfigs() { return INGEST_PIPELINES; } + + @Override + protected boolean isClusterReady(ClusterChangedEvent event) { + return super.isClusterReady(event) && (isIngestPipelineInstalled(event.state()) || hasAnalyticsEventDataStream(event.state())); + } + + private boolean hasAnalyticsEventDataStream(ClusterState state) { + Set dataStreamNames = state.metadata().dataStreams().keySet(); + + return dataStreamNames.stream().anyMatch(dataStreamName -> dataStreamName.startsWith(EVENT_DATA_STREAM_INDEX_PREFIX)); + } + + private boolean isIngestPipelineInstalled(ClusterState state) { + return ingestPipelineExists(state, EVENT_DATA_STREAM_INGEST_PIPELINE_NAME); + } } diff --git a/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/utils/ingest/PipelineRegistry.java b/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/utils/ingest/PipelineRegistry.java index 1534afbc9d5cf..e49056fcf885b 100644 --- a/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/utils/ingest/PipelineRegistry.java +++ b/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/utils/ingest/PipelineRegistry.java @@ -62,25 +62,32 @@ public PipelineRegistry(ClusterService clusterService, ThreadPool threadPool, Cl @Override public void clusterChanged(ClusterChangedEvent event) { + + if (isClusterReady(event)) { + addIngestPipelinesIfMissing(event.state()); + } + } + + protected abstract String getOrigin(); + + protected abstract List getIngestPipelineConfigs(); + + protected boolean isClusterReady(ClusterChangedEvent event) { ClusterState state = event.state(); if (state.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) { // wait until recovered from disk, so the cluster state view is consistent - return; + return false; } DiscoveryNode masterNode = event.state().getNodes().getMasterNode(); if (masterNode == null || state.nodes().isLocalNodeElectedMaster() == false) { // no master node elected or current node is not master - return; + return false; } - addIngestPipelinesIfMissing(state); + return true; } - protected abstract String getOrigin(); - - protected abstract List getIngestPipelineConfigs(); - private void addIngestPipelinesIfMissing(ClusterState state) { for (PipelineTemplateConfiguration pipelineTemplateConfig : getIngestPipelineConfigs()) { PipelineConfiguration newPipeline = pipelineTemplateConfig.load(); @@ -121,7 +128,7 @@ private void addIngestPipelinesIfMissing(ClusterState state) { } } - private static boolean ingestPipelineExists(ClusterState state, String pipelineId) { + protected boolean ingestPipelineExists(ClusterState state, String pipelineId) { Optional maybeMeta = Optional.ofNullable(state.metadata().custom(IngestMetadata.TYPE)); return maybeMeta.isPresent() && maybeMeta.get().getPipelines().containsKey(pipelineId); } diff --git a/x-pack/plugin/ent-search/src/test/java/org/elasticsearch/xpack/application/analytics/AnalyticsIngestPipelineRegistryTests.java b/x-pack/plugin/ent-search/src/test/java/org/elasticsearch/xpack/application/analytics/AnalyticsIngestPipelineRegistryTests.java index 819556395d012..2b107e8a27b2f 100644 --- a/x-pack/plugin/ent-search/src/test/java/org/elasticsearch/xpack/application/analytics/AnalyticsIngestPipelineRegistryTests.java +++ b/x-pack/plugin/ent-search/src/test/java/org/elasticsearch/xpack/application/analytics/AnalyticsIngestPipelineRegistryTests.java @@ -19,14 +19,18 @@ import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlocks; +import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.TriFunction; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.collect.MapBuilder; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.IndexMode; import org.elasticsearch.ingest.IngestMetadata; import org.elasticsearch.ingest.PipelineConfiguration; import org.elasticsearch.test.ClusterServiceUtils; @@ -123,6 +127,24 @@ public void testThatVersionedOldPipelinesAreUpgraded() throws Exception { assertBusy(() -> assertThat(calledTimes.get(), equalTo(registry.getIngestPipelineConfigs().size()))); } + public void testThatPipelinesAreNotInstalledWhenNoAnalyticsCollectionExist() { + DiscoveryNode node = new DiscoveryNode("node", ESTestCase.buildNewFakeTransportAddress(), Version.CURRENT); + DiscoveryNodes nodes = DiscoveryNodes.builder().localNodeId("node").masterNodeId("node").add(node).build(); + + ClusterChangedEvent event = createClusterChangedEvent(Collections.emptyMap(), nodes, false); + + client.setVerifier((action, request, listener) -> { + if (action instanceof PutPipelineAction) { + fail("no behavioral analytics collection exists, pipeline should not be installed"); + } else { + fail("client called with unexpected request: " + request.toString()); + } + return null; + }); + + registry.clusterChanged(event); + } + public void testThatNewerPipelinesAreNotUpgraded() throws Exception { DiscoveryNode node = new DiscoveryNode("node", ESTestCase.buildNewFakeTransportAddress(), Version.CURRENT); DiscoveryNodes nodes = DiscoveryNodes.builder().localNodeId("node").masterNodeId("node").add(node).build(); @@ -211,7 +233,15 @@ private ActionResponse verifyIngestPipelinesInstalled( } private ClusterChangedEvent createClusterChangedEvent(Map existingIngestPipelines, DiscoveryNodes nodes) { - ClusterState cs = createClusterState(existingIngestPipelines, nodes); + return createClusterChangedEvent(existingIngestPipelines, nodes, true); + } + + private ClusterChangedEvent createClusterChangedEvent( + Map existingIngestPipelines, + DiscoveryNodes nodes, + boolean withDataStreams + ) { + ClusterState cs = createClusterState(existingIngestPipelines, nodes, withDataStreams); ClusterChangedEvent realEvent = new ClusterChangedEvent( "created-from-test", cs, @@ -223,21 +253,45 @@ private ClusterChangedEvent createClusterChangedEvent(Map exist return event; } - private ClusterState createClusterState(Map existingIngestPipelines, DiscoveryNodes nodes) { + private ClusterState createClusterState(Map existingIngestPipelines, DiscoveryNodes nodes, boolean withDataStreams) { Map pipelines = new HashMap<>(); for (Map.Entry e : existingIngestPipelines.entrySet()) { pipelines.put(e.getKey(), createMockPipelineConfiguration(e.getKey(), e.getValue())); } + Metadata.Builder metadataBuilder = Metadata.builder() + .transientSettings(Settings.EMPTY) + .putCustom(IngestMetadata.TYPE, new IngestMetadata(pipelines)); + + if (withDataStreams) { + DataStream dataStream = createDataStream(); + metadataBuilder.dataStreams( + MapBuilder.newMapBuilder().put(dataStream.getName(), dataStream).map(), + Collections.emptyMap() + ); + } + return ClusterState.builder(new ClusterName("test")) - .metadata( - Metadata.builder().transientSettings(Settings.EMPTY).putCustom(IngestMetadata.TYPE, new IngestMetadata(pipelines)).build() - ) + .metadata(metadataBuilder) .blocks(new ClusterBlocks.Builder().build()) .nodes(nodes) .build(); } + private DataStream createDataStream() { + return new DataStream( + AnalyticsConstants.EVENT_DATA_STREAM_INDEX_PREFIX + randomIdentifier(), + randomList(1, 10, () -> new Index(randomIdentifier(), randomIdentifier())), + 0, + Collections.emptyMap(), + false, + false, + false, + false, + IndexMode.STANDARD + ); + } + private PipelineConfiguration createMockPipelineConfiguration(String pipelineId, int version) { try (XContentBuilder configBuilder = JsonXContent.contentBuilder().startObject().field("version", version).endObject()) { BytesReference config = BytesReference.bytes(configBuilder); diff --git a/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/GeoIpUpgradeIT.java b/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/GeoIpUpgradeIT.java index e918bfc15a9b4..c531a8d9d618d 100644 --- a/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/GeoIpUpgradeIT.java +++ b/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/GeoIpUpgradeIT.java @@ -8,6 +8,7 @@ package org.elasticsearch.upgrades; import org.apache.http.util.EntityUtils; +import org.elasticsearch.Version; import org.elasticsearch.client.Request; import org.elasticsearch.client.Response; import org.hamcrest.Matchers; @@ -17,6 +18,8 @@ public class GeoIpUpgradeIT extends AbstractUpgradeTestCase { public void testGeoIpDownloader() throws Exception { + assumeTrue("Disabled until PR #95621 is backported to branch " + Version.V_8_8_0, UPGRADE_FROM_VERSION.onOrBefore(Version.V_8_7_0)); + if (CLUSTER_TYPE == ClusterType.UPGRADED) { assertBusy(() -> { Response response = client().performRequest(new Request("GET", "_cat/tasks")); @@ -26,8 +29,8 @@ public void testGeoIpDownloader() throws Exception { assertBusy(() -> { Response response = client().performRequest(new Request("GET", "_ingest/geoip/stats")); String tasks = EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8); - // The geoip downloader should be executed since a geoip processors is present in behavioral analytics default pipeline: - assertThat(tasks, Matchers.containsString("failed_downloads\":1")); + // The geoip downloader doesn't actually do anything since there are no geoip processors: + assertThat(tasks, Matchers.containsString("failed_downloads\":0")); assertThat(tasks, Matchers.containsString("successful_downloads\":0")); }); }