[8.8] Check if an analytics event data stream exists before installing pipeline. (#95621) #95775

Merged
3 commits merged on May 3, 2023
5 changes: 5 additions & 0 deletions docs/changelog/95621.yaml
@@ -0,0 +1,5 @@
pr: 95621
summary: Check if an analytics event data stream exists before installing pipeline
area: Application
type: bug
issues: []
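
For context, the gist of the fix is that the analytics ingest pipeline is only installed once it already exists or at least one analytics event data stream is present in the cluster. A minimal, self-contained sketch of that gating rule (plain Java with hypothetical names and an assumed prefix value, not the actual Elasticsearch classes):

```java
import java.util.Set;

// Simplified illustration of the readiness check this PR introduces; all names are hypothetical.
public final class AnalyticsPipelineGate {

    // Assumed prefix value, for illustration only.
    private static final String EVENT_DATA_STREAM_INDEX_PREFIX = "behavioral_analytics-events-";

    // Install (or upgrade) the pipeline only if it is already installed, or if at least one
    // analytics event data stream exists in the cluster state.
    static boolean shouldInstallPipeline(boolean pipelineAlreadyInstalled, Set<String> dataStreamNames) {
        return pipelineAlreadyInstalled
            || dataStreamNames.stream().anyMatch(name -> name.startsWith(EVENT_DATA_STREAM_INDEX_PREFIX));
    }

    public static void main(String[] args) {
        System.out.println(shouldInstallPipeline(false, Set.of("logs-app-default")));                          // false
        System.out.println(shouldInstallPipeline(false, Set.of("behavioral_analytics-events-my-collection"))); // true
    }
}
```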
@@ -7,13 +7,16 @@
package org.elasticsearch.xpack.application.analytics;

import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.application.utils.ingest.PipelineRegistry;
import org.elasticsearch.xpack.application.utils.ingest.PipelineTemplateConfiguration;

import java.util.Collections;
import java.util.List;
import java.util.Set;

import static org.elasticsearch.xpack.application.analytics.AnalyticsConstants.EVENT_DATA_STREAM_INDEX_PREFIX;
import static org.elasticsearch.xpack.application.analytics.AnalyticsConstants.ROOT_RESOURCE_PATH;
@@ -50,4 +53,19 @@ protected String getOrigin() {
protected List<PipelineTemplateConfiguration> getIngestPipelineConfigs() {
return INGEST_PIPELINES;
}

@Override
protected boolean isClusterReady(ClusterChangedEvent event) {
return super.isClusterReady(event) && (isIngestPipelineInstalled(event.state()) || hasAnalyticsEventDataStream(event.state()));
}

private boolean hasAnalyticsEventDataStream(ClusterState state) {
Set<String> dataStreamNames = state.metadata().dataStreams().keySet();

return dataStreamNames.stream().anyMatch(dataStreamName -> dataStreamName.startsWith(EVENT_DATA_STREAM_INDEX_PREFIX));
}

private boolean isIngestPipelineInstalled(ClusterState state) {
return ingestPipelineExists(state, EVENT_DATA_STREAM_INGEST_PIPELINE_NAME);
}
}
@@ -62,25 +62,32 @@ public PipelineRegistry(ClusterService clusterService, ThreadPool threadPool, Cl

@Override
public void clusterChanged(ClusterChangedEvent event) {

if (isClusterReady(event)) {
addIngestPipelinesIfMissing(event.state());
}
}

protected abstract String getOrigin();

protected abstract List<PipelineTemplateConfiguration> getIngestPipelineConfigs();

protected boolean isClusterReady(ClusterChangedEvent event) {
ClusterState state = event.state();
if (state.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
// wait until recovered from disk, so the cluster state view is consistent
return;
return false;
}

DiscoveryNode masterNode = event.state().getNodes().getMasterNode();
if (masterNode == null || state.nodes().isLocalNodeElectedMaster() == false) {
// no master node elected or current node is not master
return;
return false;
}

addIngestPipelinesIfMissing(state);
return true;
}

protected abstract String getOrigin();

protected abstract List<PipelineTemplateConfiguration> getIngestPipelineConfigs();

private void addIngestPipelinesIfMissing(ClusterState state) {
for (PipelineTemplateConfiguration pipelineTemplateConfig : getIngestPipelineConfigs()) {
PipelineConfiguration newPipeline = pipelineTemplateConfig.load();
@@ -121,7 +128,7 @@ private void addIngestPipelinesIfMissing(ClusterState state) {
}
}

private static boolean ingestPipelineExists(ClusterState state, String pipelineId) {
protected boolean ingestPipelineExists(ClusterState state, String pipelineId) {
Optional<IngestMetadata> maybeMeta = Optional.ofNullable(state.metadata().custom(IngestMetadata.TYPE));
return maybeMeta.isPresent() && maybeMeta.get().getPipelines().containsKey(pipelineId);
}
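
The refactor above turns `clusterChanged` into a template method: the base `PipelineRegistry` decides when the cluster is ready and installs missing pipelines, while subclasses may override `isClusterReady` to tighten the condition (as the analytics registry does earlier in this diff). A minimal sketch of that shape, using simplified stand-in types rather than the real Elasticsearch classes:

```java
import java.util.Set;

// Simplified stand-in for the cluster state view; not the real Elasticsearch types.
record ClusterView(boolean recovered, boolean localNodeIsMaster, Set<String> pipelines) {}

abstract class BasePipelineRegistry {

    // Template method: react to cluster changes, installing pipelines only once the cluster is ready.
    public final void clusterChanged(ClusterView view) {
        if (isClusterReady(view)) {
            installPipelinesIfMissing(view);
        }
    }

    // Base readiness: state recovered from disk and the local node is the elected master.
    protected boolean isClusterReady(ClusterView view) {
        return view.recovered() && view.localNodeIsMaster();
    }

    // Idempotent install, elided in this sketch.
    protected void installPipelinesIfMissing(ClusterView view) {}
}

class StricterRegistry extends BasePipelineRegistry {
    @Override
    protected boolean isClusterReady(ClusterView view) {
        // Subclass hook: require the base conditions plus an extra, registry-specific check.
        return super.isClusterReady(view) && view.pipelines().contains("my-pipeline");
    }
}
```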
@@ -19,14 +19,18 @@
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.TriFunction;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexMode;
import org.elasticsearch.ingest.IngestMetadata;
import org.elasticsearch.ingest.PipelineConfiguration;
import org.elasticsearch.test.ClusterServiceUtils;
@@ -123,6 +127,24 @@ public void testThatVersionedOldPipelinesAreUpgraded() throws Exception {
assertBusy(() -> assertThat(calledTimes.get(), equalTo(registry.getIngestPipelineConfigs().size())));
}

public void testThatPipelinesAreNotInstalledWhenNoAnalyticsCollectionExist() {
DiscoveryNode node = new DiscoveryNode("node", ESTestCase.buildNewFakeTransportAddress(), Version.CURRENT);
DiscoveryNodes nodes = DiscoveryNodes.builder().localNodeId("node").masterNodeId("node").add(node).build();

ClusterChangedEvent event = createClusterChangedEvent(Collections.emptyMap(), nodes, false);

client.setVerifier((action, request, listener) -> {
if (action instanceof PutPipelineAction) {
fail("no behavioral analytics collection exists, pipeline should not be installed");
} else {
fail("client called with unexpected request: " + request.toString());
}
return null;
});

registry.clusterChanged(event);
}

public void testThatNewerPipelinesAreNotUpgraded() throws Exception {
DiscoveryNode node = new DiscoveryNode("node", ESTestCase.buildNewFakeTransportAddress(), Version.CURRENT);
DiscoveryNodes nodes = DiscoveryNodes.builder().localNodeId("node").masterNodeId("node").add(node).build();
@@ -211,7 +233,15 @@ private ActionResponse verifyIngestPipelinesInstalled(
}

private ClusterChangedEvent createClusterChangedEvent(Map<String, Integer> existingIngestPipelines, DiscoveryNodes nodes) {
ClusterState cs = createClusterState(existingIngestPipelines, nodes);
return createClusterChangedEvent(existingIngestPipelines, nodes, true);
}

private ClusterChangedEvent createClusterChangedEvent(
Map<String, Integer> existingIngestPipelines,
DiscoveryNodes nodes,
boolean withDataStreams
) {
ClusterState cs = createClusterState(existingIngestPipelines, nodes, withDataStreams);
ClusterChangedEvent realEvent = new ClusterChangedEvent(
"created-from-test",
cs,
@@ -223,21 +253,45 @@ private ClusterChangedEvent createClusterChangedEvent(Map<String, Integer> exist
return event;
}

private ClusterState createClusterState(Map<String, Integer> existingIngestPipelines, DiscoveryNodes nodes) {
private ClusterState createClusterState(Map<String, Integer> existingIngestPipelines, DiscoveryNodes nodes, boolean withDataStreams) {
Map<String, PipelineConfiguration> pipelines = new HashMap<>();
for (Map.Entry<String, Integer> e : existingIngestPipelines.entrySet()) {
pipelines.put(e.getKey(), createMockPipelineConfiguration(e.getKey(), e.getValue()));
}

Metadata.Builder metadataBuilder = Metadata.builder()
.transientSettings(Settings.EMPTY)
.putCustom(IngestMetadata.TYPE, new IngestMetadata(pipelines));

if (withDataStreams) {
DataStream dataStream = createDataStream();
metadataBuilder.dataStreams(
MapBuilder.<String, DataStream>newMapBuilder().put(dataStream.getName(), dataStream).map(),
Collections.emptyMap()
);
}

return ClusterState.builder(new ClusterName("test"))
.metadata(
Metadata.builder().transientSettings(Settings.EMPTY).putCustom(IngestMetadata.TYPE, new IngestMetadata(pipelines)).build()
)
.metadata(metadataBuilder)
.blocks(new ClusterBlocks.Builder().build())
.nodes(nodes)
.build();
}

private DataStream createDataStream() {
return new DataStream(
AnalyticsConstants.EVENT_DATA_STREAM_INDEX_PREFIX + randomIdentifier(),
randomList(1, 10, () -> new Index(randomIdentifier(), randomIdentifier())),
0,
Collections.emptyMap(),
false,
false,
false,
false,
IndexMode.STANDARD
);
}

private PipelineConfiguration createMockPipelineConfiguration(String pipelineId, int version) {
try (XContentBuilder configBuilder = JsonXContent.contentBuilder().startObject().field("version", version).endObject()) {
BytesReference config = BytesReference.bytes(configBuilder);
@@ -8,6 +8,7 @@
package org.elasticsearch.upgrades;

import org.apache.http.util.EntityUtils;
import org.elasticsearch.Version;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.hamcrest.Matchers;
@@ -17,6 +18,8 @@
public class GeoIpUpgradeIT extends AbstractUpgradeTestCase {

public void testGeoIpDownloader() throws Exception {
assumeTrue("Disabled until PR #95621 is backported to branch " + Version.V_8_8_0, UPGRADE_FROM_VERSION.onOrBefore(Version.V_8_7_0));

if (CLUSTER_TYPE == ClusterType.UPGRADED) {
assertBusy(() -> {
Response response = client().performRequest(new Request("GET", "_cat/tasks"));
@@ -26,8 +29,8 @@ public void testGeoIpDownloader() throws Exception {
assertBusy(() -> {
Response response = client().performRequest(new Request("GET", "_ingest/geoip/stats"));
String tasks = EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8);
// The geoip downloader should be executed since a geoip processors is present in behavioral analytics default pipeline:
assertThat(tasks, Matchers.containsString("failed_downloads\":1"));
// The geoip downloader doesn't actually do anything since there are no geoip processors:
assertThat(tasks, Matchers.containsString("failed_downloads\":0"));
assertThat(tasks, Matchers.containsString("successful_downloads\":0"));
});
}
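
The `assumeTrue` guard added at the top of `testGeoIpDownloader` skips the test (rather than failing it) while the backport is pending. A tiny JUnit 4-style illustration of that pattern, with a hypothetical flag standing in for the version check:

```java
import org.junit.Assume;
import org.junit.Test;

public class AssumeGuardExampleTest {

    // Hypothetical flag standing in for UPGRADE_FROM_VERSION.onOrBefore(Version.V_8_7_0).
    private static final boolean UPGRADING_FROM_8_7_OR_EARLIER = false;

    @Test
    public void testOnlyRunsWhenAssumptionHolds() {
        // When the condition is false, JUnit reports the test as skipped instead of failed.
        Assume.assumeTrue("only relevant when upgrading from 8.7 or earlier", UPGRADING_FROM_8_7_OR_EARLIER);

        // The test body below only executes when the assumption holds.
    }
}
```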