Skip to content

Commit

Permalink
sql/catalog/descs: make it possible to use a Stepping Txn
Browse files Browse the repository at this point in the history
This is needed both to ensure that the transactions which
purposefully use the internal executor can avoid the halloween
problem and so that we can use a default QoS level for those
transaction.

Release justification: part of a bug fix

Release note: None
  • Loading branch information
ajwerner committed Aug 20, 2022
1 parent 9ad9feb commit f266aa2
Show file tree
Hide file tree
Showing 3 changed files with 128 additions and 3 deletions.
3 changes: 3 additions & 0 deletions pkg/sql/catalog/descs/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ go_library(
"//pkg/sql/sem/catconstants",
"//pkg/sql/sem/tree",
"//pkg/sql/sessiondata",
"//pkg/sql/sessiondatapb",
"//pkg/sql/sqlerrors",
"//pkg/sql/sqlliveness",
"//pkg/sql/sqlutil",
Expand All @@ -85,6 +86,7 @@ go_test(
"errors_test.go",
"helpers_test.go",
"main_test.go",
"txn_external_test.go",
"txn_with_executor_datadriven_test.go",
],
data = glob(["testdata/**"]),
Expand Down Expand Up @@ -127,6 +129,7 @@ go_test(
"//pkg/util/mon",
"//pkg/util/randutil",
"@com_github_cockroachdb_datadriven//:datadriven",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_redact//:redact",
"@com_github_lib_pq//oid",
"@com_github_stretchr_testify//assert",
Expand Down
57 changes: 54 additions & 3 deletions pkg/sql/catalog/descs/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/retry"
Expand All @@ -43,14 +44,51 @@ func (cf *CollectionFactory) Txn(
ctx context.Context,
db *kv.DB,
f func(ctx context.Context, txn *kv.Txn, descriptors *Collection) error,
opts ...TxnOption,
) error {
return cf.TxnWithExecutor(ctx, db, nil /* sessionData */, func(
ctx context.Context, txn *kv.Txn, descriptors *Collection, _ sqlutil.InternalExecutor,
) error {
return f(ctx, txn, descriptors)
})
}, opts...)
}

// TxnOption is used to configure a Txn or TxnWithExecutor.
type TxnOption interface {
apply(*txnConfig)
}

type txnConfig struct {
steppingEnabled bool
}

type txnOptionFn func(options *txnConfig)

func (f txnOptionFn) apply(options *txnConfig) { f(options) }

var steppingEnabled = txnOptionFn(func(o *txnConfig) {
o.steppingEnabled = true
})

// SteppingEnabled creates a TxnOption to determine whether the underlying
// transaction should have stepping enabled. If stepping is enabled, the
// transaction will implicitly use lower admission priority. However, the
// user will need to remember to Step the Txn to make writes visible. The
// InternalExecutor will automatically (for better or for worse) step the
// transaction when executing each statement.
func SteppingEnabled() TxnOption {
return steppingEnabled
}

// TxnWithExecutorFunc is used to run a transaction in the context of a
// Collection and an InternalExecutor.
type TxnWithExecutorFunc = func(
ctx context.Context,
txn *kv.Txn,
descriptors *Collection,
ie sqlutil.InternalExecutor,
) error

// TxnWithExecutor enables callers to run transactions with a *Collection such that all
// retrieved immutable descriptors are properly leased and all mutable
// descriptors are handled. The function deals with verifying the two version
Expand All @@ -66,8 +104,21 @@ func (cf *CollectionFactory) TxnWithExecutor(
ctx context.Context,
db *kv.DB,
sd *sessiondata.SessionData,
f func(ctx context.Context, txn *kv.Txn, descriptors *Collection, ie sqlutil.InternalExecutor) error,
f TxnWithExecutorFunc,
opts ...TxnOption,
) error {
var config txnConfig
for _, opt := range opts {
opt.apply(&config)
}
run := db.Txn
if config.steppingEnabled {
type kvTxnFunc = func(context.Context, *kv.Txn) error
run = func(ctx context.Context, f kvTxnFunc) error {
return db.TxnWithSteppingEnabled(ctx, sessiondatapb.Normal, f)
}
}

// Waits for descriptors that were modified, skipping
// over ones that had their descriptor wiped.
waitForDescriptors := func(modifiedDescriptors []lease.IDVersion, deletedDescs catalog.DescriptorIDSet) error {
Expand Down Expand Up @@ -97,7 +148,7 @@ func (cf *CollectionFactory) TxnWithExecutor(
for {
var modifiedDescriptors []lease.IDVersion
var deletedDescs catalog.DescriptorIDSet
if err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
if err := run(ctx, func(ctx context.Context, txn *kv.Txn) error {
modifiedDescriptors, deletedDescs = nil, catalog.DescriptorIDSet{}
descsCol := cf.NewCollection(
ctx, nil, /* temporarySchemaProvider */
Expand Down
71 changes: 71 additions & 0 deletions pkg/sql/catalog/descs/txn_external_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package descs_test

import (
"context"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)

// TestTxnWithStepping tests that if the user opts into stepping, they
// get stepping.
func TestTxnWithStepping(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
s, _, kvDB := serverutils.StartServer(t, base.TestServerArgs{})
defer s.Stopper().Stop(ctx)

cf := s.CollectionFactory().(*descs.CollectionFactory)
scratchKey, err := s.ScratchRange()
require.NoError(t, err)
// Write a key, read in the transaction without stepping, ensure we
// do not see the value, step the transaction, then ensure that we do.
require.NoError(t, cf.Txn(ctx, kvDB, func(
ctx context.Context, txn *kv.Txn, descriptors *descs.Collection,
) error {
if err := txn.Put(ctx, scratchKey, 1); err != nil {
return err
}
{
got, err := txn.Get(ctx, scratchKey)
if err != nil {
return err
}
if got.Exists() {
return errors.AssertionFailedf("expected no value, got %v", got)
}
}
if err := txn.Step(ctx); err != nil {
return err
}
{
got, err := txn.Get(ctx, scratchKey)
if err != nil {
return err
}
if got.ValueInt() != 1 {
return errors.AssertionFailedf("expected 1, got %v", got)
}
}
return nil
}, descs.SteppingEnabled()))
}

0 comments on commit f266aa2

Please sign in to comment.