fix: gracefully handle read errors when retrying documents #137

Merged · 2 commits · Mar 14, 2024 · Changes from 1 commit
15 changes: 12 additions & 3 deletions bulk_indexer.go
@@ -338,7 +338,10 @@ func (b *bulkIndexer) Flush(ctx context.Context) (BulkIndexerResponseStat, error
     if b.gzipw != nil {
         // First loop, read from the gzip reader
         if len(buf) == 0 {
-            n, _ := gr.Read(buf[:cap(buf)])
+            n, err := gr.Read(buf[:cap(buf)])
+            if err != nil && err != io.EOF {
+                return resp, fmt.Errorf("failed to read from compressed buffer: %w", err)
+            }
             buf = buf[:n]
         }

@@ -348,7 +351,10 @@ func (b *bulkIndexer) Flush(ctx context.Context) (BulkIndexerResponseStat, error
         // loop until we've seen the start newline
         for seen+newlines < startln {
             seen += newlines
-            n, _ := gr.Read(buf[:cap(buf)])
+            n, err := gr.Read(buf[:cap(buf)])
+            if err != nil && err != io.EOF {
+                return resp, fmt.Errorf("failed to read from compressed buffer: %w", err)
+            }
             buf = buf[:n]
             newlines = bytes.Count(buf, []byte{'\n'})
         }

Review comments on the new error check:

Contributor:
If you encounter an unexpected EOF then this will be an infinite loop. Defensive code should bail and error when that's the case, even though it shouldn't be possible in normal circumstances.

Member Author:
> If you encounter an unexpected EOF then this will be an infinite loop. Defensive code should bail and error when that's the case, even though it shouldn't be possible in normal circumstances.

Could you clarify this? If we get an error we just return early, I don't see why this is an infinite loop.

Contributor (@vikmik, Mar 13, 2024):
You only return early if err != nil and err != EOF.

If err == EOF and we haven't yet seen the start newline, this may never exit. This shouldn't happen if the contents of copyBuf are what we expect, so it depends how defensive you want to be.
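To make the reviewer's point concrete, here is a minimal standalone sketch of the defensive pattern being suggested: treat io.EOF as a hard error whenever the expected number of newlines has not yet been seen, so a truncated stream fails fast instead of spinning forever on repeated (0, io.EOF) reads. The gzipReader and skipNewlines helpers are hypothetical illustrations, not the actual bulk_indexer.go code.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"errors"
	"fmt"
	"io"
)

// gzipReader compresses s and returns a reader over the compressed stream.
func gzipReader(s string) io.Reader {
	var b bytes.Buffer
	zw := gzip.NewWriter(&b)
	zw.Write([]byte(s))
	zw.Close()
	zr, err := gzip.NewReader(&b)
	if err != nil {
		panic(err)
	}
	return zr
}

// skipNewlines reads from r until `want` newlines have been seen,
// erroring out if the stream ends first instead of looping forever.
func skipNewlines(r io.Reader, want int) error {
	buf := make([]byte, 0, 8)
	seen := 0
	for seen < want {
		n, err := r.Read(buf[:cap(buf)])
		seen += bytes.Count(buf[:n], []byte{'\n'})
		if errors.Is(err, io.EOF) {
			if seen < want {
				// Without this branch the loop never terminates: Read keeps
				// returning (0, io.EOF) and `seen` stops growing.
				return fmt.Errorf("unexpected EOF after %d of %d newlines", seen, want)
			}
			return nil
		}
		if err != nil {
			return fmt.Errorf("failed to read from compressed buffer: %w", err)
		}
	}
	return nil
}

func main() {
	fmt.Println(skipNewlines(gzipReader("doc1\ndoc2\n"), 2)) // <nil>
	fmt.Println(skipNewlines(gzipReader("doc1\ndoc2\n"), 5)) // unexpected EOF after 2 of 5 newlines
}
```

Note that this still relies on the io.Reader contract: a reader may return n > 0 together with io.EOF on the final read, so the newline count must be updated before the EOF check.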
@@ -364,7 +370,10 @@ func (b *bulkIndexer) Flush(ctx context.Context) (BulkIndexerResponseStat, error
         // loop until we've seen the end newline
         for seen+newlines < endln {
             seen += newlines
-            n, _ := gr.Read(buf[:cap(buf)])
+            n, err := gr.Read(buf[:cap(buf)])
+            if err != nil && err != io.EOF {
+                return resp, fmt.Errorf("failed to read from compressed buffer: %w", err)
+            }
             buf = buf[:n]
             newlines = bytes.Count(buf, []byte{'\n'})
             if seen+newlines < endln {

Review comment on the new error check, Contributor (@vikmik, Mar 13, 2024):
(same here - this may be an infinite loop if we encounter an unexpected EOF while we still expect to find newlines)