Skip to content

Commit

Permalink
colblk: make decoder function signatures consistent
Browse files Browse the repository at this point in the history
Unify the various functions that decode data structures from a columnar block
so that they all have consistent function signatures, and define a generic
DecodeFunc that captures this consistency. In future work, the DecodeFunc will
be used to aid in composing decoders.
  • Loading branch information
jbowens committed Jul 19, 2024
1 parent dbf9ab6 commit 6492e86
Show file tree
Hide file tree
Showing 9 changed files with 94 additions and 66 deletions.
20 changes: 12 additions & 8 deletions sstable/colblk/bitmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,22 +30,26 @@ type Bitmap struct {
bitCount int
}

// MakeBitmap returns a Bitmap that reads from b supporting bitCount logical
// bits. No bounds checking is performed, so the caller must guarantee the
// bitmap is appropriately sized and the provided bitCount correctly identifies
// the number of bits in the bitmap.
func MakeBitmap(b []byte, off uint32, bitCount int) Bitmap {
if len(b) < int(off)+bitmapRequiredSize(bitCount) {
// DecodeBitmap decodes the structure of a Bitmap and returns a Bitmap that
// reads from b supporting bitCount logical bits. No bounds checking is
// performed, so the caller must guarantee the bitmap is appropriately sized and
// the provided bitCount correctly identifies the number of bits in the bitmap.
func DecodeBitmap(b []byte, off uint32, bitCount int) (bitmap Bitmap, endOffset uint32) {
sz := bitmapRequiredSize(bitCount)
off = align(off, align64)
if len(b) < int(off)+sz {
panic(errors.AssertionFailedf("bitmap of %d bits requires at least %d bytes; provided with %d-byte slice",
bitCount, bitmapRequiredSize(bitCount), len(b[off:])))
}
off = align(off, align64)
return Bitmap{
data: makeUnsafeRawSlice[uint64](unsafe.Pointer(&b[off])),
bitCount: bitCount,
}
}, off + uint32(sz)
}

// Assert that DecodeBitmap implements DecodeFunc.
var _ DecodeFunc[Bitmap] = DecodeBitmap

// Get returns true if the bit at position i is set and false otherwise.
func (b Bitmap) Get(i int) bool {
return (b.data.At(i>>6 /* i/64 */) & (1 << uint(i%64))) != 0
Expand Down
6 changes: 4 additions & 2 deletions sstable/colblk/bitmap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/cockroachdb/datadriven"
"github.com/cockroachdb/pebble/internal/binfmt"
"github.com/stretchr/testify/require"
"golang.org/x/exp/rand"
)

Expand Down Expand Up @@ -43,7 +44,7 @@ func TestBitmapFixed(t *testing.T) {
}

_ = builder.Finish(0, n, 0, data)
bitmap = MakeBitmap(data, 0, n)
bitmap, _ = DecodeBitmap(data, 0, n)
dumpBitmap(&buf, bitmap)
fmt.Fprint(&buf, "\nBinary representation:\n")
f := binfmt.New(data)
Expand Down Expand Up @@ -100,7 +101,8 @@ func TestBitmapRandom(t *testing.T) {
}
data := make([]byte, builder.Size(size, 0))
_ = builder.Finish(0, size, 0, data)
bitmap := MakeBitmap(data, 0, size)
bitmap, endOffset := DecodeBitmap(data, 0, size)
require.Equal(t, uint32(len(data)), endOffset)
for i := 0; i < size; i++ {
if got := bitmap.Get(i); got != v[i] {
t.Fatalf("b.Get(%d) = %t; want %t", i, got, v[i])
Expand Down
61 changes: 28 additions & 33 deletions sstable/colblk/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,22 @@ func FinishBlock(rows int, writers []ColumnWriter) []byte {
return buf
}

// DecodeColumn decodes the col'th column of the provided reader's block as a
// column of dataType using decodeFunc.
func DecodeColumn[V any](r *BlockReader, col int, dataType DataType, decodeFunc DecodeFunc[V]) V {
if uint16(col) >= r.header.Columns {
panic(errors.AssertionFailedf("column %d is out of range [0, %d)", col, r.header.Columns))
}
if dt := r.dataType(col); dt != dataType {
panic(errors.AssertionFailedf("column %d is type %s; not %s", col, dt, dataType))
}
v, endOffset := decodeFunc(r.data, r.pageStart(col), int(r.header.Rows))
if nextColumnOff := r.pageStart(col + 1); endOffset != nextColumnOff {
panic(errors.AssertionFailedf("column %d decoded to offset %d; expected %d", col, endOffset, nextColumnOff))
}
return v
}

// A BlockReader holds metadata for accessing the columns of a columnar block.
type BlockReader struct {
data []byte
Expand Down Expand Up @@ -196,74 +212,53 @@ func (r *BlockReader) DataType(col int) DataType {
if uint16(col) >= r.header.Columns {
panic(errors.AssertionFailedf("column %d is out of range [0, %d)", col, r.header.Columns))
}
return DataType(*(*uint8)(r.pointer(r.customHeaderSize + 7 + 5*uint32(col))))
return r.dataType(col)
}

func (r *BlockReader) dataType(col int) DataType {
return DataType(*(*uint8)(r.pointer(r.customHeaderSize + blockHeaderBaseSize + columnHeaderSize*uint32(col))))
}

// Bitmap retrieves the col'th column as a bitmap. The column must be of type
// DataTypeBool.
func (r *BlockReader) Bitmap(col int) Bitmap {
if dt := r.DataType(col); dt != DataTypeBool {
panic(errors.AssertionFailedf("column %d is not a Bitmap; holds data type %s", dt))
}
return MakeBitmap(r.data, r.pageStart(col), int(r.header.Rows))
return DecodeColumn(r, col, DataTypeBool, DecodeBitmap)
}

// RawBytes retrieves the col'th column as a column of byte slices. The column
// must be of type DataTypeBytes.
func (r *BlockReader) RawBytes(col int) RawBytes {
if dt := r.DataType(col); dt != DataTypeBytes {
panic(errors.AssertionFailedf("column %d is not a RawBytes column; holds data type %s", dt))
}
return MakeRawBytes(int(r.header.Rows), r.data, r.pageStart(col))
return DecodeColumn(r, col, DataTypeBytes, DecodeRawBytes)
}

// PrefixBytes retrieves the col'th column as a prefix-compressed byte slice column. The column
// must be of type DataTypePrefixBytes.
func (r *BlockReader) PrefixBytes(col int) PrefixBytes {
if dt := r.DataType(col); dt != DataTypePrefixBytes {
panic(errors.AssertionFailedf("column %d is not a PrefixBytes column; holds data type %s", dt))
}
return MakePrefixBytes(int(r.header.Rows), r.data, r.pageStart(col))
return DecodeColumn(r, col, DataTypePrefixBytes, DecodePrefixBytes)
}

// Uint8s retrieves the col'th column as a column of uint8s. The column must be
// of type DataTypeUint8.
func (r *BlockReader) Uint8s(col int) UnsafeUint8s {
if dt := r.DataType(col); dt != DataTypeUint8 {
panic(errors.AssertionFailedf("column %d is not a Uint8 column; holds data type %s", col, dt))
}
_, s := readUnsafeIntegerSlice[uint8](int(r.header.Rows), r.data, r.pageStart(col))
return s
return DecodeColumn(r, col, DataTypeUint8, DecodeUnsafeIntegerSlice[uint8])
}

// Uint16s retrieves the col'th column as a column of uint8s. The column must be
// of type DataTypeUint16.
func (r *BlockReader) Uint16s(col int) UnsafeUint16s {
if dt := r.DataType(col); dt != DataTypeUint16 {
panic(errors.AssertionFailedf("column %d is not a Uint16 column; holds data type %s", col, dt))
}
_, s := readUnsafeIntegerSlice[uint16](int(r.header.Rows), r.data, r.pageStart(col))
return s
return DecodeColumn(r, col, DataTypeUint16, DecodeUnsafeIntegerSlice[uint16])
}

// Uint32s retrieves the col'th column as a column of uint32s. The column must be
// of type DataTypeUint32.
func (r *BlockReader) Uint32s(col int) UnsafeUint32s {
if dt := r.DataType(col); dt != DataTypeUint32 {
panic(errors.AssertionFailedf("column %d is not a Uint32 column; holds data type %s", col, dt))
}
_, s := readUnsafeIntegerSlice[uint32](int(r.header.Rows), r.data, r.pageStart(col))
return s
return DecodeColumn(r, col, DataTypeUint32, DecodeUnsafeIntegerSlice[uint32])
}

// Uint64s retrieves the col'th column as a column of uint64s. The column must be
// of type DataTypeUint64.
func (r *BlockReader) Uint64s(col int) UnsafeUint64s {
if dt := r.DataType(col); dt != DataTypeUint64 {
panic(errors.AssertionFailedf("column %d is not a Uint64 column; holds data type %s", col, dt))
}
_, s := readUnsafeIntegerSlice[uint64](int(r.header.Rows), r.data, r.pageStart(col))
return s
return DecodeColumn(r, col, DataTypeUint64, DecodeUnsafeIntegerSlice[uint64])
}

func (r *BlockReader) pageStart(col int) uint32 {
Expand Down
6 changes: 6 additions & 0 deletions sstable/colblk/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,3 +108,9 @@ type Encoder interface {
// state to the provided writer.
WriteDebug(w io.Writer, rows int)
}

// A DecodeFunc decodes a data structure from a byte slice, returning an
// accessor for the data and the offset of the first byte after the structure.
// The rows argument must be number of logical rows encoded within the data
// structure.
type DecodeFunc[T any] func(buf []byte, offset uint32, rows int) (decoded T, nextOffset uint32)
21 changes: 14 additions & 7 deletions sstable/colblk/prefix_bytes.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,10 +171,13 @@ type PrefixBytes struct {
rawBytes RawBytes
}

// MakePrefixBytes constructs an accessor for an array of lexicographically
// sorted byte slices constructed by PrefixBytesBuilder. Count must be the
// number of logical slices within the array.
func MakePrefixBytes(count int, b []byte, offset uint32) PrefixBytes {
// DecodePrefixBytes decodes the structure of a PrefixBytes, constructing an
// accessor for an array of lexicographically sorted byte slices constructed by
// PrefixBytesBuilder. Count must be the number of logical slices within the
// array.
func DecodePrefixBytes(
b []byte, offset uint32, count int,
) (prefixBytes PrefixBytes, endOffset uint32) {
if count == 0 {
panic(errors.AssertionFailedf("empty PrefixBytes"))
}
Expand All @@ -185,19 +188,23 @@ func MakePrefixBytes(count int, b []byte, offset uint32) PrefixBytes {
calc := makeBundleCalc(bundleShift)
nBundles := calc.bundleCount(count)

rb, endOffset := DecodeRawBytes(b, offset+1, count+nBundles)
pb := PrefixBytes{
bundleCalc: calc,
rows: count,
rawBytes: MakeRawBytes(count+nBundles, b, offset+1),
rawBytes: rb,
}
// We always set the base to zero.
if pb.rawBytes.offsets.base != 0 {
panic(errors.AssertionFailedf("unexpected non-zero base in offsets"))
}
pb.sharedPrefixLen = int(pb.rawBytes.offsets.At(0))
return pb
return pb, endOffset
}

// Assert that DecodePrefixBytes implements DecodeFunc.
var _ DecodeFunc[PrefixBytes] = DecodePrefixBytes

// SharedPrefix return a []byte of the shared prefix that was extracted from
// all of the values in the Bytes vector. The returned slice should not be
// mutated.
Expand Down Expand Up @@ -419,7 +426,7 @@ func prefixBytesToBinFormatter(f *binfmt.Formatter, count int, sliceFormatter fu
if sliceFormatter == nil {
sliceFormatter = defaultSliceFormatter
}
pb := MakePrefixBytes(count, f.Data(), uint32(f.Offset()))
pb, _ := DecodePrefixBytes(f.Data(), uint32(f.Offset()), count)
f.CommentLine("PrefixBytes")
f.HexBytesln(1, "bundleSize: %d", 1<<pb.bundleShift)
f.CommentLine("Offsets table")
Expand Down
9 changes: 6 additions & 3 deletions sstable/colblk/prefix_bytes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,9 @@ func TestPrefixBytes(t *testing.T) {

f := binfmt.New(buf)
prefixBytesToBinFormatter(f, rows, nil)
pb = MakePrefixBytes(rows, buf, 0)
var endOffset uint32
pb, endOffset = DecodePrefixBytes(buf, 0, rows)
require.Equal(t, offset, endOffset)
require.Equal(t, rows, pb.Rows())
return f.String()
case "get":
Expand Down Expand Up @@ -169,7 +171,8 @@ func TestPrefixBytesRandomized(t *testing.T) {
t.Logf("Size: %d; NumUserKeys: %d (%d); Aggregate pre-compressed string data: %d",
size, n, len(userKeys), totalSize)

pb := MakePrefixBytes(n, buf, 0)
pb, endOffset := DecodePrefixBytes(buf, 0, n)
require.Equal(t, offset, endOffset)
f := binfmt.New(buf)
prefixBytesToBinFormatter(f, n, nil)
t.Logf("PrefixBytes:\n%s", f.String())
Expand Down Expand Up @@ -268,7 +271,7 @@ func BenchmarkPrefixBytes(b *testing.B) {
b.Run("iteration", func(b *testing.B) {
n := len(userKeys)
buf = build(n)
pb := MakePrefixBytes(n, buf, 0)
pb, _ := DecodePrefixBytes(buf, 0, n)
b.ResetTimer()
for i := 0; i < b.N; i++ {
k := append([]byte(nil), pb.SharedPrefix()...)
Expand Down
18 changes: 11 additions & 7 deletions sstable/colblk/raw_bytes.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,21 +49,25 @@ type RawBytes struct {
data unsafe.Pointer
}

// MakeRawBytes constructs an accessor for an array of byte slices constructed
// by RawBytesBuilder. Count must be the number of byte slices within the array.
func MakeRawBytes(count int, b []byte, offset uint32) RawBytes {
// DecodeRawBytes decodes the structure of a RawBytes, constructing an accessor
// for an array of byte slices constructed by RawBytesBuilder. Count must be the
// number of byte slices within the array.
func DecodeRawBytes(b []byte, offset uint32, count int) (rawBytes RawBytes, endOffset uint32) {
if count == 0 {
return RawBytes{}
return RawBytes{}, 0
}
dataOff, offsets := readUnsafeIntegerSlice[uint32](count+1 /* +1 offset */, b, offset)
offsets, dataOff := DecodeUnsafeIntegerSlice[uint32](b, offset, count+1 /* +1 offset */)
return RawBytes{
slices: count,
offsets: offsets,
start: unsafe.Pointer(&b[offset]),
data: unsafe.Pointer(&b[dataOff]),
}
}, dataOff + offsets.At(count)
}

// Assert that DecodeRawBytes implements DecodeFunc.
var _ DecodeFunc[RawBytes] = DecodeRawBytes

func defaultSliceFormatter(x []byte) string {
return string(x)
}
Expand All @@ -73,7 +77,7 @@ func rawBytesToBinFormatter(f *binfmt.Formatter, count int, sliceFormatter func(
sliceFormatter = defaultSliceFormatter
}

rb := MakeRawBytes(count, f.Data(), uint32(f.Offset()))
rb, _ := DecodeRawBytes(f.Data(), uint32(f.Offset()), count)
dataOffset := uint64(f.Offset()) + uint64(uintptr(rb.data)-uintptr(rb.start))
f.CommentLine("rawbytes")
f.CommentLine("offsets table")
Expand Down
6 changes: 4 additions & 2 deletions sstable/colblk/raw_bytes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,9 @@ func TestRawBytes(t *testing.T) {
f.HexBytesln(startOffset, "start offset")
}
rawBytesToBinFormatter(f, count, nil)
rawBytes = MakeRawBytes(count, buf[startOffset:], 0)
var decodedEndOffset uint32
rawBytes, decodedEndOffset = DecodeRawBytes(buf[startOffset:], 0, count)
require.Equal(t, endOffset, decodedEndOffset+uint32(startOffset))
return f.String()
case "at":
var indices []int
Expand Down Expand Up @@ -126,7 +128,7 @@ func BenchmarkRawBytes(b *testing.B) {
b.Run(fmt.Sprintf("sliceLen=%d", sz), func(b *testing.B) {
slices := generateRandomSlices(sz, 32<<10 /* 32 KiB */)
data := buildRawBytes(slices)
rb := MakeRawBytes(len(slices), data, 0)
rb, _ := DecodeRawBytes(data, 0, len(slices))
b.ResetTimer()
b.SetBytes(32 << 10)
for i := 0; i < b.N; i++ {
Expand Down
13 changes: 9 additions & 4 deletions sstable/colblk/unsafe_slice.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,11 @@ type UnsafeIntegerSlice[T constraints.Integer] struct {
deltaWidth uintptr
}

func readUnsafeIntegerSlice[T constraints.Integer](
rows int, b []byte, off uint32,
) (endOffset uint32, slice UnsafeIntegerSlice[T]) {
// DecodeUnsafeIntegerSlice decodes the structure of a slice of uints from a
// byte slice.
func DecodeUnsafeIntegerSlice[T constraints.Integer](
b []byte, off uint32, rows int,
) (slice UnsafeIntegerSlice[T], endOffset uint32) {
delta := UintDeltaEncoding(b[off])
off++
switch delta {
Expand All @@ -91,9 +93,12 @@ func readUnsafeIntegerSlice[T constraints.Integer](
default:
panic("unreachable")
}
return off, slice
return slice, off
}

// Assert that DecodeUnsafeIntegerSlice implements DecodeFunc.
var _ DecodeFunc[UnsafeUint8s] = DecodeUnsafeIntegerSlice[uint8]

func makeUnsafeIntegerSlice[T constraints.Integer](
base T, deltaPtr unsafe.Pointer, deltaWidth int,
) UnsafeIntegerSlice[T] {
Expand Down

0 comments on commit 6492e86

Please sign in to comment.