diff --git a/pkg/workload/schemachange/operation_generator.go b/pkg/workload/schemachange/operation_generator.go index bb89630ab114..14cc9ca9cba5 100644 --- a/pkg/workload/schemachange/operation_generator.go +++ b/pkg/workload/schemachange/operation_generator.go @@ -46,6 +46,7 @@ type operationGeneratorParams struct { enumPct int rng *rand.Rand ops *deck + declarativeOps *deck maxSourceTables int sequenceOwnedByPct int fkParentInvalidPct int @@ -74,6 +75,9 @@ type operationGenerator struct { // opGenLog log of statement used to generate the current statement. opGenLog []interface{} + + //useDeclarativeSchemaChanger indices if the declarative schema changer is used. + useDeclarativeSchemaChanger bool } // OpGenLogQuery a query with a single value result. @@ -131,8 +135,9 @@ func makeOperationGenerator(params *operationGeneratorParams) *operationGenerato } // Reset internal state used per operation within a transaction -func (og *operationGenerator) resetOpState() { +func (og *operationGenerator) resetOpState(useDeclarativeSchemaChanger bool) { og.candidateExpectedCommitErrors.reset() + og.useDeclarativeSchemaChanger = useDeclarativeSchemaChanger } // Reset internal state used per transaction @@ -282,6 +287,45 @@ var opWeights = []int{ validate: 2, // validate twice more often } +var opDeclarative = []bool{ + addColumn: true, + addConstraint: false, + addForeignKeyConstraint: true, + addRegion: false, + addUniqueConstraint: true, + alterTableLocality: false, + createIndex: true, + createSequence: false, + createTable: false, + createTableAs: false, + createView: false, + createEnum: false, + createSchema: false, + dropColumn: true, + dropColumnDefault: true, + dropColumnNotNull: true, + dropColumnStored: true, + dropConstraint: true, + dropIndex: true, + dropSequence: true, + dropTable: true, + dropView: true, + dropSchema: true, + primaryRegion: false, + renameColumn: false, + renameIndex: false, + renameSequence: false, + renameTable: false, + renameView: false, + setColumnDefault: false, + setColumnNotNull: false, + setColumnType: false, + survive: false, + insertRow: false, + selectStmt: false, + validate: false, +} + // adjustOpWeightsForActiveVersion adjusts the weights for the active cockroach // version, allowing us to disable certain operations in mixed version scenarios. func adjustOpWeightsForCockroachVersion( @@ -311,10 +355,18 @@ func adjustOpWeightsForCockroachVersion( // change constructed. Constructing a random schema change may require a few // stochastic attempts and if verbosity is >= 2 the unsuccessful attempts are // recorded in `log` to help with debugging of the workload. -func (og *operationGenerator) randOp(ctx context.Context, tx pgx.Tx) (stmt *opStmt, err error) { +func (og *operationGenerator) randOp( + ctx context.Context, tx pgx.Tx, useDeclarativeSchemaChanger bool, +) (stmt *opStmt, err error) { for { - op := opType(og.params.ops.Int()) - og.resetOpState() + var op opType + // The declarative schema changer has a more limited deck of operations. + if useDeclarativeSchemaChanger { + op = opType(og.params.declarativeOps.Int()) + } else { + op = opType(og.params.ops.Int()) + } + og.resetOpState(useDeclarativeSchemaChanger) stmt, err = opFuncs[op](og, ctx, tx) if err != nil { if errors.Is(err, pgx.ErrNoRows) { @@ -2657,13 +2709,14 @@ func makeOpStmt(queryType opStmtType) *opStmt { // ErrorState wraps schemachange workload errors to have state information for // the purpose of dumping in our JSON log. type ErrorState struct { - cause error - ExpectedErrors []string `json:"expectedErrors,omitempty"` - PotentialErrors []string `json:"potentialErrors,omitempty"` - ExpectedCommitErrors []string `json:"expectedCommitErrors,omitempty"` - PotentialCommitErrors []string `json:"potentialCommitErrors,omitempty"` - QueriesForGeneratingErrors []interface{} `json:"queriesForGeneratingErrors,omitempty"` - PreviousStatements []string `json:"previousStatements,omitempty"` + cause error + ExpectedErrors []string `json:"expectedErrors,omitempty"` + PotentialErrors []string `json:"potentialErrors,omitempty"` + ExpectedCommitErrors []string `json:"expectedCommitErrors,omitempty"` + PotentialCommitErrors []string `json:"potentialCommitErrors,omitempty"` + QueriesForGeneratingErrors []interface{} `json:"queriesForGeneratingErrors,omitempty"` + PreviousStatements []string `json:"previousStatements,omitempty"` + UsesDeclarativeSchemaChanger bool `json:"usesDeclarativeSchemaChanger,omitempty"` } func (es *ErrorState) Unwrap() error { @@ -2681,13 +2734,14 @@ func (og *operationGenerator) WrapWithErrorState(err error, op *opStmt) error { previousStmts = append(previousStmts, stmt.sql) } return &ErrorState{ - cause: err, - ExpectedErrors: op.expectedExecErrors.StringSlice(), - PotentialErrors: op.potentialExecErrors.StringSlice(), - ExpectedCommitErrors: og.expectedCommitErrors.StringSlice(), - PotentialCommitErrors: og.potentialCommitErrors.StringSlice(), - QueriesForGeneratingErrors: og.GetOpGenLog(), - PreviousStatements: previousStmts, + cause: err, + ExpectedErrors: op.expectedExecErrors.StringSlice(), + PotentialErrors: op.potentialExecErrors.StringSlice(), + ExpectedCommitErrors: og.expectedCommitErrors.StringSlice(), + PotentialCommitErrors: og.potentialCommitErrors.StringSlice(), + QueriesForGeneratingErrors: og.GetOpGenLog(), + PreviousStatements: previousStmts, + UsesDeclarativeSchemaChanger: og.useDeclarativeSchemaChanger, } } @@ -2715,6 +2769,15 @@ func (s *opStmt) executeStmt(ctx context.Context, tx pgx.Tx, og *operationGenera if pgcode.MakeCode(pgErr.Code) == pgcode.SerializationFailure { return err } + // TODO(fqazi): For the short term we are going to ignore any not implemented, + // errors in the declarative schema changer. Supported operations have edge + // cases, but later we should mark some of these are fully supported. + if og.useDeclarativeSchemaChanger && pgcode.MakeCode(pgErr.Code) == pgcode.Uncategorized && + strings.Contains(pgErr.Message, " not implemented in the new schema changer") { + return errors.Mark(errors.Wrap(err, "ROLLBACK; Ignoring declarative schema changer not implemented error."), + errRunInTxnRbkSentinel, + ) + } if !s.expectedExecErrors.contains(pgcode.MakeCode(pgErr.Code)) && !s.potentialExecErrors.contains(pgcode.MakeCode(pgErr.Code)) { return errors.Mark( diff --git a/pkg/workload/schemachange/schemachange.go b/pkg/workload/schemachange/schemachange.go index 5c8c9e078752..dfd6046ecd0b 100644 --- a/pkg/workload/schemachange/schemachange.go +++ b/pkg/workload/schemachange/schemachange.go @@ -54,32 +54,36 @@ import ( //For example, an attempt to do something we don't support should be swallowed (though if we can detect that maybe we should just not do it, e.g). It will be hard to use this test for anything more than liveness detection until we go through the tedious process of classifying errors.: const ( - defaultMaxOpsPerWorker = 5 - defaultErrorRate = 10 - defaultEnumPct = 10 - defaultMaxSourceTables = 3 - defaultSequenceOwnedByPct = 25 - defaultFkParentInvalidPct = 5 - defaultFkChildInvalidPct = 5 + defaultMaxOpsPerWorker = 5 + defaultErrorRate = 10 + defaultEnumPct = 10 + defaultMaxSourceTables = 3 + defaultSequenceOwnedByPct = 25 + defaultFkParentInvalidPct = 5 + defaultFkChildInvalidPct = 5 + defaultDeclarativeSchemaChangerPct = 25 + defaultDeclarativeSchemaMaxStmtsPerTxn = 1 ) type schemaChange struct { - flags workload.Flags - dbOverride string - concurrency int - maxOpsPerWorker int - errorRate int - enumPct int - verbose int - dryRun bool - maxSourceTables int - sequenceOwnedByPct int - logFilePath string - logFile *os.File - dumpLogsOnce *sync.Once - workers []*schemaChangeWorker - fkParentInvalidPct int - fkChildInvalidPct int + flags workload.Flags + dbOverride string + concurrency int + maxOpsPerWorker int + errorRate int + enumPct int + verbose int + dryRun bool + maxSourceTables int + sequenceOwnedByPct int + logFilePath string + logFile *os.File + dumpLogsOnce *sync.Once + workers []*schemaChangeWorker + fkParentInvalidPct int + fkChildInvalidPct int + declarativeSchemaChangerPct int + declarativeSchemaMaxStmtsPerTxn int } var schemaChangeMeta = workload.Meta{ @@ -111,6 +115,13 @@ var schemaChangeMeta = workload.Meta{ `Percentage of times to choose an invalid parent column in a fk constraint.`) s.flags.IntVar(&s.fkChildInvalidPct, `fk-child-invalid-pct`, defaultFkChildInvalidPct, `Percentage of times to choose an invalid child column in a fk constraint.`) + s.flags.IntVar(&s.declarativeSchemaChangerPct, `declarative-schema-changer-pct`, + defaultDeclarativeSchemaChangerPct, + `Percentage of the declarative schema changer is used.`) + s.flags.IntVar(&s.declarativeSchemaMaxStmtsPerTxn, `declarative-schema-changer-stmt-per-txn`, + defaultDeclarativeSchemaMaxStmtsPerTxn, + `Number of statements per-txn used by the declarative schema changer.`) + return s }, } @@ -168,6 +179,17 @@ func (s *schemaChange) Ops( return workload.QueryLoad{}, err } ops := newDeck(rand.New(rand.NewSource(timeutil.Now().UnixNano())), opWeights...) + // A separate deck is constructed of only schema changes supported + // by the declarative schema changer. This deck has equal weights, + // only for supported schema changes. + declarativeOpWeights := make([]int, len(opWeights)) + for idx, weight := range opWeights { + if opDeclarative[idx] { + declarativeOpWeights[idx] = weight + } + } + declarativeOps := newDeck(rand.New(rand.NewSource(timeutil.Now().UnixNano())), declarativeOpWeights...) + ql := workload.QueryLoad{SQLDatabase: sqlDatabase} stdoutLog := makeAtomicLog(os.Stdout) @@ -190,6 +212,7 @@ func (s *schemaChange) Ops( enumPct: s.enumPct, rng: rand.New(rand.NewSource(timeutil.Now().UnixNano())), ops: ops, + declarativeOps: declarativeOps, maxSourceTables: s.maxSourceTables, sequenceOwnedByPct: s.sequenceOwnedByPct, fkParentInvalidPct: s.fkParentInvalidPct, @@ -326,10 +349,15 @@ func (w *schemaChangeWorker) WrapWithErrorState(err error) error { } } -func (w *schemaChangeWorker) runInTxn(ctx context.Context, tx pgx.Tx) error { +func (w *schemaChangeWorker) runInTxn( + ctx context.Context, tx pgx.Tx, useDeclarativeSchemaChanger bool, +) error { w.logger.startLog(w.id) w.logger.writeLog("BEGIN") opsNum := 1 + w.opGen.randIntn(w.maxOpsPerWorker) + if useDeclarativeSchemaChanger && opsNum > w.workload.declarativeSchemaMaxStmtsPerTxn { + opsNum = w.workload.declarativeSchemaMaxStmtsPerTxn + } for i := 0; i < opsNum; i++ { // Terminating this loop early if there are expected commit errors prevents unexpected commit behavior from being @@ -343,7 +371,7 @@ func (w *schemaChangeWorker) runInTxn(ctx context.Context, tx pgx.Tx) error { break } - op, err := w.opGen.randOp(ctx, tx) + op, err := w.opGen.randOp(ctx, tx, useDeclarativeSchemaChanger) if pgErr := new(pgconn.PgError); errors.As(err, &pgErr) && pgcode.MakeCode(pgErr.Code) == pgcode.SerializationFailure { return errors.Mark(err, errRunInTxnRbkSentinel) @@ -391,7 +419,22 @@ func (w *schemaChangeWorker) runInTxn(ctx context.Context, tx pgx.Tx) error { } func (w *schemaChangeWorker) run(ctx context.Context) error { - tx, err := w.pool.Get().Begin(ctx) + conn, err := w.pool.Get().Acquire(ctx) + defer conn.Release() + if err != nil { + return errors.Wrap(err, "cannot get a connection") + } + useDeclarativeSchemaChanger := w.opGen.randIntn(100) > w.workload.declarativeSchemaChangerPct + if useDeclarativeSchemaChanger { + if _, err := conn.Exec(ctx, "SET use_declarative_schema_changer='unsafe_always';"); err != nil { + return err + } + } else { + if _, err := conn.Exec(ctx, "SET use_declarative_schema_changer='off';"); err != nil { + return err + } + } + tx, err := conn.Begin(ctx) if err != nil { return errors.Wrap(err, "cannot get a connection and begin a txn") } @@ -407,7 +450,7 @@ func (w *schemaChangeWorker) run(ctx context.Context) error { defer watchDog.Stop() start := timeutil.Now() w.opGen.resetTxnState() - err = w.runInTxn(ctx, tx) + err = w.runInTxn(ctx, tx, useDeclarativeSchemaChanger) if err != nil { // Rollback in all cases to release the txn object and its conn pool. Wrap the original