Skip to content

Commit

Permalink
Merge #65497
Browse files Browse the repository at this point in the history
65497: sql: preliminary work for BatchReceiver introduction r=yuzefovich a=yuzefovich

See individual commits for details.

This PR extracts the preliminary work for the introduction of
`BatchReceiver` in #65289.

Co-authored-by: Yahor Yuzefovich <[email protected]>
  • Loading branch information
craig[bot] and yuzefovich committed May 21, 2021
2 parents d46b650 + 8369afa commit 040a7af
Show file tree
Hide file tree
Showing 25 changed files with 628 additions and 368 deletions.
1 change: 1 addition & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ ALL_TESTS = [
"//pkg/sql/catalog/typedesc:typedesc_test",
"//pkg/sql/catalog:catalog_test",
"//pkg/sql/colcontainer:colcontainer_test",
"//pkg/sql/colconv:colconv_test",
"//pkg/sql/colencoding:colencoding_test",
"//pkg/sql/colexec/colbuilder:colbuilder_test",
"//pkg/sql/colexec/colexecagg:colexecagg_test",
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/changefeeddist/distflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func StartDistChangefeed(
return resultRows.Err()
}

// changefeedResultWriter implements the `rowexec.resultWriter` that sends
// changefeedResultWriter implements the `sql.rowResultWriter` that sends
// the received rows back over the given channel.
type changefeedResultWriter struct {
rowsCh chan<- tree.Datums
Expand Down
4 changes: 0 additions & 4 deletions pkg/ccl/importccl/import_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
"github.com/cockroachdb/cockroach/pkg/sql/rowexec"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/storage/cloud"
"github.com/cockroachdb/cockroach/pkg/storage/cloud/nodelocal"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
Expand Down Expand Up @@ -190,9 +189,6 @@ func (r *errorReportingRowReceiver) Push(
}

func (r *errorReportingRowReceiver) ProducerDone() {}
func (r *errorReportingRowReceiver) Types() []*types.T {
return nil
}

// A do nothing bulk adder implementation.
type doNothingKeyAdder struct {
Expand Down
1 change: 1 addition & 0 deletions pkg/col/coldata/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ go_test(
embed = [":coldata"],
deps = [
"//pkg/col/coldatatestutils",
"//pkg/sql/colconv",
"//pkg/sql/types",
"//pkg/testutils/buildutil",
"//pkg/util/leaktest",
Expand Down
23 changes: 12 additions & 11 deletions pkg/col/coldata/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,16 +333,17 @@ func (m *MemBatch) String() string {
if m.Length() == 0 {
return "[zero-length batch]"
}
var builder strings.Builder
strs := make([]string, len(m.ColVecs()))
for i := 0; i < m.Length(); i++ {
builder.WriteString("\n[")
for colIdx, v := range m.ColVecs() {
strs[colIdx] = fmt.Sprintf("%v", GetValueAt(v, i))
}
builder.WriteString(strings.Join(strs, ", "))
builder.WriteString("]")
if VecsToStringWithRowPrefix == nil {
panic("need to inject the implementation from sql/colconv package")
}
builder.WriteString("\n")
return builder.String()
return strings.Join(VecsToStringWithRowPrefix(m.ColVecs(), m.Length(), m.Selection(), "" /* prefix */), "\n")
}

// VecsToStringWithRowPrefix returns a pretty representation of the vectors.
// This method will convert all vectors to datums in order to print everything
// in the same manner as the tree.Datum representation does. Each row is printed
// in a separate string.
//
// The implementation lives in colconv package and is injected during the
// initialization.
var VecsToStringWithRowPrefix func(vecs []Vec, length int, sel []int, prefix string) []string
41 changes: 41 additions & 0 deletions pkg/col/coldata/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"unsafe"

"github.com/cockroachdb/cockroach/pkg/col/coldata"
"github.com/cockroachdb/cockroach/pkg/sql/colconv"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -142,3 +143,43 @@ func TestBatchWithBytesAndNulls(t *testing.T) {
assert.True(t, len(vec.Get(idx)) == 0)
}
}

// Import colconv package in order to inject the implementation of
// coldata.VecsToStringWithRowPrefix.
var _ colconv.VecToDatumConverter

func TestBatchString(t *testing.T) {
defer leaktest.AfterTest(t)()

b := coldata.NewMemBatch([]*types.T{types.String}, coldata.StandardColumnFactory)
input := []string{"one", "two", "three"}
for i := range input {
b.ColVec(0).Bytes().Set(i, []byte(input[i]))
}
getExpected := func(length int, sel []int) string {
var result string
for i := 0; i < length; i++ {
if i > 0 {
result += "\n"
}
rowIdx := i
if sel != nil {
rowIdx = sel[i]
}
result += "['" + input[rowIdx] + "']"
}
return result
}
for _, tc := range []struct {
length int
sel []int
}{
{length: 3},
{length: 2, sel: []int{0, 2}},
} {
b.SetSelection(tc.sel != nil)
copy(b.Selection(), tc.sel)
b.SetLength(tc.length)
assert.Equal(t, getExpected(tc.length, tc.sel), b.String())
}
}
15 changes: 14 additions & 1 deletion pkg/sql/colconv/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

# TODO(irfansharif): The dependency tree for *.eg.go needs
# sorting out. It depends on execgen+templates from elsewhere. Look towards
Expand All @@ -9,6 +9,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "colconv",
srcs = [
"batch.go", # keep
"datum_to_vec.eg.go",
"vec_to_datum.eg.go",
],
Expand All @@ -32,3 +33,15 @@ go_library(
"@com_github_lib_pq//oid",
],
)

go_test(
name = "colconv_test",
srcs = ["batch_test.go"],
embed = [":colconv"],
deps = [
"//pkg/col/coldata",
"//pkg/sql/types",
"//pkg/util/leaktest",
"@com_github_stretchr_testify//require",
],
)
47 changes: 47 additions & 0 deletions pkg/sql/colconv/batch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package colconv

import (
"strings"

"github.com/cockroachdb/cockroach/pkg/col/coldata"
)

func init() {
coldata.VecsToStringWithRowPrefix = vecsToStringWithRowPrefix
}

// vecsToStringWithRowPrefix returns a pretty representation of the vectors with
// each row being in a separate string.
func vecsToStringWithRowPrefix(vecs []coldata.Vec, length int, sel []int, prefix string) []string {
var builder strings.Builder
converter := NewAllVecToDatumConverter(len(vecs))
defer converter.Release()
converter.ConvertVecs(vecs, length, sel)
result := make([]string, length)
strs := make([]string, len(vecs))
for i := 0; i < length; i++ {
builder.Reset()
rowIdx := i
if sel != nil {
rowIdx = sel[i]
}
builder.WriteString(prefix + "[")
for colIdx := 0; colIdx < len(vecs); colIdx++ {
strs[colIdx] = converter.GetDatumColumn(colIdx)[rowIdx].String()
}
builder.WriteString(strings.Join(strs, " "))
builder.WriteString("]")
result[i] = builder.String()
}
return result
}
53 changes: 53 additions & 0 deletions pkg/sql/colconv/batch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package colconv

import (
"testing"

"github.com/cockroachdb/cockroach/pkg/col/coldata"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/stretchr/testify/require"
)

func TestVecsToStringWithRowPrefix(t *testing.T) {
defer leaktest.AfterTest(t)()

vec := coldata.NewMemColumn(types.String, coldata.BatchSize(), coldata.StandardColumnFactory)
input := []string{"one", "two", "three"}
for i := range input {
vec.Bytes().Set(i, []byte(input[i]))
}
getExpected := func(length int, sel []int, prefix string) []string {
result := make([]string, length)
for i := 0; i < length; i++ {
rowIdx := i
if sel != nil {
rowIdx = sel[i]
}
result[i] = prefix + "['" + input[rowIdx] + "']"
}
return result
}
for _, tc := range []struct {
length int
sel []int
prefix string
}{
{length: 3},
{length: 2, sel: []int{0, 2}},
{length: 3, prefix: "row: "},
{length: 2, sel: []int{0, 2}, prefix: "row: "},
} {
require.Equal(t, getExpected(tc.length, tc.sel, tc.prefix), vecsToStringWithRowPrefix([]coldata.Vec{vec}, tc.length, tc.sel, tc.prefix))
}
}
5 changes: 5 additions & 0 deletions pkg/sql/colexec/colexecbase/simple_project.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package colexecbase

import (
"context"
"strings"

"github.com/cockroachdb/cockroach/pkg/col/coldata"
"github.com/cockroachdb/cockroach/pkg/sql/colexecop"
Expand Down Expand Up @@ -86,6 +87,10 @@ func (b *projectingBatch) ReplaceCol(col coldata.Vec, idx int) {
b.Batch.ReplaceCol(col, int(b.projection[idx]))
}

func (b *projectingBatch) String() string {
return strings.Join(coldata.VecsToStringWithRowPrefix(b.ColVecs(), b.Length(), b.Selection(), "" /* prefix */), "\n")
}

// NewSimpleProjectOp returns a new simpleProjectOp that applies a simple
// projection on the columns in its input batch, returning a new batch with
// only the columns in the projection slice, in order. In a degenerate case
Expand Down
10 changes: 4 additions & 6 deletions pkg/sql/colflow/flow_coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,8 @@ type FlowCoordinator struct {
row rowenc.EncDatumRow
meta *execinfrapb.ProducerMetadata

// cancelFlow will return a function to cancel the context of the flow. It
// is a function in order to be lazily evaluated, since the context
// cancellation function is only available after the flow is Start()'ed.
cancelFlow func() context.CancelFunc
// cancelFlow cancels the context of the flow.
cancelFlow context.CancelFunc
}

var flowCoordinatorPool = sync.Pool{
Expand All @@ -57,7 +55,7 @@ func NewFlowCoordinator(
processorID int32,
input execinfra.RowSource,
output execinfra.RowReceiver,
cancelFlow func() context.CancelFunc,
cancelFlow context.CancelFunc,
) *FlowCoordinator {
f := flowCoordinatorPool.Get().(*FlowCoordinator)
f.input = input
Expand Down Expand Up @@ -155,7 +153,7 @@ func (f *FlowCoordinator) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetad

func (f *FlowCoordinator) close() {
if f.InternalClose() {
f.cancelFlow()()
f.cancelFlow()
}
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/colflow/vectorized_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -1009,7 +1009,7 @@ func (s *vectorizedFlowCreator) setupOutput(
pspec.ProcessorID,
input,
s.syncFlowConsumer,
s.getCancelFlowFn,
s.getCancelFlowFn(),
)
// The flow coordinator is a root of its operator chain.
s.opChains = append(s.opChains, f)
Expand Down
6 changes: 3 additions & 3 deletions pkg/sql/colflow/vectorized_flow_shutdown_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,9 @@ func (c callbackCloser) Close() error {
return c.closeCb()
}

// TestVectorizedFlowShutdown tests that closing the materializer correctly
// TestVectorizedFlowShutdown tests that closing the FlowCoordinator correctly
// closes all the infrastructure corresponding to the flow ending in that
// materializer. Namely:
// FlowCoordinator. Namely:
// - on a remote node, it creates a colflow.HashRouter with 3 outputs (with a
// corresponding to each colrpc.Outbox) as well as 3 standalone Outboxes;
// - on a local node, it creates 6 colrpc.Inboxes that feed into an unordered
Expand Down Expand Up @@ -366,7 +366,7 @@ func TestVectorizedFlowShutdown(t *testing.T) {
1, /* processorID */
materializer,
nil, /* output */
func() context.CancelFunc { return cancelLocal },
cancelLocal,
)
coordinator.Start(ctxLocal)

Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/conn_io.go
Original file line number Diff line number Diff line change
Expand Up @@ -737,8 +737,8 @@ type DescribeResult interface {

// SetInferredTypes tells the client about the inferred placeholder types.
SetInferredTypes([]oid.Oid)
// SetNoDataDescription is used to tell the client that the prepared statement
// or portal produces no rows.
// SetNoDataRowDescription is used to tell the client that the prepared
// statement or portal produces no rows.
SetNoDataRowDescription()
// SetPrepStmtOutput tells the client about the results schema of a prepared
// statement.
Expand Down
Loading

0 comments on commit 040a7af

Please sign in to comment.