Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Creating a transport action for the CoordinationDiagnosticsService #87984

Merged
5 changes: 5 additions & 0 deletions docs/changelog/87984.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 87984
summary: Creating a transport action for the `CoordinationDiagnosticsService`
area: Health
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.elasticsearch.action.admin.cluster.configuration.TransportAddVotingConfigExclusionsAction;
import org.elasticsearch.action.admin.cluster.configuration.TransportClearVotingConfigExclusionsAction;
import org.elasticsearch.action.admin.cluster.coordination.ClusterFormationInfoAction;
import org.elasticsearch.action.admin.cluster.coordination.CoordinationDiagnosticsAction;
import org.elasticsearch.action.admin.cluster.coordination.MasterHistoryAction;
import org.elasticsearch.action.admin.cluster.desirednodes.DeleteDesiredNodesAction;
import org.elasticsearch.action.admin.cluster.desirednodes.GetDesiredNodesAction;
Expand Down Expand Up @@ -639,6 +640,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
actions.register(AnalyzeIndexDiskUsageAction.INSTANCE, TransportAnalyzeIndexDiskUsageAction.class);
actions.register(FieldUsageStatsAction.INSTANCE, TransportFieldUsageAction.class);
actions.register(MasterHistoryAction.INSTANCE, MasterHistoryAction.TransportAction.class);
actions.register(CoordinationDiagnosticsAction.INSTANCE, CoordinationDiagnosticsAction.TransportAction.class);

