-
Notifications
You must be signed in to change notification settings - Fork 24.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: refresh the rollup index as part of the rollup indexer actions #86992
fix: refresh the rollup index as part of the rollup indexer actions #86992
Conversation
Using the command line provided in elastic#81983 I could not reproduce the issue. Anyway, changing the timeout from 5 seconds to just 1 second I could reproduce the issue. At the end, keeping a timeout of 1 second I could fix the issue moving the refresh operation just before running the match_all query. My understanding is that the timeout is used to give the rollup job enough time to complete the rollup operation. Anyway, other than that, after the rollup job has written data into the rollup index we need to refresh it in order for the subsequent search operation to hit the expected rolledup document.
Pinging @elastic/es-analytics-geo (Team:Analytics) |
Pinging @elastic/clients-team (Team:Clients) |
I genuinely unsure what's going on here. I wish we didn't have to delay some number of seconds like this. If the action is async we should be returning a task_id that we can wait_for_completion on. Is that a thing here? |
I will see in the API if we have such an option, but I think the issue here is that we might need to poll the status in the test. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rollup v1 indeed does not refresh the index after adding the new rollup documents.
Moving the refresh action before the search would probably fix this test. However, I was thinking that it may be preferrable to add a refresh at the end of the rollup job.
Rollup v1 works by scheduling a cron job (persistent task) and it's not an asyc action. So, the delay is added to make sure that the job has started and completed after job |
Instead of refreshing from the yaml test we refresh once the indexing operation is complete in the callback. This reverts commit 7968c70.
@elasticmachine update branch |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Generally the change looks good. I left some small comments
@Override | ||
protected void onFinish(ActionListener<Void> listener) { | ||
final RefreshRequest refreshRequest = new RefreshRequest(job.getConfig().getRollupIndex()); | ||
client.admin().indices().refresh(refreshRequest).actionGet(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What happens if the call to refresh
fails? actionGet()
may throw exceptions here?
Also, I think the call to refresh
sould be async by calling method
client.admin().indices().refresh(RefreshRequest request, ActionListener<RefreshResponse> listener)
The big question is what happens if the call to refresh
fails.
Since we got there, the index has been created successfully and we should not fail the whole job.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess in both cases, successful or not we can just log something.
@@ -705,10 +707,12 @@ private void executeTestCase( | |||
IndexSearcher searcher = new IndexSearcher(reader); | |||
String dateHistoField = config.getGroupConfig().getDateHistogram().getField(); | |||
final ThreadPool threadPool = new TestThreadPool(getTestName()); | |||
final Client client = Mockito.mock(Client.class); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps you would like to use NoOpClient
here and everywhere else. Although Mockito should do its job fine here, we generally prefer this as to mocking the class directly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok I didn't see we had a NoOpClient
. I like using it too...at least it spares us from having an additional dependency on Mockito (even if it is already there for one of the test classes).
@elasticmachine update branch |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe it's worth trying implementing this logic in ClientRollupPageManager
@@ -85,13 +98,15 @@ public abstract class RollupIndexer extends AsyncTwoPhaseIndexer<Map<String, Obj | |||
* @param jobStats jobstats instance for collecting stats | |||
*/ | |||
RollupIndexer( | |||
Client client, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was looking at the code again. RollupIndexer
is an abstract class and its implementaion is ClientRollupPageManager
that already has an instance of the client
. So, I think the index refresh code should be implemented in the ClientRollupPageManager#onFinish
. Is there any reason it should not be there?
@@ -115,6 +130,23 @@ protected void onStart(long now, ActionListener<Boolean> listener) { | |||
} | |||
} | |||
|
|||
@Override | |||
protected void onFinish(ActionListener<Void> listener) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
At this point, I think you should at least invoke the listener.onResponse(null)
method.
Alternatively, you can see the code at TransformIndexer
for inspiration.
Also, this PR fixes #53412 |
@elasticmachine update branch |
The client is used to call the refresh action too. As a result, using 'any()' as the action would match both the search and refresh actions while we just need to handle the search action.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Left some more comments. Basically, now that refresh happens in ClientRollupPageManager
, we don't need the client
dependency in the RollupIndexer
class and all its subclasses.
@@ -61,6 +62,7 @@ | |||
public abstract class RollupIndexer extends AsyncTwoPhaseIndexer<Map<String, Object>, RollupIndexerJobStats> { | |||
static final String AGGREGATION_NAME = RollupField.NAME; | |||
|
|||
private final Client client; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since now we use the ClientRollupPageManager
implementation to refresh the rollup index,
client
instance does not need to be injected any more.
|
||
@Override | ||
public void onResponse(RefreshResponse refreshResponse) { | ||
logger.info("refreshing rollup index {} successful for job {}", jobConfig.getRollupIndex(), jobConfig.getId()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe this should be logger.trace()
? It does not look very important information so that its logged at info level
); | ||
} | ||
}; | ||
client.admin().indices().refresh(new RefreshRequest(jobConfig.getRollupIndex()), refreshResponseActionListener); | ||
listener.onResponse(null); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
listener#onResponse
should probably be called from inside
refreshResponseActionListener#onResponse
and refreshResponseActionListener#onFailure
so that
the responses of the async calls are chained.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Doing this breaks tests in RollupJobTasks
like testTriggerWithHeaders
on
assertBusy(() -> assertTrue(finished.get()));
@@ -53,22 +56,24 @@ | |||
public class RollupIndexerStateTests extends ESTestCase { | |||
private static class EmptyRollupIndexer extends RollupIndexer { | |||
EmptyRollupIndexer( | |||
Client client, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
EmptyRollupIndexer
and DelayedEmptyRollupIndexer
probably don't need a client
dependency
@elasticmachine update branch |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Generally, change looks good.
I have only left a comment that the client
arg should be removed from the SyncRollupIndexer
. I don't think Client
is used in this class. After changing this, no other review is required.
Thank you for fixing this bug.
@@ -819,6 +821,7 @@ class SyncRollupIndexer extends RollupIndexer { | |||
private Exception exc; | |||
|
|||
SyncRollupIndexer( | |||
Client client, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I do not think the client
constructor parameter is used anywhere. Perhaps you should remove it
We also need to mock the refresh response for the client to actually propagate the result of the refresh call.
Yeah! Thanks a lot for fixing this, Salvatore. |
Before reading documents from the rollup index we need the index
to be refreshed to avoid the search API not hitting the expected results.
Here, instead of relying on the test to request a refresh, we let the
rollup job do the refresh of the rollup index as part of the
onFinish
callback.
Resolves #81983
Resolves #53412