Skip to content

Commit

Permalink
Update EsqlStatsAction to use a historical feature (elastic#101530)
Browse files Browse the repository at this point in the history
  • Loading branch information
thecoop authored Oct 30, 2023
1 parent fd11123 commit 057e36f
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 8 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.plugin;

import org.elasticsearch.Version;
import org.elasticsearch.features.FeatureSpecification;
import org.elasticsearch.features.NodeFeature;

import java.util.Map;

public class EsqlFeatures implements FeatureSpecification {
@Override
public Map<NodeFeature, Version> getHistoricalFeatures() {
return Map.of(TransportEsqlStatsAction.ESQL_STATS_FEATURE, Version.V_8_11_0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
*/
package org.elasticsearch.xpack.esql.plugin;

import org.elasticsearch.Version;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.nodes.TransportNodesAction;
Expand All @@ -15,13 +14,14 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.features.FeatureService;
import org.elasticsearch.features.NodeFeature;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.esql.execution.PlanExecutor;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;

/**
Expand All @@ -33,13 +33,17 @@ public class TransportEsqlStatsAction extends TransportNodesAction<
EsqlStatsRequest.NodeStatsRequest,
EsqlStatsResponse.NodeStatsResponse> {

static final NodeFeature ESQL_STATS_FEATURE = new NodeFeature("esql.stats_node");

// the plan executor holds the metrics
private final FeatureService featureService;
private final PlanExecutor planExecutor;

@Inject
public TransportEsqlStatsAction(
TransportService transportService,
ClusterService clusterService,
FeatureService featureService,
ThreadPool threadPool,
ActionFilters actionFilters,
PlanExecutor planExecutor
Expand All @@ -52,17 +56,19 @@ public TransportEsqlStatsAction(
EsqlStatsRequest.NodeStatsRequest::new,
threadPool.executor(ThreadPool.Names.MANAGEMENT)
);
this.featureService = featureService;
this.planExecutor = planExecutor;
}

@Override
protected void resolveRequest(EsqlStatsRequest request, ClusterState clusterState) {
String[] nodesIds = clusterState.nodes().resolveNodes(request.nodesIds());
DiscoveryNode[] concreteNodes = Arrays.stream(nodesIds)
.map(clusterState.nodes()::get)
.filter(n -> n.getVersion().onOrAfter(Version.V_8_11_0))
.toArray(DiscoveryNode[]::new);
request.setConcreteNodes(concreteNodes);
if (featureService.clusterHasFeature(clusterState, ESQL_STATS_FEATURE)) {
// use the whole cluster
super.resolveRequest(request, clusterState);
} else {
// not all nodes in the cluster have upgraded to esql - just use this node for now
request.setConcreteNodes(new DiscoveryNode[] { clusterService.localNode() });
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
#
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License
# 2.0; you may not use this file except in compliance with the Elastic License
# 2.0.
#

org.elasticsearch.xpack.esql.plugin.EsqlFeatures

0 comments on commit 057e36f

Please sign in to comment.