From 1a00847dc93ca1a03b58d0fb0257731f1590725a Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Fri, 10 Aug 2018 15:38:14 +0200 Subject: [PATCH] temporary merge PR 32743 (#32776) This is a temporary (squashed) commit of #32743 till ed4feb6, to be reverted and replaced by the final version of this PR --- .../docs/en/rest-api/rollup/get-job.asciidoc | 6 +- .../IndexerJobStats.java} | 68 ++-- .../job => indexing}/IndexerState.java | 2 +- .../xpack/core/indexing/Iteration.java | 62 +++ .../xpack/core/indexing/IterativeIndexer.java | 384 ++++++++++++++++++ .../rollup/action/GetRollupJobsAction.java | 14 +- .../core/rollup/job/RollupJobStatus.java | 1 + .../core/indexing/IndexerJobStatsTests.java | 34 ++ .../IndexerStateEnumTests.java | 2 +- .../core/indexing/IterativeIndexerTests.java | 133 ++++++ .../job/JobWrapperSerializingTests.java | 4 +- .../core/rollup/job/RollupJobStatsTests.java | 35 -- .../core/rollup/job/RollupJobStatusTests.java | 1 + .../xpack/rollup/job/IndexerUtils.java | 4 +- .../xpack/rollup/job/RollupIndexer.java | 346 ++-------------- .../xpack/rollup/job/RollupJobTask.java | 6 +- .../xpack/rollup/job/IndexerUtilsTests.java | 20 +- .../job/RollupIndexerIndexingTests.java | 2 +- .../rollup/job/RollupIndexerStateTests.java | 12 +- .../xpack/rollup/job/RollupJobTaskTests.java | 2 +- .../rest-api-spec/test/rollup/delete_job.yml | 6 +- .../rest-api-spec/test/rollup/get_jobs.yml | 6 +- .../rest-api-spec/test/rollup/put_job.yml | 2 +- .../elasticsearch/multi_node/RollupIT.java | 2 +- 24 files changed, 729 insertions(+), 425 deletions(-) rename x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/{rollup/job/RollupJobStats.java => indexing/IndexerJobStats.java} (62%) rename x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/{rollup/job => indexing}/IndexerState.java (97%) create mode 100644 x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/Iteration.java create mode 100644 x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/IterativeIndexer.java create mode 100644 x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/IndexerJobStatsTests.java rename x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/{rollup/job => indexing}/IndexerStateEnumTests.java (98%) create mode 100644 x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/IterativeIndexerTests.java delete mode 100644 x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/rollup/job/RollupJobStatsTests.java diff --git a/x-pack/docs/en/rest-api/rollup/get-job.asciidoc b/x-pack/docs/en/rest-api/rollup/get-job.asciidoc index 96053dbfea64f..1a90557106bfc 100644 --- a/x-pack/docs/en/rest-api/rollup/get-job.asciidoc +++ b/x-pack/docs/en/rest-api/rollup/get-job.asciidoc @@ -99,7 +99,7 @@ Which will yield the following response: "stats" : { "pages_processed" : 0, "documents_processed" : 0, - "rollups_indexed" : 0, + "documents_indexed" : 0, "trigger_count" : 0 } } @@ -219,7 +219,7 @@ Which will yield the following response: "stats" : { "pages_processed" : 0, "documents_processed" : 0, - "rollups_indexed" : 0, + "documents_indexed" : 0, "trigger_count" : 0 } }, @@ -268,7 +268,7 @@ Which will yield the following response: "stats" : { "pages_processed" : 0, "documents_processed" : 0, - "rollups_indexed" : 0, + "documents_indexed" : 0, "trigger_count" : 0 } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/job/RollupJobStats.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/IndexerJobStats.java similarity index 62% rename from x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/job/RollupJobStats.java rename to x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/IndexerJobStats.java index 06cfb520af552..f64ed5804535d 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/job/RollupJobStats.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/IndexerJobStats.java @@ -3,7 +3,7 @@ * or more contributor license agreements. Licensed under the Elastic License; * you may not use this file except in compliance with the Elastic License. */ -package org.elasticsearch.xpack.core.rollup.job; +package org.elasticsearch.xpack.core.indexing; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.io.stream.StreamInput; @@ -13,7 +13,6 @@ import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; - import java.io.IOException; import java.util.Objects; @@ -24,45 +23,46 @@ * and are only for external monitoring/reference. Statistics are not persisted with the job, so if the * allocated task is shutdown/restarted on a different node all the stats will reset. */ -public class RollupJobStats implements ToXContentObject, Writeable { +public class IndexerJobStats implements ToXContentObject, Writeable { public static final ParseField NAME = new ParseField("job_stats"); private static ParseField NUM_PAGES = new ParseField("pages_processed"); - private static ParseField NUM_DOCUMENTS = new ParseField("documents_processed"); - private static ParseField NUM_ROLLUPS = new ParseField("rollups_indexed"); + private static ParseField NUM_INPUT_DOCUMENTS = new ParseField("documents_processed"); + // BWC for RollupJobStats + private static ParseField NUM_OUTPUT_DOCUMENTS = new ParseField("documents_indexed").withDeprecation("rollups_indexed"); private static ParseField NUM_INVOCATIONS = new ParseField("trigger_count"); private long numPages = 0; - private long numDocuments = 0; - private long numRollups = 0; + private long numInputDocuments = 0; + private long numOuputDocuments = 0; private long numInvocations = 0; - public static final ConstructingObjectParser PARSER = + public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>(NAME.getPreferredName(), - args -> new RollupJobStats((long) args[0], (long) args[1], (long) args[2], (long) args[3])); + args -> new IndexerJobStats((long) args[0], (long) args[1], (long) args[2], (long) args[3])); static { PARSER.declareLong(constructorArg(), NUM_PAGES); - PARSER.declareLong(constructorArg(), NUM_DOCUMENTS); - PARSER.declareLong(constructorArg(), NUM_ROLLUPS); + PARSER.declareLong(constructorArg(), NUM_INPUT_DOCUMENTS); + PARSER.declareLong(constructorArg(), NUM_OUTPUT_DOCUMENTS); PARSER.declareLong(constructorArg(), NUM_INVOCATIONS); } - public RollupJobStats() { + public IndexerJobStats() { } - public RollupJobStats(long numPages, long numDocuments, long numRollups, long numInvocations) { + public IndexerJobStats(long numPages, long numDocuments, long numOuputDocuments, long numInvocations) { this.numPages = numPages; - this.numDocuments = numDocuments; - this.numRollups = numRollups; + this.numInputDocuments = numDocuments; + this.numOuputDocuments = numOuputDocuments; this.numInvocations = numInvocations; } - public RollupJobStats(StreamInput in) throws IOException { + public IndexerJobStats(StreamInput in) throws IOException { this.numPages = in.readVLong(); - this.numDocuments = in.readVLong(); - this.numRollups = in.readVLong(); + this.numInputDocuments = in.readVLong(); + this.numOuputDocuments = in.readVLong(); this.numInvocations = in.readVLong(); } @@ -71,15 +71,15 @@ public long getNumPages() { } public long getNumDocuments() { - return numDocuments; + return numInputDocuments; } public long getNumInvocations() { return numInvocations; } - public long getNumRollups() { - return numRollups; + public long getOutputDocuments() { + return numOuputDocuments; } public void incrementNumPages(long n) { @@ -89,7 +89,7 @@ public void incrementNumPages(long n) { public void incrementNumDocuments(long n) { assert(n >= 0); - numDocuments += n; + numInputDocuments += n; } public void incrementNumInvocations(long n) { @@ -97,20 +97,20 @@ public void incrementNumInvocations(long n) { numInvocations += n; } - public void incrementNumRollups(long n) { + public void incrementNumOutputDocuments(long n) { assert(n >= 0); - numRollups += n; + numOuputDocuments += n; } @Override public void writeTo(StreamOutput out) throws IOException { out.writeVLong(numPages); - out.writeVLong(numDocuments); - out.writeVLong(numRollups); + out.writeVLong(numInputDocuments); + out.writeVLong(numOuputDocuments); out.writeVLong(numInvocations); } - public static RollupJobStats fromXContent(XContentParser parser) { + public static IndexerJobStats fromXContent(XContentParser parser) { try { return PARSER.parse(parser, null); } catch (IOException e) { @@ -122,8 +122,8 @@ public static RollupJobStats fromXContent(XContentParser parser) { public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); builder.field(NUM_PAGES.getPreferredName(), numPages); - builder.field(NUM_DOCUMENTS.getPreferredName(), numDocuments); - builder.field(NUM_ROLLUPS.getPreferredName(), numRollups); + builder.field(NUM_INPUT_DOCUMENTS.getPreferredName(), numInputDocuments); + builder.field(NUM_OUTPUT_DOCUMENTS.getPreferredName(), numOuputDocuments); builder.field(NUM_INVOCATIONS.getPreferredName(), numInvocations); builder.endObject(); return builder; @@ -139,18 +139,16 @@ public boolean equals(Object other) { return false; } - RollupJobStats that = (RollupJobStats) other; + IndexerJobStats that = (IndexerJobStats) other; return Objects.equals(this.numPages, that.numPages) - && Objects.equals(this.numDocuments, that.numDocuments) - && Objects.equals(this.numRollups, that.numRollups) + && Objects.equals(this.numInputDocuments, that.numInputDocuments) + && Objects.equals(this.numOuputDocuments, that.numOuputDocuments) && Objects.equals(this.numInvocations, that.numInvocations); } @Override public int hashCode() { - return Objects.hash(numPages, numDocuments, numRollups, numInvocations); + return Objects.hash(numPages, numInputDocuments, numOuputDocuments, numInvocations); } - } - diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/job/IndexerState.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/IndexerState.java similarity index 97% rename from x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/job/IndexerState.java rename to x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/IndexerState.java index 6e211c1df9e3e..1b6b9a943cba2 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/job/IndexerState.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/IndexerState.java @@ -3,7 +3,7 @@ * or more contributor license agreements. Licensed under the Elastic License; * you may not use this file except in compliance with the Elastic License. */ -package org.elasticsearch.xpack.core.rollup.job; +package org.elasticsearch.xpack.core.indexing; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.io.stream.StreamInput; diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/Iteration.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/Iteration.java new file mode 100644 index 0000000000000..5568ecd5ff806 --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/Iteration.java @@ -0,0 +1,62 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.core.indexing; + +import org.elasticsearch.action.index.IndexRequest; + +import java.util.List; + +/** + * Result object to hold the result of 1 iteration of iterative indexing. + * Acts as an interface between the implementation and the generic indexer. + */ +public class Iteration { + + private final boolean isDone; + private final JobPosition position; + private final List toIndex; + + /** + * Constructor for the result of 1 iteration. + * + * @param toIndex the list of requests to be indexed + * @param position the extracted, persistable position of the job required for the search phase + * @param isDone true if source is exhausted and job should go to sleep + * + * Note: toIndex.empty() != isDone due to possible filtering in the specific implementation + */ + public Iteration(List toIndex, JobPosition position, boolean isDone) { + this.toIndex = toIndex; + this.position = position; + this.isDone = isDone; + } + + /** + * Returns true if this indexing iteration is done and job should go into sleep mode. + */ + public boolean isDone() { + return isDone; + } + + /** + * Return the position of the job, a generic to be passed to the next query construction. + * + * @return the position + */ + public JobPosition getPosition() { + return position; + } + + /** + * List of requests to be passed to bulk indexing. + * + * @return List of index requests. + */ + public List getToIndex() { + return toIndex; + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/IterativeIndexer.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/IterativeIndexer.java new file mode 100644 index 0000000000000..9ec71b88e466c --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/IterativeIndexer.java @@ -0,0 +1,384 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.core.indexing; + +import org.apache.log4j.Logger; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; + +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicReference; + +/** + * An abstract class that builds an index incrementally. A background job can be launched using {@link #maybeTriggerAsyncJob(long)}, + * it will create the index from the source index up to the last complete bucket that is allowed to be built (based on job position). + * Only one background job can run simultaneously and {@link #onFinish()} is called when the job + * finishes. {@link #onFailure(Exception)} is called if the job fails with an exception and {@link #onAbort()} is called if the indexer is + * aborted while a job is running. The indexer must be started ({@link #start()} to allow a background job to run when + * {@link #maybeTriggerAsyncJob(long)} is called. {@link #stop()} can be used to stop the background job without aborting the indexer. + * + * In a nutshell this is a 2 cycle engine: 1st it sends a query, 2nd it indexes documents based on the response, sends the next query, + * indexes, queries, indexes, ... until a condition lets the engine pause until the source provides new input. + * + * @param Type that defines a job position to be defined by the implementation. + */ +public abstract class IterativeIndexer { + private static final Logger logger = Logger.getLogger(IterativeIndexer.class.getName()); + + private final IndexerJobStats stats; + + private final AtomicReference state; + private final AtomicReference position; + private final Executor executor; + + protected IterativeIndexer(Executor executor, AtomicReference initialState, JobPosition initialPosition) { + this.executor = executor; + this.state = initialState; + this.position = new AtomicReference<>(initialPosition); + this.stats = new IndexerJobStats(); + } + + /** + * Get the current state of the indexer. + */ + public IndexerState getState() { + return state.get(); + } + + /** + * Get the current position of the indexer. + */ + public JobPosition getPosition() { + return position.get(); + } + + /** + * Get the stats of this indexer. + */ + public IndexerJobStats getStats() { + return stats; + } + + /** + * Sets the internal state to {@link IndexerState#STARTED} if the previous state + * was {@link IndexerState#STOPPED}. Setting the state to STARTED allows a job + * to run in the background when {@link #maybeTriggerAsyncJob(long)} is called. + * + * @return The new state for the indexer (STARTED, INDEXING or ABORTING if the + * job was already aborted). + */ + public synchronized IndexerState start() { + state.compareAndSet(IndexerState.STOPPED, IndexerState.STARTED); + return state.get(); + } + + /** + * Sets the internal state to {@link IndexerState#STOPPING} if an async job is + * running in the background and in such case {@link #onFinish()} will be called + * as soon as the background job detects that the indexer is stopped. If there + * is no job running when this function is called, the state is directly set to + * {@link IndexerState#STOPPED} and {@link #onFinish()} will never be called. + * + * @return The new state for the indexer (STOPPED, STOPPING or ABORTING if the + * job was already aborted). + */ + public synchronized IndexerState stop() { + IndexerState currentState = state.updateAndGet(previousState -> { + if (previousState == IndexerState.INDEXING) { + return IndexerState.STOPPING; + } else if (previousState == IndexerState.STARTED) { + return IndexerState.STOPPED; + } else { + return previousState; + } + }); + return currentState; + } + + /** + * Sets the internal state to {@link IndexerState#ABORTING}. It returns false if + * an async job is running in the background and in such case {@link #onAbort} + * will be called as soon as the background job detects that the indexer is + * aborted. If there is no job running when this function is called, it returns + * true and {@link #onAbort()} will never be called. + * + * @return true if the indexer is aborted, false if a background job is running + * and abort is delayed. + */ + public synchronized boolean abort() { + IndexerState prevState = state.getAndUpdate((prev) -> IndexerState.ABORTING); + return prevState == IndexerState.STOPPED || prevState == IndexerState.STARTED; + } + + /** + * Triggers a background job that builds the index asynchronously iff + * there is no other job that runs and the indexer is started + * ({@link IndexerState#STARTED}. + * + * @param now + * The current time in milliseconds (used to limit the job to + * complete buckets) + * @return true if a job has been triggered, false otherwise + */ + public synchronized boolean maybeTriggerAsyncJob(long now) { + final IndexerState currentState = state.get(); + switch (currentState) { + case INDEXING: + case STOPPING: + case ABORTING: + logger.warn("Schedule was triggered for job [" + getJobId() + "], but prior indexer is still running."); + return false; + + case STOPPED: + logger.debug("Schedule was triggered for job [" + getJobId() + "] but job is stopped. Ignoring trigger."); + return false; + + case STARTED: + logger.debug("Schedule was triggered for job [" + getJobId() + "], state: [" + currentState + "]"); + stats.incrementNumInvocations(1); + onStart(now); + + if (state.compareAndSet(IndexerState.STARTED, IndexerState.INDEXING)) { + // fire off the search. Note this is async, the method will return from here + executor.execute(() -> doNextSearch(buildSearchRequest(), + ActionListener.wrap(this::onSearchResponse, exc -> finishWithFailure(exc)))); + logger.debug("Beginning to index [" + getJobId() + "], state: [" + currentState + "]"); + return true; + } else { + logger.debug("Could not move from STARTED to INDEXING state because current state is [" + state.get() + "]"); + return false; + } + + default: + logger.warn("Encountered unexpected state [" + currentState + "] while indexing"); + throw new IllegalStateException("Job encountered an illegal state [" + currentState + "]"); + } + } + + /** + * Called to get the Id of the job, used for logging. + * + * @return a string with the id of the job + */ + protected abstract String getJobId(); + + /** + * Called to process a response from the 1 search request in order to turn it into a {@link Iteration}. + * + * @param searchResponse response from the search phase. + * @return Iteration object to be passed to indexing phase. + */ + protected abstract Iteration doProcess(SearchResponse searchResponse); + + /** + * Called to build the next search request. + * + * @return SearchRequest to be passed to the search phase. + */ + protected abstract SearchRequest buildSearchRequest(); + + /** + * Called at startup after job has been triggered using {@link #maybeTriggerAsyncJob(long)} and the + * internal state is {@link IndexerState#STARTED}. + * + * @param now The current time in milliseconds passed through from {@link #maybeTriggerAsyncJob(long)} + */ + protected abstract void onStart(long now); + + /** + * Executes the {@link SearchRequest} and calls nextPhase with the + * response or the exception if an error occurs. + * + * @param request + * The search request to execute + * @param nextPhase + * Listener for the next phase + */ + protected abstract void doNextSearch(SearchRequest request, ActionListener nextPhase); + + /** + * Executes the {@link BulkRequest} and calls nextPhase with the + * response or the exception if an error occurs. + * + * @param request + * The bulk request to execute + * @param nextPhase + * Listener for the next phase + */ + protected abstract void doNextBulk(BulkRequest request, ActionListener nextPhase); + + /** + * Called periodically during the execution of a background job. Implementation + * should persists the state somewhere and continue the execution asynchronously + * using next. + * + * @param state + * The current state of the indexer + * @param position + * The current position of the indexer + * @param next + * Runnable for the next phase + */ + protected abstract void doSaveState(IndexerState state, JobPosition position, Runnable next); + + /** + * Called when a failure occurs in an async job causing the execution to stop. + * + * @param exc + * The exception + */ + protected abstract void onFailure(Exception exc); + + /** + * Called when a background job finishes. + */ + protected abstract void onFinish(); + + /** + * Called when a background job detects that the indexer is aborted causing the + * async execution to stop. + */ + protected abstract void onAbort(); + + private void finishWithFailure(Exception exc) { + doSaveState(finishAndSetState(), position.get(), () -> onFailure(exc)); + } + + private IndexerState finishAndSetState() { + return state.updateAndGet(prev -> { + switch (prev) { + case INDEXING: + // ready for another job + return IndexerState.STARTED; + + case STOPPING: + // must be started again + return IndexerState.STOPPED; + + case ABORTING: + // abort and exit + onAbort(); + return IndexerState.ABORTING; // This shouldn't matter, since onAbort() will kill the task first + + case STOPPED: + // No-op. Shouldn't really be possible to get here (should have to go through + // STOPPING + // first which will be handled) but is harmless to no-op and we don't want to + // throw exception here + return IndexerState.STOPPED; + + default: + // any other state is unanticipated at this point + throw new IllegalStateException("Indexer job encountered an illegal state [" + prev + "]"); + } + }); + } + + private void onSearchResponse(SearchResponse searchResponse) { + try { + if (checkState(getState()) == false) { + return; + } + if (searchResponse.getShardFailures().length != 0) { + throw new RuntimeException("Shard failures encountered while running indexer for job [" + getJobId() + "]: " + + Arrays.toString(searchResponse.getShardFailures())); + } + + stats.incrementNumPages(1); + Iteration iteration = doProcess(searchResponse); + + if (iteration.isDone()) { + logger.debug("Finished indexing for job [" + getJobId() + "], saving state and shutting down."); + + // Change state first, then try to persist. This prevents in-progress + // STOPPING/ABORTING from + // being persisted as STARTED but then stop the job + doSaveState(finishAndSetState(), position.get(), this::onFinish); + return; + } + + final List docs = iteration.getToIndex(); + final BulkRequest bulkRequest = new BulkRequest(); + docs.forEach(bulkRequest::add); + + // TODO this might be a valid case, e.g. if implementation filters + assert bulkRequest.requests().size() > 0; + + doNextBulk(bulkRequest, ActionListener.wrap(bulkResponse -> { + // TODO we should check items in the response and move after accordingly to + // resume the failing buckets ? + if (bulkResponse.hasFailures()) { + logger.warn("Error while attempting to bulk index documents: " + bulkResponse.buildFailureMessage()); + } + stats.incrementNumOutputDocuments(bulkResponse.getItems().length); + if (checkState(getState()) == false) { + return; + } + + JobPosition newPosition = iteration.getPosition(); + position.set(newPosition); + + onBulkResponse(bulkResponse, newPosition); + }, exc -> finishWithFailure(exc))); + } catch (Exception e) { + finishWithFailure(e); + } + } + + private void onBulkResponse(BulkResponse response, JobPosition position) { + try { + + ActionListener listener = ActionListener.wrap(this::onSearchResponse, this::finishWithFailure); + // TODO probably something more intelligent than every-50 is needed + if (stats.getNumPages() > 0 && stats.getNumPages() % 50 == 0) { + doSaveState(IndexerState.INDEXING, position, () -> doNextSearch(buildSearchRequest(), listener)); + } else { + doNextSearch(buildSearchRequest(), listener); + } + } catch (Exception e) { + finishWithFailure(e); + } + } + + /** + * Checks the {@link IndexerState} and returns false if the execution should be + * stopped. + */ + private boolean checkState(IndexerState currentState) { + switch (currentState) { + case INDEXING: + // normal state; + return true; + + case STOPPING: + logger.info("Indexer job encountered [" + IndexerState.STOPPING + "] state, halting indexer."); + doSaveState(finishAndSetState(), getPosition(), () -> { + }); + return false; + + case STOPPED: + return false; + + case ABORTING: + logger.info("Requested shutdown of indexer for job [" + getJobId() + "]"); + onAbort(); + return false; + + default: + // Anything other than indexing, aborting or stopping is unanticipated + logger.warn("Encountered unexpected state [" + currentState + "] while indexing"); + throw new IllegalStateException("Indexer job encountered an illegal state [" + currentState + "]"); + } + } + +} \ No newline at end of file diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/GetRollupJobsAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/GetRollupJobsAction.java index 50f7931508585..4bfd5b621e780 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/GetRollupJobsAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/GetRollupJobsAction.java @@ -25,9 +25,9 @@ import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.tasks.Task; +import org.elasticsearch.xpack.core.indexing.IndexerJobStats; import org.elasticsearch.xpack.core.rollup.RollupField; import org.elasticsearch.xpack.core.rollup.job.RollupJobConfig; -import org.elasticsearch.xpack.core.rollup.job.RollupJobStats; import org.elasticsearch.xpack.core.rollup.job.RollupJobStatus; import java.io.IOException; @@ -204,20 +204,20 @@ public final String toString() { public static class JobWrapper implements Writeable, ToXContentObject { private final RollupJobConfig job; - private final RollupJobStats stats; + private final IndexerJobStats stats; private final RollupJobStatus status; public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>(NAME, a -> new JobWrapper((RollupJobConfig) a[0], - (RollupJobStats) a[1], (RollupJobStatus)a[2])); + (IndexerJobStats) a[1], (RollupJobStatus)a[2])); static { PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, c) -> RollupJobConfig.fromXContent(p, null), CONFIG); - PARSER.declareObject(ConstructingObjectParser.constructorArg(), RollupJobStats.PARSER::apply, STATS); + PARSER.declareObject(ConstructingObjectParser.constructorArg(), IndexerJobStats.PARSER::apply, STATS); PARSER.declareObject(ConstructingObjectParser.constructorArg(), RollupJobStatus.PARSER::apply, STATUS); } - public JobWrapper(RollupJobConfig job, RollupJobStats stats, RollupJobStatus status) { + public JobWrapper(RollupJobConfig job, IndexerJobStats stats, RollupJobStatus status) { this.job = job; this.stats = stats; this.status = status; @@ -225,7 +225,7 @@ public JobWrapper(RollupJobConfig job, RollupJobStats stats, RollupJobStatus sta public JobWrapper(StreamInput in) throws IOException { this.job = new RollupJobConfig(in); - this.stats = new RollupJobStats(in); + this.stats = new IndexerJobStats(in); this.status = new RollupJobStatus(in); } @@ -240,7 +240,7 @@ public RollupJobConfig getJob() { return job; } - public RollupJobStats getStats() { + public IndexerJobStats getStats() { return stats; } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/job/RollupJobStatus.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/job/RollupJobStatus.java index 640385c9c80d5..0a2f046907c80 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/job/RollupJobStatus.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/job/RollupJobStatus.java @@ -17,6 +17,7 @@ import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.persistent.PersistentTaskState; import org.elasticsearch.tasks.Task; +import org.elasticsearch.xpack.core.indexing.IndexerState; import java.io.IOException; import java.util.HashMap; diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/IndexerJobStatsTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/IndexerJobStatsTests.java new file mode 100644 index 0000000000000..e60573d3ed071 --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/IndexerJobStatsTests.java @@ -0,0 +1,34 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.indexing; + +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.test.AbstractSerializingTestCase; + +public class IndexerJobStatsTests extends AbstractSerializingTestCase { + + @Override + protected IndexerJobStats createTestInstance() { + return randomStats(); + } + + @Override + protected Writeable.Reader instanceReader() { + return IndexerJobStats::new; + } + + @Override + protected IndexerJobStats doParseInstance(XContentParser parser) { + return IndexerJobStats.fromXContent(parser); + } + + public static IndexerJobStats randomStats() { + return new IndexerJobStats(randomNonNegativeLong(), randomNonNegativeLong(), + randomNonNegativeLong(), randomNonNegativeLong()); + } +} + diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/rollup/job/IndexerStateEnumTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/IndexerStateEnumTests.java similarity index 98% rename from x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/rollup/job/IndexerStateEnumTests.java rename to x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/IndexerStateEnumTests.java index ec17a37e23b2b..329800c2f1a24 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/rollup/job/IndexerStateEnumTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/IndexerStateEnumTests.java @@ -3,7 +3,7 @@ * or more contributor license agreements. Licensed under the Elastic License; * you may not use this file except in compliance with the Elastic License. */ -package org.elasticsearch.xpack.core.rollup.job; +package org.elasticsearch.xpack.core.indexing; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/IterativeIndexerTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/IterativeIndexerTests.java new file mode 100644 index 0000000000000..85066cb42f519 --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/IterativeIndexerTests.java @@ -0,0 +1,133 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.core.indexing; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.search.SearchResponseSections; +import org.elasticsearch.action.search.ShardSearchFailure; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.SearchHits; +import org.elasticsearch.test.ESTestCase; + +import java.util.Collections; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import static org.hamcrest.Matchers.equalTo; + +public class IterativeIndexerTests extends ESTestCase { + + AtomicBoolean isFinished = new AtomicBoolean(false); + + private class MockIndexer extends IterativeIndexer { + + // test the execution order + private int step; + + protected MockIndexer(Executor executor, AtomicReference initialState, Integer initialPosition) { + super(executor, initialState, initialPosition); + } + + @Override + protected String getJobId() { + return "mock"; + } + + @Override + protected Iteration doProcess(SearchResponse searchResponse) { + assertThat(step, equalTo(3)); + ++step; + return new Iteration(Collections.emptyList(), 3, true); + } + + @Override + protected SearchRequest buildSearchRequest() { + assertThat(step, equalTo(1)); + ++step; + return null; + } + + @Override + protected void onStart(long now) { + assertThat(step, equalTo(0)); + ++step; + } + + @Override + protected void doNextSearch(SearchRequest request, ActionListener nextPhase) { + assertThat(step, equalTo(2)); + ++step; + final SearchResponseSections sections = new SearchResponseSections(new SearchHits(new SearchHit[0], 0, 0), null, null, false, + null, null, 1); + + nextPhase.onResponse(new SearchResponse(sections, null, 1, 1, 0, 0, ShardSearchFailure.EMPTY_ARRAY, null)); + } + + @Override + protected void doNextBulk(BulkRequest request, ActionListener nextPhase) { + fail("should not be called"); + } + + @Override + protected void doSaveState(IndexerState state, Integer position, Runnable next) { + assertThat(step, equalTo(4)); + ++step; + next.run(); + } + + @Override + protected void onFailure(Exception exc) { + fail(exc.getMessage()); + } + + @Override + protected void onFinish() { + assertThat(step, equalTo(5)); + ++step; + isFinished.set(true); + } + + @Override + protected void onAbort() { + } + + public int getStep() { + return step; + } + + } + + public void testStateMachine() throws InterruptedException { + AtomicReference state = new AtomicReference<>(IndexerState.STOPPED); + final ExecutorService executor = Executors.newFixedThreadPool(1); + + try { + + MockIndexer indexer = new MockIndexer(executor, state, 2); + indexer.start(); + assertThat(indexer.getState(), equalTo(IndexerState.STARTED)); + assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis())); + assertThat(indexer.getState(), equalTo(IndexerState.INDEXING)); + assertThat(indexer.getPosition(), equalTo(2)); + ESTestCase.awaitBusy(() -> isFinished.get()); + assertThat(indexer.getStep(), equalTo(6)); + assertThat(indexer.getStats().getNumInvocations(), equalTo(1L)); + assertThat(indexer.getStats().getNumPages(), equalTo(1L)); + assertThat(indexer.getStats().getOutputDocuments(), equalTo(0L)); + assertTrue(indexer.abort()); + } finally { + executor.shutdownNow(); + } + } +} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/rollup/job/JobWrapperSerializingTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/rollup/job/JobWrapperSerializingTests.java index a0df63bc38dde..cb827c040801d 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/rollup/job/JobWrapperSerializingTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/rollup/job/JobWrapperSerializingTests.java @@ -8,6 +8,8 @@ import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.test.AbstractSerializingTestCase; +import org.elasticsearch.xpack.core.indexing.IndexerState; +import org.elasticsearch.xpack.core.indexing.IndexerJobStats; import org.elasticsearch.xpack.core.rollup.ConfigTestHelpers; import org.elasticsearch.xpack.core.rollup.action.GetRollupJobsAction; @@ -40,7 +42,7 @@ protected GetRollupJobsAction.JobWrapper createTestInstance() { } return new GetRollupJobsAction.JobWrapper(ConfigTestHelpers.randomRollupJobConfig(random()), - new RollupJobStats(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()), + new IndexerJobStats(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()), new RollupJobStatus(state, Collections.emptyMap(), randomBoolean())); } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/rollup/job/RollupJobStatsTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/rollup/job/RollupJobStatsTests.java deleted file mode 100644 index 0091b21dc40d0..0000000000000 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/rollup/job/RollupJobStatsTests.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.core.rollup.job; - -import org.elasticsearch.common.io.stream.Writeable; -import org.elasticsearch.common.xcontent.XContentParser; -import org.elasticsearch.test.AbstractSerializingTestCase; -import org.elasticsearch.xpack.core.rollup.job.RollupJobStats; - -public class RollupJobStatsTests extends AbstractSerializingTestCase { - - @Override - protected RollupJobStats createTestInstance() { - return randomStats(); - } - - @Override - protected Writeable.Reader instanceReader() { - return RollupJobStats::new; - } - - @Override - protected RollupJobStats doParseInstance(XContentParser parser) { - return RollupJobStats.fromXContent(parser); - } - - public static RollupJobStats randomStats() { - return new RollupJobStats(randomNonNegativeLong(), randomNonNegativeLong(), - randomNonNegativeLong(), randomNonNegativeLong()); - } -} - diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/rollup/job/RollupJobStatusTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/rollup/job/RollupJobStatusTests.java index 2c802a7e41dc3..f46bda788bf5b 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/rollup/job/RollupJobStatusTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/rollup/job/RollupJobStatusTests.java @@ -8,6 +8,7 @@ import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.test.AbstractSerializingTestCase; +import org.elasticsearch.xpack.core.indexing.IndexerState; import java.util.HashMap; import java.util.Map; diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/IndexerUtils.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/IndexerUtils.java index 9119a5445d42e..594221d921488 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/IndexerUtils.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/IndexerUtils.java @@ -14,10 +14,10 @@ import org.elasticsearch.search.aggregations.bucket.histogram.HistogramAggregationBuilder; import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder; import org.elasticsearch.search.aggregations.metrics.InternalNumericMetricsAggregation; +import org.elasticsearch.xpack.core.indexing.IndexerJobStats; import org.elasticsearch.xpack.core.rollup.RollupField; import org.elasticsearch.xpack.core.rollup.job.DateHistogramGroupConfig; import org.elasticsearch.xpack.core.rollup.job.GroupConfig; -import org.elasticsearch.xpack.core.rollup.job.RollupJobStats; import org.elasticsearch.xpack.rollup.Rollup; import java.util.ArrayList; @@ -46,7 +46,7 @@ class IndexerUtils { * @param isUpgradedDocID `true` if this job is using the new ID scheme * @return A list of rolled documents derived from the response */ - static List processBuckets(CompositeAggregation agg, String rollupIndex, RollupJobStats stats, + static List processBuckets(CompositeAggregation agg, String rollupIndex, IndexerJobStats stats, GroupConfig groupConfig, String jobId, boolean isUpgradedDocID) { logger.debug("Buckets: [" + agg.getBuckets().size() + "][" + jobId + "]"); diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupIndexer.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupIndexer.java index 87294706b3b7d..c2a07b47349aa 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupIndexer.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupIndexer.java @@ -5,11 +5,6 @@ */ package org.elasticsearch.xpack.rollup.job; -import org.apache.log4j.Logger; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.bulk.BulkRequest; -import org.elasticsearch.action.bulk.BulkResponse; -import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.common.unit.TimeValue; @@ -20,16 +15,16 @@ import org.elasticsearch.search.aggregations.bucket.composite.CompositeValuesSourceBuilder; import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.xpack.core.indexing.IndexerState; +import org.elasticsearch.xpack.core.indexing.Iteration; +import org.elasticsearch.xpack.core.indexing.IterativeIndexer; import org.elasticsearch.xpack.core.rollup.RollupField; import org.elasticsearch.xpack.core.rollup.job.DateHistogramGroupConfig; import org.elasticsearch.xpack.core.rollup.job.GroupConfig; -import org.elasticsearch.xpack.core.rollup.job.IndexerState; import org.elasticsearch.xpack.core.rollup.job.RollupJob; import org.elasticsearch.xpack.core.rollup.job.RollupJobConfig; -import org.elasticsearch.xpack.core.rollup.job.RollupJobStats; import java.util.ArrayList; -import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -38,25 +33,13 @@ import java.util.concurrent.atomic.AtomicReference; /** - * An abstract class that builds a rollup index incrementally. A background job can be launched using {@link #maybeTriggerAsyncJob(long)}, - * it will create the rollup index from the source index up to the last complete bucket that is allowed to be built (based on the current - * time and the delay set on the rollup job). Only one background job can run simultaneously and {@link #onFinish()} is called when the job - * finishes. {@link #onFailure(Exception)} is called if the job fails with an exception and {@link #onAbort()} is called if the indexer is - * aborted while a job is running. The indexer must be started ({@link #start()} to allow a background job to run when - * {@link #maybeTriggerAsyncJob(long)} is called. {@link #stop()} can be used to stop the background job without aborting the indexer. + * An abstract implementation of {@link IterativeIndexer} that builds a rollup index incrementally. */ -public abstract class RollupIndexer { - private static final Logger logger = Logger.getLogger(RollupIndexer.class.getName()); - +public abstract class RollupIndexer extends IterativeIndexer > { static final String AGGREGATION_NAME = RollupField.NAME; private final RollupJob job; - private final RollupJobStats stats; - private final AtomicReference state; - private final AtomicReference> position; - private final Executor executor; protected final AtomicBoolean upgradedDocumentID; - private final CompositeAggregationBuilder compositeBuilder; private long maxBoundary; @@ -66,84 +49,16 @@ public abstract class RollupIndexer { * @param job The rollup job * @param initialState Initial state for the indexer * @param initialPosition The last indexed bucket of the task + * @param upgradedDocumentID whether job has updated IDs (for BWC) */ - RollupIndexer(Executor executor, RollupJob job, AtomicReference initialState, - Map initialPosition, AtomicBoolean upgradedDocumentID) { - this.executor = executor; + RollupIndexer(Executor executor, RollupJob job, AtomicReference initialState, Map initialPosition, + AtomicBoolean upgradedDocumentID) { + super(executor, initialState, initialPosition); this.job = job; - this.stats = new RollupJobStats(); - this.state = initialState; - this.position = new AtomicReference<>(initialPosition); this.compositeBuilder = createCompositeBuilder(job.getConfig()); this.upgradedDocumentID = upgradedDocumentID; } - /** - * Executes the {@link SearchRequest} and calls nextPhase with the response - * or the exception if an error occurs. - * - * @param request The search request to execute - * @param nextPhase Listener for the next phase - */ - protected abstract void doNextSearch(SearchRequest request, ActionListener nextPhase); - - /** - * Executes the {@link BulkRequest} and calls nextPhase with the response - * or the exception if an error occurs. - * - * @param request The bulk request to execute - * @param nextPhase Listener for the next phase - */ - protected abstract void doNextBulk(BulkRequest request, ActionListener nextPhase); - - /** - * Called periodically during the execution of a background job. Implementation should - * persists the state somewhere and continue the execution asynchronously using next. - * - * @param state The current state of the indexer - * @param position The current position of the indexer - * @param next Runnable for the next phase - */ - protected abstract void doSaveState(IndexerState state, Map position, Runnable next); - - /** - * Called when a failure occurs in an async job causing the execution to stop. - * @param exc The exception - */ - protected abstract void onFailure(Exception exc); - - /** - * Called when a background job finishes. - */ - protected abstract void onFinish(); - - /** - * Called when a background job detects that the indexer is aborted causing the async execution - * to stop. - */ - protected abstract void onAbort(); - - /** - * Get the current state of the indexer. - */ - public IndexerState getState() { - return state.get(); - } - - /** - * Get the current position of the indexer. - */ - public Map getPosition() { - return position.get(); - } - - /** - * Get the stats of this indexer. - */ - public RollupJobStats getStats() { - return stats; - } - /** * Returns if this job has upgraded it's ID scheme yet or not */ @@ -151,229 +66,28 @@ public boolean isUpgradedDocumentID() { return upgradedDocumentID.get(); } - /** - * Sets the internal state to {@link IndexerState#STARTED} if the previous state was {@link IndexerState#STOPPED}. Setting the state to - * STARTED allows a job to run in the background when {@link #maybeTriggerAsyncJob(long)} is called. - * @return The new state for the indexer (STARTED, INDEXING or ABORTING if the job was already aborted). - */ - public synchronized IndexerState start() { - state.compareAndSet(IndexerState.STOPPED, IndexerState.STARTED); - return state.get(); - } - - /** - * Sets the internal state to {@link IndexerState#STOPPING} if an async job is running in the background and in such case - * {@link #onFinish()} will be called as soon as the background job detects that the indexer is stopped. If there is no job running when - * this function is called, the state is directly set to {@link IndexerState#STOPPED} and {@link #onFinish()} will never be called. - * @return The new state for the indexer (STOPPED, STOPPING or ABORTING if the job was already aborted). - */ - public synchronized IndexerState stop() { - IndexerState currentState = state.updateAndGet(previousState -> { - if (previousState == IndexerState.INDEXING) { - return IndexerState.STOPPING; - } else if (previousState == IndexerState.STARTED) { - return IndexerState.STOPPED; - } else { - return previousState; - } - }); - return currentState; - } - - /** - * Sets the internal state to {@link IndexerState#ABORTING}. It returns false if an async job is running in the background and in such - * case {@link #onAbort} will be called as soon as the background job detects that the indexer is aborted. If there is no job running - * when this function is called, it returns true and {@link #onAbort()} will never be called. - * @return true if the indexer is aborted, false if a background job is running and abort is delayed. - */ - public synchronized boolean abort() { - IndexerState prevState = state.getAndUpdate((prev) -> IndexerState.ABORTING); - return prevState == IndexerState.STOPPED || prevState == IndexerState.STARTED; - } - - /** - * Triggers a background job that builds the rollup index asynchronously iff there is no other job that runs - * and the indexer is started ({@link IndexerState#STARTED}. - * - * @param now The current time in milliseconds (used to limit the job to complete buckets) - * @return true if a job has been triggered, false otherwise - */ - public synchronized boolean maybeTriggerAsyncJob(long now) { - final IndexerState currentState = state.get(); - switch (currentState) { - case INDEXING: - case STOPPING: - case ABORTING: - logger.warn("Schedule was triggered for rollup job [" + job.getConfig().getId() + "], but prior indexer is still running."); - return false; - - case STOPPED: - logger.debug("Schedule was triggered for rollup job [" + job.getConfig().getId() - + "] but job is stopped. Ignoring trigger."); - return false; - - case STARTED: - logger.debug("Schedule was triggered for rollup job [" + job.getConfig().getId() + "], state: [" + currentState + "]"); - // Only valid time to start indexing is when we are STARTED but not currently INDEXING. - stats.incrementNumInvocations(1); - - // rounds the current time to its current bucket based on the date histogram interval. - // this is needed to exclude buckets that can still receive new documents. - DateHistogramGroupConfig dateHisto = job.getConfig().getGroupConfig().getDateHistogram(); - long rounded = dateHisto.createRounding().round(now); - if (dateHisto.getDelay() != null) { - // if the job has a delay we filter all documents that appear before it. - maxBoundary = rounded - TimeValue.parseTimeValue(dateHisto.getDelay().toString(), "").millis(); - } else { - maxBoundary = rounded; - } - - if (state.compareAndSet(IndexerState.STARTED, IndexerState.INDEXING)) { - // fire off the search. Note this is async, the method will return from here - executor.execute(() -> doNextSearch(buildSearchRequest(), - ActionListener.wrap(this::onSearchResponse, exc -> finishWithFailure(exc)))); - logger.debug("Beginning to rollup [" + job.getConfig().getId() + "], state: [" + currentState + "]"); - return true; - } else { - logger.debug("Could not move from STARTED to INDEXING state because current state is [" + state.get() + "]"); - return false; - } - - default: - logger.warn("Encountered unexpected state [" + currentState + "] while indexing"); - throw new IllegalStateException("Rollup job encountered an illegal state [" + currentState + "]"); - } - } - - /** - * Checks the {@link IndexerState} and returns false if the execution - * should be stopped. - */ - private boolean checkState(IndexerState currentState) { - switch (currentState) { - case INDEXING: - // normal state; - return true; - - case STOPPING: - logger.info("Rollup job encountered [" + IndexerState.STOPPING + "] state, halting indexer."); - doSaveState(finishAndSetState(), getPosition(), () -> {}); - return false; - - case STOPPED: - return false; - - case ABORTING: - logger.info("Requested shutdown of indexer for job [" + job.getConfig().getId() + "]"); - onAbort(); - return false; - - default: - // Anything other than indexing, aborting or stopping is unanticipated - logger.warn("Encountered unexpected state [" + currentState + "] while indexing"); - throw new IllegalStateException("Rollup job encountered an illegal state [" + currentState + "]"); - } + @Override + protected String getJobId() { + return job.getConfig().getId(); } - private void onBulkResponse(BulkResponse response, Map after) { - // TODO we should check items in the response and move after accordingly to resume the failing buckets ? - stats.incrementNumRollups(response.getItems().length); - if (response.hasFailures()) { - logger.warn("Error while attempting to bulk index rollup documents: " + response.buildFailureMessage()); - } - try { - if (checkState(getState()) == false) { - return ; - } - position.set(after); - ActionListener listener = ActionListener.wrap(this::onSearchResponse, this::finishWithFailure); - // TODO probably something more intelligent than every-50 is needed - if (stats.getNumPages() > 0 && stats.getNumPages() % 50 == 0) { - doSaveState(IndexerState.INDEXING, after, () -> doNextSearch(buildSearchRequest(), listener)); - } else { - doNextSearch(buildSearchRequest(), listener); - } - } catch (Exception e) { - finishWithFailure(e); + @Override + protected void onStart(long now) { + // this is needed to exclude buckets that can still receive new documents. + DateHistogramGroupConfig dateHisto = job.getConfig().getGroupConfig().getDateHistogram(); + long rounded = dateHisto.createRounding().round(now); + if (dateHisto.getDelay() != null) { + // if the job has a delay we filter all documents that appear before it. + maxBoundary = rounded - TimeValue.parseTimeValue(dateHisto.getDelay().toString(), "").millis(); + } else { + maxBoundary = rounded; } } - - private void onSearchResponse(SearchResponse searchResponse) { - try { - if (checkState(getState()) == false) { - return ; - } - if (searchResponse.getShardFailures().length != 0) { - throw new RuntimeException("Shard failures encountered while running indexer for rollup job [" - + job.getConfig().getId() + "]: " + Arrays.toString(searchResponse.getShardFailures())); - } - final CompositeAggregation response = searchResponse.getAggregations().get(AGGREGATION_NAME); - if (response == null) { - throw new IllegalStateException("Missing composite response for query: " + compositeBuilder.toString()); - } - stats.incrementNumPages(1); - if (response.getBuckets().isEmpty()) { - // this is the end... - logger.debug("Finished indexing for job [" + job.getConfig().getId() + "], saving state and shutting down."); - - // Change state first, then try to persist. This prevents in-progress STOPPING/ABORTING from - // being persisted as STARTED but then stop the job - doSaveState(finishAndSetState(), position.get(), this::onFinish); - return; - } - - final BulkRequest bulkRequest = new BulkRequest(); + + @Override + protected SearchRequest buildSearchRequest() { // Indexer is single-threaded, and only place that the ID scheme can get upgraded is doSaveState(), so // we can pass down the boolean value rather than the atomic here - final List docs = IndexerUtils.processBuckets(response, job.getConfig().getRollupIndex(), - stats, job.getConfig().getGroupConfig(), job.getConfig().getId(), upgradedDocumentID.get()); - docs.forEach(bulkRequest::add); - assert bulkRequest.requests().size() > 0; - doNextBulk(bulkRequest, - ActionListener.wrap( - bulkResponse -> onBulkResponse(bulkResponse, response.afterKey()), - exc -> finishWithFailure(exc) - ) - ); - } catch(Exception e) { - finishWithFailure(e); - } - } - - private void finishWithFailure(Exception exc) { - doSaveState(finishAndSetState(), position.get(), () -> onFailure(exc)); - } - - private IndexerState finishAndSetState() { - return state.updateAndGet( - prev -> { - switch (prev) { - case INDEXING: - // ready for another job - return IndexerState.STARTED; - - case STOPPING: - // must be started again - return IndexerState.STOPPED; - - case ABORTING: - // abort and exit - onAbort(); - return IndexerState.ABORTING; // This shouldn't matter, since onAbort() will kill the task first - - case STOPPED: - // No-op. Shouldn't really be possible to get here (should have to go through STOPPING - // first which will be handled) but is harmless to no-op and we don't want to throw exception here - return IndexerState.STOPPED; - - default: - // any other state is unanticipated at this point - throw new IllegalStateException("Rollup job encountered an illegal state [" + prev + "]"); - } - }); - } - - private SearchRequest buildSearchRequest() { final Map position = getPosition(); SearchSourceBuilder searchSource = new SearchSourceBuilder() .size(0) @@ -383,6 +97,16 @@ private SearchRequest buildSearchRequest() { .aggregation(compositeBuilder.aggregateAfter(position)); return new SearchRequest(job.getConfig().getIndexPattern()).source(searchSource); } + + @Override + protected Iteration> doProcess(SearchResponse searchResponse) { + final CompositeAggregation response = searchResponse.getAggregations().get(AGGREGATION_NAME); + + return new Iteration<>( + IndexerUtils.processBuckets(response, job.getConfig().getRollupIndex(), getStats(), + job.getConfig().getGroupConfig(), job.getConfig().getId(), upgradedDocumentID.get()), + response.afterKey(), response.getBuckets().isEmpty()); + } /** * Creates a skeleton {@link CompositeAggregationBuilder} from the provided job config. diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupJobTask.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupJobTask.java index 65362f9ad9dd3..4898e099319b5 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupJobTask.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupJobTask.java @@ -25,13 +25,13 @@ import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.ClientHelper; +import org.elasticsearch.xpack.core.indexing.IndexerState; +import org.elasticsearch.xpack.core.indexing.IndexerJobStats; import org.elasticsearch.xpack.core.rollup.RollupField; import org.elasticsearch.xpack.core.rollup.action.StartRollupJobAction; import org.elasticsearch.xpack.core.rollup.action.StopRollupJobAction; -import org.elasticsearch.xpack.core.rollup.job.IndexerState; import org.elasticsearch.xpack.core.rollup.job.RollupJob; import org.elasticsearch.xpack.core.rollup.job.RollupJobConfig; -import org.elasticsearch.xpack.core.rollup.job.RollupJobStats; import org.elasticsearch.xpack.core.rollup.job.RollupJobStatus; import org.elasticsearch.xpack.core.scheduler.SchedulerEngine; import org.elasticsearch.xpack.rollup.Rollup; @@ -218,7 +218,7 @@ public Status getStatus() { * Gets the stats for this task. * @return The stats of this task */ - public RollupJobStats getStats() { + public IndexerJobStats getStats() { return indexer.getStats(); } diff --git a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/IndexerUtilsTests.java b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/IndexerUtilsTests.java index e8c66f7e8c118..5a16740f94611 100644 --- a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/IndexerUtilsTests.java +++ b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/IndexerUtilsTests.java @@ -35,12 +35,12 @@ import org.elasticsearch.search.aggregations.metrics.avg.AvgAggregationBuilder; import org.elasticsearch.search.aggregations.metrics.max.MaxAggregationBuilder; import org.elasticsearch.search.aggregations.metrics.sum.SumAggregationBuilder; +import org.elasticsearch.xpack.core.indexing.IndexerJobStats; import org.elasticsearch.xpack.core.rollup.RollupField; import org.elasticsearch.xpack.core.rollup.job.DateHistogramGroupConfig; import org.elasticsearch.xpack.core.rollup.job.GroupConfig; import org.elasticsearch.xpack.core.rollup.job.HistogramGroupConfig; import org.elasticsearch.xpack.core.rollup.job.MetricConfig; -import org.elasticsearch.xpack.core.rollup.job.RollupJobStats; import org.elasticsearch.xpack.core.rollup.job.TermsGroupConfig; import org.joda.time.DateTime; import org.mockito.stubbing.Answer; @@ -64,7 +64,7 @@ public class IndexerUtilsTests extends AggregatorTestCase { public void testMissingFields() throws IOException { String indexName = randomAlphaOfLengthBetween(1, 10); - RollupJobStats stats = new RollupJobStats(0, 0, 0, 0); + IndexerJobStats stats = new IndexerJobStats(0, 0, 0, 0); String timestampField = "the_histo"; String valueField = "the_avg"; @@ -126,7 +126,7 @@ public void testMissingFields() throws IOException { public void testCorrectFields() throws IOException { String indexName = randomAlphaOfLengthBetween(1, 10); - RollupJobStats stats= new RollupJobStats(0, 0, 0, 0); + IndexerJobStats stats= new IndexerJobStats(0, 0, 0, 0); String timestampField = "the_histo"; String valueField = "the_avg"; @@ -193,7 +193,7 @@ public void testCorrectFields() throws IOException { public void testNumericTerms() throws IOException { String indexName = randomAlphaOfLengthBetween(1, 10); - RollupJobStats stats= new RollupJobStats(0, 0, 0, 0); + IndexerJobStats stats= new IndexerJobStats(0, 0, 0, 0); String timestampField = "the_histo"; String valueField = "the_avg"; @@ -249,7 +249,7 @@ public void testNumericTerms() throws IOException { public void testEmptyCounts() throws IOException { String indexName = randomAlphaOfLengthBetween(1, 10); - RollupJobStats stats= new RollupJobStats(0, 0, 0, 0); + IndexerJobStats stats= new IndexerJobStats(0, 0, 0, 0); String timestampField = "ts"; String valueField = "the_avg"; @@ -355,7 +355,7 @@ public void testKeyOrderingOldID() { // The content of the config don't actually matter for this test // because the test is just looking at agg keys GroupConfig groupConfig = new GroupConfig(randomDateHistogramGroupConfig(random()), new HistogramGroupConfig(123L, "abc"), null); - List docs = IndexerUtils.processBuckets(composite, "foo", new RollupJobStats(), groupConfig, "foo", false); + List docs = IndexerUtils.processBuckets(composite, "foo", new IndexerJobStats(), groupConfig, "foo", false); assertThat(docs.size(), equalTo(1)); assertThat(docs.get(0).id(), equalTo("1237859798")); } @@ -399,7 +399,7 @@ public void testKeyOrderingNewID() { }); GroupConfig groupConfig = new GroupConfig(randomDateHistogramGroupConfig(random()), new HistogramGroupConfig(1L, "abc"), null); - List docs = IndexerUtils.processBuckets(composite, "foo", new RollupJobStats(), groupConfig, "foo", true); + List docs = IndexerUtils.processBuckets(composite, "foo", new IndexerJobStats(), groupConfig, "foo", true); assertThat(docs.size(), equalTo(1)); assertThat(docs.get(0).id(), equalTo("foo$c9LcrFqeFW92uN_Z7sv1hA")); } @@ -449,7 +449,7 @@ public void testKeyOrderingNewIDLong() { }); GroupConfig groupConfig = new GroupConfig(randomDateHistogramGroupConfig(random()), new HistogramGroupConfig(1, "abc"), null); - List docs = IndexerUtils.processBuckets(composite, "foo", new RollupJobStats(), groupConfig, "foo", true); + List docs = IndexerUtils.processBuckets(composite, "foo", new IndexerJobStats(), groupConfig, "foo", true); assertThat(docs.size(), equalTo(1)); assertThat(docs.get(0).id(), equalTo("foo$VAFKZpyaEqYRPLyic57_qw")); } @@ -476,14 +476,14 @@ public void testNullKeys() { }); GroupConfig groupConfig = new GroupConfig(randomDateHistogramGroupConfig(random()), randomHistogramGroupConfig(random()), null); - List docs = IndexerUtils.processBuckets(composite, "foo", new RollupJobStats(), groupConfig, "foo", randomBoolean()); + List docs = IndexerUtils.processBuckets(composite, "foo", new IndexerJobStats(), groupConfig, "foo", randomBoolean()); assertThat(docs.size(), equalTo(1)); assertFalse(Strings.isNullOrEmpty(docs.get(0).id())); } public void testMissingBuckets() throws IOException { String indexName = randomAlphaOfLengthBetween(1, 10); - RollupJobStats stats= new RollupJobStats(0, 0, 0, 0); + IndexerJobStats stats= new IndexerJobStats(0, 0, 0, 0); String metricField = "metric_field"; String valueField = "value_field"; diff --git a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupIndexerIndexingTests.java b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupIndexerIndexingTests.java index 6d29ee9f9ba6d..55f1cfbdbb29c 100644 --- a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupIndexerIndexingTests.java +++ b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupIndexerIndexingTests.java @@ -50,10 +50,10 @@ import org.elasticsearch.xpack.core.rollup.ConfigTestHelpers; import org.elasticsearch.xpack.core.rollup.job.DateHistogramGroupConfig; import org.elasticsearch.xpack.core.rollup.job.GroupConfig; -import org.elasticsearch.xpack.core.rollup.job.IndexerState; import org.elasticsearch.xpack.core.rollup.job.MetricConfig; import org.elasticsearch.xpack.core.rollup.job.RollupJob; import org.elasticsearch.xpack.core.rollup.job.RollupJobConfig; +import org.elasticsearch.xpack.core.indexing.IndexerState; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; import org.junit.Before; diff --git a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupIndexerStateTests.java b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupIndexerStateTests.java index 955dcbc2beb48..c74ecbadf4fbe 100644 --- a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupIndexerStateTests.java +++ b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupIndexerStateTests.java @@ -23,8 +23,8 @@ import org.elasticsearch.xpack.core.rollup.ConfigTestHelpers; import org.elasticsearch.xpack.core.rollup.RollupField; import org.elasticsearch.xpack.core.rollup.job.GroupConfig; -import org.elasticsearch.xpack.core.rollup.job.IndexerState; import org.elasticsearch.xpack.core.rollup.job.RollupJob; +import org.elasticsearch.xpack.core.indexing.IndexerState; import org.elasticsearch.xpack.core.rollup.job.RollupJobConfig; import org.mockito.stubbing.Answer; @@ -639,7 +639,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws assertThat(indexer.getStats().getNumPages(), equalTo(1L)); // Note: no docs were indexed - assertThat(indexer.getStats().getNumRollups(), equalTo(0L)); + assertThat(indexer.getStats().getOutputDocuments(), equalTo(0L)); assertTrue(indexer.abort()); } finally { executor.shutdownNow(); @@ -743,7 +743,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws assertThat(indexer.getStats().getNumPages(), equalTo(1L)); // Note: no docs were indexed - assertThat(indexer.getStats().getNumRollups(), equalTo(0L)); + assertThat(indexer.getStats().getOutputDocuments(), equalTo(0L)); assertTrue(indexer.abort()); } finally { executor.shutdownNow(); @@ -763,7 +763,7 @@ public void testSearchShardFailure() throws Exception { Function bulkFunction = bulkRequest -> new BulkResponse(new BulkItemResponse[0], 100); Consumer failureConsumer = e -> { - assertThat(e.getMessage(), startsWith("Shard failures encountered while running indexer for rollup job")); + assertThat(e.getMessage(), startsWith("Shard failures encountered while running indexer for job")); isFinished.set(true); }; @@ -786,7 +786,7 @@ public void testSearchShardFailure() throws Exception { // Note: no pages processed, no docs were indexed assertThat(indexer.getStats().getNumPages(), equalTo(0L)); - assertThat(indexer.getStats().getNumRollups(), equalTo(0L)); + assertThat(indexer.getStats().getOutputDocuments(), equalTo(0L)); assertTrue(indexer.abort()); } finally { executor.shutdownNow(); @@ -896,7 +896,7 @@ protected void doNextBulk(BulkRequest request, ActionListener next assertThat(indexer.getStats().getNumPages(), equalTo(1L)); // Note: no docs were indexed - assertThat(indexer.getStats().getNumRollups(), equalTo(0L)); + assertThat(indexer.getStats().getOutputDocuments(), equalTo(0L)); assertTrue(indexer.abort()); } finally { executor.shutdownNow(); diff --git a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupJobTaskTests.java b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupJobTaskTests.java index 13290f09e8eb8..c5f20b06a2314 100644 --- a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupJobTaskTests.java +++ b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupJobTaskTests.java @@ -19,11 +19,11 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.indexing.IndexerState; import org.elasticsearch.xpack.core.rollup.ConfigTestHelpers; import org.elasticsearch.xpack.core.rollup.RollupField; import org.elasticsearch.xpack.core.rollup.action.StartRollupJobAction; import org.elasticsearch.xpack.core.rollup.action.StopRollupJobAction; -import org.elasticsearch.xpack.core.rollup.job.IndexerState; import org.elasticsearch.xpack.core.rollup.job.RollupJob; import org.elasticsearch.xpack.core.rollup.job.RollupJobStatus; import org.elasticsearch.xpack.core.scheduler.SchedulerEngine; diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/rollup/delete_job.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/rollup/delete_job.yml index 298cf27fa2f9d..d157fd61138d6 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/rollup/delete_job.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/rollup/delete_job.yml @@ -66,7 +66,7 @@ setup: stats: pages_processed: 0 documents_processed: 0 - rollups_indexed: 0 + documents_indexed: 0 trigger_count: 0 status: job_state: "stopped" @@ -113,7 +113,7 @@ setup: stats: pages_processed: 0 documents_processed: 0 - rollups_indexed: 0 + documents_indexed: 0 trigger_count: 0 status: job_state: "stopped" @@ -160,7 +160,7 @@ setup: stats: pages_processed: 0 documents_processed: 0 - rollups_indexed: 0 + documents_indexed: 0 trigger_count: 0 status: job_state: "stopped" diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/rollup/get_jobs.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/rollup/get_jobs.yml index f3fa8114ddbd0..ec2c6f7581038 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/rollup/get_jobs.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/rollup/get_jobs.yml @@ -67,7 +67,7 @@ setup: stats: pages_processed: 0 documents_processed: 0 - rollups_indexed: 0 + documents_indexed: 0 trigger_count: 0 status: job_state: "stopped" @@ -178,7 +178,7 @@ setup: stats: pages_processed: 0 documents_processed: 0 - rollups_indexed: 0 + documents_indexed: 0 trigger_count: 0 status: job_state: "stopped" @@ -204,7 +204,7 @@ setup: stats: pages_processed: 0 documents_processed: 0 - rollups_indexed: 0 + documents_indexed: 0 trigger_count: 0 status: job_state: "stopped" diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/rollup/put_job.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/rollup/put_job.yml index 516be25be2a2d..e4d635bbe3995 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/rollup/put_job.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/rollup/put_job.yml @@ -67,7 +67,7 @@ setup: stats: pages_processed: 0 documents_processed: 0 - rollups_indexed: 0 + documents_indexed: 0 trigger_count: 0 status: job_state: "stopped" diff --git a/x-pack/qa/multi-node/src/test/java/org/elasticsearch/multi_node/RollupIT.java b/x-pack/qa/multi-node/src/test/java/org/elasticsearch/multi_node/RollupIT.java index 43ad4dc0a45a2..a26f3cfb2b213 100644 --- a/x-pack/qa/multi-node/src/test/java/org/elasticsearch/multi_node/RollupIT.java +++ b/x-pack/qa/multi-node/src/test/java/org/elasticsearch/multi_node/RollupIT.java @@ -156,7 +156,7 @@ public void testBigRollup() throws Exception { Map job = getJob(getRollupJobResponse, "rollup-job-test"); if (job != null) { assertThat(ObjectPath.eval("status.job_state", job), equalTo("started")); - assertThat(ObjectPath.eval("stats.rollups_indexed", job), equalTo(41)); + assertThat(ObjectPath.eval("stats.documents_indexed", job), equalTo(41)); } }, 30L, TimeUnit.SECONDS);