-
Notifications
You must be signed in to change notification settings - Fork 8.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Search] Add cancel function to pollSearch #85787
Conversation
Makes sure that DELETE requests are properly sent even if consumer unsubscribes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
While playing around with this, I realized we have another issue with pollSearch
. It doesn't defer
, which means the search
function will be called (just once) before there are any subscribers. Do you mind wrapping it in a defer
and adding a test case for this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The de-duplication of the cancel
looks elegant. 👍
Unfortunately these changes don't quite make the triggering of the abortSignal
and the unsubscription commutative. I left a suggestion for another variant in the comments.
(I'm using the Logs UI's log entry details flyout as a test-bed for verification.)
Thanks for following up on the issue!
const aborted = options?.abortSignal | ||
? abortSignalToPromise(options?.abortSignal) | ||
: { promise: new Promise(() => {}), cleanup: () => {} }; | ||
|
||
return from(search()).pipe( | ||
expand(() => timer(pollInterval).pipe(switchMap(search))), | ||
tap((response) => { | ||
if (isErrorResponse(response)) throw new AbortError(); | ||
}), | ||
takeWhile<Response>(isPartialResponse, true), | ||
takeUntil<Response>(from(aborted.promise)), | ||
finalize(aborted.cleanup) | ||
); | ||
aborted.promise.catch(() => { | ||
if (cancel) cancel(); | ||
}); | ||
|
||
return from(search()).pipe( | ||
expand(() => timer(pollInterval).pipe(switchMap(search))), | ||
tap((response) => { | ||
if (isErrorResponse(response)) { | ||
throw response ? new Error('Received partial response') : new AbortError(); | ||
} | ||
}), | ||
takeWhile<Response>(isPartialResponse, true), | ||
takeUntil<Response>(from(aborted.promise)), | ||
finalize(aborted.cleanup) | ||
); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unfortunately this still prevents cancel
call from being called if the consumer unsubscribes from the returned Observable
before the abortSignal
is triggered. The finalize
unregisters the aborted.promise
from the abortSignal
.
How about the following?
export const pollSearch = <Response extends IKibanaSearchResponse>(
search: () => Promise<Response>,
cancel?: () => void,
{ pollInterval = 1000, abortSignal }: IAsyncSearchOptions = {}
): Observable<Response> => {
return defer(() => {
if (abortSignal?.aborted) {
return EMPTY;
}
if (cancel && abortSignal) {
abortSignal.addEventListener('abort', cancel, { once: true });
}
const aborted$ = (abortSignal ? fromEvent(abortSignal, 'abort') : EMPTY).pipe(
map(() => {
throw new AbortError();
})
);
return from(search()).pipe(
expand(() => timer(pollInterval).pipe(switchMap(search))),
tap((response) => {
if (isErrorResponse(response)) {
throw response ? new Error('Received partial response') : new AbortError();
}
}),
takeWhile<Response>(isPartialResponse, true),
takeUntil<Response>(aborted$)
);
});
};
This seems to work and avoids an unnecessary intermediate Promise
. What do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will this result in a memory leak since it is never removing the event listener from the abortSignal
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would suspect not, because nothing holds a reference to the abort signal once the observable goes out of scope. The gc should be able to clean it up afterwards.
catchError((e: AbortError) => { | ||
if (id && !isSavedToBackground) this.deps.http.delete(`/internal/search/${strategy}/${id}`); | ||
catchError((e: Error) => { | ||
cancel(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The de-duplication is neat, but we have to guard against calling this when the abortSignal
has been triggered, because that has already caused the DELETE
request to be sent.
cancel(); | |
if (!combinedSignal.aborted) { | |
cancel(); | |
} |
Otherwise it will be sent twice on explicit abort.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@lukasolson are you still considering to fix this scenario?
Hi @lizozom, Any tips on how reviewers can test this? Any of the checklist items need to be checked off or removed? |
@tsullivan I've been using the Logs UI "log entry details fly-out" to test: It shows a progress bar, offers an explicit "cancel" button and cancels implicitly when the fly-out is closed while loading. It represents a more realistic usage with varying sequences of events (subscribe, unsubscribe, abort, ...) that unit tests don't cover. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tested the code change in a plugin, and found that if the consumer unsubscribes the search observable early, then the DELETE
request is not sent.
If the abort signal is called explicitly, then the DELETE
request is sent twice as @weltenwort noted in their comment.
@elasticmachine merge upstream |
💚 Build SucceededMetrics [docs]Async chunks
Page load bundle
History
To update your PR or re-run it, just comment with: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The cancellation seems to work reliably in all situations I tested. Thanks for hanging in there! 👍
@@ -83,9 +84,9 @@ export class EnhancedSearchInterceptor extends SearchInterceptor { | |||
isSavedToBackground = true; | |||
}); | |||
|
|||
const cancel = () => { | |||
const cancel = once(() => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
elegant 👍
* Add cancel functionality to pollSearch Makes sure that DELETE requests are properly sent even if consumer unsubscribes. * Update poll_search.test.ts * cancel on abort * fix jest * ADded jest test * Cancel to be called once * Only cancel internally on abort * ts + addd defer * ts * make cancel code prettier * Cancel even after unsubscribe * Throw abort error if already aborted * Only delete once Co-authored-by: Lukas Olson <[email protected]>
* Add cancel functionality to pollSearch Makes sure that DELETE requests are properly sent even if consumer unsubscribes. * Update poll_search.test.ts * cancel on abort * fix jest * ADded jest test * Cancel to be called once * Only cancel internally on abort * ts + addd defer * ts * make cancel code prettier * Cancel even after unsubscribe * Throw abort error if already aborted * Only delete once Co-authored-by: Lukas Olson <[email protected]> Co-authored-by: Liza Katz <[email protected]>
Summary
Resolves #85603
Makes sure that Elasticsearch DELETE requests for async searches are properly sent on abort, even if consumer unsubscribes, both from client and server.
Checklist
Delete any items that are not applicable to this PR.
For maintainers