Skip to content

Commit

Permalink
TestGrokPatternAction is local-only
Browse files Browse the repository at this point in the history
Misc tidy-up following elastic#104394:

- This action only runs on the coordinating node, no need to define wire
  serialization for its request/response types.

- No need to subclass `ActionType`, nor to define how to receive
  responses from remote clusters.

- Moves to executing an `AbstractRunnable` to be sure to handle all
  failures (including threadpool rejections) properly.
  • Loading branch information
DaveCTurner committed Jan 24, 2024
1 parent 0aeb9be commit 71d0b52
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.grok.GrokCaptureExtracter;
Expand All @@ -28,14 +28,11 @@
import java.util.Map;
import java.util.Objects;

public class TestGrokPatternAction extends ActionType<TestGrokPatternAction.Response> {
public class TestGrokPatternAction {

public static final TestGrokPatternAction INSTANCE = new TestGrokPatternAction();
public static final String NAME = "cluster:monitor/text_structure/test_grok_pattern";

private TestGrokPatternAction() {
super(NAME, Response::new);
}
public static final ActionType<TestGrokPatternAction.Response> INSTANCE = ActionType.localOnly(
"cluster:monitor/text_structure/test_grok_pattern"
);

public static class Request extends ActionRequest {

Expand Down Expand Up @@ -87,23 +84,13 @@ private Request(String grokPattern, List<String> text, String ecsCompatibility)
this.ecsCompatibility = ecsCompatibility;
}

public Request(StreamInput in) throws IOException {
super(in);
grokPattern = in.readString();
text = in.readStringCollectionAsList();
ecsCompatibility = in.readOptionalString();
}

public static Request parseRequest(String ecsCompatibility, XContentParser parser) throws IOException {
return PARSER.parse(parser, null).ecsCompatibility(ecsCompatibility).build();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(grokPattern);
out.writeStringCollection(text);
out.writeOptionalString(ecsCompatibility);
TransportAction.localOnly();
}

public String getGrokPattern() {
Expand Down Expand Up @@ -150,11 +137,6 @@ public Response(List<Map<String, Object>> ranges) {
this.ranges = ranges;
}

public Response(StreamInput in) throws IOException {
super(in);
ranges = in.readCollectionAsList(StreamInput::readGenericMap);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
Expand Down Expand Up @@ -193,7 +175,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeGenericList(ranges, StreamOutput::writeGenericMap);
TransportAction.localOnly();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,16 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.grok.Grok;
import org.elasticsearch.grok.GrokBuiltinPatterns;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.Transports;
import org.elasticsearch.xpack.core.textstructure.action.TestGrokPatternAction;

import java.util.ArrayList;
Expand All @@ -27,38 +28,26 @@

import static org.elasticsearch.grok.GrokBuiltinPatterns.ECS_COMPATIBILITY_V1;

public class TransportTestGrokPatternAction extends HandledTransportAction<TestGrokPatternAction.Request, TestGrokPatternAction.Response> {
public class TransportTestGrokPatternAction extends TransportAction<TestGrokPatternAction.Request, TestGrokPatternAction.Response> {

private static final Logger logger = LogManager.getLogger(TransportTestGrokPatternAction.class);

private final ThreadPool threadPool;

@Inject
public TransportTestGrokPatternAction(TransportService transportService, ActionFilters actionFilters, ThreadPool threadPool) {
super(
TestGrokPatternAction.NAME,
transportService,
actionFilters,
TestGrokPatternAction.Request::new,
EsExecutors.DIRECT_EXECUTOR_SERVICE
);
super(TestGrokPatternAction.INSTANCE.name(), actionFilters, transportService.getTaskManager());
this.threadPool = threadPool;
}

@Override
protected void doExecute(Task task, TestGrokPatternAction.Request request, ActionListener<TestGrokPatternAction.Response> listener) {
// As matching a regular expression might take a while, we run
// in a different thread to avoid blocking the network thread.
threadPool.generic().execute(() -> {
try {
listener.onResponse(getResponse(request));
} catch (Exception e) {
listener.onFailure(e);
}
});
// As matching a regular expression might take a while, we run in a different thread to avoid blocking the network thread.
threadPool.generic().execute(ActionRunnable.supply(listener, () -> getResponse(request)));
}

private TestGrokPatternAction.Response getResponse(TestGrokPatternAction.Request request) {
assert Transports.assertNotTransportThread("matching regexes is too expensive for a network thread");
boolean ecsCompatibility = ECS_COMPATIBILITY_V1.equals(request.getEcsCompatibility());
Grok grok = new Grok(GrokBuiltinPatterns.get(ecsCompatibility), request.getGrokPattern(), logger::debug);
List<Map<String, Object>> ranges = new ArrayList<>();
Expand Down

0 comments on commit 71d0b52

Please sign in to comment.