
Commit

[8.8] Check if an analytics event data stream exists before installing pipeline. (elastic#95621) (elastic#95775)

afoucret authored May 3, 2023
1 parent 0846eae commit ba0c09b
Showing 5 changed files with 102 additions and 15 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/95621.yaml
@@ -0,0 +1,5 @@
+pr: 95621
+summary: Check if an analytics event data stream exists before installing pipeline
+area: Application
+type: bug
+issues: []
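
In short: the analytics event ingest pipeline is now installed only when it is already present in the cluster state or when at least one behavioral analytics event data stream exists, instead of unconditionally on every eligible cluster state update. A minimal standalone sketch of that condition (simplified from the AnalyticsIngestPipelineRegistry change below; the prefix value used here is an assumption, the real one comes from AnalyticsConstants.EVENT_DATA_STREAM_INDEX_PREFIX):

    import java.util.Set;

    class EventPipelineGate {
        // Hypothetical stand-in for AnalyticsConstants.EVENT_DATA_STREAM_INDEX_PREFIX.
        private static final String EVENT_DATA_STREAM_INDEX_PREFIX = "behavioral_analytics-events-";

        // Install the pipeline only if it already exists or an analytics event data stream is present.
        static boolean shouldInstallPipeline(boolean pipelineAlreadyInstalled, Set<String> dataStreamNames) {
            boolean hasAnalyticsEventDataStream = dataStreamNames.stream()
                .anyMatch(name -> name.startsWith(EVENT_DATA_STREAM_INDEX_PREFIX));
            return pipelineAlreadyInstalled || hasAnalyticsEventDataStream;
        }
    }

This mirrors the new isClusterReady override: the base checks (cluster state recovered, local node elected master) still apply, and the data stream check keeps clusters that never used behavioral analytics from getting the pipeline, and with it the geoip processor, installed.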
@@ -7,13 +7,16 @@
 package org.elasticsearch.xpack.application.analytics;
 
 import org.elasticsearch.client.internal.Client;
+import org.elasticsearch.cluster.ClusterChangedEvent;
+import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.xpack.application.utils.ingest.PipelineRegistry;
 import org.elasticsearch.xpack.application.utils.ingest.PipelineTemplateConfiguration;
 
 import java.util.Collections;
 import java.util.List;
+import java.util.Set;
 
 import static org.elasticsearch.xpack.application.analytics.AnalyticsConstants.EVENT_DATA_STREAM_INDEX_PREFIX;
 import static org.elasticsearch.xpack.application.analytics.AnalyticsConstants.ROOT_RESOURCE_PATH;
@@ -50,4 +53,19 @@ protected String getOrigin() {
     protected List<PipelineTemplateConfiguration> getIngestPipelineConfigs() {
         return INGEST_PIPELINES;
     }
+
+    @Override
+    protected boolean isClusterReady(ClusterChangedEvent event) {
+        return super.isClusterReady(event) && (isIngestPipelineInstalled(event.state()) || hasAnalyticsEventDataStream(event.state()));
+    }
+
+    private boolean hasAnalyticsEventDataStream(ClusterState state) {
+        Set<String> dataStreamNames = state.metadata().dataStreams().keySet();
+
+        return dataStreamNames.stream().anyMatch(dataStreamName -> dataStreamName.startsWith(EVENT_DATA_STREAM_INDEX_PREFIX));
+    }
+
+    private boolean isIngestPipelineInstalled(ClusterState state) {
+        return ingestPipelineExists(state, EVENT_DATA_STREAM_INGEST_PIPELINE_NAME);
+    }
 }
@@ -62,25 +62,32 @@ public PipelineRegistry(ClusterService clusterService, ThreadPool threadPool, Cl
 
     @Override
     public void clusterChanged(ClusterChangedEvent event) {
+
+        if (isClusterReady(event)) {
+            addIngestPipelinesIfMissing(event.state());
+        }
+    }
+
+    protected abstract String getOrigin();
+
+    protected abstract List<PipelineTemplateConfiguration> getIngestPipelineConfigs();
+
+    protected boolean isClusterReady(ClusterChangedEvent event) {
         ClusterState state = event.state();
         if (state.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
             // wait until recovered from disk, so the cluster state view is consistent
-            return;
+            return false;
         }
 
         DiscoveryNode masterNode = event.state().getNodes().getMasterNode();
         if (masterNode == null || state.nodes().isLocalNodeElectedMaster() == false) {
             // no master node elected or current node is not master
-            return;
+            return false;
         }
 
-        addIngestPipelinesIfMissing(state);
+        return true;
     }
 
-    protected abstract String getOrigin();
-
-    protected abstract List<PipelineTemplateConfiguration> getIngestPipelineConfigs();
-
     private void addIngestPipelinesIfMissing(ClusterState state) {
         for (PipelineTemplateConfiguration pipelineTemplateConfig : getIngestPipelineConfigs()) {
             PipelineConfiguration newPipeline = pipelineTemplateConfig.load();
@@ -121,7 +128,7 @@ private void addIngestPipelinesIfMissing(ClusterState state) {
         }
     }
 
-    private static boolean ingestPipelineExists(ClusterState state, String pipelineId) {
+    protected boolean ingestPipelineExists(ClusterState state, String pipelineId) {
         Optional<IngestMetadata> maybeMeta = Optional.ofNullable(state.metadata().custom(IngestMetadata.TYPE));
         return maybeMeta.isPresent() && maybeMeta.get().getPipelines().containsKey(pipelineId);
     }
@@ -19,14 +19,18 @@
 import org.elasticsearch.cluster.ClusterName;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.block.ClusterBlocks;
+import org.elasticsearch.cluster.metadata.DataStream;
 import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.TriFunction;
 import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.collect.MapBuilder;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.xcontent.XContentHelper;
+import org.elasticsearch.index.Index;
+import org.elasticsearch.index.IndexMode;
 import org.elasticsearch.ingest.IngestMetadata;
 import org.elasticsearch.ingest.PipelineConfiguration;
 import org.elasticsearch.test.ClusterServiceUtils;
@@ -123,6 +127,24 @@ public void testThatVersionedOldPipelinesAreUpgraded() throws Exception {
         assertBusy(() -> assertThat(calledTimes.get(), equalTo(registry.getIngestPipelineConfigs().size())));
     }
 
+    public void testThatPipelinesAreNotInstalledWhenNoAnalyticsCollectionExist() {
+        DiscoveryNode node = new DiscoveryNode("node", ESTestCase.buildNewFakeTransportAddress(), Version.CURRENT);
+        DiscoveryNodes nodes = DiscoveryNodes.builder().localNodeId("node").masterNodeId("node").add(node).build();
+
+        ClusterChangedEvent event = createClusterChangedEvent(Collections.emptyMap(), nodes, false);
+
+        client.setVerifier((action, request, listener) -> {
+            if (action instanceof PutPipelineAction) {
+                fail("no behavioral analytics collection exists, pipeline should not be installed");
+            } else {
+                fail("client called with unexpected request: " + request.toString());
+            }
+            return null;
+        });
+
+        registry.clusterChanged(event);
+    }
+
     public void testThatNewerPipelinesAreNotUpgraded() throws Exception {
         DiscoveryNode node = new DiscoveryNode("node", ESTestCase.buildNewFakeTransportAddress(), Version.CURRENT);
         DiscoveryNodes nodes = DiscoveryNodes.builder().localNodeId("node").masterNodeId("node").add(node).build();
@@ -211,7 +233,15 @@ private ActionResponse verifyIngestPipelinesInstalled(
     }
 
     private ClusterChangedEvent createClusterChangedEvent(Map<String, Integer> existingIngestPipelines, DiscoveryNodes nodes) {
-        ClusterState cs = createClusterState(existingIngestPipelines, nodes);
+        return createClusterChangedEvent(existingIngestPipelines, nodes, true);
+    }
+
+    private ClusterChangedEvent createClusterChangedEvent(
+        Map<String, Integer> existingIngestPipelines,
+        DiscoveryNodes nodes,
+        boolean withDataStreams
+    ) {
+        ClusterState cs = createClusterState(existingIngestPipelines, nodes, withDataStreams);
         ClusterChangedEvent realEvent = new ClusterChangedEvent(
             "created-from-test",
             cs,
@@ -223,21 +253,45 @@ private ClusterChangedEvent createClusterChangedEvent(Map<String, Integer> exist
         return event;
     }
 
-    private ClusterState createClusterState(Map<String, Integer> existingIngestPipelines, DiscoveryNodes nodes) {
+    private ClusterState createClusterState(Map<String, Integer> existingIngestPipelines, DiscoveryNodes nodes, boolean withDataStreams) {
         Map<String, PipelineConfiguration> pipelines = new HashMap<>();
         for (Map.Entry<String, Integer> e : existingIngestPipelines.entrySet()) {
             pipelines.put(e.getKey(), createMockPipelineConfiguration(e.getKey(), e.getValue()));
         }
 
+        Metadata.Builder metadataBuilder = Metadata.builder()
+            .transientSettings(Settings.EMPTY)
+            .putCustom(IngestMetadata.TYPE, new IngestMetadata(pipelines));
+
+        if (withDataStreams) {
+            DataStream dataStream = createDataStream();
+            metadataBuilder.dataStreams(
+                MapBuilder.<String, DataStream>newMapBuilder().put(dataStream.getName(), dataStream).map(),
+                Collections.emptyMap()
+            );
+        }
+
         return ClusterState.builder(new ClusterName("test"))
-            .metadata(
-                Metadata.builder().transientSettings(Settings.EMPTY).putCustom(IngestMetadata.TYPE, new IngestMetadata(pipelines)).build()
-            )
+            .metadata(metadataBuilder)
            .blocks(new ClusterBlocks.Builder().build())
            .nodes(nodes)
            .build();
     }
 
+    private DataStream createDataStream() {
+        return new DataStream(
+            AnalyticsConstants.EVENT_DATA_STREAM_INDEX_PREFIX + randomIdentifier(),
+            randomList(1, 10, () -> new Index(randomIdentifier(), randomIdentifier())),
+            0,
+            Collections.emptyMap(),
+            false,
+            false,
+            false,
+            false,
+            IndexMode.STANDARD
+        );
+    }
+
     private PipelineConfiguration createMockPipelineConfiguration(String pipelineId, int version) {
         try (XContentBuilder configBuilder = JsonXContent.contentBuilder().startObject().field("version", version).endObject()) {
             BytesReference config = BytesReference.bytes(configBuilder);
@@ -8,6 +8,7 @@
 package org.elasticsearch.upgrades;
 
 import org.apache.http.util.EntityUtils;
+import org.elasticsearch.Version;
 import org.elasticsearch.client.Request;
 import org.elasticsearch.client.Response;
 import org.hamcrest.Matchers;
@@ -17,6 +18,8 @@
 public class GeoIpUpgradeIT extends AbstractUpgradeTestCase {
 
     public void testGeoIpDownloader() throws Exception {
+        assumeTrue("Disabled until PR #95621 is backported to branch " + Version.V_8_8_0, UPGRADE_FROM_VERSION.onOrBefore(Version.V_8_7_0));
+
         if (CLUSTER_TYPE == ClusterType.UPGRADED) {
             assertBusy(() -> {
                 Response response = client().performRequest(new Request("GET", "_cat/tasks"));
@@ -26,8 +29,8 @@ public void testGeoIpDownloader() throws Exception {
             assertBusy(() -> {
                 Response response = client().performRequest(new Request("GET", "_ingest/geoip/stats"));
                 String tasks = EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8);
-                // The geoip downloader should be executed since a geoip processors is present in behavioral analytics default pipeline:
-                assertThat(tasks, Matchers.containsString("failed_downloads\":1"));
+                // The geoip downloader doesn't actually do anything since there are no geoip processors:
+                assertThat(tasks, Matchers.containsString("failed_downloads\":0"));
                 assertThat(tasks, Matchers.containsString("successful_downloads\":0"));
             });
         }