Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sql/rowexec: subject column backfills to admission control #79115

Merged
merged 2 commits into from
Apr 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion pkg/kv/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ go_library(
"//pkg/util/retry",
"//pkg/util/stop",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"//pkg/util/tracing",
"//pkg/util/uuid",
"@com_github_cockroachdb_apd_v3//:apd",
Expand Down
19 changes: 18 additions & 1 deletion pkg/kv/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -869,6 +869,10 @@ func (db *DB) NewTxn(ctx context.Context, debugName string) *Txn {
// from recoverable internal errors, and is automatically committed
// otherwise. The retryable function should have no side effects which could
// cause problems in the event it must be run more than once.
//
// This transaction will not be subject to admission control. To run a
// transaction that is subject to admission control, use
// TxnWithAdmissionControl.
//
// For example:
// err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
// if kv, err := txn.Get(ctx, key); err != nil {
Expand All @@ -877,19 +881,32 @@ func (db *DB) NewTxn(ctx context.Context, debugName string) *Txn {
// // ...
// return nil
// })
//
// Note that once the transaction encounters a retryable error, the txn object
// is marked as poisoned and all future ops fail fast until the retry. The
// callback may return either nil or the retryable error. Txn is responsible for
// resetting the transaction and retrying the callback.
func (db *DB) Txn(ctx context.Context, retryable func(context.Context, *Txn) error) error {
	// Txn is TxnWithAdmissionControl pinned to the OTHER source at normal
	// priority; the callback is forwarded untouched.
	return db.TxnWithAdmissionControl(
		ctx,
		roachpb.AdmissionHeader_OTHER,
		admission.NormalPri,
		retryable,
	)
}

// TxnWithAdmissionControl is like Txn, but tags the transaction with the given
// admission control source and priority instead of the values Txn passes
// (roachpb.AdmissionHeader_OTHER and admission.NormalPri).
//
// A new transaction is created via NewTxnWithAdmissionControl, given the debug
// name "unnamed", and handed to runTxn together with the retryable callback.
// The error returned by runTxn is returned unchanged.
func (db *DB) TxnWithAdmissionControl(
	ctx context.Context,
	source roachpb.AdmissionHeader_Source,
	priority admission.WorkPriority,
	retryable func(context.Context, *Txn) error,
) error {
	// TODO(radu): we should open a tracing Span here (we need to figure out how
	// to use the correct tracer).

	// Observed timestamps don't work with multi-tenancy. See:
	//
	// https://github.com/cockroachdb/cockroach/issues/48008
	nodeID, _ := db.ctx.NodeID.OptionalNodeID() // zero if not available
	// Create the transaction exactly once, carrying the caller's admission
	// source and priority.
	txn := NewTxnWithAdmissionControl(ctx, db, nodeID, source, priority)
	txn.SetDebugName("unnamed")
	return runTxn(ctx, txn, retryable)
}
Expand Down
40 changes: 24 additions & 16 deletions pkg/kv/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
)
Expand Down Expand Up @@ -119,6 +118,19 @@ type Txn struct {
//
// See also db.NewTxn().
func NewTxn(ctx context.Context, db *DB, gatewayNodeID roachpb.NodeID) *Txn {
	// NewTxn is NewTxnWithAdmissionControl with the OTHER source at normal
	// priority.
	return NewTxnWithAdmissionControl(
		ctx,
		db,
		gatewayNodeID,
		roachpb.AdmissionHeader_OTHER,
		admission.NormalPri,
	)
}

// NewTxnWithAdmissionControl creates a new transaction with the specified
// admission control source and priority. See NewTxn() for details.
func NewTxnWithAdmissionControl(
ctx context.Context,
db *DB,
gatewayNodeID roachpb.NodeID,
source roachpb.AdmissionHeader_Source,
priority admission.WorkPriority,
) *Txn {
if db == nil {
panic(errors.WithContextTags(
errors.AssertionFailedf("attempting to create txn with nil db"), ctx))
Expand All @@ -133,8 +145,13 @@ func NewTxn(ctx context.Context, db *DB, gatewayNodeID roachpb.NodeID) *Txn {
db.clock.MaxOffset().Nanoseconds(),
int32(db.ctx.NodeID.SQLInstanceID()),
)

return NewTxnFromProto(ctx, db, gatewayNodeID, now, RootTxn, &kvTxn)
txn := NewTxnFromProto(ctx, db, gatewayNodeID, now, RootTxn, &kvTxn)
txn.admissionHeader = roachpb.AdmissionHeader{
CreateTime: db.clock.PhysicalNow(),
Priority: int32(priority),
Source: source,
}
return txn
}

// NewTxnWithSteppingEnabled is like NewTxn but suitable for use by SQL. Note
Expand All @@ -148,12 +165,8 @@ func NewTxnWithSteppingEnabled(
gatewayNodeID roachpb.NodeID,
qualityOfService sessiondatapb.QoSLevel,
) *Txn {
txn := NewTxn(ctx, db, gatewayNodeID)
txn.admissionHeader = roachpb.AdmissionHeader{
Priority: int32(qualityOfService),
CreateTime: timeutil.Now().UnixNano(),
Source: roachpb.AdmissionHeader_FROM_SQL,
}
txn := NewTxnWithAdmissionControl(ctx, db, gatewayNodeID,
roachpb.AdmissionHeader_FROM_SQL, admission.WorkPriority(qualityOfService))
_ = txn.ConfigureStepping(ctx, SteppingEnabled)
return txn
}
Expand All @@ -165,13 +178,8 @@ func NewTxnWithSteppingEnabled(
// transaction to undergo admission control. See AdmissionHeader_Source for more
// details.
func NewTxnRootKV(ctx context.Context, db *DB, gatewayNodeID roachpb.NodeID) *Txn {
	// Delegate to the shared constructor, which builds the admission header
	// from the source and priority passed here. Setting the header by hand
	// before this return would leave the delegation unreachable.
	return NewTxnWithAdmissionControl(
		ctx, db, gatewayNodeID, roachpb.AdmissionHeader_ROOT_KV, admission.NormalPri)
}

// NewTxnFromProto is like NewTxn but assumes the Transaction object is already initialized.
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/rowexec/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ go_library(
"//pkg/sql/stats",
"//pkg/sql/types",
"//pkg/util",
"//pkg/util/admission",
"//pkg/util/cancelchecker",
"//pkg/util/ctxgroup",
"//pkg/util/encoding",
Expand Down
55 changes: 29 additions & 26 deletions pkg/sql/rowexec/columnbackfiller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/rowinfra"
"github.com/cockroachdb/cockroach/pkg/util/admission"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

Expand Down Expand Up @@ -105,34 +106,36 @@ func (cb *columnBackfiller) runChunk(
) (roachpb.Key, error) {
var key roachpb.Key
var commitWaitFn func(context.Context) error
err := cb.flowCtx.Cfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
if cb.flowCtx.Cfg.TestingKnobs.RunBeforeBackfillChunk != nil {
if err := cb.flowCtx.Cfg.TestingKnobs.RunBeforeBackfillChunk(sp); err != nil {
return err
err := cb.flowCtx.Cfg.DB.TxnWithAdmissionControl(
ctx, roachpb.AdmissionHeader_FROM_SQL, admission.NormalPri,
func(ctx context.Context, txn *kv.Txn) error {
if cb.flowCtx.Cfg.TestingKnobs.RunBeforeBackfillChunk != nil {
if err := cb.flowCtx.Cfg.TestingKnobs.RunBeforeBackfillChunk(sp); err != nil {
return err
}
}
if cb.flowCtx.Cfg.TestingKnobs.RunAfterBackfillChunk != nil {
defer cb.flowCtx.Cfg.TestingKnobs.RunAfterBackfillChunk()
}
}
if cb.flowCtx.Cfg.TestingKnobs.RunAfterBackfillChunk != nil {
defer cb.flowCtx.Cfg.TestingKnobs.RunAfterBackfillChunk()
}

// Defer the commit-wait operation so that we can coalesce this wait
// across all batches. This dramatically reduces the total time we spend
// waiting for consistency when backfilling a column on GLOBAL tables.
commitWaitFn = txn.DeferCommitWait(ctx)

// TODO(knz): do KV tracing in DistSQL processors.
var err error
key, err = cb.RunColumnBackfillChunk(
ctx,
txn,
cb.desc,
sp,
chunkSize,
true, /*alsoCommit*/
false, /*traceKV*/
)
return err
})
// Defer the commit-wait operation so that we can coalesce this wait
// across all batches. This dramatically reduces the total time we spend
// waiting for consistency when backfilling a column on GLOBAL tables.
commitWaitFn = txn.DeferCommitWait(ctx)

// TODO(knz): do KV tracing in DistSQL processors.
var err error
key, err = cb.RunColumnBackfillChunk(
ctx,
txn,
cb.desc,
sp,
chunkSize,
true, /*alsoCommit*/
false, /*traceKV*/
)
return err
})
if err == nil {
cb.commitWaitFns = append(cb.commitWaitFns, commitWaitFn)
maxCommitWaitFns := int(backfillerMaxCommitWaitFns.Get(&cb.flowCtx.Cfg.Settings.SV))
Expand Down