Skip to content

Commit

Permalink
RecordFormatter: support %a; formatter&reader: support 'bool'
Browse files Browse the repository at this point in the history
Attributes are special in that there is no obvious way to print them:
a single record has many attributes that all mean different things.

We instead add a required formatting specification that selects which
attribute to print. The specification can have ; to separate _how_ to
print the attribute.

We also support formatting booleans as 'true' or 'false', and reading
'true' as 1 and 'false' as 0. The reading is a bit complicated and uses
some weird closure. The code is ..a bit ugly.. but it is fun and tested.
  • Loading branch information
twmb committed Oct 10, 2022
1 parent 5a38292 commit bbac68b
Show file tree
Hide file tree
Showing 2 changed files with 296 additions and 8 deletions.
240 changes: 232 additions & 8 deletions pkg/kgo/record_formatter.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ func (f *RecordFormatter) AppendPartitionRecord(b []byte, p *FetchPartition, r *
// %o offset
// %e leader epoch
// %d timestamp (date, formatting described below)
// %a record attributes (formatting required, described below)
// %x producer id
// %y producer epoch
//
Expand Down Expand Up @@ -124,6 +125,7 @@ func (f *RecordFormatter) AppendPartitionRecord(b []byte, p *FetchPartition, r *
// All number verbs accept braces that control how the number is printed:
//
// %v{ascii} the default, print the number as ascii
// %v{number} alias for ascii
//
// %v{hex64} print 16 hex characters for the number
// %v{hex32} print 8 hex characters for the number
Expand All @@ -143,6 +145,7 @@ func (f *RecordFormatter) AppendPartitionRecord(b []byte, p *FetchPartition, r *
// %v{little8} alias for byte
//
// %v{byte} print the number as a single byte
// %v{bool} print "true" if the number is non-zero, otherwise "false"
//
// All numbers are truncated as necessary per each given format.
//
Expand All @@ -161,6 +164,38 @@ func (f *RecordFormatter) AppendPartitionRecord(b []byte, p *FetchPartition, r *
// simply passed to the time package's AppendFormat function. For strftime, all
// "man strftime" options are supported. Time is always in UTC.
//
// Attributes
//
// Records attributes require formatting, where each formatting option selects
// which attribute to print and how to print it.
//
// %a{compression}
// %a{compression;number}
// %a{compression;big64}
// %a{compression;hex8}
//
// By default, prints the compression as text ("none", "gzip", ...).
// Compression can be printed as a number with ";number", where number is any
// number formatting option described above.
//
// %a{timestamp-type}
// %a{timestamp-type;big64}
//
// Prints -1 for pre-0.10 records, 0 for client generated timestamps, and 1 for
// broker generated. Number formatting can be controlled with ";number".
//
// %a{transactional-bit}
// %a{transactional-bit;bool}
//
// Prints 1 if the record is a transaction marker and 0 if the record is not a
// transaction marker. Number formatting can be controlled with ";number".
//
// %a{control-bit}
// %a{control-bit;bool}
//
// Prints 1 if the record is a control record and 0 if the record is not a
// control record. Number formatting can be controlled with ";number".
//
// Text
//
// Topics, keys, and values have "base64", "base64raw", "hex", and "unpack"
Expand Down Expand Up @@ -197,7 +232,6 @@ func (f *RecordFormatter) AppendPartitionRecord(b []byte, p *FetchPartition, r *
// affects everything that follows. It is possible to switch endianness
// multiple times. If the parser needs more data than available, or if the more
// input remains after '$', an error message will be appended.
//
func NewRecordFormatter(layout string) (*RecordFormatter, error) {
var f RecordFormatter

Expand Down Expand Up @@ -377,6 +411,105 @@ func NewRecordFormatter(layout string) (*RecordFormatter, error) {
})
}

case 'a':
if !isOpenBrace {
return nil, errors.New("missing open brace sequence on %a signifying how attributes should be written")
}
handledBrace = true

num := func(skipText string, rfn func(*Record) int64) error {
layout = layout[len(skipText):]
numfn, n, err := parseNumWriteLayout(layout)
if err != nil {
return err
}
layout = layout[n:]
f.fns = append(f.fns, func(b []byte, _ *FetchPartition, r *Record) []byte {
return writeR(b, r, func(b []byte, r *Record) []byte { return numfn(b, rfn(r)) })
})
return nil
}
bi64 := func(b bool) int64 {
if b {
return 1
}
return 0
}

switch {
case strings.HasPrefix(layout, "compression}"):
layout = layout[len("compression}"):]
f.fns = append(f.fns, func(b []byte, _ *FetchPartition, r *Record) []byte {
return writeR(b, r, func(b []byte, r *Record) []byte {
switch r.Attrs.CompressionType() {
case 0:
return append(b, "none"...)
case 1:
return append(b, "gzip"...)
case 2:
return append(b, "snappy"...)
case 3:
return append(b, "lz4"...)
case 4:
return append(b, "zstd"...)
default:
return append(b, "unknown"...)
}
})
})
case strings.HasPrefix(layout, "compression;"):
if err := num("compression;", func(r *Record) int64 { return int64(r.Attrs.CompressionType()) }); err != nil {
return nil, err
}

case strings.HasPrefix(layout, "timestamp-type}"):
layout = layout[len("timestamp-type}"):]
f.fns = append(f.fns, func(b []byte, _ *FetchPartition, r *Record) []byte {
return writeR(b, r, func(b []byte, r *Record) []byte {
return strconv.AppendInt(b, int64(r.Attrs.TimestampType()), 10)
})
})
case strings.HasPrefix(layout, "timestamp-type;"):
if err := num("timestamp-type;", func(r *Record) int64 { return int64(r.Attrs.TimestampType()) }); err != nil {
return nil, err
}

case strings.HasPrefix(layout, "transactional-bit}"):
layout = layout[len("transactional-bit}"):]
f.fns = append(f.fns, func(b []byte, _ *FetchPartition, r *Record) []byte {
return writeR(b, r, func(b []byte, r *Record) []byte {
if r.Attrs.IsTransactional() {
return append(b, '1')
} else {
return append(b, '0')
}
})
})
case strings.HasPrefix(layout, "transactional-bit;"):
if err := num("transactional-bit;", func(r *Record) int64 { return bi64(r.Attrs.IsTransactional()) }); err != nil {
return nil, err
}

case strings.HasPrefix(layout, "control-bit}"):
layout = layout[len("control-bit}"):]
f.fns = append(f.fns, func(b []byte, _ *FetchPartition, r *Record) []byte {
return writeR(b, r, func(b []byte, r *Record) []byte {
if r.Attrs.IsControl() {
return append(b, '1')
} else {
return append(b, '0')
}
})
})
case strings.HasPrefix(layout, "control-bit;"):
if err := num("control-bit;", func(r *Record) int64 { return bi64(r.Attrs.IsControl()) }); err != nil {
return nil, err
}

default:
return nil, errors.New("unknown %a formatting")
}

case 'h':
if !isOpenBrace {
return nil, errors.New("missing open brace sequence on %h signifying how headers are written")
Expand Down Expand Up @@ -675,7 +808,7 @@ func parseNumWriteLayout(layout string) (func([]byte, int64) []byte, int, error)
}
end := braceEnd + 1
switch layout = layout[:braceEnd]; layout {
case "ascii":
case "ascii", "number":
return writeNumASCII, end, nil
case "hex64":
return writeNumHex64, end, nil
Expand Down Expand Up @@ -703,6 +836,8 @@ func parseNumWriteLayout(layout string) (func([]byte, int64) []byte, int, error)
return writeNumLittle32, end, nil
case "little16":
return writeNumLittle16, end, nil
case "bool":
return writeNumBool, end, nil
default:
return nil, 0, fmt.Errorf("invalid output number layout %q", layout)
}
Expand Down Expand Up @@ -816,6 +951,13 @@ func writeNumLittle16(b []byte, n int64) []byte {
}
func writeNumByte(b []byte, n int64) []byte { u := uint64(n); return append(b, byte(u)) }

func writeNumBool(b []byte, n int64) []byte {
if n == 0 {
return append(b, "false"...)
}
return append(b, "true"...)
}

////////////
// READER //
////////////
Expand Down Expand Up @@ -882,6 +1024,7 @@ type RecordReader struct {
// All size numbers can be parsed in the following ways:
//
// %v{ascii} parse numeric digits until a non-numeric
// %v{number} alias for ascii
//
// %v{hex64} read 16 hex characters for the number
// %v{hex32} read 8 hex characters for the number
Expand All @@ -900,7 +1043,7 @@ type RecordReader struct {
// %v{little8} read the number as a byte
//
// %v{byte} read the number as a byte
//
// %v{bool} read "true" as 1, "false" as 0
// %v{3} read 3 characters (any number)
//
// Header specification
Expand Down Expand Up @@ -1331,9 +1474,15 @@ func (*RecordReader) parseReadSize(layout string, dst *uint64, needBrace bool) (
func([]byte, *Record) error { *dst = uint64(num); return nil },
}, end, nil

case "ascii":
case "ascii", "number":
return readParse{
readKind{condition: func(b byte) bool { return b < '0' || b > '9' }},
readKind{condition: func(b byte) int8 {
if b < '0' || b > '9' {
return -1
} else {
return 2 // ignore EOF if we hit it after this
}
}},
func(b []byte, _ *Record) (err error) {
*dst, err = strconv.ParseUint(kbin.UnsafeString(b), 10, 64)
return err
Expand Down Expand Up @@ -1418,6 +1567,68 @@ func (*RecordReader) parseReadSize(layout string, dst *uint64, needBrace bool) (
return err
},
}, end, nil

case "bool":
const (
stateUnknown uint8 = iota
stateTrue
stateFalse
)
var state uint8
var last byte
return readParse{
readKind{condition: func(b byte) (done int8) {
defer func() {
if done <= 0 {
state = stateUnknown
last = 0
}
}()

switch state {
default: // stateUnknown
if b == 't' {
state = stateTrue
last = b
return 1
} else if b == 'f' {
state = stateFalse
last = b
return 1
}
return -1

case stateTrue:
if last == 't' && b == 'r' || last == 'r' && b == 'u' {
last = b
return 1
} else if last == 'u' && b == 'e' {
return 0
}
return -1

case stateFalse:
if last == 'f' && b == 'a' || last == 'a' && b == 'l' || last == 'l' && b == 's' {
last = b
return 1
} else if last == 's' && b == 'e' {
return 0
}
return -1
}
}},
func(b []byte, _ *Record) error {
switch string(b) {
case "true":
*dst = 1
case "false":
*dst = 0
default:
return fmt.Errorf("invalid bool %s", b)
}
return nil
},
}, end, nil
}
}

Expand All @@ -1434,7 +1645,7 @@ func decodeHex(b []byte) ([]byte, error) {
type readKind struct {
noread bool
exact []byte
condition func(byte) bool
condition func(byte) int8 // -1: stop, do not consume input; 0: stop, consume input; 1: keep going, consume input, 2: keep going, consume input, can EOF
size int
sizefn func() int
handoff func(*RecordReader, *Record) error
Expand Down Expand Up @@ -1519,15 +1730,28 @@ func (r *RecordReader) next(rec *Record) error {
return nil
}

func (r *RecordReader) readCondition(fn func(byte) bool) error {
func (r *RecordReader) readCondition(fn func(byte) int8) error {
var ignoreEOF bool
for {
peek, err := r.r.Peek(1)
if err != nil {
if err == io.EOF && ignoreEOF {
err = nil
}
return err
}
ignoreEOF = false
c := peek[0]
if fn(c) {
switch fn(c) {
case -1:
return nil
case 0:
r.r.Discard(1)
r.buf = append(r.buf, c)
return nil
case 1:
case 2:
ignoreEOF = true
}
r.r.Discard(1)
r.buf = append(r.buf, c)
Expand Down
Loading

0 comments on commit bbac68b

Please sign in to comment.