diff --git a/bulk_indexer.go b/bulk_indexer.go
index 5fcf50c..07e9745 100644
--- a/bulk_indexer.go
+++ b/bulk_indexer.go
@@ -232,9 +232,10 @@ func (b *bulkIndexer) Flush(ctx context.Context) (BulkIndexerResponseStat, error
 	if b.maxDocumentRetry > 0 {
 		if cap(b.copyBuf) < b.buf.Len() {
 			b.copyBuf = slices.Grow(b.copyBuf, b.buf.Len()-cap(b.copyBuf))
-			b.copyBuf = b.copyBuf[:cap(b.copyBuf)]
+			b.copyBuf = b.copyBuf[:b.buf.Len()]
 		}
-		copy(b.copyBuf, b.buf.Bytes())
+		n := copy(b.copyBuf, b.buf.Bytes())
+		b.copyBuf = b.copyBuf[:n]
 	}
 
 	req := esapi.BulkRequest{
@@ -275,7 +276,7 @@ func (b *bulkIndexer) Flush(ctx context.Context) (BulkIndexerResponseStat, error
 
 	// Only run the retry logic if document retries are enabled
 	if b.maxDocumentRetry > 0 {
-		buf := make([]byte, 0, 1024)
+		buf := make([]byte, 0, 4096)
 
 		// Eliminate previous retry counts that aren't present in the bulk
 		// request response.
@@ -307,8 +308,8 @@ func (b *bulkIndexer) Flush(ctx context.Context) (BulkIndexerResponseStat, error
 		}
 
 		// keep track of the previous newlines
-		// the buffer is being read lazily
-		seen := 0
+		newlinesInBuf := 0 // number of newlines currently in `buf`
+		seen := 0          // number of newlines previously read (excluding the ones in `buf`)
 
 		for _, res := range resp.FailedDocs {
 			if res.Status == http.StatusTooManyRequests {
@@ -336,43 +337,53 @@ func (b *bulkIndexer) Flush(ctx context.Context) (BulkIndexerResponseStat, error
 			b.retryCounts[b.itemsAdded] = count
 
 			if b.gzipw != nil {
-				// First loop, read from the gzip reader
-				if len(buf) == 0 {
-					n, _ := gr.Read(buf[:cap(buf)])
-					buf = buf[:n]
-				}
-
-				newlines := bytes.Count(buf, []byte{'\n'})
-				// loop until we've seen the start newline
-				for seen+newlines < startln {
-					seen += newlines
-					n, _ := gr.Read(buf[:cap(buf)])
+				for seen+newlinesInBuf < startln || len(buf) == 0 {
+					seen += newlinesInBuf
+					n, err := gr.Read(buf[:cap(buf)])
+					if err != nil && err != io.EOF {
+						return resp, fmt.Errorf("error reading gzip data: %w", err)
+					}
 					buf = buf[:n]
-					newlines = bytes.Count(buf, []byte{'\n'})
+					newlinesInBuf = bytes.Count(buf, []byte{'\n'})
+					if err == io.EOF && seen+newlinesInBuf < startln {
+						return resp, fmt.Errorf("could not find first newline: %w", err)
+					}
 				}
 
 				startIdx := indexnth(buf, startln-seen, '\n') + 1
 				endIdx := indexnth(buf, endln-seen, '\n') + 1
 
-				// If the end newline is not in the buffer read more data
-				if endIdx == 0 {
-					// loop until we've seen the end newline
-					for seen+newlines < endln {
-						// Add more capacity (let append pick how much).
-						buf = append(buf, 0)[:len(buf)]
+				if endIdx != 0 {
+					// The whole document is in buf - write it and move to the next one.
+					b.writer.Write(buf[startIdx:endIdx])
+					continue
+				}
 
-						n, _ := gr.Read(buf[len(buf):cap(buf)])
-						buf = buf[:len(buf)+n]
+				// We only have the beginning of the document. Write what we have so far.
+				b.writer.Write(buf[startIdx:])
 
-						newlines = bytes.Count(buf, []byte{'\n'})
+				// Loop until we've seen the end newline.
+				for seen+newlinesInBuf < endln {
+					seen += newlinesInBuf
+					n, err := gr.Read(buf[:cap(buf)])
+					if err != nil && err != io.EOF {
+						return resp, fmt.Errorf("error reading gzip data: %w", err)
+					}
+					buf = buf[:n]
+					newlinesInBuf = bytes.Count(buf, []byte{'\n'})
+					if seen+newlinesInBuf < endln {
+						if err == io.EOF {
+							return resp, fmt.Errorf("unable to find end newline in gzip data: %w", err)
+						}
+						// We haven't found the end newline yet - write the whole buffer.
+						b.writer.Write(buf)
 					}
-
-					// try again to find the end newline
-					endIdx = indexnth(buf, endln-seen, '\n') + 1
 				}
-				b.writer.Write(buf[startIdx:endIdx])
+				// End newline found
+				endIdx = indexnth(buf, endln-seen, '\n') + 1
+				b.writer.Write(buf[:endIdx])
 			} else {
 				startIdx := indexnth(b.copyBuf[lastIdx:], startln-lastln, '\n') + 1
 				endIdx := indexnth(b.copyBuf[lastIdx:], endln-lastln, '\n') + 1
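
Reviewer note on the first hunk: reslicing to `cap(b.copyBuf)` after `slices.Grow` exposes more bytes than the current request occupies, and on a later, smaller flush the `cap` check is skipped entirely, so `b.copyBuf` keeps its old length and documents from the previous flush survive past the fresh payload. The snippet below is a minimal, self-contained reproduction of that hazard; the names are illustrative, not from this repository.

```go
package main

import (
	"fmt"
	"slices"
)

func main() {
	// Simulate two flushes that reuse one scratch buffer, as Flush does.
	var scratch []byte

	// Flush 1: a large payload.
	big := []byte("doc1\ndoc2\ndoc3\n")
	scratch = slices.Grow(scratch, len(big))
	scratch = scratch[:cap(scratch)] // old code: length set to capacity, not payload size
	copy(scratch, big)

	// Flush 2: a smaller payload. cap(scratch) is already sufficient, so the
	// old code never resliced, and scratch keeps its previous, longer length.
	small := []byte("doc4\n")
	copy(scratch, small)

	// Stale documents from flush 1 (plus any zeroed spare capacity) still
	// trail the fresh payload, which the retry slicing would then re-send.
	fmt.Printf("%q\n", scratch)
}
```

The fix in the hunk, reslicing to `b.buf.Len()` and truncating to the value returned by `copy`, pins the buffer's length to exactly the bytes of the current request.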
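The rewritten read loops also tighten `io.Reader` handling: `Read` may return data together with `io.EOF`, so the new code counts newlines from every read before acting on the error, and fails loudly instead of discarding errors with `n, _ := gr.Read(...)`. A minimal sketch of the same loop shape against an in-memory reader (the helper name is hypothetical):

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"strings"
)

// countNewlines mirrors the loop shape used in the new Flush code: reuse one
// fixed-capacity buffer, process the bytes of every read, and only then act
// on io.EOF, since a reader may return n > 0 alongside io.EOF.
func countNewlines(r io.Reader) (int, error) {
	buf := make([]byte, 0, 8) // deliberately tiny to force several reads
	total := 0
	for {
		n, err := r.Read(buf[:cap(buf)])
		if err != nil && err != io.EOF {
			return total, fmt.Errorf("error reading data: %w", err)
		}
		buf = buf[:n]
		total += bytes.Count(buf, []byte{'\n'})
		if err == io.EOF {
			return total, nil
		}
	}
}

func main() {
	n, err := countNewlines(strings.NewReader("doc1\ndoc2\ndoc3\n"))
	fmt.Println(n, err) // 3 <nil>
}
```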
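`indexnth` is an existing helper in bulk_indexer.go that the hunks call but do not show. Its call sites imply the contract sketched below: index of the nth occurrence, -1 when absent, so that `indexnth(...) + 1` is 0 exactly when the nth newline has not been buffered yet, which is what the new `if endIdx != 0` fast path tests. This is an inferred sketch, not the repository's actual implementation.

```go
// indexnth returns the index of the nth occurrence of sep in s, or -1 when
// s holds fewer than nth occurrences (or nth <= 0). Adding 1 to the result
// therefore yields the offset just past the nth newline, and 0 when it is
// missing - the sentinel the new code checks via `if endIdx != 0`. The
// nth <= 0 case also makes startIdx come out as 0 when a document starts
// at the very beginning of the buffer (startln == seen).
func indexnth(s []byte, nth int, sep byte) int {
	if nth <= 0 {
		return -1
	}
	count := 0
	for i, b := range s {
		if b == sep {
			count++
			if count == nth {
				return i
			}
		}
	}
	return -1
}
```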