Skip to content

Commit

Permalink
copy: add vectorize insert support used solely by copy for now
Browse files Browse the repository at this point in the history
Implement new on by default vectorized insert for COPY FROM statements.
Controlled by vectorize and copy_fast_path_enabled session variables
which both default to true.  If you set copy_fast_path_enabled to false
you get the old unoptimized behavior (22.1).  If you leave
copy_fast_path_enabled enabled but turn off vectorize you get the 22.2
behavior.

COPY FROM fast path row version and vectorized version both now respect
memory limits on a per-row basis, i.e., if huge rows are encountered
COPY buffers will be flushed before we reach the configured copy row
batch size. Also if lots of rows are sent in one CopyData statement
we will now flush when we reach the copy row batch size limit instead
of inserting all the data. This matters little with psql clients which
typically do a row per CopyData segment but matters a lot with pgx
which will do 64k CopyData segments.

Keys are not inserted in the exact same order as they were with the row
version of copy. Now they are sorted per batch so that all the PK Keys
are inserted and then the first secondary index etc.

The vectorized insert benefits from larger batch sizes so we are more
generous with how big they can get.  By default we start with 64 row
batches and double up till a limit derived from the KV raft command batch
size, parameterized by schema (i.e., a wider, bigger schema will get a smaller
batch size upper limit), not to exceed 32k rows, which is roughly where
performance gains from bigger batches start to trail off.

Epic: CRDB-18892
Informs: cockroachdb#91831
Release note (sql change): Bulk COPY FROM statements are now
processed with a vectorized insert and can be anywhere from 50%
to 5x faster. Typical hardware and schemas should see a 2x improvement.
Vectorized inserts are only used for COPY statements and are not yet
applied to regular inserts. Both the vectorize and copy_fast_path_enabled
session variables can be used to disable this feature.
  • Loading branch information
cucaroach committed Mar 17, 2023
1 parent 28571fa commit b679155
Show file tree
Hide file tree
Showing 34 changed files with 1,060 additions and 121 deletions.
1 change: 1 addition & 0 deletions pkg/server/server_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -1103,6 +1103,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
distSQLServer.ServerConfig.SQLStatsController = pgServer.SQLServer.GetSQLStatsController()
distSQLServer.ServerConfig.SchemaTelemetryController = pgServer.SQLServer.GetSchemaTelemetryController()
distSQLServer.ServerConfig.IndexUsageStatsController = pgServer.SQLServer.GetIndexUsageStatsController()
distSQLServer.ServerConfig.StatsRefresher = statsRefresher

