diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/RollupClient.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/RollupClient.java
index 1a766cb4923b4..3059eb4606589 100644
--- a/client/rest-high-level/src/main/java/org/elasticsearch/client/RollupClient.java
+++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/RollupClient.java
@@ -20,6 +20,8 @@
package org.elasticsearch.client;
import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.client.rollup.GetRollupJobRequest;
+import org.elasticsearch.client.rollup.GetRollupJobResponse;
import org.elasticsearch.client.rollup.PutRollupJobRequest;
import org.elasticsearch.client.rollup.PutRollupJobResponse;
@@ -73,4 +75,37 @@ public void putRollupJobAsync(PutRollupJobRequest request, RequestOptions option
PutRollupJobResponse::fromXContent,
listener, Collections.emptySet());
}
+
+ /**
+ * Get a rollup job from the cluster.
+ * See
+ * the docs for more.
+ * @param request the request
+ * @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
+ * @return the response
+ * @throws IOException in case there is a problem sending the request or parsing back the response
+ */
+ public GetRollupJobResponse getRollupJob(GetRollupJobRequest request, RequestOptions options) throws IOException {
+ return restHighLevelClient.performRequestAndParseEntity(request,
+ RollupRequestConverters::getJob,
+ options,
+ GetRollupJobResponse::fromXContent,
+ Collections.emptySet());
+ }
+
+ /**
+ * Asynchronously get a rollup job from the cluster.
+ * See
+ * the docs for more.
+ * @param request the request
+ * @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
+ * @param listener the listener to be notified upon request completion
+ */
+ public void getRollupJobAsync(GetRollupJobRequest request, RequestOptions options, ActionListener listener) {
+ restHighLevelClient.performRequestAsyncAndParseEntity(request,
+ RollupRequestConverters::getJob,
+ options,
+ GetRollupJobResponse::fromXContent,
+ listener, Collections.emptySet());
+ }
}
diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/RollupRequestConverters.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/RollupRequestConverters.java
index f1c4f77ae4c9a..261467fa268b9 100644
--- a/client/rest-high-level/src/main/java/org/elasticsearch/client/RollupRequestConverters.java
+++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/RollupRequestConverters.java
@@ -18,7 +18,9 @@
*/
package org.elasticsearch.client;
+import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPut;
+import org.elasticsearch.client.rollup.GetRollupJobRequest;
import org.elasticsearch.client.rollup.PutRollupJobRequest;
import java.io.IOException;
@@ -42,4 +44,14 @@ static Request putJob(final PutRollupJobRequest putRollupJobRequest) throws IOEx
request.setEntity(createEntity(putRollupJobRequest, REQUEST_BODY_CONTENT_TYPE));
return request;
}
+
+ static Request getJob(final GetRollupJobRequest getRollupJobRequest) {
+ String endpoint = new RequestConverters.EndpointBuilder()
+ .addPathPartAsIs("_xpack")
+ .addPathPartAsIs("rollup")
+ .addPathPartAsIs("job")
+ .addPathPart(getRollupJobRequest.getJobId())
+ .build();
+ return new Request(HttpGet.METHOD_NAME, endpoint);
+ }
}
diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/rollup/GetRollupJobRequest.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/rollup/GetRollupJobRequest.java
new file mode 100644
index 0000000000000..410bc7caa09da
--- /dev/null
+++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/rollup/GetRollupJobRequest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.client.rollup;
+
+import org.elasticsearch.client.Validatable;
+import org.elasticsearch.client.ValidationException;
+
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * Request to fetch rollup jobs.
+ */
+public class GetRollupJobRequest implements Validatable {
+ private final String jobId;
+
+ /**
+ * Create a requets .
+ * @param jobId id of the job to return or {@code _all} to return all jobs
+ */
+ public GetRollupJobRequest(final String jobId) {
+ Objects.requireNonNull(jobId, "jobId is required");
+ if ("_all".equals(jobId)) {
+ throw new IllegalArgumentException("use the default ctor to ask for all jobs");
+ }
+ this.jobId = jobId;
+ }
+
+ /**
+ * Create a request to load all rollup jobs.
+ */
+ public GetRollupJobRequest() {
+ this.jobId = "_all";
+ }
+
+ /**
+ * ID of the job to return.
+ */
+ public String getJobId() {
+ return jobId;
+ }
+
+ @Override
+ public Optional validate() {
+ return Optional.empty();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ final GetRollupJobRequest that = (GetRollupJobRequest) o;
+ return jobId.equals(that.jobId);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(jobId);
+ }
+}
diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/rollup/GetRollupJobResponse.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/rollup/GetRollupJobResponse.java
new file mode 100644
index 0000000000000..131e6ec0edabb
--- /dev/null
+++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/rollup/GetRollupJobResponse.java
@@ -0,0 +1,374 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.client.rollup;
+
+import org.elasticsearch.client.rollup.job.config.RollupJobConfig;
+import org.elasticsearch.common.ParseField;
+import org.elasticsearch.common.xcontent.ConstructingObjectParser;
+import org.elasticsearch.common.xcontent.ObjectParser;
+import org.elasticsearch.common.xcontent.XContentParser;
+
+import java.io.IOException;
+import java.util.Objects;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+
+import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
+import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
+import static java.util.Collections.unmodifiableList;
+import static java.util.stream.Collectors.joining;
+
+/**
+ * Response from rollup's get jobs api.
+ */
+public class GetRollupJobResponse {
+ static final ParseField JOBS = new ParseField("jobs");
+ static final ParseField CONFIG = new ParseField("config");
+ static final ParseField STATS = new ParseField("stats");
+ static final ParseField STATUS = new ParseField("status");
+ static final ParseField NUM_PAGES = new ParseField("pages_processed");
+ static final ParseField NUM_INPUT_DOCUMENTS = new ParseField("documents_processed");
+ static final ParseField NUM_OUTPUT_DOCUMENTS = new ParseField("rollups_indexed");
+ static final ParseField NUM_INVOCATIONS = new ParseField("trigger_count");
+ static final ParseField STATE = new ParseField("job_state");
+ static final ParseField CURRENT_POSITION = new ParseField("current_position");
+ static final ParseField UPGRADED_DOC_ID = new ParseField("upgraded_doc_id");
+
+ private List jobs;
+
+ GetRollupJobResponse(final List jobs) {
+ this.jobs = Objects.requireNonNull(jobs, "jobs is required");
+ }
+
+ /**
+ * Jobs returned by the request.
+ */
+ public List getJobs() {
+ return jobs;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ final GetRollupJobResponse that = (GetRollupJobResponse) o;
+ return jobs.equals(that.jobs);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(jobs);
+ }
+
+ private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>(
+ "get_rollup_job_response",
+ true,
+ args -> {
+ @SuppressWarnings("unchecked") // We're careful about the type in the list
+ List jobs = (List) args[0];
+ return new GetRollupJobResponse(unmodifiableList(jobs));
+ });
+ static {
+ PARSER.declareObjectArray(constructorArg(), JobWrapper.PARSER::apply, JOBS);
+ }
+
+ public static GetRollupJobResponse fromXContent(final XContentParser parser) throws IOException {
+ return PARSER.parse(parser, null);
+ }
+
+ @Override
+ public final String toString() {
+ return "{jobs=" + jobs.stream().map(Object::toString).collect(joining("\n")) + "\n}";
+ }
+
+ public static class JobWrapper {
+ private final RollupJobConfig job;
+ private final RollupIndexerJobStats stats;
+ private final RollupJobStatus status;
+
+ JobWrapper(RollupJobConfig job, RollupIndexerJobStats stats, RollupJobStatus status) {
+ this.job = job;
+ this.stats = stats;
+ this.status = status;
+ }
+
+ /**
+ * Configuration of the job.
+ */
+ public RollupJobConfig getJob() {
+ return job;
+ }
+
+ /**
+ * Statistics about the execution of the job.
+ */
+ public RollupIndexerJobStats getStats() {
+ return stats;
+ }
+
+ /**
+ * Current state of the job.
+ */
+ public RollupJobStatus getStatus() {
+ return status;
+ }
+
+ private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>(
+ "job",
+ true,
+ a -> new JobWrapper((RollupJobConfig) a[0], (RollupIndexerJobStats) a[1], (RollupJobStatus) a[2]));
+ static {
+ PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, c) -> RollupJobConfig.fromXContent(p, null), CONFIG);
+ PARSER.declareObject(ConstructingObjectParser.constructorArg(), RollupIndexerJobStats.PARSER::apply, STATS);
+ PARSER.declareObject(ConstructingObjectParser.constructorArg(), RollupJobStatus.PARSER::apply, STATUS);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == null) {
+ return false;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+ JobWrapper other = (JobWrapper) obj;
+ return Objects.equals(job, other.job)
+ && Objects.equals(stats, other.stats)
+ && Objects.equals(status, other.status);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(job, stats, status);
+ }
+
+ @Override
+ public final String toString() {
+ return "{job=" + job
+ + ", stats=" + stats
+ + ", status=" + status + "}";
+ }
+ }
+
+ /**
+ * The Rollup specialization of stats for the AsyncTwoPhaseIndexer.
+ * Note: instead of `documents_indexed`, this XContent show `rollups_indexed`
+ */
+ public static class RollupIndexerJobStats {
+ private final long numPages;
+ private final long numInputDocuments;
+ private final long numOuputDocuments;
+ private final long numInvocations;
+
+ RollupIndexerJobStats(long numPages, long numInputDocuments, long numOuputDocuments, long numInvocations) {
+ this.numPages = numPages;
+ this.numInputDocuments = numInputDocuments;
+ this.numOuputDocuments = numOuputDocuments;
+ this.numInvocations = numInvocations;
+ }
+
+ /**
+ * The number of pages read from the input indices.
+ */
+ public long getNumPages() {
+ return numPages;
+ }
+
+ /**
+ * The number of documents read from the input indices.
+ */
+ public long getNumDocuments() {
+ return numInputDocuments;
+ }
+
+ /**
+ * Number of times that the job woke up to write documents.
+ */
+ public long getNumInvocations() {
+ return numInvocations;
+ }
+
+ /**
+ * Number of documents written to the result indices.
+ */
+ public long getOutputDocuments() {
+ return numOuputDocuments;
+ }
+
+ private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>(
+ STATS.getPreferredName(),
+ true,
+ args -> new RollupIndexerJobStats((long) args[0], (long) args[1], (long) args[2], (long) args[3]));
+ static {
+ PARSER.declareLong(constructorArg(), NUM_PAGES);
+ PARSER.declareLong(constructorArg(), NUM_INPUT_DOCUMENTS);
+ PARSER.declareLong(constructorArg(), NUM_OUTPUT_DOCUMENTS);
+ PARSER.declareLong(constructorArg(), NUM_INVOCATIONS);
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (this == other) return true;
+ if (other == null || getClass() != other.getClass()) return false;
+ RollupIndexerJobStats that = (RollupIndexerJobStats) other;
+ return Objects.equals(this.numPages, that.numPages)
+ && Objects.equals(this.numInputDocuments, that.numInputDocuments)
+ && Objects.equals(this.numOuputDocuments, that.numOuputDocuments)
+ && Objects.equals(this.numInvocations, that.numInvocations);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(numPages, numInputDocuments, numOuputDocuments, numInvocations);
+ }
+
+ @Override
+ public final String toString() {
+ return "{pages=" + numPages
+ + ", input_docs=" + numInputDocuments
+ + ", output_docs=" + numOuputDocuments
+ + ", invocations=" + numInvocations + "}";
+ }
+ }
+
+ /**
+ * Status of the rollup job.
+ */
+ public static class RollupJobStatus {
+ private final IndexerState state;
+ private final Map currentPosition;
+ private final boolean upgradedDocumentId;
+
+ RollupJobStatus(IndexerState state, Map position, boolean upgradedDocumentId) {
+ this.state = state;
+ this.currentPosition = position;
+ this.upgradedDocumentId = upgradedDocumentId;
+ }
+
+ /**
+ * The state of the writer.
+ */
+ public IndexerState getState() {
+ return state;
+ }
+ /**
+ * The current position of the writer.
+ */
+ public Map getCurrentPosition() {
+ return currentPosition;
+ }
+ /**
+ * Flag holds the state of the ID scheme, e.g. if it has been upgraded
+ * to the concatenation scheme.
+ */
+ public boolean getUpgradedDocumentId() {
+ return upgradedDocumentId;
+ }
+
+ private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>(
+ STATUS.getPreferredName(),
+ true,
+ args -> {
+ IndexerState state = (IndexerState) args[0];
+ @SuppressWarnings("unchecked") // We're careful of the contents
+ Map currentPosition = (Map) args[1];
+ Boolean upgradedDocumentId = (Boolean) args[2];
+ return new RollupJobStatus(state, currentPosition, upgradedDocumentId == null ? false : upgradedDocumentId);
+ });
+ static {
+ PARSER.declareField(constructorArg(), p -> IndexerState.fromString(p.text()), STATE, ObjectParser.ValueType.STRING);
+ PARSER.declareField(optionalConstructorArg(), p -> {
+ if (p.currentToken() == XContentParser.Token.START_OBJECT) {
+ return p.map();
+ }
+ if (p.currentToken() == XContentParser.Token.VALUE_NULL) {
+ return null;
+ }
+ throw new IllegalArgumentException("Unsupported token [" + p.currentToken() + "]");
+ }, CURRENT_POSITION, ObjectParser.ValueType.VALUE_OBJECT_ARRAY);
+
+ // Optional to accommodate old versions of state
+ PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), UPGRADED_DOC_ID);
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (this == other) return true;
+ if (other == null || getClass() != other.getClass()) return false;
+ RollupJobStatus that = (RollupJobStatus) other;
+ return Objects.equals(state, that.state)
+ && Objects.equals(currentPosition, that.currentPosition)
+ && upgradedDocumentId == that.upgradedDocumentId;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(state, currentPosition, upgradedDocumentId);
+ }
+
+ @Override
+ public final String toString() {
+ return "{stats=" + state
+ + ", currentPosition=" + currentPosition
+ + ", upgradedDocumentId=" + upgradedDocumentId + "}";
+ }
+ }
+
+ /**
+ * IndexerState represents the internal state of the indexer. It
+ * is also persistent when changing from started/stopped in case the allocated
+ * task is restarted elsewhere.
+ */
+ public enum IndexerState {
+ /** Indexer is running, but not actively indexing data (e.g. it's idle). */
+ STARTED,
+
+ /** Indexer is actively indexing data. */
+ INDEXING,
+
+ /**
+ * Transition state to where an indexer has acknowledged the stop
+ * but is still in process of halting.
+ */
+ STOPPING,
+
+ /** Indexer is "paused" and ignoring scheduled triggers. */
+ STOPPED,
+
+ /**
+ * Something (internal or external) has requested the indexer abort
+ * and shutdown.
+ */
+ ABORTING;
+
+ static IndexerState fromString(String name) {
+ return valueOf(name.trim().toUpperCase(Locale.ROOT));
+ }
+
+ String value() {
+ return name().toLowerCase(Locale.ROOT);
+ }
+ }
+}
diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/RollupIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/RollupIT.java
index 5d88b3f2e297b..9898dc971f928 100644
--- a/client/rest-high-level/src/test/java/org/elasticsearch/client/RollupIT.java
+++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/RollupIT.java
@@ -27,6 +27,10 @@
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.WriteRequest;
+import org.elasticsearch.client.rollup.GetRollupJobRequest;
+import org.elasticsearch.client.rollup.GetRollupJobResponse;
+import org.elasticsearch.client.rollup.GetRollupJobResponse.IndexerState;
+import org.elasticsearch.client.rollup.GetRollupJobResponse.JobWrapper;
import org.elasticsearch.client.rollup.PutRollupJobRequest;
import org.elasticsearch.client.rollup.PutRollupJobResponse;
import org.elasticsearch.client.rollup.job.config.DateHistogramGroupConfig;
@@ -50,6 +54,13 @@
import java.util.Map;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
+import static org.hamcrest.Matchers.either;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.hasKey;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.lessThan;
public class RollupIT extends ESRestHighLevelClientTestCase {
@@ -57,7 +68,7 @@ public class RollupIT extends ESRestHighLevelClientTestCase {
SumAggregationBuilder.NAME, AvgAggregationBuilder.NAME, ValueCountAggregationBuilder.NAME);
@SuppressWarnings("unchecked")
- public void testPutRollupJob() throws Exception {
+ public void testPutAndGetRollupJob() throws Exception {
double sum = 0.0d;
int max = Integer.MIN_VALUE;
int min = Integer.MAX_VALUE;
@@ -90,7 +101,7 @@ public void testPutRollupJob() throws Exception {
BulkResponse bulkResponse = highLevelClient().bulk(bulkRequest, RequestOptions.DEFAULT);
assertEquals(RestStatus.OK, bulkResponse.status());
- if (bulkResponse.hasFailures()) {
+ if (bulkResponse.hasFailures()) {
for (BulkItemResponse itemResponse : bulkResponse.getItems()) {
if (itemResponse.isFailed()) {
logger.fatal(itemResponse.getFailureMessage());
@@ -158,5 +169,26 @@ public void testPutRollupJob() throws Exception {
}
}
});
+
+ // TODO when we move cleaning rollup into ESTestCase we can randomly choose the _all version of this request
+ GetRollupJobRequest getRollupJobRequest = new GetRollupJobRequest(id);
+ GetRollupJobResponse getResponse = execute(getRollupJobRequest, rollupClient::getRollupJob, rollupClient::getRollupJobAsync);
+ assertThat(getResponse.getJobs(), hasSize(1));
+ JobWrapper job = getResponse.getJobs().get(0);
+ assertEquals(putRollupJobRequest.getConfig(), job.getJob());
+ assertThat(job.getStats().getNumPages(), lessThan(10L));
+ assertEquals(numDocs, job.getStats().getNumDocuments());
+ assertThat(job.getStats().getNumInvocations(), greaterThan(0L));
+ assertEquals(1, job.getStats().getOutputDocuments());
+ assertThat(job.getStatus().getState(), either(equalTo(IndexerState.STARTED)).or(equalTo(IndexerState.INDEXING)));
+ assertThat(job.getStatus().getCurrentPosition(), hasKey("date.date_histogram"));
+ assertEquals(true, job.getStatus().getUpgradedDocumentId());
+ }
+
+ public void testGetMissingRollupJob() throws Exception {
+ GetRollupJobRequest getRollupJobRequest = new GetRollupJobRequest("missing");
+ RollupClient rollupClient = highLevelClient().rollup();
+ GetRollupJobResponse getResponse = execute(getRollupJobRequest, rollupClient::getRollupJob, rollupClient::getRollupJobAsync);
+ assertThat(getResponse.getJobs(), empty());
}
}
diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/RollupRequestConvertersTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/RollupRequestConvertersTests.java
new file mode 100644
index 0000000000000..df7b2bbfca19e
--- /dev/null
+++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/RollupRequestConvertersTests.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.client;
+
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPut;
+import org.elasticsearch.client.rollup.GetRollupJobRequest;
+import org.elasticsearch.client.rollup.PutRollupJobRequest;
+import org.elasticsearch.client.rollup.job.config.RollupJobConfig;
+import org.elasticsearch.client.rollup.job.config.RollupJobConfigTests;
+import org.elasticsearch.test.ESTestCase;
+
+import java.io.IOException;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.nullValue;
+
+public class RollupRequestConvertersTests extends ESTestCase {
+ public void testPutJob() throws IOException {
+ String job = randomAlphaOfLength(5);
+
+ RollupJobConfig config = RollupJobConfigTests.randomRollupJobConfig(job);
+ PutRollupJobRequest put = new PutRollupJobRequest(config);
+
+ Request request = RollupRequestConverters.putJob(put);
+ assertThat(request.getEndpoint(), equalTo("/_xpack/rollup/job/" + job));
+ assertThat(HttpPut.METHOD_NAME, equalTo(request.getMethod()));
+ assertThat(request.getParameters().keySet(), empty());
+ RequestConvertersTests.assertToXContentBody(put, request.getEntity());
+ }
+
+ public void testGetJob() {
+ boolean getAll = randomBoolean();
+ String job = getAll ? "_all" : RequestConvertersTests.randomIndicesNames(1, 1)[0];
+ GetRollupJobRequest get = getAll ? new GetRollupJobRequest() : new GetRollupJobRequest(job);
+
+ Request request = RollupRequestConverters.getJob(get);
+ assertThat(request.getEndpoint(), equalTo("/_xpack/rollup/job/" + job));
+ assertThat(HttpGet.METHOD_NAME, equalTo(request.getMethod()));
+ assertThat(request.getParameters().keySet(), empty());
+ assertThat(request.getEntity(), nullValue());
+ }
+}
diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/RollupDocumentationIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/RollupDocumentationIT.java
index aadb0f0f2005d..46b89d8682462 100644
--- a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/RollupDocumentationIT.java
+++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/RollupDocumentationIT.java
@@ -27,8 +27,15 @@
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.ESRestHighLevelClientTestCase;
+import org.elasticsearch.client.Request;
import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.client.rollup.GetRollupJobRequest;
+import org.elasticsearch.client.rollup.GetRollupJobResponse;
+import org.elasticsearch.client.rollup.GetRollupJobResponse.JobWrapper;
+import org.elasticsearch.client.rollup.GetRollupJobResponse.RollupIndexerJobStats;
+import org.elasticsearch.client.rollup.GetRollupJobResponse.RollupJobStatus;
import org.elasticsearch.client.rollup.PutRollupJobRequest;
import org.elasticsearch.client.rollup.PutRollupJobResponse;
import org.elasticsearch.client.rollup.job.config.DateHistogramGroupConfig;
@@ -38,19 +45,26 @@
import org.elasticsearch.client.rollup.job.config.RollupJobConfig;
import org.elasticsearch.client.rollup.job.config.TermsGroupConfig;
import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
+import org.junit.After;
import org.junit.Before;
+import java.io.BufferedReader;
import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
+import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
+import static org.hamcrest.Matchers.hasSize;
public class RollupDocumentationIT extends ESRestHighLevelClientTestCase {
@@ -160,4 +174,110 @@ public void onFailure(Exception e) {
assertTrue(latch.await(30L, TimeUnit.SECONDS));
}
}
+
+ public void testGetRollupJob() throws Exception {
+ testCreateRollupJob();
+ RestHighLevelClient client = highLevelClient();
+
+
+ // tag::x-pack-rollup-get-rollup-job-request
+ GetRollupJobRequest getAll = new GetRollupJobRequest(); // <1>
+ GetRollupJobRequest getJob = new GetRollupJobRequest("job_1"); // <2>
+ // end::x-pack-rollup-get-rollup-job-request
+
+ // tag::x-pack-rollup-get-rollup-job-execute
+ GetRollupJobResponse response = client.rollup().getRollupJob(getJob, RequestOptions.DEFAULT);
+ // end::x-pack-rollup-get-rollup-job-execute
+
+ // tag::x-pack-rollup-get-rollup-job-response
+ assertThat(response.getJobs(), hasSize(1));
+ JobWrapper job = response.getJobs().get(0); // <1>
+ RollupJobConfig config = job.getJob();
+ RollupJobStatus status = job.getStatus();
+ RollupIndexerJobStats stats = job.getStats();
+ // end::x-pack-rollup-get-rollup-job-response
+ assertNotNull(config);
+ assertNotNull(status);
+ assertNotNull(status);
+
+ // tag::x-pack-rollup-get-rollup-job-execute-listener
+ ActionListener listener = new ActionListener() {
+ @Override
+ public void onResponse(GetRollupJobResponse response) {
+ // <1>
+ }
+
+ @Override
+ public void onFailure(Exception e) {
+ // <2>
+ }
+ };
+ // end::x-pack-rollup-get-rollup-job-execute-listener
+
+ // Replace the empty listener by a blocking listener in test
+ final CountDownLatch latch = new CountDownLatch(1);
+ listener = new LatchedActionListener<>(listener, latch);
+
+ // tag::x-pack-rollup-get-rollup-job-execute-async
+ client.rollup().getRollupJobAsync(getJob, RequestOptions.DEFAULT, listener); // <1>
+ // end::x-pack-rollup-get-rollup-job-execute-async
+
+ assertTrue(latch.await(30L, TimeUnit.SECONDS));
+ }
+
+ @After
+ public void wipeRollup() throws Exception {
+ // TODO move this to ESRestTestCase
+ deleteRollupJobs();
+ waitForPendingRollupTasks();
+ }
+
+ private void deleteRollupJobs() throws Exception {
+ Response response = adminClient().performRequest(new Request("GET", "/_xpack/rollup/job/_all"));
+ Map jobs = entityAsMap(response);
+ @SuppressWarnings("unchecked")
+ List