Skip to content

Commit

Permalink
[Transform][Rollup] remove unnecessary list indirection (elastic#75459)
Browse files Browse the repository at this point in the history
Remove an unnecessary indirection and refactor progress tracking. Both rollup and transform
process documents as a stream; however, the AsyncTwoPhaseIndexer takes a List of index
requests. This change removes the unnecessary temporary container and makes upcoming
transform enhancements easier.
  • Loading branch information
Hendrik Muhs authored Jul 28, 2021
1 parent e99160d commit fb0846a
Show file tree
Hide file tree
Showing 16 changed files with 139 additions and 140 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,12 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.common.util.concurrent.RunOnce;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -487,13 +485,12 @@ private void onSearchResponse(SearchResponse searchResponse) {
return;
}

final List<IndexRequest> docs = iterationResult.getToIndex();
final BulkRequest bulkRequest = new BulkRequest();
iterationResult.getToIndex().forEach(bulkRequest::add);
stats.markEndProcessing();

// an iteration result might return an empty set of documents to be indexed
if (docs.isEmpty() == false) {
final BulkRequest bulkRequest = new BulkRequest();
docs.forEach(bulkRequest::add);
stats.markEndProcessing();
if (bulkRequest.numberOfActions() > 0) {
stats.markStartIndexing();
doNextBulk(bulkRequest, ActionListener.wrap(bulkResponse -> {
// TODO we should check items in the response and move after accordingly to
Expand All @@ -512,7 +509,6 @@ private void onSearchResponse(SearchResponse searchResponse) {
onBulkResponse(bulkResponse, newPosition);
}, this::finishWithIndexingFailure));
} else {
stats.markEndProcessing();
// no documents need to be indexed, continue with search
try {
JobPosition newPosition = iterationResult.getPosition();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

import org.elasticsearch.action.index.IndexRequest;

import java.util.List;
import java.util.stream.Stream;

/**
* Result object to hold the result of 1 iteration of iterative indexing.
Expand All @@ -19,18 +19,18 @@ public class IterationResult<JobPosition> {

private final boolean isDone;
private final JobPosition position;
private final List<IndexRequest> toIndex;
private final Stream<IndexRequest> toIndex;

/**
* Constructor for the result of 1 iteration.
*
* @param toIndex the list of requests to be indexed
* @param toIndex the stream of requests to be indexed
* @param position the extracted, persistable position of the job required for the search phase
* @param isDone true if source is exhausted and job should go to sleep
*
* Note: toIndex.empty() != isDone due to possible filtering in the specific implementation
*/
public IterationResult(List<IndexRequest> toIndex, JobPosition position, boolean isDone) {
public IterationResult(Stream<IndexRequest> toIndex, JobPosition position, boolean isDone) {
this.toIndex = toIndex;
this.position = position;
this.isDone = isDone;
Expand All @@ -53,11 +53,11 @@ public JobPosition getPosition() {
}

/**
* List of requests to be passed to bulk indexing.
* Stream of requests to be passed to bulk indexing.
*
* @return List of index requests.
* @return Stream of index requests.
*/
public List<IndexRequest> getToIndex() {
public Stream<IndexRequest> getToIndex() {
return toIndex;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
import org.elasticsearch.action.search.SearchResponseSections;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.test.ESTestCase;
Expand All @@ -36,6 +36,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;

import static org.hamcrest.Matchers.equalTo;

Expand Down Expand Up @@ -74,7 +75,7 @@ protected IterationResult<Integer> doProcess(SearchResponse searchResponse) {
assertFalse("should not be called as stoppedBeforeFinished is false", stoppedBeforeFinished);
assertThat(step, equalTo(2));
++step;
return new IterationResult<>(Collections.emptyList(), 3, true);
return new IterationResult<>(Stream.empty(), 3, true);
}

private void awaitForLatch() {
Expand Down Expand Up @@ -191,13 +192,13 @@ protected IterationResult<Integer> doProcess(SearchResponse searchResponse) {

++processOps;
if (processOps == 5) {
return new IterationResult<>(Collections.singletonList(new IndexRequest()), processOps, true);
return new IterationResult<>(Stream.of(new IndexRequest()), processOps, true);
}
else if (processOps % 2 == 0) {
return new IterationResult<>(Collections.emptyList(), processOps, false);
return new IterationResult<>(Stream.empty(), processOps, false);
}

return new IterationResult<>(Collections.singletonList(new IndexRequest()), processOps, false);
return new IterationResult<>(Stream.of(new IndexRequest()), processOps, false);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
* These utilities are used to convert agg responses into a set of rollup documents.
Expand All @@ -45,9 +45,9 @@ class IndexerUtils {
* @param stats The stats accumulator for this job's task
* @param groupConfig The grouping configuration for the job
* @param jobId The ID for the job
* @return A list of rolled documents derived from the response
* @return A stream of rolled documents derived from the response
*/
static List<IndexRequest> processBuckets(CompositeAggregation agg, String rollupIndex, RollupIndexerJobStats stats,
static Stream<IndexRequest> processBuckets(CompositeAggregation agg, String rollupIndex, RollupIndexerJobStats stats,
GroupConfig groupConfig, String jobId) {

logger.debug("Buckets: [" + agg.getBuckets().size() + "][" + jobId + "]");
Expand All @@ -72,7 +72,7 @@ static List<IndexRequest> processBuckets(CompositeAggregation agg, String rollup
IndexRequest request = new IndexRequest(rollupIndex).id(idGenerator.getID());
request.source(doc);
return request;
}).collect(Collectors.toList());
});
}

private static void processKeys(Map<String, Object> keys, Map<String, Object> doc,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;

import static org.elasticsearch.xpack.core.rollup.RollupField.formatFieldName;

Expand Down Expand Up @@ -130,7 +131,7 @@ protected IterationResult<Map<String, Object>> doProcess(SearchResponse searchRe

if (response.getBuckets().isEmpty()) {
// do not reset the position as we want to continue from where we stopped
return new IterationResult<>(Collections.emptyList(), getPosition(), true);
return new IterationResult<>(Stream.empty(), getPosition(), true);
}

return new IterationResult<>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static java.util.Collections.singletonList;
import static org.elasticsearch.xpack.core.rollup.ConfigTestHelpers.randomDateHistogramGroupConfig;
Expand Down Expand Up @@ -113,7 +114,7 @@ public void testMissingFields() throws IOException {
directory.close();

final GroupConfig groupConfig = randomGroupConfig(random());
List<IndexRequest> docs = IndexerUtils.processBuckets(composite, indexName, stats, groupConfig, "foo");
List<IndexRequest> docs = IndexerUtils.processBuckets(composite, indexName, stats, groupConfig, "foo").collect(Collectors.toList());

assertThat(docs.size(), equalTo(numDocs));
for (IndexRequest doc : docs) {
Expand Down Expand Up @@ -174,7 +175,7 @@ public void testCorrectFields() throws IOException {
directory.close();

final GroupConfig groupConfig = randomGroupConfig(random());
List<IndexRequest> docs = IndexerUtils.processBuckets(composite, indexName, stats, groupConfig, "foo");
List<IndexRequest> docs = IndexerUtils.processBuckets(composite, indexName, stats, groupConfig, "foo").collect(Collectors.toList());

assertThat(docs.size(), equalTo(numDocs));
for (IndexRequest doc : docs) {
Expand Down Expand Up @@ -227,7 +228,7 @@ public void testNumericTerms() throws IOException {
directory.close();

final GroupConfig groupConfig = randomGroupConfig(random());
List<IndexRequest> docs = IndexerUtils.processBuckets(composite, indexName, stats, groupConfig, "foo");
List<IndexRequest> docs = IndexerUtils.processBuckets(composite, indexName, stats, groupConfig, "foo").collect(Collectors.toList());

assertThat(docs.size(), equalTo(numDocs));
for (IndexRequest doc : docs) {
Expand Down Expand Up @@ -287,7 +288,7 @@ public void testEmptyCounts() throws IOException {
directory.close();

final GroupConfig groupConfig = randomGroupConfig(random());
List<IndexRequest> docs = IndexerUtils.processBuckets(composite, indexName, stats, groupConfig, "foo");
List<IndexRequest> docs = IndexerUtils.processBuckets(composite, indexName, stats, groupConfig, "foo").collect(Collectors.toList());

assertThat(docs.size(), equalTo(numDocs));
for (IndexRequest doc : docs) {
Expand Down Expand Up @@ -337,7 +338,8 @@ public void testKeyOrdering() {
});

GroupConfig groupConfig = new GroupConfig(randomDateHistogramGroupConfig(random()), new HistogramGroupConfig(1L, "abc"), null);
List<IndexRequest> docs = IndexerUtils.processBuckets(composite, "foo", new RollupIndexerJobStats(), groupConfig, "foo");
List<IndexRequest> docs = IndexerUtils.processBuckets(composite, "foo", new RollupIndexerJobStats(), groupConfig, "foo")
.collect(Collectors.toList());
assertThat(docs.size(), equalTo(1));
assertThat(docs.get(0).id(), equalTo("foo$c9LcrFqeFW92uN_Z7sv1hA"));
}
Expand Down Expand Up @@ -387,7 +389,8 @@ public void testKeyOrderingLong() {
});

GroupConfig groupConfig = new GroupConfig(randomDateHistogramGroupConfig(random()), new HistogramGroupConfig(1, "abc"), null);
List<IndexRequest> docs = IndexerUtils.processBuckets(composite, "foo", new RollupIndexerJobStats(), groupConfig, "foo");
List<IndexRequest> docs = IndexerUtils.processBuckets(composite, "foo", new RollupIndexerJobStats(), groupConfig, "foo")
.collect(Collectors.toList());
assertThat(docs.size(), equalTo(1));
assertThat(docs.get(0).id(), equalTo("foo$VAFKZpyaEqYRPLyic57_qw"));
}
Expand All @@ -414,7 +417,8 @@ public void testNullKeys() {
});

GroupConfig groupConfig = new GroupConfig(randomDateHistogramGroupConfig(random()), randomHistogramGroupConfig(random()), null);
List<IndexRequest> docs = IndexerUtils.processBuckets(composite, "foo", new RollupIndexerJobStats(), groupConfig, "foo");
List<IndexRequest> docs = IndexerUtils.processBuckets(composite, "foo", new RollupIndexerJobStats(), groupConfig, "foo")
.collect(Collectors.toList());
assertThat(docs.size(), equalTo(1));
assertFalse(Strings.isNullOrEmpty(docs.get(0).id()));
}
Expand Down Expand Up @@ -471,7 +475,7 @@ public void testMissingBuckets() throws IOException {
directory.close();

final GroupConfig groupConfig = randomGroupConfig(random());
List<IndexRequest> docs = IndexerUtils.processBuckets(composite, indexName, stats, groupConfig, "foo");
List<IndexRequest> docs = IndexerUtils.processBuckets(composite, indexName, stats, groupConfig, "foo").collect(Collectors.toList());

assertThat(docs.size(), equalTo(6));
for (IndexRequest doc : docs) {
Expand Down Expand Up @@ -542,7 +546,7 @@ public void testTimezone() throws IOException {
directory.close();

final GroupConfig groupConfig = randomGroupConfig(random());
List<IndexRequest> docs = IndexerUtils.processBuckets(composite, indexName, stats, groupConfig, "foo");
List<IndexRequest> docs = IndexerUtils.processBuckets(composite, indexName, stats, groupConfig, "foo").collect(Collectors.toList());

assertThat(docs.size(), equalTo(2));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,13 +209,15 @@ void preview(
* @param destinationPipeline the destination pipeline
* @param fieldMappings field mappings for the destination
* @param stats a stats object to record/collect stats
* @param progress a progress object to record/collect progress information
* @return a tuple with the stream of index requests and the cursor
*/
Tuple<Stream<IndexRequest>, Map<String, Object>> processSearchResponse(
SearchResponse searchResponse,
String destinationIndex,
String destinationPipeline,
Map<String, String> fieldMappings,
TransformIndexerStats stats
TransformIndexerStats stats,
TransformProgress progress
);
}
Loading

0 comments on commit fb0846a

Please sign in to comment.