[HealthAPI] Add support for the FEATURE_STATE affected resource #92296

Merged
merged 8 commits into from
Dec 20, 2022
@@ -29,6 +29,7 @@
import org.elasticsearch.health.HealthIndicatorResult;
import org.elasticsearch.health.node.HealthInfo;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.SystemIndices;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.threadpool.ThreadPool;
import org.openjdk.jmh.annotations.Benchmark;
@@ -45,6 +46,7 @@

import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@@ -171,7 +173,7 @@ public void setUp() throws Exception {
new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet())
);
clusterService.getClusterApplierService().setInitialState(initialClusterState);
- indicatorService = new ShardsAvailabilityHealthIndicatorService(clusterService, allocationService);
+ indicatorService = new ShardsAvailabilityHealthIndicatorService(clusterService, allocationService, new SystemIndices(List.of()));
}

private int toInt(String v) {
6 changes: 6 additions & 0 deletions docs/changelog/92296.yaml
@@ -0,0 +1,6 @@
pr: 92296
summary: "[HealthAPI] Add support for the FEATURE_STATE affected resource"
area: Health
type: feature
issues:
- 91353
@@ -210,6 +210,21 @@ POST _snapshot/my_repository/snapshot-20200617/_restore
<1> The indices to restore.
+
<2> We also want to restore the aliases.
+
NOTE: If any <<feature-state,feature states>> need to be restored, specify them using the `feature_states` field.
Indices that belong to the restored feature states must not be specified under `indices`.
For the restore from snapshot diagnosis, the <<health-api, Health API>> returns both the `indices` and the `feature_states` that need to be restored. For example:
+
Comment on lines +214 to +217
Contributor Author

It's rather unfortunate that we can't recommend something better here, because there is currently no way of getting the system indices/data streams associated with a feature state (#86307).

[source,console]
----
POST _snapshot/my_repository/snapshot-20200617/_restore
{
"feature_states": [ "geoip" ],
"indices": "kibana_sample_data_flights,.ds-my-data-stream-2022.06.17-000001",
"include_aliases": true
}
----
// TEST[skip:illustration purposes only]

. Finally we can verify that the indices health is now `green` via the <<cat-indices,cat indices API>>.
+
@@ -430,6 +445,21 @@ POST _snapshot/my_repository/snapshot-20200617/_restore
<1> The indices to restore.
+
<2> We also want to restore the aliases.
+
NOTE: If any <<feature-state,feature states>> need to be restored, specify them using the `feature_states` field.
Indices that belong to the restored feature states must not be specified under `indices`.
For the restore from snapshot diagnosis, the <<health-api, Health API>> returns both the `indices` and the `feature_states` that need to be restored. For example:
+
[source,console]
----
POST _snapshot/my_repository/snapshot-20200617/_restore
{
"feature_states": [ "geoip" ],
"indices": "kibana_sample_data_flights,.ds-my-data-stream-2022.06.17-000001",
"include_aliases": true
}
----
// TEST[skip:illustration purposes only]

. Finally we can verify that the indices health is now `green` via the <<cat-indices,cat indices API>>.
+
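
To make the docs note in this file concrete, here is a minimal Java sketch of assembling a restore request body from the two resource lists that the restore from snapshot diagnosis reports. It is not part of this PR: the class `RestoreRequestSketch` and the method `buildRestoreBody` are hypothetical names, the JSON is concatenated by hand purely for illustration, and the index and feature state names are the ones used in the docs example.

```java
import java.util.List;
import java.util.stream.Collectors;

public class RestoreRequestSketch {

    // Hypothetical helper (not an Elasticsearch API): builds a restore request body
    // from the `indices` and `feature_states` lists reported by the diagnosis.
    static String buildRestoreBody(List<String> indices, List<String> featureStates) {
        // feature_states is a JSON array of quoted names
        String featureStatesJson = featureStates.stream()
            .map(name -> "\"" + name + "\"")
            .collect(Collectors.joining(", ", "[ ", " ]"));
        // indices is passed as a single comma-separated string, as in the docs example
        String indicesCsv = String.join(",", indices);
        return "{\n"
            + "  \"feature_states\": " + featureStatesJson + ",\n"
            + "  \"indices\": \"" + indicesCsv + "\",\n"
            + "  \"include_aliases\": true\n"
            + "}";
    }

    public static void main(String[] args) {
        String body = buildRestoreBody(
            List.of("kibana_sample_data_flights", ".ds-my-data-stream-2022.06.17-000001"),
            List.of("geoip")
        );
        System.out.println(body);
    }
}
```

The sketch assumes the `indices` list has already had the feature state indices removed, which is the separation this PR performs before reporting affected resources.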
@@ -43,6 +43,7 @@
import org.elasticsearch.health.ImpactArea;
import org.elasticsearch.health.SimpleHealthIndicatorDetails;
import org.elasticsearch.health.node.HealthInfo;
import org.elasticsearch.indices.SystemIndices;
import org.elasticsearch.snapshots.SnapshotShardSizeInfo;

import java.util.ArrayList;
@@ -59,6 +60,8 @@
import java.util.stream.Stream;

import static java.util.stream.Collectors.joining;
import static java.util.stream.Collectors.toMap;
import static java.util.stream.Collectors.toSet;
import static org.elasticsearch.cluster.health.ClusterShardHealth.getInactivePrimaryHealth;
import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_ROUTING_INCLUDE_GROUP_PREFIX;
import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_ROUTING_INCLUDE_GROUP_SETTING;
@@ -67,6 +70,7 @@
import static org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider.INDEX_ROUTING_ALLOCATION_ENABLE_SETTING;
import static org.elasticsearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider.CLUSTER_TOTAL_SHARDS_PER_NODE_SETTING;
import static org.elasticsearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider.INDEX_TOTAL_SHARDS_PER_NODE_SETTING;
import static org.elasticsearch.health.Diagnosis.Resource.Type.FEATURE_STATE;
import static org.elasticsearch.health.Diagnosis.Resource.Type.INDEX;
import static org.elasticsearch.health.HealthStatus.GREEN;
import static org.elasticsearch.health.HealthStatus.RED;
@@ -96,9 +100,16 @@ public class ShardsAvailabilityHealthIndicatorService implements HealthIndicator
private final ClusterService clusterService;
private final AllocationService allocationService;

- public ShardsAvailabilityHealthIndicatorService(ClusterService clusterService, AllocationService allocationService) {
+ private final SystemIndices systemIndices;
+
+ public ShardsAvailabilityHealthIndicatorService(
+ ClusterService clusterService,
+ AllocationService allocationService,
+ SystemIndices systemIndices
+ ) {
this.clusterService = clusterService;
this.allocationService = allocationService;
this.systemIndices = systemIndices;
}

@Override
@@ -760,7 +771,7 @@ private Optional<Diagnosis.Definition> checkNotEnoughNodesInDataTier(
}
}

- private class ShardAllocationStatus {
+ class ShardAllocationStatus {
private final ShardAllocationCounts primaries = new ShardAllocationCounts();
private final ShardAllocationCounts replicas = new ShardAllocationCounts();
private final Metadata clusterMetadata;
@@ -908,28 +919,108 @@ public List<Diagnosis> getDiagnosis(boolean verbose, int maxAffectedResourcesCou
if (diagnosisToAffectedIndices.isEmpty()) {
return List.of();
} else {
- return diagnosisToAffectedIndices.entrySet()
- .stream()
- .map(
- e -> new Diagnosis(
- e.getKey(),
- List.of(
- new Diagnosis.Resource(
- INDEX,
- e.getValue()
- .stream()
- .sorted(indicesComparatorByPriorityAndName(clusterMetadata))
- .limit(Math.min(e.getValue().size(), maxAffectedResourcesCount))
- .collect(Collectors.toList())
- )
+
+ return diagnosisToAffectedIndices.entrySet().stream().map(e -> {
+ List<Diagnosis.Resource> affectedResources = new ArrayList<>(1);
+ if (e.getKey().equals(ACTION_RESTORE_FROM_SNAPSHOT)) {
+ Set<String> restoreFromSnapshotIndices = e.getValue();
+ if (restoreFromSnapshotIndices != null && restoreFromSnapshotIndices.isEmpty() == false) {
+ affectedResources = getRestoreFromSnapshotAffectedResources(
+ clusterMetadata,
+ systemIndices,
+ restoreFromSnapshotIndices,
+ maxAffectedResourcesCount
+ );
+ }
+ } else {
+ affectedResources.add(
+ new Diagnosis.Resource(
+ INDEX,
+ e.getValue()
+ .stream()
+ .sorted(indicesComparatorByPriorityAndName(clusterMetadata))
+ .limit(Math.min(e.getValue().size(), maxAffectedResourcesCount))
+ .collect(Collectors.toList())
)
- )
- )
- .collect(Collectors.toList());
+ );
+ }
+ return new Diagnosis(e.getKey(), affectedResources);
+ }).collect(Collectors.toList());
}
} else {
return List.of();
}
}

/**
* The restore from snapshot operation requires the user to specify indices and feature states.
* The indices that are part of the feature states must not be specified. This method loops through all the
* identified unassigned indices and returns the affected {@link Diagnosis.Resource}s of type `INDEX`
* and if applicable `FEATURE_STATE`
*/
static List<Diagnosis.Resource> getRestoreFromSnapshotAffectedResources(
Metadata metadata,
SystemIndices systemIndices,
Set<String> restoreFromSnapshotIndices,
int maxAffectedResourcesCount
) {
List<Diagnosis.Resource> affectedResources = new ArrayList<>(2);

Set<String> affectedIndices = new HashSet<>(restoreFromSnapshotIndices);
Set<String> affectedFeatureStates = new HashSet<>();
Map<String, Set<String>> featureToSystemIndices = systemIndices.getFeatures()
.stream()
.collect(
toMap(
SystemIndices.Feature::getName,
feature -> feature.getIndexDescriptors()
.stream()
.flatMap(descriptor -> descriptor.getMatchingIndices(metadata).stream())
.collect(toSet())
)
);

for (Map.Entry<String, Set<String>> featureToIndices : featureToSystemIndices.entrySet()) {
for (String featureIndex : featureToIndices.getValue()) {
if (restoreFromSnapshotIndices.contains(featureIndex)) {
affectedFeatureStates.add(featureToIndices.getKey());
affectedIndices.remove(featureIndex);
}
}
}

Map<String, Set<String>> featureToDsBackingIndices = systemIndices.getFeatures()
.stream()
.collect(
toMap(
SystemIndices.Feature::getName,
feature -> feature.getDataStreamDescriptors()
.stream()
.flatMap(descriptor -> descriptor.getBackingIndexNames(metadata).stream())
.collect(toSet())
)
);

// the shards_availability indicator works with indices so let's remove the feature states data streams backing indices from
// the list of affected indices (the feature state will cover the restore of these indices too)
for (Map.Entry<String, Set<String>> featureToBackingIndices : featureToDsBackingIndices.entrySet()) {
for (String featureIndex : featureToBackingIndices.getValue()) {
if (restoreFromSnapshotIndices.contains(featureIndex)) {
affectedFeatureStates.add(featureToBackingIndices.getKey());
affectedIndices.remove(featureIndex);
}
}
}

if (affectedIndices.isEmpty() == false) {
affectedResources.add(new Diagnosis.Resource(INDEX, affectedIndices.stream().limit(maxAffectedResourcesCount).toList()));
}
if (affectedFeatureStates.isEmpty() == false) {
affectedResources.add(
new Diagnosis.Resource(FEATURE_STATE, affectedFeatureStates.stream().limit(maxAffectedResourcesCount).toList())
);
}
return affectedResources;
}
}
}
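
The partitioning that `getRestoreFromSnapshotAffectedResources` performs can be sketched without any Elasticsearch types. The following is an illustration only, under the assumption that the feature-to-indices mapping has already been resolved into a plain `Map`. `FeatureStatePartitionSketch`, `Partition`, and `partition` are hypothetical names, the sample index names are made up, and the sketch uses `TreeSet` for stable output where the PR uses `HashSet`.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

public class FeatureStatePartitionSketch {

    // Hypothetical result holder: plain indices to restore, plus feature states to restore.
    record Partition(Set<String> indices, Set<String> featureStates) {}

    // Splits the unassigned indices into regular indices and feature states.
    // An index owned by a feature is removed from the index set and its feature
    // name is recorded instead, since restoring the feature state covers it.
    static Partition partition(Set<String> unassignedIndices, Map<String, Set<String>> featureToIndices) {
        Set<String> indices = new TreeSet<>(unassignedIndices);
        Set<String> featureStates = new TreeSet<>();
        for (Map.Entry<String, Set<String>> entry : featureToIndices.entrySet()) {
            for (String featureIndex : entry.getValue()) {
                if (unassignedIndices.contains(featureIndex)) {
                    featureStates.add(entry.getKey());
                    indices.remove(featureIndex);
                }
            }
        }
        return new Partition(indices, featureStates);
    }

    public static void main(String[] args) {
        // Made-up mapping of feature names to the indices they own
        Map<String, Set<String>> featureToIndices = Map.of(
            "geoip", Set.of(".geoip_databases"),
            "kibana", Set.of(".kibana_1")
        );
        Partition p = partition(Set.of("my-index", ".geoip_databases"), featureToIndices);
        System.out.println(p.indices());       // prints [my-index]
        System.out.println(p.featureStates()); // prints [geoip]
    }
}
```

In the PR the same loop runs twice, once over the feature's index descriptors and once over its data stream backing indices, and each resulting list is truncated to `maxAffectedResourcesCount`; the sketch folds both sources into one map and omits the limit.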
@@ -44,6 +44,7 @@ public enum Type {
INDEX("indices"),
NODE("nodes"),
SLM_POLICY("slm_policies"),
FEATURE_STATE("feature_states"),
SNAPSHOT_REPOSITORY("snapshot_repositories");

private final String displayValue;
13 changes: 10 additions & 3 deletions server/src/main/java/org/elasticsearch/node/Node.java
@@ -997,7 +997,13 @@ protected Node(
discoveryModule.getCoordinator(),
masterHistoryService
);
- HealthService healthService = createHealthService(clusterService, clusterModule, coordinationDiagnosticsService, threadPool);
+ HealthService healthService = createHealthService(
+ clusterService,
+ clusterModule,
+ coordinationDiagnosticsService,
+ threadPool,
+ systemIndices
+ );
HealthMetadataService healthMetadataService = HealthMetadataService.create(clusterService, settings);
LocalHealthMonitor localHealthMonitor = LocalHealthMonitor.create(settings, clusterService, nodeService, threadPool, client);
HealthInfoCache nodeHealthOverview = HealthInfoCache.create(clusterService);
@@ -1199,15 +1205,16 @@ private HealthService createHealthService(
ClusterService clusterService,
ClusterModule clusterModule,
CoordinationDiagnosticsService coordinationDiagnosticsService,
- ThreadPool threadPool
+ ThreadPool threadPool,
+ SystemIndices systemIndices
) {
List<HealthIndicatorService> preflightHealthIndicatorServices = Collections.singletonList(
new StableMasterHealthIndicatorService(coordinationDiagnosticsService, clusterService)
);
var serverHealthIndicatorServices = new ArrayList<>(
List.of(
new RepositoryIntegrityHealthIndicatorService(clusterService),
- new ShardsAvailabilityHealthIndicatorService(clusterService, clusterModule.getAllocationService())
+ new ShardsAvailabilityHealthIndicatorService(clusterService, clusterModule.getAllocationService(), systemIndices)
)
);
serverHealthIndicatorServices.add(new DiskHealthIndicatorService(clusterService));