KeyFinder reuses a buffer for generating keys
superfell committed Apr 1, 2021
1 parent e9001af commit f6cf869
Showing 3 changed files with 33 additions and 21 deletions.
44 changes: 27 additions & 17 deletions internal/keyfinder.go
@@ -13,43 +13,53 @@ import (
 // NER is the error message returned when the input has less fields than the KeyFinder is configured for.
 const NER = "not enough bytes in record"
 
-// KeyFinder is a slice of small integers representing field numbers; 1-based on the command line, 0-based here.
-type KeyFinder []uint
+// KeyFinder extracts a key based on the specified fields from a record. fields is a slice of small integers
+// representing field numbers; 1-based on the command line, 0-based here.
+type KeyFinder struct {
+	fields []uint
+	key []byte
+}
 
 // NewKeyFinder creates a new key finder with the supplied field numbers, the input should be 1 based.
+// KeyFinder is not threadsafe, you should Clone it for each goroutine that uses it.
 func NewKeyFinder(keys []uint) *KeyFinder {
-	if keys == nil {
-		return nil
+	kf := KeyFinder{
+		key: make([]byte, 0, 128),
 	}
-
-	var kf KeyFinder
 	for _, knum := range keys {
-		kf = append(kf, knum-1)
+		kf.fields = append(kf.fields, knum-1)
 	}
 	return &kf
 }
 
+// Clone returns a new KeyFinder with the same configuration. Each goroutine should use its own
+// KeyFinder instance.
+func (kf *KeyFinder) Clone() *KeyFinder {
+	return &KeyFinder{
+		fields: kf.fields,
+		key: make([]byte, 0, 128),
+	}
+}
+
 // GetKey extracts a key from the supplied record. This is applied to every record,
-// so efficiency matters
+// so efficiency matters.
 func (kf *KeyFinder) GetKey(record []byte) ([]byte, error) {
 	// if there are no keyfinders just return the record, minus any trailing newlines
-	if kf == nil || len(*kf) == 0 {
+	if len(kf.fields) == 0 {
 		if record[len(record)-1] == '\n' {
 			record = record[0 : len(record)-1]
 		}
-		// Make a copy of record as record is a pointing at a buffer that will be reused
-		// and the caller is going to hang onto this.
-		return append([]byte(nil), record...), nil
+		return record, nil
 	}
 
 	var err error
-	key := make([]byte, 0, 100)
+	kf.key = kf.key[:0]
 	field := 0
 	index := 0
 	first := true
 
 	// for each field in the key
-	for _, keyField := range *kf {
+	for _, keyField := range kf.fields {
 
 		// bypass fields before the one we want
 		for field < int(keyField) {
@@ -64,18 +74,18 @@ func (kf *KeyFinder) GetKey(record []byte) ([]byte, error) {
 		if first {
 			first = false
 		} else {
-			key = append(key, ' ')
+			kf.key = append(kf.key, ' ')
 		}
 
 		// attach desired field to key
-		key, index, err = gather(key, record, index)
+		kf.key, index, err = gather(kf.key, record, index)
 		if err != nil {
 			return nil, err
 		}
 
 		field++
 	}
-	return key, err
+	return kf.key, err
 }
 
 // pull in the bytes from a desired field
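For context, here is a minimal usage sketch, not part of this commit, of the contract the reused buffer implies: the slice returned by GetKey aliases the KeyFinder's internal key buffer, so it is only valid until the next GetKey call on the same instance, it must be copied before being retained, and concurrent callers should each work from their own Clone. The helper names and the package clause below are assumptions for illustration; only NewKeyFinder, Clone, and GetKey come from the code above.

package internal // assumed package name; the diff does not show the real package clause

import "fmt"

// countKeys is an illustrative helper, not part of the repository. It counts
// occurrences of each extracted key while respecting the buffer-reuse contract.
func countKeys(kf *KeyFinder, records [][]byte) map[string]int {
	kf = kf.Clone() // a worker keeps its own KeyFinder, and therefore its own key buffer
	counts := make(map[string]int)
	for _, rec := range records {
		key, err := kf.GetKey(rec)
		if err != nil {
			continue // e.g. the record has fewer fields than requested
		}
		// key is only valid until the next GetKey call; string(key) copies it,
		// which is what makes it safe to keep as a map key.
		counts[string(key)]++
	}
	return counts
}

func exampleUsage() {
	kf := NewKeyFinder([]uint{1, 3}) // fields 1 and 3, 1-based as on the command line
	recs := [][]byte{
		[]byte("a b c d\n"),
		[]byte("a x c y\n"),
	}
	fmt.Println(countKeys(kf, recs)) // map[a c:2]
}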
8 changes: 5 additions & 3 deletions internal/segmenter.go
@@ -126,10 +126,12 @@ func readAll(s *Segment, filter *Filters, kf *KeyFinder, reportCh chan segmentRe
 	reader := bufio.NewReaderSize(s.file, 16*1024)
 	current := s.start
 	counters := newSegmentCounter()
+	kf = kf.Clone()
 	for current < s.end {
-		// ReadSlice results are only valid until the next call to Read, so we
-		// to be careful about how long we hang onto the record slice.
-		// In this case GetKey needs to never return a direct subslice of the record.
+		// ReadSlice results are only valid until the next call to Read, so we need
+		// to be careful about how long we hang onto the record slice. The SegmentCounter
+		// is the only thing that holds onto data from record, and it has to make a copy
+		// anyway when it constructs its string key. So this is safe.
 		record, err := reader.ReadSlice('\n')
 		// ReadSlice returns an error if a line doesn't fit in its buffer. We
 		// deal with that by switching to ReadBytes to get the remainder of the line.
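The safety argument in the rewritten comment rests on []byte-to-string conversion making a copy. A tiny standalone illustration of that property, assumed and not taken from the repository, mirroring the way kf.key is truncated and reused between records:

package main

import "fmt"

func main() {
	buf := make([]byte, 0, 16) // stand-in for a reused buffer like kf.key
	counts := map[string]int{}

	buf = append(buf, "alpha"...)
	counts[string(buf)]++ // string(buf) copies the bytes, so the map entry is independent of buf

	buf = buf[:0] // reuse the same backing array, as GetKey now does with kf.key
	buf = append(buf, "beta"...)
	counts[string(buf)]++

	fmt.Println(counts) // map[alpha:1 beta:1]: reusing the buffer did not clobber the first entry
}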
2 changes: 1 addition & 1 deletion internal/segmenter_test.go
@@ -61,7 +61,7 @@ func TestReadAll(t *testing.T) {
 
 func TestReadAllLongLine(t *testing.T) {
 	counter := NewCounter(10)
-	err := ReadFileInSegments("../test/data/long_lines", &Filters{}, counter, nil, 1)
+	err := ReadFileInSegments("../test/data/long_lines", &Filters{}, counter, NewKeyFinder(nil), 1)
 	if err != nil {
 		t.Fatalf("Failed to process file %v", err)
 	}
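The test now passes NewKeyFinder(nil) instead of nil because readAll calls kf.Clone() and GetKey reads kf.fields, so a nil *KeyFinder would panic, and NewKeyFinder no longer returns nil for nil input. A test-style sketch of the behaviour the changed call site relies on; the test name and package clause are assumptions, not code from the commit:

package internal // assumed package name; the diff does not show the real package clause

import "testing"

// TestEmptyKeyFinderReturnsWholeRecord is illustrative only. With no fields
// configured, GetKey should return the whole record minus any trailing newline.
func TestEmptyKeyFinderReturnsWholeRecord(t *testing.T) {
	kf := NewKeyFinder(nil)
	key, err := kf.GetKey([]byte("the whole line is the key\n"))
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
	if string(key) != "the whole line is the key" {
		t.Errorf("got %q, want the record without its newline", key)
	}
}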