// We use one BytesMonitor for all Executor's created by the
// internalDB.
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,7 @@ go_library(
"//pkg/cloud/externalconn",
"//pkg/clusterversion",
"//pkg/col/coldata",
"//pkg/col/coldataext",
"//pkg/config",
"//pkg/config/zonepb",
"//pkg/docs",
Expand Down Expand Up @@ -378,6 +379,7 @@ go_library(
"//pkg/sql/colexec",
"//pkg/sql/colfetcher",
"//pkg/sql/colflow",
"//pkg/sql/colmem",
"//pkg/sql/compengine",
"//pkg/sql/comprules",
"//pkg/sql/contention",
Expand Down Expand Up @@ -551,6 +553,7 @@ go_library(
"@com_github_cockroachdb_errors//hintdetail",
"@com_github_cockroachdb_logtags//:logtags",
"@com_github_cockroachdb_redact//:redact",
"@com_github_dustin_go_humanize//:go-humanize",
"@com_github_gogo_protobuf//proto",
"@com_github_gogo_protobuf//types",
"@com_github_lib_pq//:pq",
Expand Down
9 changes: 9 additions & 0 deletions pkg/sql/colexec/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ go_library(
"count.go",
"hash_aggregator.go",
"hash_group_joiner.go",
"insert.go",
"invariants_checker.go",
"limit.go",
"materializer.go",
Expand Down Expand Up @@ -43,12 +44,16 @@ go_library(
"//pkg/col/coldata",
"//pkg/col/coldataext", # keep
"//pkg/col/typeconv", # keep
"//pkg/kv/kvserver/kvserverbase",
"//pkg/roachpb",
"//pkg/server/telemetry", # keep
"//pkg/settings",
"//pkg/sql/catalog",
"//pkg/sql/catalog/catenumpb",
"//pkg/sql/catalog/colinfo", # keep
"//pkg/sql/catalog/descpb",
"//pkg/sql/colconv",
"//pkg/sql/colenc",
"//pkg/sql/colexec/colexecagg", # keep
"//pkg/sql/colexec/colexecargs",
"//pkg/sql/colexec/colexecbase",
Expand All @@ -64,7 +69,9 @@ go_library(
"//pkg/sql/execinfra/execreleasable",
"//pkg/sql/execinfrapb",
"//pkg/sql/memsize",
"//pkg/sql/row",
"//pkg/sql/rowenc",
"//pkg/sql/sem/catid",
"//pkg/sql/sem/eval",
"//pkg/sql/sem/tree",
"//pkg/sql/sqltelemetry", # keep
Expand All @@ -73,7 +80,9 @@ go_library(
"//pkg/util/buildutil",
"//pkg/util/duration", # keep
"//pkg/util/encoding", # keep
"//pkg/util/intsets",
"//pkg/util/json", # keep
"//pkg/util/log",
"//pkg/util/stringarena",
"//pkg/util/tracing",
"@com_github_cockroachdb_apd_v3//:apd", # keep
Expand Down
31 changes: 26 additions & 5 deletions pkg/sql/colexec/colbuilder/execplan.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,9 @@ func supportedNatively(core *execinfrapb.ProcessorCoreUnion) error {
// distinguish from other unsupported cores.
return errLocalPlanNodeWrap

case core.Insert != nil:
return nil

default:
return errCoreUnsupportedNatively
}
Expand Down Expand Up @@ -818,11 +821,20 @@ func NewColOperator(
return r, err
}
if core.Values.NumRows == 0 || len(core.Values.Columns) == 0 {
// To simplify valuesOp we handle some special cases with
// fixedNumTuplesNoInputOp.
result.Root = colexecutils.NewFixedNumTuplesNoInputOp(
getStreamingAllocator(ctx, args), int(core.Values.NumRows), nil, /* opToInitialize */
)
// Handle coldata.Batch vector source.
if b, ok := args.LocalVectorSources[args.Spec.ProcessorID]; ok {
batch, ok := b.(coldata.Batch)
if !ok {
colexecerror.InternalError(errors.AssertionFailedf("LocalVectorSource wasn't a coldata.Batch"))
}
result.Root = colexecutils.NewRawColDataBatchOp(batch)
} else {
// To simplify valuesOp we handle some special cases with
// fixedNumTuplesNoInputOp.
result.Root = colexecutils.NewFixedNumTuplesNoInputOp(
getStreamingAllocator(ctx, args), int(core.Values.NumRows), nil, /* opToInitialize */
)
}
} else {
result.Root = colexec.NewValuesOp(
getStreamingAllocator(ctx, args), core.Values, execinfra.GetWorkMemLimit(flowCtx),
Expand Down Expand Up @@ -1728,6 +1740,15 @@ func NewColOperator(
input = result.Root
}

case core.Insert != nil:
if err := checkNumIn(inputs, 1); err != nil {
return r, err
}
outputIdx := len(spec.Input[0].ColumnTypes)
result.Root = colexec.NewInsertOp(ctx, flowCtx, core.Insert, inputs[0].Root, spec.ResultTypes, outputIdx,
getStreamingAllocator(ctx, args), args.ExprHelper.SemaCtx)
result.ColumnTypes = spec.ResultTypes

default:
return r, errors.AssertionFailedf("unsupported processor core %q", core)
}
Expand Down
16 changes: 9 additions & 7 deletions pkg/sql/colexec/colexecargs/op_creation.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,15 @@ type NewColOperatorArgs struct {
StreamingMemAccount *mon.BoundAccount
ProcessorConstructor execinfra.ProcessorConstructor
LocalProcessors []execinfra.LocalProcessor
DiskQueueCfg colcontainer.DiskQueueCfg
FDSemaphore semaphore.Semaphore
ExprHelper *ExprHelper
Factory coldata.ColumnFactory
MonitorRegistry *MonitorRegistry
TypeResolver *descs.DistSQLTypeResolver
TestingKnobs struct {
// any is actually a coldata.Batch, see physicalplan.PhysicalInfrastructure comments.
LocalVectorSources map[int32]any
DiskQueueCfg colcontainer.DiskQueueCfg
FDSemaphore semaphore.Semaphore
ExprHelper *ExprHelper
Factory coldata.ColumnFactory
MonitorRegistry *MonitorRegistry
TypeResolver *descs.DistSQLTypeResolver
TestingKnobs struct {
// SpillingCallbackFn will be called when the spilling from an in-memory
// to disk-backed operator occurs. It should only be set in tests.
SpillingCallbackFn func()
Expand Down
22 changes: 22 additions & 0 deletions pkg/sql/colexec/colexecutils/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,28 @@ func (s *fixedNumTuplesNoInputOp) Next() coldata.Batch {
return s.batch
}

// rawBatchOp is a leaf Operator that emits one pre-built coldata.Batch and
// then signals end-of-data by producing coldata.ZeroBatch on every
// subsequent call.
type rawBatchOp struct {
	colexecop.ZeroInputNode
	batch coldata.Batch
}

var _ colexecop.Operator = &rawBatchOp{}

// Init is part of the colexecop.Operator interface; this operator needs no
// initialization.
func (s *rawBatchOp) Init(ctx context.Context) {}

// Next returns the wrapped batch on the first call and coldata.ZeroBatch on
// all calls after that.
func (s *rawBatchOp) Next() coldata.Batch {
	out := s.batch
	// Swap in the zero-length batch so downstream operators observe
	// end-of-data once the single real batch has been handed out.
	s.batch = coldata.ZeroBatch
	return out
}

// NewRawColDataBatchOp allocates a rawBatchOp. This is used by COPY to perform
// vectorized inserts.
func NewRawColDataBatchOp(b coldata.Batch) colexecop.Operator {
	return &rawBatchOp{batch: b}
}

// vectorTypeEnforcer is a utility Operator that on every call to Next
// enforces that non-zero length batch from the input has a vector of the
// desired type in the desired position. If the width of the batch is less than
Expand Down
Loading

0 comments on commit b679155

Please sign in to comment.