diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go index af60291cdc1e..383911894a1b 100644 --- a/pkg/server/server_sql.go +++ b/pkg/server/server_sql.go @@ -1103,6 +1103,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { distSQLServer.ServerConfig.SQLStatsController = pgServer.SQLServer.GetSQLStatsController() distSQLServer.ServerConfig.SchemaTelemetryController = pgServer.SQLServer.GetSchemaTelemetryController() distSQLServer.ServerConfig.IndexUsageStatsController = pgServer.SQLServer.GetIndexUsageStatsController() + distSQLServer.ServerConfig.StatsRefresher = statsRefresher // We use one BytesMonitor for all Executor's created by the // internalDB. diff --git a/pkg/sql/BUILD.bazel b/pkg/sql/BUILD.bazel index 7833f3c90dfd..659a2ad87b5e 100644 --- a/pkg/sql/BUILD.bazel +++ b/pkg/sql/BUILD.bazel @@ -294,6 +294,7 @@ go_library( "//pkg/cloud/externalconn", "//pkg/clusterversion", "//pkg/col/coldata", + "//pkg/col/coldataext", "//pkg/config", "//pkg/config/zonepb", "//pkg/docs", @@ -378,6 +379,7 @@ go_library( "//pkg/sql/colexec", "//pkg/sql/colfetcher", "//pkg/sql/colflow", + "//pkg/sql/colmem", "//pkg/sql/compengine", "//pkg/sql/comprules", "//pkg/sql/contention", @@ -551,6 +553,7 @@ go_library( "@com_github_cockroachdb_errors//hintdetail", "@com_github_cockroachdb_logtags//:logtags", "@com_github_cockroachdb_redact//:redact", + "@com_github_dustin_go_humanize//:go-humanize", "@com_github_gogo_protobuf//proto", "@com_github_gogo_protobuf//types", "@com_github_lib_pq//:pq", diff --git a/pkg/sql/colexec/BUILD.bazel b/pkg/sql/colexec/BUILD.bazel index 9e151624ee1e..998bfd6df4d6 100644 --- a/pkg/sql/colexec/BUILD.bazel +++ b/pkg/sql/colexec/BUILD.bazel @@ -16,6 +16,7 @@ go_library( "count.go", "hash_aggregator.go", "hash_group_joiner.go", + "insert.go", "invariants_checker.go", "limit.go", "materializer.go", @@ -43,12 +44,16 @@ go_library( "//pkg/col/coldata", "//pkg/col/coldataext", # keep "//pkg/col/typeconv", # keep + "//pkg/kv/kvserver/kvserverbase", "//pkg/roachpb", "//pkg/server/telemetry", # keep "//pkg/settings", + "//pkg/sql/catalog", "//pkg/sql/catalog/catenumpb", "//pkg/sql/catalog/colinfo", # keep + "//pkg/sql/catalog/descpb", "//pkg/sql/colconv", + "//pkg/sql/colenc", "//pkg/sql/colexec/colexecagg", # keep "//pkg/sql/colexec/colexecargs", "//pkg/sql/colexec/colexecbase", @@ -64,7 +69,9 @@ go_library( "//pkg/sql/execinfra/execreleasable", "//pkg/sql/execinfrapb", "//pkg/sql/memsize", + "//pkg/sql/row", "//pkg/sql/rowenc", + "//pkg/sql/sem/catid", "//pkg/sql/sem/eval", "//pkg/sql/sem/tree", "//pkg/sql/sqltelemetry", # keep @@ -73,7 +80,9 @@ go_library( "//pkg/util/buildutil", "//pkg/util/duration", # keep "//pkg/util/encoding", # keep + "//pkg/util/intsets", "//pkg/util/json", # keep + "//pkg/util/log", "//pkg/util/stringarena", "//pkg/util/tracing", "@com_github_cockroachdb_apd_v3//:apd", # keep diff --git a/pkg/sql/colexec/colbuilder/execplan.go b/pkg/sql/colexec/colbuilder/execplan.go index 8dd68e17444b..189a2ec63173 100644 --- a/pkg/sql/colexec/colbuilder/execplan.go +++ b/pkg/sql/colexec/colbuilder/execplan.go @@ -238,6 +238,9 @@ func supportedNatively(core *execinfrapb.ProcessorCoreUnion) error { // distinguish from other unsupported cores. return errLocalPlanNodeWrap + case core.Insert != nil: + return nil + default: return errCoreUnsupportedNatively } @@ -818,11 +821,20 @@ func NewColOperator( return r, err } if core.Values.NumRows == 0 || len(core.Values.Columns) == 0 { - // To simplify valuesOp we handle some special cases with - // fixedNumTuplesNoInputOp. - result.Root = colexecutils.NewFixedNumTuplesNoInputOp( - getStreamingAllocator(ctx, args), int(core.Values.NumRows), nil, /* opToInitialize */ - ) + // Handle coldata.Batch vector source. + if b, ok := args.LocalVectorSources[args.Spec.ProcessorID]; ok { + batch, ok := b.(coldata.Batch) + if !ok { + colexecerror.InternalError(errors.AssertionFailedf("LocalVectorSource wasn't a coldata.Batch")) + } + result.Root = colexecutils.NewRawColDataBatchOp(batch) + } else { + // To simplify valuesOp we handle some special cases with + // fixedNumTuplesNoInputOp. + result.Root = colexecutils.NewFixedNumTuplesNoInputOp( + getStreamingAllocator(ctx, args), int(core.Values.NumRows), nil, /* opToInitialize */ + ) + } } else { result.Root = colexec.NewValuesOp( getStreamingAllocator(ctx, args), core.Values, execinfra.GetWorkMemLimit(flowCtx), @@ -1728,6 +1740,15 @@ func NewColOperator( input = result.Root } + case core.Insert != nil: + if err := checkNumIn(inputs, 1); err != nil { + return r, err + } + outputIdx := len(spec.Input[0].ColumnTypes) + result.Root = colexec.NewInsertOp(ctx, flowCtx, core.Insert, inputs[0].Root, spec.ResultTypes, outputIdx, + getStreamingAllocator(ctx, args), args.ExprHelper.SemaCtx) + result.ColumnTypes = spec.ResultTypes + default: return r, errors.AssertionFailedf("unsupported processor core %q", core) } diff --git a/pkg/sql/colexec/colexecargs/op_creation.go b/pkg/sql/colexec/colexecargs/op_creation.go index 94c0e0244ae3..88be8e996ff9 100644 --- a/pkg/sql/colexec/colexecargs/op_creation.go +++ b/pkg/sql/colexec/colexecargs/op_creation.go @@ -64,13 +64,15 @@ type NewColOperatorArgs struct { StreamingMemAccount *mon.BoundAccount ProcessorConstructor execinfra.ProcessorConstructor LocalProcessors []execinfra.LocalProcessor - DiskQueueCfg colcontainer.DiskQueueCfg - FDSemaphore semaphore.Semaphore - ExprHelper *ExprHelper - Factory coldata.ColumnFactory - MonitorRegistry *MonitorRegistry - TypeResolver *descs.DistSQLTypeResolver - TestingKnobs struct { + // any is actually a coldata.Batch, see physicalplan.PhysicalInfrastructure comments. + LocalVectorSources map[int32]any + DiskQueueCfg colcontainer.DiskQueueCfg + FDSemaphore semaphore.Semaphore + ExprHelper *ExprHelper + Factory coldata.ColumnFactory + MonitorRegistry *MonitorRegistry + TypeResolver *descs.DistSQLTypeResolver + TestingKnobs struct { // SpillingCallbackFn will be called when the spilling from an in-memory // to disk-backed operator occurs. It should only be set in tests. SpillingCallbackFn func() diff --git a/pkg/sql/colexec/colexecutils/operator.go b/pkg/sql/colexec/colexecutils/operator.go index 92c20d416124..328d336e5376 100644 --- a/pkg/sql/colexec/colexecutils/operator.go +++ b/pkg/sql/colexec/colexecutils/operator.go @@ -86,6 +86,28 @@ func (s *fixedNumTuplesNoInputOp) Next() coldata.Batch { return s.batch } +type rawBatchOp struct { + colexecop.ZeroInputNode + batch coldata.Batch +} + +var _ colexecop.Operator = &rawBatchOp{} + +func (s *rawBatchOp) Init(ctx context.Context) { +} + +func (s *rawBatchOp) Next() coldata.Batch { + b := s.batch + s.batch = coldata.ZeroBatch + return b +} + +// NewRawColDataBatchOp allocates a rawBatchOp. This is used by COPY to perform +// vectorized inserts. +func NewRawColDataBatchOp(b coldata.Batch) colexecop.Operator { + return &rawBatchOp{batch: b} +} + // vectorTypeEnforcer is a utility Operator that on every call to Next // enforces that non-zero length batch from the input has a vector of the // desired type in the desired position. If the width of the batch is less than diff --git a/pkg/sql/colexec/insert.go b/pkg/sql/colexec/insert.go new file mode 100644 index 000000000000..1e600519639d --- /dev/null +++ b/pkg/sql/colexec/insert.go @@ -0,0 +1,226 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package colexec + +import ( + "bytes" + "context" + "math" + + "github.com/cockroachdb/cockroach/pkg/col/coldata" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" + "github.com/cockroachdb/cockroach/pkg/sql/catalog" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" + "github.com/cockroachdb/cockroach/pkg/sql/colenc" + "github.com/cockroachdb/cockroach/pkg/sql/colexecerror" + "github.com/cockroachdb/cockroach/pkg/sql/colexecop" + "github.com/cockroachdb/cockroach/pkg/sql/colmem" + "github.com/cockroachdb/cockroach/pkg/sql/execinfra" + "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" + "github.com/cockroachdb/cockroach/pkg/sql/row" + "github.com/cockroachdb/cockroach/pkg/sql/sem/catid" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/types" + "github.com/cockroachdb/cockroach/pkg/util/intsets" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/errors" +) + +type vectorInserter struct { + colexecop.OneInputHelper + desc catalog.TableDescriptor + insertCols []catalog.Column + retBatch coldata.Batch + flowCtx *execinfra.FlowCtx + // checkOrds are the columns containing bool values with check expression + // results. + checkOrds intsets.Fast + // If we have checkOrds we need a sema context to format error messages. + semaCtx *tree.SemaContext + // mutationQuota is the number of bytes we'll allow in the kv.Batch before + // finishing it and starting a new one. + mutationQuota int + // If auto commit is true we'll commit the last batch. + autoCommit bool +} + +var _ colexecop.Operator = &vectorInserter{} + +// NewInsertOp allocates a new vector insert operator. Currently the only input +// will be a rawBatchOp and only output is row count so this doesn't support +// the full gamut of insert operations. +func NewInsertOp( + ctx context.Context, + flowCtx *execinfra.FlowCtx, + spec *execinfrapb.InsertSpec, + input colexecop.Operator, + typs []*types.T, + outputIdx int, + alloc *colmem.Allocator, + semaCtx *tree.SemaContext, +) colexecop.Operator { + // TODO(cucaroach): Should we dispense with the formalities and just pass a + // pointer to this through with the coldata.Batch? + desc := flowCtx.TableDescriptor(ctx, &spec.Table) + insCols := make([]catalog.Column, len(spec.ColumnIDs)) + for i, c := range spec.ColumnIDs { + col, err := catalog.MustFindColumnByID(desc, c) + if err != nil { + colexecerror.InternalError(err) + } + insCols[i] = col + } + + // Empirical testing shows that if ApproximateMutationBytes approaches + // 32MB we'll hit the command limit. So set limit to a fraction of + // command limit to be safe. + mutationQuota := int(kvserverbase.MaxCommandSize.Get(&flowCtx.Cfg.Settings.SV) / 3) + + v := vectorInserter{ + OneInputHelper: colexecop.MakeOneInputHelper(input), + desc: desc, + retBatch: alloc.NewMemBatchWithFixedCapacity(typs, 1), + flowCtx: flowCtx, + insertCols: insCols, + mutationQuota: mutationQuota, + autoCommit: spec.AutoCommit, + } + if spec.CheckOrds != nil { + if err := v.checkOrds.Decode(bytes.NewReader(spec.CheckOrds)); err != nil { + colexecerror.InternalError(err) + } + v.semaCtx = semaCtx + } + return &v +} + +func (v *vectorInserter) getPartialIndexMap(b coldata.Batch) map[catid.IndexID][]bool { + var partialIndexColMap map[descpb.IndexID][]bool + // Create a set of partial index IDs to not write to. Indexes should not be + // written to when they are partial indexes and the row does not satisfy the + // predicate. This set is passed as a parameter to tableInserter.row below. + pindexes := v.desc.PartialIndexes() + if n := len(pindexes); n > 0 { + colOffset := len(v.insertCols) + v.checkOrds.Len() + numCols := len(b.ColVecs()) - colOffset + if numCols != len(pindexes) { + colexecerror.InternalError(errors.AssertionFailedf("num extra columns didn't match number of partial indexes")) + } + for i := 0; i < numCols; i++ { + if partialIndexColMap == nil { + partialIndexColMap = make(map[descpb.IndexID][]bool) + } + partialIndexColMap[pindexes[i].GetID()] = b.ColVec(i + colOffset).Bool() + } + } + return partialIndexColMap +} + +func (v *vectorInserter) Next() coldata.Batch { + ctx := v.Ctx + b := v.Input.Next() + if b.Length() == 0 { + return coldata.ZeroBatch + } + + if !v.checkOrds.Empty() { + if err := v.checkMutationInput(ctx, b); err != nil { + colexecerror.ExpectedError(err) + } + } + partialIndexColMap := v.getPartialIndexMap(b) + + kvba := row.KVBatchAdapter{} + var p row.Putter = &kvba + if v.flowCtx.TraceKV { + p = &row.TracePutter{Putter: p, Ctx: ctx} + } + // In the future we could sort across multiple goroutines, not worth it yet, + // time here is minimal compared to time spent executing batch. + p = &row.SortingPutter{Putter: p} + enc := colenc.MakeEncoder(v.flowCtx.Codec(), v.desc, &v.flowCtx.Cfg.Settings.SV, b, v.insertCols, v.flowCtx.GetRowMetrics(), partialIndexColMap, + func() error { + if kvba.Batch.ApproximateMutationBytes() > v.mutationQuota { + return colenc.ErrOverMemLimit + } + return nil + }) + // PrepareBatch is called in a loop to partially insert till everything is + // done, if there are a ton of secondary indexes we could hit raft + // command limit building kv batch so we need to be able to do + // it in chunks of rows. + end := b.Length() + start := 0 + for start < b.Length() { + kvba.Batch = v.flowCtx.Txn.NewBatch() + if err := enc.PrepareBatch(ctx, p, start, end); err != nil { + if errors.Is(err, colenc.ErrOverMemLimit) { + log.VEventf(ctx, 2, "vector insert memory limit err %d, numrows: %d", start, end) + end /= 2 + // If one row blows out memory limit, just do one row at a time. + if end <= start { + // Disable memory limit, if the system can't handle this row + // a KV error will be encountered below. + v.mutationQuota = math.MaxInt + end = start + 1 + } + // Throw everything away and start over. + kvba.Batch = v.flowCtx.Txn.NewBatch() + continue + } + colexecerror.ExpectedError(err) + } + log.VEventf(ctx, 2, "copy running batch, autocommit: %v, final: %v, numrows: %d", v.autoCommit, end == b.Length(), end-start) + var err error + if v.autoCommit && end == b.Length() { + err = v.flowCtx.Txn.CommitInBatch(ctx, kvba.Batch) + } else { + err = v.flowCtx.Txn.Run(ctx, kvba.Batch) + } + if err != nil { + colexecerror.ExpectedError(row.ConvertBatchError(ctx, v.desc, kvba.Batch)) + } + numRows := end - start + start = end + end += numRows + if end > b.Length() { + end = b.Length() + } + } + + v.retBatch.ResetInternalBatch() + v.retBatch.ColVec(0).Int64()[0] = int64(b.Length()) + v.retBatch.SetLength(1) + + v.flowCtx.Cfg.StatsRefresher.NotifyMutation(v.desc, b.Length()) + + return v.retBatch +} + +func (v *vectorInserter) checkMutationInput(ctx context.Context, b coldata.Batch) error { + checks := v.desc.EnforcedCheckConstraints() + colIdx := 0 + for i, ch := range checks { + if !v.checkOrds.Contains(i) { + continue + } + vec := b.ColVec(colIdx + len(v.insertCols)) + bools := vec.Bool() + nulls := vec.Nulls() + for r := 0; r < b.Length(); r++ { + if !bools[r] && !nulls.NullAt(r) { + return row.CheckFailed(ctx, v.semaCtx, v.flowCtx.EvalCtx.SessionData(), v.desc, ch) + } + } + colIdx++ + } + return nil +} diff --git a/pkg/sql/colflow/explain_vec.go b/pkg/sql/colflow/explain_vec.go index 61db06e86548..4290efda2ae4 100644 --- a/pkg/sql/colflow/explain_vec.go +++ b/pkg/sql/colflow/explain_vec.go @@ -59,7 +59,7 @@ func convertToVecTree( flowCtx.Cfg.VecFDSemaphore, flowCtx.NewTypeResolver(flowCtx.Txn), admission.WorkInfo{}, ) - opChains, _, err = creator.setupFlow(ctx, flowCtx, flow.Processors, localProcessors, fuseOpt) + opChains, _, err = creator.setupFlow(ctx, flowCtx, flow.Processors, localProcessors, nil /*localVectorSources*/, fuseOpt) cleanup = func() { creator.cleanup(ctx) creator.Release() diff --git a/pkg/sql/colflow/vectorized_flow.go b/pkg/sql/colflow/vectorized_flow.go index cd6a7237ffd2..6be2c4ede16a 100644 --- a/pkg/sql/colflow/vectorized_flow.go +++ b/pkg/sql/colflow/vectorized_flow.go @@ -267,7 +267,7 @@ func (f *vectorizedFlow) Setup( if f.testingKnobs.onSetupFlow != nil { f.testingKnobs.onSetupFlow(f.creator) } - opChains, batchFlowCoordinator, err := f.creator.setupFlow(ctx, flowCtx, spec.Processors, f.GetLocalProcessors(), opt) + opChains, batchFlowCoordinator, err := f.creator.setupFlow(ctx, flowCtx, spec.Processors, f.GetLocalProcessors(), f.GetLocalVectorSources(), opt) if err != nil { // It is (theoretically) possible that some of the memory monitoring // infrastructure was created even in case of an error, and we need to @@ -1119,6 +1119,7 @@ func (s *vectorizedFlowCreator) setupFlow( flowCtx *execinfra.FlowCtx, processorSpecs []execinfrapb.ProcessorSpec, localProcessors []execinfra.LocalProcessor, + localVectorSources map[int32]any, opt flowinfra.FuseOpt, ) (opChains execopnode.OpChains, batchFlowCoordinator *BatchFlowCoordinator, err error) { if vecErr := colexecerror.CatchVectorizedRuntimeError(func() { @@ -1171,6 +1172,7 @@ func (s *vectorizedFlowCreator) setupFlow( StreamingMemAccount: s.monitorRegistry.NewStreamingMemAccount(flowCtx), ProcessorConstructor: rowexec.NewProcessor, LocalProcessors: localProcessors, + LocalVectorSources: localVectorSources, DiskQueueCfg: s.diskQueueCfg, FDSemaphore: s.fdSemaphore, ExprHelper: s.exprHelper, diff --git a/pkg/sql/colflow/vectorized_flow_test.go b/pkg/sql/colflow/vectorized_flow_test.go index 0491d086f689..0edaa84d00da 100644 --- a/pkg/sql/colflow/vectorized_flow_test.go +++ b/pkg/sql/colflow/vectorized_flow_test.go @@ -247,7 +247,7 @@ func TestDrainOnlyInputDAG(t *testing.T) { nil /* fdSemaphore */, descs.DistSQLTypeResolver{}, admission.WorkInfo{}, ) - _, _, err := vfc.setupFlow(ctx, &f.FlowCtx, procs, nil /* localProcessors */, flowinfra.FuseNormally) + _, _, err := vfc.setupFlow(ctx, &f.FlowCtx, procs, nil /* localProcessors */, nil /*localVectorSources*/, flowinfra.FuseNormally) defer vfc.cleanup(ctx) require.NoError(t, err) diff --git a/pkg/sql/copy/BUILD.bazel b/pkg/sql/copy/BUILD.bazel index 0a7ed2015780..c93608860020 100644 --- a/pkg/sql/copy/BUILD.bazel +++ b/pkg/sql/copy/BUILD.bazel @@ -14,6 +14,7 @@ go_test( deps = [ "//pkg/base", "//pkg/cli/clisqlclient", + "//pkg/keys", "//pkg/kv/kvpb", "//pkg/security/securityassets", "//pkg/security/securitytest", @@ -21,6 +22,8 @@ go_test( "//pkg/server", "//pkg/settings/cluster", "//pkg/sql", + "//pkg/sql/catalog", + "//pkg/sql/catalog/desctestutils", "//pkg/sql/randgen", "//pkg/sql/sem/tree", "//pkg/sql/sqltestutils", @@ -32,6 +35,7 @@ go_test( "//pkg/testutils/sqlutils", "//pkg/testutils/testcluster", "//pkg/util/ctxgroup", + "//pkg/util/encoding", "//pkg/util/encoding/csv", "//pkg/util/leaktest", "//pkg/util/log", diff --git a/pkg/sql/copy/copy_test.go b/pkg/sql/copy/copy_test.go index 4ef06f2ee0d8..c1b755a2ba89 100644 --- a/pkg/sql/copy/copy_test.go +++ b/pkg/sql/copy/copy_test.go @@ -16,9 +16,10 @@ import ( "database/sql/driver" "fmt" "io" + "math/rand" "net/url" "regexp" - "runtime/pprof" + "strconv" "strings" "testing" "time" @@ -26,16 +27,25 @@ import ( "github.com/cockroachdb/apd/v3" "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/cli/clisqlclient" + "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/security/username" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/catalog" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils" + "github.com/cockroachdb/cockroach/pkg/sql/randgen" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/tests" + "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/datapathutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" + "github.com/cockroachdb/cockroach/pkg/util/encoding" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/randutil" "github.com/cockroachdb/datadriven" "github.com/cockroachdb/errors" "github.com/jackc/pgconn" @@ -522,51 +532,231 @@ func TestShowQueriesIncludesCopy(t *testing.T) { }) } -// BenchmarkCopyFrom measures copy performance against a TestServer. -func BenchmarkCopyFrom(b *testing.B) { - defer leaktest.AfterTest(b)() - defer log.Scope(b).Close(b) +// TestLargeDynamicRows ensure that we don't overflow memory with large rows by +// testing that we break the inserts into batches, in this case at least 1 +// batch per row. Also make sure adequately sized buffers just use 1 batch. +func TestLargeDynamicRows(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) ctx := context.Background() + params, _ := tests.CreateTestServerParams() + var batchNumber int + params.Knobs.SQLExecutor = &sql.ExecutorTestingKnobs{ + BeforeCopyFromInsert: func() error { + batchNumber++ + return nil + }, + } + s, _, _ := serverutils.StartServer(t, params) + defer s.Stopper().Stop(ctx) - s, _, _ := serverutils.StartServer(b, base.TestServerArgs{ - Settings: cluster.MakeTestingClusterSettings(), - }) + url, cleanup := sqlutils.PGUrl(t, s.ServingSQLAddr(), "copytest", url.User(username.RootUser)) + defer cleanup() + var sqlConnCtx clisqlclient.Context + conn := sqlConnCtx.MakeSQLConn(io.Discard, io.Discard, url.String()) + + // Only copy-fast-path has proper row accounting, override metamorphic that + // might turn it off. + err := conn.Exec(ctx, `SET COPY_FAST_PATH_ENABLED = 'true'`) + require.NoError(t, err) + + // 4.0 MiB is minimum, copy sets max row size to this value / 3 + err = conn.Exec(ctx, "SET CLUSTER SETTING kv.raft.command.max_size = '4.0MiB'") + require.NoError(t, err) + + err = conn.Exec(ctx, "CREATE TABLE t (s STRING)") + require.NoError(t, err) + + rng, _ := randutil.NewTestRand() + str := randutil.RandString(rng, (2<<20)+1, "asdf") + + var sb strings.Builder + for i := 0; i < 4; i++ { + sb.WriteString(str) + sb.WriteString("\n") + } + _, err = conn.GetDriverConn().CopyFrom(ctx, strings.NewReader(sb.String()), "COPY t FROM STDIN") + require.NoError(t, err) + require.Greater(t, batchNumber, 4) + batchNumber = 0 + + // Reset and make sure we use 1 batch. + err = conn.Exec(ctx, "RESET CLUSTER SETTING kv.raft.command.max_size") + require.NoError(t, err) + + // This won't work if the batch size gets set to less than 4. + if sql.CopyBatchRowSize < 4 { + sql.SetCopyFromBatchSize(4) + } + + _, err = conn.GetDriverConn().CopyFrom(ctx, strings.NewReader(sb.String()), "COPY t FROM STDIN") + require.NoError(t, err) + require.Equal(t, 1, batchNumber) +} + +// TestTinyRows ensures batch sizing logic doesn't explode with small table. +func TestTinyRows(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + ctx := context.Background() + + params, _ := tests.CreateTestServerParams() + s, _, _ := serverutils.StartServer(t, params) + defer s.Stopper().Stop(ctx) + + url, cleanup := sqlutils.PGUrl(t, s.ServingSQLAddr(), "copytest", url.User(username.RootUser)) + defer cleanup() + var sqlConnCtx clisqlclient.Context + conn := sqlConnCtx.MakeSQLConn(io.Discard, io.Discard, url.String()) + + err := conn.Exec(ctx, "CREATE TABLE t (b BOOL PRIMARY KEY)") + require.NoError(t, err) + + _, err = conn.GetDriverConn().CopyFrom(ctx, strings.NewReader("true\nfalse\n"), "COPY t FROM STDIN") + require.NoError(t, err) +} + +// TODO(cucaroach): get the rand utilities and ParseAndRequire to be friends +// STRINGS don't roundtrip well, need to figure out proper escaping +// INET doesn't round trip: ERROR: could not parse "70e5:112:5114:7da5:1" as inet. invalid IP (SQLSTATE 22P02) +// DECIMAL(15,2) don't round trip, get too big number errors. +const lineitemSchemaMunged string = `CREATE TABLE lineitem ( + l_orderkey INT8 NOT NULL, + l_partkey INT8 NOT NULL, + l_suppkey INT8 NOT NULL, + l_linenumber INT8 NOT NULL, + l_quantity INT8 NOT NULL, + l_extendedprice FLOAT NOT NULL, + l_discount FLOAT NOT NULL, + l_tax FLOAT NOT NULL, + l_returnflag TIMESTAMPTZ NOT NULL, + l_linestatus TIMESTAMPTZ NOT NULL, + l_shipdate DATE NOT NULL, + l_commitdate DATE NOT NULL, + l_receiptdate DATE NOT NULL, + l_shipinstruct INTERVAL NOT NULL, + l_shipmode UUID NOT NULL, + l_comment UUID NOT NULL, + PRIMARY KEY (l_orderkey, l_linenumber), + INDEX l_ok (l_orderkey ASC), + INDEX l_pk (l_partkey ASC), + INDEX l_sk (l_suppkey ASC), + INDEX l_sd (l_shipdate ASC), + INDEX l_cd (l_commitdate ASC), + INDEX l_rd (l_receiptdate ASC), + INDEX l_pk_sk (l_partkey ASC, l_suppkey ASC), + INDEX l_sk_pk (l_suppkey ASC, l_partkey ASC) +)` + +// Perform a COPY of N rows, N can be arbitrarily large to test huge copies. +func TestLargeCopy(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + ctx := context.Background() + + s, _, kvdb := serverutils.StartServer(t, base.TestServerArgs{}) defer s.Stopper().Stop(ctx) - url, cleanup := sqlutils.PGUrl(b, s.ServingSQLAddr(), "copytest", url.User(username.RootUser)) + url, cleanup := sqlutils.PGUrl(t, s.ServingSQLAddr(), "copytest", url.User(username.RootUser)) defer cleanup() var sqlConnCtx clisqlclient.Context conn := sqlConnCtx.MakeSQLConn(io.Discard, io.Discard, url.String()) - err := conn.Exec(ctx, lineitemSchema) - require.NoError(b, err) - - // send data in 5 batches of 10k rows - const ROWS = sql.CopyBatchRowSizeDefault * 4 - datalen := 0 - var rows []string - for i := 0; i < ROWS; i++ { - row := fmt.Sprintf(csvData, i) - rows = append(rows, row) - datalen += len(row) + err := conn.Exec(ctx, lineitemSchemaMunged) + require.NoError(t, err) + + desc := desctestutils.TestingGetTableDescriptor(kvdb, keys.SystemSQLCodec, "defaultdb", "public", "lineitem") + require.NotNil(t, desc, "Failed to lookup descriptor") + + err = conn.Exec(ctx, "SET copy_from_atomic_enabled = false") + require.NoError(t, err) + + rng := rand.New(rand.NewSource(0)) + rows := 10000 + numrows, err := conn.GetDriverConn().CopyFrom(ctx, + ©Reader{rng: rng, cols: desc.PublicColumns(), rows: rows}, + "COPY lineitem FROM STDIN WITH CSV;") + require.NoError(t, err) + require.Equal(t, int(numrows), rows) +} + +type copyReader struct { + cols []catalog.Column + rng *rand.Rand + rows int + count int + generatedRows io.Reader +} + +var _ io.Reader = ©Reader{} + +func (c *copyReader) Read(b []byte) (n int, err error) { + if c.generatedRows == nil { + c.generateRows() } - rowsize := datalen / ROWS - for _, batchSizeFactor := range []float64{.5, 1, 2, 4} { - batchSize := int(batchSizeFactor * sql.CopyBatchRowSizeDefault) - b.Run(fmt.Sprintf("%d", batchSize), func(b *testing.B) { - actualRows := rows[:batchSize] - for i := 0; i < b.N; i++ { - pprof.Do(ctx, pprof.Labels("run", "copy"), func(ctx context.Context) { - numrows, err := conn.GetDriverConn().CopyFrom(ctx, strings.NewReader(strings.Join(actualRows, "\n")), "COPY lineitem FROM STDIN WITH CSV DELIMITER '|';") - require.NoError(b, err) - require.Equal(b, int(numrows), len(actualRows)) - }) - b.StopTimer() - err = conn.Exec(ctx, "TRUNCATE TABLE lineitem") - require.NoError(b, err) - b.StartTimer() + n, err = c.generatedRows.Read(b) + if err == io.EOF && c.count < c.rows { + c.generateRows() + n, err = c.generatedRows.Read(b) + } + return +} + +func (c *copyReader) generateRows() { + numRows := min(1000, c.rows-c.count) + sb := strings.Builder{} + for i := 0; i < numRows; i++ { + row := make([]string, len(c.cols)) + for j, col := range c.cols { + t := col.GetType() + var ds string + if j == 0 { + // Special handling for ID field + ds = strconv.Itoa(c.count + i) + } else { + d := randgen.RandDatum(c.rng, t, col.IsNullable()) + ds = tree.AsStringWithFlags(d, tree.FmtBareStrings) + // Empty string is treated as null + if len(ds) == 0 && !col.IsNullable() { + ds = "a" + } + switch t.Family() { + case types.CollatedStringFamily: + // For collated strings, we just want the raw contents in COPY. + ds = d.(*tree.DCollatedString).Contents + case types.FloatFamily: + ds = strings.TrimSuffix(ds, ".0") + } + switch t.Family() { + case types.BytesFamily, + types.DateFamily, + types.IntervalFamily, + types.INetFamily, + types.StringFamily, + types.TimestampFamily, + types.TimestampTZFamily, + types.UuidFamily, + types.CollatedStringFamily: + var b bytes.Buffer + if err := sql.EncodeCopy(&b, encoding.UnsafeConvertStringToBytes(ds), ','); err != nil { + panic(err) + } + ds = b.String() + } } - b.SetBytes(int64(len(actualRows) * rowsize)) - }) + row[j] = ds + } + r := strings.Join(row, ",") + sb.WriteString(r) + sb.WriteString("\n") + } + c.count += numRows + c.generatedRows = strings.NewReader(sb.String()) +} + +func min(a, b int) int { + if a < b { + return a } + return b } diff --git a/pkg/sql/copy_file_upload.go b/pkg/sql/copy_file_upload.go index d31ebeddc68c..504fc83f1e2b 100644 --- a/pkg/sql/copy_file_upload.go +++ b/pkg/sql/copy_file_upload.go @@ -151,7 +151,7 @@ func newFileUploadMachine( c.format = tree.CopyFormatText c.null = `\N` c.delimiter = '\t' - c.rows.Init(c.rowsMemAcc, colinfo.ColTypeInfoFromResCols(c.resultColumns), copyBatchRowSize) + c.rows.Init(c.rowsMemAcc, colinfo.ColTypeInfoFromResCols(c.resultColumns), CopyBatchRowSize) c.scratchRow = make(tree.Datums, len(c.resultColumns)) return } diff --git a/pkg/sql/copy_from.go b/pkg/sql/copy_from.go index 66f56b13ee6f..2e3ad399c288 100644 --- a/pkg/sql/copy_from.go +++ b/pkg/sql/copy_from.go @@ -19,11 +19,17 @@ import ( "strings" "time" "unicode/utf8" + "unsafe" "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/col/coldata" + "github.com/cockroachdb/cockroach/pkg/col/coldataext" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" + "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" "github.com/cockroachdb/cockroach/pkg/sql/catalog/resolver" + "github.com/cockroachdb/cockroach/pkg/sql/colmem" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgwirebase" @@ -31,29 +37,37 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/rowcontainer" "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/buildutil" + "github.com/cockroachdb/cockroach/pkg/util/encoding" "github.com/cockroachdb/cockroach/pkg/util/encoding/csv" "github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/mon" "github.com/cockroachdb/cockroach/pkg/util/retry" + "github.com/cockroachdb/cockroach/pkg/util/timeutil/pgdate" "github.com/cockroachdb/errors" + "github.com/dustin/go-humanize" ) // CopyBatchRowSizeDefault is the number of rows we insert in one insert // statement. const CopyBatchRowSizeDefault = 100 +// Vector wise inserts scale much better and this is suitable default. +// Empirically determined limit where we start to see diminishing speedups. +const CopyBatchRowSizeVectorDefault = 32 << 10 + // When this many rows are in the copy buffer, they are inserted. -var copyBatchRowSize = util.ConstantWithMetamorphicTestRange("copy-batch-size", CopyBatchRowSizeDefault, 1, 10000) +var CopyBatchRowSize = util.ConstantWithMetamorphicTestRange("copy-batch-size", CopyBatchRowSizeDefault, 1, 50_000) // SetCopyFromBatchSize exports overriding copy batch size for test code. func SetCopyFromBatchSize(i int) int { - old := copyBatchRowSize + old := CopyBatchRowSize if buildutil.CrdbTestBuild { - copyBatchRowSize = i + CopyBatchRowSize = i } else { // We don't want non-test code mutating globals. panic("SetCopyFromBatchSize is a test utility that requires crdb_test tag") @@ -237,12 +251,19 @@ type copyMachine struct { processRows func(ctx context.Context, finalBatch bool) error - scratchRow []tree.Datum + scratchRow []tree.Datum + batch coldata.Batch + accHelper colmem.SetAccountingHelper + typs []*types.T + valueHandlers []tree.ValueHandler + ph pgdate.ParseHelper // For testing we want to be able to override this on the instance level. copyBatchRowSize int - - implicitTxn bool + maxRowMem int64 + implicitTxn bool + copyFastPath bool + vectorized bool } // newCopyMachine creates a new copyMachine. @@ -298,6 +319,7 @@ func newCopyMachine( return nil, err } c.resultColumns = make(colinfo.ResultColumns, len(cols)) + typs := make([]*types.T, len(cols)) for i, col := range cols { c.resultColumns[i] = colinfo.ResultColumn{ Name: col.GetName(), @@ -305,7 +327,9 @@ func newCopyMachine( TableID: tableDesc.GetID(), PGAttributeNum: uint32(col.GetPGAttributeNum()), } + typs[i] = col.GetType() } + c.typs = typs // If there are no column specifiers and we expect non-visible columns // to have field data then we have to populate the expectedHiddenColumnIdxs // field with the columns indexes we expect to be hidden. @@ -318,11 +342,122 @@ func newCopyMachine( } c.initMonitoring(ctx, parentMon) c.processRows = c.insertRows - c.rows.Init(c.rowsMemAcc, colinfo.ColTypeInfoFromResCols(c.resultColumns), copyBatchRowSize) - c.scratchRow = make(tree.Datums, len(c.resultColumns)) + c.copyFastPath = c.p.SessionData().CopyFastPathEnabled + + // We want to do as many rows as we can keeping things under command mem + // limit. Conservatively target a fraction of kv command size. If we + // exceed this due to large dynamic values we will bail early and + // insert the rows we have so far. Note once the coldata.Batch is full + // we still have all the encoder allocations to make. + c.maxRowMem = kvserverbase.MaxCommandSize.Get(c.p.execCfg.SV()) / 3 + + if c.canSupportVectorized(tableDesc) { + c.initVectorizedCopy(ctx, typs) + } else { + c.copyBatchRowSize = CopyBatchRowSize + c.vectorized = false + c.rows.Init(c.rowsMemAcc, colinfo.ColTypeInfoFromResCols(c.resultColumns), c.copyBatchRowSize) + c.scratchRow = make(tree.Datums, len(c.resultColumns)) + } return c, nil } +func (c *copyMachine) canSupportVectorized(table catalog.TableDescriptor) bool { + // TODO(cucaroach): support vectorized binary. + if c.format == tree.CopyFormatBinary { + return false + } + // Vectorized requires avoiding materializing the rows for the optimizer. + if !c.copyFastPath { + return false + } + if c.p.SessionData().VectorizeMode == sessiondatapb.VectorizeOff { + return false + } + // Vectorized COPY doesn't support foreign key checks, no reason it couldn't + // but it doesn't work right now because we don't have the ability to + // hold the results in a bufferNode. We wouldn't want to enable it + // until we were sure that all the checks could be vectorized so the + // "bufferNode" used doesn't just get materialized into a datum based + // row container. I think that requires a vectorized version of lookup + // join. TODO(cucaroach): extend the vectorized insert code to support + // insertFastPath style FK checks. + return len(table.EnforcedOutboundForeignKeys()) == 0 +} + +func (c *copyMachine) initVectorizedCopy(ctx context.Context, typs []*types.T) { + if buildutil.CrdbTestBuild { + // We have to honor metamorphic default in testing, the transaction + // commit tests rely on it, specifically they override it to + // 1. + c.copyBatchRowSize = CopyBatchRowSize + } else { + batchSize := CopyBatchRowSizeVectorDefault + minBatchSize := 100 + // When the coldata.Batch memory usage exceeds maxRowMem we flush the + // rows we have so we want the batch's initial memory usage to + // be smaller so we don't flush every row. We also want to + // leave a comfortable buffer so some dynamic values (ie + // strings, json) don't unnecessarily push us past the limit + // but if we encounter lots of huge dynamic values we do want + // to flush the batch. + targetBatchMemUsage := c.maxRowMem / 2 + + // Now adjust batch size down based on EstimateBatchSizeBytes. Rather than + // try to unpack EstimateBatchSizeBytes just use a simple + // iterative algorithm to arrive at a reasonable batch size. + // Basically we want something from 100 to maxBatchSize but we + // don't want to have a bunch of unused memory in the + // coldata.Batch so dial it in using EstimateBatchSizeBytes. + for colmem.EstimateBatchSizeBytes(typs, batchSize) > targetBatchMemUsage && + batchSize > minBatchSize { + batchSize /= 2 + } + // Go back up by tenths to make up for 1/2 reduction overshoot. + for colmem.EstimateBatchSizeBytes(typs, batchSize) < targetBatchMemUsage && + batchSize < CopyBatchRowSizeVectorDefault { + batchSize += batchSize / 10 + } + if batchSize > CopyBatchRowSizeVectorDefault { + batchSize = CopyBatchRowSizeVectorDefault + } + // Note its possible we overshot minBatchSize and schema was so wide we + // didn't go back over it. Worst case we end up with a batch size of 50 + // but if the schema has that many columns smaller is probably better. + c.copyBatchRowSize = batchSize + } + log.VEventf(ctx, 2, "vectorized copy chose %d for batch size", c.copyBatchRowSize) + c.vectorized = true + factory := coldataext.NewExtendedColumnFactory(c.p.EvalContext()) + alloc := colmem.NewLimitedAllocator(ctx, &c.rowsMemAcc, nil /*optional unlimited memory account*/, factory) + alloc.SetMaxBatchSize(c.copyBatchRowSize) + // TODO(cucaroach): Avoid allocating selection vector. + c.accHelper.Init(alloc, c.maxRowMem, typs, false /*alwaysReallocate*/) + // Start with small number of rows, compromise between going too big and + // overallocating memory and avoiding some doubling growth batches. + c.batch, _ = c.accHelper.ResetMaybeReallocate(c.typs, c.batch, 64) + initialMemUsage := c.rowsMemAcc.Used() + if initialMemUsage > c.maxRowMem { + // Some tests set the max raft command size lower and if the metamorphic + // batch size is big enough this can happen. The affect is + // that every row will be flushed which is fine for testing so + // ignore it. + if !buildutil.CrdbTestBuild { + // The logic above failed us, this shouldn't happen, basically this + // means EstimateBatchSizeBytes off by a factor of 2. + panic(errors.AssertionFailedf("EstimateBatchSizeBytes estimated %s for %d row but actual was %s and maxRowMem was %s", + humanize.IBytes(uint64(colmem.EstimateBatchSizeBytes(typs, c.copyBatchRowSize))), + c.copyBatchRowSize, + humanize.IBytes(uint64(initialMemUsage)), + humanize.IBytes(uint64(c.maxRowMem)))) + } + } + c.valueHandlers = make([]tree.ValueHandler, len(typs)) + for i := range typs { + c.valueHandlers[i] = coldataext.MakeVecHandler(c.batch.ColVec(i)) + } +} + func (c *copyMachine) numInsertedRows() int { if c == nil { return 0 @@ -342,7 +477,6 @@ func (c *copyMachine) initMonitoring(ctx context.Context, parentMon *mon.BytesMo c.copyMon.StartNoReserved(ctx, parentMon) c.bufMemAcc = c.copyMon.MakeBoundAccount() c.rowsMemAcc = c.copyMon.MakeBoundAccount() - c.copyBatchRowSize = copyBatchRowSize } // copyTxnOpt contains information about the transaction in which the copying @@ -362,6 +496,9 @@ type copyTxnOpt struct { func (c *copyMachine) Close(ctx context.Context) { c.rows.Close(ctx) + // TODO(cucaroach): if this isn't close'd the Stop below errors out + // saying there's 10240 bytes left, investigate. + c.rowsMemAcc.Close(ctx) c.bufMemAcc.Close(ctx) c.copyMon.Stop(ctx) } @@ -433,7 +570,7 @@ Loop: switch typ { case pgwirebase.ClientMsgCopyData: if err := c.processCopyData( - ctx, string(readBuf.Msg), false, /* final */ + ctx, unsafeUint8ToString(readBuf.Msg), false, /* final */ ); err != nil { return err } @@ -505,15 +642,38 @@ func (c *copyMachine) processCopyData(ctx context.Context, data string, final bo if err != nil { return err } + var batchDone bool + if !brk && c.vectorized { + batchDone = c.accHelper.AccountForSet(c.batch.Length() - 1) + } + // If we have a full batch of rows or we have exceeded maxRowMem process + // them. Only set finalBatch to true if this is the last + // CopyData segment AND we have no more data in the buffer. + if len := c.currentBatchSize(); c.rowsMemAcc.Used() > c.maxRowMem || len == c.copyBatchRowSize || batchDone { + if len != c.copyBatchRowSize { + log.VEventf(ctx, 2, "copy batch of %d rows flushing due to memory usage %d > %d", c.batch.Length(), c.rowsMemAcc.Used(), c.maxRowMem) + } + if err := c.processRows(ctx, final && c.buf.Len() == 0); err != nil { + return err + } + } if brk { break } } - // Only do work if we have a full batch of rows or this is the end. - if ln := c.rows.Len(); !final && (ln == 0 || ln < c.copyBatchRowSize) { - return nil + // If we're done, process any remainder, if we're not done let more rows + // accumulate. + if final { + return c.processRows(ctx, final) } - return c.processRows(ctx, final) + return nil +} + +func (c *copyMachine) currentBatchSize() int { + if c.vectorized { + return c.batch.Length() + } + return c.rows.Len() } func (c *copyMachine) readTextData(ctx context.Context, final bool) (brk bool, err error) { @@ -648,23 +808,38 @@ func (c *copyMachine) readCSVTuple(ctx context.Context, record []csv.Record) err "expected %d values, got %d", expected, len(record)) } record = c.maybeIgnoreHiddenColumnsStr(record) - datums := c.scratchRow - for i, s := range record { - // NB: When we implement FORCE_NULL, then quoted values also are allowed - // to be treated as NULL. - if !s.Quoted && s.Val == c.null { - datums[i] = tree.DNull - continue + if c.vectorized { + vh := c.valueHandlers + for i, s := range record { + // NB: When we implement FORCE_NULL, then quoted values also are allowed + // to be treated as NULL. + if !s.Quoted && s.Val == c.null { + vh[i].Null() + continue + } + if err := tree.ParseAndRequireStringHandler(c.resultColumns[i].Typ, s.Val, c.parsingEvalCtx, vh[i], &c.ph); err != nil { + return err + } } - d, _, err := tree.ParseAndRequireString(c.resultColumns[i].Typ, s.Val, c.parsingEvalCtx) - if err != nil { + c.batch.SetLength(c.batch.Length() + 1) + } else { + datums := c.scratchRow + for i, s := range record { + // NB: When we implement FORCE_NULL, then quoted values also are allowed + // to be treated as NULL. + if !s.Quoted && s.Val == c.null { + datums[i] = tree.DNull + continue + } + d, _, err := tree.ParseAndRequireString(c.resultColumns[i].Typ, s.Val, c.parsingEvalCtx) + if err != nil { + return err + } + datums[i] = d + } + if _, err := c.rows.AddRow(ctx, datums); err != nil { return err } - - datums[i] = d - } - if _, err := c.rows.AddRow(ctx, datums); err != nil { - return err } return nil } @@ -872,21 +1047,25 @@ func (c *copyMachine) insertRowsInternal(ctx context.Context, finalBatch bool) ( defer func() { retErr = cleanup(ctx, retErr) }() - if c.rows.Len() == 0 { - return nil - } - numRows := c.rows.Len() - if c.p.ExecCfg().TestingKnobs.BeforeCopyFromInsert != nil { if err := c.p.ExecCfg().TestingKnobs.BeforeCopyFromInsert(); err != nil { return err } } - - copyFastPath := c.p.SessionData().CopyFastPathEnabled + numRows := c.currentBatchSize() + if numRows == 0 { + return nil + } + // TODO(cucaroach): Investigate caching memo/plan/etc so that we don't + // rebuild everything for every batch. var vc tree.SelectStatement - if copyFastPath { - vc = &tree.LiteralValuesClause{Rows: &c.rows} + if c.copyFastPath { + if c.vectorized { + b := tree.VectorRows{Batch: c.batch} + vc = &tree.LiteralValuesClause{Rows: &b} + } else { + vc = &tree.LiteralValuesClause{Rows: &c.rows} + } } else { // This is best effort way of mimic'ing pre-copyFastPath behavior, its // not exactly the same but should suffice to workaround any bugs due to @@ -915,6 +1094,8 @@ func (c *copyMachine) insertRowsInternal(ctx context.Context, finalBatch bool) ( Returning: tree.AbsentReturningClause, } c.txnOpt.initPlanner(ctx, c.p) + + // TODO(cucaroach): We shouldn't need to do this for every batch. if err := c.p.makeOptimizerPlan(ctx); err != nil { return err } @@ -934,7 +1115,24 @@ func (c *copyMachine) insertRowsInternal(ctx context.Context, finalBatch bool) ( } c.insertedRows += numRows // We're done reset for next batch. - return c.rows.UnsafeReset(ctx) + if c.vectorized { + var realloc bool + c.batch, realloc = c.accHelper.ResetMaybeReallocate(c.typs, c.batch, 0 /* tuplesToBeSet*/) + if realloc { + for i := range c.typs { + c.valueHandlers[i] = coldataext.MakeVecHandler(c.batch.ColVec(i)) + } + } else { + for _, vh := range c.valueHandlers { + vh.Reset() + } + } + } else { + if err := c.rows.UnsafeReset(ctx); err != nil { + return err + } + } + return nil } func (c *copyMachine) maybeIgnoreHiddenColumnsBytes(in [][]byte) [][]byte { @@ -958,9 +1156,17 @@ func (c *copyMachine) readTextTuple(ctx context.Context, line []byte) error { "expected %d values, got %d", expected, len(parts)) } parts = c.maybeIgnoreHiddenColumnsBytes(parts) + if c.vectorized { + return c.readTextTupleVec(ctx, parts) + } else { + return c.readTextTupleDatum(ctx, parts) + } +} + +func (c *copyMachine) readTextTupleDatum(ctx context.Context, parts [][]byte) error { datums := c.scratchRow for i, part := range parts { - s := string(part) + s := unsafeUint8ToString(part) // Disable NULL conversion during file uploads. if !c.forceNotNull && s == c.null { datums[i] = tree.DNull @@ -1001,6 +1207,43 @@ func (c *copyMachine) readTextTuple(ctx context.Context, line []byte) error { return err } +func (c *copyMachine) readTextTupleVec(ctx context.Context, parts [][]byte) error { + for i, part := range parts { + s := unsafeUint8ToString(part) + // Disable NULL conversion during file uploads. + if !c.forceNotNull && s == c.null { + c.valueHandlers[i].Null() + continue + } + decodeTyp := c.resultColumns[i].Typ + for decodeTyp.Family() == types.ArrayFamily { + decodeTyp = decodeTyp.ArrayContents() + } + switch decodeTyp.Family() { + case types.BytesFamily, + types.DateFamily, + types.IntervalFamily, + types.INetFamily, + types.StringFamily, + types.TimestampFamily, + types.TimestampTZFamily, + types.UuidFamily: + s = DecodeCopy(s) + } + switch c.resultColumns[i].Typ.Family() { + case types.BytesFamily: + // This just bypasses DecodeRawBytesToByteArrayAuto, not sure why... + c.valueHandlers[i].Bytes(encoding.UnsafeConvertStringToBytes(s)) + default: + if err := tree.ParseAndRequireStringHandler(c.resultColumns[i].Typ, s, c.parsingEvalCtx, c.valueHandlers[i], &c.ph); err != nil { + return err + } + } + } + c.batch.SetLength(c.batch.Length() + 1) + return nil +} + // DecodeCopy unescapes a single COPY field. // // See: https://www.postgresql.org/docs/9.5/static/sql-copy.html#AEN74432 @@ -1116,3 +1359,7 @@ const ( binaryStateRead binaryStateFoundTrailer ) + +func unsafeUint8ToString(data []uint8) string { + return *(*string)(unsafe.Pointer(&data)) +} diff --git a/pkg/sql/copy_to.go b/pkg/sql/copy_to.go index ee6f29af105b..941b3d1217ea 100644 --- a/pkg/sql/copy_to.go +++ b/pkg/sql/copy_to.go @@ -57,7 +57,7 @@ func (t *textCopyToTranslater) translateRow( continue } t.fmtCtx.FormatNode(d) - if err := encodeCopy(&t.rowBuffer, t.fmtCtx.Buffer.Bytes(), t.delimiter); err != nil { + if err := EncodeCopy(&t.rowBuffer, t.fmtCtx.Buffer.Bytes(), t.delimiter); err != nil { return nil, err } t.fmtCtx.Buffer.Reset() @@ -250,11 +250,11 @@ var encodeMap = func() map[byte]byte { return ret }() -// encodeCopy escapes a single COPY field. +// EncodeCopy escapes a single COPY field. // // See: https://www.postgresql.org/docs/9.5/static/sql-copy.html#AEN74432 // NOTE: we don't have to worry about hex in COPY TO. -func encodeCopy(w io.Writer, in []byte, delimiter byte) error { +func EncodeCopy(w io.Writer, in []byte, delimiter byte) error { lastIndex := 0 for i, r := range in { if escapeChar, ok := encodeMap[r]; ok || r == delimiter { diff --git a/pkg/sql/copy_to_test.go b/pkg/sql/copy_to_test.go index 818b18c98cb2..246263695ec7 100644 --- a/pkg/sql/copy_to_test.go +++ b/pkg/sql/copy_to_test.go @@ -36,7 +36,7 @@ func TestEncodeCopy(t *testing.T) { for _, tc := range testCases { t.Run(fmt.Sprintf("%s, delimiter %c", tc.in, tc.delimiter), func(t *testing.T) { var b bytes.Buffer - require.NoError(t, encodeCopy(&b, []byte(tc.in), tc.delimiter)) + require.NoError(t, EncodeCopy(&b, []byte(tc.in), tc.delimiter)) require.Equal(t, tc.expected, b.String()) // Check decode is the same. diff --git a/pkg/sql/distsql/server.go b/pkg/sql/distsql/server.go index e72ea6fa373a..d99211bbeac8 100644 --- a/pkg/sql/distsql/server.go +++ b/pkg/sql/distsql/server.go @@ -389,7 +389,8 @@ func (ds *ServerImpl) setupFlow( isVectorized := req.EvalContext.SessionData.VectorizeMode != sessiondatapb.VectorizeOff f := newFlow( flowCtx, sp, ds.flowRegistry, rowSyncFlowConsumer, batchSyncFlowConsumer, - localState.LocalProcs, isVectorized, onFlowCleanupEnd, req.StatementSQL, + localState.LocalProcs, localState.LocalVectorSources, isVectorized, + onFlowCleanupEnd, req.StatementSQL, ) opt := flowinfra.FuseNormally if !localState.MustUseLeafTxn() { @@ -521,11 +522,12 @@ func newFlow( rowSyncFlowConsumer execinfra.RowReceiver, batchSyncFlowConsumer execinfra.BatchReceiver, localProcessors []execinfra.LocalProcessor, + localVectorSources map[int32]any, isVectorized bool, onFlowCleanupEnd func(), statementSQL string, ) flowinfra.Flow { - base := flowinfra.NewFlowBase(flowCtx, sp, flowReg, rowSyncFlowConsumer, batchSyncFlowConsumer, localProcessors, onFlowCleanupEnd, statementSQL) + base := flowinfra.NewFlowBase(flowCtx, sp, flowReg, rowSyncFlowConsumer, batchSyncFlowConsumer, localProcessors, localVectorSources, onFlowCleanupEnd, statementSQL) if isVectorized { return colflow.NewVectorizedFlow(base) } @@ -562,6 +564,11 @@ type LocalState struct { // LocalProcs is an array of planNodeToRowSource processors. It's in order and // will be indexed into by the RowSourceIdx field in LocalPlanNodeSpec. LocalProcs []execinfra.LocalProcessor + + // LocalVectorSources is a map of local vector sources for Insert operator + // mapping to coldata.Batch, use any to avoid injecting new + // dependencies. + LocalVectorSources map[int32]any } // MustUseLeafTxn returns true if a LeafTxn must be used. It is valid to call diff --git a/pkg/sql/distsql/vectorized_panic_propagation_test.go b/pkg/sql/distsql/vectorized_panic_propagation_test.go index a8e5ec9900aa..7faf834ac98a 100644 --- a/pkg/sql/distsql/vectorized_panic_propagation_test.go +++ b/pkg/sql/distsql/vectorized_panic_propagation_test.go @@ -50,6 +50,7 @@ func TestNonVectorizedPanicDoesntHangServer(t *testing.T) { nil, /* rowSyncFlowConsumer */ nil, /* batchSyncFlowConsumer */ nil, /* localProcessors */ + nil, /* localVectorProcessors */ nil, /* onFlowCleanup */ "", /* statementSQL */ ) diff --git a/pkg/sql/distsql_physical_planner.go b/pkg/sql/distsql_physical_planner.go index 9c023ad9887a..fa802478e871 100644 --- a/pkg/sql/distsql_physical_planner.go +++ b/pkg/sql/distsql_physical_planner.go @@ -11,6 +11,7 @@ package sql import ( + "bytes" "context" "fmt" "reflect" @@ -477,11 +478,9 @@ func mustWrapValuesNode(planCtx *PlanningCtx, specifiedInQuery bool) bool { // If the plan is local, we also wrap the valuesNode to avoid pointless // serialization of the values, and also to avoid situations in which // expressions within the valuesNode were not distributable in the first - // place. - if !specifiedInQuery || planCtx.isLocal { - return true - } - return false + // place. Unless the plan is a vector insert where we are inserting a preformed + // coldata.Batch. + return !specifiedInQuery || (planCtx.isLocal && !planCtx.isVectorInsert) } // checkSupportForPlanNode returns a distRecommendation (as described above) or @@ -852,6 +851,9 @@ type PlanningCtx struct { // release the resources that are acquired during the physical planning and // are being held onto throughout the whole flow lifecycle. onFlowCleanup []func() + + // This is true if plan is a simple insert that can be vectorized. + isVectorInsert bool } var _ physicalplan.ExprContext = &PlanningCtx{} @@ -3346,6 +3348,13 @@ func (dsp *DistSQLPlanner) createPhysPlanForPlanNode( case *indexJoinNode: plan, err = dsp.createPlanForIndexJoin(ctx, planCtx, n) + case *insertNode: + if planCtx.isVectorInsert { + plan, err = dsp.createPlanForInsert(ctx, planCtx, n) + } else { + plan, err = dsp.wrapPlan(ctx, planCtx, n, false /* allowPartialDistribution */) + } + case *invertedFilterNode: plan, err = dsp.createPlanForInvertedFilter(ctx, planCtx, n) @@ -3387,6 +3396,28 @@ func (dsp *DistSQLPlanner) createPhysPlanForPlanNode( return nil, err } + case *rowCountNode: + if in, ok := n.source.(*insertNode); ok { + var valNode planNode + // We support two cases, a render around a values and a straight values. + if r, ok := in.source.(*renderNode); ok { + valNode = r.source.plan + } else { + valNode = in.source + } + if v, ok := valNode.(*valuesNode); ok { + if v.coldataBatch != nil { + planCtx.isVectorInsert = true + } + } + } + + if planCtx.isVectorInsert { + plan, err = dsp.createPlanForRowCount(ctx, planCtx, n) + } else { + plan, err = dsp.wrapPlan(ctx, planCtx, n, false /* allowPartialDistribution */) + } + case *scanNode: plan, err = dsp.createTableReaders(ctx, planCtx, n) @@ -3425,7 +3456,14 @@ func (dsp *DistSQLPlanner) createPhysPlanForPlanNode( if err != nil { return nil, err } - plan, err = dsp.createValuesPlan(planCtx, spec, colTypes) + var idx physicalplan.ProcessorIdx + plan, idx, err = dsp.createValuesPlan(planCtx, spec, colTypes) + if n.coldataBatch != nil { + if plan.LocalVectorSources == nil { + plan.LocalVectorSources = make(map[int32]any) + } + plan.LocalVectorSources[int32(idx)] = n.coldataBatch + } } case *windowNode: @@ -3608,7 +3646,7 @@ func (dsp *DistSQLPlanner) createValuesSpec( // located on the gateway node. func (dsp *DistSQLPlanner) createValuesPlan( planCtx *PlanningCtx, spec *execinfrapb.ValuesCoreSpec, resultTypes []*types.T, -) (*PhysicalPlan, error) { +) (*PhysicalPlan, physicalplan.ProcessorIdx, error) { p := planCtx.NewPhysicalPlan() pIdx := p.AddProcessor(physicalplan.Processor{ @@ -3624,7 +3662,7 @@ func (dsp *DistSQLPlanner) createValuesPlan( p.Distribution = physicalplan.LocalPlan p.PlanToStreamColMap = identityMapInPlace(make([]int, len(resultTypes))) - return p, nil + return p, pIdx, nil } // createValuesSpecFromTuples creates a ValuesCoreSpec from the results of @@ -3669,7 +3707,8 @@ func (dsp *DistSQLPlanner) createPlanForUnary( } spec := dsp.createValuesSpec(planCtx, types, 1 /* numRows */, nil /* rawBytes */) - return dsp.createValuesPlan(planCtx, spec, types) + plan, _, err := dsp.createValuesPlan(planCtx, spec, types) + return plan, err } func (dsp *DistSQLPlanner) createPlanForZero( @@ -3681,7 +3720,8 @@ func (dsp *DistSQLPlanner) createPlanForZero( } spec := dsp.createValuesSpec(planCtx, types, 0 /* numRows */, nil /* rawBytes */) - return dsp.createValuesPlan(planCtx, spec, types) + plan, _, err := dsp.createValuesPlan(planCtx, spec, types) + return plan, err } func (dsp *DistSQLPlanner) createDistinctSpec( @@ -4585,7 +4625,78 @@ func finalizePlanWithRowCount( }) // Assign processor IDs. - for i := range plan.Processors { + for i, p := range plan.Processors { plan.Processors[i].Spec.ProcessorID = int32(i) + // Double check that our reliance on ProcessorID == index is good. + if _, ok := plan.LocalVectorSources[int32(i)]; ok { + // Ensure processor is a values spec. + if p.Spec.Core.Values == nil { + panic(errors.AssertionFailedf("expected processor to be Values")) + } + } + } +} + +// TODO(cucaroach): this doesn't work, get it working as part of effort to make +// distsql inserts handle general inserts. +func (dsp *DistSQLPlanner) createPlanForRowCount( + ctx context.Context, planCtx *PlanningCtx, n *rowCountNode, +) (*PhysicalPlan, error) { + plan, err := dsp.createPhysPlanForPlanNode(ctx, planCtx, n.source) + plan.PlanToStreamColMap = identityMap(nil, 1) + // fn := newAggregateFuncHolder( + // execinfrapb.AggregatorSpec_Func_name[int32(execinfrapb.AggregatorSpec_COUNT_ROWS)], + // []int{0}, + // nil, /* arguments */ + // false, /* isDistinct */ + // ) + // gn := groupNode{ + // columns: []colinfo.ResultColumn{{Name: "rowCount", Typ: types.Int}}, + // plan: n, + // groupCols: []int{0}, + // isScalar: true, + // funcs: []*aggregateFuncHolder{fn}, + // } + // // This errors: no builtin aggregate for COUNT_ROWS on [int] + // if err := dsp.addAggregators(ctx, planCtx, plan, &gn); err != nil { + // return nil, err + // } + return plan, err +} + +func (dsp *DistSQLPlanner) createPlanForInsert( + ctx context.Context, planCtx *PlanningCtx, n *insertNode, +) (*PhysicalPlan, error) { + plan, err := dsp.createPhysPlanForPlanNode(ctx, planCtx, n.source) + if err != nil { + return nil, err + } + insColIDs := make([]descpb.ColumnID, len(n.run.insertCols)) + for i, c := range n.run.insertCols { + insColIDs[i] = c.GetID() + } + var buff bytes.Buffer + if !n.run.checkOrds.Empty() { + if err := n.run.checkOrds.Encode(&buff); err != nil { + return nil, err + } } + insertSpec := execinfrapb.InsertSpec{ + Table: *n.run.ti.tableDesc().TableDesc(), + ColumnIDs: insColIDs, + CheckOrds: buff.Bytes(), + AutoCommit: n.run.ti.autoCommit == autoCommitEnabled, + } + var typs []*types.T + if len(n.columns) > 0 { + return nil, errors.AssertionFailedf("distsql insert doesn't support RETURNING") + } else { + typs = []*types.T{types.Int} + } + plan.AddNoGroupingStage( + execinfrapb.ProcessorCoreUnion{Insert: &insertSpec}, + execinfrapb.PostProcessSpec{}, + typs, + execinfrapb.Ordering{}) + return plan, nil } diff --git a/pkg/sql/distsql_running.go b/pkg/sql/distsql_running.go index eb07b2b32e45..35a1c074d149 100644 --- a/pkg/sql/distsql_running.go +++ b/pkg/sql/distsql_running.go @@ -689,6 +689,7 @@ func (dsp *DistSQLPlanner) Run( localState.MustUseLeaf = planCtx.mustUseLeafTxn localState.Txn = txn localState.LocalProcs = plan.LocalProcessors + localState.LocalVectorSources = plan.LocalVectorSources // If we have access to a planner and are currently being used to plan // statements in a user transaction, then take the descs.Collection to resolve // types with during flow execution. This is necessary to do in the case of diff --git a/pkg/sql/distsql_spec_exec_factory.go b/pkg/sql/distsql_spec_exec_factory.go index 0db12c83923a..4be97afb5e69 100644 --- a/pkg/sql/distsql_spec_exec_factory.go +++ b/pkg/sql/distsql_spec_exec_factory.go @@ -106,7 +106,7 @@ func (e *distSQLSpecExecFactory) ConstructValues( planCtx := e.getPlanCtx(canDistribute) colTypes := getTypesFromResultColumns(cols) spec := e.dsp.createValuesSpec(planCtx, colTypes, len(rows), nil /* rawBytes */) - physPlan, err := e.dsp.createValuesPlan(planCtx, spec, colTypes) + physPlan, _, err := e.dsp.createValuesPlan(planCtx, spec, colTypes) if err != nil { return nil, err } @@ -149,7 +149,7 @@ func (e *distSQLSpecExecFactory) ConstructValues( if err != nil { return nil, err } - physPlan, err = e.dsp.createValuesPlan(planCtx, spec, colTypes) + physPlan, _, err = e.dsp.createValuesPlan(planCtx, spec, colTypes) } if err != nil { return nil, err diff --git a/pkg/sql/execinfra/BUILD.bazel b/pkg/sql/execinfra/BUILD.bazel index fe0823e13fa7..37292614e0fa 100644 --- a/pkg/sql/execinfra/BUILD.bazel +++ b/pkg/sql/execinfra/BUILD.bazel @@ -58,6 +58,7 @@ go_library( "//pkg/sql/sem/tree", "//pkg/sql/sessiondata", "//pkg/sql/sqlliveness", + "//pkg/sql/stats", "//pkg/sql/types", "//pkg/storage/fs", "//pkg/util/admission", diff --git a/pkg/sql/execinfra/server_config.go b/pkg/sql/execinfra/server_config.go index 82267d185c6d..874f853f06d7 100644 --- a/pkg/sql/execinfra/server_config.go +++ b/pkg/sql/execinfra/server_config.go @@ -36,6 +36,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/rowinfra" "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness" + "github.com/cockroachdb/cockroach/pkg/sql/stats" "github.com/cockroachdb/cockroach/pkg/storage/fs" "github.com/cockroachdb/cockroach/pkg/util/admission" "github.com/cockroachdb/cockroach/pkg/util/limit" @@ -191,6 +192,9 @@ type ServerConfig struct { // with elastic CPU control. AdmissionPacerFactory admission.PacerFactory + // Allow mutation operations to trigger stats refresh. + StatsRefresher *stats.Refresher + // *sql.ExecutorConfig exposed as an interface (due to dependency cycles). ExecutorConfig interface{} } diff --git a/pkg/sql/execinfrapb/flow_diagram.go b/pkg/sql/execinfrapb/flow_diagram.go index a379fae17560..d2df926c1bb8 100644 --- a/pkg/sql/execinfrapb/flow_diagram.go +++ b/pkg/sql/execinfrapb/flow_diagram.go @@ -620,6 +620,14 @@ func (g *GenerativeSplitAndScatterSpec) summary() (string, []string) { return "GenerativeSplitAndScatterSpec", []string{detail} } +// summary implements the diagramCellType interface. +func (i *InsertSpec) summary() (string, []string) { + return "Insert", []string{ + fmt.Sprintf("TableID: %d", i.Table.ID), + fmt.Sprintf("AutoCommit: %t", i.AutoCommit), + } +} + type diagramCell struct { Title string `json:"title"` Details []string `json:"details"` diff --git a/pkg/sql/execinfrapb/processors.proto b/pkg/sql/execinfrapb/processors.proto index a55e9a876e57..f8c152cc3010 100644 --- a/pkg/sql/execinfrapb/processors.proto +++ b/pkg/sql/execinfrapb/processors.proto @@ -125,9 +125,10 @@ message ProcessorCoreUnion { optional HashGroupJoinerSpec hashGroupJoiner = 40; optional GenerativeSplitAndScatterSpec generativeSplitAndScatter = 41; optional CloudStorageTestSpec cloudStorageTest = 42; + optional InsertSpec insert = 43; reserved 6, 12, 14, 17, 18, 19, 20; - // NEXT ID: 43. + // NEXT ID: 44. } // NoopCoreSpec indicates a "no-op" processor core. This is used when we just diff --git a/pkg/sql/execinfrapb/processors_sql.proto b/pkg/sql/execinfrapb/processors_sql.proto index f2c29f8271b3..b96b37e5ef31 100644 --- a/pkg/sql/execinfrapb/processors_sql.proto +++ b/pkg/sql/execinfrapb/processors_sql.proto @@ -1087,3 +1087,18 @@ message HashGroupJoinerSpec { optional AggregatorSpec aggregator_spec = 3 [(gogoproto.nullable) = false]; } + +// InsertSpec is a limited insert processor that can only handle vectorized +// batch inserts for rows affected insert queries (ie doesn't support returning +// rows or inserts from select). +message InsertSpec { + optional sqlbase.TableDescriptor table = 1 [(gogoproto.nullable) = false]; + repeated uint32 column_ids = 2 [ + (gogoproto.customname) = "ColumnIDs", + (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb.ColumnID" + ]; + // The serialized bytes from intsets.Fast.Encode. + optional bytes check_ords = 3; + // Whether the insert should be autocommitted. + optional bool auto_commit = 4 [(gogoproto.nullable) = false]; +} diff --git a/pkg/sql/flowinfra/flow.go b/pkg/sql/flowinfra/flow.go index 19c29a9295f8..2890569e57a4 100644 --- a/pkg/sql/flowinfra/flow.go +++ b/pkg/sql/flowinfra/flow.go @@ -186,7 +186,8 @@ type FlowBase struct { // pushing coldata.Batches to locally. batchSyncFlowConsumer execinfra.BatchReceiver - localProcessors []execinfra.LocalProcessor + localProcessors []execinfra.LocalProcessor + localVectorSources map[int32]any // startedGoroutines specifies whether this flow started any goroutines. This // is used in Wait() to avoid the overhead of waiting for non-existent @@ -283,6 +284,7 @@ func NewFlowBase( rowSyncFlowConsumer execinfra.RowReceiver, batchSyncFlowConsumer execinfra.BatchReceiver, localProcessors []execinfra.LocalProcessor, + localVectorSources map[int32]any, onFlowCleanupEnd func(), statementSQL string, ) *FlowBase { @@ -306,6 +308,7 @@ func NewFlowBase( rowSyncFlowConsumer: rowSyncFlowConsumer, batchSyncFlowConsumer: batchSyncFlowConsumer, localProcessors: localProcessors, + localVectorSources: localVectorSources, admissionInfo: admissionInfo, onCleanupEnd: onFlowCleanupEnd, status: flowNotStarted, @@ -389,6 +392,11 @@ func (f *FlowBase) GetLocalProcessors() []execinfra.LocalProcessor { return f.localProcessors } +// GetLocalVectorSources return the LocalVectorSources of this flow. +func (f *FlowBase) GetLocalVectorSources() map[int32]any { + return f.localVectorSources +} + // GetAdmissionInfo returns the information to use for admission control on // responses received from a remote flow. func (f *FlowBase) GetAdmissionInfo() admission.WorkInfo { diff --git a/pkg/sql/opt/optbuilder/insert.go b/pkg/sql/opt/optbuilder/insert.go index 99cffb0a42e6..972a24cc15a8 100644 --- a/pkg/sql/opt/optbuilder/insert.go +++ b/pkg/sql/opt/optbuilder/insert.go @@ -242,7 +242,7 @@ func (b *Builder) buildInsert(ins *tree.Insert, inScope *scope) (outScope *scope mb.addTargetNamedColsForInsert(ins.Columns) } else { values := mb.extractValuesInput(ins.Rows) - if values != nil { + if values != nil && len(values.Rows) > 0 { // Target columns are implicitly targeted by VALUES expression in the // same order they appear in the target table schema. mb.addTargetTableColsForInsert(len(values.Rows[0])) diff --git a/pkg/sql/opt/optbuilder/mutation_builder.go b/pkg/sql/opt/optbuilder/mutation_builder.go index 101164006ae2..609808d8dda3 100644 --- a/pkg/sql/opt/optbuilder/mutation_builder.go +++ b/pkg/sql/opt/optbuilder/mutation_builder.go @@ -562,7 +562,7 @@ func (mb *mutationBuilder) extractValuesInput(inputRows *tree.Select) *tree.Valu // or just the unchanged input expression if there are no DEFAULT values. func (mb *mutationBuilder) replaceDefaultExprs(inRows *tree.Select) (outRows *tree.Select) { values := mb.extractValuesInput(inRows) - if values == nil { + if values == nil || len(values.Rows) == 0 { return inRows } diff --git a/pkg/sql/opt_exec_factory.go b/pkg/sql/opt_exec_factory.go index e9bb7c5d5801..dd190d5bf4ff 100644 --- a/pkg/sql/opt_exec_factory.go +++ b/pkg/sql/opt_exec_factory.go @@ -105,12 +105,24 @@ func (ef *execFactory) ConstructLiteralValues( if rows.NumRows() == 0 { return &zeroNode{columns: cols}, nil } - return &valuesNode{ - columns: cols, - specifiedInQuery: true, - externallyOwnedContainer: true, - valuesRun: valuesRun{rows: rows.(*rowcontainer.RowContainer)}, - }, nil + switch t := rows.(type) { + case *rowcontainer.RowContainer: + return &valuesNode{ + columns: cols, + specifiedInQuery: true, + externallyOwnedContainer: true, + valuesRun: valuesRun{rows: t}, + }, nil + case *tree.VectorRows: + return &valuesNode{ + columns: cols, + specifiedInQuery: true, + externallyOwnedContainer: true, + coldataBatch: t.Batch, + }, nil + default: + return nil, errors.AssertionFailedf("unexpected rows type %T in ConstructLiteralValues", rows) + } } // ConstructScan is part of the exec.Factory interface. diff --git a/pkg/sql/physicalplan/physical_plan.go b/pkg/sql/physicalplan/physical_plan.go index b97453ab5221..7e8da109be75 100644 --- a/pkg/sql/physicalplan/physical_plan.go +++ b/pkg/sql/physicalplan/physical_plan.go @@ -83,6 +83,20 @@ type PhysicalInfrastructure struct { // wrapping had to happen. LocalProcessors []execinfra.LocalProcessor + // LocalVectorSources contains canned coldata.Batch's to be used as vector + // engine input sources. This is currently used for COPY, eventually + // should probably be replaced by a proper copy processor that + // materializes coldata batches from pgwire stream in a distsql + // processor itself but that might have to wait until we deprecate + // non-atomic COPY support (maybe? a COPY distsql processor could just + // just finish when running non atomic and N rows were inserted if we + // could pull rows from the pgwire buffer across distsql executions). + // In that case we wouldn't be plumbing coldata.Batch's here we'd be + // plumbing "vector" sources which would be an interface with + // implementations for COPY and other batch streams (ie prepared + // batches). Use any to avoid creating unwanted package dependencies. + LocalVectorSources map[int32]any + // Streams accumulates the streams in the plan - both local (intra-node) and // remote (inter-node); when we have a final plan, the streams are used to // generate processor input and output specs (see PopulateEndpoints). diff --git a/pkg/sql/sem/tree/BUILD.bazel b/pkg/sql/sem/tree/BUILD.bazel index 0b4d239bc173..57ad2f499559 100644 --- a/pkg/sql/sem/tree/BUILD.bazel +++ b/pkg/sql/sem/tree/BUILD.bazel @@ -121,6 +121,7 @@ go_library( importpath = "github.com/cockroachdb/cockroach/pkg/sql/sem/tree", visibility = ["//visibility:public"], deps = [ + "//pkg/col/coldata", "//pkg/col/typeconv", # keep "//pkg/geo", "//pkg/geo/geopb", diff --git a/pkg/sql/sem/tree/values.go b/pkg/sql/sem/tree/values.go index 38a25d8c11d5..77280c05be1d 100644 --- a/pkg/sql/sem/tree/values.go +++ b/pkg/sql/sem/tree/values.go @@ -19,6 +19,8 @@ package tree +import "github.com/cockroachdb/cockroach/pkg/col/coldata" + // ValuesClause represents a VALUES clause. type ValuesClause struct { Rows []Exprs @@ -60,6 +62,28 @@ type LiteralValuesClause struct { Rows ExprContainer } +// VectorRows lets us store a Batch in a tree.LiteralValuesClause. +type VectorRows struct { + Batch coldata.Batch +} + +// NumRows implements the ExprContainer interface. +func (r VectorRows) NumRows() int { + return r.Batch.Length() +} + +// NumCols implements the ExprContainer interface. +func (r VectorRows) NumCols() int { + return r.Batch.Width() +} + +// Get implements the ExprContainer interface. +func (r VectorRows) Get(i, j int) Expr { + return DNull +} + +var _ ExprContainer = VectorRows{} + // Format implements the NodeFormatter interface. func (node *ValuesClause) Format(ctx *FmtCtx) { ctx.WriteString("VALUES ") diff --git a/pkg/sql/values.go b/pkg/sql/values.go index fbf4a5b138a5..3ca48b59bd81 100644 --- a/pkg/sql/values.go +++ b/pkg/sql/values.go @@ -13,6 +13,7 @@ package sql import ( "context" + "github.com/cockroachdb/cockroach/pkg/col/coldata" "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" "github.com/cockroachdb/cockroach/pkg/sql/rowcontainer" "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" @@ -35,6 +36,9 @@ type valuesNode struct { externallyOwnedContainer bool valuesRun + + // Allow passing a coldata.Batch through a valuesNode. + coldataBatch coldata.Batch } func (p *planner) newContainerValuesNode(columns colinfo.ResultColumns, capacity int) *valuesNode {