
Commit

Tyler314 committed Jul 10, 2019
2 parents 495d48b + e8c1571 commit d9f6f15
Showing 11 changed files with 734 additions and 73 deletions.
c-deps/rocksdb (2 changes: 1 addition & 1 deletion)
Submodule rocksdb updated 801 files
pkg/sql/exec/colrpc/inbox.go (7 changes: 4 additions & 3 deletions)
@@ -79,6 +79,7 @@ type Inbox struct {
 
 	scratch struct {
 		data []*array.Data
+		b    coldata.Batch
 	}
 }
 
@@ -103,6 +104,7 @@ func NewInbox(typs []types.T) (*Inbox, error) {
 	}
 	i.zeroBatch.SetLength(0)
 	i.scratch.data = make([]*array.Data, len(typs))
+	i.scratch.b = coldata.NewMemBatch(typs)
 	return i, nil
 }
 
@@ -249,11 +251,10 @@ func (i *Inbox) Next(ctx context.Context) coldata.Batch {
 		if err := i.serializer.Deserialize(&i.scratch.data, m.Data.RawBytes); err != nil {
 			panic(err)
 		}
-		b, err := i.converter.ArrowToBatch(i.scratch.data)
-		if err != nil {
+		if err := i.converter.ArrowToBatch(i.scratch.data, i.scratch.b); err != nil {
 			panic(err)
 		}
-		return b
+		return i.scratch.b
 	}
 }

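The net effect in Inbox is that the deserialization path no longer allocates a batch per message: the scratch batch is allocated once in NewInbox and overwritten on every Next call, so the returned batch is only valid until the next call. Below is a minimal, self-contained sketch of this reuse pattern; Batch, Converter, and FillBatch are illustrative stand-ins, not the real coldata/colrpc API.

package main

import "fmt"

// Batch and Converter are stand-ins for coldata.Batch and the
// ArrowBatchConverter; they are not the real CockroachDB types.
type Batch struct{ rows []int }

type Converter struct{}

// FillBatch overwrites b in place, mirroring the new ArrowToBatch contract.
func (Converter) FillBatch(src []int, b *Batch) error {
	b.rows = append(b.rows[:0], src...)
	return nil
}

// Inbox allocates its scratch batch once and reuses it for every message,
// so the hot path performs no per-batch allocations.
type Inbox struct {
	conv    Converter
	scratch struct{ b *Batch }
}

func NewInbox() *Inbox {
	i := &Inbox{}
	i.scratch.b = &Batch{}
	return i
}

// Next overwrites and returns the scratch batch; as with the real Inbox
// after this change, the result is only valid until the next call.
func (i *Inbox) Next(src []int) *Batch {
	if err := i.conv.FillBatch(src, i.scratch.b); err != nil {
		panic(err)
	}
	return i.scratch.b
}

func main() {
	inbox := NewInbox()
	fmt.Println(inbox.Next([]int{1, 2, 3}).rows) // [1 2 3]
	fmt.Println(inbox.Next([]int{4, 5}).rows)    // [4 5]: same storage, overwritten
}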
pkg/sql/exec/colrpc/outbox.go (2 changes: 1 addition & 1 deletion)
@@ -203,7 +203,7 @@ func (o *Outbox) sendBatches(
 			log.Errorf(ctx, "Outbox BatchToArrow data serialization error: %+v", err)
 			return false, err
 		}
-		if err := o.serializer.Serialize(o.scratch.buf, d); err != nil {
+		if _, _, err := o.serializer.Serialize(o.scratch.buf, d); err != nil {
 			log.Errorf(ctx, "Outbox Serialize data error: %+v", err)
 			return false, err
 		}
pkg/sql/exec/colserde/arrowbatchconverter.go (35 changes: 23 additions & 12 deletions)
@@ -40,9 +40,8 @@ type ArrowBatchConverter struct {
 	}
 
 	scratch struct {
-		// batch and arrowData are used as scratch space returned as the corresponding
+		// arrowData is used as scratch space returned as the corresponding
 		// conversion result.
-		batch     coldata.Batch
 		arrowData []*array.Data
 		// buffers is scratch space for exactly two buffers per element in
 		// arrowData.
 
@@ -57,7 +56,6 @@ func NewArrowBatchConverter(typs []types.T) *ArrowBatchConverter {
 	c := &ArrowBatchConverter{typs: typs}
 	c.builders.boolBuilder = array.NewBooleanBuilder(memory.DefaultAllocator)
 	c.builders.binaryBuilder = array.NewBinaryBuilder(memory.DefaultAllocator, arrow.BinaryTypes.Binary)
-	c.scratch.batch = coldata.NewMemBatch(typs)
 	c.scratch.arrowData = make([]*array.Data, len(typs))
 	c.scratch.buffers = make([][]*memory.Buffer, len(typs))
 	for i := range c.scratch.buffers {
 
@@ -182,16 +180,32 @@ func (c *ArrowBatchConverter) BatchToArrow(batch coldata.Batch) ([]*array.Data, error) {
 }
 
 // ArrowToBatch converts []*array.Data to a coldata.Batch. There must not be
-// more than coldata.BatchSize elements in data. The returned batch may only be
-// used until the next call to ArrowToBatch.
-func (c *ArrowBatchConverter) ArrowToBatch(data []*array.Data) (coldata.Batch, error) {
+// more than coldata.BatchSize elements in data. It's safe to call ArrowToBatch
+// concurrently.
+//
+// The passed in batch is overwritten, but after this method returns it stays
+// valid as long as `data` stays valid. Callers can use this to control the
+// lifetimes of the batches, saving allocations when they can be reused (i.e.
+// reused by passing them back into this function).
+//
+// The passed in data is also mutated (we store nulls differently than arrow and
+// the adjustment is done in place).
+func (c *ArrowBatchConverter) ArrowToBatch(data []*array.Data, b coldata.Batch) error {
 	if len(data) != len(c.typs) {
-		return nil, errors.Errorf("mismatched data and schema length: %d != %d", len(data), len(c.typs))
+		return errors.Errorf("mismatched data and schema length: %d != %d", len(data), len(c.typs))
 	}
 	// Assume > 0 length data.
 	n := data[0].Len()
+	// Reset reuses the passed-in Batch when possible, saving allocations but
+	// overwriting it. If the passed-in Batch is not suitable for use, a new one
+	// is allocated.
+	b.Reset(c.typs, n)
+	b.SetLength(uint16(n))
+	// No selection, all values are valid.
+	b.SetSelection(false)
 
 	for i, typ := range c.typs {
-		vec := c.scratch.batch.ColVec(i)
+		vec := b.ColVec(i)
 		d := data[i]
 
 		var arr array.Interface
 
@@ -261,8 +275,5 @@ func (c *ArrowBatchConverter) ArrowToBatch(data []*array.Data) (coldata.Batch, error) {
 			vec.Nulls().SetNullBitmap(arrowBitmap, n)
 		}
 	}
-	c.scratch.batch.SetLength(uint16(n))
-	// No selection, all values are valid.
-	c.scratch.batch.SetSelection(false)
-	return c.scratch.batch, nil
+	return nil
 }
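The new contract makes batch lifetimes the caller's responsibility. Here is a hedged sketch of a caller reusing one destination batch across conversions; the import paths are inferred from the file paths in this diff, types.Int64 is assumed to be a valid element of the exec types enum, and incomingBatches/process are hypothetical helpers.

package main

import (
	"github.com/cockroachdb/cockroach/pkg/sql/exec/coldata"
	"github.com/cockroachdb/cockroach/pkg/sql/exec/colserde"
	"github.com/cockroachdb/cockroach/pkg/sql/exec/types"
)

// incomingBatches and process are hypothetical stand-ins for a batch source
// and a batch consumer.
func incomingBatches() []coldata.Batch { return nil }
func process(coldata.Batch)            {}

func main() {
	typs := []types.T{types.Int64}
	c := colserde.NewArrowBatchConverter(typs)

	// Allocate the destination batch once; ArrowToBatch overwrites it on
	// every call, so the loop performs no per-batch allocation.
	b := coldata.NewMemBatch(typs)
	for _, in := range incomingBatches() {
		data, err := c.BatchToArrow(in)
		if err != nil {
			panic(err)
		}
		// Per the doc comment above, b stays valid as long as data stays
		// valid, and passing b back in on the next iteration reuses its memory.
		if err := c.ArrowToBatch(data, b); err != nil {
			panic(err)
		}
		process(b)
	}
}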
pkg/sql/exec/colserde/arrowbatchconverter_test.go (110 changes: 80 additions & 30 deletions)
@@ -11,6 +11,7 @@
 package colserde
 
 import (
+	"bytes"
 	"fmt"
 	"testing"
 
@@ -23,11 +24,8 @@ import (
 	"github.com/stretchr/testify/require"
 )
 
-func TestArrowBatchConverterRandom(t *testing.T) {
-	defer leaktest.AfterTest(t)()
-
+func randomBatch() ([]types.T, coldata.Batch) {
 	const maxTyps = 16
-
 	rng, _ := randutil.NewPseudoRand()
 
 	availableTyps := make([]types.T, 0, len(types.AllTypes))
 
@@ -44,47 +42,94 @@ func TestArrowBatchConverterRandom(t *testing.T) {
 	}
 
 	b := exec.RandomBatch(rng, typs, rng.Intn(coldata.BatchSize)+1, rng.Float64())
-	c := NewArrowBatchConverter(typs)
+	return typs, b
+}
 
-	// Make a copy of the original batch because the converter modifies and casts
-	// data without copying for performance reasons.
-	expectedLength := b.Length()
-	expectedWidth := b.Width()
-	expectedColVecs := make([]coldata.Vec, len(b.ColVecs()))
-	for i := range typs {
-		expectedColVecs[i] = coldata.NewMemColumn(typs[i], int(b.Length()))
-		expectedColVecs[i].Copy(
-			coldata.CopyArgs{
-				ColType:   typs[i],
-				Src:       b.ColVec(i),
-				SrcEndIdx: uint64(b.Length()),
-			},
-		)
-	}
+func copyBatch(original coldata.Batch) coldata.Batch {
+	typs := make([]types.T, original.Width())
+	for i, vec := range original.ColVecs() {
+		typs[i] = vec.Type()
+	}
+	b := coldata.NewMemBatchWithSize(typs, int(original.Length()))
+	b.SetLength(original.Length())
+	for colIdx, col := range original.ColVecs() {
+		b.ColVec(colIdx).Copy(coldata.CopyArgs{
+			ColType:   typs[colIdx],
+			Src:       col,
+			SrcEndIdx: uint64(original.Length()),
+		})
+	}
+	return b
+}
 
-	arrowData, err := c.BatchToArrow(b)
-	require.NoError(t, err)
-	result, err := c.ArrowToBatch(arrowData)
-	require.NoError(t, err)
-	if result.Selection() != nil {
+func assertEqualBatches(t *testing.T, expected, actual coldata.Batch) {
+	t.Helper()
+
+	if actual.Selection() != nil {
 		t.Fatal("violated invariant that batches have no selection vectors")
 	}
-	require.Equal(t, expectedLength, result.Length())
-	require.Equal(t, expectedWidth, result.Width())
-	for i, typ := range typs {
+	require.Equal(t, expected.Length(), actual.Length())
+	require.Equal(t, expected.Width(), actual.Width())
+	for colIdx := 0; colIdx < expected.Width(); colIdx++ {
 		// Verify equality of ColVecs (this includes nulls). Since the coldata.Vec
 		// backing array is always of coldata.BatchSize due to the scratch batch
 		// that the converter keeps around, the coldata.Vec needs to be sliced to
 		// the first length elements to match on length, otherwise the check will
 		// fail.
+		expectedVec := expected.ColVec(colIdx)
+		actualVec := actual.ColVec(colIdx)
 		require.Equal(
 			t,
-			expectedColVecs[i].Slice(typ, 0, uint64(b.Length())),
-			result.ColVec(i).Slice(typ, 0, uint64(result.Length())),
+			expectedVec.Slice(expectedVec.Type(), 0, uint64(expected.Length())),
+			actualVec.Slice(actualVec.Type(), 0, uint64(actual.Length())),
 		)
 	}
 }
 
+func TestArrowBatchConverterRandom(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	typs, b := randomBatch()
+	c := NewArrowBatchConverter(typs)
+
+	// Make a copy of the original batch because the converter modifies and casts
+	// data without copying for performance reasons.
+	expected := copyBatch(b)
+
+	arrowData, err := c.BatchToArrow(b)
+	require.NoError(t, err)
+	actual := coldata.NewMemBatchWithSize(nil, 0)
+	require.NoError(t, c.ArrowToBatch(arrowData, actual))
+
+	assertEqualBatches(t, expected, actual)
+}
+
+func TestRecordBatchRoundtripThroughBytes(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	typs, b := randomBatch()
+	c := NewArrowBatchConverter(typs)
+	r, err := NewRecordBatchSerializer(typs)
+	require.NoError(t, err)
+
+	// Make a copy of the original batch because the converter modifies and casts
+	// data without copying for performance reasons.
+	expected := copyBatch(b)
+
+	var buf bytes.Buffer
+	arrowDataIn, err := c.BatchToArrow(b)
+	require.NoError(t, err)
+	_, _, err = r.Serialize(&buf, arrowDataIn)
+	require.NoError(t, err)
+
+	var arrowDataOut []*array.Data
+	require.NoError(t, r.Deserialize(&arrowDataOut, buf.Bytes()))
+	actual := coldata.NewMemBatchWithSize(nil, 0)
+	require.NoError(t, c.ArrowToBatch(arrowDataOut, actual))
+
+	assertEqualBatches(t, expected, actual)
+}
 
 func BenchmarkArrowBatchConverter(b *testing.B) {
 	// fixedLen specifies how many bytes we should fit variable length data types
 	// to in order to reduce benchmark noise.
 
@@ -149,10 +194,15 @@ func BenchmarkArrowBatchConverter(b *testing.B) {
 		data, err := c.BatchToArrow(batch)
 		require.NoError(b, err)
 		testPrefix := fmt.Sprintf("%s/nullFraction=%0.2f", typ.String(), nullFraction)
+		result := coldata.NewMemBatch(typs)
 		b.Run(testPrefix+"/ArrowToBatch", func(b *testing.B) {
 			b.SetBytes(numBytes[typIdx])
 			for i := 0; i < b.N; i++ {
-				result, _ := c.ArrowToBatch(data)
+				// Using require.NoError here causes large enough allocations to
+				// affect the result.
+				if err := c.ArrowToBatch(data, result); err != nil {
+					b.Fatal(err)
+				}
 				if result.Width() != 1 {
 					b.Fatal("expected one column")
 				}
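The in-loop comment deserves a general note: according to this diff, require.NoError allocated enough per call to distort the allocation-sensitive measurement, so the benchmark checks the error by hand. A standalone illustration of the same idiom; doWork is a hypothetical stand-in for the measured operation.

package bench

import "testing"

// doWork is a hypothetical stand-in for the operation being measured.
func doWork() error { return nil }

func BenchmarkErrCheckByHand(b *testing.B) {
	b.ReportAllocs()
	for i := 0; i < b.N; i++ {
		// Checking the error directly keeps assertion-library overhead out
		// of the measured loop (the motivation cited in the diff above).
		if err := doWork(); err != nil {
			b.Fatal(err)
		}
	}
}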
