38837: distsqlrun: make joinreader use limited batches r=jordanlewis a=jordanlewis

Closes cockroachdb#35950.

Previously, lookup join put no limit on the number of rows returned by
each of its lookups. This could be hugely problematic: imagine a
lookup join planned on a pair of tables that have 1 billion matching
rows on the right side for every row on the left side. In that case,
the lookup join would ask the KV backend to return a single giant
buffer containing all 1 billion rows.
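
With limited batches, the fetcher returns the lookup results in bounded chunks and the joinReader resumes where the previous batch left off, so memory stays bounded regardless of the fan-out. A minimal, self-contained Go sketch of that pagination pattern (the `scanBatch` helper and `batchLimit` constant are hypothetical, not the actual `row.Fetcher` API):

```go
package main

import "fmt"

// scanBatch stands in for a KV lookup that returns at most limit rows
// starting at offset start, plus the offset to resume from (-1 when done).
func scanBatch(rows []int, start, limit int) ([]int, int) {
	end := start + limit
	if end >= len(rows) {
		return rows[start:], -1
	}
	return rows[start:end], end
}

func main() {
	// Pretend a single left-side row matches 25 right-side rows.
	matches := make([]int, 25)
	const batchLimit = 10 // bound on rows held in memory per lookup batch

	total, batches := 0, 0
	for start := 0; start != -1; {
		var batch []int
		batch, start = scanBatch(matches, start, batchLimit)
		total += len(batch) // process one bounded batch, then resume
		batches++
	}
	fmt.Printf("processed %d rows in %d batches\n", total, batches) // 25 rows, 3 batches
}
```

The regression test added at the bottom of this commit checks the same behavior end to end: more than one Scan request should reach the lookup table when the result set exceeds the batch limit.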

Also, joinReader's tracing span collection was slightly broken; fix that while we're at it so that this change can be tested properly.

Release note (bug fix): prevent OOM conditions during lookup joins
between tables with a very large n:1 relationship.

38887: exec: check for types unsupported by ArrowBatchConverter r=yuzefovich a=yuzefovich

Previously, we would create an Arrow batch converter without checking
whether the types were supported, which could lead to a panic during
the actual conversion. Now we do this check upfront so that we can fall
back to DistSQL.
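
The fix moves the type check into the constructor, so the error surfaces at setup time and the caller can fall back instead of panicking mid-query. A minimal Go sketch of that construct-time validation pattern (hypothetical names such as `newConverter` and the `supported` set; not the actual colserde or planner code):

```go
package main

import "fmt"

type typ string

// supported plays the role of the supportedTypes set added in this commit.
var supported = map[typ]struct{}{
	"bool": {}, "bytes": {}, "int64": {}, "float64": {},
}

type converter struct{ typs []typ }

// newConverter rejects unsupported types up front rather than panicking
// later, during the actual conversion.
func newConverter(typs []typ) (*converter, error) {
	for _, t := range typs {
		if _, ok := supported[t]; !ok {
			return nil, fmt.Errorf("unsupported type %v", t)
		}
	}
	return &converter{typs: typs}, nil
}

func main() {
	if _, err := newConverter([]typ{"int64", "decimal"}); err != nil {
		// This is the point at which the vectorized engine would fall
		// back to row-based DistSQL execution.
		fmt.Println("falling back:", err)
	}
}
```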

Release note: None

Co-authored-by: Jordan Lewis <[email protected]>
Co-authored-by: Yahor Yuzefovich <[email protected]>
3 people committed Jul 16, 2019
3 parents 862b79a + 5368cf0 + 7ddf6a2 commit 07732c1
Showing 7 changed files with 120 additions and 12 deletions.
13 changes: 10 additions & 3 deletions pkg/sql/distsqlrun/joinreader.go
@@ -12,6 +12,7 @@ package distsqlrun

import (
"context"
"sort"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/distsqlpb"
@@ -23,7 +24,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
opentracing "github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go"
)

// TODO(radu): we currently create one batch at a time and run the KV operations
@@ -398,8 +399,13 @@ func (jr *joinReader) readInput() (joinReaderState, *distsqlpb.ProducerMetadata)
jr.finalLookupBatch = true
return jrCollectingOutputRows, nil
}
// Sort the spans so that we can rely upon the fetcher to limit the number of
// results per batch. It's safe to reorder the spans here because we already
// restore the original order of the output during the output collection
// phase.
sort.Sort(spans)
err := jr.fetcher.StartScan(
jr.Ctx, jr.flowCtx.txn, spans, false /* limitBatches */, 0, /* limitHint */
jr.Ctx, jr.flowCtx.txn, spans, true /* limitBatches */, 0, /* limitHint */
jr.flowCtx.traceKV)
if err != nil {
jr.MoveToDraining(err)
@@ -530,9 +536,10 @@ func (jr *joinReader) hasNullLookupColumn(row sqlbase.EncDatumRow) bool {
// Start is part of the RowSource interface.
func (jr *joinReader) Start(ctx context.Context) context.Context {
jr.input.Start(ctx)
ctx = jr.StartInternal(ctx, joinReaderProcName)
jr.fetcher.Start(ctx)
jr.runningState = jrReadingInput
return jr.StartInternal(ctx, joinReaderProcName)
return ctx
}

// ConsumerClosed is part of the RowSource interface.
6 changes: 5 additions & 1 deletion pkg/sql/exec/colrpc/inbox.go
@@ -87,14 +87,18 @@ var _ exec.Operator = &Inbox{}

// NewInbox creates a new Inbox.
func NewInbox(typs []types.T) (*Inbox, error) {
c, err := colserde.NewArrowBatchConverter(typs)
if err != nil {
return nil, err
}
s, err := colserde.NewRecordBatchSerializer(typs)
if err != nil {
return nil, err
}
i := &Inbox{
typs: typs,
zeroBatch: coldata.NewMemBatchWithSize(typs, 0),
converter: colserde.NewArrowBatchConverter(typs),
converter: c,
serializer: s,
streamCh: make(chan flowStreamServer, 1),
contextCh: make(chan context.Context, 1),
6 changes: 5 additions & 1 deletion pkg/sql/exec/colrpc/outbox.go
@@ -58,6 +58,10 @@ type Outbox struct {
func NewOutbox(
input exec.Operator, typs []types.T, metadataSources []distsqlpb.MetadataSource,
) (*Outbox, error) {
c, err := colserde.NewArrowBatchConverter(typs)
if err != nil {
return nil, err
}
s, err := colserde.NewRecordBatchSerializer(typs)
if err != nil {
return nil, err
@@ -67,7 +71,7 @@ // be).
// be).
input: exec.NewDeselectorOp(input, typs),
typs: typs,
converter: colserde.NewArrowBatchConverter(typs),
converter: c,
serializer: s,
metadataSources: metadataSources,
}
26 changes: 24 additions & 2 deletions pkg/sql/exec/colserde/arrowbatchconverter.go
@@ -52,7 +52,12 @@ type ArrowBatchConverter struct {
// NewArrowBatchConverter converts coldata.Batches to []*array.Data and back
// again according to the schema specified by typs. Converting data that does
// not conform to typs results in undefined behavior.
func NewArrowBatchConverter(typs []types.T) *ArrowBatchConverter {
func NewArrowBatchConverter(typs []types.T) (*ArrowBatchConverter, error) {
for _, t := range typs {
if _, supported := supportedTypes[t]; !supported {
return nil, errors.Errorf("unsupported type %v", t.String())
}
}
c := &ArrowBatchConverter{typs: typs}
c.builders.boolBuilder = array.NewBooleanBuilder(memory.DefaultAllocator)
c.builders.binaryBuilder = array.NewBinaryBuilder(memory.DefaultAllocator, arrow.BinaryTypes.Binary)
@@ -63,7 +68,7 @@ func NewArrowBatchConverter(typs []types.T) *ArrowBatchConverter {
// two buffers: one for the nulls, the other for the values.
c.scratch.buffers[i] = make([]*memory.Buffer, 2)
}
return c
return c, nil
}

const (
@@ -75,6 +80,23 @@ const (
sizeOfFloat64 = int(unsafe.Sizeof(float64(0)))
)

var supportedTypes = func() map[types.T]struct{} {
typs := make(map[types.T]struct{})
for _, t := range []types.T{
types.Bool,
types.Bytes,
types.Int8,
types.Int16,
types.Int32,
types.Int64,
types.Float32,
types.Float64,
} {
typs[t] = struct{}{}
}
return typs
}()

// BatchToArrow converts the first batch.Length elements of the batch into an
// arrow []*array.Data. It is assumed that the batch is not larger than
// coldata.BatchSize. The returned []*array.Data may only be used until the
17 changes: 14 additions & 3 deletions pkg/sql/exec/colserde/arrowbatchconverter_test.go
@@ -86,11 +86,20 @@ func assertEqualBatches(t *testing.T, expected, actual coldata.Batch) {
}
}

func TestArrowBatchConverterRejectsUnsupportedTypes(t *testing.T) {
defer leaktest.AfterTest(t)()

typs := []types.T{types.Decimal}
_, err := NewArrowBatchConverter(typs)
require.Error(t, err)
}

func TestArrowBatchConverterRandom(t *testing.T) {
defer leaktest.AfterTest(t)()

typs, b := randomBatch()
c := NewArrowBatchConverter(typs)
c, err := NewArrowBatchConverter(typs)
require.NoError(t, err)

// Make a copy of the original batch because the converter modifies and casts
// data without copying for performance reasons.
@@ -108,7 +117,8 @@ func TestRecordBatchRoundtripThroughBytes(t *testing.T) {
defer leaktest.AfterTest(t)()

typs, b := randomBatch()
c := NewArrowBatchConverter(typs)
c, err := NewArrowBatchConverter(typs)
require.NoError(t, err)
r, err := NewRecordBatchSerializer(typs)
require.NoError(t, err)

@@ -161,7 +171,8 @@ func BenchmarkArrowBatchConverter(b *testing.B) {
}
}
}
c := NewArrowBatchConverter([]types.T{typ})
c, err := NewArrowBatchConverter([]types.T{typ})
require.NoError(b, err)
nullFractions := []float64{0, 0.25, 0.5}
setNullFraction := func(batch coldata.Batch, nullFraction float64) {
vec := batch.ColVec(0)
10 changes: 8 additions & 2 deletions pkg/sql/exec/colserde/file.go
@@ -54,14 +54,18 @@ type FileSerializer struct {
// NewFileSerializer creates a FileSerializer for the given types. The caller is
// responsible for closing the given writer.
func NewFileSerializer(w io.Writer, typs []types.T) (*FileSerializer, error) {
a, err := NewArrowBatchConverter(typs)
if err != nil {
return nil, err
}
rb, err := NewRecordBatchSerializer(typs)
if err != nil {
return nil, err
}
s := &FileSerializer{
typs: typs,
fb: flatbuffers.NewBuilder(flatbufferBuilderInitialCapacity),
a: NewArrowBatchConverter(typs),
a: a,
rb: rb,
}
return s, s.Reset(w)
@@ -194,10 +198,12 @@ func newFileDeserializer(buf []byte, bufCloseFn func() error) (*FileDeserializer
}
d.typs = typs

if d.a, err = NewArrowBatchConverter(typs); err != nil {
return nil, err
}
if d.rb, err = NewRecordBatchSerializer(typs); err != nil {
return nil, err
}
d.a = NewArrowBatchConverter(typs)
d.arrowScratch = make([]*array.Data, 0, len(typs))

return d, nil
54 changes: 54 additions & 0 deletions pkg/sql/opt/exec/execbuilder/testdata/lookup_join
@@ -749,3 +749,57 @@
]
----
https://cockroachdb.github.io/distsqlplan/decode.html#eJzMlVFr2z4Uxd__n0Lcp_6pjC3ZTlM9-aWFlC4Zbd62PGjRpfPmWEaSYaXkuw_HZbXLKmsEQh4t-XDO_R3BfYFaK1zKHVoQX4ABBQ4UUqCQAYUcNhQao7dorTbdL71goX6BSCiUddO67nhDYasNgngBV7oKQcBafqvwAaVCEydAQaGTZXWwaUy5k-a5sDtZVUDhsZG1FSSKGZG1Ioxo9x0NULgtK4dGkIuCkUtS8P_J1zZJ0i3hiRBisVzPgcKqdYIUjBYcNnsKunVvoayTTwiC7Wl48Dtd1q-583Hu9XODgtzf3K7J482nBblbLZZA_4yjpJNA4V7rn21DfuiyJrruonUhl-Si4N0Q-esQSFj-T0PwD4d4y66NQoNqHLtgl7DZ_2XSpY50E7NxOx_ZpyN7Fl4-Cyo_ZlHMT1L_RPRB_bPzrZ-H8-dh_HkUpyfhPxF9wP_qfPmn4fzTMP5pFGcn4T8RfcB_fr78s3D-WRj_LIrzk_CfiD7gf32-_Cd26APaRtcWgzZL0u0mVE_Y7zKrW7PFz0ZvDzb95-qgOxwotK6_Zf3Hou6vuoBDMfOK-UjM3ou533nCOvWqM784OyZ37hXP_M6zY5yvvOK533l-jPO1v6tk4pn4H9l7783-v98BAAD__0_IfTU=

# Regression test for #35950: Make sure that lookup joins use a batch limit.

statement ok
CREATE TABLE a (a INT, b INT, PRIMARY KEY (a, b))

statement ok
CREATE TABLE b (a INT PRIMARY KEY)

# We insert over 10k rows, which is the currently configured batch limit.

statement ok
INSERT INTO a SELECT 1, g FROM generate_series(1,11000) g

statement ok
INSERT INTO b VALUES(1)

query TTT
EXPLAIN SELECT count(*) FROM (SELECT * FROM b NATURAL INNER LOOKUP JOIN a)
----
group · ·
│ aggregate 0 count_rows()
│ scalar ·
└── render · ·
└── lookup-join · ·
│ table a@primary
│ type inner
│ equality (a) = (a)
└── scan · ·
· table b@primary
· spans ALL

statement ok
SET tracing = on

query I
SELECT count(*) FROM (SELECT * FROM b NATURAL INNER LOOKUP JOIN a)
----
11000

statement ok
SET tracing = off

let $lookupTableID
SELECT 'a'::regclass::oid

# Now assert that we get more than 1 separate batch request into the lookup
# table, since the first one wouldn't have returned all of the results.

query T
SELECT message FROM [SHOW TRACE FOR SESSION] WHERE message LIKE 'Scan /Table/$lookupTableID%'
----
Scan /Table/63/1/{1-2}
Scan /Table/63/1/{1/10001/0-2}
