diff --git a/pkg/kgo/record_formatter.go b/pkg/kgo/record_formatter.go index 317da27b..082d97f6 100644 --- a/pkg/kgo/record_formatter.go +++ b/pkg/kgo/record_formatter.go @@ -78,6 +78,7 @@ func (f *RecordFormatter) AppendPartitionRecord(b []byte, p *FetchPartition, r * // %o offset // %e leader epoch // %d timestamp (date, formatting described below) +// %a record attributes (formatting required, described below) // %x producer id // %y producer epoch // @@ -124,6 +125,7 @@ func (f *RecordFormatter) AppendPartitionRecord(b []byte, p *FetchPartition, r * // All number verbs accept braces that control how the number is printed: // // %v{ascii} the default, print the number as ascii +// %v{number} alias for ascii // // %v{hex64} print 16 hex characters for the number // %v{hex32} print 8 hex characters for the number @@ -143,6 +145,7 @@ func (f *RecordFormatter) AppendPartitionRecord(b []byte, p *FetchPartition, r * // %v{little8} alias for byte // // %v{byte} print the number as a single byte +// %v{bool} print "true" if the number is non-zero, otherwise "false" // // All numbers are truncated as necessary per each given format. // @@ -161,6 +164,38 @@ func (f *RecordFormatter) AppendPartitionRecord(b []byte, p *FetchPartition, r * // simply passed to the time package's AppendFormat function. For strftime, all // "man strftime" options are supported. Time is always in UTC. // +// Attributes +// +// Record attributes require formatting, where each formatting option selects +// which attribute to print and how to print it. +// +// %a{compression} +// %a{compression;number} +// %a{compression;big64} +// %a{compression;hex8} +// +// By default, prints the compression as text ("none", "gzip", ...). +// Compression can be printed as a number with ";number", where number is any +// number formatting option described above. 
+// +// %a{timestamp-type} +// %a{timestamp-type;big64} +// +// Prints -1 for pre-0.10 records, 0 for client generated timestamps, and 1 for +// broker generated. Number formatting can be controlled with ";number". +// +// %a{transactional-bit} +// %a{transactional-bit;bool} +// +// Prints 1 if the record is part of a transaction and 0 if the record is not +// part of a transaction. Number formatting can be controlled with ";number". +// +// %a{control-bit} +// %a{control-bit;bool} +// +// Prints 1 if the record is a control record and 0 if the record is not a +// control record. Number formatting can be controlled with ";number". +// // Text // // Topics, keys, and values have "base64", "base64raw", "hex", and "unpack" @@ -197,7 +232,6 @@ func (f *RecordFormatter) AppendPartitionRecord(b []byte, p *FetchPartition, r * // affects everything that follows. It is possible to switch endianness // multiple times. If the parser needs more data than available, or if the more // input remains after '$', an error message will be appended. 
-// func NewRecordFormatter(layout string) (*RecordFormatter, error) { var f RecordFormatter @@ -377,6 +411,105 @@ func NewRecordFormatter(layout string) (*RecordFormatter, error) { }) } + case 'a': + if !isOpenBrace { + return nil, errors.New("missing open brace sequence on %a signifying how attributes should be written") + } + handledBrace = true + + num := func(skipText string, rfn func(*Record) int64) error { + layout = layout[len(skipText):] + numfn, n, err := parseNumWriteLayout(layout) + if err != nil { + return err + } + layout = layout[n:] + f.fns = append(f.fns, func(b []byte, _ *FetchPartition, r *Record) []byte { + return writeR(b, r, func(b []byte, r *Record) []byte { return numfn(b, rfn(r)) }) + }) + return nil + } + bi64 := func(b bool) int64 { + if b { + return 1 + } + return 0 + } + + switch { + case strings.HasPrefix(layout, "compression}"): + layout = layout[len("compression}"):] + f.fns = append(f.fns, func(b []byte, _ *FetchPartition, r *Record) []byte { + return writeR(b, r, func(b []byte, r *Record) []byte { + switch r.Attrs.CompressionType() { + case 0: + return append(b, "none"...) + case 1: + return append(b, "gzip"...) + case 2: + return append(b, "snappy"...) + case 3: + return append(b, "lz4"...) + case 4: + return append(b, "zstd"...) + default: + return append(b, "unknown"...) 
+ } + }) + }) + case strings.HasPrefix(layout, "compression;"): + if err := num("compression;", func(r *Record) int64 { return int64(r.Attrs.CompressionType()) }); err != nil { + return nil, err + } + + case strings.HasPrefix(layout, "timestamp-type}"): + layout = layout[len("timestamp-type}"):] + f.fns = append(f.fns, func(b []byte, _ *FetchPartition, r *Record) []byte { + return writeR(b, r, func(b []byte, r *Record) []byte { + return strconv.AppendInt(b, int64(r.Attrs.TimestampType()), 10) + }) + }) + case strings.HasPrefix(layout, "timestamp-type;"): + if err := num("timestamp-type;", func(r *Record) int64 { return int64(r.Attrs.TimestampType()) }); err != nil { + return nil, err + } + + case strings.HasPrefix(layout, "transactional-bit}"): + layout = layout[len("transactional-bit}"):] + f.fns = append(f.fns, func(b []byte, _ *FetchPartition, r *Record) []byte { + return writeR(b, r, func(b []byte, r *Record) []byte { + if r.Attrs.IsTransactional() { + return append(b, '1') + } else { + return append(b, '0') + } + }) + }) + case strings.HasPrefix(layout, "transactional-bit;"): + if err := num("transactional-bit;", func(r *Record) int64 { return bi64(r.Attrs.IsTransactional()) }); err != nil { + return nil, err + } + + case strings.HasPrefix(layout, "control-bit}"): + layout = layout[len("control-bit}"):] + f.fns = append(f.fns, func(b []byte, _ *FetchPartition, r *Record) []byte { + return writeR(b, r, func(b []byte, r *Record) []byte { + if r.Attrs.IsControl() { + return append(b, '1') + } else { + return append(b, '0') + } + }) + }) + case strings.HasPrefix(layout, "control-bit;"): + if err := num("control-bit;", func(r *Record) int64 { return bi64(r.Attrs.IsControl()) }); err != nil { + return nil, err + } + + default: + return nil, errors.New("unknown %a formatting") + } + case 'h': if !isOpenBrace { return nil, errors.New("missing open brace sequence on %h signifying how headers are written") @@ -675,7 +808,7 @@ func parseNumWriteLayout(layout string) 
(func([]byte, int64) []byte, int, error) } end := braceEnd + 1 switch layout = layout[:braceEnd]; layout { - case "ascii": + case "ascii", "number": return writeNumASCII, end, nil case "hex64": return writeNumHex64, end, nil @@ -703,6 +836,8 @@ func parseNumWriteLayout(layout string) (func([]byte, int64) []byte, int, error) return writeNumLittle32, end, nil case "little16": return writeNumLittle16, end, nil + case "bool": + return writeNumBool, end, nil default: return nil, 0, fmt.Errorf("invalid output number layout %q", layout) } @@ -816,6 +951,13 @@ func writeNumLittle16(b []byte, n int64) []byte { } func writeNumByte(b []byte, n int64) []byte { u := uint64(n); return append(b, byte(u)) } +func writeNumBool(b []byte, n int64) []byte { + if n == 0 { + return append(b, "false"...) + } + return append(b, "true"...) +} + //////////// // READER // //////////// @@ -882,6 +1024,7 @@ type RecordReader struct { // All size numbers can be parsed in the following ways: // // %v{ascii} parse numeric digits until a non-numeric +// %v{number} alias for ascii // // %v{hex64} read 16 hex characters for the number // %v{hex32} read 8 hex characters for the number @@ -900,7 +1043,7 @@ type RecordReader struct { // %v{little8} read the number as a byte // // %v{byte} read the number as a byte -// +// %v{bool} read "true" as 1, "false" as 0 // %v{3} read 3 characters (any number) // // Header specification @@ -1331,9 +1474,15 @@ func (*RecordReader) parseReadSize(layout string, dst *uint64, needBrace bool) ( func([]byte, *Record) error { *dst = uint64(num); return nil }, }, end, nil - case "ascii": + case "ascii", "number": return readParse{ - readKind{condition: func(b byte) bool { return b < '0' || b > '9' }}, + readKind{condition: func(b byte) int8 { + if b < '0' || b > '9' { + return -1 + } else { + return 2 // ignore EOF if we hit it after this + } + }}, func(b []byte, _ *Record) (err error) { *dst, err = strconv.ParseUint(kbin.UnsafeString(b), 10, 64) return err @@ -1418,6 
+1567,68 @@ func (*RecordReader) parseReadSize(layout string, dst *uint64, needBrace bool) ( return err }, }, end, nil + + case "bool": + const ( + stateUnknown uint8 = iota + stateTrue + stateFalse + ) + var state uint8 + var last byte + return readParse{ + readKind{condition: func(b byte) (done int8) { + defer func() { + if done <= 0 { + state = stateUnknown + last = 0 + } + }() + + switch state { + default: // stateUnknown + if b == 't' { + state = stateTrue + last = b + return 1 + } else if b == 'f' { + state = stateFalse + last = b + return 1 + } + return -1 + + case stateTrue: + if last == 't' && b == 'r' || last == 'r' && b == 'u' { + last = b + return 1 + } else if last == 'u' && b == 'e' { + return 0 + } + return -1 + + case stateFalse: + if last == 'f' && b == 'a' || last == 'a' && b == 'l' || last == 'l' && b == 's' { + last = b + return 1 + } else if last == 's' && b == 'e' { + return 0 + } + return -1 + } + }}, + func(b []byte, _ *Record) error { + switch string(b) { + case "true": + *dst = 1 + case "false": + *dst = 0 + default: + return fmt.Errorf("invalid bool %s", b) + } + return nil + }, + }, end, nil } } @@ -1434,7 +1645,7 @@ func decodeHex(b []byte) ([]byte, error) { type readKind struct { noread bool exact []byte - condition func(byte) bool + condition func(byte) int8 // -1: stop, do not consume input; 0: stop, consume input; 1: keep going, consume input, 2: keep going, consume input, can EOF size int sizefn func() int handoff func(*RecordReader, *Record) error @@ -1519,15 +1730,28 @@ func (r *RecordReader) next(rec *Record) error { return nil } -func (r *RecordReader) readCondition(fn func(byte) bool) error { +func (r *RecordReader) readCondition(fn func(byte) int8) error { + var ignoreEOF bool for { peek, err := r.r.Peek(1) if err != nil { + if err == io.EOF && ignoreEOF { + err = nil + } return err } + ignoreEOF = false c := peek[0] - if fn(c) { + switch fn(c) { + case -1: + return nil + case 0: + r.r.Discard(1) + r.buf = append(r.buf, c) 
return nil + case 1: + case 2: + ignoreEOF = true } r.r.Discard(1) r.buf = append(r.buf, c) diff --git a/pkg/kgo/record_formatter_test.go b/pkg/kgo/record_formatter_test.go index b3d6fa67..802d4869 100644 --- a/pkg/kgo/record_formatter_test.go +++ b/pkg/kgo/record_formatter_test.go @@ -20,6 +20,7 @@ func TestRecordFormatter(t *testing.T) { Timestamp: time.Unix(17, 0), Topic: "topictopictopictopictopict", Partition: 3, + Attrs: RecordAttrs{attrs: 0b1011_0011}, // no timestamp type, control, txnal, lz4 ProducerEpoch: 1, ProducerID: 791, LeaderEpoch: -1, @@ -98,6 +99,21 @@ func TestRecordFormatter(t *testing.T) { expR: "2 2 H1 V1 2 2 h2 v2 2 key value", }, + { + layout: "%a{compression}", + expR: "lz4", + }, + + { + layout: "%a{compression;number} %a{transactional-bit;bool} %a{control-bit;hex8} %a{timestamp-type;hex8}", + expR: "3 true 01 ff", + }, + + { + layout: "%a{compression} %a{transactional-bit} %a{control-bit} %a{timestamp-type}", + expR: "lz4 1 1 -1", + }, + // } { f, err := NewRecordFormatter(test.layout) @@ -362,6 +378,36 @@ func TestRecordReader(t *testing.T) { exp: []*Record{StringRecord("foo")}, }, + { + layout: "%V{bool}%v", + in: "truee", + exp: []*Record{StringRecord("e")}, + }, + + { + layout: "%V{bool}%ve", + in: "trueee", + exp: []*Record{StringRecord("e")}, + }, + + { + layout: "%V{bool}%v", + in: "false", + exp: []*Record{StringRecord("")}, + }, + + { + layout: "%V{bool}%v %K{bool}%k", + in: "false true1", + exp: []*Record{KeyStringRecord("1", "")}, + }, + + { + layout: "%V{bool}%v %K{ascii}%k", + in: "false 0", + exp: []*Record{KeyStringRecord("", "")}, + }, + { layout: "%V %v{hex}asdf", in: "6 6b6579asdf", @@ -506,6 +552,24 @@ func TestRecordReader(t *testing.T) { expErr: true, }, + { + layout: `a%V{bool}%v`, + in: "abc", + expErr: true, + }, + + { + layout: `a%V{bool}%v`, + in: "atruze", + expErr: true, + }, + + { + layout: `a%V{bool}%v`, + in: "afalsn", + expErr: true, + }, + { layout: `%o{hex8}`, in: "az",