perf: try harder to operate on new data (#124)
* perf: avoid rereading the buffer from the start for uncompressed requests

Instead of rereading the buffer from the beginning, start from
where we left off with the previous failed item.
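The idea, roughly: keep a cursor into the buffer and advance it past each document as it is handled, instead of scanning from index 0 for every item. A minimal, self-contained sketch of that pattern (nextDoc, the buffer contents, and the loop are illustrative assumptions, not the library's actual API):

package main

import (
	"bytes"
	"fmt"
)

// nextDoc is a hypothetical helper: it returns the newline-delimited
// document starting at off, plus the offset to resume from, so repeated
// lookups never rescan the buffer from index 0.
func nextDoc(buf []byte, off int) (doc []byte, next int, ok bool) {
	idx := bytes.IndexByte(buf[off:], '\n')
	if idx == -1 {
		return nil, off, false
	}
	next = off + idx + 1
	return buf[off:next], next, true
}

func main() {
	buf := []byte("{\"a\":1}\n{\"b\":2}\n{\"c\":3}\n")
	off := 0
	for {
		doc, next, ok := nextDoc(buf, off)
		if !ok {
			break
		}
		fmt.Printf("doc at %d: %s", off, doc)
		off = next // resume where we left off for the next item
	}
}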

* perf: reuse gzip reader and avoid rereading buffer for compressed requests

Similar to the change for uncompressed requests, do not reread
the buffer from the beginning; start from where we left off.

The gzip reader is now reused, and more data is read lazily.
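A sketch of the reader-reuse pattern with compress/gzip from the standard library: allocate one *gzip.Reader, then Reset it onto each new payload so its internal state is recycled instead of reallocated. The payloads and loop here are illustrative; the real code drives the reader incrementally rather than with io.ReadAll.

package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
)

func main() {
	// Two independently gzip-compressed payloads standing in for requests.
	var payloads [][]byte
	for _, s := range []string{"first request\n", "second request\n"} {
		var b bytes.Buffer
		zw := gzip.NewWriter(&b)
		zw.Write([]byte(s))
		zw.Close()
		payloads = append(payloads, b.Bytes())
	}

	// Allocate one reader for the first payload, then Reset it for each
	// subsequent payload to reuse its internal buffers.
	gr, err := gzip.NewReader(bytes.NewReader(payloads[0]))
	if err != nil {
		panic(err)
	}
	for i, p := range payloads {
		if i > 0 {
			if err := gr.Reset(bytes.NewReader(p)); err != nil {
				panic(err)
			}
		}
		out, _ := io.ReadAll(gr)
		fmt.Print(string(out))
	}
}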

* perf: try harder to operate on new data

With gzip the buffer can grow large, which makes counting newlines
over it increasingly expensive. To improve performance, we discard
everything up to the previous document when preparing the buffer for
the next document.
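A toy sketch of that trimming step (buffer contents are illustrative): drop the consumed prefix so bytes.Count only scans data that has not been processed yet.

package main

import (
	"bytes"
	"fmt"
)

func main() {
	// Decompressed gzip data can grow the buffer a lot; counting newlines
	// over the whole thing repeats work for data we already handled.
	buf := []byte("doc1\ndoc2\ndoc3\n")

	// Discard everything up to and including the previous document so
	// the next newline count only scans new data.
	end := bytes.IndexByte(buf, '\n') + 1 // end of the previous document
	buf = buf[end:]

	fmt.Println(bytes.Count(buf, []byte{'\n'})) // 2: only the remaining docs
}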

* refactor: reduce complexity and reuse the buffer more

* refactor: remove endIdx optimization

* refactor: re-add first loop condition
kruskall authored Mar 13, 2024
1 parent 1df4ab2 commit 5a759bb
Showing 1 changed file with 17 additions and 9 deletions.
bulk_indexer.go:

@@ -342,6 +342,7 @@ func (b *bulkIndexer) Flush(ctx context.Context) (BulkIndexerResponseStat, error
 			buf = buf[:n]
 		}
 
+		// newlines in the current buf
 		newlines := bytes.Count(buf, []byte{'\n'})
 
 		// loop until we've seen the start newline
@@ -357,22 +358,29 @@ func (b *bulkIndexer) Flush(ctx context.Context) (BulkIndexerResponseStat, error
 
 		// If the end newline is not in the buffer read more data
 		if endIdx == 0 {
+			// Write what we have
+			b.writer.Write(buf[startIdx:])
+
 			// loop until we've seen the end newline
 			for seen+newlines < endln {
-				// Add more capacity (let append pick how much).
-				buf = append(buf, 0)[:len(buf)]
-
-				n, _ := gr.Read(buf[len(buf):cap(buf)])
-				buf = buf[:len(buf)+n]
-
+				seen += newlines
+				n, _ := gr.Read(buf[:cap(buf)])
+				buf = buf[:n]
 				newlines = bytes.Count(buf, []byte{'\n'})
+				if seen+newlines < endln {
+					// endln is not here, write what we have and keep going
+					b.writer.Write(buf)
+				}
 			}
 
-			// try again to find the end newline
+			// try again to find the end newline in the extra data
+			// we just read.
 			endIdx = indexnth(buf, endln-seen, '\n') + 1
+			b.writer.Write(buf[:endIdx])
+		} else {
+			// If the end newline is in the buffer write the event
+			b.writer.Write(buf[startIdx:endIdx])
 		}
 
-		b.writer.Write(buf[startIdx:endIdx])
 	} else {
 		startIdx := indexnth(b.copyBuf[lastIdx:], startln-lastln, '\n') + 1
 		endIdx := indexnth(b.copyBuf[lastIdx:], endln-lastln, '\n') + 1
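The hunk leans on a helper, indexnth, that this diff does not show. Judging by its use as indexnth(buf, endln-seen, '\n') + 1, it returns the index of the nth occurrence of a byte. A plausible sketch under that assumption (the real helper lives elsewhere in bulk_indexer.go and may differ):

package main

import (
	"bytes"
	"fmt"
)

// indexnth returns the index of the nth occurrence of sep in b, or -1 if
// there are fewer than n occurrences. With n == 0 it returns -1, so
// indexnth(...)+1 yields 0, matching the diff's use of the +1 offset to
// slice just past the nth newline.
func indexnth(b []byte, n int, sep byte) int {
	idx := -1
	for ; n > 0; n-- {
		next := bytes.IndexByte(b[idx+1:], sep)
		if next == -1 {
			return -1
		}
		idx += next + 1
	}
	return idx
}

func main() {
	buf := []byte("a\nb\nc\n")
	fmt.Println(indexnth(buf, 2, '\n')) // 3: index of the second newline
}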
