-
Notifications
You must be signed in to change notification settings - Fork 25k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Creating a transport action for the CoordinationDiagnosticsService #87984
Changes from 8 commits
10562a6
8445bb8
3f8d178
9ca91d0
7832478
93fba83
a61999e
c061af9
584086a
a391d38
fa3aa47
8241dd7
beb0451
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
pr: 87984 | ||
summary: Creating a transport action for the `CoordinationDiagnosticsService` | ||
area: Health | ||
type: enhancement | ||
issues: [] |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,153 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the Elastic License | ||
* 2.0 and the Server Side Public License, v 1; you may not use this file except | ||
* in compliance with, at your election, the Elastic License 2.0 or the Server | ||
* Side Public License, v 1. | ||
*/ | ||
|
||
package org.elasticsearch.action.admin.cluster.coordination; | ||
|
||
import org.elasticsearch.action.ActionListener; | ||
import org.elasticsearch.action.ActionRequest; | ||
import org.elasticsearch.action.ActionRequestValidationException; | ||
import org.elasticsearch.action.ActionResponse; | ||
import org.elasticsearch.action.ActionType; | ||
import org.elasticsearch.action.support.ActionFilters; | ||
import org.elasticsearch.action.support.HandledTransportAction; | ||
import org.elasticsearch.cluster.coordination.CoordinationDiagnosticsService; | ||
import org.elasticsearch.cluster.coordination.Coordinator; | ||
import org.elasticsearch.cluster.coordination.MasterHistoryService; | ||
import org.elasticsearch.cluster.service.ClusterService; | ||
import org.elasticsearch.common.inject.Inject; | ||
import org.elasticsearch.common.io.stream.StreamInput; | ||
import org.elasticsearch.common.io.stream.StreamOutput; | ||
import org.elasticsearch.tasks.Task; | ||
import org.elasticsearch.transport.TransportService; | ||
|
||
import java.io.IOException; | ||
import java.util.Objects; | ||
|
||
import static org.elasticsearch.cluster.coordination.CoordinationDiagnosticsService.CoordinationDiagnosticsResult; | ||
|
||
/** | ||
* This action exposes CoordinationDiagnosticsService.diagnoseMasterStability() so that a node can get a remote node's view of | ||
* coordination diagnostics (including master stability). | ||
*/ | ||
public class CoordinationDiagnosticsAction extends ActionType<CoordinationDiagnosticsAction.Response> { | ||
|
||
public static final CoordinationDiagnosticsAction INSTANCE = new CoordinationDiagnosticsAction(); | ||
public static final String NAME = "cluster:internal/coordination_diagnostics/info"; | ||
|
||
private CoordinationDiagnosticsAction() { | ||
super(NAME, CoordinationDiagnosticsAction.Response::new); | ||
} | ||
|
||
public static class Request extends ActionRequest { | ||
final boolean explain; // Non-private for testing | ||
|
||
public Request(boolean explain) { | ||
this.explain = explain; | ||
} | ||
|
||
@Override | ||
public ActionRequestValidationException validate() { | ||
return null; | ||
} | ||
|
||
public Request(StreamInput in) throws IOException { | ||
super(in); | ||
this.explain = in.readBoolean(); | ||
} | ||
|
||
@Override | ||
public void writeTo(StreamOutput out) throws IOException { | ||
super.writeTo(out); | ||
out.writeBoolean(explain); | ||
} | ||
|
||
@Override | ||
public boolean equals(Object o) { | ||
if (this == o) return true; | ||
if (o == null || getClass() != o.getClass()) return false; | ||
return explain == ((Request) o).explain; | ||
} | ||
|
||
@Override | ||
public int hashCode() { | ||
return Objects.hashCode(explain); | ||
} | ||
|
||
} | ||
|
||
public static class Response extends ActionResponse { | ||
|
||
private final CoordinationDiagnosticsResult result; | ||
|
||
public Response(StreamInput in) throws IOException { | ||
super(in); | ||
result = new CoordinationDiagnosticsResult(in); | ||
} | ||
|
||
public Response(CoordinationDiagnosticsResult result) { | ||
this.result = result; | ||
} | ||
|
||
public CoordinationDiagnosticsResult getCoordinationDiagnosticsResult() { | ||
return result; | ||
} | ||
|
||
@Override | ||
public void writeTo(StreamOutput out) throws IOException { | ||
result.writeTo(out); | ||
} | ||
|
||
@Override | ||
public boolean equals(Object o) { | ||
if (this == o) return true; | ||
if (o == null || getClass() != o.getClass()) return false; | ||
CoordinationDiagnosticsAction.Response response = (CoordinationDiagnosticsAction.Response) o; | ||
return result.equals(response.result); | ||
} | ||
|
||
@Override | ||
public int hashCode() { | ||
return Objects.hash(result); | ||
} | ||
} | ||
|
||
/** | ||
* This transport action calls the CoordinationDiagnosticsService on a remote node. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: This doc is rather misleading to me. The transport action exposes the functionality via the transport service (as opposed to calling a service on a remote node) |
||
*/ | ||
public static class TransportAction extends HandledTransportAction<Request, Response> { | ||
private final ClusterService clusterService; | ||
private final Coordinator coordinator; | ||
private final MasterHistoryService masterHistoryService; | ||
|
||
@Inject | ||
public TransportAction( | ||
ClusterService clusterService, | ||
TransportService transportService, | ||
ActionFilters actionFilters, | ||
Coordinator coordinator, | ||
MasterHistoryService masterHistoryService | ||
) { | ||
super(CoordinationDiagnosticsAction.NAME, transportService, actionFilters, CoordinationDiagnosticsAction.Request::new); | ||
this.clusterService = clusterService; | ||
this.coordinator = coordinator; | ||
this.masterHistoryService = masterHistoryService; | ||
} | ||
|
||
@Override | ||
protected void doExecute(Task task, CoordinationDiagnosticsAction.Request request, ActionListener<Response> listener) { | ||
listener.onResponse( | ||
new Response( | ||
new CoordinationDiagnosticsService(clusterService, coordinator, masterHistoryService).diagnoseMasterStability( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. shall we inject the service or create it once in the constructor? are there thread-safety concerns if we do so? (I'd think there shouldn't be as it's only reading things here) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah I think you're right -- I don't think there would be any thread-safety problems. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. And on second look, I'm not sure why I wasn't injecting that in the first place. |
||
request.explain | ||
) | ||
) | ||
); | ||
} | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -14,10 +14,14 @@ | |
import org.elasticsearch.cluster.ClusterStateListener; | ||
import org.elasticsearch.cluster.node.DiscoveryNode; | ||
import org.elasticsearch.cluster.service.ClusterService; | ||
import org.elasticsearch.common.io.stream.StreamInput; | ||
import org.elasticsearch.common.io.stream.StreamOutput; | ||
import org.elasticsearch.common.io.stream.Writeable; | ||
import org.elasticsearch.common.settings.Setting; | ||
import org.elasticsearch.core.Nullable; | ||
import org.elasticsearch.core.TimeValue; | ||
|
||
import java.io.IOException; | ||
import java.io.PrintWriter; | ||
import java.io.StringWriter; | ||
import java.util.Collection; | ||
|
@@ -412,13 +416,47 @@ public record CoordinationDiagnosticsResult( | |
CoordinationDiagnosticsStatus status, | ||
String summary, | ||
CoordinationDiagnosticsDetails details | ||
) {} | ||
) implements Writeable { | ||
|
||
public enum CoordinationDiagnosticsStatus { | ||
GREEN, | ||
UNKNOWN, | ||
YELLOW, | ||
RED; | ||
public CoordinationDiagnosticsResult(StreamInput in) throws IOException { | ||
this(CoordinationDiagnosticsStatus.fromStreamInput(in), in.readString(), new CoordinationDiagnosticsDetails(in)); | ||
} | ||
|
||
@Override | ||
public void writeTo(StreamOutput out) throws IOException { | ||
status.writeTo(out); | ||
out.writeString(summary); | ||
details.writeTo(out); | ||
} | ||
} | ||
|
||
public enum CoordinationDiagnosticsStatus implements Writeable { | ||
GREEN((byte) 0), | ||
UNKNOWN((byte) 1), | ||
YELLOW((byte) 2), | ||
RED((byte) 3); | ||
|
||
private final byte value; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think this Enums have ordinals already. Let's remove the What do you think? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Huh. Somehow I didn't realize that those methods existed. Switching to use them now. |
||
|
||
CoordinationDiagnosticsStatus(byte value) { | ||
this.value = value; | ||
} | ||
|
||
@Override | ||
public void writeTo(StreamOutput out) throws IOException { | ||
out.writeByte(value); | ||
} | ||
|
||
public static CoordinationDiagnosticsStatus fromStreamInput(StreamInput in) throws IOException { | ||
byte value = in.readByte(); | ||
return switch (value) { | ||
case 0 -> GREEN; | ||
case 1 -> UNKNOWN; | ||
case 2 -> YELLOW; | ||
case 3 -> RED; | ||
default -> UNKNOWN; | ||
}; | ||
} | ||
} | ||
|
||
public record CoordinationDiagnosticsDetails( | ||
|
@@ -427,7 +465,7 @@ public record CoordinationDiagnosticsDetails( | |
@Nullable String remoteExceptionMessage, | ||
@Nullable String remoteExceptionStackTrace, | ||
@Nullable String clusterFormationDescription | ||
) { | ||
) implements Writeable { | ||
|
||
public CoordinationDiagnosticsDetails(DiscoveryNode currentMaster, List<DiscoveryNode> recentMasters) { | ||
this(currentMaster, recentMasters, null, null, null); | ||
|
@@ -437,6 +475,32 @@ public CoordinationDiagnosticsDetails(DiscoveryNode currentMaster, Exception rem | |
this(currentMaster, null, remoteException == null ? null : remoteException.getMessage(), getStackTrace(remoteException), null); | ||
} | ||
|
||
public CoordinationDiagnosticsDetails(StreamInput in) throws IOException { | ||
this(readCurrentMaster(in), readRecentMasters(in), in.readOptionalString(), in.readOptionalString(), in.readOptionalString()); | ||
} | ||
|
||
private static DiscoveryNode readCurrentMaster(StreamInput in) throws IOException { | ||
boolean hasCurrentMaster = in.readBoolean(); | ||
DiscoveryNode currentMaster; | ||
if (hasCurrentMaster) { | ||
currentMaster = new DiscoveryNode(in); | ||
} else { | ||
currentMaster = null; | ||
} | ||
return currentMaster; | ||
} | ||
|
||
private static List<DiscoveryNode> readRecentMasters(StreamInput in) throws IOException { | ||
boolean hasRecentMasters = in.readBoolean(); | ||
List<DiscoveryNode> recentMasters; | ||
if (hasRecentMasters) { | ||
recentMasters = in.readImmutableList(DiscoveryNode::new); | ||
} else { | ||
recentMasters = null; | ||
} | ||
return recentMasters; | ||
} | ||
|
||
private static String getStackTrace(Exception e) { | ||
if (e == null) { | ||
return null; | ||
|
@@ -447,5 +511,25 @@ private static String getStackTrace(Exception e) { | |
} | ||
|
||
public static final CoordinationDiagnosticsDetails EMPTY = new CoordinationDiagnosticsDetails(null, null, null, null, null); | ||
|
||
@Override | ||
public void writeTo(StreamOutput out) throws IOException { | ||
if (currentMaster == null) { | ||
out.writeBoolean(false); | ||
} else { | ||
out.writeBoolean(true); | ||
currentMaster.writeTo(out); | ||
} | ||
if (recentMasters == null) { | ||
out.writeBoolean(false); | ||
} else { | ||
out.writeBoolean(true); | ||
out.writeList(recentMasters); | ||
} | ||
out.writeOptionalString(remoteExceptionMessage); | ||
out.writeOptionalString(remoteExceptionStackTrace); | ||
out.writeOptionalString(clusterFormationDescription); | ||
} | ||
|
||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,81 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the Elastic License | ||
* 2.0 and the Server Side Public License, v 1; you may not use this file except | ||
* in compliance with, at your election, the Elastic License 2.0 or the Server | ||
* Side Public License, v 1. | ||
*/ | ||
|
||
package org.elasticsearch.action.admin.cluster.coordination; | ||
|
||
import org.elasticsearch.Version; | ||
import org.elasticsearch.cluster.node.DiscoveryNode; | ||
import org.elasticsearch.cluster.node.DiscoveryNodeRole; | ||
import org.elasticsearch.test.ESTestCase; | ||
import org.elasticsearch.test.EqualsHashCodeTestUtils; | ||
|
||
import java.util.Collections; | ||
import java.util.List; | ||
import java.util.UUID; | ||
|
||
import static org.elasticsearch.cluster.coordination.CoordinationDiagnosticsService.CoordinationDiagnosticsDetails; | ||
import static org.elasticsearch.cluster.coordination.CoordinationDiagnosticsService.CoordinationDiagnosticsResult; | ||
import static org.elasticsearch.cluster.coordination.CoordinationDiagnosticsService.CoordinationDiagnosticsStatus; | ||
|
||
public class CoordinationDiagnosticsActionTests extends ESTestCase { | ||
|
||
public void testSerialization() { | ||
DiscoveryNode node1 = new DiscoveryNode( | ||
"node1", | ||
UUID.randomUUID().toString(), | ||
buildNewFakeTransportAddress(), | ||
Collections.emptyMap(), | ||
DiscoveryNodeRole.roles(), | ||
Version.CURRENT | ||
); | ||
DiscoveryNode node2 = new DiscoveryNode( | ||
"node2", | ||
UUID.randomUUID().toString(), | ||
buildNewFakeTransportAddress(), | ||
Collections.emptyMap(), | ||
DiscoveryNodeRole.roles(), | ||
Version.CURRENT | ||
); | ||
CoordinationDiagnosticsDetails details = new CoordinationDiagnosticsDetails( | ||
node1, | ||
List.of(node1, node2), | ||
randomAlphaOfLengthBetween(0, 30), | ||
randomAlphaOfLengthBetween(0, 30), | ||
randomAlphaOfLengthBetween(0, 30) | ||
); | ||
CoordinationDiagnosticsResult result = new CoordinationDiagnosticsResult( | ||
randomFrom(CoordinationDiagnosticsStatus.values()), | ||
randomAlphaOfLength(100), | ||
details | ||
); | ||
CoordinationDiagnosticsAction.Response response = new CoordinationDiagnosticsAction.Response(result); | ||
EqualsHashCodeTestUtils.checkEqualsAndHashCode( | ||
response, | ||
history -> copyWriteable(history, writableRegistry(), CoordinationDiagnosticsAction.Response::new), | ||
this::mutateResponse | ||
); | ||
|
||
CoordinationDiagnosticsAction.Request request = new CoordinationDiagnosticsAction.Request(randomBoolean()); | ||
EqualsHashCodeTestUtils.checkEqualsAndHashCode( | ||
request, | ||
history -> copyWriteable(history, writableRegistry(), CoordinationDiagnosticsAction.Request::new), | ||
this::mutateRequest | ||
); | ||
} | ||
|
||
private CoordinationDiagnosticsAction.Request mutateRequest(CoordinationDiagnosticsAction.Request originalRequest) { | ||
return new CoordinationDiagnosticsAction.Request(originalRequest.explain == false); | ||
} | ||
|
||
private CoordinationDiagnosticsAction.Response mutateResponse(CoordinationDiagnosticsAction.Response originalResponse) { | ||
CoordinationDiagnosticsResult originalResult = originalResponse.getCoordinationDiagnosticsResult(); | ||
return new CoordinationDiagnosticsAction.Response( | ||
new CoordinationDiagnosticsResult(originalResult.status(), randomAlphaOfLength(100), originalResult.details()) | ||
); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: {@link CoordinationDiagnosticsService#diagnoseMasterStability}