From b3aa9728cd6a7ae522f2a96c5ca2f7be4518fb53 Mon Sep 17 00:00:00 2001 From: kruskall <99559985+kruskall@users.noreply.github.com> Date: Thu, 14 Mar 2024 13:13:27 +0100 Subject: [PATCH] fix: gracefully handle read errors when retrying documents (#137) * fix: gracefully handle read errors when retrying documents * fix: always size the copyBuf appropriately --- bulk_indexer.go | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/bulk_indexer.go b/bulk_indexer.go index 4ebb502..2324250 100644 --- a/bulk_indexer.go +++ b/bulk_indexer.go @@ -234,7 +234,8 @@ func (b *bulkIndexer) Flush(ctx context.Context) (BulkIndexerResponseStat, error b.copyBuf = slices.Grow(b.copyBuf, b.buf.Len()-cap(b.copyBuf)) b.copyBuf = b.copyBuf[:cap(b.copyBuf)] } - copy(b.copyBuf, b.buf.Bytes()) + n := copy(b.copyBuf, b.buf.Bytes()) + b.copyBuf = b.copyBuf[:n] } req := esapi.BulkRequest{ @@ -338,7 +339,10 @@ func (b *bulkIndexer) Flush(ctx context.Context) (BulkIndexerResponseStat, error if b.gzipw != nil { // First loop, read from the gzip reader if len(buf) == 0 { - n, _ := gr.Read(buf[:cap(buf)]) + n, err := gr.Read(buf[:cap(buf)]) + if err != nil && err != io.EOF { + return resp, fmt.Errorf("failed to read from compressed buffer: %w", err) + } buf = buf[:n] } @@ -348,7 +352,10 @@ func (b *bulkIndexer) Flush(ctx context.Context) (BulkIndexerResponseStat, error // loop until we've seen the start newline for seen+newlines < startln { seen += newlines - n, _ := gr.Read(buf[:cap(buf)]) + n, err := gr.Read(buf[:cap(buf)]) + if err != nil && err != io.EOF { + return resp, fmt.Errorf("failed to read from compressed buffer: %w", err) + } buf = buf[:n] newlines = bytes.Count(buf, []byte{'\n'}) } @@ -364,7 +371,10 @@ func (b *bulkIndexer) Flush(ctx context.Context) (BulkIndexerResponseStat, error // loop until we've seen the end newline for seen+newlines < endln { seen += newlines - n, _ := gr.Read(buf[:cap(buf)]) + n, err := gr.Read(buf[:cap(buf)]) + if err != nil && err != io.EOF { + return resp, fmt.Errorf("failed to read from compressed buffer: %w", err) + } buf = buf[:n] newlines = bytes.Count(buf, []byte{'\n'}) if seen+newlines < endln {