go-docappender: use a fixed-size buffer #129

Closed · wants to merge 1 commit

71 changes: 41 additions & 30 deletions bulk_indexer.go
```diff
@@ -232,9 +232,10 @@ func (b *bulkIndexer) Flush(ctx context.Context) (BulkIndexerResponseStat, error
 	if b.maxDocumentRetry > 0 {
 		if cap(b.copyBuf) < b.buf.Len() {
 			b.copyBuf = slices.Grow(b.copyBuf, b.buf.Len()-cap(b.copyBuf))
-			b.copyBuf = b.copyBuf[:cap(b.copyBuf)]
+			b.copyBuf = b.copyBuf[:b.buf.Len()]
 		}
-		copy(b.copyBuf, b.buf.Bytes())
+		n := copy(b.copyBuf, b.buf.Bytes())
+		b.copyBuf = b.copyBuf[:n]
```
Comment on lines +237 to +238
@vikmik (Contributor, Author) commented on Feb 23, 2024:
Prior to this PR there was no error handling for the gzip reader. I fixed that below, but that required resizing copyBuf down here when necessary; otherwise it could contain unrelated data from previous flushes, causing failed gzip reads.
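A minimal standalone sketch of that failure mode (not code from this repo; the `gzipped` helper and the payloads are invented for the demo): copying a shorter gzip stream over a longer one without re-slicing to the copied length leaves stale bytes behind, which typically breaks the subsequent read.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
)

// gzipped is a demo helper (not part of go-docappender): it compresses p
// into a fresh gzip stream.
func gzipped(p []byte) []byte {
	var b bytes.Buffer
	w := gzip.NewWriter(&b)
	w.Write(p)
	w.Close()
	return b.Bytes()
}

func main() {
	// Flush 1 leaves a long gzip stream in the reusable buffer.
	long := gzipped([]byte("first, fairly long document body\n"))
	buf := make([]byte, len(long))
	copy(buf, long)

	// Flush 2 copies a shorter stream over it WITHOUT re-slicing to the
	// copied length, so stale bytes from flush 1 remain after the new
	// stream - the situation before this PR.
	short := gzipped([]byte("second\n"))
	copy(buf, short)

	r, err := gzip.NewReader(bytes.NewReader(buf))
	if err == nil {
		_, err = io.ReadAll(r)
	}
	// Typically "gzip: invalid header": after the first stream ends, the
	// reader hits the stale tail and fails.
	fmt.Println(err)

	// The fix: track the copied length and re-slice the buffer to it.
	n := copy(buf, short)
	r2, _ := gzip.NewReader(bytes.NewReader(buf[:n]))
	doc, _ := io.ReadAll(r2)
	fmt.Printf("%s", doc) // second
}
```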

```diff
 	}
 
 	req := esapi.BulkRequest{
```
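As background for the first hunk above: `slices.Grow` only guarantees *capacity* for more elements; it does not change the slice's length, and `copy` respects length, not capacity. Hence the re-slice before the copy and the length tracking after it. A quick standalone illustration of those semantics:

```go
package main

import (
	"fmt"
	"slices"
)

func main() {
	s := make([]byte, 0, 8)
	// Grow guarantees capacity for at least 16 more elements past len(s);
	// it does not change len(s).
	s = slices.Grow(s, 16)
	fmt.Println(len(s), cap(s) >= 16) // 0 true

	// To use the grown storage with copy (which honors len, not cap),
	// the slice must be re-sliced to the needed length first.
	s = s[:16]
	n := copy(s, []byte("0123456789abcdef"))
	fmt.Println(n, len(s)) // 16 16
}
```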
```diff
@@ -275,7 +276,7 @@ func (b *bulkIndexer) Flush(ctx context.Context) (BulkIndexerResponseStat, error
 
 	// Only run the retry logic if document retries are enabled
 	if b.maxDocumentRetry > 0 {
-		buf := make([]byte, 0, 1024)
+		buf := make([]byte, 0, 4096)
 
 		// Eliminate previous retry counts that aren't present in the bulk
 		// request response.
```
```diff
@@ -307,8 +308,8 @@ func (b *bulkIndexer) Flush(ctx context.Context) (BulkIndexerResponseStat, error
 		}
 
 		// keep track of the previous newlines
-		// the buffer is being read lazily
-		seen := 0
+		newlinesInBuf := 0 // number of newlines currently in `buf`
+		seen := 0          // number of newlines previously read (excluding the ones in `buf`)
 
 		for _, res := range resp.FailedDocs {
 			if res.Status == http.StatusTooManyRequests {
```
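These two counters drive the read loops in the next hunk. A toy, self-contained illustration of the bookkeeping (the input, chunk size, and endln are made up for the demo; the real code reads from a gzip reader and handles errors):

```go
package main

import (
	"bytes"
	"fmt"
	"strings"
)

func main() {
	// `seen` counts newlines consumed in chunks already discarded;
	// `newlinesInBuf` counts newlines in the chunk currently held in buf.
	r := strings.NewReader("a\nb\nc\nd\n")
	buf := make([]byte, 0, 4) // deliberately tiny chunks
	seen, newlinesInBuf := 0, 0

	endln := 3 // looking for the 3rd newline overall
	for seen+newlinesInBuf < endln {
		seen += newlinesInBuf
		n, _ := r.Read(buf[:cap(buf)])
		if n == 0 {
			break // EOF; the real code returns an error here
		}
		buf = buf[:n]
		newlinesInBuf = bytes.Count(buf, []byte{'\n'})
	}
	// endln-seen is the target newline's ordinal within the current chunk,
	// which is what the indexnth calls in the diff rely on.
	fmt.Println(seen, newlinesInBuf, endln-seen) // 2 2 1
}
```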
```diff
@@ -336,43 +337,53 @@ func (b *bulkIndexer) Flush(ctx context.Context) (BulkIndexerResponseStat, error
 			b.retryCounts[b.itemsAdded] = count
 
 			if b.gzipw != nil {
-				// First loop, read from the gzip reader
-				if len(buf) == 0 {
-					n, _ := gr.Read(buf[:cap(buf)])
-					buf = buf[:n]
-				}
-
-				newlines := bytes.Count(buf, []byte{'\n'})
-
 				// loop until we've seen the start newline
-				for seen+newlines < startln {
-					seen += newlines
-					n, _ := gr.Read(buf[:cap(buf)])
+				for seen+newlinesInBuf < startln || len(buf) == 0 {
+					seen += newlinesInBuf
+					n, err := gr.Read(buf[:cap(buf)])
+					if err != nil && err != io.EOF {
+						return resp, fmt.Errorf("error reading gzip data: %w", err)
+					}
 					buf = buf[:n]
-					newlines = bytes.Count(buf, []byte{'\n'})
+					newlinesInBuf = bytes.Count(buf, []byte{'\n'})
+					if err == io.EOF && seen+newlinesInBuf < startln {
+						return resp, fmt.Errorf("could not find first newline: %w", err)
+					}
 				}
 
 				startIdx := indexnth(buf, startln-seen, '\n') + 1
 				endIdx := indexnth(buf, endln-seen, '\n') + 1
 
-				// If the end newline is not in the buffer read more data
-				if endIdx == 0 {
-					// loop until we've seen the end newline
-					for seen+newlines < endln {
-						// Add more capacity (let append pick how much).
-						buf = append(buf, 0)[:len(buf)]
-
-						n, _ := gr.Read(buf[len(buf):cap(buf)])
-						buf = buf[:len(buf)+n]
-
-						newlines = bytes.Count(buf, []byte{'\n'})
-					}
-
-					// try again to find the end newline
-					endIdx = indexnth(buf, endln-seen, '\n') + 1
-				}
-
-				b.writer.Write(buf[startIdx:endIdx])
+				if endIdx != 0 {
+					// The whole document is in buf - write it and move to the next one.
+					b.writer.Write(buf[startIdx:endIdx])
+					continue
+				}
+
+				// We only have the beginning of the document. Write what we have so far.
+				b.writer.Write(buf[startIdx:])
+
+				// Loop until we've seen the end newline.
+				for seen+newlinesInBuf < endln {
+					seen += newlinesInBuf
+					n, err := gr.Read(buf[:cap(buf)])
+					if err != nil && err != io.EOF {
+						return resp, fmt.Errorf("error reading gzip data: %w", err)
+					}
+					buf = buf[:n]
+					newlinesInBuf = bytes.Count(buf, []byte{'\n'})
+					if seen+newlinesInBuf < endln {
+						if err == io.EOF {
+							return resp, fmt.Errorf("unable to find end newline in gzip data: %w", err)
+						}
+						// We haven't found the end newline yet - write the whole buffer.
+						b.writer.Write(buf)
+					}
+				}
+
+				// End newline found
+				endIdx = indexnth(buf, endln-seen, '\n') + 1
+				b.writer.Write(buf[:endIdx])
 			} else {
 				startIdx := indexnth(b.copyBuf[lastIdx:], startln-lastln, '\n') + 1
 				endIdx := indexnth(b.copyBuf[lastIdx:], endln-lastln, '\n') + 1
```
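The diff leans on an `indexnth` helper that is not shown here. Below is a plausible reconstruction, assumed from how the callers use it: it should return the index of the nth occurrence of `sep`, or -1 when there are fewer than n occurrences (or n <= 0), so that the `+ 1` in the callers maps "not found" to 0.

```go
package main

import "fmt"

// indexnth returns the index of the nth occurrence of sep in s, or -1 if
// s has fewer than n occurrences (or n <= 0). This is an assumed
// reconstruction: the real helper is defined elsewhere in bulk_indexer.go
// and is not part of this diff.
func indexnth(s []byte, nth int, sep byte) int {
	if nth <= 0 {
		return -1
	}
	for i := 0; i < len(s); i++ {
		if s[i] == sep {
			nth--
			if nth == 0 {
				return i
			}
		}
	}
	return -1
}

func main() {
	buf := []byte("doc1\ndoc2\n")
	fmt.Println(indexnth(buf, 1, '\n') + 1) // 5: start of doc2
	fmt.Println(indexnth(buf, 3, '\n') + 1) // 0: 3rd newline not present
}
```

Under these assumed semantics, `endIdx` is 0 exactly when the end newline is not yet in `buf`, which is why the new code can branch on `endIdx != 0`, and `startln-seen == 0` correctly yields a start index of 0 when a document begins at the top of the buffer.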