Os source buffer backoff retry #2849
Conversation
Force-pushed from 14068c0 to 598202c
Signed-off-by: Taylor Gray <[email protected]>
Force-pushed from 598202c to 168227d
@@ -0,0 +1,130 @@
/*
Could we pull out the original BufferAccumulator into a place where multiple plugins can consume it? It'd be better to maintain the logic in a singular place
Agreed, will do in a follow-up PR.
Here is the follow-up PR, to go in after this one is merged (#2857).
try {
    bufferAccumulator.add(record);
} catch (Exception e) {
    LOG.error("Failed writing OpenSearch documents to buffer due to: {}", e.getMessage());
Should we emit a metric here and/or log the document that failed? The user won't be able to take action without knowing which documents are failing
I was following the s3 source flow, which just logs and moves on. I will log the last added record's index and document id.
Looking back at the code, I think my comment was wrong. BufferAccumulator::add adds to the local collection and flushes if needed. There might be a case where adding to the local collection fails, but this exception should almost always be because the flush failed. I thought we were silently dropping the record if this path occurred, but that's not the case.
Having the doc/index in the log is nonetheless useful for denoting progress.
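To make the point of the discussion above concrete, here is a minimal sketch (not the actual Data Prepper source) of why an exception out of `add` almost always means the *flush* failed: appending to the local collection is cheap and rarely throws, and the buffer is only touched once the batch is full. The class and method names below are illustrative stand-ins.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch: add() appends locally and only writes to the
// (simulated) buffer when the batch is full, so IO failures surface
// from the flush path, not from the local add.
class AccumulatorSketch<T> {
    private final List<T> recordsAccumulated = new ArrayList<>();
    private final int numberOfRecordsToAccumulate;
    private int flushCount = 0; // stands in for writes to the real buffer

    AccumulatorSketch(final int numberOfRecordsToAccumulate) {
        this.numberOfRecordsToAccumulate = numberOfRecordsToAccumulate;
    }

    void add(final T record) {
        recordsAccumulated.add(record); // local, cheap, rarely fails
        if (recordsAccumulated.size() >= numberOfRecordsToAccumulate) {
            flush(); // this is where IO exceptions would surface
        }
    }

    void flush() {
        flushCount++;
        recordsAccumulated.clear();
    }

    int getFlushCount() {
        return flushCount;
    }
}
```

With a batch size of 2, the third `add` sits in the local collection and no second flush happens until the batch fills again.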
 *
 * @param <T> Type of record to accumulate
 */
public class BufferAccumulator<T extends Record<?>> {
You can use the javax annotation @NotThreadSafe.
Signed-off-by: Taylor Gray <[email protected]>
long nextDelay = INITIAL_FLUSH_RETRY_DELAY_ON_IO_EXCEPTION.toMillis();
boolean flushedSuccessfully;

for (int retryCount = 0; retryCount < MAX_FLUSH_RETRIES_ON_IO_EXCEPTION; retryCount++) {
Do we want this MAX_FLUSH_RETRIES_ON_IO_EXCEPTION to be configurable?
For now I don’t think there’s any value in it. But it’s always an option in the future
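As a hedged sketch of the retry loop being discussed: the constant names mirror the diff hunk above, but the delay values, the backoff doubling, and the helper shape are assumptions for illustration, not a quote of the implementation.

```java
import java.io.IOException;

// Sketch of a flush loop with capped retries and exponential backoff.
// Constant values and the FlushAction interface are illustrative.
class FlushRetrySketch {
    static final int MAX_FLUSH_RETRIES_ON_IO_EXCEPTION = 5;
    static final long INITIAL_FLUSH_RETRY_DELAY_MS = 50;

    interface FlushAction {
        void run() throws IOException;
    }

    /** Returns the number of attempts made when the flush succeeded,
     *  or MAX_FLUSH_RETRIES_ON_IO_EXCEPTION if every attempt failed. */
    static int flushWithRetries(final FlushAction flush) throws InterruptedException {
        long nextDelay = INITIAL_FLUSH_RETRY_DELAY_MS;
        for (int retryCount = 0; retryCount < MAX_FLUSH_RETRIES_ON_IO_EXCEPTION; retryCount++) {
            try {
                flush.run();
                return retryCount + 1; // flushed successfully
            } catch (final IOException e) {
                Thread.sleep(nextDelay); // back off before the next attempt
                nextDelay *= 2;          // exponential backoff
            }
        }
        return MAX_FLUSH_RETRIES_ON_IO_EXCEPTION;
    }
}
```

Keeping the cap as a constant (as the author chose) avoids a config knob until someone actually needs one; making it configurable later only touches the constant's initialization.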
private final Collection<T> recordsAccumulated;

private BufferAccumulator(final Buffer<T> buffer, final int numberOfRecordsToAccumulate, final Duration bufferTimeout) {
There are other BufferAccumulator implementations. Do you think they can all be combined into one implementation?
Yes, I have a follow-up PR to pull this out into a reusable class (#2857).
Use buffer accumulator in opensearch source to backoff and retry

Signed-off-by: Taylor Gray <[email protected]>
Signed-off-by: Marcos_Gonzalez_Mayedo <[email protected]>
Description
This change uses the `BufferAccumulator` class from the s3 source to write and flush documents to the buffer with backoff and retry. The `records_to_accumulate` is currently the same as the `batch_size` (pagination size) for searching.

Waiting on #2847 before rebasing and pushing the non-draft PR.
Issues Resolved
Related to #1985
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.