diff --git a/internal/keyfinder.go b/internal/keyfinder.go index 5d0aa00..3e6e1cd 100644 --- a/internal/keyfinder.go +++ b/internal/keyfinder.go @@ -13,43 +13,53 @@ import ( // NER is the error message returned when the input has less fields than the KeyFinder is configured for. const NER = "not enough bytes in record" -// KeyFinder is a slice of small integers representing field numbers; 1-based on the command line, 0-based here. -type KeyFinder []uint +// KeyFinder extracts a key based on the specified fields from a record. fields is a slice of small integers +// representing field numbers; 1-based on the command line, 0-based here. +type KeyFinder struct { + fields []uint + key []byte +} // NewKeyFinder creates a new key finder with the supplied field numbers, the input should be 1 based. +// KeyFinder is not threadsafe, you should Clone it for each goroutine that uses it. func NewKeyFinder(keys []uint) *KeyFinder { - if keys == nil { - return nil + kf := KeyFinder{ + key: make([]byte, 0, 128), } - - var kf KeyFinder for _, knum := range keys { - kf = append(kf, knum-1) + kf.fields = append(kf.fields, knum-1) } return &kf } +// Clone returns a new KeyFinder with the same configuration. Each goroutine should use its own +// KeyFinder instance. +func (kf *KeyFinder) Clone() *KeyFinder { + return &KeyFinder{ + fields: kf.fields, + key: make([]byte, 0, 128), + } +} + // GetKey extracts a key from the supplied record. This is applied to every record, -// so efficiency matters +// so efficiency matters. func (kf *KeyFinder) GetKey(record []byte) ([]byte, error) { // if there are no keyfinders just return the record, minus any trailing newlines - if kf == nil || len(*kf) == 0 { + if len(kf.fields) == 0 { if record[len(record)-1] == '\n' { record = record[0 : len(record)-1] } - // Make a copy of record as record is a pointing at a buffer that will be reused - // and the caller is going to hang onto this. - return append([]byte(nil), record...), nil + return record, nil } var err error - key := make([]byte, 0, 100) + kf.key = kf.key[:0] field := 0 index := 0 first := true // for each field in the key - for _, keyField := range *kf { + for _, keyField := range kf.fields { // bypass fields before the one we want for field < int(keyField) { @@ -64,18 +74,18 @@ func (kf *KeyFinder) GetKey(record []byte) ([]byte, error) { if first { first = false } else { - key = append(key, ' ') + kf.key = append(kf.key, ' ') } // attach desired field to key - key, index, err = gather(key, record, index) + kf.key, index, err = gather(kf.key, record, index) if err != nil { return nil, err } field++ } - return key, err + return kf.key, err } // pull in the bytes from a desired field diff --git a/internal/segmenter.go b/internal/segmenter.go index 519fd07..7fa428a 100644 --- a/internal/segmenter.go +++ b/internal/segmenter.go @@ -126,10 +126,12 @@ func readAll(s *Segment, filter *Filters, kf *KeyFinder, reportCh chan segmentRe reader := bufio.NewReaderSize(s.file, 16*1024) current := s.start counters := newSegmentCounter() + kf = kf.Clone() for current < s.end { - // ReadSlice results are only valid until the next call to Read, so we - // to be careful about how long we hang onto the record slice. - // In this case GetKey needs to never return a direct subslice of the record. + // ReadSlice results are only valid until the next call to Read, so we need + // to be careful about how long we hang onto the record slice. The SegmentCounter + // is the only thing that holds onto data from record, and it has to make a copy + // anyway when it constructs its string key. So this is safe. record, err := reader.ReadSlice('\n') // ReadSlice returns an error if a line doesn't fit in its buffer. We // deal with that by switching to ReadBytes to get the remainder of the line. diff --git a/internal/segmenter_test.go b/internal/segmenter_test.go index d72ae6e..775057b 100644 --- a/internal/segmenter_test.go +++ b/internal/segmenter_test.go @@ -61,7 +61,7 @@ func TestReadAll(t *testing.T) { func TestReadAllLongLine(t *testing.T) { counter := NewCounter(10) - err := ReadFileInSegments("../test/data/long_lines", &Filters{}, counter, nil, 1) + err := ReadFileInSegments("../test/data/long_lines", &Filters{}, counter, NewKeyFinder(nil), 1) if err != nil { t.Fatalf("Failed to process file %v", err) }