62635: sql: use vectorize=on when not explicitly set r=yuzefovich a=yuzefovich

**sql: set NeededColumns on TableReaderSpec in some tests**

This field is used by the vectorized cFetcher to know which columns
actually need to be decoded. The follow-up commit will switch some of
the tests to go through the vectorized engine, and this commit is
a prerequisite for that.

Release note: None
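For illustration (this snippet is not part of the commit; it mirrors the test diffs further down, and `td` stands in for a table descriptor from one of those tests), a test populates the field like so:

	// NeededColumns lists the ordinal positions of the columns that the
	// vectorized cFetcher must decode; all other columns are skipped.
	tr := execinfrapb.TableReaderSpec{
		Table:         *td.TableDesc(),
		Spans:         []execinfrapb.TableReaderSpan{{Span: td.PrimaryIndexSpan(keys.SystemSQLCodec)}},
		NeededColumns: []uint32{0}, // decode only the first column
	}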

**colexec: do not create a span in the materializer**

This commit removes the creation of a tracing span (when tracing is
enabled) in the materializer. This is a minor performance optimization,
but the change was actually prompted by the follow-up commit, which
would break a unit test of the migration manager if the materializer
created a tracing span.

I left a somewhat detailed comment describing what's wrong with that
test, but not creating a tracing span is reasonable on its own, so
I think this change is worthwhile.

Release note: None
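The swap itself is a one-liner, sketched here out of context (the two helpers are the ones visible in the materializer diff below):

	// Before: StartInternal derived a child tracing span named
	// "materializer" whenever tracing was enabled:
	//   ctx = m.StartInternal(ctx, materializerProcName)
	// After: reuse the caller's context without creating a span.
	ctx = m.StartInternalNoSpan(ctx)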

**sql: use vectorize=on when not explicitly set**

Previously, if the `Vectorize` value of the session data wasn't
explicitly set, it would remain 0, which was equivalent to
`VectorizeOff`. This commit renames the zero value to `VectorizeUnset`,
which is mapped to "on"; `VectorizeOff` now needs to be set explicitly.
Removing the zero value entirely is not allowed by proto3.

Fixes: cockroachdb#62394.

Release note: None
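A conceptual sketch of the resulting enum (the real definition lives in a .proto file; every numeric value here other than zero is illustrative, since only the zero value is constrained by proto3):

	// proto3 requires an enum to have a zero value, so it cannot be
	// removed, only renamed and remapped.
	const (
		VectorizeUnset              VectorizeExecMode = 0 // not explicitly set; now treated as "on"
		VectorizeOn                 VectorizeExecMode = 2 // illustrative numeric value
		VectorizeExperimentalAlways VectorizeExecMode = 3 // illustrative numeric value
		VectorizeOff                VectorizeExecMode = 4 // must now be requested explicitly; illustrative numeric value
	)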

Co-authored-by: Yahor Yuzefovich <[email protected]>
craig[bot] and yuzefovich committed Jun 2, 2021
2 parents eb046e1 + 5a92a58 commit 6c2e732
Showing 14 changed files with 143 additions and 105 deletions.
14 changes: 14 additions & 0 deletions pkg/migration/migrationmanager/manager_external_test.go
@@ -159,6 +159,20 @@ RETURNING id;`).Scan(&secondID))
 	}()
 
 	testutils.SucceedsSoon(t, func() error {
+		// TODO(yuzefovich): this check is quite unfortunate since it relies on
+		// the assumption that all recordings from the child spans are imported
+		// into the tracer. However, this is not the case for the DistSQL
+		// processors where child spans are created with
+		// WithParentAndManualCollection option which requires explicitly
+		// importing the recordings from the children. This only happens when
+		// the execution flow is drained which cannot happen until we close
+		// the 'unblock' channel, and this we cannot do until we see the
+		// expected message in the trace.
+		//
+		// At the moment it works in a very fragile manner (by making sure that
+		// no processors actually create their own spans). Instead, a different
+		// way to observe the status of the migration manager should be
+		// introduced and should be used here.
 		if tracing.FindMsgInRecording(getRecording(), "found existing migration job") > 0 {
 			return nil
 		}
4 changes: 4 additions & 0 deletions pkg/sql/colexec/colbuilder/execplan.go
@@ -250,6 +250,8 @@
 	errMetadataTestReceiverWrap = errors.New("core.MetadataTestReceiver is not supported")
 	errChangeAggregatorWrap     = errors.New("core.ChangeAggregator is not supported")
 	errChangeFrontierWrap       = errors.New("core.ChangeFrontier is not supported")
+	errReadImportWrap           = errors.New("core.ReadImport is not supported")
+	errBackupDataWrap           = errors.New("core.BackupData is not supported")
 	errBackfillerWrap           = errors.New("core.Backfiller is not supported (not an execinfra.RowSource)")
 	errCSVWriterWrap            = errors.New("core.CSVWriter is not supported (not an execinfra.RowSource)")
 	errSamplerWrap              = errors.New("core.Sampler is not supported (not an execinfra.RowSource)")
@@ -274,6 +276,7 @@ func canWrap(mode sessiondatapb.VectorizeExecMode, spec *execinfrapb.ProcessorSpec) error {
 	case spec.Core.Backfiller != nil:
 		return errBackfillerWrap
 	case spec.Core.ReadImport != nil:
+		return errReadImportWrap
 	case spec.Core.CSVWriter != nil:
 		return errCSVWriterWrap
 	case spec.Core.Sampler != nil:
@@ -307,6 +310,7 @@ func canWrap(mode sessiondatapb.VectorizeExecMode, spec *execinfrapb.ProcessorSpec) error {
 	case spec.Core.InvertedFilterer != nil:
 	case spec.Core.InvertedJoiner != nil:
 	case spec.Core.BackupData != nil:
+		return errBackupDataWrap
 	case spec.Core.SplitAndScatter != nil:
 	case spec.Core.RestoreData != nil:
 	case spec.Core.Filterer != nil:
4 changes: 1 addition & 3 deletions pkg/sql/colexec/materializer.go
@@ -153,8 +153,6 @@ func (d *drainHelper) Release() {
 	drainHelperPool.Put(d)
 }
 
-const materializerProcName = "materializer"
-
 var materializerPool = sync.Pool{
 	New: func() interface{} {
 		return &Materializer{}
@@ -232,7 +230,7 @@ func (m *Materializer) OutputTypes() []*types.T {
 
 // Start is part of the execinfra.RowSource interface.
 func (m *Materializer) Start(ctx context.Context) {
-	ctx = m.StartInternal(ctx, materializerProcName)
+	ctx = m.StartInternalNoSpan(ctx)
 	// We can encounter an expected error during Init (e.g. an operator
 	// attempts to allocate a batch, but the memory budget limit has been
 	// reached), so we need to wrap it with a catcher.
12 changes: 3 additions & 9 deletions pkg/sql/colflow/colbatch_scan_test.go
@@ -76,12 +76,9 @@ func TestColBatchScanMeta(t *testing.T) {
 			Spans: []execinfrapb.TableReaderSpan{
 				{Span: td.PrimaryIndexSpan(keys.SystemSQLCodec)},
 			},
-			Table: *td.TableDesc(),
+			NeededColumns: []uint32{0},
+			Table:         *td.TableDesc(),
 		}},
-		Post: execinfrapb.PostProcessSpec{
-			Projection:    true,
-			OutputColumns: []uint32{0},
-		},
 		ResultTypes: types.OneIntCol,
 	}
 
@@ -135,11 +132,8 @@ func BenchmarkColBatchScan(b *testing.B) {
 			Spans: []execinfrapb.TableReaderSpan{
 				{Span: tableDesc.PrimaryIndexSpan(keys.SystemSQLCodec)},
 			},
+			NeededColumns: []uint32{0, 1},
 		}},
-		Post: execinfrapb.PostProcessSpec{
-			Projection:    true,
-			OutputColumns: []uint32{0, 1},
-		},
 		ResultTypes: types.TwoIntCols,
 	}
 
Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/exec_util.go
@@ -439,9 +439,10 @@ var VectorizeClusterMode = settings.RegisterEnumSetting(
 	"default vectorize mode",
 	"on",
 	map[int64]string{
-		int64(sessiondatapb.VectorizeOff):                "off",
+		int64(sessiondatapb.VectorizeUnset):              "on",
 		int64(sessiondatapb.VectorizeOn):                 "on",
 		int64(sessiondatapb.VectorizeExperimentalAlways): "experimental_always",
+		int64(sessiondatapb.VectorizeOff):                "off",
 	},
 )
 
7 changes: 5 additions & 2 deletions pkg/sql/execinfra/version.go
@@ -39,18 +39,21 @@ import "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
 //
 // ATTENTION: When updating these fields, add a brief description of what
 // changed to the version history below.
-const Version execinfrapb.DistSQLVersion = 47
+const Version execinfrapb.DistSQLVersion = 48
 
 // MinAcceptedVersion is the oldest version that the server is compatible with.
 // A server will not accept flows with older versions.
-const MinAcceptedVersion execinfrapb.DistSQLVersion = 47
+const MinAcceptedVersion execinfrapb.DistSQLVersion = 48
 
 /*
 ** VERSION HISTORY **
 Please add new entries at the top.
+- Version: 48 (MinAcceptedVersion: 48)
+  - Zero value for VectorizeExecMode changed meaning from "off" to "on".
 - Version: 47 (MinAcceptedVersion: 47)
   - A new synchronizer type (serial unordered) has been introduced explicitly.
     MinAcceptedVersion needed to be bumped because if the older server receives
21 changes: 12 additions & 9 deletions pkg/sql/flowinfra/cluster_test.go
@@ -106,21 +106,24 @@ func TestClusterFlow(t *testing.T) {
 	leafInputState := txn.GetLeafTxnInputState(ctx)
 
 	tr1 := execinfrapb.TableReaderSpec{
-		Table:    *desc.TableDesc(),
-		IndexIdx: 1,
-		Spans:    []execinfrapb.TableReaderSpan{makeIndexSpan(0, 8)},
+		Table:         *desc.TableDesc(),
+		IndexIdx:      1,
+		Spans:         []execinfrapb.TableReaderSpan{makeIndexSpan(0, 8)},
+		NeededColumns: []uint32{0, 1},
 	}
 
 	tr2 := execinfrapb.TableReaderSpec{
-		Table:    *desc.TableDesc(),
-		IndexIdx: 1,
-		Spans:    []execinfrapb.TableReaderSpan{makeIndexSpan(8, 12)},
+		Table:         *desc.TableDesc(),
+		IndexIdx:      1,
+		Spans:         []execinfrapb.TableReaderSpan{makeIndexSpan(8, 12)},
+		NeededColumns: []uint32{0, 1},
 	}
 
 	tr3 := execinfrapb.TableReaderSpec{
-		Table:    *desc.TableDesc(),
-		IndexIdx: 1,
-		Spans:    []execinfrapb.TableReaderSpan{makeIndexSpan(12, 100)},
+		Table:         *desc.TableDesc(),
+		IndexIdx:      1,
+		Spans:         []execinfrapb.TableReaderSpan{makeIndexSpan(12, 100)},
+		NeededColumns: []uint32{0, 1},
 	}
 
 	fid := execinfrapb.FlowID{UUID: uuid.MakeV4()}
9 changes: 5 additions & 4 deletions pkg/sql/flowinfra/server_test.go
@@ -58,10 +58,11 @@ func TestServer(t *testing.T) {
 	td := catalogkv.TestingGetTableDescriptor(kvDB, keys.SystemSQLCodec, "test", "t")
 
 	ts := execinfrapb.TableReaderSpec{
-		Table:    *td.TableDesc(),
-		IndexIdx: 0,
-		Reverse:  false,
-		Spans:    []execinfrapb.TableReaderSpan{{Span: td.PrimaryIndexSpan(keys.SystemSQLCodec)}},
+		Table:         *td.TableDesc(),
+		IndexIdx:      0,
+		Reverse:       false,
+		Spans:         []execinfrapb.TableReaderSpan{{Span: td.PrimaryIndexSpan(keys.SystemSQLCodec)}},
+		NeededColumns: []uint32{0, 1},
 	}
 	post := execinfrapb.PostProcessSpec{
 		Projection: true,
24 changes: 19 additions & 5 deletions pkg/sql/physicalplan/aggregator_funcs_test.go
@@ -125,8 +125,9 @@ func checkDistAggregationInfo(
 
 	makeTableReader := func(startPK, endPK int, streamID int) execinfrapb.ProcessorSpec {
 		tr := execinfrapb.TableReaderSpec{
-			Table: *tableDesc.TableDesc(),
-			Spans: make([]execinfrapb.TableReaderSpan, 1),
+			Table:         *tableDesc.TableDesc(),
+			Spans:         make([]execinfrapb.TableReaderSpan, 1),
+			NeededColumns: []uint32{uint32(colIdx)},
 		}
 
 		var err error
@@ -202,7 +203,20 @@ func checkDistAggregationInfo(
 	txn := kv.NewTxn(ctx, srv.DB(), srv.NodeID())
 
 	// First run a flow that aggregates all the rows without any local stages.
-
+	nonDistFinalOutputTypes := finalOutputTypes
+	if info.FinalRendering != nil {
+		h := tree.MakeTypesOnlyIndexedVarHelper(finalOutputTypes)
+		renderExpr, err := info.FinalRendering(&h, varIdxs)
+		if err != nil {
+			t.Fatal(err)
+		}
+		var expr execinfrapb.Expression
+		expr, err = MakeExpression(renderExpr, nil, nil)
+		if err != nil {
+			t.Fatal(err)
+		}
+		nonDistFinalOutputTypes = []*types.T{expr.LocalExpr.ResolvedType()}
+	}
 	rowsNonDist := runTestFlow(
 		t, srv, txn,
 		makeTableReader(1, numRows+1, 0),
@@ -223,7 +237,7 @@ func checkDistAggregationInfo(
 				{Type: execinfrapb.StreamEndpointSpec_SYNC_RESPONSE},
 			},
 		}},
-		ResultTypes: finalOutputTypes,
+		ResultTypes: nonDistFinalOutputTypes,
 	},
 )
 
@@ -305,7 +319,7 @@ func checkDistAggregationInfo(
 			t.Fatal(err)
 		}
 		finalProc.Post.RenderExprs = []execinfrapb.Expression{expr}
-
+		finalProc.ResultTypes = []*types.T{expr.LocalExpr.ResolvedType()}
 	}
 
 	procs = append(procs, finalProc)
1 change: 0 additions & 1 deletion pkg/sql/sem/tree/BUILD.bazel
@@ -130,7 +130,6 @@ go_library(
         "//pkg/sql/privilege",
         "//pkg/sql/roleoption",
         "//pkg/sql/sessiondata",
-        "//pkg/sql/sessiondatapb",
         "//pkg/sql/sqlliveness",
        "//pkg/sql/sqltelemetry",
        "//pkg/sql/types",
13 changes: 5 additions & 8 deletions pkg/sql/sem/tree/eval.go
@@ -35,7 +35,6 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgnotice"
 	"github.com/cockroachdb/cockroach/pkg/sql/roleoption"
 	"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
-	"github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb"
 	"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness"
 	"github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry"
 	"github.com/cockroachdb/cockroach/pkg/sql/types"
@@ -3464,13 +3463,11 @@ func MakeTestingEvalContext(st *cluster.Settings) EvalContext {
 // EvalContext so do not start or close the memory monitor.
 func MakeTestingEvalContextWithMon(st *cluster.Settings, monitor *mon.BytesMonitor) EvalContext {
 	ctx := EvalContext{
-		Codec: keys.SystemSQLCodec,
-		Txn:   &kv.Txn{},
-		SessionData: &sessiondata.SessionData{SessionData: sessiondatapb.SessionData{
-			VectorizeMode: sessiondatapb.VectorizeOn,
-		}},
-		Settings: st,
-		NodeID:   base.TestingIDContainer,
+		Codec:       keys.SystemSQLCodec,
+		Txn:         &kv.Txn{},
+		SessionData: &sessiondata.SessionData{},
+		Settings:    st,
+		NodeID:      base.TestingIDContainer,
 	}
 	monitor.Start(context.Background(), nil /* pool */, mon.MakeStandaloneBudget(math.MaxInt64))
 	ctx.Mon = monitor
10 changes: 5 additions & 5 deletions pkg/sql/sessiondatapb/session_data.go
@@ -80,12 +80,12 @@ func BytesEncodeFormatFromString(val string) (_ BytesEncodeFormat, ok bool) {
 
 func (m VectorizeExecMode) String() string {
 	switch m {
-	case VectorizeOff:
-		return "off"
-	case VectorizeOn:
+	case VectorizeOn, VectorizeUnset:
 		return "on"
 	case VectorizeExperimentalAlways:
 		return "experimental_always"
+	case VectorizeOff:
+		return "off"
 	default:
 		return fmt.Sprintf("invalid (%d)", m)
 	}
@@ -96,12 +96,12 @@ func (m VectorizeExecMode) String() string {
 func VectorizeExecModeFromString(val string) (VectorizeExecMode, bool) {
 	var m VectorizeExecMode
 	switch strings.ToUpper(val) {
-	case "OFF":
-		m = VectorizeOff
 	case "ON":
 		m = VectorizeOn
 	case "EXPERIMENTAL_ALWAYS":
 		m = VectorizeExperimentalAlways
+	case "OFF":
+		m = VectorizeOff
 	default:
 		return 0, false
 	}
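
A small usage sketch of the round trip above (hand-written, not from the commit): the zero value now stringifies as "on", while explicit modes survive parsing unchanged.

	var mode sessiondatapb.VectorizeExecMode // zero value, i.e. VectorizeUnset
	fmt.Println(mode.String())               // prints "on"
	if m, ok := sessiondatapb.VectorizeExecModeFromString("off"); ok {
		fmt.Println(m.String()) // prints "off"; m == sessiondatapb.VectorizeOff
	}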