diff --git a/pkg/sql/distsqlrun/joinreader.go b/pkg/sql/distsqlrun/joinreader.go
index 377c19fe10a0..645144ba5f8e 100644
--- a/pkg/sql/distsqlrun/joinreader.go
+++ b/pkg/sql/distsqlrun/joinreader.go
@@ -12,6 +12,7 @@ package distsqlrun
 
 import (
 	"context"
+	"sort"
 
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/sql/distsqlpb"
@@ -23,7 +24,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/tracing"
 	"github.com/cockroachdb/errors"
-	opentracing "github.com/opentracing/opentracing-go"
+	"github.com/opentracing/opentracing-go"
 )
 
 // TODO(radu): we currently create one batch at a time and run the KV operations
@@ -398,8 +399,13 @@ func (jr *joinReader) readInput() (joinReaderState, *distsqlpb.ProducerMetadata)
 		jr.finalLookupBatch = true
 		return jrCollectingOutputRows, nil
 	}
+	// Sort the spans so that we can rely upon the fetcher to limit the number of
+	// results per batch. It's safe to reorder the spans here because we already
+	// restore the original order of the output during the output collection
+	// phase.
+	sort.Sort(spans)
 	err := jr.fetcher.StartScan(
-		jr.Ctx, jr.flowCtx.txn, spans, false /* limitBatches */, 0, /* limitHint */
+		jr.Ctx, jr.flowCtx.txn, spans, true /* limitBatches */, 0, /* limitHint */
 		jr.flowCtx.traceKV)
 	if err != nil {
 		jr.MoveToDraining(err)
@@ -530,9 +536,10 @@ func (jr *joinReader) hasNullLookupColumn(row sqlbase.EncDatumRow) bool {
 // Start is part of the RowSource interface.
 func (jr *joinReader) Start(ctx context.Context) context.Context {
 	jr.input.Start(ctx)
+	ctx = jr.StartInternal(ctx, joinReaderProcName)
 	jr.fetcher.Start(ctx)
 	jr.runningState = jrReadingInput
-	return jr.StartInternal(ctx, joinReaderProcName)
+	return ctx
 }
 
 // ConsumerClosed is part of the RowSource interface.
diff --git a/pkg/sql/exec/colrpc/inbox.go b/pkg/sql/exec/colrpc/inbox.go
index a15f0a4fcad0..a263e1d18ec5 100644
--- a/pkg/sql/exec/colrpc/inbox.go
+++ b/pkg/sql/exec/colrpc/inbox.go
@@ -87,6 +87,10 @@ var _ exec.Operator = &Inbox{}
 
 // NewInbox creates a new Inbox.
 func NewInbox(typs []types.T) (*Inbox, error) {
+	c, err := colserde.NewArrowBatchConverter(typs)
+	if err != nil {
+		return nil, err
+	}
 	s, err := colserde.NewRecordBatchSerializer(typs)
 	if err != nil {
 		return nil, err
@@ -94,7 +98,7 @@ func NewInbox(typs []types.T) (*Inbox, error) {
 	i := &Inbox{
 		typs:       typs,
 		zeroBatch:  coldata.NewMemBatchWithSize(typs, 0),
-		converter:  colserde.NewArrowBatchConverter(typs),
+		converter:  c,
 		serializer: s,
 		streamCh:   make(chan flowStreamServer, 1),
 		contextCh:  make(chan context.Context, 1),
diff --git a/pkg/sql/exec/colrpc/outbox.go b/pkg/sql/exec/colrpc/outbox.go
index 510228a8db94..9b56cea1be98 100644
--- a/pkg/sql/exec/colrpc/outbox.go
+++ b/pkg/sql/exec/colrpc/outbox.go
@@ -58,6 +58,10 @@ type Outbox struct {
 func NewOutbox(
 	input exec.Operator, typs []types.T, metadataSources []distsqlpb.MetadataSource,
 ) (*Outbox, error) {
+	c, err := colserde.NewArrowBatchConverter(typs)
+	if err != nil {
+		return nil, err
+	}
 	s, err := colserde.NewRecordBatchSerializer(typs)
 	if err != nil {
 		return nil, err
@@ -67,7 +71,7 @@ func NewOutbox(
 		// be).
 		input:           exec.NewDeselectorOp(input, typs),
 		typs:            typs,
-		converter:       colserde.NewArrowBatchConverter(typs),
+		converter:       c,
 		serializer:      s,
 		metadataSources: metadataSources,
 	}
diff --git a/pkg/sql/exec/colserde/arrowbatchconverter.go b/pkg/sql/exec/colserde/arrowbatchconverter.go
index b719a6bc5891..57882238010a 100644
--- a/pkg/sql/exec/colserde/arrowbatchconverter.go
+++ b/pkg/sql/exec/colserde/arrowbatchconverter.go
@@ -52,7 +52,12 @@ type ArrowBatchConverter struct {
 // NewArrowBatchConverter converts coldata.Batches to []*array.Data and back
 // again according to the schema specified by typs. Converting data that does
 // not conform to typs results in undefined behavior.
-func NewArrowBatchConverter(typs []types.T) *ArrowBatchConverter {
+func NewArrowBatchConverter(typs []types.T) (*ArrowBatchConverter, error) {
+	for _, t := range typs {
+		if _, supported := supportedTypes[t]; !supported {
+			return nil, errors.Errorf("unsupported type %v", t.String())
+		}
+	}
 	c := &ArrowBatchConverter{typs: typs}
 	c.builders.boolBuilder = array.NewBooleanBuilder(memory.DefaultAllocator)
 	c.builders.binaryBuilder = array.NewBinaryBuilder(memory.DefaultAllocator, arrow.BinaryTypes.Binary)
@@ -63,7 +68,7 @@ func NewArrowBatchConverter(typs []types.T) *ArrowBatchConverter {
 		// two buffers: one for the nulls, the other for the values.
 		c.scratch.buffers[i] = make([]*memory.Buffer, 2)
 	}
-	return c
+	return c, nil
 }
 
 const (
@@ -75,6 +80,23 @@ const (
 	sizeOfFloat64 = int(unsafe.Sizeof(float64(0)))
 )
 
+var supportedTypes = func() map[types.T]struct{} {
+	typs := make(map[types.T]struct{})
+	for _, t := range []types.T{
+		types.Bool,
+		types.Bytes,
+		types.Int8,
+		types.Int16,
+		types.Int32,
+		types.Int64,
+		types.Float32,
+		types.Float64,
+	} {
+		typs[t] = struct{}{}
+	}
+	return typs
+}()
+
 // BatchToArrow converts the first batch.Length elements of the batch into an
 // arrow []*array.Data. It is assumed that the batch is not larger than
 // coldata.BatchSize. The returned []*array.Data may only be used until the
diff --git a/pkg/sql/exec/colserde/arrowbatchconverter_test.go b/pkg/sql/exec/colserde/arrowbatchconverter_test.go
index 5e5355e1eea4..7a20fcdbe3eb 100644
--- a/pkg/sql/exec/colserde/arrowbatchconverter_test.go
+++ b/pkg/sql/exec/colserde/arrowbatchconverter_test.go
@@ -86,11 +86,20 @@ func assertEqualBatches(t *testing.T, expected, actual coldata.Batch) {
 	}
 }
 
+func TestArrowBatchConverterRejectsUnsupportedTypes(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	typs := []types.T{types.Decimal}
+	_, err := NewArrowBatchConverter(typs)
+	require.Error(t, err)
+}
+
 func TestArrowBatchConverterRandom(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 
 	typs, b := randomBatch()
-	c := NewArrowBatchConverter(typs)
+	c, err := NewArrowBatchConverter(typs)
+	require.NoError(t, err)
 
 	// Make a copy of the original batch because the converter modifies and casts
 	// data without copying for performance reasons.
@@ -108,7 +117,8 @@ func TestRecordBatchRoundtripThroughBytes(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 
 	typs, b := randomBatch()
-	c := NewArrowBatchConverter(typs)
+	c, err := NewArrowBatchConverter(typs)
+	require.NoError(t, err)
 	r, err := NewRecordBatchSerializer(typs)
 	require.NoError(t, err)
 
@@ -161,7 +171,8 @@ func BenchmarkArrowBatchConverter(b *testing.B) {
 			}
 		}
 	}
-	c := NewArrowBatchConverter([]types.T{typ})
+	c, err := NewArrowBatchConverter([]types.T{typ})
+	require.NoError(b, err)
 	nullFractions := []float64{0, 0.25, 0.5}
 	setNullFraction := func(batch coldata.Batch, nullFraction float64) {
 		vec := batch.ColVec(0)
diff --git a/pkg/sql/exec/colserde/file.go b/pkg/sql/exec/colserde/file.go
index 10b35cd6aaab..c24dcb6040b9 100644
--- a/pkg/sql/exec/colserde/file.go
+++ b/pkg/sql/exec/colserde/file.go
@@ -54,6 +54,10 @@ type FileSerializer struct {
 // NewFileSerializer creates a FileSerializer for the given types. The caller is
 // responsible for closing the given writer.
 func NewFileSerializer(w io.Writer, typs []types.T) (*FileSerializer, error) {
+	a, err := NewArrowBatchConverter(typs)
+	if err != nil {
+		return nil, err
+	}
 	rb, err := NewRecordBatchSerializer(typs)
 	if err != nil {
 		return nil, err
@@ -61,7 +65,7 @@ func NewFileSerializer(w io.Writer, typs []types.T) (*FileSerializer, error) {
 	s := &FileSerializer{
 		typs: typs,
 		fb:   flatbuffers.NewBuilder(flatbufferBuilderInitialCapacity),
-		a:    NewArrowBatchConverter(typs),
+		a:    a,
 		rb:   rb,
 	}
 	return s, s.Reset(w)
@@ -194,10 +198,12 @@ func newFileDeserializer(buf []byte, bufCloseFn func() error) (*FileDeserializer
 	}
 	d.typs = typs
 
+	if d.a, err = NewArrowBatchConverter(typs); err != nil {
+		return nil, err
+	}
 	if d.rb, err = NewRecordBatchSerializer(typs); err != nil {
 		return nil, err
 	}
-	d.a = NewArrowBatchConverter(typs)
 	d.arrowScratch = make([]*array.Data, 0, len(typs))
 
 	return d, nil
diff --git a/pkg/sql/opt/exec/execbuilder/testdata/lookup_join b/pkg/sql/opt/exec/execbuilder/testdata/lookup_join
index 642edb1270a9..e951863d07f7 100644
--- a/pkg/sql/opt/exec/execbuilder/testdata/lookup_join
+++ b/pkg/sql/opt/exec/execbuilder/testdata/lookup_join
@@ -749,3 +749,57 @@ SELECT url FROM [ EXPLAIN (DISTSQL)
 ]
 ----
 https://cockroachdb.github.io/distsqlplan/decode.html#eJzMlVFr2z4Uxd__n0Lcp_6pjC3ZTlM9-aWFlC4Zbd62PGjRpfPmWEaSYaXkuw_HZbXLKmsEQh4t-XDO_R3BfYFaK1zKHVoQX4ABBQ4UUqCQAYUcNhQao7dorTbdL71goX6BSCiUddO67nhDYasNgngBV7oKQcBafqvwAaVCEydAQaGTZXWwaUy5k-a5sDtZVUDhsZG1FSSKGZG1Ioxo9x0NULgtK4dGkIuCkUtS8P_J1zZJ0i3hiRBisVzPgcKqdYIUjBYcNnsKunVvoayTTwiC7Wl48Dtd1q-583Hu9XODgtzf3K7J482nBblbLZZA_4yjpJNA4V7rn21DfuiyJrruonUhl-Si4N0Q-esQSFj-T0PwD4d4y66NQoNqHLtgl7DZ_2XSpY50E7NxOx_ZpyN7Fl4-Cyo_ZlHMT1L_RPRB_bPzrZ-H8-dh_HkUpyfhPxF9wP_qfPmn4fzTMP5pFGcn4T8RfcB_fr78s3D-WRj_LIrzk_CfiD7gf32-_Cd26APaRtcWgzZL0u0mVE_Y7zKrW7PFz0ZvDzb95-qgOxwotK6_Zf3Hou6vuoBDMfOK-UjM3ou533nCOvWqM784OyZ37hXP_M6zY5yvvOK533l-jPO1v6tk4pn4H9l7783-v98BAAD__0_IfTU=
+
+# Regression test for #35950: Make sure that lookup joins use a batch limit.
+
+statement ok
+CREATE TABLE a (a INT, b INT, PRIMARY KEY (a, b))
+
+statement ok
+CREATE TABLE b (a INT PRIMARY KEY)
+
+# We insert over 10k rows, which is the currently configured batch limit.
+
+statement ok
+INSERT INTO a SELECT 1, g FROM generate_series(1,11000) g
+
+statement ok
+INSERT INTO b VALUES(1)
+
+query TTT
+EXPLAIN SELECT count(*) FROM (SELECT * FROM b NATURAL INNER LOOKUP JOIN a)
+----
+group                  ·            ·
+ │                     aggregate 0  count_rows()
+ │                     scalar       ·
+ └── render            ·            ·
+      └── lookup-join  ·            ·
+           │           table        a@primary
+           │           type         inner
+           │           equality     (a) = (a)
+           └── scan    ·            ·
+·                      table        b@primary
+·                      spans        ALL
+
+statement ok
+SET tracing = on
+
+query I
+SELECT count(*) FROM (SELECT * FROM b NATURAL INNER LOOKUP JOIN a)
+----
+11000
+
+statement ok
+SET tracing = off
+
+let $lookupTableID
+SELECT 'a'::regclass::oid
+
+# Now assert that we get more than 1 separate batch request into the lookup
+# table, since the first one wouldn't have returned all of the results.
+
+query T
+SELECT message FROM [SHOW TRACE FOR SESSION] WHERE message LIKE 'Scan /Table/$lookupTableID%'
+----
+Scan /Table/63/1/{1-2}
+Scan /Table/63/1/{1/10001/0-2}