REST high-level client: add get ingest pipeline API #30847

Merged (7 commits), Jun 1, 2018
Changes from 2 commits
@@ -23,6 +23,8 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
import org.elasticsearch.action.ingest.GetPipelineRequest;
import org.elasticsearch.action.ingest.GetPipelineResponse;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.ingest.PutPipelineResponse;

@@ -87,4 +89,26 @@ public void putPipelineAsync(PutPipelineRequest request, ActionListener<PutPipelineResponse> listener, Header... headers) {
restHighLevelClient.performRequestAsyncAndParseEntity( request, RequestConverters::putPipeline,
PutPipelineResponse::fromXContent, listener, emptySet(), headers);
}

/**
* Get an existing pipeline
* <p>
* See
* <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/get-pipeline-api.html"> Get Pipeline API on elastic.co</a>
*/
public GetPipelineResponse getPipeline(GetPipelineRequest request, Header... headers) throws IOException {
return restHighLevelClient.performRequestAndParseEntity( request, RequestConverters::getPipeline,
GetPipelineResponse::fromXContent, emptySet(), headers);
}

/**
* Asynchronously get an existing pipeline
* <p>
* See
* <a href="https://www.elastic.co/guide/en/elasticsearch/reference/master/get-pipeline-api.html"> Get Pipeline API on elastic.co</a>
*/
public void getPipelineAsync(GetPipelineRequest request, ActionListener<GetPipelineResponse> listener, Header... headers) {
restHighLevelClient.performRequestAsyncAndParseEntity( request, RequestConverters::getPipeline,
GetPipelineResponse::fromXContent, listener, emptySet(), headers);
}
Member:
Along the lines of the other comment I left on #30793, this API should be added under the ingest namespace rather than the cluster one.

Contributor (author):

I would prefer to pick up moving to a new namespace after this and #30865 have been merged. Is this okay? Or do I need to bundle it with this change?

Member:
sure fine with me

}
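For reference, a minimal usage sketch of the new get pipeline methods as they stand in this change, still under the cluster namespace pending the move discussed in the review thread above. The client bootstrap with HttpHost and RestClient.builder is illustrative and not part of this diff, and the pipeline id is a placeholder.

import org.apache.http.HttpHost;
import org.elasticsearch.action.ingest.GetPipelineRequest;
import org.elasticsearch.action.ingest.GetPipelineResponse;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.ingest.PipelineConfiguration;

import java.io.IOException;
import java.util.Map;

public class GetPipelineUsageSketch {
    public static void main(String[] args) throws IOException {
        // Illustrative client setup against a local node; not part of this PR.
        RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(new HttpHost("localhost", 9200, "http")));

        // Ask for a pipeline by id; an optional master node timeout can be set.
        GetPipelineRequest request = new GetPipelineRequest("my-pipeline-id");
        request.masterNodeTimeout(TimeValue.timeValueMinutes(1));

        // Synchronous call through the cluster namespace added in this PR.
        GetPipelineResponse response = client.cluster().getPipeline(request);
        if (response.isFound()) {
            for (PipelineConfiguration pipeline : response.pipelines()) {
                Map<String, Object> config = pipeline.getConfigAsMap();
                System.out.println(pipeline.getId() + " -> " + config);
            }
        }

        client.close();
    }
}

If the API later moves under an ingest namespace as suggested above, the calls would presumably become client.ingest().getPipeline(request) and client.ingest().getPipelineAsync(request, listener) instead.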
@@ -59,6 +59,7 @@
import org.elasticsearch.action.get.MultiGetRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.ingest.GetPipelineRequest;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.MultiSearchRequest;
import org.elasticsearch.action.search.SearchRequest;
@@ -610,6 +611,18 @@ static Request clusterPutSettings(ClusterUpdateSettingsRequest clusterUpdateSettingsRequest) {
return request;
}

static Request getPipeline(GetPipelineRequest getPipelineRequest) {
String endpoint = new EndpointBuilder()
.addPathPartAsIs("_ingest/pipeline")
.addCommaSeparatedPathParts(getPipelineRequest.getIds())
.build();
Request request = new Request(HttpGet.METHOD_NAME, endpoint);

Params parameters = new Params(request);
parameters.withMasterTimeout(getPipelineRequest.masterNodeTimeout());
return request;
}

static Request putPipeline(PutPipelineRequest putPipelineRequest) throws IOException {
String endpoint = new EndpointBuilder()
.addPathPartAsIs("_ingest/pipeline")
@@ -22,6 +22,8 @@
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
import org.elasticsearch.action.ingest.GetPipelineRequest;
import org.elasticsearch.action.ingest.GetPipelineResponse;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.ingest.PutPipelineResponse;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
@@ -32,7 +34,7 @@
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.ingest.Pipeline;
import org.elasticsearch.ingest.PipelineConfiguration;
import org.elasticsearch.rest.RestStatus;

import java.io.IOException;
@@ -113,31 +115,7 @@ public void testClusterUpdateSettingNonExistent() {

public void testPutPipeline() throws IOException {
String id = "some_pipeline_id";
XContentType xContentType = randomFrom(XContentType.values());
XContentBuilder pipelineBuilder = XContentBuilder.builder(xContentType.xContent());
pipelineBuilder.startObject();
{
pipelineBuilder.field(Pipeline.DESCRIPTION_KEY, "some random set of processors");
pipelineBuilder.startArray(Pipeline.PROCESSORS_KEY);
{
pipelineBuilder.startObject().startObject("set");
{
pipelineBuilder
.field("field", "foo")
.field("value", "bar");
}
pipelineBuilder.endObject().endObject();
pipelineBuilder.startObject().startObject("convert");
{
pipelineBuilder
.field("field", "rank")
.field("type", "integer");
}
pipelineBuilder.endObject().endObject();
}
pipelineBuilder.endArray();
}
pipelineBuilder.endObject();
XContentBuilder pipelineBuilder = buildRandomXContentPipeline();
PutPipelineRequest request = new PutPipelineRequest(
id,
BytesReference.bytes(pipelineBuilder),
@@ -147,4 +125,27 @@ public void testPutPipeline() throws IOException {
execute(request, highLevelClient().cluster()::putPipeline, highLevelClient().cluster()::putPipelineAsync);
assertTrue(putPipelineResponse.isAcknowledged());
}

public void testGetPipeline() throws IOException {
String id = "some_pipeline_id";
XContentBuilder pipelineBuilder = buildRandomXContentPipeline();
{
PutPipelineRequest request = new PutPipelineRequest(
id,
BytesReference.bytes(pipelineBuilder),
pipelineBuilder.contentType()
);
createPipeline(request);
}

GetPipelineRequest request = new GetPipelineRequest(id);

GetPipelineResponse response =
execute(request, highLevelClient().cluster()::getPipeline, highLevelClient().cluster()::getPipelineAsync);
assertTrue(response.isFound());
assertEquals(id, response.pipelines().get(0).getId());
PipelineConfiguration expectedConfig =
new PipelineConfiguration(id, BytesReference.bytes(pipelineBuilder), pipelineBuilder.contentType());
assertEquals(expectedConfig.getConfigAsMap(), response.pipelines().get(0).getConfigAsMap());
}
}
@@ -21,7 +21,12 @@

import org.apache.http.Header;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.ingest.Pipeline;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.junit.AfterClass;
import org.junit.Before;
@@ -80,4 +85,42 @@ private HighLevelClient(RestClient restClient) {
super(restClient, (client) -> {}, Collections.emptyList());
}
}

protected static XContentBuilder buildRandomXContentPipeline() throws IOException {
XContentType xContentType = randomFrom(XContentType.values());
XContentBuilder pipelineBuilder = XContentBuilder.builder(xContentType.xContent());
pipelineBuilder.startObject();
{
pipelineBuilder.field(Pipeline.DESCRIPTION_KEY, "some random set of processors");
pipelineBuilder.startArray(Pipeline.PROCESSORS_KEY);
{
pipelineBuilder.startObject().startObject("set");
{
pipelineBuilder
.field("field", "foo")
.field("value", "bar");
}
pipelineBuilder.endObject().endObject();
pipelineBuilder.startObject().startObject("convert");
{
pipelineBuilder
.field("field", "rank")
.field("type", "integer");
}
pipelineBuilder.endObject().endObject();
}
pipelineBuilder.endArray();
}
pipelineBuilder.endObject();
return pipelineBuilder;
}

protected static void createPipeline(String pipelineId) throws IOException {
XContentBuilder builder = buildRandomXContentPipeline();
createPipeline(new PutPipelineRequest(pipelineId, BytesReference.bytes(builder), builder.contentType()));
}

protected static void createPipeline(PutPipelineRequest putPipelineRequest) throws IOException {
assertOK(client().performRequest(RequestConverters.putPipeline(putPipelineRequest)));
}
}
@@ -61,6 +61,7 @@
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.MultiGetRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.ingest.GetPipelineRequest;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.MultiSearchRequest;
@@ -1425,6 +1426,20 @@ public void testPutPipeline() throws IOException {
assertEquals(expectedParams, expectedRequest.getParameters());
}

public void testGetPipeline() {
String pipelineId = "some_pipeline_id";
Map<String, String> expectedParams = new HashMap<>();
GetPipelineRequest request = new GetPipelineRequest("some_pipeline_id");
setRandomMasterTimeout(request, expectedParams);
Request expectedRequest = RequestConverters.getPipeline(request);
StringJoiner endpoint = new StringJoiner("/", "/", "");
endpoint.add("_ingest/pipeline");
endpoint.add(pipelineId);
assertEquals(endpoint.toString(), expectedRequest.getEndpoint());
assertEquals(HttpGet.METHOD_NAME, expectedRequest.getMethod());
assertEquals(expectedParams, expectedRequest.getParameters());
}

public void testRollover() throws IOException {
RolloverRequest rolloverRequest = new RolloverRequest(randomAlphaOfLengthBetween(3, 10),
randomBoolean() ? null : randomAlphaOfLengthBetween(3, 10));
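A hypothetical companion to the testGetPipeline converter test above, sketching the endpoint that RequestConverters.getPipeline should build when several pipeline ids are requested. It is written as if it lived in the same test class, assumes GetPipelineRequest accepts multiple ids (which the addCommaSeparatedPathParts call implies but the tests here only exercise with a single id), and is not part of this change.

public void testGetPipelineMultipleIds() {
    // Hypothetical: multiple ids should collapse into one comma-separated path part.
    GetPipelineRequest request = new GetPipelineRequest("pipeline-a", "pipeline-b");
    Request expectedRequest = RequestConverters.getPipeline(request);

    assertEquals("/_ingest/pipeline/pipeline-a,pipeline-b", expectedRequest.getEndpoint());
    assertEquals(HttpGet.METHOD_NAME, expectedRequest.getMethod());
}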
@@ -23,6 +23,8 @@
import org.elasticsearch.action.LatchedActionListener;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
import org.elasticsearch.action.ingest.GetPipelineRequest;
import org.elasticsearch.action.ingest.GetPipelineResponse;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.ingest.PutPipelineResponse;
import org.elasticsearch.client.ESRestHighLevelClientTestCase;
@@ -34,11 +36,13 @@
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.ingest.PipelineConfiguration;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

@@ -257,4 +261,74 @@ public void onFailure(Exception e) {
assertTrue(latch.await(30L, TimeUnit.SECONDS));
}
}

public void testGetPipeline() throws IOException {
RestHighLevelClient client = highLevelClient();

{
createPipeline("my-pipeline-id");
}

{
// tag::get-pipeline-request
GetPipelineRequest request = new GetPipelineRequest("my-pipeline-id"); // <1>
// end::get-pipeline-request

// tag::get-pipeline-request-masterTimeout
request.masterNodeTimeout(TimeValue.timeValueMinutes(1)); // <1>
request.masterNodeTimeout("1m"); // <2>
// end::get-pipeline-request-masterTimeout

// tag::get-pipeline-execute
GetPipelineResponse response = client.cluster().getPipeline(request); // <1>
// end::get-pipeline-execute

// tag::get-pipeline-response
boolean successful = response.isFound(); // <1>
List<PipelineConfiguration> pipelines = response.pipelines(); // <2>
for (PipelineConfiguration pipeline : pipelines) {
Map<String, Object> config = pipeline.getConfigAsMap(); // <3>
}
// end::get-pipeline-response

assertTrue(successful);
}
}

public void testGetPipelineAsync() throws Exception {
RestHighLevelClient client = highLevelClient();

{
createPipeline("my-pipeline-id");
}

{
GetPipelineRequest request = new GetPipelineRequest("my-pipeline-id");

// tag::get-pipeline-execute-listener
ActionListener<GetPipelineResponse> listener =
new ActionListener<GetPipelineResponse>() {
@Override
public void onResponse(GetPipelineResponse response) {
// <1>
}

@Override
public void onFailure(Exception e) {
// <2>
}
};
// end::get-pipeline-execute-listener

// Replace the empty listener by a blocking listener in test
final CountDownLatch latch = new CountDownLatch(1);
listener = new LatchedActionListener<>(listener, latch);

// tag::get-pipeline-execute-async
client.cluster().getPipelineAsync(request, listener); // <1>
// end::get-pipeline-execute-async

assertTrue(latch.await(30L, TimeUnit.SECONDS));
}
}
}