diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java index ef8a3edc61722..194aaf5c484ff 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java @@ -12,14 +12,12 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; -import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.core.TimeValue; import org.elasticsearch.common.util.concurrent.RunOnce; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.threadpool.ThreadPool; -import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -487,13 +485,12 @@ private void onSearchResponse(SearchResponse searchResponse) { return; } - final List docs = iterationResult.getToIndex(); + final BulkRequest bulkRequest = new BulkRequest(); + iterationResult.getToIndex().forEach(bulkRequest::add); + stats.markEndProcessing(); // an iteration result might return an empty set of documents to be indexed - if (docs.isEmpty() == false) { - final BulkRequest bulkRequest = new BulkRequest(); - docs.forEach(bulkRequest::add); - stats.markEndProcessing(); + if (bulkRequest.numberOfActions() > 0) { stats.markStartIndexing(); doNextBulk(bulkRequest, ActionListener.wrap(bulkResponse -> { // TODO we should check items in the response and move after accordingly to @@ -512,7 +509,6 @@ private void onSearchResponse(SearchResponse searchResponse) { onBulkResponse(bulkResponse, newPosition); }, this::finishWithIndexingFailure)); } else { - stats.markEndProcessing(); // no documents need to be indexed, continue with search try { JobPosition newPosition = iterationResult.getPosition(); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/IterationResult.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/IterationResult.java index f397cb1143a9b..47906e96b672e 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/IterationResult.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/IterationResult.java @@ -9,7 +9,7 @@ import org.elasticsearch.action.index.IndexRequest; -import java.util.List; +import java.util.stream.Stream; /** * Result object to hold the result of 1 iteration of iterative indexing. @@ -19,18 +19,18 @@ public class IterationResult { private final boolean isDone; private final JobPosition position; - private final List toIndex; + private final Stream toIndex; /** * Constructor for the result of 1 iteration. * - * @param toIndex the list of requests to be indexed + * @param toIndex the stream of requests to be indexed * @param position the extracted, persistable position of the job required for the search phase * @param isDone true if source is exhausted and job should go to sleep * * Note: toIndex.empty() != isDone due to possible filtering in the specific implementation */ - public IterationResult(List toIndex, JobPosition position, boolean isDone) { + public IterationResult(Stream toIndex, JobPosition position, boolean isDone) { this.toIndex = toIndex; this.position = position; this.isDone = isDone; @@ -53,11 +53,11 @@ public JobPosition getPosition() { } /** - * List of requests to be passed to bulk indexing. + * Stream of requests to be passed to bulk indexing. * - * @return List of index requests. + * @return Stream of index requests. */ - public List getToIndex() { + public Stream getToIndex() { return toIndex; } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java index 74f1e8d371d69..263870b1ce759 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java @@ -17,8 +17,8 @@ import org.elasticsearch.action.search.SearchResponseSections; import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.core.TimeValue; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; import org.elasticsearch.test.ESTestCase; @@ -36,6 +36,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Stream; import static org.hamcrest.Matchers.equalTo; @@ -74,7 +75,7 @@ protected IterationResult doProcess(SearchResponse searchResponse) { assertFalse("should not be called as stoppedBeforeFinished is false", stoppedBeforeFinished); assertThat(step, equalTo(2)); ++step; - return new IterationResult<>(Collections.emptyList(), 3, true); + return new IterationResult<>(Stream.empty(), 3, true); } private void awaitForLatch() { @@ -191,13 +192,13 @@ protected IterationResult doProcess(SearchResponse searchResponse) { ++processOps; if (processOps == 5) { - return new IterationResult<>(Collections.singletonList(new IndexRequest()), processOps, true); + return new IterationResult<>(Stream.of(new IndexRequest()), processOps, true); } else if (processOps % 2 == 0) { - return new IterationResult<>(Collections.emptyList(), processOps, false); + return new IterationResult<>(Stream.empty(), processOps, false); } - return new IterationResult<>(Collections.singletonList(new IndexRequest()), processOps, false); + return new IterationResult<>(Stream.of(new IndexRequest()), processOps, false); } @Override diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/IndexerUtils.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/IndexerUtils.java index 391ad1315aa50..d96ade3884e2e 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/IndexerUtils.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/IndexerUtils.java @@ -27,7 +27,7 @@ import java.util.List; import java.util.Map; import java.util.TreeMap; -import java.util.stream.Collectors; +import java.util.stream.Stream; /** * These utilities are used to convert agg responses into a set of rollup documents. @@ -46,10 +46,10 @@ class IndexerUtils { * @param groupConfig The grouping configuration for the job * @param jobId The ID for the job * @param isUpgradedDocID `true` if this job is using the new ID scheme - * @return A list of rolled documents derived from the response + * @return A stream of rolled documents derived from the response */ - static List processBuckets(CompositeAggregation agg, String rollupIndex, RollupIndexerJobStats stats, - GroupConfig groupConfig, String jobId, boolean isUpgradedDocID) { + static Stream processBuckets(CompositeAggregation agg, String rollupIndex, RollupIndexerJobStats stats, + GroupConfig groupConfig, String jobId, boolean isUpgradedDocID) { logger.debug("Buckets: [" + agg.getBuckets().size() + "][" + jobId + "]"); return agg.getBuckets().stream().map(b ->{ @@ -79,7 +79,7 @@ static List processBuckets(CompositeAggregation agg, String rollup IndexRequest request = new IndexRequest(rollupIndex, RollupField.TYPE_NAME, idGenerator.getID()); request.source(doc); return request; - }).collect(Collectors.toList()); + }); } private static void processKeys(Map keys, Map doc, diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupIndexer.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupIndexer.java index 52a3d72ae2489..9395c83072854 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupIndexer.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupIndexer.java @@ -52,6 +52,7 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Stream; import static org.elasticsearch.xpack.core.rollup.RollupField.formatFieldName; @@ -142,7 +143,7 @@ protected IterationResult> doProcess(SearchResponse searchRe if (response.getBuckets().isEmpty()) { // do not reset the position as we want to continue from where we stopped - return new IterationResult<>(Collections.emptyList(), getPosition(), true); + return new IterationResult<>(Stream.empty(), getPosition(), true); } return new IterationResult<>( diff --git a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/IndexerUtilsTests.java b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/IndexerUtilsTests.java index 0978822f55a80..f7487626ce6aa 100644 --- a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/IndexerUtilsTests.java +++ b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/IndexerUtilsTests.java @@ -33,8 +33,8 @@ import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder; import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval; import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder; -import org.elasticsearch.search.aggregations.metrics.InternalNumericMetricsAggregation; import org.elasticsearch.search.aggregations.metrics.AvgAggregationBuilder; +import org.elasticsearch.search.aggregations.metrics.InternalNumericMetricsAggregation; import org.elasticsearch.search.aggregations.metrics.MaxAggregationBuilder; import org.elasticsearch.search.aggregations.metrics.SumAggregationBuilder; import org.elasticsearch.xpack.core.rollup.RollupField; @@ -55,6 +55,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import static java.util.Collections.singletonList; import static org.elasticsearch.xpack.core.rollup.ConfigTestHelpers.randomDateHistogramGroupConfig; @@ -113,7 +114,8 @@ public void testMissingFields() throws IOException { directory.close(); final GroupConfig groupConfig = randomGroupConfig(random()); - List docs = IndexerUtils.processBuckets(composite, indexName, stats, groupConfig, "foo", randomBoolean()); + List docs = IndexerUtils.processBuckets(composite, indexName, stats, groupConfig, "foo", randomBoolean()) + .collect(Collectors.toList()); assertThat(docs.size(), equalTo(numDocs)); for (IndexRequest doc : docs) { @@ -174,7 +176,8 @@ public void testCorrectFields() throws IOException { directory.close(); final GroupConfig groupConfig = randomGroupConfig(random()); - List docs = IndexerUtils.processBuckets(composite, indexName, stats, groupConfig, "foo", randomBoolean()); + List docs = IndexerUtils.processBuckets(composite, indexName, stats, groupConfig, "foo", randomBoolean()) + .collect(Collectors.toList()); assertThat(docs.size(), equalTo(numDocs)); for (IndexRequest doc : docs) { @@ -227,7 +230,8 @@ public void testNumericTerms() throws IOException { directory.close(); final GroupConfig groupConfig = randomGroupConfig(random()); - List docs = IndexerUtils.processBuckets(composite, indexName, stats, groupConfig, "foo", randomBoolean()); + List docs = IndexerUtils.processBuckets(composite, indexName, stats, groupConfig, "foo", randomBoolean()) + .collect(Collectors.toList()); assertThat(docs.size(), equalTo(numDocs)); for (IndexRequest doc : docs) { @@ -287,7 +291,8 @@ public void testEmptyCounts() throws IOException { directory.close(); final GroupConfig groupConfig = randomGroupConfig(random()); - List docs = IndexerUtils.processBuckets(composite, indexName, stats, groupConfig, "foo", randomBoolean()); + List docs = IndexerUtils.processBuckets(composite, indexName, stats, groupConfig, "foo", randomBoolean()) + .collect(Collectors.toList()); assertThat(docs.size(), equalTo(numDocs)); for (IndexRequest doc : docs) { @@ -339,7 +344,8 @@ public void testKeyOrderingOldID() { // The content of the config don't actually matter for this test // because the test is just looking at agg keys GroupConfig groupConfig = new GroupConfig(randomDateHistogramGroupConfig(random()), new HistogramGroupConfig(123L, "abc"), null); - List docs = IndexerUtils.processBuckets(composite, "foo", new RollupIndexerJobStats(), groupConfig, "foo", false); + List docs = IndexerUtils.processBuckets(composite, "foo", new RollupIndexerJobStats(), groupConfig, "foo", false) + .collect(Collectors.toList()); assertThat(docs.size(), equalTo(1)); assertThat(docs.get(0).id(), equalTo("1237859798")); } @@ -383,7 +389,8 @@ public void testKeyOrderingNewID() { }); GroupConfig groupConfig = new GroupConfig(randomDateHistogramGroupConfig(random()), new HistogramGroupConfig(1L, "abc"), null); - List docs = IndexerUtils.processBuckets(composite, "foo", new RollupIndexerJobStats(), groupConfig, "foo", true); + List docs = IndexerUtils.processBuckets(composite, "foo", new RollupIndexerJobStats(), groupConfig, "foo", true) + .collect(Collectors.toList()); assertThat(docs.size(), equalTo(1)); assertThat(docs.get(0).id(), equalTo("foo$c9LcrFqeFW92uN_Z7sv1hA")); } @@ -433,7 +440,8 @@ public void testKeyOrderingNewIDLong() { }); GroupConfig groupConfig = new GroupConfig(randomDateHistogramGroupConfig(random()), new HistogramGroupConfig(1, "abc"), null); - List docs = IndexerUtils.processBuckets(composite, "foo", new RollupIndexerJobStats(), groupConfig, "foo", true); + List docs = IndexerUtils.processBuckets(composite, "foo", new RollupIndexerJobStats(), groupConfig, "foo", true) + .collect(Collectors.toList()); assertThat(docs.size(), equalTo(1)); assertThat(docs.get(0).id(), equalTo("foo$VAFKZpyaEqYRPLyic57_qw")); } @@ -461,7 +469,7 @@ public void testNullKeys() { GroupConfig groupConfig = new GroupConfig(randomDateHistogramGroupConfig(random()), randomHistogramGroupConfig(random()), null); List docs = IndexerUtils.processBuckets(composite, "foo", new RollupIndexerJobStats(), - groupConfig, "foo", randomBoolean()); + groupConfig, "foo", randomBoolean()).collect(Collectors.toList()); assertThat(docs.size(), equalTo(1)); assertFalse(Strings.isNullOrEmpty(docs.get(0).id())); } @@ -518,7 +526,8 @@ public void testMissingBuckets() throws IOException { directory.close(); final GroupConfig groupConfig = randomGroupConfig(random()); - List docs = IndexerUtils.processBuckets(composite, indexName, stats, groupConfig, "foo", randomBoolean()); + List docs = IndexerUtils.processBuckets(composite, indexName, stats, groupConfig, "foo", randomBoolean()) + .collect(Collectors.toList()); assertThat(docs.size(), equalTo(6)); for (IndexRequest doc : docs) { @@ -589,7 +598,8 @@ public void testTimezone() throws IOException { directory.close(); final GroupConfig groupConfig = randomGroupConfig(random()); - List docs = IndexerUtils.processBuckets(composite, indexName, stats, groupConfig, "foo", randomBoolean()); + List docs = IndexerUtils.processBuckets(composite, indexName, stats, groupConfig, "foo", randomBoolean()) + .collect(Collectors.toList()); assertThat(docs.size(), equalTo(2)); diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/Function.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/Function.java index b242ad731d043..0ba82d7513ddc 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/Function.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/Function.java @@ -209,6 +209,7 @@ void preview( * @param destinationPipeline the destination pipeline * @param fieldMappings field mappings for the destination * @param stats a stats object to record/collect stats + * @param progress a progress object to record/collect progress information * @return a tuple with the stream of index requests and the cursor */ Tuple, Map> processSearchResponse( @@ -216,6 +217,7 @@ Tuple, Map> processSearchResponse( String destinationIndex, String destinationPipeline, Map fieldMappings, - TransformIndexerStats stats + TransformIndexerStats stats, + TransformProgress progress ); } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java index c050c959f45a3..8379d77d60b05 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java @@ -52,13 +52,11 @@ import java.time.Instant; import java.util.Collection; import java.util.Collections; -import java.util.List; import java.util.Map; import java.util.Objects; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; -import java.util.stream.Collectors; import java.util.stream.Stream; public abstract class TransformIndexer extends AsyncTwoPhaseIndexer { @@ -146,7 +144,7 @@ public TransformIndexer( this.checkpointProvider = ExceptionsHelper.requireNonNull(checkpointProvider, "checkpointProvider"); this.auditor = transformServices.getAuditor(); this.transformConfig = ExceptionsHelper.requireNonNull(transformConfig, "transformConfig"); - this.progress = transformProgress; + this.progress = progress != null ? progress : new TransformProgress(); this.lastCheckpoint = ExceptionsHelper.requireNonNull(lastCheckpoint, "lastCheckpoint"); this.nextCheckpoint = ExceptionsHelper.requireNonNull(nextCheckpoint, "nextCheckpoint"); this.context = ExceptionsHelper.requireNonNull(context, "context"); @@ -297,10 +295,10 @@ protected void onStart(long now, ActionListener listener) { doGetInitialProgress(request, ActionListener.wrap(response -> { function.getInitialProgressFromResponse(response, ActionListener.wrap(newProgress -> { logger.trace("[{}] reset the progress from [{}] to [{}].", getJobId(), progress, newProgress); - progress = newProgress; + progress = newProgress != null ? newProgress : new TransformProgress(); finalListener.onResponse(null); }, failure -> { - progress = null; + progress = new TransformProgress(); logger.warn( new ParameterizedMessage("[{}] unable to load progress information for task.", getJobId()), failure @@ -308,7 +306,7 @@ protected void onStart(long now, ActionListener listener) { finalListener.onResponse(null); })); }, failure -> { - progress = null; + progress = new TransformProgress(); logger.warn(new ParameterizedMessage("[{}] unable to load progress information for task.", getJobId()), failure); finalListener.onResponse(null); })); @@ -516,19 +514,16 @@ private void finalizeCheckpoint(ActionListener listener) { // NOTE: this method is called in the same thread as the processing thread. // Theoretically, there should not be a race condition with updating progress here. // NOTE 2: getPercentComplete should only NOT be null on the first (batch) checkpoint - if (progress != null && progress.getPercentComplete() != null && progress.getPercentComplete() < 100.0) { + if (progress.getPercentComplete() != null && progress.getPercentComplete() < 100.0) { progress.incrementDocsProcessed(progress.getTotalDocs() - progress.getDocumentsProcessed()); } if (lastCheckpoint != null) { long docsIndexed = 0; long docsProcessed = 0; - // This should not happen as we simply create a new one when we reach continuous checkpoints - // but this is a paranoid `null` check - if (progress != null) { - docsIndexed = progress.getDocumentsIndexed(); - docsProcessed = progress.getDocumentsProcessed(); - } + docsIndexed = progress.getDocumentsIndexed(); + docsProcessed = progress.getDocumentsProcessed(); + long durationMs = System.currentTimeMillis() - lastCheckpoint.getTimestamp(); getStats().incrementCheckpointExponentialAverages(durationMs < 0 ? 0 : durationMs, docsIndexed, docsProcessed); } @@ -872,19 +867,18 @@ private void sourceHasChanged(ActionListener hasChangedListener) { } private IterationResult processBuckets(final SearchResponse searchResponse) { - long docsBeforeProcess = getStats().getNumDocuments(); - Tuple, Map> indexRequestStreamAndCursor = function.processSearchResponse( searchResponse, getConfig().getDestination().getIndex(), getConfig().getDestination().getPipeline(), getFieldMappings(), - getStats() + getStats(), + progress ); if (indexRequestStreamAndCursor == null || indexRequestStreamAndCursor.v1() == null) { if (nextCheckpoint.getCheckpoint() == 1 || isContinuous() == false || changeCollector.queryForChanges() == false) { - return new IterationResult<>(Collections.emptyList(), null, true); + return new IterationResult<>(Stream.empty(), null, true); } // cleanup changed Buckets @@ -894,11 +888,7 @@ private IterationResult processBuckets(final SearchRes runState = RunState.IDENTIFY_CHANGES; // advance the cursor for changed bucket detection - return new IterationResult<>( - Collections.emptyList(), - new TransformIndexerPosition(null, nextChangeCollectorBucketPosition), - false - ); + return new IterationResult<>(Stream.empty(), new TransformIndexerPosition(null, nextChangeCollectorBucketPosition), false); } Stream indexRequestStream = indexRequestStreamAndCursor.v1(); @@ -908,28 +898,7 @@ private IterationResult processBuckets(final SearchRes oldPosition != null ? getPosition().getBucketsPosition() : null ); - List indexRequests = indexRequestStream.collect(Collectors.toList()); - if (logger.isDebugEnabled()) { - if (indexRequests.isEmpty()) { - logger.debug("[{}] processed buckets, nothing to be indexed", getJobId()); - } else { - logger.debug( - "[{}] processed buckets and created [{}] documents to be indexed, 1st document: [{}]", - getJobId(), - indexRequests.size(), - indexRequests.get(0) - ); - } - } - IterationResult result = new IterationResult<>(indexRequests, newPosition, indexRequests.isEmpty()); - - // NOTE: progress is also mutated in onFinish - if (progress != null) { - progress.incrementDocsProcessed(getStats().getNumDocuments() - docsBeforeProcess); - progress.incrementDocsIndexed(result.getToIndex().size()); - } - - return result; + return new IterationResult<>(indexRequestStream, newPosition, false); } private IterationResult processChangedBuckets(final SearchResponse searchResponse) { @@ -937,13 +906,13 @@ private IterationResult processChangedBuckets(final Se if (nextChangeCollectorBucketPosition == null) { changeCollector.clear(); - return new IterationResult<>(Collections.emptyList(), null, true); + return new IterationResult<>(Stream.empty(), null, true); } // reset the runState to fetch the partial updates next runState = RunState.APPLY_RESULTS; - return new IterationResult<>(Collections.emptyList(), getPosition(), false); + return new IterationResult<>(Stream.empty(), getPosition(), false); } protected QueryBuilder buildFilterQuery() { diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/common/AbstractCompositeAggFunction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/common/AbstractCompositeAggFunction.java index 64b94b1f7cc8a..a47a3657f2efd 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/common/AbstractCompositeAggFunction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/common/AbstractCompositeAggFunction.java @@ -80,15 +80,17 @@ public void preview( final Aggregations aggregations = r.getAggregations(); if (aggregations == null) { listener.onFailure( - new ElasticsearchStatusException("Source indices have been deleted or closed.", RestStatus.BAD_REQUEST)); + new ElasticsearchStatusException("Source indices have been deleted or closed.", RestStatus.BAD_REQUEST) + ); return; } final CompositeAggregation agg = aggregations.get(COMPOSITE_AGGREGATION_NAME); TransformIndexerStats stats = new TransformIndexerStats(); + TransformProgress progress = new TransformProgress(); - List> docs = extractResults(agg, fieldTypeMap, stats) - .map(this::documentTransformationFunction) - .collect(Collectors.toList()); + List> docs = extractResults(agg, fieldTypeMap, stats, progress).map( + this::documentTransformationFunction + ).collect(Collectors.toList()); listener.onResponse(docs); } catch (AggregationResultUtils.AggregationExtractionException extractionException) { @@ -108,11 +110,10 @@ public void validateQuery(Client client, SourceConfig sourceConfig, ActionListen } if (response.status() != RestStatus.OK) { listener.onFailure( - new ValidationException() - .addValidationError( - new ParameterizedMessage("Unexpected status from response of test query: {}", response.status()) - .getFormattedMessage() - ) + new ValidationException().addValidationError( + new ParameterizedMessage("Unexpected status from response of test query: {}", response.status()) + .getFormattedMessage() + ) ); return; } @@ -123,10 +124,9 @@ public void validateQuery(Client client, SourceConfig sourceConfig, ActionListen ? ((ElasticsearchException) unwrapped).status() : RestStatus.SERVICE_UNAVAILABLE; listener.onFailure( - new ValidationException(unwrapped) - .addValidationError( - new ParameterizedMessage("Failed to test query, received status: {}", status).getFormattedMessage() - ) + new ValidationException(unwrapped).addValidationError( + new ParameterizedMessage("Failed to test query, received status: {}", status).getFormattedMessage() + ) ); })); } @@ -137,7 +137,8 @@ public Tuple, Map> processSearchResponse( String destinationIndex, String destinationPipeline, Map fieldTypeMap, - TransformIndexerStats stats + TransformIndexerStats stats, + TransformProgress progress ) { Aggregations aggregations = searchResponse.getAggregations(); @@ -152,16 +153,15 @@ public Tuple, Map> processSearchResponse( return null; } - Stream indexRequestStream = extractResults(compositeAgg, fieldTypeMap, stats) - .map(doc -> { - String docId = (String)doc.remove(TransformField.DOCUMENT_ID_FIELD); - return DocumentConversionUtils.convertDocumentToIndexRequest( - docId, - documentTransformationFunction(doc), - destinationIndex, - destinationPipeline - ); - }); + Stream indexRequestStream = extractResults(compositeAgg, fieldTypeMap, stats, progress).map(doc -> { + String docId = (String) doc.remove(TransformField.DOCUMENT_ID_FIELD); + return DocumentConversionUtils.convertDocumentToIndexRequest( + docId, + documentTransformationFunction(doc), + destinationIndex, + destinationPipeline + ); + }); return Tuple.tuple(indexRequestStream, compositeAgg.afterKey()); } @@ -171,17 +171,15 @@ public Tuple, Map> processSearchResponse( protected abstract Stream> extractResults( CompositeAggregation agg, Map fieldTypeMap, - TransformIndexerStats transformIndexerStats + TransformIndexerStats transformIndexerStats, + TransformProgress progress ); private SearchRequest buildSearchRequest(SourceConfig sourceConfig, Map position, int pageSize) { - SearchSourceBuilder sourceBuilder = new SearchSourceBuilder() - .query(sourceConfig.getQueryConfig().getQuery()) + SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(sourceConfig.getQueryConfig().getQuery()) .runtimeMappings(sourceConfig.getRuntimeMappings()); buildSearchQuery(sourceBuilder, null, pageSize); - return new SearchRequest(sourceConfig.getIndex()) - .source(sourceBuilder) - .indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN); + return new SearchRequest(sourceConfig.getIndex()).source(sourceBuilder).indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN); } @Override diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/latest/Latest.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/latest/Latest.java index 3d0a82e3a50a1..1930515d6928d 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/latest/Latest.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/latest/Latest.java @@ -23,6 +23,7 @@ import org.elasticsearch.xpack.core.transform.TransformField; import org.elasticsearch.xpack.core.transform.transforms.SourceConfig; import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats; +import org.elasticsearch.xpack.core.transform.transforms.TransformProgress; import org.elasticsearch.xpack.core.transform.transforms.latest.LatestConfig; import org.elasticsearch.xpack.transform.transforms.IDGenerator; import org.elasticsearch.xpack.transform.transforms.common.AbstractCompositeAggFunction; @@ -52,14 +53,13 @@ public Latest(LatestConfig config) { } private static CompositeAggregationBuilder createCompositeAggregation(LatestConfig config) { - List> sources = - config.getUniqueKey().stream() - .map(field -> new TermsValuesSourceBuilder(field).field(field).missingBucket(true)) - .collect(toList()); - TopHitsAggregationBuilder topHitsAgg = - AggregationBuilders.topHits(TOP_HITS_AGGREGATION_NAME) - .size(1) // we are only interested in the top-1 - .sorts(config.getSorts()); // we copy the sort config directly from the function config + List> sources = config.getUniqueKey() + .stream() + .map(field -> new TermsValuesSourceBuilder(field).field(field).missingBucket(true)) + .collect(toList()); + TopHitsAggregationBuilder topHitsAgg = AggregationBuilders.topHits(TOP_HITS_AGGREGATION_NAME) + .size(1) // we are only interested in the top-1 + .sorts(config.getSorts()); // we copy the sort config directly from the function config return AggregationBuilders.composite(COMPOSITE_AGGREGATION_NAME, sources).subAggregation(topHitsAgg); } @@ -73,15 +73,22 @@ public ChangeCollector buildChangeCollector(String synchronizationField) { return new LatestChangeCollector(synchronizationField); } - private static Map convertBucketToDocument(CompositeAggregation.Bucket bucket, - LatestConfig config, - TransformIndexerStats transformIndexerStats) { + private static Map convertBucketToDocument( + CompositeAggregation.Bucket bucket, + LatestConfig config, + TransformIndexerStats transformIndexerStats, + TransformProgress progress + ) { transformIndexerStats.incrementNumDocuments(bucket.getDocCount()); + progress.incrementDocsProcessed(bucket.getDocCount()); + progress.incrementDocsIndexed(1L); TopHits topHits = bucket.getAggregations().get(TOP_HITS_AGGREGATION_NAME); if (topHits.getHits().getHits().length != 1) { throw new ElasticsearchException( - "Unexpected number of hits in the top_hits aggregation result. Wanted: 1, was: {}", topHits.getHits().getHits().length); + "Unexpected number of hits in the top_hits aggregation result. Wanted: 1, was: {}", + topHits.getHits().getHits().length + ); } Map document = topHits.getHits().getHits()[0].getSourceAsMap(); @@ -121,9 +128,10 @@ public SearchSourceBuilder buildSearchQueryForInitialProgress(SearchSourceBuilde protected Stream> extractResults( CompositeAggregation agg, Map fieldTypeMap, - TransformIndexerStats transformIndexerStats + TransformIndexerStats transformIndexerStats, + TransformProgress transformProgress ) { - return agg.getBuckets().stream().map(bucket -> convertBucketToDocument(bucket, config, transformIndexerStats)); + return agg.getBuckets().stream().map(bucket -> convertBucketToDocument(bucket, config, transformIndexerStats, transformProgress)); } @Override diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/AggregationResultUtils.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/AggregationResultUtils.java index f4d6e9e5064b0..7234f9e9f8f59 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/AggregationResultUtils.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/AggregationResultUtils.java @@ -34,6 +34,7 @@ import org.elasticsearch.xpack.core.spatial.search.aggregations.GeoShapeMetricAggregation; import org.elasticsearch.xpack.core.transform.TransformField; import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats; +import org.elasticsearch.xpack.core.transform.transforms.TransformProgress; import org.elasticsearch.xpack.core.transform.transforms.pivot.GeoTileGroupSource; import org.elasticsearch.xpack.core.transform.transforms.pivot.GroupConfig; import org.elasticsearch.xpack.core.transform.transforms.pivot.SingleGroupSource; @@ -98,10 +99,14 @@ public static Stream> extractCompositeAggregationResults( Collection pipelineAggs, Map fieldTypeMap, TransformIndexerStats stats, + TransformProgress progress, boolean datesAsEpoch ) { return agg.getBuckets().stream().map(bucket -> { stats.incrementNumDocuments(bucket.getDocCount()); + progress.incrementDocsProcessed(bucket.getDocCount()); + progress.incrementDocsIndexed(1L); + Map document = new HashMap<>(); // generator to create unique but deterministic document ids, so we // - do not create duplicates if we re-run after failure @@ -450,11 +455,8 @@ static class GeoShapeMetricAggExtractor implements AggValueExtractor { @Override public Object value(Aggregation aggregation, Map fieldTypeMap, String lookupFieldPrefix) { - assert aggregation instanceof GeoShapeMetricAggregation : "Unexpected type [" - + aggregation.getClass().getName() - + "] for aggregation [" - + aggregation.getName() - + "]"; + assert aggregation instanceof GeoShapeMetricAggregation + : "Unexpected type [" + aggregation.getClass().getName() + "] for aggregation [" + aggregation.getName() + "]"; return ((GeoShapeMetricAggregation) aggregation).geoJSONGeometry(); } } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/Pivot.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/Pivot.java index d84bcbe06f68f..1bbf932231ddc 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/Pivot.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/Pivot.java @@ -29,6 +29,7 @@ import org.elasticsearch.xpack.core.transform.transforms.SettingsConfig; import org.elasticsearch.xpack.core.transform.transforms.SourceConfig; import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats; +import org.elasticsearch.xpack.core.transform.transforms.TransformProgress; import org.elasticsearch.xpack.core.transform.transforms.pivot.PivotConfig; import org.elasticsearch.xpack.core.transform.transforms.pivot.SingleGroupSource; import org.elasticsearch.xpack.transform.Transform; @@ -127,7 +128,8 @@ protected Map documentTransformationFunction(Map protected Stream> extractResults( CompositeAggregation agg, Map fieldTypeMap, - TransformIndexerStats transformIndexerStats + TransformIndexerStats transformIndexerStats, + TransformProgress transformProgress ) { // defines how dates are written, if not specified in settings // < 7.11 as epoch millis @@ -144,6 +146,7 @@ protected Stream> extractResults( config.getAggregationConfig().getPipelineAggregatorFactories(), fieldTypeMap, transformIndexerStats, + transformProgress, datesAsEpoch ); } diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureHandlingTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureHandlingTests.java index e6390a3b261f5..14e972c2c6a29 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureHandlingTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureHandlingTests.java @@ -66,6 +66,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.Function; +import java.util.stream.Collectors; import static java.util.Collections.singletonList; import static org.elasticsearch.xpack.core.transform.transforms.DestConfigTests.randomDestConfig; @@ -415,7 +416,7 @@ public void testDoProcessAggNullCheck() { ); IterationResult newPosition = indexer.doProcess(searchResponse); - assertThat(newPosition.getToIndex(), is(empty())); + assertThat(newPosition.getToIndex().collect(Collectors.toList()), is(empty())); assertThat(newPosition.getPosition(), is(nullValue())); assertThat(newPosition.isDone(), is(true)); } diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerStateTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerStateTests.java index 12ac1f0f6a5d7..9b12cd5f456e7 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerStateTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerStateTests.java @@ -63,6 +63,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; +import java.util.stream.Stream; import static org.elasticsearch.xpack.core.transform.transforms.DestConfigTests.randomDestConfig; import static org.elasticsearch.xpack.core.transform.transforms.SourceConfigTests.randomSourceConfig; @@ -232,7 +233,11 @@ protected void doSaveState(IndexerState state, TransformIndexerPosition position protected IterationResult doProcess(SearchResponse searchResponse) { // pretend that we processed 10k documents for each call getStats().incrementNumDocuments(10_000); - return new IterationResult<>(Collections.singletonList(new IndexRequest()), new TransformIndexerPosition(null, null), false); + return new IterationResult<>( + Stream.of(new IndexRequest()), + new TransformIndexerPosition(null, null), + false + ); } public boolean waitingForNextSearch() { diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerTests.java index 3ef07eec169f3..fb2579229f1fd 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerTests.java @@ -60,6 +60,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; +import java.util.stream.Stream; import static org.elasticsearch.xpack.core.transform.transforms.DestConfigTests.randomDestConfig; import static org.elasticsearch.xpack.core.transform.transforms.SourceConfigTests.randomSourceConfig; @@ -247,7 +248,7 @@ protected IterationResult doProcess(SearchResponse sea // pretend that we processed 10k documents for each call getStats().incrementNumDocuments(10_000); return new IterationResult<>( - Collections.singletonList(new IndexRequest()), + Stream.of(new IndexRequest()), new TransformIndexerPosition(null, null), numberOfLoops == 0 ); diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/pivot/AggregationResultUtilsTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/pivot/AggregationResultUtilsTests.java index d4a1ce3dbb7ed..fcbfc31026cc0 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/pivot/AggregationResultUtilsTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/pivot/AggregationResultUtilsTests.java @@ -7,12 +7,12 @@ package org.elasticsearch.xpack.transform.transforms.pivot; -import org.elasticsearch.common.xcontent.ParseField; import org.elasticsearch.common.geo.GeoPoint; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.xcontent.ContextParser; import org.elasticsearch.common.xcontent.DeprecationHandler; import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.ParseField; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentParser; @@ -66,6 +66,7 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.core.transform.TransformField; import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats; +import org.elasticsearch.xpack.core.transform.transforms.TransformProgress; import org.elasticsearch.xpack.core.transform.transforms.pivot.GroupConfig; import org.elasticsearch.xpack.transform.transforms.pivot.AggregationResultUtils.BucketKeyExtractor; @@ -647,6 +648,7 @@ public void testExtractCompositeAggregationResultsDocIDs() throws IOException { ) ); TransformIndexerStats stats = new TransformIndexerStats(); + TransformProgress progress = new TransformProgress(); Map fieldTypeMap = asStringMap(aggName, "double", targetField, "keyword", targetField2, "keyword"); @@ -656,7 +658,8 @@ public void testExtractCompositeAggregationResultsDocIDs() throws IOException { Collections.emptyList(), inputFirstRun, fieldTypeMap, - stats + stats, + progress ); List> resultSecondRun = runExtraction( groupBy, @@ -664,7 +667,8 @@ public void testExtractCompositeAggregationResultsDocIDs() throws IOException { Collections.emptyList(), inputSecondRun, fieldTypeMap, - stats + stats, + progress ); assertNotEquals(resultFirstRun, resultSecondRun); @@ -1074,6 +1078,7 @@ private void executeTest( long expectedDocCounts ) throws IOException { TransformIndexerStats stats = new TransformIndexerStats(); + TransformProgress progress = new TransformProgress(); XContentBuilder builder = XContentFactory.contentBuilder(randomFrom(XContentType.values())); builder.map(input); @@ -1083,7 +1088,8 @@ private void executeTest( pipelineAggregationBuilders, input, fieldTypeMap, - stats + stats, + progress ); // remove the document ids and test uniqueness @@ -1102,7 +1108,8 @@ private List> runExtraction( Collection pipelineAggregationBuilders, Map input, Map fieldTypeMap, - TransformIndexerStats stats + TransformIndexerStats stats, + TransformProgress progress ) throws IOException { XContentBuilder builder = XContentFactory.contentBuilder(randomFrom(XContentType.values())); @@ -1117,6 +1124,7 @@ private List> runExtraction( pipelineAggregationBuilders, fieldTypeMap, stats, + progress, true ).collect(Collectors.toList()); }