Skip to content

Commit

Permalink
Creating a transport action for the CoordinationDiagnosticsService (e…
Browse files Browse the repository at this point in the history
…lastic#87984)

This exposes the CoordinationDiagnosticsService (elastic#87672) through a transport action so that it can
be called remotely as part of the health API in the event that: (1) there has been no master recently,
(2) there are master-eligible nodes in the cluster, (3) none are elected, and (4) the current node is
not master eligible.
  • Loading branch information
masseyke authored Jun 29, 2022
1 parent 81815f0 commit cd0d5c8
Show file tree
Hide file tree
Showing 7 changed files with 449 additions and 3 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/87984.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 87984
summary: Creating a transport action for the `CoordinationDiagnosticsService`
area: Health
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.elasticsearch.action.admin.cluster.configuration.TransportAddVotingConfigExclusionsAction;
import org.elasticsearch.action.admin.cluster.configuration.TransportClearVotingConfigExclusionsAction;
import org.elasticsearch.action.admin.cluster.coordination.ClusterFormationInfoAction;
import org.elasticsearch.action.admin.cluster.coordination.CoordinationDiagnosticsAction;
import org.elasticsearch.action.admin.cluster.coordination.MasterHistoryAction;
import org.elasticsearch.action.admin.cluster.desirednodes.DeleteDesiredNodesAction;
import org.elasticsearch.action.admin.cluster.desirednodes.GetDesiredNodesAction;
Expand Down Expand Up @@ -639,6 +640,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
actions.register(AnalyzeIndexDiskUsageAction.INSTANCE, TransportAnalyzeIndexDiskUsageAction.class);
actions.register(FieldUsageStatsAction.INSTANCE, TransportFieldUsageAction.class);
actions.register(MasterHistoryAction.INSTANCE, MasterHistoryAction.TransportAction.class);
actions.register(CoordinationDiagnosticsAction.INSTANCE, CoordinationDiagnosticsAction.TransportAction.class);

// Indexed scripts
actions.register(PutStoredScriptAction.INSTANCE, TransportPutStoredScriptAction.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.action.admin.cluster.coordination;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.cluster.coordination.CoordinationDiagnosticsService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.Objects;

import static org.elasticsearch.cluster.coordination.CoordinationDiagnosticsService.CoordinationDiagnosticsResult;

/**
* This action exposes CoordinationDiagnosticsService#diagnoseMasterStability so that a node can get a remote node's view of
* coordination diagnostics (including master stability).
*/
public class CoordinationDiagnosticsAction extends ActionType<CoordinationDiagnosticsAction.Response> {

public static final CoordinationDiagnosticsAction INSTANCE = new CoordinationDiagnosticsAction();
public static final String NAME = "cluster:internal/coordination_diagnostics/info";

private CoordinationDiagnosticsAction() {
super(NAME, CoordinationDiagnosticsAction.Response::new);
}

public static class Request extends ActionRequest {
final boolean explain; // Non-private for testing

public Request(boolean explain) {
this.explain = explain;
}

@Override
public ActionRequestValidationException validate() {
return null;
}

public Request(StreamInput in) throws IOException {
super(in);
this.explain = in.readBoolean();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(explain);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
return explain == ((Request) o).explain;
}

@Override
public int hashCode() {
return Objects.hashCode(explain);
}

}

public static class Response extends ActionResponse {

private final CoordinationDiagnosticsResult result;

public Response(StreamInput in) throws IOException {
super(in);
result = new CoordinationDiagnosticsResult(in);
}

public Response(CoordinationDiagnosticsResult result) {
this.result = result;
}

public CoordinationDiagnosticsResult getCoordinationDiagnosticsResult() {
return result;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
result.writeTo(out);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
CoordinationDiagnosticsAction.Response response = (CoordinationDiagnosticsAction.Response) o;
return result.equals(response.result);
}

@Override
public int hashCode() {
return Objects.hash(result);
}
}

/**
* This transport action calls CoordinationDiagnosticsService#diagnoseMasterStability
*/
public static class TransportAction extends HandledTransportAction<Request, Response> {
private final CoordinationDiagnosticsService coordinationDiagnosticsService;

@Inject
public TransportAction(
TransportService transportService,
ActionFilters actionFilters,
CoordinationDiagnosticsService coordinationDiagnosticsService
) {
super(CoordinationDiagnosticsAction.NAME, transportService, actionFilters, CoordinationDiagnosticsAction.Request::new);
this.coordinationDiagnosticsService = coordinationDiagnosticsService;
}

@Override
protected void doExecute(Task task, CoordinationDiagnosticsAction.Request request, ActionListener<Response> listener) {
listener.onResponse(new Response(coordinationDiagnosticsService.diagnoseMasterStability(request.explain)));
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,14 @@
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;

import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Collection;
Expand Down Expand Up @@ -412,13 +416,34 @@ public record CoordinationDiagnosticsResult(
CoordinationDiagnosticsStatus status,
String summary,
CoordinationDiagnosticsDetails details
) {}
) implements Writeable {

public enum CoordinationDiagnosticsStatus {
public CoordinationDiagnosticsResult(StreamInput in) throws IOException {
this(CoordinationDiagnosticsStatus.fromStreamInput(in), in.readString(), new CoordinationDiagnosticsDetails(in));
}

@Override
public void writeTo(StreamOutput out) throws IOException {
status.writeTo(out);
out.writeString(summary);
details.writeTo(out);
}
}

public enum CoordinationDiagnosticsStatus implements Writeable {
GREEN,
UNKNOWN,
YELLOW,
RED;

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeEnum(this);
}

public static CoordinationDiagnosticsStatus fromStreamInput(StreamInput in) throws IOException {
return in.readEnum(CoordinationDiagnosticsStatus.class);
}
}

public record CoordinationDiagnosticsDetails(
Expand All @@ -427,7 +452,7 @@ public record CoordinationDiagnosticsDetails(
@Nullable String remoteExceptionMessage,
@Nullable String remoteExceptionStackTrace,
@Nullable String clusterFormationDescription
) {
) implements Writeable {

public CoordinationDiagnosticsDetails(DiscoveryNode currentMaster, List<DiscoveryNode> recentMasters) {
this(currentMaster, recentMasters, null, null, null);
Expand All @@ -437,6 +462,32 @@ public CoordinationDiagnosticsDetails(DiscoveryNode currentMaster, Exception rem
this(currentMaster, null, remoteException == null ? null : remoteException.getMessage(), getStackTrace(remoteException), null);
}

public CoordinationDiagnosticsDetails(StreamInput in) throws IOException {
this(readCurrentMaster(in), readRecentMasters(in), in.readOptionalString(), in.readOptionalString(), in.readOptionalString());
}

private static DiscoveryNode readCurrentMaster(StreamInput in) throws IOException {
boolean hasCurrentMaster = in.readBoolean();
DiscoveryNode currentMaster;
if (hasCurrentMaster) {
currentMaster = new DiscoveryNode(in);
} else {
currentMaster = null;
}
return currentMaster;
}

private static List<DiscoveryNode> readRecentMasters(StreamInput in) throws IOException {
boolean hasRecentMasters = in.readBoolean();
List<DiscoveryNode> recentMasters;
if (hasRecentMasters) {
recentMasters = in.readImmutableList(DiscoveryNode::new);
} else {
recentMasters = null;
}
return recentMasters;
}

private static String getStackTrace(Exception e) {
if (e == null) {
return null;
Expand All @@ -447,5 +498,25 @@ private static String getStackTrace(Exception e) {
}

public static final CoordinationDiagnosticsDetails EMPTY = new CoordinationDiagnosticsDetails(null, null, null, null, null);

@Override
public void writeTo(StreamOutput out) throws IOException {
if (currentMaster == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
currentMaster.writeTo(out);
}
if (recentMasters == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeList(recentMasters);
}
out.writeOptionalString(remoteExceptionMessage);
out.writeOptionalString(remoteExceptionStackTrace);
out.writeOptionalString(clusterFormationDescription);
}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.action.admin.cluster.coordination;

import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.EqualsHashCodeTestUtils;

import java.util.Collections;
import java.util.List;
import java.util.UUID;

import static org.elasticsearch.cluster.coordination.CoordinationDiagnosticsService.CoordinationDiagnosticsDetails;
import static org.elasticsearch.cluster.coordination.CoordinationDiagnosticsService.CoordinationDiagnosticsResult;
import static org.elasticsearch.cluster.coordination.CoordinationDiagnosticsService.CoordinationDiagnosticsStatus;

public class CoordinationDiagnosticsActionTests extends ESTestCase {

public void testSerialization() {
DiscoveryNode node1 = new DiscoveryNode(
"node1",
UUID.randomUUID().toString(),
buildNewFakeTransportAddress(),
Collections.emptyMap(),
DiscoveryNodeRole.roles(),
Version.CURRENT
);
DiscoveryNode node2 = new DiscoveryNode(
"node2",
UUID.randomUUID().toString(),
buildNewFakeTransportAddress(),
Collections.emptyMap(),
DiscoveryNodeRole.roles(),
Version.CURRENT
);
CoordinationDiagnosticsDetails details = new CoordinationDiagnosticsDetails(
node1,
List.of(node1, node2),
randomAlphaOfLengthBetween(0, 30),
randomAlphaOfLengthBetween(0, 30),
randomAlphaOfLengthBetween(0, 30)
);
CoordinationDiagnosticsResult result = new CoordinationDiagnosticsResult(
randomFrom(CoordinationDiagnosticsStatus.values()),
randomAlphaOfLength(100),
details
);
CoordinationDiagnosticsAction.Response response = new CoordinationDiagnosticsAction.Response(result);
EqualsHashCodeTestUtils.checkEqualsAndHashCode(
response,
history -> copyWriteable(history, writableRegistry(), CoordinationDiagnosticsAction.Response::new),
this::mutateResponse
);

CoordinationDiagnosticsAction.Request request = new CoordinationDiagnosticsAction.Request(randomBoolean());
EqualsHashCodeTestUtils.checkEqualsAndHashCode(
request,
history -> copyWriteable(history, writableRegistry(), CoordinationDiagnosticsAction.Request::new),
this::mutateRequest
);
}

private CoordinationDiagnosticsAction.Request mutateRequest(CoordinationDiagnosticsAction.Request originalRequest) {
return new CoordinationDiagnosticsAction.Request(originalRequest.explain == false);
}

private CoordinationDiagnosticsAction.Response mutateResponse(CoordinationDiagnosticsAction.Response originalResponse) {
CoordinationDiagnosticsResult originalResult = originalResponse.getCoordinationDiagnosticsResult();
return new CoordinationDiagnosticsAction.Response(
new CoordinationDiagnosticsResult(originalResult.status(), randomAlphaOfLength(100), originalResult.details())
);
}
}
Loading

0 comments on commit cd0d5c8

Please sign in to comment.