From 89254bece2d4b0d2be03f545207c956b2efcca2b Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Mon, 19 Jul 2021 16:10:42 +0200 Subject: [PATCH 1/4] remove an unnecessary indirection and refactor progress tracking --- .../core/indexing/AsyncTwoPhaseIndexer.java | 14 +++---- .../xpack/core/indexing/IterationResult.java | 14 +++---- .../indexing/AsyncTwoPhaseIndexerTests.java | 11 +++--- .../xpack/rollup/job/IndexerUtils.java | 8 ++-- .../xpack/rollup/job/RollupIndexer.java | 3 +- .../xpack/rollup/job/IndexerUtilsTests.java | 22 ++++++----- .../xpack/transform/transforms/Function.java | 4 +- .../transforms/TransformIndexer.java | 38 ++++--------------- .../common/AbstractCompositeAggFunction.java | 10 +++-- .../transform/transforms/latest/Latest.java | 13 +++++-- .../pivot/AggregationResultUtils.java | 7 ++++ .../transform/transforms/pivot/Pivot.java | 5 ++- .../TransformIndexerFailureHandlingTests.java | 3 +- .../TransformIndexerStateTests.java | 6 ++- .../transforms/TransformIndexerTests.java | 2 +- .../pivot/AggregationResultUtilsTests.java | 1 + 16 files changed, 83 insertions(+), 78 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java index ef8a3edc61722..194aaf5c484ff 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java @@ -12,14 +12,12 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; -import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.core.TimeValue; import org.elasticsearch.common.util.concurrent.RunOnce; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.threadpool.ThreadPool; -import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -487,13 +485,12 @@ private void onSearchResponse(SearchResponse searchResponse) { return; } - final List docs = iterationResult.getToIndex(); + final BulkRequest bulkRequest = new BulkRequest(); + iterationResult.getToIndex().forEach(bulkRequest::add); + stats.markEndProcessing(); // an iteration result might return an empty set of documents to be indexed - if (docs.isEmpty() == false) { - final BulkRequest bulkRequest = new BulkRequest(); - docs.forEach(bulkRequest::add); - stats.markEndProcessing(); + if (bulkRequest.numberOfActions() > 0) { stats.markStartIndexing(); doNextBulk(bulkRequest, ActionListener.wrap(bulkResponse -> { // TODO we should check items in the response and move after accordingly to @@ -512,7 +509,6 @@ private void onSearchResponse(SearchResponse searchResponse) { onBulkResponse(bulkResponse, newPosition); }, this::finishWithIndexingFailure)); } else { - stats.markEndProcessing(); // no documents need to be indexed, continue with search try { JobPosition newPosition = iterationResult.getPosition(); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/IterationResult.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/IterationResult.java index f397cb1143a9b..47906e96b672e 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/IterationResult.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/IterationResult.java @@ -9,7 +9,7 @@ import org.elasticsearch.action.index.IndexRequest; -import java.util.List; +import java.util.stream.Stream; /** * Result object to hold the result of 1 iteration of iterative indexing. @@ -19,18 +19,18 @@ public class IterationResult { private final boolean isDone; private final JobPosition position; - private final List toIndex; + private final Stream toIndex; /** * Constructor for the result of 1 iteration. * - * @param toIndex the list of requests to be indexed + * @param toIndex the stream of requests to be indexed * @param position the extracted, persistable position of the job required for the search phase * @param isDone true if source is exhausted and job should go to sleep * * Note: toIndex.empty() != isDone due to possible filtering in the specific implementation */ - public IterationResult(List toIndex, JobPosition position, boolean isDone) { + public IterationResult(Stream toIndex, JobPosition position, boolean isDone) { this.toIndex = toIndex; this.position = position; this.isDone = isDone; @@ -53,11 +53,11 @@ public JobPosition getPosition() { } /** - * List of requests to be passed to bulk indexing. + * Stream of requests to be passed to bulk indexing. * - * @return List of index requests. + * @return Stream of index requests. */ - public List getToIndex() { + public Stream getToIndex() { return toIndex; } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java index 74f1e8d371d69..64a71c61baefe 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java @@ -17,8 +17,8 @@ import org.elasticsearch.action.search.SearchResponseSections; import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.core.TimeValue; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; import org.elasticsearch.test.ESTestCase; @@ -36,6 +36,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Stream; import static org.hamcrest.Matchers.equalTo; @@ -74,7 +75,7 @@ protected IterationResult doProcess(SearchResponse searchResponse) { assertFalse("should not be called as stoppedBeforeFinished is false", stoppedBeforeFinished); assertThat(step, equalTo(2)); ++step; - return new IterationResult<>(Collections.emptyList(), 3, true); + return new IterationResult<>(Stream.empty(), 3, true); } private void awaitForLatch() { @@ -191,13 +192,13 @@ protected IterationResult doProcess(SearchResponse searchResponse) { ++processOps; if (processOps == 5) { - return new IterationResult<>(Collections.singletonList(new IndexRequest()), processOps, true); + return new IterationResult<>(Collections.singletonList(new IndexRequest()).stream(), processOps, true); } else if (processOps % 2 == 0) { - return new IterationResult<>(Collections.emptyList(), processOps, false); + return new IterationResult<>(Stream.empty(), processOps, false); } - return new IterationResult<>(Collections.singletonList(new IndexRequest()), processOps, false); + return new IterationResult<>(Collections.singletonList(new IndexRequest()).stream(), processOps, false); } @Override diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/IndexerUtils.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/IndexerUtils.java index 44d7af32b1e33..2aa0dbe1e75e1 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/IndexerUtils.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/IndexerUtils.java @@ -27,7 +27,7 @@ import java.util.List; import java.util.Map; import java.util.TreeMap; -import java.util.stream.Collectors; +import java.util.stream.Stream; /** * These utilities are used to convert agg responses into a set of rollup documents. @@ -45,9 +45,9 @@ class IndexerUtils { * @param stats The stats accumulator for this job's task * @param groupConfig The grouping configuration for the job * @param jobId The ID for the job - * @return A list of rolled documents derived from the response + * @return A stream of rolled documents derived from the response */ - static List processBuckets(CompositeAggregation agg, String rollupIndex, RollupIndexerJobStats stats, + static Stream processBuckets(CompositeAggregation agg, String rollupIndex, RollupIndexerJobStats stats, GroupConfig groupConfig, String jobId) { logger.debug("Buckets: [" + agg.getBuckets().size() + "][" + jobId + "]"); @@ -72,7 +72,7 @@ static List processBuckets(CompositeAggregation agg, String rollup IndexRequest request = new IndexRequest(rollupIndex).id(idGenerator.getID()); request.source(doc); return request; - }).collect(Collectors.toList()); + }); } private static void processKeys(Map keys, Map doc, diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupIndexer.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupIndexer.java index 39ee3a0faa52e..8c2db00ddc787 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupIndexer.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupIndexer.java @@ -51,6 +51,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Stream; import static org.elasticsearch.xpack.core.rollup.RollupField.formatFieldName; @@ -130,7 +131,7 @@ protected IterationResult> doProcess(SearchResponse searchRe if (response.getBuckets().isEmpty()) { // do not reset the position as we want to continue from where we stopped - return new IterationResult<>(Collections.emptyList(), getPosition(), true); + return new IterationResult<>(Stream.empty(), getPosition(), true); } return new IterationResult<>( diff --git a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/IndexerUtilsTests.java b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/IndexerUtilsTests.java index 106fd1ce62872..7485be399c400 100644 --- a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/IndexerUtilsTests.java +++ b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/IndexerUtilsTests.java @@ -55,6 +55,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import static java.util.Collections.singletonList; import static org.elasticsearch.xpack.core.rollup.ConfigTestHelpers.randomDateHistogramGroupConfig; @@ -113,7 +114,7 @@ public void testMissingFields() throws IOException { directory.close(); final GroupConfig groupConfig = randomGroupConfig(random()); - List docs = IndexerUtils.processBuckets(composite, indexName, stats, groupConfig, "foo"); + List docs = IndexerUtils.processBuckets(composite, indexName, stats, groupConfig, "foo").collect(Collectors.toList()); assertThat(docs.size(), equalTo(numDocs)); for (IndexRequest doc : docs) { @@ -174,7 +175,7 @@ public void testCorrectFields() throws IOException { directory.close(); final GroupConfig groupConfig = randomGroupConfig(random()); - List docs = IndexerUtils.processBuckets(composite, indexName, stats, groupConfig, "foo"); + List docs = IndexerUtils.processBuckets(composite, indexName, stats, groupConfig, "foo").collect(Collectors.toList()); assertThat(docs.size(), equalTo(numDocs)); for (IndexRequest doc : docs) { @@ -227,7 +228,7 @@ public void testNumericTerms() throws IOException { directory.close(); final GroupConfig groupConfig = randomGroupConfig(random()); - List docs = IndexerUtils.processBuckets(composite, indexName, stats, groupConfig, "foo"); + List docs = IndexerUtils.processBuckets(composite, indexName, stats, groupConfig, "foo").collect(Collectors.toList()); assertThat(docs.size(), equalTo(numDocs)); for (IndexRequest doc : docs) { @@ -287,7 +288,7 @@ public void testEmptyCounts() throws IOException { directory.close(); final GroupConfig groupConfig = randomGroupConfig(random()); - List docs = IndexerUtils.processBuckets(composite, indexName, stats, groupConfig, "foo"); + List docs = IndexerUtils.processBuckets(composite, indexName, stats, groupConfig, "foo").collect(Collectors.toList()); assertThat(docs.size(), equalTo(numDocs)); for (IndexRequest doc : docs) { @@ -337,7 +338,8 @@ public void testKeyOrdering() { }); GroupConfig groupConfig = new GroupConfig(randomDateHistogramGroupConfig(random()), new HistogramGroupConfig(1L, "abc"), null); - List docs = IndexerUtils.processBuckets(composite, "foo", new RollupIndexerJobStats(), groupConfig, "foo"); + List docs = IndexerUtils.processBuckets(composite, "foo", new RollupIndexerJobStats(), groupConfig, "foo") + .collect(Collectors.toList()); assertThat(docs.size(), equalTo(1)); assertThat(docs.get(0).id(), equalTo("foo$c9LcrFqeFW92uN_Z7sv1hA")); } @@ -387,7 +389,8 @@ public void testKeyOrderingLong() { }); GroupConfig groupConfig = new GroupConfig(randomDateHistogramGroupConfig(random()), new HistogramGroupConfig(1, "abc"), null); - List docs = IndexerUtils.processBuckets(composite, "foo", new RollupIndexerJobStats(), groupConfig, "foo"); + List docs = IndexerUtils.processBuckets(composite, "foo", new RollupIndexerJobStats(), groupConfig, "foo") + .collect(Collectors.toList()); assertThat(docs.size(), equalTo(1)); assertThat(docs.get(0).id(), equalTo("foo$VAFKZpyaEqYRPLyic57_qw")); } @@ -414,7 +417,8 @@ public void testNullKeys() { }); GroupConfig groupConfig = new GroupConfig(randomDateHistogramGroupConfig(random()), randomHistogramGroupConfig(random()), null); - List docs = IndexerUtils.processBuckets(composite, "foo", new RollupIndexerJobStats(), groupConfig, "foo"); + List docs = IndexerUtils.processBuckets(composite, "foo", new RollupIndexerJobStats(), groupConfig, "foo") + .collect(Collectors.toList()); assertThat(docs.size(), equalTo(1)); assertFalse(Strings.isNullOrEmpty(docs.get(0).id())); } @@ -471,7 +475,7 @@ public void testMissingBuckets() throws IOException { directory.close(); final GroupConfig groupConfig = randomGroupConfig(random()); - List docs = IndexerUtils.processBuckets(composite, indexName, stats, groupConfig, "foo"); + List docs = IndexerUtils.processBuckets(composite, indexName, stats, groupConfig, "foo").collect(Collectors.toList()); assertThat(docs.size(), equalTo(6)); for (IndexRequest doc : docs) { @@ -542,7 +546,7 @@ public void testTimezone() throws IOException { directory.close(); final GroupConfig groupConfig = randomGroupConfig(random()); - List docs = IndexerUtils.processBuckets(composite, indexName, stats, groupConfig, "foo"); + List docs = IndexerUtils.processBuckets(composite, indexName, stats, groupConfig, "foo").collect(Collectors.toList()); assertThat(docs.size(), equalTo(2)); diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/Function.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/Function.java index b242ad731d043..0ba82d7513ddc 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/Function.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/Function.java @@ -209,6 +209,7 @@ void preview( * @param destinationPipeline the destination pipeline * @param fieldMappings field mappings for the destination * @param stats a stats object to record/collect stats + * @param progress a progress object to record/collect progress information * @return a tuple with the stream of index requests and the cursor */ Tuple, Map> processSearchResponse( @@ -216,6 +217,7 @@ Tuple, Map> processSearchResponse( String destinationIndex, String destinationPipeline, Map fieldMappings, - TransformIndexerStats stats + TransformIndexerStats stats, + TransformProgress progress ); } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java index c050c959f45a3..6ba3a20fdd413 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java @@ -52,13 +52,11 @@ import java.time.Instant; import java.util.Collection; import java.util.Collections; -import java.util.List; import java.util.Map; import java.util.Objects; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; -import java.util.stream.Collectors; import java.util.stream.Stream; public abstract class TransformIndexer extends AsyncTwoPhaseIndexer { @@ -872,19 +870,18 @@ private void sourceHasChanged(ActionListener hasChangedListener) { } private IterationResult processBuckets(final SearchResponse searchResponse) { - long docsBeforeProcess = getStats().getNumDocuments(); - Tuple, Map> indexRequestStreamAndCursor = function.processSearchResponse( searchResponse, getConfig().getDestination().getIndex(), getConfig().getDestination().getPipeline(), getFieldMappings(), - getStats() + getStats(), + progress ); if (indexRequestStreamAndCursor == null || indexRequestStreamAndCursor.v1() == null) { if (nextCheckpoint.getCheckpoint() == 1 || isContinuous() == false || changeCollector.queryForChanges() == false) { - return new IterationResult<>(Collections.emptyList(), null, true); + return new IterationResult<>(Stream.empty(), null, true); } // cleanup changed Buckets @@ -895,7 +892,7 @@ private IterationResult processBuckets(final SearchRes // advance the cursor for changed bucket detection return new IterationResult<>( - Collections.emptyList(), + Stream.empty(), new TransformIndexerPosition(null, nextChangeCollectorBucketPosition), false ); @@ -908,28 +905,7 @@ private IterationResult processBuckets(final SearchRes oldPosition != null ? getPosition().getBucketsPosition() : null ); - List indexRequests = indexRequestStream.collect(Collectors.toList()); - if (logger.isDebugEnabled()) { - if (indexRequests.isEmpty()) { - logger.debug("[{}] processed buckets, nothing to be indexed", getJobId()); - } else { - logger.debug( - "[{}] processed buckets and created [{}] documents to be indexed, 1st document: [{}]", - getJobId(), - indexRequests.size(), - indexRequests.get(0) - ); - } - } - IterationResult result = new IterationResult<>(indexRequests, newPosition, indexRequests.isEmpty()); - - // NOTE: progress is also mutated in onFinish - if (progress != null) { - progress.incrementDocsProcessed(getStats().getNumDocuments() - docsBeforeProcess); - progress.incrementDocsIndexed(result.getToIndex().size()); - } - - return result; + return new IterationResult<>(indexRequestStream, newPosition, false); } private IterationResult processChangedBuckets(final SearchResponse searchResponse) { @@ -937,13 +913,13 @@ private IterationResult processChangedBuckets(final Se if (nextChangeCollectorBucketPosition == null) { changeCollector.clear(); - return new IterationResult<>(Collections.emptyList(), null, true); + return new IterationResult<>(Stream.empty(), null, true); } // reset the runState to fetch the partial updates next runState = RunState.APPLY_RESULTS; - return new IterationResult<>(Collections.emptyList(), getPosition(), false); + return new IterationResult<>(Stream.empty(), getPosition(), false); } protected QueryBuilder buildFilterQuery() { diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/common/AbstractCompositeAggFunction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/common/AbstractCompositeAggFunction.java index 64b94b1f7cc8a..1965a551c7ef3 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/common/AbstractCompositeAggFunction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/common/AbstractCompositeAggFunction.java @@ -86,7 +86,7 @@ public void preview( final CompositeAggregation agg = aggregations.get(COMPOSITE_AGGREGATION_NAME); TransformIndexerStats stats = new TransformIndexerStats(); - List> docs = extractResults(agg, fieldTypeMap, stats) + List> docs = extractResults(agg, fieldTypeMap, stats, null) .map(this::documentTransformationFunction) .collect(Collectors.toList()); @@ -137,7 +137,8 @@ public Tuple, Map> processSearchResponse( String destinationIndex, String destinationPipeline, Map fieldTypeMap, - TransformIndexerStats stats + TransformIndexerStats stats, + TransformProgress progress ) { Aggregations aggregations = searchResponse.getAggregations(); @@ -152,7 +153,7 @@ public Tuple, Map> processSearchResponse( return null; } - Stream indexRequestStream = extractResults(compositeAgg, fieldTypeMap, stats) + Stream indexRequestStream = extractResults(compositeAgg, fieldTypeMap, stats, progress) .map(doc -> { String docId = (String)doc.remove(TransformField.DOCUMENT_ID_FIELD); return DocumentConversionUtils.convertDocumentToIndexRequest( @@ -171,7 +172,8 @@ public Tuple, Map> processSearchResponse( protected abstract Stream> extractResults( CompositeAggregation agg, Map fieldTypeMap, - TransformIndexerStats transformIndexerStats + TransformIndexerStats transformIndexerStats, + TransformProgress progress ); private SearchRequest buildSearchRequest(SourceConfig sourceConfig, Map position, int pageSize) { diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/latest/Latest.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/latest/Latest.java index 3d0a82e3a50a1..fe949459e5960 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/latest/Latest.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/latest/Latest.java @@ -23,6 +23,7 @@ import org.elasticsearch.xpack.core.transform.TransformField; import org.elasticsearch.xpack.core.transform.transforms.SourceConfig; import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats; +import org.elasticsearch.xpack.core.transform.transforms.TransformProgress; import org.elasticsearch.xpack.core.transform.transforms.latest.LatestConfig; import org.elasticsearch.xpack.transform.transforms.IDGenerator; import org.elasticsearch.xpack.transform.transforms.common.AbstractCompositeAggFunction; @@ -75,8 +76,13 @@ public ChangeCollector buildChangeCollector(String synchronizationField) { private static Map convertBucketToDocument(CompositeAggregation.Bucket bucket, LatestConfig config, - TransformIndexerStats transformIndexerStats) { + TransformIndexerStats transformIndexerStats, + TransformProgress progress) { transformIndexerStats.incrementNumDocuments(bucket.getDocCount()); + if (progress != null) { + progress.incrementDocsProcessed(bucket.getDocCount()); + progress.incrementDocsIndexed(1L); + } TopHits topHits = bucket.getAggregations().get(TOP_HITS_AGGREGATION_NAME); if (topHits.getHits().getHits().length != 1) { @@ -121,9 +127,10 @@ public SearchSourceBuilder buildSearchQueryForInitialProgress(SearchSourceBuilde protected Stream> extractResults( CompositeAggregation agg, Map fieldTypeMap, - TransformIndexerStats transformIndexerStats + TransformIndexerStats transformIndexerStats, + TransformProgress transformProgress ) { - return agg.getBuckets().stream().map(bucket -> convertBucketToDocument(bucket, config, transformIndexerStats)); + return agg.getBuckets().stream().map(bucket -> convertBucketToDocument(bucket, config, transformIndexerStats, transformProgress)); } @Override diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/AggregationResultUtils.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/AggregationResultUtils.java index f4d6e9e5064b0..4a354abe96f08 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/AggregationResultUtils.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/AggregationResultUtils.java @@ -34,6 +34,7 @@ import org.elasticsearch.xpack.core.spatial.search.aggregations.GeoShapeMetricAggregation; import org.elasticsearch.xpack.core.transform.TransformField; import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats; +import org.elasticsearch.xpack.core.transform.transforms.TransformProgress; import org.elasticsearch.xpack.core.transform.transforms.pivot.GeoTileGroupSource; import org.elasticsearch.xpack.core.transform.transforms.pivot.GroupConfig; import org.elasticsearch.xpack.core.transform.transforms.pivot.SingleGroupSource; @@ -98,10 +99,16 @@ public static Stream> extractCompositeAggregationResults( Collection pipelineAggs, Map fieldTypeMap, TransformIndexerStats stats, + TransformProgress progress, boolean datesAsEpoch ) { return agg.getBuckets().stream().map(bucket -> { stats.incrementNumDocuments(bucket.getDocCount()); + if (progress != null) { + progress.incrementDocsProcessed(bucket.getDocCount()); + progress.incrementDocsIndexed(1L); + } + Map document = new HashMap<>(); // generator to create unique but deterministic document ids, so we // - do not create duplicates if we re-run after failure diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/Pivot.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/Pivot.java index d84bcbe06f68f..1bbf932231ddc 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/Pivot.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/Pivot.java @@ -29,6 +29,7 @@ import org.elasticsearch.xpack.core.transform.transforms.SettingsConfig; import org.elasticsearch.xpack.core.transform.transforms.SourceConfig; import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats; +import org.elasticsearch.xpack.core.transform.transforms.TransformProgress; import org.elasticsearch.xpack.core.transform.transforms.pivot.PivotConfig; import org.elasticsearch.xpack.core.transform.transforms.pivot.SingleGroupSource; import org.elasticsearch.xpack.transform.Transform; @@ -127,7 +128,8 @@ protected Map documentTransformationFunction(Map protected Stream> extractResults( CompositeAggregation agg, Map fieldTypeMap, - TransformIndexerStats transformIndexerStats + TransformIndexerStats transformIndexerStats, + TransformProgress transformProgress ) { // defines how dates are written, if not specified in settings // < 7.11 as epoch millis @@ -144,6 +146,7 @@ protected Stream> extractResults( config.getAggregationConfig().getPipelineAggregatorFactories(), fieldTypeMap, transformIndexerStats, + transformProgress, datesAsEpoch ); } diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureHandlingTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureHandlingTests.java index baee4cae51f0c..5efe488f2156e 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureHandlingTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureHandlingTests.java @@ -66,6 +66,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.Function; +import java.util.stream.Collectors; import static java.util.Collections.singletonList; import static org.elasticsearch.xpack.core.transform.transforms.DestConfigTests.randomDestConfig; @@ -415,7 +416,7 @@ public void testDoProcessAggNullCheck() { ); IterationResult newPosition = indexer.doProcess(searchResponse); - assertThat(newPosition.getToIndex(), is(empty())); + assertThat(newPosition.getToIndex().collect(Collectors.toList()), is(empty())); assertThat(newPosition.getPosition(), is(nullValue())); assertThat(newPosition.isDone(), is(true)); } diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerStateTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerStateTests.java index 12ac1f0f6a5d7..c8ff66a808184 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerStateTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerStateTests.java @@ -232,7 +232,11 @@ protected void doSaveState(IndexerState state, TransformIndexerPosition position protected IterationResult doProcess(SearchResponse searchResponse) { // pretend that we processed 10k documents for each call getStats().incrementNumDocuments(10_000); - return new IterationResult<>(Collections.singletonList(new IndexRequest()), new TransformIndexerPosition(null, null), false); + return new IterationResult<>( + Collections.singletonList(new IndexRequest()).stream(), + new TransformIndexerPosition(null, null), + false + ); } public boolean waitingForNextSearch() { diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerTests.java index 3ef07eec169f3..9282ca781ea75 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerTests.java @@ -247,7 +247,7 @@ protected IterationResult doProcess(SearchResponse sea // pretend that we processed 10k documents for each call getStats().incrementNumDocuments(10_000); return new IterationResult<>( - Collections.singletonList(new IndexRequest()), + Collections.singletonList(new IndexRequest()).stream(), new TransformIndexerPosition(null, null), numberOfLoops == 0 ); diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/pivot/AggregationResultUtilsTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/pivot/AggregationResultUtilsTests.java index f1aa2d59700f4..ccb2a0cc7ee1e 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/pivot/AggregationResultUtilsTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/pivot/AggregationResultUtilsTests.java @@ -1100,6 +1100,7 @@ private List> runExtraction( pipelineAggregationBuilders, fieldTypeMap, stats, + null, true ).collect(Collectors.toList()); } From a039c1bf3d4d51a712d64dd51c4ba2ae2abe1ab5 Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Mon, 19 Jul 2021 16:10:56 +0200 Subject: [PATCH 2/4] apply spotless --- .../transforms/TransformIndexer.java | 6 +-- .../common/AbstractCompositeAggFunction.java | 51 +++++++++---------- .../transform/transforms/latest/Latest.java | 29 ++++++----- .../pivot/AggregationResultUtils.java | 7 +-- .../pivot/AggregationResultUtilsTests.java | 2 +- 5 files changed, 43 insertions(+), 52 deletions(-) diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java index 6ba3a20fdd413..8073b74bbee3d 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java @@ -891,11 +891,7 @@ private IterationResult processBuckets(final SearchRes runState = RunState.IDENTIFY_CHANGES; // advance the cursor for changed bucket detection - return new IterationResult<>( - Stream.empty(), - new TransformIndexerPosition(null, nextChangeCollectorBucketPosition), - false - ); + return new IterationResult<>(Stream.empty(), new TransformIndexerPosition(null, nextChangeCollectorBucketPosition), false); } Stream indexRequestStream = indexRequestStreamAndCursor.v1(); diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/common/AbstractCompositeAggFunction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/common/AbstractCompositeAggFunction.java index 1965a551c7ef3..f80b9d203b9de 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/common/AbstractCompositeAggFunction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/common/AbstractCompositeAggFunction.java @@ -80,15 +80,16 @@ public void preview( final Aggregations aggregations = r.getAggregations(); if (aggregations == null) { listener.onFailure( - new ElasticsearchStatusException("Source indices have been deleted or closed.", RestStatus.BAD_REQUEST)); + new ElasticsearchStatusException("Source indices have been deleted or closed.", RestStatus.BAD_REQUEST) + ); return; } final CompositeAggregation agg = aggregations.get(COMPOSITE_AGGREGATION_NAME); TransformIndexerStats stats = new TransformIndexerStats(); - List> docs = extractResults(agg, fieldTypeMap, stats, null) - .map(this::documentTransformationFunction) - .collect(Collectors.toList()); + List> docs = extractResults(agg, fieldTypeMap, stats, null).map( + this::documentTransformationFunction + ).collect(Collectors.toList()); listener.onResponse(docs); } catch (AggregationResultUtils.AggregationExtractionException extractionException) { @@ -108,11 +109,10 @@ public void validateQuery(Client client, SourceConfig sourceConfig, ActionListen } if (response.status() != RestStatus.OK) { listener.onFailure( - new ValidationException() - .addValidationError( - new ParameterizedMessage("Unexpected status from response of test query: {}", response.status()) - .getFormattedMessage() - ) + new ValidationException().addValidationError( + new ParameterizedMessage("Unexpected status from response of test query: {}", response.status()) + .getFormattedMessage() + ) ); return; } @@ -123,10 +123,9 @@ public void validateQuery(Client client, SourceConfig sourceConfig, ActionListen ? ((ElasticsearchException) unwrapped).status() : RestStatus.SERVICE_UNAVAILABLE; listener.onFailure( - new ValidationException(unwrapped) - .addValidationError( - new ParameterizedMessage("Failed to test query, received status: {}", status).getFormattedMessage() - ) + new ValidationException(unwrapped).addValidationError( + new ParameterizedMessage("Failed to test query, received status: {}", status).getFormattedMessage() + ) ); })); } @@ -153,16 +152,15 @@ public Tuple, Map> processSearchResponse( return null; } - Stream indexRequestStream = extractResults(compositeAgg, fieldTypeMap, stats, progress) - .map(doc -> { - String docId = (String)doc.remove(TransformField.DOCUMENT_ID_FIELD); - return DocumentConversionUtils.convertDocumentToIndexRequest( - docId, - documentTransformationFunction(doc), - destinationIndex, - destinationPipeline - ); - }); + Stream indexRequestStream = extractResults(compositeAgg, fieldTypeMap, stats, progress).map(doc -> { + String docId = (String) doc.remove(TransformField.DOCUMENT_ID_FIELD); + return DocumentConversionUtils.convertDocumentToIndexRequest( + docId, + documentTransformationFunction(doc), + destinationIndex, + destinationPipeline + ); + }); return Tuple.tuple(indexRequestStream, compositeAgg.afterKey()); } @@ -177,13 +175,10 @@ protected abstract Stream> extractResults( ); private SearchRequest buildSearchRequest(SourceConfig sourceConfig, Map position, int pageSize) { - SearchSourceBuilder sourceBuilder = new SearchSourceBuilder() - .query(sourceConfig.getQueryConfig().getQuery()) + SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(sourceConfig.getQueryConfig().getQuery()) .runtimeMappings(sourceConfig.getRuntimeMappings()); buildSearchQuery(sourceBuilder, null, pageSize); - return new SearchRequest(sourceConfig.getIndex()) - .source(sourceBuilder) - .indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN); + return new SearchRequest(sourceConfig.getIndex()).source(sourceBuilder).indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN); } @Override diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/latest/Latest.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/latest/Latest.java index fe949459e5960..c866fd05f6331 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/latest/Latest.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/latest/Latest.java @@ -53,14 +53,13 @@ public Latest(LatestConfig config) { } private static CompositeAggregationBuilder createCompositeAggregation(LatestConfig config) { - List> sources = - config.getUniqueKey().stream() - .map(field -> new TermsValuesSourceBuilder(field).field(field).missingBucket(true)) - .collect(toList()); - TopHitsAggregationBuilder topHitsAgg = - AggregationBuilders.topHits(TOP_HITS_AGGREGATION_NAME) - .size(1) // we are only interested in the top-1 - .sorts(config.getSorts()); // we copy the sort config directly from the function config + List> sources = config.getUniqueKey() + .stream() + .map(field -> new TermsValuesSourceBuilder(field).field(field).missingBucket(true)) + .collect(toList()); + TopHitsAggregationBuilder topHitsAgg = AggregationBuilders.topHits(TOP_HITS_AGGREGATION_NAME) + .size(1) // we are only interested in the top-1 + .sorts(config.getSorts()); // we copy the sort config directly from the function config return AggregationBuilders.composite(COMPOSITE_AGGREGATION_NAME, sources).subAggregation(topHitsAgg); } @@ -74,10 +73,12 @@ public ChangeCollector buildChangeCollector(String synchronizationField) { return new LatestChangeCollector(synchronizationField); } - private static Map convertBucketToDocument(CompositeAggregation.Bucket bucket, - LatestConfig config, - TransformIndexerStats transformIndexerStats, - TransformProgress progress) { + private static Map convertBucketToDocument( + CompositeAggregation.Bucket bucket, + LatestConfig config, + TransformIndexerStats transformIndexerStats, + TransformProgress progress + ) { transformIndexerStats.incrementNumDocuments(bucket.getDocCount()); if (progress != null) { progress.incrementDocsProcessed(bucket.getDocCount()); @@ -87,7 +88,9 @@ private static Map convertBucketToDocument(CompositeAggregation. TopHits topHits = bucket.getAggregations().get(TOP_HITS_AGGREGATION_NAME); if (topHits.getHits().getHits().length != 1) { throw new ElasticsearchException( - "Unexpected number of hits in the top_hits aggregation result. Wanted: 1, was: {}", topHits.getHits().getHits().length); + "Unexpected number of hits in the top_hits aggregation result. Wanted: 1, was: {}", + topHits.getHits().getHits().length + ); } Map document = topHits.getHits().getHits()[0].getSourceAsMap(); diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/AggregationResultUtils.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/AggregationResultUtils.java index 4a354abe96f08..5f3c38fa81f95 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/AggregationResultUtils.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/AggregationResultUtils.java @@ -457,11 +457,8 @@ static class GeoShapeMetricAggExtractor implements AggValueExtractor { @Override public Object value(Aggregation aggregation, Map fieldTypeMap, String lookupFieldPrefix) { - assert aggregation instanceof GeoShapeMetricAggregation : "Unexpected type [" - + aggregation.getClass().getName() - + "] for aggregation [" - + aggregation.getName() - + "]"; + assert aggregation instanceof GeoShapeMetricAggregation + : "Unexpected type [" + aggregation.getClass().getName() + "] for aggregation [" + aggregation.getName() + "]"; return ((GeoShapeMetricAggregation) aggregation).geoJSONGeometry(); } } diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/pivot/AggregationResultUtilsTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/pivot/AggregationResultUtilsTests.java index ccb2a0cc7ee1e..87ce05f951160 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/pivot/AggregationResultUtilsTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/pivot/AggregationResultUtilsTests.java @@ -7,12 +7,12 @@ package org.elasticsearch.xpack.transform.transforms.pivot; -import org.elasticsearch.common.xcontent.ParseField; import org.elasticsearch.common.geo.GeoPoint; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.xcontent.ContextParser; import org.elasticsearch.common.xcontent.DeprecationHandler; import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.ParseField; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentParser; From 0cb6c0affc4c29712cd5a3f64422fb9986a59daa Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Tue, 27 Jul 2021 14:35:48 +0200 Subject: [PATCH 3/4] use Stream.of --- .../xpack/core/indexing/AsyncTwoPhaseIndexerTests.java | 4 ++-- .../transform/transforms/TransformIndexerStateTests.java | 3 ++- .../xpack/transform/transforms/TransformIndexerTests.java | 3 ++- 3 files changed, 6 insertions(+), 4 deletions(-) diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java index 64a71c61baefe..263870b1ce759 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java @@ -192,13 +192,13 @@ protected IterationResult doProcess(SearchResponse searchResponse) { ++processOps; if (processOps == 5) { - return new IterationResult<>(Collections.singletonList(new IndexRequest()).stream(), processOps, true); + return new IterationResult<>(Stream.of(new IndexRequest()), processOps, true); } else if (processOps % 2 == 0) { return new IterationResult<>(Stream.empty(), processOps, false); } - return new IterationResult<>(Collections.singletonList(new IndexRequest()).stream(), processOps, false); + return new IterationResult<>(Stream.of(new IndexRequest()), processOps, false); } @Override diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerStateTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerStateTests.java index c8ff66a808184..9b12cd5f456e7 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerStateTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerStateTests.java @@ -63,6 +63,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; +import java.util.stream.Stream; import static org.elasticsearch.xpack.core.transform.transforms.DestConfigTests.randomDestConfig; import static org.elasticsearch.xpack.core.transform.transforms.SourceConfigTests.randomSourceConfig; @@ -233,7 +234,7 @@ protected IterationResult doProcess(SearchResponse sea // pretend that we processed 10k documents for each call getStats().incrementNumDocuments(10_000); return new IterationResult<>( - Collections.singletonList(new IndexRequest()).stream(), + Stream.of(new IndexRequest()), new TransformIndexerPosition(null, null), false ); diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerTests.java index 9282ca781ea75..fb2579229f1fd 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerTests.java @@ -60,6 +60,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; +import java.util.stream.Stream; import static org.elasticsearch.xpack.core.transform.transforms.DestConfigTests.randomDestConfig; import static org.elasticsearch.xpack.core.transform.transforms.SourceConfigTests.randomSourceConfig; @@ -247,7 +248,7 @@ protected IterationResult doProcess(SearchResponse sea // pretend that we processed 10k documents for each call getStats().incrementNumDocuments(10_000); return new IterationResult<>( - Collections.singletonList(new IndexRequest()).stream(), + Stream.of(new IndexRequest()), new TransformIndexerPosition(null, null), numberOfLoops == 0 ); From d6358cbd4433d8a4dbd78fee55fe2813b9a63ae5 Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Tue, 27 Jul 2021 15:44:41 +0200 Subject: [PATCH 4/4] ensure progress is never null --- .../transforms/TransformIndexer.java | 19 ++++++++----------- .../common/AbstractCompositeAggFunction.java | 3 ++- .../transform/transforms/latest/Latest.java | 6 ++---- .../pivot/AggregationResultUtils.java | 6 ++---- .../pivot/AggregationResultUtilsTests.java | 17 ++++++++++++----- 5 files changed, 26 insertions(+), 25 deletions(-) diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java index 8073b74bbee3d..8379d77d60b05 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java @@ -144,7 +144,7 @@ public TransformIndexer( this.checkpointProvider = ExceptionsHelper.requireNonNull(checkpointProvider, "checkpointProvider"); this.auditor = transformServices.getAuditor(); this.transformConfig = ExceptionsHelper.requireNonNull(transformConfig, "transformConfig"); - this.progress = transformProgress; + this.progress = progress != null ? progress : new TransformProgress(); this.lastCheckpoint = ExceptionsHelper.requireNonNull(lastCheckpoint, "lastCheckpoint"); this.nextCheckpoint = ExceptionsHelper.requireNonNull(nextCheckpoint, "nextCheckpoint"); this.context = ExceptionsHelper.requireNonNull(context, "context"); @@ -295,10 +295,10 @@ protected void onStart(long now, ActionListener listener) { doGetInitialProgress(request, ActionListener.wrap(response -> { function.getInitialProgressFromResponse(response, ActionListener.wrap(newProgress -> { logger.trace("[{}] reset the progress from [{}] to [{}].", getJobId(), progress, newProgress); - progress = newProgress; + progress = newProgress != null ? newProgress : new TransformProgress(); finalListener.onResponse(null); }, failure -> { - progress = null; + progress = new TransformProgress(); logger.warn( new ParameterizedMessage("[{}] unable to load progress information for task.", getJobId()), failure @@ -306,7 +306,7 @@ protected void onStart(long now, ActionListener listener) { finalListener.onResponse(null); })); }, failure -> { - progress = null; + progress = new TransformProgress(); logger.warn(new ParameterizedMessage("[{}] unable to load progress information for task.", getJobId()), failure); finalListener.onResponse(null); })); @@ -514,19 +514,16 @@ private void finalizeCheckpoint(ActionListener listener) { // NOTE: this method is called in the same thread as the processing thread. // Theoretically, there should not be a race condition with updating progress here. // NOTE 2: getPercentComplete should only NOT be null on the first (batch) checkpoint - if (progress != null && progress.getPercentComplete() != null && progress.getPercentComplete() < 100.0) { + if (progress.getPercentComplete() != null && progress.getPercentComplete() < 100.0) { progress.incrementDocsProcessed(progress.getTotalDocs() - progress.getDocumentsProcessed()); } if (lastCheckpoint != null) { long docsIndexed = 0; long docsProcessed = 0; - // This should not happen as we simply create a new one when we reach continuous checkpoints - // but this is a paranoid `null` check - if (progress != null) { - docsIndexed = progress.getDocumentsIndexed(); - docsProcessed = progress.getDocumentsProcessed(); - } + docsIndexed = progress.getDocumentsIndexed(); + docsProcessed = progress.getDocumentsProcessed(); + long durationMs = System.currentTimeMillis() - lastCheckpoint.getTimestamp(); getStats().incrementCheckpointExponentialAverages(durationMs < 0 ? 0 : durationMs, docsIndexed, docsProcessed); } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/common/AbstractCompositeAggFunction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/common/AbstractCompositeAggFunction.java index f80b9d203b9de..a47a3657f2efd 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/common/AbstractCompositeAggFunction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/common/AbstractCompositeAggFunction.java @@ -86,8 +86,9 @@ public void preview( } final CompositeAggregation agg = aggregations.get(COMPOSITE_AGGREGATION_NAME); TransformIndexerStats stats = new TransformIndexerStats(); + TransformProgress progress = new TransformProgress(); - List> docs = extractResults(agg, fieldTypeMap, stats, null).map( + List> docs = extractResults(agg, fieldTypeMap, stats, progress).map( this::documentTransformationFunction ).collect(Collectors.toList()); diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/latest/Latest.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/latest/Latest.java index c866fd05f6331..1930515d6928d 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/latest/Latest.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/latest/Latest.java @@ -80,10 +80,8 @@ private static Map convertBucketToDocument( TransformProgress progress ) { transformIndexerStats.incrementNumDocuments(bucket.getDocCount()); - if (progress != null) { - progress.incrementDocsProcessed(bucket.getDocCount()); - progress.incrementDocsIndexed(1L); - } + progress.incrementDocsProcessed(bucket.getDocCount()); + progress.incrementDocsIndexed(1L); TopHits topHits = bucket.getAggregations().get(TOP_HITS_AGGREGATION_NAME); if (topHits.getHits().getHits().length != 1) { diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/AggregationResultUtils.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/AggregationResultUtils.java index 5f3c38fa81f95..7234f9e9f8f59 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/AggregationResultUtils.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/AggregationResultUtils.java @@ -104,10 +104,8 @@ public static Stream> extractCompositeAggregationResults( ) { return agg.getBuckets().stream().map(bucket -> { stats.incrementNumDocuments(bucket.getDocCount()); - if (progress != null) { - progress.incrementDocsProcessed(bucket.getDocCount()); - progress.incrementDocsIndexed(1L); - } + progress.incrementDocsProcessed(bucket.getDocCount()); + progress.incrementDocsIndexed(1L); Map document = new HashMap<>(); // generator to create unique but deterministic document ids, so we diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/pivot/AggregationResultUtilsTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/pivot/AggregationResultUtilsTests.java index 87ce05f951160..0d323b59495e3 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/pivot/AggregationResultUtilsTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/pivot/AggregationResultUtilsTests.java @@ -66,6 +66,7 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.core.transform.TransformField; import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats; +import org.elasticsearch.xpack.core.transform.transforms.TransformProgress; import org.elasticsearch.xpack.core.transform.transforms.pivot.GroupConfig; import org.elasticsearch.xpack.transform.transforms.pivot.AggregationResultUtils.BucketKeyExtractor; @@ -647,6 +648,7 @@ public void testExtractCompositeAggregationResultsDocIDs() throws IOException { ) ); TransformIndexerStats stats = new TransformIndexerStats(); + TransformProgress progress = new TransformProgress(); Map fieldTypeMap = asStringMap(aggName, "double", targetField, "keyword", targetField2, "keyword"); @@ -656,7 +658,8 @@ public void testExtractCompositeAggregationResultsDocIDs() throws IOException { Collections.emptyList(), inputFirstRun, fieldTypeMap, - stats + stats, + progress ); List> resultSecondRun = runExtraction( groupBy, @@ -664,7 +667,8 @@ public void testExtractCompositeAggregationResultsDocIDs() throws IOException { Collections.emptyList(), inputSecondRun, fieldTypeMap, - stats + stats, + progress ); assertNotEquals(resultFirstRun, resultSecondRun); @@ -1057,6 +1061,7 @@ private void executeTest( long expectedDocCounts ) throws IOException { TransformIndexerStats stats = new TransformIndexerStats(); + TransformProgress progress = new TransformProgress(); XContentBuilder builder = XContentFactory.contentBuilder(randomFrom(XContentType.values())); builder.map(input); @@ -1066,7 +1071,8 @@ private void executeTest( pipelineAggregationBuilders, input, fieldTypeMap, - stats + stats, + progress ); // remove the document ids and test uniqueness @@ -1085,7 +1091,8 @@ private List> runExtraction( Collection pipelineAggregationBuilders, Map input, Map fieldTypeMap, - TransformIndexerStats stats + TransformIndexerStats stats, + TransformProgress progress ) throws IOException { XContentBuilder builder = XContentFactory.contentBuilder(randomFrom(XContentType.values())); @@ -1100,7 +1107,7 @@ private List> runExtraction( pipelineAggregationBuilders, fieldTypeMap, stats, - null, + progress, true ).collect(Collectors.toList()); }