Skip to content

Commit

Permalink
[HealthAPI] Add support for the FEATURE_STATE affected resource (#92296)
Browse files Browse the repository at this point in the history
The `shards_availability` indicator diagnoses the condition where
indices need to be restored from snapshot.
Starting with 8.0, using `feature_states` when restoring from a snapshot is
mandatory.

This adds support for the `FEATURE_STATE` affected resource to aid with
building up the snapshot restore API call (which will need to include
all the indices and feature states reported by the restore-from-snapshot
diagnosis).

Note that the health API will not report any indices that are part of a
feature state.
  • Loading branch information
andreidan authored Dec 20, 2022
1 parent f581f0b commit 9170113
Show file tree
Hide file tree
Showing 7 changed files with 387 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.elasticsearch.health.HealthIndicatorResult;
import org.elasticsearch.health.node.HealthInfo;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.SystemIndices;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.threadpool.ThreadPool;
import org.openjdk.jmh.annotations.Benchmark;
Expand All @@ -45,6 +46,7 @@

import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -171,7 +173,7 @@ public void setUp() throws Exception {
new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet())
);
clusterService.getClusterApplierService().setInitialState(initialClusterState);
indicatorService = new ShardsAvailabilityHealthIndicatorService(clusterService, allocationService);
indicatorService = new ShardsAvailabilityHealthIndicatorService(clusterService, allocationService, new SystemIndices(List.of()));
}

