Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move Aggregator#buildTopLevel() to search worker thread. #98715

Merged
merged 20 commits into from
Sep 19, 2023

Conversation

iverase
Copy link
Contributor

@iverase iverase commented Aug 22, 2023

This PR introduces an AggregatorCollector that contains a finish method which performs aggregation postcollection and builds the internal aggregation for this collector.This method is called on the worker thread at the end of the collection phase.

The PR is set as a draft because it found an issue with global ordinals. In this case you get errors looking like:

java.lang.AssertionError: Sorted doc values are only supposed to be consumed in the thread in which they have been acquired. But was acquired in Thread[#267,elasticsearch[node_s3][search][T#5],5,TGRP-NestedIT] and consumed in Thread[#273,elasticsearch[node_s3][search_worker][T#4],5,TGRP-NestedIT].
	at __randomizedtesting.SeedInfo.seed([B198436B0B2A21D7]:0)
	at org.apache.lucene.tests.index.AssertingLeafReader.assertThread(AssertingLeafReader.java:67)
	at org.apache.lucene.tests.index.AssertingLeafReader$AssertingSortedDocValues.lookupOrd(AssertingLeafReader.java:908)
	at org.apache.lucene.index.SingletonSortedSetDocValues.lookupOrd(SingletonSortedSetDocValues.java:95)
	at org.elasticsearch.search.aggregations.bucket.terms.GlobalOrdinalsStringTermsAggregator$StandardTermsResults.convertTempBucketToRealBucket(GlobalOrdinalsStringTermsAggregator.java:752)

The issue is that global ordinals are created on the first collector but then reused by the other collectors during the postcollection / internal aggregation building phase.

In order to get around the issue we disable the asserting codec.

closes #98705

@iverase iverase added :Analytics/Geo Indexing, search aggregations of geo points and shapes v8.11.0 labels Aug 22, 2023
@iverase iverase requested a review from martijnvg August 22, 2023 09:18
@elasticsearchmachine elasticsearchmachine added the Team:Analytics Meta label for analytical engine team (ESQL/Aggs/Geo) label Aug 22, 2023
@elasticsearchmachine
Copy link
Collaborator

Pinging @elastic/es-analytics-geo (Team:Analytics)

@martijnvg
Copy link
Member

martijnvg commented Sep 8, 2023

I think in order to fix the timeout exception for search cancellation issue that we see with the GraphTests#testTimedoutQueryCrawl() test, we need to have the ability to not throw the TimeExceededException for search worker threads that are doing post collection stuff. I tried the below patch (on top of this PR) and it seemed to fix the test in question locally:

Subject: [PATCH] search-worker-overwrites
---
Index: server/src/main/java/org/elasticsearch/search/internal/ContextIndexSearcher.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/server/src/main/java/org/elasticsearch/search/internal/ContextIndexSearcher.java b/server/src/main/java/org/elasticsearch/search/internal/ContextIndexSearcher.java
--- a/server/src/main/java/org/elasticsearch/search/internal/ContextIndexSearcher.java	(revision 5ac1b3057cff072e2d82e932697215ea022be7bb)
+++ b/server/src/main/java/org/elasticsearch/search/internal/ContextIndexSearcher.java	(date 1694187992273)
@@ -54,10 +54,12 @@
 import java.util.Comparator;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.PriorityQueue;
 import java.util.Set;
 import java.util.concurrent.CancellationException;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Future;
@@ -489,7 +491,12 @@
             // otherwise the state of the aggregation might be undefined and running post collection
             // might result in an exception
             if (success || timeExceeded) {
-                doAggregationPostCollection(collector);
+                try {
+                    timeoutOverwrites.put(Thread.currentThread(), true);
+                    doAggregationPostCollection(collector);
+                } finally {
+                    timeoutOverwrites.remove(Thread.currentThread());
+                }
             }
         }
     }
@@ -505,8 +512,12 @@
         return timeExceeded;
     }
 
+    private final Map<Thread, Boolean> timeoutOverwrites = new ConcurrentHashMap<>();
+
     public void throwTimeExceededException() {
-        throw new TimeExceededException();
+        if (timeoutOverwrites.getOrDefault(Thread.currentThread(), false) == false) {
+            throw new TimeExceededException();
+        }
     }
 
     private static class TimeExceededException extends RuntimeException {

@iverase iverase marked this pull request as ready for review September 12, 2023 10:01
Copy link
Member

@martijnvg martijnvg left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@jpountz
Copy link
Contributor

jpountz commented Sep 14, 2023

This change is a bit hard for me to review because I've been away from this code from some time. That said, given that the approach for aggregations is to treat each slice of segments as a mini-shard, it makes sense to me to run buildTopLevel() in the same thread where we ran the collector. This would not only address this issue, but also better parallelize aggregation execution, as I believe that this buildTopLevel() operation is not always cheap, e.g. terms aggregations?

@iverase
Copy link
Contributor Author

iverase commented Sep 14, 2023

This would not only address this issue, but also better parallelize aggregation execution, as I believe that this buildTopLevel() operation is not always cheap, e.g. terms aggregations?

That's right

@@ -498,7 +516,9 @@ public boolean timeExceeded() {
}

public void throwTimeExceededException() {
throw new TimeExceededException();
if (timeoutOverwrites.getOrDefault(Thread.currentThread(), false) == false) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking at the changes above I am wondering if there are situations where post collection does want timeout to be thrown. Are there? If not is there a way to disable timeouts in post collection directly? I get worried that this type of change will make it harder to migrate to lucene's timeout support.

Copy link
Contributor Author

@iverase iverase Sep 14, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, the current logic expect no timeouts during the post-collection phase.

is there a way to disable timeouts in post collection directly?

No as far as I know. The main issue is the deferrable aggregations which run during that phase and they actually access the directory which can throw timeouts.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could we not doing something similar to what we were doing before? I mean, what is the point of having timeouts if we don't throw an exception when there is one? Should we rather remove the timeout runnable at this point before post collection? Or are you worried that we may not honour cancellation if we do so?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As you said, we cannot remove the timeout as it affects all running threads. We still want other threads to honour cancellation.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that before we also just build the top level internal aggregations when timeout occurred (in AggregationPhase). This workaround allows us to do the same, otherwise we can't return partial aggregation response (we just fail producing the search response).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok got it, thanks for explaining.

@iverase
Copy link
Contributor Author

iverase commented Sep 18, 2023

@ellasticmachine run elasticsearch-ci/part-1

Copy link
Member

@javanna javanna left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
:Analytics/Geo Indexing, search aggregations of geo points and shapes >non-issue Team:Analytics Meta label for analytical engine team (ESQL/Aggs/Geo) v8.11.0
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Move Aggregator#buildTopLevel() to search worker thread.
5 participants