Skip to content

Commit

Permalink
Merge pull request cockroachdb#91555 from postamar/backport22.1-91411
Browse files Browse the repository at this point in the history
  • Loading branch information
Marius Posta authored Nov 9, 2022
2 parents 4a4d911 + a637dc5 commit 36052be
Show file tree
Hide file tree
Showing 13 changed files with 122 additions and 56 deletions.
3 changes: 2 additions & 1 deletion pkg/cli/declarative_corpus.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
package cli

import (
"context"
"fmt"

"github.com/cockroachdb/cockroach/pkg/cli/clierrorplus"
Expand Down Expand Up @@ -50,7 +51,7 @@ a given corpus file.
return jobID
},
}
_, err := scplan.MakePlan(*state, params)
_, err := scplan.MakePlan(context.Background(), *state, params)
if err != nil {
fmt.Printf("failed to validate %s with error %v\n", name, err)
} else {
Expand Down
8 changes: 5 additions & 3 deletions pkg/sql/explain_ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,17 @@ func (n *explainDDLNode) startExec(params runParams) error {
return explainNotPossibleError
}
}
return n.setExplainValues(scNode.plannedState)
return n.setExplainValues(params.ctx, scNode.plannedState)
}

func (n *explainDDLNode) setExplainValues(scState scpb.CurrentState) (err error) {
func (n *explainDDLNode) setExplainValues(
ctx context.Context, scState scpb.CurrentState,
) (err error) {
defer func() {
err = errors.WithAssertionFailure(err)
}()
var p scplan.Plan
p, err = scplan.MakePlan(scState, scplan.Params{
p, err = scplan.MakePlan(ctx, scState, scplan.Params{
ExecutionPhase: scop.StatementPhase,
SchemaChangerJobIDSupplier: func() jobspb.JobID { return 1 },
})
Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/schemachanger/corpus/corpus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
package corpus_test

import (
"context"
"flag"
"testing"

Expand Down Expand Up @@ -42,7 +43,7 @@ func TestValidateCorpuses(t *testing.T) {
jobID := jobspb.InvalidJobID
name, state := reader.GetCorpus(corpusIdx)
t.Run(name, func(t *testing.T) {
_, err := scplan.MakePlan(*state, scplan.Params{
_, err := scplan.MakePlan(context.Background(), *state, scplan.Params{
ExecutionPhase: scop.LatestPhase,
InRollback: state.InRollback,
SchemaChangerJobIDSupplier: func() jobspb.JobID {
Expand Down
2 changes: 0 additions & 2 deletions pkg/sql/schemachanger/scbuild/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,6 @@ go_library(
"//pkg/sql/sessiondatapb",
"//pkg/sql/sqlerrors",
"//pkg/sql/types",
"//pkg/util/log",
"//pkg/util/timeutil",
"//pkg/util/uuid",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_redact//:redact",
Expand Down
25 changes: 6 additions & 19 deletions pkg/sql/schemachanger/scbuild/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sem/catid"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
)

// Build constructs a new state from an initial state and a statement.
Expand All @@ -36,13 +34,11 @@ import (
func Build(
ctx context.Context, dependencies Dependencies, initial scpb.CurrentState, n tree.Statement,
) (_ scpb.CurrentState, err error) {
start := timeutil.Now()
defer func() {
if err != nil || !log.ExpensiveLogEnabled(ctx, 2) {
return
}
log.Infof(ctx, "build for %s took %v", n.StatementTag(), timeutil.Since(start))
}()
defer scerrors.StartEventf(
ctx,
"building declarative schema change targets for %s",
redact.Safe(n.StatementTag()),
).HandlePanicAndLogError(ctx, &err)
initial = initial.DeepCopy()
bs := newBuilderState(ctx, dependencies, initial)
els := newEventLogState(dependencies, initial, n)
Expand All @@ -61,15 +57,6 @@ func Build(
TreeAnnotator: an,
SchemaFeatureChecker: dependencies.FeatureChecker(),
}
defer func() {
if recErr := recover(); recErr != nil {
if errObj, ok := recErr.(error); ok {
err = errObj
} else {
err = errors.Errorf("unexpected error encountered while building schema change plan %s", recErr)
}
}
}()
scbuildstmt.Process(b, an.GetStatement())
an.ValidateAnnotations()
els.statements[len(els.statements)-1].RedactedStatement =
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/schemachanger/scdeps/sctestutils/sctestutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func ProtoDiff(a, b protoutil.Message, args DiffArgs) string {

// MakePlan is a convenient alternative to calling scplan.MakePlan in tests.
func MakePlan(t *testing.T, state scpb.CurrentState, phase scop.Phase) scplan.Plan {
plan, err := scplan.MakePlan(state, scplan.Params{
plan, err := scplan.MakePlan(context.Background(), state, scplan.Params{
ExecutionPhase: phase,
SchemaChangerJobIDSupplier: func() jobspb.JobID { return 1 },
})
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/schemachanger/scerrors/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ go_library(
"//pkg/sql/catalog",
"//pkg/sql/catalog/descpb",
"//pkg/sql/sem/tree",
"//pkg/util/log",
"//pkg/util/timeutil",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_redact//:redact",
],
)
58 changes: 58 additions & 0 deletions pkg/sql/schemachanger/scerrors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,73 @@
package scerrors

import (
"context"
"fmt"
"runtime"
"strings"
"time"

"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
)

// EventLogger is a convenience object used for logging schema changer events.
type EventLogger struct {
msg redact.SafeValue
start time.Time
}

// StartEventf logs the start of a schema change event, and returns an object
// with a HandlePanicAndLogError method to handle panics and log errors at the
// end of the event.
//
// Typical usage is along the lines of:
// - defer StartEventf(...).HandlePanicAndLogError(...)
func StartEventf(ctx context.Context, format string, args ...interface{}) EventLogger {
msg := redact.Safe(fmt.Sprintf(format, args...))
log.InfofDepth(ctx, 1, "%s", msg)
return EventLogger{
msg: msg,
start: timeutil.Now(),
}
}

// HandlePanicAndLogError handles panics by recovering them in an error,
// which it then also logs. See also StartEventf.
func (el EventLogger) HandlePanicAndLogError(ctx context.Context, err *error) {
switch recErr := recover().(type) {
case nil:
// No panicked error.
case runtime.Error:
*err = errors.WithAssertionFailure(recErr)
case error:
*err = recErr
default:
*err = errors.AssertionFailedf("recovered from uncategorizable panic: %v", recErr)
}
if *err == nil {
*err = ctx.Err()
}
if errors.Is(*err, context.Canceled) {
return
}
if *err == nil {
if log.ExpensiveLogEnabled(ctx, 2) {
log.InfofDepth(ctx, 1, "done %s in %s", el.msg, redact.Safe(timeutil.Since(el.start)))
}
return
}
log.WarningfDepth(ctx, 1, "failed %s with error: %v", el.msg, *err)
if errors.HasAssertionFailure(*err) {
*err = errors.Wrapf(*err, "%s", el.msg)
}
}

type notImplementedError struct {
n tree.NodeFormatter
detail string
Expand Down
13 changes: 13 additions & 0 deletions pkg/sql/schemachanger/scpb/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package scpb

import (
"sort"
"strings"

"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -69,6 +70,18 @@ func (s *CurrentState) Rollback() {
s.InRollback = true
}

// StatementTags returns the concatenated statement tags in the current state.
func (s CurrentState) StatementTags() string {
var sb strings.Builder
for i, stmt := range s.Statements {
if i > 0 {
sb.WriteString("; ")
}
sb.WriteString(stmt.StatementTag)
}
return sb.String()
}

// NumStatus is the number of values which Status may take on.
var NumStatus = len(Status_name)

Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/schemachanger/scplan/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/jobs/jobspb",
"//pkg/sql/schemachanger/scerrors",
"//pkg/sql/schemachanger/scop",
"//pkg/sql/schemachanger/scpb",
"//pkg/sql/schemachanger/scplan/internal/opgen",
Expand All @@ -23,6 +24,7 @@ go_library(
"//pkg/util/timeutil",
"//pkg/util/treeprinter",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_redact//:redact",
"@in_gopkg_yaml_v2//:yaml_v2",
],
)
Expand Down
22 changes: 10 additions & 12 deletions pkg/sql/schemachanger/scplan/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"context"

"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scerrors"
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scop"
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scpb"
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scplan/internal/opgen"
Expand All @@ -23,6 +24,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
)

// Params holds the arguments for planning.
Expand Down Expand Up @@ -76,22 +78,18 @@ func (p Plan) StagesForCurrentPhase() []scstage.Stage {
// the initial state for a set of targets.
// Returns an error when planning fails. It is up to the caller to wrap this
// error as an assertion failure and with useful debug information details.
func MakePlan(initial scpb.CurrentState, params Params) (p Plan, err error) {
func MakePlan(ctx context.Context, initial scpb.CurrentState, params Params) (p Plan, err error) {
defer scerrors.StartEventf(
ctx,
"building declarative schema changer plan in %s (rollback=%v) for %s",
redact.Safe(params.ExecutionPhase),
redact.Safe(params.InRollback),
redact.Safe(initial.StatementTags()),
).HandlePanicAndLogError(ctx, &err)
p = Plan{
CurrentState: initial,
Params: params,
}
defer func() {
if r := recover(); r != nil {
rAsErr, ok := r.(error)
if !ok {
rAsErr = errors.Errorf("panic during MakePlan: %v", r)
}
err = p.DecorateErrorWithPlanDetails(rAsErr)
}
err = errors.WithAssertionFailure(err)
}()

{
start := timeutil.Now()
p.Graph = buildGraph(p.CurrentState)
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/schemachanger/scrun/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@ go_library(
"//pkg/roachpb",
"//pkg/settings/cluster",
"//pkg/sql/catalog/descpb",
"//pkg/sql/schemachanger/scerrors",
"//pkg/sql/schemachanger/scexec",
"//pkg/sql/schemachanger/scop",
"//pkg/sql/schemachanger/scpb",
"//pkg/sql/schemachanger/scplan",
"//pkg/util/log",
"//pkg/util/timeutil",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_redact//:redact",
],
)
33 changes: 18 additions & 15 deletions pkg/sql/schemachanger/scrun/scrun.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scerrors"
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scexec"
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scop"
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scpb"
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scplan"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
)

// RunStatementPhase executes in-transaction schema changes for the targeted
Expand Down Expand Up @@ -57,7 +57,7 @@ func runTransactionPhase(
if len(state.Current) == 0 {
return scpb.CurrentState{}, jobspb.InvalidJobID, nil
}
sc, err := scplan.MakePlan(state, scplan.Params{
sc, err := scplan.MakePlan(ctx, state, scplan.Params{
ExecutionPhase: phase,
SchemaChangerJobIDSupplier: deps.TransactionalJobRegistry().SchemaChangerJobID,
})
Expand Down Expand Up @@ -93,7 +93,7 @@ func RunSchemaChangesInJob(
if err != nil {
return errors.Wrapf(err, "failed to construct state for job %d", jobID)
}
sc, err := scplan.MakePlan(state, scplan.Params{
sc, err := scplan.MakePlan(ctx, state, scplan.Params{
ExecutionPhase: scop.PostCommitPhase,
SchemaChangerJobIDSupplier: func() jobspb.JobID { return jobID },
})
Expand Down Expand Up @@ -136,20 +136,18 @@ func executeStage(
stageIdx int,
stage scplan.Stage,
) (err error) {
defer scerrors.StartEventf(
ctx,
"executing declarative schema change %s (rollback=%v) for %s",
redact.Safe(stage),
redact.Safe(p.InRollback),
redact.Safe(p.StatementTags()),
).HandlePanicAndLogError(ctx, &err)
if knobs != nil && knobs.BeforeStage != nil {
if err := knobs.BeforeStage(p, stageIdx); err != nil {
return err
}
}

log.Infof(ctx, "executing %s (rollback=%v)", stage, p.InRollback)
start := timeutil.Now()
defer func() {
if log.ExpensiveLogEnabled(ctx, 2) {
log.Infof(ctx, "executing %s (rollback=%v) took %v: err = %v",
stage, p.InRollback, timeutil.Since(start), err)
}
}()
if err := scexec.ExecuteStage(ctx, deps, stage.Ops()); err != nil {
// Don't go through the effort to wrap the error if it's a retry or it's a
// cancelation.
Expand All @@ -170,7 +168,12 @@ func executeStage(

func makeState(
ctx context.Context, deps JobRunDependencies, descriptorIDs []descpb.ID, rollback bool,
) (scpb.CurrentState, error) {
) (state scpb.CurrentState, err error) {
defer scerrors.StartEventf(
ctx,
"rebuilding declarative schema change state from descriptors %v",
redact.Safe(descriptorIDs),
).HandlePanicAndLogError(ctx, &err)
var descriptorStates []*scpb.DescriptorState
if err := deps.WithTxnInJob(ctx, func(ctx context.Context, txnDeps scexec.Dependencies) error {
descriptorStates = nil
Expand Down Expand Up @@ -198,7 +201,7 @@ func makeState(
}); err != nil {
return scpb.CurrentState{}, err
}
state, err := scpb.MakeCurrentStateFromDescriptors(descriptorStates)
state, err = scpb.MakeCurrentStateFromDescriptors(descriptorStates)
if err != nil {
return scpb.CurrentState{}, err
}
Expand Down

0 comments on commit 36052be

Please sign in to comment.