private int toInt(String v) {
Expand Down
6 changes: 6 additions & 0 deletions docs/changelog/92296.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 92296
summary: "[HealthAPI] Add support for the FEATURE_STATE affected resource"
area: Health
type: feature
issues:
- 91353
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,21 @@ POST _snapshot/my_repository/snapshot-20200617/_restore
<1> The indices to restore.
+
<2> We also want to restore the aliases.
+
NOTE: If any <<feature-state,feature states>> need to be restored, we must specify them using the
`feature_states` field, and the indices that belong to those feature states must not be specified under `indices`.
The <<health-api, Health API>> returns both the `indices` and the `feature_states` that need to be restored as part of the restore-from-snapshot diagnosis. For example:
+
[source,console]
----
POST _snapshot/my_repository/snapshot-20200617/_restore
{
"feature_states": [ "geoip" ],
"indices": "kibana_sample_data_flights,.ds-my-data-stream-2022.06.17-000001",
"include_aliases": true
}
----
// TEST[skip:illustration purposes only]
. Finally we can verify that the indices health is now `green` via the <<cat-indices,cat indices API>>.
+
Expand Down Expand Up @@ -430,6 +445,21 @@ POST _snapshot/my_repository/snapshot-20200617/_restore
<1> The indices to restore.
+
<2> We also want to restore the aliases.
+
NOTE: If any <<feature-state,feature states>> need to be restored, we must specify them using the
`feature_states` field, and the indices that belong to those feature states must not be specified under `indices`.
The <<health-api, Health API>> returns both the `indices` and the `feature_states` that need to be restored as part of the restore-from-snapshot diagnosis. For example:
+
[source,console]
----
POST _snapshot/my_repository/snapshot-20200617/_restore
{
"feature_states": [ "geoip" ],
"indices": "kibana_sample_data_flights,.ds-my-data-stream-2022.06.17-000001",
"include_aliases": true
}
----
// TEST[skip:illustration purposes only]
. Finally we can verify that the indices health is now `green` via the <<cat-indices,cat indices API>>.
+
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.elasticsearch.health.ImpactArea;
import org.elasticsearch.health.SimpleHealthIndicatorDetails;
import org.elasticsearch.health.node.HealthInfo;
import org.elasticsearch.indices.SystemIndices;
import org.elasticsearch.snapshots.SnapshotShardSizeInfo;

import java.util.ArrayList;
Expand All @@ -59,6 +60,8 @@
import java.util.stream.Stream;

import static java.util.stream.Collectors.joining;
import static java.util.stream.Collectors.toMap;
import static java.util.stream.Collectors.toSet;
import static org.elasticsearch.cluster.health.ClusterShardHealth.getInactivePrimaryHealth;
import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_ROUTING_INCLUDE_GROUP_PREFIX;
import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_ROUTING_INCLUDE_GROUP_SETTING;
Expand All @@ -67,6 +70,7 @@
import static org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider.INDEX_ROUTING_ALLOCATION_ENABLE_SETTING;
import static org.elasticsearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider.CLUSTER_TOTAL_SHARDS_PER_NODE_SETTING;
import static org.elasticsearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider.INDEX_TOTAL_SHARDS_PER_NODE_SETTING;
import static org.elasticsearch.health.Diagnosis.Resource.Type.FEATURE_STATE;
import static org.elasticsearch.health.Diagnosis.Resource.Type.INDEX;
import static org.elasticsearch.health.HealthStatus.GREEN;
import static org.elasticsearch.health.HealthStatus.RED;
Expand Down Expand Up @@ -96,9 +100,16 @@ public class ShardsAvailabilityHealthIndicatorService implements HealthIndicator
private final ClusterService clusterService;
private final AllocationService allocationService;

public ShardsAvailabilityHealthIndicatorService(ClusterService clusterService, AllocationService allocationService) {
private final SystemIndices systemIndices;

/**
 * Creates the shards availability health indicator.
 *
 * @param clusterService    the cluster service stored on this indicator
 * @param allocationService the allocation service stored on this indicator
 * @param systemIndices     the registry of system indices; passed to the restore-from-snapshot diagnosis so that
 *                          affected indices can be mapped to the feature states that own them
 */
public ShardsAvailabilityHealthIndicatorService(
    ClusterService clusterService,
    AllocationService allocationService,
    SystemIndices systemIndices
) {
    // Plain field captures; the assignments are independent of each other.
    this.systemIndices = systemIndices;
    this.allocationService = allocationService;
    this.clusterService = clusterService;
}

@Override
Expand Down Expand Up @@ -760,7 +771,7 @@ private Optional<Diagnosis.Definition> checkNotEnoughNodesInDataTier(
}
}

private class ShardAllocationStatus {
class ShardAllocationStatus {
private final ShardAllocationCounts primaries = new ShardAllocationCounts();
private final ShardAllocationCounts replicas = new ShardAllocationCounts();
private final Metadata clusterMetadata;
Expand Down Expand Up @@ -908,28 +919,108 @@ public List<Diagnosis> getDiagnosis(boolean verbose, int maxAffectedResourcesCou
if (diagnosisToAffectedIndices.isEmpty()) {
return List.of();
} else {
return diagnosisToAffectedIndices.entrySet()
.stream()
.map(
e -> new Diagnosis(
e.getKey(),
List.of(
new Diagnosis.Resource(
INDEX,
e.getValue()
.stream()
.sorted(indicesComparatorByPriorityAndName(clusterMetadata))
.limit(Math.min(e.getValue().size(), maxAffectedResourcesCount))
.collect(Collectors.toList())
)

return diagnosisToAffectedIndices.entrySet().stream().map(e -> {
List<Diagnosis.Resource> affectedResources = new ArrayList<>(1);
if (e.getKey().equals(ACTION_RESTORE_FROM_SNAPSHOT)) {
Set<String> restoreFromSnapshotIndices = e.getValue();
if (restoreFromSnapshotIndices != null && restoreFromSnapshotIndices.isEmpty() == false) {
affectedResources = getRestoreFromSnapshotAffectedResources(
clusterMetadata,
systemIndices,
restoreFromSnapshotIndices,
maxAffectedResourcesCount
);
}
} else {
affectedResources.add(
new Diagnosis.Resource(
INDEX,
e.getValue()
.stream()
.sorted(indicesComparatorByPriorityAndName(clusterMetadata))
.limit(Math.min(e.getValue().size(), maxAffectedResourcesCount))
.collect(Collectors.toList())
)
)
)
.collect(Collectors.toList());
);
}
return new Diagnosis(e.getKey(), affectedResources);
}).collect(Collectors.toList());
}
} else {
return List.of();
}
}

/**
 * Computes the resources a user has to pass to the restore-from-snapshot API for the given unassigned indices.
 * <p>
 * The restore operation requires feature states to be specified separately from indices, and indices that belong to a
 * feature state must not be listed under {@code indices}. Every index in {@code restoreFromSnapshotIndices} that is
 * matched by a feature's system index descriptors, or that backs one of a feature's system data streams, is therefore
 * replaced by the name of the owning feature.
 * <p>
 * Both result lists are sorted by name before being truncated so that the reported subset is stable between calls
 * (the working sets are hash-based and have no defined iteration order).
 *
 * @param metadata                   the cluster metadata used to resolve descriptors to concrete index names
 * @param systemIndices              the registry of features and their system index / data stream descriptors
 * @param restoreFromSnapshotIndices the names of the indices diagnosed as needing a restore; must not be {@code null}
 * @param maxAffectedResourcesCount  the maximum number of values reported per resource type; must not be negative
 * @return zero, one or two resources: an {@code INDEX} resource if any non-feature index remains, followed by a
 *         {@code FEATURE_STATE} resource if any feature owns at least one of the given indices
 */
static List<Diagnosis.Resource> getRestoreFromSnapshotAffectedResources(
    Metadata metadata,
    SystemIndices systemIndices,
    Set<String> restoreFromSnapshotIndices,
    int maxAffectedResourcesCount
) {
    Set<String> affectedIndices = new HashSet<>(restoreFromSnapshotIndices);
    Set<String> affectedFeatureStates = new HashSet<>();

    for (SystemIndices.Feature feature : systemIndices.getFeatures()) {
        // the shards_availability indicator works with indices, so a feature's data stream backing indices are treated
        // exactly like its system indices (the feature state will cover the restore of these indices too)
        Stream<String> featureIndices = Stream.concat(
            feature.getIndexDescriptors().stream().flatMap(descriptor -> descriptor.getMatchingIndices(metadata).stream()),
            feature.getDataStreamDescriptors().stream().flatMap(descriptor -> descriptor.getBackingIndexNames(metadata).stream())
        );

        // Match against the original input rather than affectedIndices: an index that was already claimed (and removed)
        // by another feature must still flag this feature as affected.
        List<String> featureIndicesToRestore = featureIndices.filter(restoreFromSnapshotIndices::contains).toList();
        if (featureIndicesToRestore.isEmpty() == false) {
            affectedFeatureStates.add(feature.getName());
            featureIndicesToRestore.forEach(affectedIndices::remove);
        }
    }

    List<Diagnosis.Resource> affectedResources = new ArrayList<>(2);
    if (affectedIndices.isEmpty() == false) {
        // sort before limiting so the truncated list does not depend on hash iteration order
        affectedResources.add(
            new Diagnosis.Resource(INDEX, affectedIndices.stream().sorted().limit(maxAffectedResourcesCount).toList())
        );
    }
    if (affectedFeatureStates.isEmpty() == false) {
        affectedResources.add(
            new Diagnosis.Resource(FEATURE_STATE, affectedFeatureStates.stream().sorted().limit(maxAffectedResourcesCount).toList())
        );
    }
    return affectedResources;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public enum Type {
INDEX("indices"),
NODE("nodes"),
SLM_POLICY("slm_policies"),
FEATURE_STATE("feature_states"),
SNAPSHOT_REPOSITORY("snapshot_repositories");

private final String displayValue;
Expand Down
13 changes: 10 additions & 3 deletions server/src/main/java/org/elasticsearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -997,7 +997,13 @@ protected Node(
discoveryModule.getCoordinator(),
masterHistoryService
);
HealthService healthService = createHealthService(clusterService, clusterModule, coordinationDiagnosticsService, threadPool);
HealthService healthService = createHealthService(
clusterService,
clusterModule,
coordinationDiagnosticsService,
threadPool,
systemIndices
);
HealthMetadataService healthMetadataService = HealthMetadataService.create(clusterService, settings);
LocalHealthMonitor localHealthMonitor = LocalHealthMonitor.create(settings, clusterService, nodeService, threadPool, client);
HealthInfoCache nodeHealthOverview = HealthInfoCache.create(clusterService);
Expand Down Expand Up @@ -1199,15 +1205,16 @@ private HealthService createHealthService(
ClusterService clusterService,
ClusterModule clusterModule,
CoordinationDiagnosticsService coordinationDiagnosticsService,
ThreadPool threadPool
ThreadPool threadPool,
SystemIndices systemIndices
) {
List<HealthIndicatorService> preflightHealthIndicatorServices = Collections.singletonList(
new StableMasterHealthIndicatorService(coordinationDiagnosticsService, clusterService)
);
var serverHealthIndicatorServices = new ArrayList<>(
List.of(
new RepositoryIntegrityHealthIndicatorService(clusterService),
new ShardsAvailabilityHealthIndicatorService(clusterService, clusterModule.getAllocationService())
new ShardsAvailabilityHealthIndicatorService(clusterService, clusterModule.getAllocationService(), systemIndices)
)
);
serverHealthIndicatorServices.add(new DiskHealthIndicatorService(clusterService));
Expand Down
Loading

0 comments on commit 9170113

Please sign in to comment.