Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Transform][Rollup] remove unnecessary list indirection #75459

Merged
merged 4 commits into from
Jul 28, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,12 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.common.util.concurrent.RunOnce;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -487,13 +485,12 @@ private void onSearchResponse(SearchResponse searchResponse) {
return;
}

final List<IndexRequest> docs = iterationResult.getToIndex();
final BulkRequest bulkRequest = new BulkRequest();
iterationResult.getToIndex().forEach(bulkRequest::add);
stats.markEndProcessing();

// an iteration result might return an empty set of documents to be indexed
if (docs.isEmpty() == false) {
final BulkRequest bulkRequest = new BulkRequest();
docs.forEach(bulkRequest::add);
stats.markEndProcessing();
if (bulkRequest.numberOfActions() > 0) {
stats.markStartIndexing();
doNextBulk(bulkRequest, ActionListener.wrap(bulkResponse -> {
// TODO we should check items in the response and move after accordingly to
Expand All @@ -512,7 +509,6 @@ private void onSearchResponse(SearchResponse searchResponse) {
onBulkResponse(bulkResponse, newPosition);
}, this::finishWithIndexingFailure));
} else {
stats.markEndProcessing();
// no documents need to be indexed, continue with search
try {
JobPosition newPosition = iterationResult.getPosition();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

import org.elasticsearch.action.index.IndexRequest;

import java.util.List;
import java.util.stream.Stream;

/**
* Result object to hold the result of 1 iteration of iterative indexing.
Expand All @@ -19,18 +19,18 @@ public class IterationResult<JobPosition> {

private final boolean isDone;
private final JobPosition position;
private final List<IndexRequest> toIndex;
private final Stream<IndexRequest> toIndex;

/**
* Constructor for the result of 1 iteration.
*
* @param toIndex the list of requests to be indexed
* @param toIndex the stream of requests to be indexed
* @param position the extracted, persistable position of the job required for the search phase
* @param isDone true if source is exhausted and job should go to sleep
*
* Note: toIndex.empty() != isDone due to possible filtering in the specific implementation
*/
public IterationResult(List<IndexRequest> toIndex, JobPosition position, boolean isDone) {
public IterationResult(Stream<IndexRequest> toIndex, JobPosition position, boolean isDone) {
this.toIndex = toIndex;
this.position = position;
this.isDone = isDone;
Expand All @@ -53,11 +53,11 @@ public JobPosition getPosition() {
}

/**
* List of requests to be passed to bulk indexing.
* Stream of requests to be passed to bulk indexing.
*
* @return List of index requests.
* @return Stream of index requests.
*/
public List<IndexRequest> getToIndex() {
public Stream<IndexRequest> getToIndex() {
return toIndex;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
import org.elasticsearch.action.search.SearchResponseSections;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.test.ESTestCase;
Expand All @@ -36,6 +36,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;

import static org.hamcrest.Matchers.equalTo;

Expand Down Expand Up @@ -74,7 +75,7 @@ protected IterationResult<Integer> doProcess(SearchResponse searchResponse) {
assertFalse("should not be called as stoppedBeforeFinished is false", stoppedBeforeFinished);
assertThat(step, equalTo(2));
++step;
return new IterationResult<>(Collections.emptyList(), 3, true);
return new IterationResult<>(Stream.empty(), 3, true);
}

private void awaitForLatch() {
Expand Down Expand Up @@ -191,13 +192,13 @@ protected IterationResult<Integer> doProcess(SearchResponse searchResponse) {

++processOps;
if (processOps == 5) {
return new IterationResult<>(Collections.singletonList(new IndexRequest()), processOps, true);
return new IterationResult<>(Stream.of(new IndexRequest()), processOps, true);
}
else if (processOps % 2 == 0) {
return new IterationResult<>(Collections.emptyList(), processOps, false);
return new IterationResult<>(Stream.empty(), processOps, false);
}

return new IterationResult<>(Collections.singletonList(new IndexRequest()), processOps, false);
return new IterationResult<>(Stream.of(new IndexRequest()), processOps, false);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
* These utilities are used to convert agg responses into a set of rollup documents.
Expand All @@ -45,9 +45,9 @@ class IndexerUtils {
* @param stats The stats accumulator for this job's task
* @param groupConfig The grouping configuration for the job
* @param jobId The ID for the job
* @return A list of rolled documents derived from the response
* @return A stream of rolled documents derived from the response
*/
static List<IndexRequest> processBuckets(CompositeAggregation agg, String rollupIndex, RollupIndexerJobStats stats,
static Stream<IndexRequest> processBuckets(CompositeAggregation agg, String rollupIndex, RollupIndexerJobStats stats,
GroupConfig groupConfig, String jobId) {

logger.debug("Buckets: [" + agg.getBuckets().size() + "][" + jobId + "]");
Expand All @@ -72,7 +72,7 @@ static List<IndexRequest> processBuckets(CompositeAggregation agg, String rollup
IndexRequest request = new IndexRequest(rollupIndex).id(idGenerator.getID());
request.source(doc);
return request;
}).collect(Collectors.toList());
});
}

private static void processKeys(Map<String, Object> keys, Map<String, Object> doc,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;

import static org.elasticsearch.xpack.core.rollup.RollupField.formatFieldName;

Expand Down Expand Up @@ -130,7 +131,7 @@ protected IterationResult<Map<String, Object>> doProcess(SearchResponse searchRe

if (response.getBuckets().isEmpty()) {
// do not reset the position as we want to continue from where we stopped
return new IterationResult<>(Collections.emptyList(), getPosition(), true);
return new IterationResult<>(Stream.empty(), getPosition(), true);
}

return new IterationResult<>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static java.util.Collections.singletonList;
import static org.elasticsearch.xpack.core.rollup.ConfigTestHelpers.randomDateHistogramGroupConfig;
Expand Down Expand Up @@ -113,7 +114,7 @@ public void testMissingFields() throws IOException {
directory.close();

final GroupConfig groupConfig = randomGroupConfig(random());
List<IndexRequest> docs = IndexerUtils.processBuckets(composite, indexName, stats, groupConfig, "foo");
List<IndexRequest> docs = IndexerUtils.processBuckets(composite, indexName, stats, groupConfig, "foo").collect(Collectors.toList());

assertThat(docs.size(), equalTo(numDocs));
for (IndexRequest doc : docs) {
Expand Down Expand Up @@ -174,7 +175,7 @@ public void testCorrectFields() throws IOException {
directory.close();

final GroupConfig groupConfig = randomGroupConfig(random());
List<IndexRequest> docs = IndexerUtils.processBuckets(composite, indexName, stats, groupConfig, "foo");
List<IndexRequest> docs = IndexerUtils.processBuckets(composite, indexName, stats, groupConfig, "foo").collect(Collectors.toList());

assertThat(docs.size(), equalTo(numDocs));
for (IndexRequest doc : docs) {
Expand Down Expand Up @@ -227,7 +228,7 @@ public void testNumericTerms() throws IOException {
directory.close();

final GroupConfig groupConfig = randomGroupConfig(random());
List<IndexRequest> docs = IndexerUtils.processBuckets(composite, indexName, stats, groupConfig, "foo");
List<IndexRequest> docs = IndexerUtils.processBuckets(composite, indexName, stats, groupConfig, "foo").collect(Collectors.toList());

assertThat(docs.size(), equalTo(numDocs));
for (IndexRequest doc : docs) {
Expand Down Expand Up @@ -287,7 +288,7 @@ public void testEmptyCounts() throws IOException {
directory.close();

final GroupConfig groupConfig = randomGroupConfig(random());
List<IndexRequest> docs = IndexerUtils.processBuckets(composite, indexName, stats, groupConfig, "foo");
List<IndexRequest> docs = IndexerUtils.processBuckets(composite, indexName, stats, groupConfig, "foo").collect(Collectors.toList());

assertThat(docs.size(), equalTo(numDocs));
for (IndexRequest doc : docs) {
Expand Down Expand Up @@ -337,7 +338,8 @@ public void testKeyOrdering() {
});

GroupConfig groupConfig = new GroupConfig(randomDateHistogramGroupConfig(random()), new HistogramGroupConfig(1L, "abc"), null);
List<IndexRequest> docs = IndexerUtils.processBuckets(composite, "foo", new RollupIndexerJobStats(), groupConfig, "foo");
List<IndexRequest> docs = IndexerUtils.processBuckets(composite, "foo", new RollupIndexerJobStats(), groupConfig, "foo")
.collect(Collectors.toList());
assertThat(docs.size(), equalTo(1));
assertThat(docs.get(0).id(), equalTo("foo$c9LcrFqeFW92uN_Z7sv1hA"));
}
Expand Down Expand Up @@ -387,7 +389,8 @@ public void testKeyOrderingLong() {
});

GroupConfig groupConfig = new GroupConfig(randomDateHistogramGroupConfig(random()), new HistogramGroupConfig(1, "abc"), null);
List<IndexRequest> docs = IndexerUtils.processBuckets(composite, "foo", new RollupIndexerJobStats(), groupConfig, "foo");
List<IndexRequest> docs = IndexerUtils.processBuckets(composite, "foo", new RollupIndexerJobStats(), groupConfig, "foo")
.collect(Collectors.toList());
assertThat(docs.size(), equalTo(1));
assertThat(docs.get(0).id(), equalTo("foo$VAFKZpyaEqYRPLyic57_qw"));
}
Expand All @@ -414,7 +417,8 @@ public void testNullKeys() {
});

GroupConfig groupConfig = new GroupConfig(randomDateHistogramGroupConfig(random()), randomHistogramGroupConfig(random()), null);
List<IndexRequest> docs = IndexerUtils.processBuckets(composite, "foo", new RollupIndexerJobStats(), groupConfig, "foo");
List<IndexRequest> docs = IndexerUtils.processBuckets(composite, "foo", new RollupIndexerJobStats(), groupConfig, "foo")
.collect(Collectors.toList());
assertThat(docs.size(), equalTo(1));
assertFalse(Strings.isNullOrEmpty(docs.get(0).id()));
}
Expand Down Expand Up @@ -471,7 +475,7 @@ public void testMissingBuckets() throws IOException {
directory.close();

final GroupConfig groupConfig = randomGroupConfig(random());
List<IndexRequest> docs = IndexerUtils.processBuckets(composite, indexName, stats, groupConfig, "foo");
List<IndexRequest> docs = IndexerUtils.processBuckets(composite, indexName, stats, groupConfig, "foo").collect(Collectors.toList());

assertThat(docs.size(), equalTo(6));
for (IndexRequest doc : docs) {
Expand Down Expand Up @@ -542,7 +546,7 @@ public void testTimezone() throws IOException {
directory.close();

final GroupConfig groupConfig = randomGroupConfig(random());
List<IndexRequest> docs = IndexerUtils.processBuckets(composite, indexName, stats, groupConfig, "foo");
List<IndexRequest> docs = IndexerUtils.processBuckets(composite, indexName, stats, groupConfig, "foo").collect(Collectors.toList());

assertThat(docs.size(), equalTo(2));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,13 +209,15 @@ void preview(
* @param destinationPipeline the destination pipeline
* @param fieldMappings field mappings for the destination
* @param stats a stats object to record/collect stats
* @param progress a progress object to record/collect progress information
* @return a tuple with the stream of index requests and the cursor
*/
Tuple<Stream<IndexRequest>, Map<String, Object>> processSearchResponse(
SearchResponse searchResponse,
String destinationIndex,
String destinationPipeline,
Map<String, String> fieldMappings,
TransformIndexerStats stats
TransformIndexerStats stats,
TransformProgress progress
);
}
Loading