colblk: add RawBytes, RawBytesBuilder
Add a new columnar primitive for storing an array of variable-length, unordered
byte slices. The new format stores an offset table to provide readers with
constant-time lookup of the position of any item in the array.
jbowens committed Jul 12, 2024
1 parent e762e05 commit 2d97bb1
Showing 7 changed files with 598 additions and 3 deletions.
6 changes: 6 additions & 0 deletions internal/binfmt/binfmt.go
@@ -155,6 +155,12 @@ func (f *Formatter) Pointer(off int) unsafe.Pointer {
return unsafe.Pointer(&f.data[f.off+off])
}

// Data returns the original data slice. Offset may be used to retrieve the
// current offset within the slice.
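//
// For example, rawBytesToBinFormatter in sstable/colblk (added in this commit)
// passes Data together with Offset to MakeRawBytes when formatting a RawBytes
// column.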
func (f *Formatter) Data() []byte {
return f.data
}

func (f *Formatter) newline(binaryData, comment string) {
f.lines = append(f.lines, [2]string{binaryData, comment})
f.buf.Reset()
186 changes: 186 additions & 0 deletions sstable/colblk/raw_bytes.go
@@ -0,0 +1,186 @@
// Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
// of this source code is governed by a BSD-style license that can be found in
// the LICENSE file.

package colblk

import (
"fmt"
"io"
"unsafe"

"github.com/cockroachdb/pebble/internal/binfmt"
)

// RawBytes holds an array of byte slices, stored as a concatenated data section
// and a series of offsets for each slice. Byte slices within RawBytes are
// stored in their entirety without any compression, ensuring stability without
// copying.
//
// # Representation
//
// An array of N byte slices encodes N+1 offsets. The beginning of the data
// representation holds an offsets table, in the same encoding as a
// DataTypeUint32 column. The Uint32 offsets may be delta-encoded to save space
// if all offsets fit within an 8-bit or 16-bit uint. Each offset is relative to
// the beginning of the string data section (after the offset table).
//
// The use of delta encoding conserves space in the common case. In the context
// of CockroachDB, the vast majority of offsets will fit in 16 bits when using
// 32 KiB blocks (the size in use by CockroachDB). However, a single value larger
// than 65535 bytes requires an offset too large to fit within 16 bits, in which
// case offsets will be encoded as 32-bit integers.
//
// +--------------------------------------------------+
// | a uint32 offsets table, possibly delta encoded,  |
// | possibly padded for 32-bit alignment             |
// | (see DeltaEncoding)                              |
// +--------------------------------------------------+
// | String Data                                      |
// | abcabcada....                                    |
// +--------------------------------------------------+
//
// The DeltaEncoding bits of the ColumnEncoding for a RawBytes column describe
// the delta encoding of the offset table.
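//
// For example (illustrative), the three slices "a", "bc", and "def" are
// represented by the four offsets 0, 1, 3, 6 followed by the concatenated
// string data "abcdef"; slice i occupies [offsets[i], offsets[i+1]) within the
// data section.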
type RawBytes struct {
slices int
offsets UnsafeUint32s
start unsafe.Pointer
data unsafe.Pointer
}

// MakeRawBytes constructs an accessor for an array of byte slices built by
// RawBytesBuilder. Count must be the number of byte slices within the array.
func MakeRawBytes(count int, b []byte, offset uint32, enc ColumnEncoding) RawBytes {
if count == 0 {
return RawBytes{}
}
dataOff, offsets := readUnsafeIntegerSlice[uint32](count+1 /* +1 offset */, b, offset, enc.Delta())
return RawBytes{
slices: count,
offsets: offsets,
start: unsafe.Pointer(&b[offset]),
data: unsafe.Pointer(&b[dataOff]),
}
}

func defaultSliceFormatter(x []byte) string {
return string(x)
}

func rawBytesToBinFormatter(
f *binfmt.Formatter, count int, enc ColumnEncoding, sliceFormatter func([]byte) string,
) {
if sliceFormatter == nil {
sliceFormatter = defaultSliceFormatter
}

rb := MakeRawBytes(count, f.Data(), uint32(f.Offset()), enc)
dataOffset := uint64(f.Offset()) + uint64(uintptr(rb.data)-uintptr(rb.start))
f.CommentLine("RawBytes")
f.CommentLine("Offsets table")
uintsToBinFormatter(f, count+1, ColumnDesc{DataType: DataTypeUint32, Encoding: enc}, func(offset uint64) string {
return fmt.Sprintf("%d [%d overall]", offset, offset+dataOffset)
})
f.CommentLine("Data")
for i := 0; i < rb.slices; i++ {
s := rb.At(i)
f.HexBytesln(len(s), "data[%d]: %s", i, sliceFormatter(s))
}
}

func (b RawBytes) ptr(offset int) unsafe.Pointer {
return unsafe.Pointer(uintptr(b.data) + uintptr(offset))
}

func (b RawBytes) slice(start, end int) []byte {
return unsafe.Slice((*byte)(b.ptr(start)), end-start)
}

// At returns the []byte at index i. The returned slice should not be mutated.
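//
// At performs a constant-time lookup into the offsets table: with the
// illustrative offsets 0, 1, 3, 6 from the RawBytes documentation above, At(1)
// loads offsets[1]=1 and offsets[2]=3 and returns the bytes in [1, 3) of the
// data section ("bc") without copying.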
func (b RawBytes) At(i int) []byte {
return b.slice(int(b.offsets.At(i)), int(b.offsets.At(i+1)))
}

// Slices returns the number of []byte slices encoded within the RawBytes.
func (b RawBytes) Slices() int {
return b.slices
}

// RawBytesBuilder encodes a column of byte slices.
type RawBytesBuilder struct {
rows int
data []byte
offsets UintBuilder[uint32]
}

// Assert that *RawBytesBuilder implements ColumnWriter.
var _ ColumnWriter = (*RawBytesBuilder)(nil)

// Reset resets the builder to an empty state.
func (b *RawBytesBuilder) Reset() {
b.rows = 0
b.data = b.data[:0]
b.offsets.Reset()
// Add an initial offset of zero to streamline the logic in RawBytes.At() to
// avoid needing a special case for row 0.
b.offsets.Set(0, 0)
}

// NumColumns implements ColumnWriter.
func (b *RawBytesBuilder) NumColumns() int { return 1 }

// Put appends the provided byte slice to the builder.
func (b *RawBytesBuilder) Put(s []byte) {
b.data = append(b.data, s...)
b.rows++
b.offsets.Set(b.rows, uint32(len(b.data)))
}

// PutConcat appends a single byte slice formed by the concatenation of the two
// byte slice arguments.
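//
// For example, PutConcat([]byte("foo"), []byte("bar")) is equivalent to
// Put([]byte("foobar")), without requiring the caller to materialize the
// concatenated slice.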
func (b *RawBytesBuilder) PutConcat(s1, s2 []byte) {
b.data = append(append(b.data, s1...), s2...)
b.rows++
b.offsets.Set(b.rows, uint32(len(b.data)))
}

// Finish writes the serialized byte slices to buf starting at offset. The buf
// slice must be sufficiently large to store the serialized output. The caller
// should use [Size] to size buf appropriately before calling Finish.
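//
// For nonzero rows, Finish returns the offset of the first byte past the
// encoded column, together with a ColumnDesc whose Encoding reflects the delta
// encoding selected for the offsets table.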
func (b *RawBytesBuilder) Finish(
col int, rows int, offset uint32, buf []byte,
) (uint32, ColumnDesc) {
desc := ColumnDesc{DataType: DataTypeBytes, Encoding: EncodingDefault}
if rows == 0 {
return 0, desc
}
dataLen := b.offsets.Get(rows)
dataOffset, offsetColumnDesc := b.offsets.Finish(0, rows+1, offset, buf)
desc.Encoding = offsetColumnDesc.Encoding
// Copy the data section.
endOffset := dataOffset + uint32(copy(buf[dataOffset:], b.data[:dataLen]))
return endOffset, desc
}

// Size computes the size required to encode the byte slices when the encoding
// begins at the provided offset within a buffer. The offset is required to
// ensure proper alignment. The returned uint32 is the offset of the first byte
// after the end of the encoded data. To compute the size in bytes, subtract the
// [offset] passed into Size from the returned offset.
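//
// For example (illustrative numbers), if Size(rows, 8) returns 120, the
// encoded column occupies the 112 bytes at offsets [8, 120) of the buffer.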
func (b *RawBytesBuilder) Size(rows int, offset uint32) uint32 {
if rows == 0 {
return 0
}
// Get the size needed to encode the rows+1 offsets.
offset = b.offsets.Size(rows+1, offset)
// Add the value of offsets[rows] since that is the accumulated size of the
// first [rows] slices.
return offset + b.offsets.Get(rows)
}

// WriteDebug implements Encoder.
func (b *RawBytesBuilder) WriteDebug(w io.Writer, rows int) {
fmt.Fprintf(w, "bytes: %d rows set; %d bytes in data", b.rows, len(b.data))
}
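
For orientation, the following is a minimal sketch of the build-then-read round trip these types provide. It is illustrative only: the helper name buildAndRead is hypothetical, the snippet assumes it compiles within package colblk with the internal/aligned package imported, and it mirrors the usage in TestRawBytes below.

// buildAndRead is a hypothetical helper sketching the round trip; it mirrors
// the usage in TestRawBytes below.
func buildAndRead() []byte {
	var builder RawBytesBuilder
	builder.Reset()
	for _, s := range [][]byte{[]byte("a"), []byte("bc"), []byte("def")} {
		builder.Put(s)
	}
	const rows = 3
	size := builder.Size(rows, 0) // end offset of the encoded column
	buf := aligned.ByteSlice(int(size))
	_, desc := builder.Finish(0 /* col */, rows, 0 /* offset */, buf)
	rb := MakeRawBytes(rows, buf, 0, desc.Encoding)
	return rb.At(1) // "bc"
}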
140 changes: 140 additions & 0 deletions sstable/colblk/raw_bytes_test.go
@@ -0,0 +1,140 @@
// Copyright 2024 The LevelDB-Go and Pebble Authors. All rights reserved. Use
// of this source code is governed by a BSD-style license that can be found in
// the LICENSE file.

package colblk

import (
"bytes"
"fmt"
"io"
"strings"
"testing"

"github.com/cockroachdb/datadriven"
"github.com/cockroachdb/pebble/internal/aligned"
"github.com/cockroachdb/pebble/internal/binfmt"
"github.com/stretchr/testify/require"
"golang.org/x/exp/rand"
)

func TestRawBytes(t *testing.T) {
var out bytes.Buffer
var builder RawBytesBuilder
var rawBytes RawBytes
datadriven.RunTest(t, "testdata/raw_bytes", func(t *testing.T, td *datadriven.TestData) string {
out.Reset()
switch td.Cmd {
case "build":
builder.Reset()

var startOffset int
td.ScanArgs(t, "offset", &startOffset)

var count int
for _, k := range strings.Split(strings.TrimSpace(td.Input), "\n") {
builder.Put([]byte(k))
count++
}
td.MaybeScanArgs(t, "count", &count)

size := builder.Size(count, uint32(startOffset))
fmt.Fprintf(&out, "Size: %d\n", size-uint32(startOffset))

buf := aligned.ByteSlice(startOffset + int(size))
endOffset, desc := builder.Finish(0, count, uint32(startOffset), buf)

// Validate that builder.Size() was correct in its estimate.
require.Equal(t, size, endOffset)
f := binfmt.New(buf).LineWidth(20)
if startOffset > 0 {
f.HexBytesln(startOffset, "start offset")
}
rawBytesToBinFormatter(f, count, desc.Encoding, nil)
rawBytes = MakeRawBytes(count, buf[:], 0, desc.Encoding)
return f.String()
case "at":
var indices []int
td.ScanArgs(t, "indices", &indices)
for i, index := range indices {
if i > 0 {
fmt.Fprintln(&out)
}
fmt.Fprintf(&out, "%s", rawBytes.At(index))
}
return out.String()
default:
panic(fmt.Sprintf("unrecognized command %q", td.Cmd))
}
})
}

func BenchmarkRawBytes(b *testing.B) {
seed := uint64(205295296)
generateRandomSlices := func(sliceSize, totalByteSize int) [][]byte {
rng := rand.New(rand.NewSource(seed))
randInt := func(lo, hi int) int {
return lo + rng.Intn(hi-lo)
}
data := make([]byte, totalByteSize)
for i := range data {
data[i] = byte(randInt(int('a'), int('z')+1))
}
slices := make([][]byte, 0, totalByteSize/sliceSize)
for len(data) > 0 {
s := data[:min(sliceSize, len(data))]
slices = append(slices, s)
data = data[len(s):]
}
return slices
}

var builder RawBytesBuilder
var buf []byte
buildRawBytes := func(slices [][]byte) ([]byte, ColumnEncoding) {
builder.Reset()
for _, s := range slices {
builder.Put(s)
}
sz := builder.Size(len(slices), 0)
if cap(buf) >= int(sz) {
buf = buf[:sz]
} else {
buf = make([]byte, sz)
}
_, desc := builder.Finish(0, len(slices), 0, buf)
return buf, desc.Encoding
}

sizes := []int{8, 128, 1024}

b.Run("Builder", func(b *testing.B) {
for _, sz := range sizes {
b.Run(fmt.Sprintf("sliceLen=%d", sz), func(b *testing.B) {
slices := generateRandomSlices(sz, 32<<10 /* 32 KiB */)
b.ResetTimer()
b.SetBytes(32 << 10)
for i := 0; i < b.N; i++ {
data, _ := buildRawBytes(slices)
fmt.Fprint(io.Discard, data)
}
})
}
})
b.Run("At", func(b *testing.B) {
for _, sz := range sizes {
b.Run(fmt.Sprintf("sliceLen=%d", sz), func(b *testing.B) {
slices := generateRandomSlices(sz, 32<<10 /* 32 KiB */)
data, enc := buildRawBytes(slices)
rb := MakeRawBytes(len(slices), data, 0, enc)
b.ResetTimer()
b.SetBytes(32 << 10)
for i := 0; i < b.N; i++ {
for j := 0; j < len(slices); j++ {
fmt.Fprint(io.Discard, rb.At(j))
}
}
})
}
})
}