// Indexed scripts
actions.register(PutStoredScriptAction.INSTANCE, TransportPutStoredScriptAction.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.action.admin.cluster.coordination;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.cluster.coordination.CoordinationDiagnosticsService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.Objects;

import static org.elasticsearch.cluster.coordination.CoordinationDiagnosticsService.CoordinationDiagnosticsResult;

/**
* This action exposes CoordinationDiagnosticsService#diagnoseMasterStability so that a node can get a remote node's view of
* coordination diagnostics (including master stability).
*/
public class CoordinationDiagnosticsAction extends ActionType<CoordinationDiagnosticsAction.Response> {

public static final CoordinationDiagnosticsAction INSTANCE = new CoordinationDiagnosticsAction();
public static final String NAME = "cluster:internal/coordination_diagnostics/info";

private CoordinationDiagnosticsAction() {
super(NAME, CoordinationDiagnosticsAction.Response::new);
}

public static class Request extends ActionRequest {
final boolean explain; // Non-private for testing

public Request(boolean explain) {
this.explain = explain;
}

@Override
public ActionRequestValidationException validate() {
return null;
}

public Request(StreamInput in) throws IOException {
super(in);
this.explain = in.readBoolean();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(explain);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
return explain == ((Request) o).explain;
}

@Override
public int hashCode() {
return Objects.hashCode(explain);
}

}

public static class Response extends ActionResponse {

private final CoordinationDiagnosticsResult result;

public Response(StreamInput in) throws IOException {
super(in);
result = new CoordinationDiagnosticsResult(in);
}

public Response(CoordinationDiagnosticsResult result) {
this.result = result;
}

public CoordinationDiagnosticsResult getCoordinationDiagnosticsResult() {
return result;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
result.writeTo(out);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
CoordinationDiagnosticsAction.Response response = (CoordinationDiagnosticsAction.Response) o;
return result.equals(response.result);
}

@Override
public int hashCode() {
return Objects.hash(result);
}
}

/**
* This transport action calls CoordinationDiagnosticsService#diagnoseMasterStability
*/
public static class TransportAction extends HandledTransportAction<Request, Response> {
private final CoordinationDiagnosticsService coordinationDiagnosticsService;

@Inject
public TransportAction(
TransportService transportService,
ActionFilters actionFilters,
CoordinationDiagnosticsService coordinationDiagnosticsService
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice 👍

) {
super(CoordinationDiagnosticsAction.NAME, transportService, actionFilters, CoordinationDiagnosticsAction.Request::new);
this.coordinationDiagnosticsService = coordinationDiagnosticsService;
}

@Override
protected void doExecute(Task task, CoordinationDiagnosticsAction.Request request, ActionListener<Response> listener) {
listener.onResponse(new Response(coordinationDiagnosticsService.diagnoseMasterStability(request.explain)));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we use ActionRunnable.wrap here to make sure that a potential exception from diagnoseMasterStability gets indeed propagated to listener.onFailure?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that's already handled for us in the parent class here right? https://github.com/elastic/elasticsearch/blob/master/server/src/main/java/org/elasticsearch/action/support/TransportAction.java#L79. I'll confirm that that is where it is called from.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah you're right. TIL. Thanks for looking into this!

}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,14 @@
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;

import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Collection;
Expand Down Expand Up @@ -412,13 +416,34 @@ public record CoordinationDiagnosticsResult(
CoordinationDiagnosticsStatus status,
String summary,
CoordinationDiagnosticsDetails details
) {}
) implements Writeable {

public enum CoordinationDiagnosticsStatus {
public CoordinationDiagnosticsResult(StreamInput in) throws IOException {
this(CoordinationDiagnosticsStatus.fromStreamInput(in), in.readString(), new CoordinationDiagnosticsDetails(in));
}

@Override
public void writeTo(StreamOutput out) throws IOException {
status.writeTo(out);
out.writeString(summary);
details.writeTo(out);
}
}

public enum CoordinationDiagnosticsStatus implements Writeable {
GREEN,
UNKNOWN,
YELLOW,
RED;

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeEnum(this);
}

public static CoordinationDiagnosticsStatus fromStreamInput(StreamInput in) throws IOException {
return in.readEnum(CoordinationDiagnosticsStatus.class);
}
}

public record CoordinationDiagnosticsDetails(
Expand All @@ -427,7 +452,7 @@ public record CoordinationDiagnosticsDetails(
@Nullable String remoteExceptionMessage,
@Nullable String remoteExceptionStackTrace,
@Nullable String clusterFormationDescription
) {
) implements Writeable {

public CoordinationDiagnosticsDetails(DiscoveryNode currentMaster, List<DiscoveryNode> recentMasters) {
this(currentMaster, recentMasters, null, null, null);
Expand All @@ -437,6 +462,32 @@ public CoordinationDiagnosticsDetails(DiscoveryNode currentMaster, Exception rem
this(currentMaster, null, remoteException == null ? null : remoteException.getMessage(), getStackTrace(remoteException), null);
}

public CoordinationDiagnosticsDetails(StreamInput in) throws IOException {
this(readCurrentMaster(in), readRecentMasters(in), in.readOptionalString(), in.readOptionalString(), in.readOptionalString());
}

private static DiscoveryNode readCurrentMaster(StreamInput in) throws IOException {
boolean hasCurrentMaster = in.readBoolean();
DiscoveryNode currentMaster;
if (hasCurrentMaster) {
currentMaster = new DiscoveryNode(in);
} else {
currentMaster = null;
}
return currentMaster;
}

private static List<DiscoveryNode> readRecentMasters(StreamInput in) throws IOException {
boolean hasRecentMasters = in.readBoolean();
List<DiscoveryNode> recentMasters;
if (hasRecentMasters) {
recentMasters = in.readImmutableList(DiscoveryNode::new);
} else {
recentMasters = null;
}
return recentMasters;
}

private static String getStackTrace(Exception e) {
if (e == null) {
return null;
Expand All @@ -447,5 +498,25 @@ private static String getStackTrace(Exception e) {
}

public static final CoordinationDiagnosticsDetails EMPTY = new CoordinationDiagnosticsDetails(null, null, null, null, null);

@Override
public void writeTo(StreamOutput out) throws IOException {
if (currentMaster == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
currentMaster.writeTo(out);
}
if (recentMasters == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeList(recentMasters);
}
out.writeOptionalString(remoteExceptionMessage);
out.writeOptionalString(remoteExceptionStackTrace);
out.writeOptionalString(clusterFormationDescription);
}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.action.admin.cluster.coordination;

import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.EqualsHashCodeTestUtils;

import java.util.Collections;
import java.util.List;
import java.util.UUID;

import static org.elasticsearch.cluster.coordination.CoordinationDiagnosticsService.CoordinationDiagnosticsDetails;
import static org.elasticsearch.cluster.coordination.CoordinationDiagnosticsService.CoordinationDiagnosticsResult;
import static org.elasticsearch.cluster.coordination.CoordinationDiagnosticsService.CoordinationDiagnosticsStatus;

public class CoordinationDiagnosticsActionTests extends ESTestCase {

public void testSerialization() {
DiscoveryNode node1 = new DiscoveryNode(
"node1",
UUID.randomUUID().toString(),
buildNewFakeTransportAddress(),
Collections.emptyMap(),
DiscoveryNodeRole.roles(),
Version.CURRENT
);
DiscoveryNode node2 = new DiscoveryNode(
"node2",
UUID.randomUUID().toString(),
buildNewFakeTransportAddress(),
Collections.emptyMap(),
DiscoveryNodeRole.roles(),
Version.CURRENT
);
CoordinationDiagnosticsDetails details = new CoordinationDiagnosticsDetails(
node1,
List.of(node1, node2),
randomAlphaOfLengthBetween(0, 30),
randomAlphaOfLengthBetween(0, 30),
randomAlphaOfLengthBetween(0, 30)
);
CoordinationDiagnosticsResult result = new CoordinationDiagnosticsResult(
randomFrom(CoordinationDiagnosticsStatus.values()),
randomAlphaOfLength(100),
details
);
CoordinationDiagnosticsAction.Response response = new CoordinationDiagnosticsAction.Response(result);
EqualsHashCodeTestUtils.checkEqualsAndHashCode(
response,
history -> copyWriteable(history, writableRegistry(), CoordinationDiagnosticsAction.Response::new),
this::mutateResponse
);

CoordinationDiagnosticsAction.Request request = new CoordinationDiagnosticsAction.Request(randomBoolean());
EqualsHashCodeTestUtils.checkEqualsAndHashCode(
request,
history -> copyWriteable(history, writableRegistry(), CoordinationDiagnosticsAction.Request::new),
this::mutateRequest
);
}

private CoordinationDiagnosticsAction.Request mutateRequest(CoordinationDiagnosticsAction.Request originalRequest) {
return new CoordinationDiagnosticsAction.Request(originalRequest.explain == false);
}

private CoordinationDiagnosticsAction.Response mutateResponse(CoordinationDiagnosticsAction.Response originalResponse) {
CoordinationDiagnosticsResult originalResult = originalResponse.getCoordinationDiagnosticsResult();
return new CoordinationDiagnosticsAction.Response(
new CoordinationDiagnosticsResult(originalResult.status(), randomAlphaOfLength(100), originalResult.details())
);
}
}
Loading