
fix: gracefully handle read errors when retrying documents #137

Merged: 2 commits merged into elastic:main from gzip/err on Mar 14, 2024

Conversation

@kruskall (Member) commented on Mar 13, 2024:

Do not ignore read errors.

Stop reading and return the error.

@kruskall requested a review from a team as a code owner on March 13, 2024 19:43.
@elastic-apm-tech added the safe-to-test label (Automated label for running bench-diff on forked PRs) on Mar 13, 2024.
@vikmik (Contributor) left a comment:

I think you need to make sure you don't read data that doesn't belong to the current request. If the previous request had a larger payload, you will get non-EOF errors towards the end of the request, because the gzip reader will attempt to read stale data at the end of copyBuf.

I had fixed this here: https://github.com/elastic/go-docappender/pull/129/files#diff-c7cec697c2474f331487a79c052f6150eca5c9a2162a74094df972d831e8223aL235 - it only requires making sure copyBuf's length doesn't go past the data from the current request's payload.

I think the tests are failing because of this right now.
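For illustration, here is a minimal, self-contained Go sketch of the idea described above: bound the reader to the bytes of the current payload so the gzip reader never sees the stale tail of a reused buffer. The variable names (copyBuf, payload) are illustrative; this is not go-docappender's actual code.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
)

func main() {
	// Compress a small newline-delimited payload, as the bulk indexer would.
	var payload bytes.Buffer
	zw := gzip.NewWriter(&payload)
	zw.Write([]byte("{\"create\":{}}\n{\"message\":\"hello\"}\n"))
	zw.Close()

	// Simulate a reused copyBuf that is larger than the current payload and
	// still holds leftover bytes from a previous, bigger request.
	copyBuf := make([]byte, payload.Len()+128)
	copy(copyBuf, payload.Bytes())

	// The point of the fix: bound the reader to the bytes of the *current*
	// payload so the gzip reader never reads the stale tail of copyBuf.
	gr, err := gzip.NewReader(bytes.NewReader(copyBuf[:payload.Len()]))
	if err != nil {
		panic(err)
	}
	defer gr.Close()

	out, err := io.ReadAll(gr)
	if err != nil {
		panic(err) // with the bounded slice, no spurious non-EOF errors
	}
	fmt.Printf("%s", out)
}
```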

@@ -348,7 +351,10 @@ func (b *bulkIndexer) Flush(ctx context.Context) (BulkIndexerResponseStat, error
 // loop until we've seen the start newline
 for seen+newlines < startln {
 	seen += newlines
-	n, _ := gr.Read(buf[:cap(buf)])
+	n, err := gr.Read(buf[:cap(buf)])
+	if err != nil && err != io.EOF {
@vikmik (Contributor) commented:

If you encounter an unexpected EOF, this will be an infinite loop. Defensive code should bail and return an error in that case, even though it shouldn't be possible in normal circumstances.

@kruskall (Member, Author) replied:

> If you encounter an unexpected EOF, this will be an infinite loop. Defensive code should bail and return an error in that case, even though it shouldn't be possible in normal circumstances.

Could you clarify this? If we get an error we just return early; I don't see why this would be an infinite loop.

@vikmik (Contributor) replied on Mar 13, 2024:

You only return early if err != nil and err != EOF

If err == EOF and we haven't yet seen the start newline, this may never exit. This shouldn't happen if the contents of copyBuf are what we expect, so it depends how defensive you want to be.
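As a reference for the discussion above, here is a hedged sketch of what a more defensive version of that loop could look like, written as a standalone helper rather than go-docappender's actual Flush code. The newline counting, names, and error messages are illustrative; only the loop shape mirrors the diff.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"errors"
	"fmt"
	"io"
)

// skipToNewline reads from gr until `target` newlines have been seen.
// Unlike the loop in the diff, it also bails out on an unexpected io.EOF
// instead of spinning forever when the stream ends early.
func skipToNewline(gr io.Reader, buf []byte, target int) (seen int, err error) {
	for seen < target {
		n, rerr := gr.Read(buf[:cap(buf)])
		seen += bytes.Count(buf[:n], []byte{'\n'})
		if rerr != nil {
			if !errors.Is(rerr, io.EOF) {
				return seen, fmt.Errorf("failed to read from compressed buffer: %w", rerr)
			}
			if seen < target {
				// Defensive: the stream ended before the expected newline count.
				return seen, fmt.Errorf("unexpected EOF after %d newlines, want %d", seen, target)
			}
		}
	}
	return seen, nil
}

func main() {
	var payload bytes.Buffer
	zw := gzip.NewWriter(&payload)
	zw.Write([]byte("doc1\ndoc2\ndoc3\n"))
	zw.Close()

	gr, _ := gzip.NewReader(bytes.NewReader(payload.Bytes()))
	buf := make([]byte, 4)

	// Asking for more newlines than the payload contains now returns an
	// error instead of looping forever.
	if _, err := skipToNewline(gr, buf, 5); err != nil {
		fmt.Println("bailed:", err)
	}
}
```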

@@ -364,7 +370,10 @@ func (b *bulkIndexer) Flush(ctx context.Context) (BulkIndexerResponseStat, error
 // loop until we've seen the end newline
 for seen+newlines < endln {
 	seen += newlines
-	n, _ := gr.Read(buf[:cap(buf)])
+	n, err := gr.Read(buf[:cap(buf)])
+	if err != nil && err != io.EOF {
@vikmik (Contributor) commented on Mar 13, 2024:

(same here - this may be an infinite loop if we encounter an unexpected EOF while we still expect to find newlines)

@vikmik (Contributor) left a review comment:

I don't mean to block this PR on the infinite-loop issue, which should in theory only happen in memory corruption scenarios. This isn't a regression - approving now, as I need to step away, in case you want to release.

Otherwise feel free to amend as you see fit.

@kruskall (Member, Author) replied:

> I don't mean to block this PR on the infinite-loop issue, which should in theory only happen in memory corruption scenarios. This isn't a regression - approving now, as I need to step away, in case you want to release.

I think we need to revisit this approach: we would end up losing all metrics for the current flush request if any error occurs in the retry phase. I don't think that's ideal.

@vikmik (Contributor) commented on Mar 14, 2024:

> I think we need to revisit this approach: we would end up losing all metrics for the current flush request if any error occurs in the retry phase. I don't think that's ideal.

Prior to this PR we were getting spurious failed gzip reads due to a bug, but ignoring them. First and foremost, error handling helps with maintainability and debuggability: now that error handling has been added, the "bug" surfaced and is fixed by your latest commit.

After this, any remaining errors mean the payload data is corrupted or there's a more serious service bug. In general, there is no reason we shouldn't be able to properly decompress something we've just compressed. If memory is getting corrupted, metrics should be the last of our problems, imho. This change is only for maintainability and to ensure we can catch very serious unexpected issues, not to catch something we expect to happen in steady state.

@kruskall merged commit b3aa972 into elastic:main on Mar 14, 2024.
5 checks passed.
@kruskall deleted the gzip/err branch on March 14, 2024 12:13.