diff --git a/pkg/workload/schemachange/BUILD.bazel b/pkg/workload/schemachange/BUILD.bazel index 759a17b6e707..3a6da0648bde 100644 --- a/pkg/workload/schemachange/BUILD.bazel +++ b/pkg/workload/schemachange/BUILD.bazel @@ -11,6 +11,7 @@ go_library( "operation_generator.go", "schemachange.go", "type_resolver.go", + "watch_dog.go", ":gen-optype-stringer", # keep ], importpath = "github.com/cockroachdb/cockroach/pkg/workload/schemachange", @@ -36,6 +37,7 @@ go_library( "@com_github_cockroachdb_errors//:errors", "@com_github_jackc_pgconn//:pgconn", "@com_github_jackc_pgx_v4//:pgx", + "@com_github_jackc_pgx_v4//pgxpool", "@com_github_lib_pq//oid", "@com_github_spf13_pflag//:pflag", ], diff --git a/pkg/workload/schemachange/error_screening.go b/pkg/workload/schemachange/error_screening.go index 0c1972d131af..6292014bee12 100644 --- a/pkg/workload/schemachange/error_screening.go +++ b/pkg/workload/schemachange/error_screening.go @@ -326,8 +326,8 @@ GROUP BY name; for range constraints { constraintTuples = append(constraintTuples, make(map[string]struct{})) } - for _, row := range rows { + hasGenerationError := false // Put values to be inserted into a column name to value map to simplify lookups. columnsToValues := map[string]string{} for i := 0; i < len(columns); i++ { @@ -339,7 +339,32 @@ GROUP BY name; if !colInfo.generated { continue } + evalTxn, err := tx.Begin(ctx) + if err != nil { + return false, nil, err + } newCols[colInfo.name], err = og.generateColumn(ctx, tx, colInfo, columnsToValues) + if err != nil { + if rbkErr := evalTxn.Rollback(ctx); rbkErr != nil { + return false, nil, errors.WithSecondaryError(err, rbkErr) + } + var pgErr *pgconn.PgError + if !errors.As(err, &pgErr) { + return false, nil, err + } + // Only accept know error types for generated expressions. + if !isValidGenerationError(pgErr.Code) { + return false, nil, err + } + generatedCodes = append(generatedCodes, + codesWithConditions{ + {code: pgcode.MakeCode(pgErr.Code), condition: true}, + }..., + ) + hasGenerationError = true + continue + } + err = evalTxn.Commit(ctx) if err != nil { return false, nil, err } @@ -347,6 +372,10 @@ GROUP BY name; for k, v := range newCols { columnsToValues[k] = v } + // Skip over constraint validation, since we know an expression is bad here. + if hasGenerationError { + continue + } // Next validate the uniqueness of both constraints and index expressions. for constraintIdx, constraint := range constraints { nonTupleConstraint := constraint[0] diff --git a/pkg/workload/schemachange/operation_generator.go b/pkg/workload/schemachange/operation_generator.go index dd950beb9e2e..b7129af2108f 100644 --- a/pkg/workload/schemachange/operation_generator.go +++ b/pkg/workload/schemachange/operation_generator.go @@ -17,6 +17,7 @@ import ( "strconv" "strings" "sync/atomic" + "time" "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -29,6 +30,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/schemachange" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/types" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/workload" "github.com/cockroachdb/errors" "github.com/jackc/pgconn" @@ -52,7 +54,6 @@ type operationGeneratorParams struct { // The OperationBuilder has the sole responsibility of generating ops type operationGenerator struct { params *operationGeneratorParams - expectedExecErrors errorCodeSet potentialExecErrors errorCodeSet expectedCommitErrors errorCodeSet potentialCommitErrors errorCodeSet @@ -69,7 +70,7 @@ type operationGenerator struct { opsInTxn []opType // stmtsInTxn is a list of statements in the current transaction. - stmtsInTxt []string + stmtsInTxt []*opStmt // opGenLog log of statement used to generate the current statement. opGenLog strings.Builder @@ -101,7 +102,6 @@ func (og *operationGenerator) GetOpGenLog() string { func makeOperationGenerator(params *operationGeneratorParams) *operationGenerator { return &operationGenerator{ params: params, - expectedExecErrors: makeExpectedErrorSet(), expectedCommitErrors: makeExpectedErrorSet(), potentialExecErrors: makeExpectedErrorSet(), potentialCommitErrors: makeExpectedErrorSet(), @@ -111,7 +111,6 @@ func makeOperationGenerator(params *operationGeneratorParams) *operationGenerato // Reset internal state used per operation within a transaction func (og *operationGenerator) resetOpState() { - og.expectedExecErrors.reset() og.candidateExpectedCommitErrors.reset() og.potentialExecErrors.reset() og.opGenLog = strings.Builder{} @@ -172,12 +171,14 @@ const ( insertRow // INSERT INTO () VALUES () + selectStmt // SELECT.. + validate // validate all table descriptors numOpTypes int = iota ) -var opFuncs = map[opType]func(*operationGenerator, context.Context, pgx.Tx) (string, error){ +var opFuncs = map[opType]func(*operationGenerator, context.Context, pgx.Tx) (*opStmt, error){ addColumn: (*operationGenerator).addColumn, addConstraint: (*operationGenerator).addConstraint, addForeignKeyConstraint: (*operationGenerator).addForeignKeyConstraint, @@ -212,6 +213,7 @@ var opFuncs = map[opType]func(*operationGenerator, context.Context, pgx.Tx) (str setColumnType: (*operationGenerator).setColumnType, survive: (*operationGenerator).survive, insertRow: (*operationGenerator).insertRow, + selectStmt: (*operationGenerator).selectStmt, validate: (*operationGenerator).validate, } @@ -257,7 +259,8 @@ var opWeights = []int{ setColumnType: 0, // Disabled and tracked with #66662. survive: 0, // Disabled and tracked with #83831 insertRow: 10, // Temporarily reduced because of #80820 - validate: 2, // validate twice more often + selectStmt: 10, + validate: 2, // validate twice more often } // adjustOpWeightsForActiveVersion adjusts the weights for the active cockroach @@ -289,7 +292,7 @@ func adjustOpWeightsForCockroachVersion( // change constructed. Constructing a random schema change may require a few // stochastic attempts and if verbosity is >= 2 the unsuccessful attempts are // recorded in `log` to help with debugging of the workload. -func (og *operationGenerator) randOp(ctx context.Context, tx pgx.Tx) (stmt string, err error) { +func (og *operationGenerator) randOp(ctx context.Context, tx pgx.Tx) (stmt *opStmt, err error) { for { op := opType(og.params.ops.Int()) og.resetOpState() @@ -303,7 +306,7 @@ func (og *operationGenerator) randOp(ctx context.Context, tx pgx.Tx) (stmt strin if errors.Is(err, ErrSchemaChangesDisallowedDueToPkSwap) { continue } - return "", err + return nil, err } // Screen for schema change after write in the same transaction. og.stmtsInTxt = append(og.stmtsInTxt, stmt) @@ -316,34 +319,35 @@ func (og *operationGenerator) randOp(ctx context.Context, tx pgx.Tx) (stmt strin return stmt, err } -func (og *operationGenerator) addColumn(ctx context.Context, tx pgx.Tx) (string, error) { +func (og *operationGenerator) addColumn(ctx context.Context, tx pgx.Tx) (*opStmt, error) { tableName, err := og.randTable(ctx, tx, og.pctExisting(true), "") if err != nil { - return "", err + return nil, err } tableExists, err := og.tableExists(ctx, tx, tableName) if err != nil { - return "", err + return nil, err } if !tableExists { - og.expectedExecErrors.add(pgcode.UndefinedTable) - return fmt.Sprintf(`ALTER TABLE %s ADD COLUMN IrrelevantColumnName string`, tableName), nil + return makeOpStmtForSingleError(OpStmtDDL, + fmt.Sprintf(`ALTER TABLE %s ADD COLUMN IrrelevantColumnName string`, tableName), + pgcode.UndefinedTable), nil } err = og.tableHasPrimaryKeySwapActive(ctx, tx, tableName) if err != nil { - return "", err + return nil, err } columnName, err := og.randColumn(ctx, tx, *tableName, og.pctExisting(false)) if err != nil { - return "", err + return nil, err } typName, typ, err := og.randType(ctx, tx, og.pctExisting(true)) if err != nil { - return "", err + return nil, err } def := &tree.ColumnTableDef{ @@ -354,11 +358,11 @@ func (og *operationGenerator) addColumn(ctx context.Context, tx pgx.Tx) (string, databaseHasRegionChange, err := og.databaseHasRegionChange(ctx, tx) if err != nil { - return "", err + return nil, err } tableIsRegionalByRow, err := og.tableIsRegionalByRow(ctx, tx, tableName) if err != nil { - return "", err + return nil, err } if !(tableIsRegionalByRow && databaseHasRegionChange) && og.randIntn(10) == 0 { @@ -367,21 +371,21 @@ func (og *operationGenerator) addColumn(ctx context.Context, tx pgx.Tx) (string, columnExistsOnTable, err := og.columnExistsOnTable(ctx, tx, tableName, columnName) if err != nil { - return "", err + return nil, err } var hasRows bool if tableExists { hasRows, err = og.tableHasRows(ctx, tx, tableName) if err != nil { - return "", err + return nil, err } } hasAlterPKSchemaChange, err := og.tableHasOngoingAlterPKSchemaChanges(ctx, tx, tableName) if err != nil { - return "", err + return nil, err } - + op := makeOpStmt(OpStmtDDL) codesWithConditions{ {code: pgcode.DuplicateColumn, condition: columnExistsOnTable}, {code: pgcode.UndefinedObject, condition: typ == nil}, @@ -392,132 +396,137 @@ func (og *operationGenerator) addColumn(ctx context.Context, tx pgx.Tx) (string, code: pgcode.FeatureNotSupported, condition: def.Unique.IsUnique && typ != nil && !colinfo.ColumnTypeIsIndexable(typ), }, - }.add(og.expectedExecErrors) - - return fmt.Sprintf(`ALTER TABLE %s ADD COLUMN %s`, tableName, tree.Serialize(def)), nil + }.add(op.expectedExecErrors) + op.sql = fmt.Sprintf(`ALTER TABLE %s ADD COLUMN %s`, tableName, tree.Serialize(def)) + return op, nil } -func (og *operationGenerator) addConstraint(ctx context.Context, tx pgx.Tx) (string, error) { +func (og *operationGenerator) addConstraint(ctx context.Context, tx pgx.Tx) (*opStmt, error) { // TODO(peter): unimplemented // - Export sqlbase.randColumnTableDef. - return "", nil + return nil, nil } -func (og *operationGenerator) addUniqueConstraint(ctx context.Context, tx pgx.Tx) (string, error) { +func (og *operationGenerator) addUniqueConstraint(ctx context.Context, tx pgx.Tx) (*opStmt, error) { tableName, err := og.randTable(ctx, tx, og.pctExisting(true), "") if err != nil { - return "", err + return nil, err } tableExists, err := og.tableExists(ctx, tx, tableName) if err != nil { - return "", err + return nil, err } if !tableExists { - og.expectedExecErrors.add(pgcode.UndefinedTable) - return fmt.Sprintf(`ALTER TABLE %s ADD CONSTRAINT IrrelevantConstraintName UNIQUE (IrrelevantColumnName)`, tableName), nil + return makeOpStmtForSingleError(OpStmtDDL, + fmt.Sprintf(`ALTER TABLE %s ADD CONSTRAINT IrrelevantConstraintName UNIQUE (IrrelevantColumnName)`, tableName), + pgcode.UndefinedTable), nil } err = og.tableHasPrimaryKeySwapActive(ctx, tx, tableName) if err != nil { - return "", err + return nil, err } columnForConstraint, err := og.randColumnWithMeta(ctx, tx, *tableName, og.pctExisting(true)) if err != nil { - return "", err + return nil, err } constaintName := fmt.Sprintf("%s_%s_unique", tableName.Object(), columnForConstraint.name) columnExistsOnTable, err := og.columnExistsOnTable(ctx, tx, tableName, columnForConstraint.name) if err != nil { - return "", err + return nil, err } constraintExists, err := og.constraintExists(ctx, tx, constaintName) if err != nil { - return "", err + return nil, err } canApplyConstraint := true if columnExistsOnTable { canApplyConstraint, err = og.canApplyUniqueConstraint(ctx, tx, tableName, []string{columnForConstraint.name}) if err != nil { - return "", err + return nil, err } } hasAlterPKSchemaChange, err := og.tableHasOngoingAlterPKSchemaChanges(ctx, tx, tableName) if err != nil { - return "", err + return nil, err } databaseHasRegionChange, err := og.databaseHasRegionChange(ctx, tx) if err != nil { - return "", err + return nil, err } tableIsRegionalByRow, err := og.tableIsRegionalByRow(ctx, tx, tableName) if err != nil { - return "", err + return nil, err } - + stmt := makeOpStmt(OpStmtDDL) codesWithConditions{ {code: pgcode.UndefinedColumn, condition: !columnExistsOnTable}, {code: pgcode.DuplicateObject, condition: constraintExists}, {code: pgcode.FeatureNotSupported, condition: columnExistsOnTable && !colinfo.ColumnTypeIsIndexable(columnForConstraint.typ)}, {pgcode.FeatureNotSupported, hasAlterPKSchemaChange}, {code: pgcode.ObjectNotInPrerequisiteState, condition: databaseHasRegionChange && tableIsRegionalByRow}, - }.add(og.expectedExecErrors) + }.add(stmt.expectedExecErrors) if !canApplyConstraint { og.candidateExpectedCommitErrors.add(pgcode.UniqueViolation) } - return fmt.Sprintf(`ALTER TABLE %s ADD CONSTRAINT %s UNIQUE (%s)`, tableName, constaintName, columnForConstraint.name), nil + stmt.sql = fmt.Sprintf(`ALTER TABLE %s ADD CONSTRAINT %s UNIQUE (%s)`, tableName, constaintName, columnForConstraint.name) + return stmt, nil } -func (og *operationGenerator) alterTableLocality(ctx context.Context, tx pgx.Tx) (string, error) { +func (og *operationGenerator) alterTableLocality(ctx context.Context, tx pgx.Tx) (*opStmt, error) { tableName, err := og.randTable(ctx, tx, og.pctExisting(true), "") if err != nil { - return "", err + return nil, err } tableExists, err := og.tableExists(ctx, tx, tableName) if err != nil { - return "", err + return nil, err } if !tableExists { - og.expectedExecErrors.add(pgcode.UndefinedTable) - return fmt.Sprintf(`ALTER TABLE %s SET LOCALITY REGIONAL BY ROW`, tableName), nil + return makeOpStmtForSingleError(OpStmtDDL, + fmt.Sprintf(`ALTER TABLE %s SET LOCALITY REGIONAL BY ROW`, tableName), + pgcode.UndefinedTable), nil } err = og.tableHasPrimaryKeySwapActive(ctx, tx, tableName) if err != nil { - return "", err + return nil, err } databaseRegionNames, err := og.getDatabaseRegionNames(ctx, tx) if err != nil { - return "", err + return nil, err } if len(databaseRegionNames) == 0 { - og.expectedExecErrors.add(pgcode.InvalidTableDefinition) - return fmt.Sprintf(`ALTER TABLE %s SET LOCALITY REGIONAL BY ROW`, tableName), nil + return makeOpStmtForSingleError(OpStmtDDL, + fmt.Sprintf(`ALTER TABLE %s SET LOCALITY REGIONAL BY ROW`, tableName), + pgcode.InvalidTableDefinition), nil } hasSchemaChange, err := og.tableHasOngoingSchemaChanges(ctx, tx, tableName) if err != nil { - return "", err + return nil, err } databaseHasRegionChange, err := og.databaseHasRegionChange(ctx, tx) if err != nil { - return "", err + return nil, err } databaseHasMultiRegion, err := og.databaseIsMultiRegion(ctx, tx) if err != nil { - return "", err + return nil, err } if hasSchemaChange || databaseHasRegionChange || !databaseHasMultiRegion { - og.expectedExecErrors.add(pgcode.UndefinedTable) - return `ALTER TABLE invalid_table SET LOCALITY REGIONAL BY ROW`, nil + return makeOpStmtForSingleError(OpStmtDDL, + `ALTER TABLE invalid_table SET LOCALITY REGIONAL BY ROW`, + pgcode.UndefinedTable), nil } - + stmt := makeOpStmt(OpStmtDDL) localityOptions := []func() (string, error){ func() (string, error) { return "REGIONAL BY TABLE", nil @@ -556,7 +565,7 @@ func (og *operationGenerator) alterTableLocality(ctx context.Context, tx pgx.Tx) for _, col := range columnNames { if col.name == tree.RegionalByRowRegionDefaultCol && col.nullable { - og.expectedExecErrors.add(pgcode.InvalidTableDefinition) + stmt.expectedExecErrors.add(pgcode.InvalidTableDefinition) } } } @@ -567,9 +576,10 @@ func (og *operationGenerator) alterTableLocality(ctx context.Context, tx pgx.Tx) idx := og.params.rng.Intn(len(localityOptions)) toLocality, err := localityOptions[idx]() if err != nil { - return "", err + return nil, err } - return fmt.Sprintf(`ALTER TABLE %s SET LOCALITY %s`, tableName, toLocality), nil + stmt.sql = fmt.Sprintf(`ALTER TABLE %s SET LOCALITY %s`, tableName, toLocality) + return stmt, nil } func (og *operationGenerator) getClusterRegionNames( @@ -652,36 +662,38 @@ func (og *operationGenerator) scanRegionNames( return regionNames, nil } -func (og *operationGenerator) addRegion(ctx context.Context, tx pgx.Tx) (string, error) { +func (og *operationGenerator) addRegion(ctx context.Context, tx pgx.Tx) (*opStmt, error) { regionResult, err := og.getRegions(ctx, tx) if err != nil { - return "", err + return nil, err } database, err := og.getDatabase(ctx, tx) if err != nil { - return "", err + return nil, err } // No regions in cluster, try add an invalid region and expect an error. if len(regionResult.regionNamesInCluster) == 0 { - og.expectedExecErrors.add(pgcode.InvalidDatabaseDefinition) - return fmt.Sprintf(`ALTER DATABASE %s ADD REGION "invalid-region"`, database), nil + return makeOpStmtForSingleError(OpStmtDDL, + fmt.Sprintf(`ALTER DATABASE %s ADD REGION "invalid-region"`, database), + pgcode.InvalidDatabaseDefinition), nil } // No regions in database, add a random region from the cluster and expect an error. if len(regionResult.regionNamesInDatabase) == 0 { idx := og.params.rng.Intn(len(regionResult.regionNamesInCluster)) - og.expectedExecErrors.add(pgcode.InvalidDatabaseDefinition) - return fmt.Sprintf( - `ALTER DATABASE %s ADD REGION "%s"`, - database, - regionResult.regionNamesInCluster[idx], - ), nil + return makeOpStmtForSingleError(OpStmtDDL, + fmt.Sprintf( + `ALTER DATABASE %s ADD REGION "%s"`, + database, + regionResult.regionNamesInCluster[idx], + ), + pgcode.InvalidDatabaseDefinition), nil } // If the database is undergoing a regional by row related change on the // database, error out. if len(regionResult.regionNamesInDatabase) > 0 { databaseHasRegionalByRowChange, err := og.databaseHasRegionalByRowChange(ctx, tx) if err != nil { - return "", err + return nil, err } if databaseHasRegionalByRowChange { // There's a timing hole here, as by the time we issue the ADD @@ -689,90 +701,98 @@ func (og *operationGenerator) addRegion(ctx context.Context, tx pgx.Tx) (string, // already completed. Either way, we'll get one of the following // two errors (the first, if the schema change has completed, and // the second, if it has not). - og.expectedExecErrors.add(pgcode.InvalidName) - og.expectedExecErrors.add(pgcode.ObjectNotInPrerequisiteState) - return fmt.Sprintf(`ALTER DATABASE %s ADD REGION "invalid-region"`, database), nil + return makeOpStmtForSingleError(OpStmtDDL, + fmt.Sprintf(`ALTER DATABASE %s ADD REGION "invalid-region"`, database), + pgcode.InvalidName, + pgcode.ObjectNotInPrerequisiteState), nil } } // All regions are already in the database, expect an error with adding an existing one. if len(regionResult.regionNamesNotInDatabase) == 0 { idx := og.params.rng.Intn(len(regionResult.regionNamesInDatabase)) - og.expectedExecErrors.add(pgcode.DuplicateObject) - return fmt.Sprintf( - `ALTER DATABASE %s ADD REGION "%s"`, - database, - regionResult.regionNamesInDatabase[idx], - ), nil + return makeOpStmtForSingleError(OpStmtDDL, + fmt.Sprintf( + `ALTER DATABASE %s ADD REGION "%s"`, + database, + regionResult.regionNamesInDatabase[idx], + ), + pgcode.DuplicateObject), nil } // Here we have a region that is not yet marked as public on the enum. // Double check this first. + stmt := makeOpStmt(OpStmtDDL) idx := og.params.rng.Intn(len(regionResult.regionNamesNotInDatabase)) region := regionResult.regionNamesNotInDatabase[idx] valuePresent, err := og.enumMemberPresent(ctx, tx, tree.RegionEnum, string(region)) if err != nil { - return "", err + return nil, err } if valuePresent { - og.expectedExecErrors.add(pgcode.DuplicateObject) + stmt.expectedExecErrors.add(pgcode.DuplicateObject) } - return fmt.Sprintf( + stmt.sql = fmt.Sprintf( `ALTER DATABASE %s ADD REGION "%s"`, database, region, - ), nil + ) + return stmt, nil } -func (og *operationGenerator) primaryRegion(ctx context.Context, tx pgx.Tx) (string, error) { +func (og *operationGenerator) primaryRegion(ctx context.Context, tx pgx.Tx) (*opStmt, error) { regionResult, err := og.getRegions(ctx, tx) if err != nil { - return "", err + return nil, err } database, err := og.getDatabase(ctx, tx) if err != nil { - return "", err + return nil, err } // No regions in cluster, try PRIMARY REGION an invalid region and expect an error. if len(regionResult.regionNamesInCluster) == 0 { - og.expectedExecErrors.add(pgcode.InvalidDatabaseDefinition) - return fmt.Sprintf(`ALTER DATABASE %s PRIMARY REGION "invalid-region"`, database), nil + return makeOpStmtForSingleError(OpStmtDDL, + fmt.Sprintf(`ALTER DATABASE %s PRIMARY REGION "invalid-region"`, database), + pgcode.InvalidDatabaseDefinition), nil } // Conversion to multi-region is only allowed if the data is not already // partitioned. + stmt := makeOpStmt(OpStmtDDL) hasPartitioning, err := og.databaseHasTablesWithPartitioning(ctx, tx, database) if err != nil { - return "", err + return nil, err } if hasPartitioning { - og.expectedExecErrors.add(pgcode.ObjectNotInPrerequisiteState) + stmt.expectedExecErrors.add(pgcode.ObjectNotInPrerequisiteState) } // No regions in database, set a random region to be the PRIMARY REGION. if len(regionResult.regionNamesInDatabase) == 0 { idx := og.params.rng.Intn(len(regionResult.regionNamesInCluster)) - return fmt.Sprintf( + stmt.sql = fmt.Sprintf( `ALTER DATABASE %s PRIMARY REGION "%s"`, database, regionResult.regionNamesInCluster[idx], - ), nil + ) + return stmt, nil } // Regions exist in database, so set a random region to be the primary region. idx := og.params.rng.Intn(len(regionResult.regionNamesInDatabase)) - return fmt.Sprintf( + stmt.sql = fmt.Sprintf( `ALTER DATABASE %s PRIMARY REGION "%s"`, database, regionResult.regionNamesInDatabase[idx], - ), nil + ) + return stmt, nil } func (og *operationGenerator) addForeignKeyConstraint( ctx context.Context, tx pgx.Tx, -) (string, error) { +) (*opStmt, error) { parentTable, parentColumn, err := og.randParentColumnForFkRelation(ctx, tx, og.randIntn(100) >= og.params.fkParentInvalidPct) if err != nil { - return "", err + return nil, err } fetchInvalidChild := og.randIntn(100) < og.params.fkChildInvalidPct @@ -781,7 +801,7 @@ func (og *operationGenerator) addForeignKeyConstraint( if fetchInvalidChild { _, typ, err := og.randType(ctx, tx, og.pctExisting(true)) if err != nil { - return "", err + return nil, err } if typ != nil { childType = typ @@ -790,7 +810,7 @@ func (og *operationGenerator) addForeignKeyConstraint( childTable, childColumn, err := og.randChildColumnForFkRelation(ctx, tx, !fetchInvalidChild, childType.SQLString()) if err != nil { - return "", err + return nil, err } constraintName := tree.Name(fmt.Sprintf("%s_%s_%s_%s_fk", parentTable.Object(), parentColumn.name, childTable.Object(), childColumn.name)) @@ -816,23 +836,23 @@ func (og *operationGenerator) addForeignKeyConstraint( parentColumnHasUniqueConstraint, err := og.columnHasSingleUniqueConstraint(ctx, tx, parentTable, parentColumn.name) if err != nil { - return "", err + return nil, err } parentColumnIsVirtualComputed, err := og.columnIsVirtualComputed(ctx, tx, parentTable, parentColumn.name) if err != nil { - return "", err + return nil, err } childColumnIsVirtualComputed, err := og.columnIsVirtualComputed(ctx, tx, childTable, childColumn.name) if err != nil { - return "", err + return nil, err } childColumnIsStoredVirtual, err := og.columnIsStoredComputed(ctx, tx, childTable, childColumn.name) if err != nil { - return "", err + return nil, err } constraintExists, err := og.constraintExists(ctx, tx, string(constraintName)) if err != nil { - return "", err + return nil, err } // If we are intentionally using an invalid child type, then it doesn't make // sense to validate if the rows validate the constraint. @@ -840,10 +860,11 @@ func (og *operationGenerator) addForeignKeyConstraint( if !fetchInvalidChild { rowsSatisfyConstraint, err = og.rowsSatisfyFkConstraint(ctx, tx, parentTable, parentColumn, childTable, childColumn) if err != nil { - return "", err + return nil, err } } + stmt := makeOpStmt(OpStmtDDL) codesWithConditions{ {code: pgcode.ForeignKeyViolation, condition: !parentColumnHasUniqueConstraint}, {code: pgcode.FeatureNotSupported, condition: childColumnIsVirtualComputed}, @@ -851,7 +872,7 @@ func (og *operationGenerator) addForeignKeyConstraint( {code: pgcode.FeatureNotSupported, condition: parentColumnIsVirtualComputed}, {code: pgcode.DuplicateObject, condition: constraintExists}, {code: pgcode.DatatypeMismatch, condition: !childColumn.typ.Equivalent(parentColumn.typ)}, - }.add(og.expectedExecErrors) + }.add(stmt.expectedExecErrors) codesWithConditions{}.add(og.expectedCommitErrors) // TODO(fqazi): We need to do after the fact validation for foreign key violations @@ -867,22 +888,21 @@ func (og *operationGenerator) addForeignKeyConstraint( // validation. In which case a potential commit error is an undefined table // error. og.potentialCommitErrors.add(pgcode.UndefinedTable) - - return tree.Serialize(def), nil + stmt.sql = tree.Serialize(def) + return stmt, nil } -func (og *operationGenerator) createIndex(ctx context.Context, tx pgx.Tx) (string, error) { +func (og *operationGenerator) createIndex(ctx context.Context, tx pgx.Tx) (*opStmt, error) { tableName, err := og.randTable(ctx, tx, og.pctExisting(true), "") if err != nil { - return "", err + return nil, err } tableExists, err := og.tableExists(ctx, tx, tableName) if err != nil { - return "", err + return nil, err } if !tableExists { - og.expectedExecErrors.add(pgcode.UndefinedTable) def := &tree.CreateIndex{ Name: tree.Name("IrrelevantName"), Table: *tableName, @@ -890,26 +910,28 @@ func (og *operationGenerator) createIndex(ctx context.Context, tx pgx.Tx) (strin {Column: "IrrelevantColumn", Direction: tree.Ascending}, }, } - return tree.Serialize(def), nil + return makeOpStmtForSingleError(OpStmtDDL, + tree.Serialize(def), + pgcode.UndefinedTable), nil } err = og.tableHasPrimaryKeySwapActive(ctx, tx, tableName) if err != nil { - return "", err + return nil, err } columnNames, err := og.getTableColumns(ctx, tx, tableName.String(), true) if err != nil { - return "", err + return nil, err } indexName, err := og.randIndex(ctx, tx, *tableName, og.pctExisting(false)) if err != nil { - return "", err + return nil, err } indexExists, err := og.indexExists(ctx, tx, tableName, indexName) if err != nil { - return "", err + return nil, err } def := &tree.CreateIndex{ @@ -926,12 +948,12 @@ func (og *operationGenerator) createIndex(ctx context.Context, tx pgx.Tx) (strin regionColumn := "" tableIsRegionalByRow, err := og.tableIsRegionalByRow(ctx, tx, tableName) if err != nil { - return "", err + return nil, err } if tableIsRegionalByRow { regionColumn, err = og.getRegionColumn(ctx, tx, tableName) if err != nil { - return "", err + return nil, err } } @@ -967,6 +989,7 @@ func (og *operationGenerator) createIndex(ctx context.Context, tx pgx.Tx) (strin // If there are extra columns not used in the index, randomly use them // as stored columns. + stmt := makeOpStmt(OpStmtDDL) duplicateStore := false virtualComputedStored := false regionColStored := false @@ -985,7 +1008,7 @@ func (og *operationGenerator) createIndex(ctx context.Context, tx pgx.Tx) (strin if columnNames[i].generated && !virtualComputedStored { isStored, err := og.columnIsStoredComputed(ctx, tx, tableName, columnNames[i].name) if err != nil { - return "", err + return nil, err } if !isStored { virtualComputedStored = true @@ -997,7 +1020,7 @@ func (og *operationGenerator) createIndex(ctx context.Context, tx pgx.Tx) (strin if !duplicateStore { colUsedInPrimaryIdx, err := og.colIsPrimaryKey(ctx, tx, tableName, columnNames[i].name) if err != nil { - return "", err + return nil, err } if colUsedInPrimaryIdx { duplicateStore = true @@ -1015,21 +1038,21 @@ func (og *operationGenerator) createIndex(ctx context.Context, tx pgx.Tx) (strin } uniqueViolationWillNotOccur, err = og.canApplyUniqueConstraint(ctx, tx, tableName, columns) if err != nil { - return "", err + return nil, err } } hasAlterPKSchemaChange, err := og.tableHasOngoingAlterPKSchemaChanges(ctx, tx, tableName) if err != nil { - return "", err + return nil, err } databaseHasRegionChange, err := og.databaseHasRegionChange(ctx, tx) if err != nil { - return "", err + return nil, err } if databaseHasRegionChange && tableIsRegionalByRow { - og.expectedExecErrors.add(pgcode.ObjectNotInPrerequisiteState) + stmt.expectedExecErrors.add(pgcode.ObjectNotInPrerequisiteState) } // When an index exists, but `IF NOT EXISTS` is used, then @@ -1052,25 +1075,26 @@ func (og *operationGenerator) createIndex(ctx context.Context, tx pgx.Tx) (strin {code: pgcode.FeatureNotSupported, condition: duplicateRegionColumn}, {code: pgcode.Uncategorized, condition: virtualComputedStored}, {code: pgcode.FeatureNotSupported, condition: hasAlterPKSchemaChange}, - }.add(og.expectedExecErrors) + }.add(stmt.expectedExecErrors) } - return tree.Serialize(def), nil + stmt.sql = tree.Serialize(def) + return stmt, nil } -func (og *operationGenerator) createSequence(ctx context.Context, tx pgx.Tx) (string, error) { +func (og *operationGenerator) createSequence(ctx context.Context, tx pgx.Tx) (*opStmt, error) { seqName, err := og.randSequence(ctx, tx, og.pctExisting(false), "") if err != nil { - return "", err + return nil, err } schemaExists, err := og.schemaExists(ctx, tx, seqName.Schema()) if err != nil { - return "", err + return nil, err } sequenceExists, err := og.sequenceExists(ctx, tx, seqName) if err != nil { - return "", err + return nil, err } // If the sequence exists and an error should be produced, then @@ -1080,11 +1104,11 @@ func (og *operationGenerator) createSequence(ctx context.Context, tx pgx.Tx) (st if sequenceExists && og.produceError() { ifNotExists = false } - + stmt := makeOpStmt(OpStmtDDL) codesWithConditions{ {code: pgcode.UndefinedSchema, condition: !schemaExists}, {code: pgcode.DuplicateRelation, condition: sequenceExists && !ifNotExists}, - }.add(og.expectedExecErrors) + }.add(stmt.expectedExecErrors) var seqOptions tree.SequenceOptions // Decide if the sequence should be owned by a column. If so, it can @@ -1092,11 +1116,11 @@ func (og *operationGenerator) createSequence(ctx context.Context, tx pgx.Tx) (st if og.randIntn(100) < og.params.sequenceOwnedByPct { table, err := og.randTable(ctx, tx, og.pctExisting(true), "") if err != nil { - return "", err + return nil, err } tableExists, err := og.tableExists(ctx, tx, table) if err != nil { - return "", err + return nil, err } if !tableExists { @@ -1107,21 +1131,21 @@ func (og *operationGenerator) createSequence(ctx context.Context, tx pgx.Tx) (st ColumnItemVal: &tree.ColumnItem{TableName: table.ToUnresolvedObjectName(), ColumnName: "IrrelevantColumnName"}}, ) if !(sequenceExists && ifNotExists) { // IF NOT EXISTS prevents the error - og.expectedExecErrors.add(pgcode.UndefinedTable) + stmt.expectedExecErrors.add(pgcode.UndefinedTable) } } else { column, err := og.randColumn(ctx, tx, *table, og.pctExisting(true)) if err != nil { - return "", err + return nil, err } columnExists, err := og.columnExistsOnTable(ctx, tx, table, column) if err != nil { - return "", err + return nil, err } // If a duplicate sequence exists, then a new sequence will not be created. In this case, // a pgcode.UndefinedColumn will not occur. if !columnExists && !sequenceExists { - og.expectedExecErrors.add(pgcode.UndefinedColumn) + stmt.expectedExecErrors.add(pgcode.UndefinedColumn) } seqOptions = append( @@ -1139,22 +1163,23 @@ func (og *operationGenerator) createSequence(ctx context.Context, tx pgx.Tx) (st Options: seqOptions, } - return tree.Serialize(createSeq), nil + stmt.sql = tree.Serialize(createSeq) + return stmt, nil } -func (og *operationGenerator) createTable(ctx context.Context, tx pgx.Tx) (string, error) { +func (og *operationGenerator) createTable(ctx context.Context, tx pgx.Tx) (*opStmt, error) { tableName, err := og.randTable(ctx, tx, og.pctExisting(false), "") if err != nil { - return "", err + return nil, err } tableIdx, err := strconv.Atoi(strings.TrimPrefix(tableName.Table(), "table")) if err != nil { - return "", err + return nil, err } databaseHasMultiRegion, err := og.databaseIsMultiRegion(ctx, tx) if err != nil { - return "", err + return nil, err } stmt := randgen.RandCreateTableWithColumnIndexNumberGenerator(og.params.rng, "table", tableIdx, databaseHasMultiRegion, og.newUniqueSeqNum) @@ -1163,39 +1188,43 @@ func (og *operationGenerator) createTable(ctx context.Context, tx pgx.Tx) (strin tableExists, err := og.tableExists(ctx, tx, tableName) if err != nil { - return "", err + return nil, err } schemaExists, err := og.schemaExists(ctx, tx, tableName.Schema()) if err != nil { - return "", err + return nil, err } + opStmt := makeOpStmt(OpStmtDDL) codesWithConditions{ {code: pgcode.DuplicateRelation, condition: tableExists && !stmt.IfNotExists}, {code: pgcode.UndefinedSchema, condition: !schemaExists}, - }.add(og.expectedExecErrors) - - return tree.Serialize(stmt), nil + }.add(opStmt.expectedExecErrors) + opStmt.sql = tree.Serialize(stmt) + return opStmt, nil } -func (og *operationGenerator) createEnum(ctx context.Context, tx pgx.Tx) (string, error) { +func (og *operationGenerator) createEnum(ctx context.Context, tx pgx.Tx) (*opStmt, error) { typName, typeExists, err := og.randEnum(ctx, tx, og.pctExisting(false)) if err != nil { - return "", err + return nil, err } schemaExists, err := og.schemaExists(ctx, tx, typName.Schema()) if err != nil { - return "", err + return nil, err } + opStmt := makeOpStmt(OpStmtDDL) codesWithConditions{ {code: pgcode.DuplicateObject, condition: typeExists}, {code: pgcode.InvalidSchemaName, condition: !schemaExists}, - }.add(og.expectedExecErrors) + }.add(opStmt.expectedExecErrors) stmt := randgen.RandCreateType(og.params.rng, typName.Object(), "asdf") stmt.(*tree.CreateType).TypeName = typName.ToUnresolvedObjectName() - return tree.Serialize(stmt), nil + opStmt.sql = tree.Serialize(stmt) + return opStmt, nil } -func (og *operationGenerator) createTableAs(ctx context.Context, tx pgx.Tx) (string, error) { +func (og *operationGenerator) createTableAs(ctx context.Context, tx pgx.Tx) (*opStmt, error) { + const MaxRowsToConsume = 300000 numSourceTables := og.randIntn(og.params.maxSourceTables) + 1 sourceTableNames := make([]tree.TableExpr, numSourceTables) @@ -1218,21 +1247,21 @@ func (og *operationGenerator) createTableAs(ctx context.Context, tx pgx.Tx) (str case 0: tableName, err = og.randTable(ctx, tx, og.pctExisting(true), "") if err != nil { - return "", err + return nil, err } sourceTableExists, err = og.tableExists(ctx, tx, tableName) if err != nil { - return "", err + return nil, err } case 1: tableName, err = og.randView(ctx, tx, og.pctExisting(true), "") if err != nil { - return "", err + return nil, err } sourceTableExists, err = og.viewExists(ctx, tx, tableName) if err != nil { - return "", err + return nil, err } } @@ -1252,6 +1281,7 @@ func (og *operationGenerator) createTableAs(ctx context.Context, tx pgx.Tx) (str // uniqueColumnNames and duplicateColumns are used to track unique // columns. If there are any duplicates, then a pgcode.DuplicateColumn error // is expected on execution. + opStmt := makeOpStmt(OpStmtDDL) uniqueColumnNames := map[string]bool{} duplicateColumns := false for i := 0; i < numSourceTables; i++ { @@ -1263,7 +1293,7 @@ func (og *operationGenerator) createTableAs(ctx context.Context, tx pgx.Tx) (str if tableExists { columnNamesForTable, err := og.tableColumnsShuffled(ctx, tx, tableName.(*tree.TableName).String()) if err != nil { - return "", err + return nil, err } columnNamesForTable = columnNamesForTable[:1+og.randIntn(len(columnNamesForTable))] @@ -1281,7 +1311,7 @@ func (og *operationGenerator) createTableAs(ctx context.Context, tx pgx.Tx) (str } } } else { - og.expectedExecErrors.add(pgcode.UndefinedTable) + opStmt.expectedExecErrors.add(pgcode.UndefinedTable) colItem := tree.ColumnItem{ ColumnName: tree.Name("IrrelevantColumnName"), } @@ -1291,15 +1321,15 @@ func (og *operationGenerator) createTableAs(ctx context.Context, tx pgx.Tx) (str destTableName, err := og.randTable(ctx, tx, og.pctExisting(false), "") if err != nil { - return "", err + return nil, err } schemaExists, err := og.schemaExists(ctx, tx, destTableName.Schema()) if err != nil { - return "", err + return nil, err } tableExists, err := og.tableExists(ctx, tx, destTableName) if err != nil { - return "", err + return nil, err } codesWithConditions{ @@ -1308,13 +1338,14 @@ func (og *operationGenerator) createTableAs(ctx context.Context, tx pgx.Tx) (str {code: pgcode.Syntax, condition: len(selectStatement.Exprs) == 0}, {code: pgcode.DuplicateAlias, condition: duplicateSourceTables}, {code: pgcode.DuplicateColumn, condition: duplicateColumns}, - }.add(og.expectedExecErrors) + }.add(opStmt.expectedExecErrors) - return fmt.Sprintf(`CREATE TABLE %s AS %s`, - destTableName, selectStatement.String()), nil + opStmt.sql = fmt.Sprintf(`CREATE TABLE %s AS %s FETCH FIRST %d ROWS ONLY`, + destTableName, selectStatement.String(), MaxRowsToConsume) + return opStmt, nil } -func (og *operationGenerator) createView(ctx context.Context, tx pgx.Tx) (string, error) { +func (og *operationGenerator) createView(ctx context.Context, tx pgx.Tx) (*opStmt, error) { numSourceTables := og.randIntn(og.params.maxSourceTables) + 1 @@ -1338,21 +1369,21 @@ func (og *operationGenerator) createView(ctx context.Context, tx pgx.Tx) (string case 0: tableName, err = og.randTable(ctx, tx, og.pctExisting(true), "") if err != nil { - return "", err + return nil, err } sourceTableExists, err = og.tableExists(ctx, tx, tableName) if err != nil { - return "", err + return nil, err } case 1: tableName, err = og.randView(ctx, tx, og.pctExisting(true), "") if err != nil { - return "", err + return nil, err } sourceTableExists, err = og.viewExists(ctx, tx, tableName) if err != nil { - return "", err + return nil, err } } @@ -1372,6 +1403,7 @@ func (og *operationGenerator) createView(ctx context.Context, tx pgx.Tx) (string // uniqueColumnNames and duplicateColumns are used to track unique // columns. If there are any duplicates, then a pgcode.DuplicateColumn error // is expected on execution. + opStmt := makeOpStmt(OpStmtDDL) uniqueColumnNames := map[string]bool{} duplicateColumns := false for i := 0; i < numSourceTables; i++ { @@ -1383,7 +1415,7 @@ func (og *operationGenerator) createView(ctx context.Context, tx pgx.Tx) (string if tableExists { columnNamesForTable, err := og.tableColumnsShuffled(ctx, tx, tableName.(*tree.TableName).String()) if err != nil { - return "", err + return nil, err } columnNamesForTable = columnNamesForTable[:1+og.randIntn(len(columnNamesForTable))] @@ -1401,7 +1433,7 @@ func (og *operationGenerator) createView(ctx context.Context, tx pgx.Tx) (string } } } else { - og.expectedExecErrors.add(pgcode.UndefinedTable) + opStmt.expectedExecErrors.add(pgcode.UndefinedTable) colItem := tree.ColumnItem{ ColumnName: tree.Name("IrrelevantColumnName"), } @@ -1411,15 +1443,15 @@ func (og *operationGenerator) createView(ctx context.Context, tx pgx.Tx) (string destViewName, err := og.randView(ctx, tx, og.pctExisting(false), "") if err != nil { - return "", err + return nil, err } schemaExists, err := og.schemaExists(ctx, tx, destViewName.Schema()) if err != nil { - return "", err + return nil, err } viewExists, err := og.viewExists(ctx, tx, destViewName) if err != nil { - return "", err + return nil, err } codesWithConditions{ @@ -1428,217 +1460,225 @@ func (og *operationGenerator) createView(ctx context.Context, tx pgx.Tx) (string {code: pgcode.Syntax, condition: len(selectStatement.Exprs) == 0}, {code: pgcode.DuplicateAlias, condition: duplicateSourceTables}, {code: pgcode.DuplicateColumn, condition: duplicateColumns}, - }.add(og.expectedExecErrors) + }.add(opStmt.expectedExecErrors) - return fmt.Sprintf(`CREATE VIEW %s AS %s`, - destViewName, selectStatement.String()), nil + opStmt.sql = fmt.Sprintf(`CREATE VIEW %s AS %s`, + destViewName, selectStatement.String()) + return opStmt, nil } -func (og *operationGenerator) dropColumn(ctx context.Context, tx pgx.Tx) (string, error) { +func (og *operationGenerator) dropColumn(ctx context.Context, tx pgx.Tx) (*opStmt, error) { tableName, err := og.randTable(ctx, tx, og.pctExisting(true), "") if err != nil { - return "", err + return nil, err } tableExists, err := og.tableExists(ctx, tx, tableName) if err != nil { - return "", err + return nil, err } if !tableExists { - og.expectedExecErrors.add(pgcode.UndefinedTable) - return fmt.Sprintf(`ALTER TABLE %s DROP COLUMN "IrrelevantColumnName"`, tableName), nil + return makeOpStmtForSingleError(OpStmtDDL, + fmt.Sprintf(`ALTER TABLE %s DROP COLUMN "IrrelevantColumnName"`, tableName), + pgcode.UndefinedTable), nil } err = og.tableHasPrimaryKeySwapActive(ctx, tx, tableName) if err != nil { - return "", err + return nil, err } columnName, err := og.randColumn(ctx, tx, *tableName, og.pctExisting(true)) if err != nil { - return "", err + return nil, err } columnExists, err := og.columnExistsOnTable(ctx, tx, tableName, columnName) if err != nil { - return "", err + return nil, err } colIsPrimaryKey, err := og.colIsPrimaryKey(ctx, tx, tableName, columnName) if err != nil { - return "", err + return nil, err } columnIsDependedOn, err := og.columnIsDependedOn(ctx, tx, tableName, columnName) if err != nil { - return "", err + return nil, err } columnIsInDroppingIndex, err := og.columnIsInDroppingIndex(ctx, tx, tableName, columnName) if err != nil { - return "", err + return nil, err } hasAlterPKSchemaChange, err := og.tableHasOngoingAlterPKSchemaChanges(ctx, tx, tableName) if err != nil { - return "", err + return nil, err } + stmt := makeOpStmt(OpStmtDDL) codesWithConditions{ {code: pgcode.ObjectNotInPrerequisiteState, condition: columnIsInDroppingIndex}, {code: pgcode.UndefinedColumn, condition: !columnExists}, {code: pgcode.InvalidColumnReference, condition: colIsPrimaryKey}, {code: pgcode.DependentObjectsStillExist, condition: columnIsDependedOn}, {code: pgcode.FeatureNotSupported, condition: hasAlterPKSchemaChange}, - }.add(og.expectedExecErrors) - - return fmt.Sprintf(`ALTER TABLE %s DROP COLUMN "%s"`, tableName, columnName), nil + }.add(stmt.expectedExecErrors) + stmt.sql = fmt.Sprintf(`ALTER TABLE %s DROP COLUMN "%s"`, tableName, columnName) + return stmt, nil } -func (og *operationGenerator) dropColumnDefault(ctx context.Context, tx pgx.Tx) (string, error) { +func (og *operationGenerator) dropColumnDefault(ctx context.Context, tx pgx.Tx) (*opStmt, error) { tableName, err := og.randTable(ctx, tx, og.pctExisting(true), "") if err != nil { - return "", err + return nil, err } tableExists, err := og.tableExists(ctx, tx, tableName) if err != nil { - return "", err + return nil, err } if !tableExists { - og.expectedExecErrors.add(pgcode.UndefinedTable) - return fmt.Sprintf(`ALTER TABLE %s ALTER COLUMN "IrrelevantColumnName" DROP DEFAULT`, tableName), nil + return makeOpStmtForSingleError(OpStmtDDL, + fmt.Sprintf(`ALTER TABLE %s ALTER COLUMN "IrrelevantColumnName" DROP DEFAULT`, tableName), + pgcode.UndefinedTable), nil } err = og.tableHasPrimaryKeySwapActive(ctx, tx, tableName) if err != nil { - return "", err + return nil, err } columnName, err := og.randColumn(ctx, tx, *tableName, og.pctExisting(true)) if err != nil { - return "", err + return nil, err } columnExists, err := og.columnExistsOnTable(ctx, tx, tableName, columnName) if err != nil { - return "", err + return nil, err } + stmt := makeOpStmt(OpStmtDDL) if !columnExists { - og.expectedExecErrors.add(pgcode.UndefinedColumn) + stmt.expectedExecErrors.add(pgcode.UndefinedColumn) } - return fmt.Sprintf(`ALTER TABLE %s ALTER COLUMN "%s" DROP DEFAULT`, tableName, columnName), nil + stmt.sql = fmt.Sprintf(`ALTER TABLE %s ALTER COLUMN "%s" DROP DEFAULT`, tableName, columnName) + return stmt, nil } -func (og *operationGenerator) dropColumnNotNull(ctx context.Context, tx pgx.Tx) (string, error) { +func (og *operationGenerator) dropColumnNotNull(ctx context.Context, tx pgx.Tx) (*opStmt, error) { tableName, err := og.randTable(ctx, tx, og.pctExisting(true), "") if err != nil { - return "", err + return nil, err } tableExists, err := og.tableExists(ctx, tx, tableName) if err != nil { - return "", err + return nil, err } if !tableExists { - og.expectedExecErrors.add(pgcode.UndefinedTable) - return fmt.Sprintf(`ALTER TABLE %s ALTER COLUMN "IrrelevantColumnName" DROP NOT NULL`, tableName), nil + return makeOpStmtForSingleError(OpStmtDDL, + fmt.Sprintf(`ALTER TABLE %s ALTER COLUMN "IrrelevantColumnName" DROP NOT NULL`, tableName), + pgcode.UndefinedTable), nil } err = og.tableHasPrimaryKeySwapActive(ctx, tx, tableName) if err != nil { - return "", err + return nil, err } columnName, err := og.randColumn(ctx, tx, *tableName, og.pctExisting(true)) if err != nil { - return "", err + return nil, err } columnExists, err := og.columnExistsOnTable(ctx, tx, tableName, columnName) if err != nil { - return "", err + return nil, err } colIsPrimaryKey, err := og.colIsPrimaryKey(ctx, tx, tableName, columnName) if err != nil { - return "", err + return nil, err } - hasAlterPKSchemaChange, err := og.tableHasOngoingAlterPKSchemaChanges(ctx, tx, tableName) + err = og.tableHasPrimaryKeySwapActive(ctx, tx, tableName) if err != nil { - return "", err - } - if hasAlterPKSchemaChange { - // Possible timing hole. Don't issue this schema change with a - // background PK change in progress. Tracked with #66663. - return `SELECT 'avoiding timing hole'`, nil + return nil, err } + stmt := makeOpStmt(OpStmtDDL) codesWithConditions{ {pgcode.UndefinedColumn, !columnExists}, {pgcode.InvalidTableDefinition, colIsPrimaryKey}, - }.add(og.expectedExecErrors) - - return fmt.Sprintf(`ALTER TABLE %s ALTER COLUMN "%s" DROP NOT NULL`, tableName, columnName), nil + }.add(stmt.expectedExecErrors) + stmt.sql = fmt.Sprintf(`ALTER TABLE %s ALTER COLUMN "%s" DROP NOT NULL`, tableName, columnName) + return stmt, nil } -func (og *operationGenerator) dropColumnStored(ctx context.Context, tx pgx.Tx) (string, error) { +func (og *operationGenerator) dropColumnStored(ctx context.Context, tx pgx.Tx) (*opStmt, error) { tableName, err := og.randTable(ctx, tx, og.pctExisting(true), "") if err != nil { - return "", err + return nil, err } tableExists, err := og.tableExists(ctx, tx, tableName) if err != nil { - return "", err + return nil, err } if !tableExists { - og.expectedExecErrors.add(pgcode.UndefinedTable) - return fmt.Sprintf(`ALTER TABLE %s ALTER COLUMN IrrelevantColumnName DROP STORED`, tableName), nil + return makeOpStmtForSingleError(OpStmtDDL, + fmt.Sprintf(`ALTER TABLE %s ALTER COLUMN IrrelevantColumnName DROP STORED`, tableName), + pgcode.UndefinedTable), nil } err = og.tableHasPrimaryKeySwapActive(ctx, tx, tableName) if err != nil { - return "", err + return nil, err } columnName, err := og.randColumn(ctx, tx, *tableName, og.pctExisting(true)) if err != nil { - return "", err + return nil, err } columnExists, err := og.columnExistsOnTable(ctx, tx, tableName, columnName) if err != nil { - return "", err + return nil, err } columnIsStored, err := og.columnIsStoredComputed(ctx, tx, tableName, columnName) if err != nil { - return "", err + return nil, err } + stmt := makeOpStmt(OpStmtDDL) codesWithConditions{ {code: pgcode.InvalidColumnDefinition, condition: !columnIsStored}, {code: pgcode.UndefinedColumn, condition: !columnExists}, - }.add(og.expectedExecErrors) + }.add(stmt.expectedExecErrors) - return fmt.Sprintf(`ALTER TABLE %s ALTER COLUMN "%s" DROP STORED`, tableName, columnName), nil + stmt.sql = fmt.Sprintf(`ALTER TABLE %s ALTER COLUMN "%s" DROP STORED`, tableName, columnName) + return stmt, nil } -func (og *operationGenerator) dropConstraint(ctx context.Context, tx pgx.Tx) (string, error) { +func (og *operationGenerator) dropConstraint(ctx context.Context, tx pgx.Tx) (*opStmt, error) { tableName, err := og.randTable(ctx, tx, og.pctExisting(true), "") if err != nil { - return "", err + return nil, err } tableExists, err := og.tableExists(ctx, tx, tableName) if err != nil { - return "", err + return nil, err } if !tableExists { - og.expectedExecErrors.add(pgcode.UndefinedTable) - return fmt.Sprintf(`ALTER TABLE %s DROP CONSTRAINT IrrelevantConstraintName`, tableName), nil + return makeOpStmtForSingleError(OpStmtDDL, + fmt.Sprintf(`ALTER TABLE %s DROP CONSTRAINT IrrelevantConstraintName`, tableName), + pgcode.UndefinedTable), nil } err = og.tableHasPrimaryKeySwapActive(ctx, tx, tableName) if err != nil { - return "", err + return nil, err } constraintName, err := og.randConstraint(ctx, tx, tableName.String()) if err != nil { - return "", err + return nil, err } // Dropping the primary key of a table without adding a new primary key // subsequently in the transaction is not supported. Since addConstraint is not implemented, // a replacement primary key will not be created in the same transaction. Thus, // dropping a primary key will always produce an error. + stmt := makeOpStmt(OpStmtDDL) constraintIsPrimary, err := og.constraintIsPrimary(ctx, tx, tableName, constraintName) if err != nil { - return "", err + return nil, err } if constraintIsPrimary { og.candidateExpectedCommitErrors.add(pgcode.FeatureNotSupported) @@ -1648,81 +1688,85 @@ func (og *operationGenerator) dropConstraint(ctx context.Context, tx pgx.Tx) (st // dropping the constraint with ALTER TABLE ... DROP CONSTRAINT is unsupported. constraintIsUnique, err := og.constraintIsUnique(ctx, tx, tableName, constraintName) if err != nil { - return "", err + return nil, err } if constraintIsUnique { - og.expectedExecErrors.add(pgcode.FeatureNotSupported) + stmt.expectedExecErrors.add(pgcode.FeatureNotSupported) } constraintBeingDropped, err := og.constraintInDroppingState(ctx, tx, tableName, constraintName) if err != nil { - return "", err + return nil, err } if constraintBeingDropped { - og.expectedExecErrors.add(pgcode.FeatureNotSupported) + stmt.expectedExecErrors.add(pgcode.FeatureNotSupported) } - return fmt.Sprintf(`ALTER TABLE %s DROP CONSTRAINT "%s"`, tableName, constraintName), nil + stmt.sql = fmt.Sprintf(`ALTER TABLE %s DROP CONSTRAINT "%s"`, tableName, constraintName) + return stmt, nil } -func (og *operationGenerator) dropIndex(ctx context.Context, tx pgx.Tx) (string, error) { +func (og *operationGenerator) dropIndex(ctx context.Context, tx pgx.Tx) (*opStmt, error) { tableName, err := og.randTable(ctx, tx, og.pctExisting(true), "") if err != nil { - return "", err + return nil, err } tableExists, err := og.tableExists(ctx, tx, tableName) if err != nil { - return "", err + return nil, err } if !tableExists { - og.expectedExecErrors.add(pgcode.UndefinedTable) - return fmt.Sprintf(`DROP INDEX %s@"IrrelevantIndexName"`, tableName), nil + return makeOpStmtForSingleError(OpStmtDDL, + fmt.Sprintf(`DROP INDEX %s@"IrrelevantIndexName"`, tableName), + pgcode.UndefinedTable), nil } err = og.tableHasPrimaryKeySwapActive(ctx, tx, tableName) if err != nil { - return "", err + return nil, err } indexName, err := og.randIndex(ctx, tx, *tableName, og.pctExisting(true)) if err != nil { - return "", err + return nil, err } + stmt := makeOpStmt(OpStmtDDL) indexExists, err := og.indexExists(ctx, tx, tableName, indexName) if err != nil { - return "", err + return nil, err } if !indexExists { - og.expectedExecErrors.add(pgcode.UndefinedObject) + stmt.expectedExecErrors.add(pgcode.UndefinedObject) } hasAlterPKSchemaChange, err := og.tableHasOngoingAlterPKSchemaChanges(ctx, tx, tableName) if err != nil { - return "", err + return nil, err } if hasAlterPKSchemaChange { - og.expectedExecErrors.add(pgcode.FeatureNotSupported) + stmt.expectedExecErrors.add(pgcode.FeatureNotSupported) } databaseHasRegionChange, err := og.databaseHasRegionChange(ctx, tx) if err != nil { - return "", err + return nil, err } tableIsRegionalByRow, err := og.tableIsRegionalByRow(ctx, tx, tableName) if err != nil { - return "", err + return nil, err } if databaseHasRegionChange && tableIsRegionalByRow { - og.expectedExecErrors.add(pgcode.ObjectNotInPrerequisiteState) + stmt.expectedExecErrors.add(pgcode.ObjectNotInPrerequisiteState) } - return fmt.Sprintf(`DROP INDEX %s@"%s" CASCADE`, tableName, indexName), nil + stmt.sql = fmt.Sprintf(`DROP INDEX %s@"%s" CASCADE`, tableName, indexName) + return stmt, nil } -func (og *operationGenerator) dropSequence(ctx context.Context, tx pgx.Tx) (string, error) { +func (og *operationGenerator) dropSequence(ctx context.Context, tx pgx.Tx) (*opStmt, error) { sequenceName, err := og.randSequence(ctx, tx, og.pctExisting(true), "") if err != nil { - return "", err + return nil, err } ifExists := og.randIntn(2) == 0 dropSeq := &tree.DropSequence{ @@ -1730,28 +1774,30 @@ func (og *operationGenerator) dropSequence(ctx context.Context, tx pgx.Tx) (stri IfExists: ifExists, } + stmt := makeOpStmt(OpStmtDDL) sequenceExists, err := og.sequenceExists(ctx, tx, sequenceName) if err != nil { - return "", err + return nil, err } if !sequenceExists && !ifExists { - og.expectedExecErrors.add(pgcode.UndefinedTable) + stmt.expectedExecErrors.add(pgcode.UndefinedTable) } - return tree.Serialize(dropSeq), nil + stmt.sql = tree.Serialize(dropSeq) + return stmt, nil } -func (og *operationGenerator) dropTable(ctx context.Context, tx pgx.Tx) (string, error) { +func (og *operationGenerator) dropTable(ctx context.Context, tx pgx.Tx) (*opStmt, error) { tableName, err := og.randTable(ctx, tx, og.pctExisting(true), "") if err != nil { - return "", err + return nil, err } tableExists, err := og.tableExists(ctx, tx, tableName) if err != nil { - return "", err + return nil, err } tableHasDependencies, err := og.tableHasDependencies(ctx, tx, tableName) if err != nil { - return "", err + return nil, err } dropBehavior := tree.DropBehavior(og.randIntn(3)) @@ -1763,26 +1809,27 @@ func (og *operationGenerator) dropTable(ctx context.Context, tx pgx.Tx) (string, DropBehavior: dropBehavior, } + stmt := makeOpStmt(OpStmtDDL) codesWithConditions{ {pgcode.UndefinedTable, !ifExists && !tableExists}, {pgcode.DependentObjectsStillExist, dropBehavior != tree.DropCascade && tableHasDependencies}, - }.add(og.expectedExecErrors) - - return dropTable.String(), nil + }.add(stmt.expectedExecErrors) + stmt.sql = dropTable.String() + return stmt, nil } -func (og *operationGenerator) dropView(ctx context.Context, tx pgx.Tx) (string, error) { +func (og *operationGenerator) dropView(ctx context.Context, tx pgx.Tx) (*opStmt, error) { viewName, err := og.randView(ctx, tx, og.pctExisting(true), "") if err != nil { - return "", err + return nil, err } viewExists, err := og.tableExists(ctx, tx, viewName) if err != nil { - return "", err + return nil, err } viewHasDependencies, err := og.tableHasDependencies(ctx, tx, viewName) if err != nil { - return "", err + return nil, err } dropBehavior := tree.DropBehavior(og.randIntn(3)) @@ -1794,118 +1841,126 @@ func (og *operationGenerator) dropView(ctx context.Context, tx pgx.Tx) (string, DropBehavior: dropBehavior, } + stmt := makeOpStmt(OpStmtDDL) codesWithConditions{ {pgcode.UndefinedTable, !ifExists && !viewExists}, {pgcode.DependentObjectsStillExist, dropBehavior != tree.DropCascade && viewHasDependencies}, - }.add(og.expectedExecErrors) - return dropView.String(), nil + }.add(stmt.expectedExecErrors) + stmt.sql = dropView.String() + return stmt, nil } -func (og *operationGenerator) renameColumn(ctx context.Context, tx pgx.Tx) (string, error) { +func (og *operationGenerator) renameColumn(ctx context.Context, tx pgx.Tx) (*opStmt, error) { tableName, err := og.randTable(ctx, tx, og.pctExisting(true), "") if err != nil { - return "", err + return nil, err } srcTableExists, err := og.tableExists(ctx, tx, tableName) if err != nil { - return "", err + return nil, err } if !srcTableExists { - og.expectedExecErrors.add(pgcode.UndefinedTable) - return fmt.Sprintf(`ALTER TABLE %s RENAME COLUMN "IrrelevantColumnName" TO "OtherIrrelevantName"`, - tableName), nil + return makeOpStmtForSingleError(OpStmtDDL, + fmt.Sprintf(`ALTER TABLE %s RENAME COLUMN "IrrelevantColumnName" TO "OtherIrrelevantName"`, + tableName), + pgcode.UndefinedTable), nil } err = og.tableHasPrimaryKeySwapActive(ctx, tx, tableName) if err != nil { - return "", err + return nil, err } srcColumnName, err := og.randColumn(ctx, tx, *tableName, og.pctExisting(true)) if err != nil { - return "", err + return nil, err } destColumnName, err := og.randColumn(ctx, tx, *tableName, og.pctExisting(false)) if err != nil { - return "", err + return nil, err } srcColumnExists, err := og.columnExistsOnTable(ctx, tx, tableName, srcColumnName) if err != nil { - return "", err + return nil, err } destColumnExists, err := og.columnExistsOnTable(ctx, tx, tableName, destColumnName) if err != nil { - return "", err + return nil, err } columnIsDependedOn, err := og.columnIsDependedOn(ctx, tx, tableName, srcColumnName) if err != nil { - return "", err + return nil, err } + stmt := makeOpStmt(OpStmtDDL) codesWithConditions{ {pgcode.UndefinedColumn, !srcColumnExists}, {pgcode.DuplicateColumn, destColumnExists && srcColumnName != destColumnName}, {pgcode.DependentObjectsStillExist, columnIsDependedOn}, - }.add(og.expectedExecErrors) + }.add(stmt.expectedExecErrors) - return fmt.Sprintf(`ALTER TABLE %s RENAME COLUMN "%s" TO "%s"`, - tableName, srcColumnName, destColumnName), nil + stmt.sql = fmt.Sprintf(`ALTER TABLE %s RENAME COLUMN "%s" TO "%s"`, + tableName, srcColumnName, destColumnName) + return stmt, nil } -func (og *operationGenerator) renameIndex(ctx context.Context, tx pgx.Tx) (string, error) { +func (og *operationGenerator) renameIndex(ctx context.Context, tx pgx.Tx) (*opStmt, error) { tableName, err := og.randTable(ctx, tx, og.pctExisting(true), "") if err != nil { - return "", err + return nil, err } srcTableExists, err := og.tableExists(ctx, tx, tableName) if err != nil { - return "", err + return nil, err } if !srcTableExists { - og.expectedExecErrors.add(pgcode.UndefinedTable) - return fmt.Sprintf(`ALTER INDEX %s@"IrrelevantConstraintName" RENAME TO "OtherConstraintName"`, - tableName), nil + return makeOpStmtForSingleError(OpStmtDDL, + fmt.Sprintf(`ALTER INDEX %s@"IrrelevantConstraintName" RENAME TO "OtherConstraintName"`, + tableName), + pgcode.UndefinedTable), nil } err = og.tableHasPrimaryKeySwapActive(ctx, tx, tableName) if err != nil { - return "", err + return nil, err } srcIndexName, err := og.randIndex(ctx, tx, *tableName, og.pctExisting(true)) if err != nil { - return "", err + return nil, err } destIndexName, err := og.randIndex(ctx, tx, *tableName, og.pctExisting(false)) if err != nil { - return "", err + return nil, err } srcIndexExists, err := og.indexExists(ctx, tx, tableName, srcIndexName) if err != nil { - return "", err + return nil, err } destIndexExists, err := og.indexExists(ctx, tx, tableName, destIndexName) if err != nil { - return "", err + return nil, err } + stmt := makeOpStmt(OpStmtDDL) codesWithConditions{ {code: pgcode.UndefinedObject, condition: !srcIndexExists}, {code: pgcode.DuplicateRelation, condition: destIndexExists && srcIndexName != destIndexName}, - }.add(og.expectedExecErrors) + }.add(stmt.expectedExecErrors) - return fmt.Sprintf(`ALTER INDEX %s@"%s" RENAME TO "%s"`, - tableName, srcIndexName, destIndexName), nil + stmt.sql = fmt.Sprintf(`ALTER INDEX %s@"%s" RENAME TO "%s"`, + tableName, srcIndexName, destIndexName) + return stmt, nil } -func (og *operationGenerator) renameSequence(ctx context.Context, tx pgx.Tx) (string, error) { +func (og *operationGenerator) renameSequence(ctx context.Context, tx pgx.Tx) (*opStmt, error) { srcSequenceName, err := og.randSequence(ctx, tx, og.pctExisting(true), "") if err != nil { - return "", err + return nil, err } // Decide whether or not to produce a 'cannot change schema of table with RENAME' error @@ -1916,39 +1971,41 @@ func (og *operationGenerator) renameSequence(ctx context.Context, tx pgx.Tx) (st destSequenceName, err := og.randSequence(ctx, tx, og.pctExisting(false), desiredSchema) if err != nil { - return "", err + return nil, err } srcSequenceExists, err := og.sequenceExists(ctx, tx, srcSequenceName) if err != nil { - return "", err + return nil, err } destSchemaExists, err := og.schemaExists(ctx, tx, destSequenceName.Schema()) if err != nil { - return "", err + return nil, err } destSequenceExists, err := og.sequenceExists(ctx, tx, destSequenceName) if err != nil { - return "", err + return nil, err } srcEqualsDest := srcSequenceName.String() == destSequenceName.String() + stmt := makeOpStmt(OpStmtDDL) codesWithConditions{ {code: pgcode.UndefinedTable, condition: !srcSequenceExists}, {code: pgcode.UndefinedSchema, condition: !destSchemaExists}, {code: pgcode.DuplicateRelation, condition: !srcEqualsDest && destSequenceExists}, {code: pgcode.InvalidName, condition: srcSequenceName.Schema() != destSequenceName.Schema()}, - }.add(og.expectedExecErrors) + }.add(stmt.expectedExecErrors) - return fmt.Sprintf(`ALTER SEQUENCE %s RENAME TO %s`, srcSequenceName, destSequenceName), nil + stmt.sql = fmt.Sprintf(`ALTER SEQUENCE %s RENAME TO %s`, srcSequenceName, destSequenceName) + return stmt, nil } -func (og *operationGenerator) renameTable(ctx context.Context, tx pgx.Tx) (string, error) { +func (og *operationGenerator) renameTable(ctx context.Context, tx pgx.Tx) (*opStmt, error) { srcTableName, err := og.randTable(ctx, tx, og.pctExisting(true), "") if err != nil { - return "", err + return nil, err } // Decide whether or not to produce a 'cannot change schema of table with RENAME' error @@ -1958,51 +2015,53 @@ func (og *operationGenerator) renameTable(ctx context.Context, tx pgx.Tx) (strin } destTableName, err := og.randTable(ctx, tx, og.pctExisting(false), desiredSchema) if err != nil { - return "", err + return nil, err } srcTableExists, err := og.tableExists(ctx, tx, srcTableName) if err != nil { - return "", err + return nil, err } if srcTableExists { err = og.tableHasPrimaryKeySwapActive(ctx, tx, srcTableName) if err != nil { - return "", err + return nil, err } } destSchemaExists, err := og.schemaExists(ctx, tx, destTableName.Schema()) if err != nil { - return "", err + return nil, err } destTableExists, err := og.tableExists(ctx, tx, destTableName) if err != nil { - return "", err + return nil, err } srcTableHasDependencies, err := og.tableHasDependencies(ctx, tx, srcTableName) if err != nil { - return "", err + return nil, err } srcEqualsDest := destTableName.String() == srcTableName.String() + stmt := makeOpStmt(OpStmtDDL) codesWithConditions{ {code: pgcode.UndefinedTable, condition: !srcTableExists}, {code: pgcode.UndefinedSchema, condition: !destSchemaExists}, {code: pgcode.DuplicateRelation, condition: !srcEqualsDest && destTableExists}, {code: pgcode.DependentObjectsStillExist, condition: srcTableHasDependencies}, {code: pgcode.InvalidName, condition: srcTableName.Schema() != destTableName.Schema()}, - }.add(og.expectedExecErrors) + }.add(stmt.expectedExecErrors) - return fmt.Sprintf(`ALTER TABLE %s RENAME TO %s`, srcTableName, destTableName), nil + stmt.sql = fmt.Sprintf(`ALTER TABLE %s RENAME TO %s`, srcTableName, destTableName) + return stmt, nil } -func (og *operationGenerator) renameView(ctx context.Context, tx pgx.Tx) (string, error) { +func (og *operationGenerator) renameView(ctx context.Context, tx pgx.Tx) (*opStmt, error) { srcViewName, err := og.randView(ctx, tx, og.pctExisting(true), "") if err != nil { - return "", err + return nil, err } // Decide whether or not to produce a 'cannot change schema of table with RENAME' error @@ -2012,74 +2071,78 @@ func (og *operationGenerator) renameView(ctx context.Context, tx pgx.Tx) (string } destViewName, err := og.randView(ctx, tx, og.pctExisting(false), desiredSchema) if err != nil { - return "", err + return nil, err } srcViewExists, err := og.viewExists(ctx, tx, srcViewName) if err != nil { - return "", err + return nil, err } destSchemaExists, err := og.schemaExists(ctx, tx, destViewName.Schema()) if err != nil { - return "", err + return nil, err } destViewExists, err := og.viewExists(ctx, tx, destViewName) if err != nil { - return "", err + return nil, err } srcTableHasDependencies, err := og.tableHasDependencies(ctx, tx, srcViewName) if err != nil { - return "", err + return nil, err } srcEqualsDest := destViewName.String() == srcViewName.String() + stmt := makeOpStmt(OpStmtDDL) codesWithConditions{ {code: pgcode.UndefinedTable, condition: !srcViewExists}, {code: pgcode.UndefinedSchema, condition: !destSchemaExists}, {code: pgcode.DuplicateRelation, condition: !srcEqualsDest && destViewExists}, {code: pgcode.DependentObjectsStillExist, condition: srcTableHasDependencies}, {code: pgcode.InvalidName, condition: srcViewName.Schema() != destViewName.Schema()}, - }.add(og.expectedExecErrors) + }.add(stmt.expectedExecErrors) - return fmt.Sprintf(`ALTER VIEW %s RENAME TO %s`, srcViewName, destViewName), nil + stmt.sql = fmt.Sprintf(`ALTER VIEW %s RENAME TO %s`, srcViewName, destViewName) + return stmt, nil } -func (og *operationGenerator) setColumnDefault(ctx context.Context, tx pgx.Tx) (string, error) { +func (og *operationGenerator) setColumnDefault(ctx context.Context, tx pgx.Tx) (*opStmt, error) { tableName, err := og.randTable(ctx, tx, og.pctExisting(true), "") if err != nil { - return "", err + return nil, err } tableExists, err := og.tableExists(ctx, tx, tableName) if err != nil { - return "", err + return nil, err } if !tableExists { - og.expectedExecErrors.add(pgcode.UndefinedTable) - return fmt.Sprintf(`ALTER TABLE %s ALTER COLUMN IrrelevantColumnName SET DEFAULT "IrrelevantValue"`, - tableName), nil + return makeOpStmtForSingleError(OpStmtDDL, + fmt.Sprintf(`ALTER TABLE %s ALTER COLUMN IrrelevantColumnName SET DEFAULT "IrrelevantValue"`, + tableName), + pgcode.UndefinedTable), nil } err = og.tableHasPrimaryKeySwapActive(ctx, tx, tableName) if err != nil { - return "", err + return nil, err } columnForDefault, err := og.randColumnWithMeta(ctx, tx, *tableName, og.pctExisting(true)) if err != nil { - return "", err + return nil, err } columnExists, err := og.columnExistsOnTable(ctx, tx, tableName, columnForDefault.name) if err != nil { - return "", err + return nil, err } if !columnExists { - og.expectedExecErrors.add(pgcode.UndefinedColumn) - return fmt.Sprintf(`ALTER TABLE %s ALTER COLUMN %s SET DEFAULT "IrrelevantValue"`, - tableName, columnForDefault.name), nil + return makeOpStmtForSingleError(OpStmtDDL, + fmt.Sprintf(`ALTER TABLE %s ALTER COLUMN %s SET DEFAULT "IrrelevantValue"`, + tableName, columnForDefault.name), + pgcode.UndefinedColumn), nil } datumTyp := columnForDefault.typ @@ -2087,136 +2150,138 @@ func (og *operationGenerator) setColumnDefault(ctx context.Context, tx pgx.Tx) ( if og.produceError() { newTypeName, newTyp, err := og.randType(ctx, tx, og.pctExisting(true)) if err != nil { - return "", err + return nil, err } if newTyp == nil { - og.expectedExecErrors.add(pgcode.UndefinedObject) - return fmt.Sprintf(`ALTER TABLE %s ALTER COLUMN %s SET DEFAULT 'IrrelevantValue':::%s`, tableName, columnForDefault.name, newTypeName.SQLString()), nil + return makeOpStmtForSingleError(OpStmtDDL, + fmt.Sprintf(`ALTER TABLE %s ALTER COLUMN %s SET DEFAULT 'IrrelevantValue':::%s`, tableName, columnForDefault.name, newTypeName.SQLString()), + pgcode.UndefinedColumn), nil } datumTyp = newTyp } defaultDatum := randgen.RandDatum(og.params.rng, datumTyp, columnForDefault.nullable) - + stmt := makeOpStmt(OpStmtDDL) if (!datumTyp.Equivalent(columnForDefault.typ)) && defaultDatum != tree.DNull { - og.expectedExecErrors.add(pgcode.DatatypeMismatch) + stmt.expectedExecErrors.add(pgcode.DatatypeMismatch) } // Generated columns cannot have default values. if columnForDefault.generated { - og.expectedExecErrors.add(pgcode.InvalidTableDefinition) + stmt.expectedExecErrors.add(pgcode.InvalidTableDefinition) } - return fmt.Sprintf(`ALTER TABLE %s ALTER COLUMN %s SET DEFAULT %s`, tableName, columnForDefault.name, tree.AsStringWithFlags(defaultDatum, tree.FmtParsable)), nil + stmt.sql = fmt.Sprintf(`ALTER TABLE %s ALTER COLUMN %s SET DEFAULT %s`, tableName, columnForDefault.name, tree.AsStringWithFlags(defaultDatum, tree.FmtParsable)) + return stmt, nil } -func (og *operationGenerator) setColumnNotNull(ctx context.Context, tx pgx.Tx) (string, error) { +func (og *operationGenerator) setColumnNotNull(ctx context.Context, tx pgx.Tx) (*opStmt, error) { tableName, err := og.randTable(ctx, tx, og.pctExisting(true), "") if err != nil { - return "", err + return nil, err } tableExists, err := og.tableExists(ctx, tx, tableName) if err != nil { - return "", err + return nil, err } if !tableExists { - og.expectedExecErrors.add(pgcode.UndefinedTable) - return fmt.Sprintf(`ALTER TABLE %s ALTER COLUMN IrrelevantColumnName SET NOT NULL`, tableName), nil + return makeOpStmtForSingleError(OpStmtDDL, + fmt.Sprintf(`ALTER TABLE %s ALTER COLUMN IrrelevantColumnName SET NOT NULL`, tableName), + pgcode.UndefinedTable), nil } err = og.tableHasPrimaryKeySwapActive(ctx, tx, tableName) if err != nil { - return "", err + return nil, err } columnName, err := og.randColumn(ctx, tx, *tableName, og.pctExisting(true)) if err != nil { - return "", err + return nil, err } columnExists, err := og.columnExistsOnTable(ctx, tx, tableName, columnName) if err != nil { - return "", err + return nil, err } constraintBeingAdded, err := og.columnNotNullConstraintInMutation(ctx, tx, tableName, columnName) if err != nil { - return "", err + return nil, err } + stmt := makeOpStmt(OpStmtDDL) if constraintBeingAdded { - og.expectedExecErrors.add(pgcode.ObjectNotInPrerequisiteState) + stmt.expectedExecErrors.add(pgcode.ObjectNotInPrerequisiteState) } if !columnExists { - og.expectedExecErrors.add(pgcode.UndefinedColumn) + stmt.expectedExecErrors.add(pgcode.UndefinedColumn) } else { // If the column has null values, then a check violation will occur upon committing. colContainsNull, err := og.columnContainsNull(ctx, tx, tableName, columnName) if err != nil { - return "", err + return nil, err } if colContainsNull { og.candidateExpectedCommitErrors.add(pgcode.CheckViolation) } } - hasPKSchemaChanges, err := og.tableHasOngoingAlterPKSchemaChanges(ctx, tx, tableName) + err = og.tableHasPrimaryKeySwapActive(ctx, tx, tableName) if err != nil { - return "", err - } - if hasPKSchemaChanges { - // Possible timing hole. Don't issue this schema change with a - // background PK change in progress. Tracked with #66663. - return `SELECT 'avoiding timing hole'`, nil + return nil, err } - - return fmt.Sprintf(`ALTER TABLE %s ALTER COLUMN "%s" SET NOT NULL`, tableName, columnName), nil + stmt.sql = fmt.Sprintf(`ALTER TABLE %s ALTER COLUMN "%s" SET NOT NULL`, tableName, columnName) + return stmt, nil } -func (og *operationGenerator) setColumnType(ctx context.Context, tx pgx.Tx) (string, error) { +func (og *operationGenerator) setColumnType(ctx context.Context, tx pgx.Tx) (*opStmt, error) { tableName, err := og.randTable(ctx, tx, og.pctExisting(true), "") if err != nil { - return "", err + return nil, err } const setSessionVariableString = `SET enable_experimental_alter_column_type_general = true;` tableExists, err := og.tableExists(ctx, tx, tableName) if err != nil { - return "", err + return nil, err } if !tableExists { - og.expectedExecErrors.add(pgcode.UndefinedTable) - return fmt.Sprintf(`%s ALTER TABLE %s ALTER COLUMN IrrelevantColumnName SET DATA TYPE IrrelevantDataType`, setSessionVariableString, tableName), nil + return makeOpStmtForSingleError(OpStmtDDL, + fmt.Sprintf(`%s ALTER TABLE %s ALTER COLUMN IrrelevantColumnName SET DATA TYPE IrrelevantDataType`, setSessionVariableString, tableName), + pgcode.UndefinedTable), nil } err = og.tableHasPrimaryKeySwapActive(ctx, tx, tableName) if err != nil { - return "", err + return nil, err } columnForTypeChange, err := og.randColumnWithMeta(ctx, tx, *tableName, og.pctExisting(true)) if err != nil { - return "", err + return nil, err } columnExists, err := og.columnExistsOnTable(ctx, tx, tableName, columnForTypeChange.name) if err != nil { - return "", err + return nil, err } if !columnExists { - og.expectedExecErrors.add(pgcode.UndefinedColumn) - return fmt.Sprintf(`%s ALTER TABLE %s ALTER COLUMN "%s" SET DATA TYPE IrrelevantTypeName`, - setSessionVariableString, tableName, columnForTypeChange.name), nil + return makeOpStmtForSingleError(OpStmtDDL, + fmt.Sprintf(`%s ALTER TABLE %s ALTER COLUMN "%s" SET DATA TYPE IrrelevantTypeName`, + setSessionVariableString, tableName, columnForTypeChange.name), + pgcode.UndefinedColumn), nil } newTypeName, newType, err := og.randType(ctx, tx, og.pctExisting(true)) if err != nil { - return "", err + return nil, err } columnHasDependencies, err := og.columnIsDependedOn(ctx, tx, tableName, columnForTypeChange.name) if err != nil { - return "", err + return nil, err } + stmt := makeOpStmt(OpStmtDDL) if newType != nil { // Ignoring the error here intentionally, as we want to carry on with // the operation and not fail it prematurely. @@ -2224,22 +2289,23 @@ func (og *operationGenerator) setColumnType(ctx context.Context, tx pgx.Tx) (str codesWithConditions{ {code: pgcode.CannotCoerce, condition: kind == schemachange.ColumnConversionImpossible}, {code: pgcode.FeatureNotSupported, condition: kind != schemachange.ColumnConversionTrivial}, - }.add(og.expectedExecErrors) + }.add(stmt.expectedExecErrors) } codesWithConditions{ {code: pgcode.UndefinedObject, condition: newType == nil}, {code: pgcode.DependentObjectsStillExist, condition: columnHasDependencies}, - }.add(og.expectedExecErrors) + }.add(stmt.expectedExecErrors) - return fmt.Sprintf(`%s ALTER TABLE %s ALTER COLUMN "%s" SET DATA TYPE %s`, - setSessionVariableString, tableName, columnForTypeChange.name, newTypeName.SQLString()), nil + stmt.sql = fmt.Sprintf(`%s ALTER TABLE %s ALTER COLUMN "%s" SET DATA TYPE %s`, + setSessionVariableString, tableName, columnForTypeChange.name, newTypeName.SQLString()) + return stmt, nil } -func (og *operationGenerator) survive(ctx context.Context, tx pgx.Tx) (string, error) { +func (og *operationGenerator) survive(ctx context.Context, tx pgx.Tx) (*opStmt, error) { dbRegions, err := og.getDatabaseRegionNames(ctx, tx) if err != nil { - return "", err + return nil, err } // Choose a survival mode based on a coin toss. @@ -2252,6 +2318,7 @@ func (og *operationGenerator) survive(ctx context.Context, tx pgx.Tx) (string, e // Expect 0 regions to fail, and less than three regions to fail // if there are < 3 regions. + stmt := makeOpStmt(OpStmtDDL) codesWithConditions{ { code: pgcode.InvalidName, @@ -2261,35 +2328,37 @@ func (og *operationGenerator) survive(ctx context.Context, tx pgx.Tx) (string, e code: pgcode.InvalidParameterValue, condition: needsAtLeastThreeRegions && len(dbRegions) < 3, }, - }.add(og.expectedExecErrors) + }.add(stmt.expectedExecErrors) dbName, err := og.getDatabase(ctx, tx) if err != nil { - return "", err + return nil, err } - return fmt.Sprintf(`ALTER DATABASE %s SURVIVE %s`, dbName, survive), nil + stmt.sql = fmt.Sprintf(`ALTER DATABASE %s SURVIVE %s`, dbName, survive) + return stmt, nil } -func (og *operationGenerator) insertRow(ctx context.Context, tx pgx.Tx) (sq string, err error) { +func (og *operationGenerator) insertRow(ctx context.Context, tx pgx.Tx) (stmt *opStmt, err error) { tableName, err := og.randTable(ctx, tx, og.pctExisting(true), "") if err != nil { - return "", errors.Wrapf(err, "error getting random table name") + return nil, errors.Wrapf(err, "error getting random table name") } tableExists, err := og.tableExists(ctx, tx, tableName) if err != nil { - return "", err + return nil, err } if !tableExists { - og.expectedExecErrors.add(pgcode.UndefinedTable) - return fmt.Sprintf( - `INSERT INTO %s (IrrelevantColumnName) VALUES ("IrrelevantValue")`, - tableName, - ), nil + return makeOpStmtForSingleError(OpStmtDML, + fmt.Sprintf( + `INSERT INTO %s (IrrelevantColumnName) VALUES ("IrrelevantValue")`, + tableName, + ), + pgcode.UndefinedTable), nil } allColumns, err := og.getTableColumns(ctx, tx, tableName.String(), false) nonGeneratedCols := allColumns if err != nil { - return "", errors.Wrapf(err, "error getting table columns for insert row") + return nil, errors.Wrapf(err, "error getting table columns for insert row") } // Filter out computed columns. @@ -2330,21 +2399,22 @@ func (og *operationGenerator) insertRow(ctx context.Context, tx pgx.Tx) (sq stri } // Verify that none of the generated expressions will blow up on this insert. anyInvalidInserts := false + stmt = makeOpStmt(OpStmtDML) for _, row := range rows { invalidInsert, generatedErrors, potentialErrors, err := og.validateGeneratedExpressionsForInsert(ctx, tx, tableName, colNames, allColumns, row) if err != nil { - return "", err + return nil, err } // We may have errors that are possible, but not guaranteed. potentialErrors.add(og.potentialExecErrors) if invalidInsert { - generatedErrors.add(og.expectedExecErrors) + generatedErrors.add(stmt.expectedExecErrors) // We will be pessimistic and assume that other column related errors can // be hit, since the function above fails only on generated columns. But, // there maybe index expressions with the exact same problem. - og.expectedExecErrors.add(pgcode.NumericValueOutOfRange) - og.expectedExecErrors.add(pgcode.FloatingPointException) - og.expectedExecErrors.add(pgcode.NotNullViolation) + stmt.expectedExecErrors.add(pgcode.NumericValueOutOfRange) + stmt.expectedExecErrors.add(pgcode.FloatingPointException) + stmt.expectedExecErrors.add(pgcode.NotNullViolation) anyInvalidInserts = true } } @@ -2359,31 +2429,31 @@ func (og *operationGenerator) insertRow(ctx context.Context, tx pgx.Tx) (sq stri var generatedErrors codesWithConditions uniqueConstraintViolation, generatedErrors, err = og.valuesViolateUniqueConstraints(ctx, tx, tableName, colNames, allColumns, rows) if err != nil { - return "", err + return nil, err } if !uniqueConstraintViolation { - generatedErrors.add(og.expectedExecErrors) + generatedErrors.add(stmt.expectedExecErrors) } // Verify if the new row will violate fk constraints by checking the constraints and rows // in the database. fkConstraintsEnabled, err := isFkConstraintsEnabled(ctx, tx) if err != nil { - return "", err + return nil, err } if fkConstraintsEnabled { fkViolation, err = og.violatesFkConstraints(ctx, tx, tableName, colNames, rows) } if err != nil { - return "", err + return nil, err } } codesWithConditions{ {code: pgcode.UniqueViolation, condition: uniqueConstraintViolation}, - }.add(og.expectedExecErrors) + }.add(stmt.expectedExecErrors) codesWithConditions{ {code: pgcode.ForeignKeyViolation, condition: fkViolation}, - }.add(og.potentialExecErrors) + }.add(stmt.potentialExecErrors) codesWithConditions{ {code: pgcode.ForeignKeyViolation, condition: fkViolation}, }.add(og.expectedCommitErrors) @@ -2393,19 +2463,148 @@ func (og *operationGenerator) insertRow(ctx context.Context, tx pgx.Tx) (sq stri formattedRows = append(formattedRows, fmt.Sprintf("(%s)", strings.Join(row, ","))) } - return fmt.Sprintf( + stmt.sql = fmt.Sprintf( `INSERT INTO %s (%s) VALUES %s`, tableName, strings.Join(colNames, ","), strings.Join(formattedRows, ","), - ), nil + ) + return stmt, nil +} + +type opStmtType int + +const ( + // OpStmtDDL statement is a data definition language statement. + OpStmtDDL opStmtType = 1 + // OpStmtDML statement is a data manipulation language statement. + OpStmtDML opStmtType = 2 +) + +type opStmtQueryResultCallback func(ctx context.Context, rows pgx.Rows) error + +// opStmt a generated statement that is either DDL or DML, including the potential +// set of execution errors this statement can generate. +type opStmt struct { + // sql the query being executed. + sql string + // queryType family of the query type being executed (DML or DDL). + queryType opStmtType + // expectedExecErrors expected set of execution errors. + expectedExecErrors errorCodeSet + // potentialExecErrors errors that could be potentially seen on execution. + potentialExecErrors errorCodeSet + queryResultCallback opStmtQueryResultCallback +} + +// String implements Stringer +func (s *opStmt) String() string { + return fmt.Sprintf("QUERY: %s, Expected Errors: %s, Potential Errors: %s", + s.sql, + s.expectedExecErrors, + s.potentialExecErrors) +} + +// makeOpStmtForSingleError constructs a statement that will only produce +// an error. +func makeOpStmtForSingleError(queryType opStmtType, sql string, codes ...pgcode.Code) *opStmt { + s := makeOpStmt(queryType) + s.sql = sql + for _, code := range codes { + s.expectedExecErrors.add(code) + } + return s +} + +// makeOpStmt constructs an empty operation of a given type. +func makeOpStmt(queryType opStmtType) *opStmt { + return &opStmt{ + queryType: queryType, + expectedExecErrors: makeExpectedErrorSet(), + potentialExecErrors: makeExpectedErrorSet(), + } +} + +// getErrorState dumps the object state when an error is hit +func (og *operationGenerator) getErrorState(op *opStmt) string { + return fmt.Sprintf("Dumping state before death:\n"+ + "Expected errors: %s\n"+ + "Potential errors: %s\n"+ + "Expected commit errors: %s\n"+ + "Potential commit errors: %s\n"+ + "===========================\n"+ + "Executed queries for generating errors: %s\n"+ + "===========================\n"+ + "Previous statements %s\n", + op.expectedExecErrors, + op.potentialExecErrors, + og.expectedCommitErrors.String(), + og.potentialCommitErrors.String(), + og.GetOpGenLog(), + og.stmtsInTxt) +} + +// executeStmt executes the given operation statement, and validates the result +// of the execution. Note: Commit time failures will be handled separately from +// statement specific logic. +func (s *opStmt) executeStmt(ctx context.Context, tx pgx.Tx, og *operationGenerator) error { + var err error + var rows pgx.Rows + // Statement doesn't produce any result set that needs to be validated. + if s.queryResultCallback == nil { + _, err = tx.Exec(ctx, s.sql) + } else { + rows, err = tx.Query(ctx, s.sql) + } + if err != nil { + // If the error not an instance of pgconn.PgError, then it is unexpected. + pgErr := new(pgconn.PgError) + if !errors.As(err, &pgErr) { + return errors.Mark( + errors.Wrapf(err, "***UNEXPECTED ERROR; Received a non pg error.\n %s", + og.getErrorState(s)), + errRunInTxnFatalSentinel, + ) + } + if pgcode.MakeCode(pgErr.Code) == pgcode.SerializationFailure { + return err + } + if !s.expectedExecErrors.contains(pgcode.MakeCode(pgErr.Code)) && + !s.potentialExecErrors.contains(pgcode.MakeCode(pgErr.Code)) { + return errors.Mark( + errors.Wrapf(err, "***UNEXPECTED ERROR; Received an unexpected execution error.\n %s", + og.getErrorState(s)), + errRunInTxnFatalSentinel, + ) + } + return errors.Mark( + errors.Wrapf(err, "ROLLBACK; Successfully got expected execution error.\n %s", + og.getErrorState(s)), + errRunInTxnRbkSentinel, + ) + } + if !s.expectedExecErrors.empty() { + return errors.Mark( + errors.Newf("***FAIL; Failed to receive an execution error when errors were expected. %s", + og.getErrorState(s)), + errRunInTxnFatalSentinel, + ) + } + // Next validate the result set. + if s.queryResultCallback != nil { + if err := s.queryResultCallback(ctx, rows); err != nil { + return err + } + } + return nil } -func (og *operationGenerator) validate(ctx context.Context, tx pgx.Tx) (string, error) { +func (og *operationGenerator) validate(ctx context.Context, tx pgx.Tx) (*opStmt, error) { // Finish validation off by validating multi region zone configs are as expected. // Configs can be invalid if a user decides to override a multi-region field, but // this is not performed by the schemachange workload. - validateStmt := "SELECT 'validating all objects', crdb_internal.validate_multi_region_zone_configs()" + validateStmt := makeOpStmt(OpStmtDML) + validateStmt.sql = "SELECT 'validating all objects', crdb_internal.validate_multi_region_zone_configs()" rows, err := tx.Query(ctx, `SELECT * FROM "".crdb_internal.invalid_objects ORDER BY id`) if err != nil { return validateStmt, err @@ -2426,7 +2625,7 @@ func (og *operationGenerator) validate(ctx context.Context, tx pgx.Tx) (string, } if rows.Err() != nil { - return "", errors.Wrap(rows.Err(), "querying for validation errors failed") + return nil, errors.Wrap(rows.Err(), "querying for validation errors failed") } if len(errs) == 0 { @@ -3063,24 +3262,26 @@ func (og *operationGenerator) randType( return &typeName, typ, nil } -func (og *operationGenerator) createSchema(ctx context.Context, tx pgx.Tx) (string, error) { +func (og *operationGenerator) createSchema(ctx context.Context, tx pgx.Tx) (*opStmt, error) { schemaName, err := og.randSchema(ctx, tx, og.pctExisting(false)) if err != nil { - return "", err + return nil, err } ifNotExists := og.randIntn(2) == 0 schemaExists, err := og.schemaExists(ctx, tx, schemaName) if err != nil { - return "", err + return nil, err } + opStmt := makeOpStmt(OpStmtDDL) if schemaExists && !ifNotExists { - og.expectedExecErrors.add(pgcode.DuplicateSchema) + opStmt.expectedExecErrors.add(pgcode.DuplicateSchema) } // TODO(jayshrivastava): Support authorization stmt := randgen.MakeSchemaName(ifNotExists, schemaName, tree.MakeRoleSpecWithRoleName(username.RootUserName().Normalized())) - return tree.Serialize(stmt), nil + opStmt.sql = tree.Serialize(stmt) + return opStmt, nil } func (og *operationGenerator) randSchema( @@ -3105,27 +3306,141 @@ ORDER BY random() return name, nil } -func (og *operationGenerator) dropSchema(ctx context.Context, tx pgx.Tx) (string, error) { +func (og *operationGenerator) dropSchema(ctx context.Context, tx pgx.Tx) (*opStmt, error) { schemaName, err := og.randSchema(ctx, tx, og.pctExisting(true)) if err != nil { - return "", err + return nil, err } schemaExists, err := og.schemaExists(ctx, tx, schemaName) if err != nil { - return "", err + return nil, err } crossReferences, err := og.schemaContainsTypesWithCrossSchemaReferences(ctx, tx, schemaName) if err != nil { - return "", err + return nil, err } + stmt := makeOpStmt(OpStmtDDL) codesWithConditions{ {pgcode.UndefinedSchema, !schemaExists}, {pgcode.InvalidSchemaName, schemaName == tree.PublicSchema}, {pgcode.FeatureNotSupported, crossReferences}, - }.add(og.expectedExecErrors) - - return fmt.Sprintf(`DROP SCHEMA "%s" CASCADE`, schemaName), nil + }.add(stmt.expectedExecErrors) + + stmt.sql = fmt.Sprintf(`DROP SCHEMA "%s" CASCADE`, schemaName) + return stmt, nil +} + +func (og *operationGenerator) selectStmt(ctx context.Context, tx pgx.Tx) (stmt *opStmt, err error) { + const maxTablesForSelect = 3 + const maxColumnsForSelect = 16 + const maxRowsToConsume = 1 + // Select the number of target tables. + numTables := og.randIntn(maxTablesForSelect) + 1 + tableNames := make([]*tree.TableName, numTables) + colInfos := make([][]column, numTables) + allTableExists := true + totalColumns := 0 + for idx := range tableNames { + tableName, err := og.randTable(ctx, tx, og.pctExisting(true), "") + if err != nil { + return nil, errors.Wrapf(err, "error getting random table name") + } + tableExists, err := og.tableExists(ctx, tx, tableName) + if err != nil { + return nil, err + } + tableNames[idx] = tableName + if !tableExists { + allTableExists = false + continue + } + colInfo, err := og.getTableColumns(ctx, tx, tableName.String(), false) + if err != nil { + return nil, err + } + colInfos[idx] = colInfo + totalColumns += len(colInfo) + } + // Determine which columns to select. + selectColumns := strings.Builder{} + numColumnsToSelect := og.randIntn(maxColumnsForSelect) + 1 + if numColumnsToSelect > totalColumns { + numColumnsToSelect = totalColumns + } + // Randomly select our columns from the set of tables. + for colIdx := 0; colIdx < numColumnsToSelect; colIdx++ { + tableIdx := og.randIntn(len(colInfos)) + // Skip over empty tables. + if len(colInfos[tableIdx]) == 0 { + colIdx-- + continue + } + col := colInfos[tableIdx][og.randIntn(len(colInfos[tableIdx]))] + if colIdx != 0 { + selectColumns.WriteString(",") + } + selectColumns.WriteString(fmt.Sprintf("t%d.", tableIdx)) + selectColumns.WriteString(col.name) + selectColumns.WriteString(" AS ") + selectColumns.WriteString(fmt.Sprintf("col%d", colIdx)) + } + // No columns, so anything goes + if totalColumns == 0 { + selectColumns.WriteString("*") + } + + // TODO(fqazi): Start injecting WHERE clauses, joins, and aggregations too + selectQuery := strings.Builder{} + selectQuery.WriteString("SELECT ") + selectQuery.WriteString(selectColumns.String()) + selectQuery.WriteString(" FROM ") + for idx, tableName := range tableNames { + if idx != 0 { + selectQuery.WriteString(",") + } + selectQuery.WriteString(tableName.String()) + selectQuery.WriteString(" AS ") + selectQuery.WriteString(fmt.Sprintf("t%d ", idx)) + } + if maxRowsToConsume > 0 { + selectQuery.WriteString(fmt.Sprintf(" FETCH FIRST %d ROWS ONLY", maxRowsToConsume)) + } + // Setup a statement with the query and a call back to validate the result + // set. + stmt = makeOpStmt(OpStmtDML) + stmt.sql = selectQuery.String() + stmt.queryResultCallback = func(ctx context.Context, rows pgx.Rows) error { + // Only read rows from the select for up to a minute. + const MaxTimeForRead = time.Minute + startTime := timeutil.Now() + defer rows.Close() + for rows.Next() && timeutil.Since(startTime) < MaxTimeForRead { + // Detect if the context is cancelled while processing + // the result set. + if err = ctx.Err(); err != nil { + return err + } + rawValues := rows.RawValues() + if len(rawValues) != numColumnsToSelect { + return errors.AssertionFailedf("query returned incorrect number of columns. "+ + "Got: %d Expected:%d", + len(rawValues), + totalColumns) + } + } + if err := rows.Err(); err != nil { + return err + } + return nil + } + codesWithConditions{ + {code: pgcode.UndefinedTable, condition: !allTableExists}, + }.add(stmt.expectedExecErrors) + // TODO(fqazi): Temporarily allow out of memory errors on select queries. Not + // sure where we are hitting these, need to investigate further. + stmt.potentialExecErrors.add(pgcode.OutOfMemory) + return stmt, nil } // pctExisting is used to specify the probability that a name exists when getting a random name. It diff --git a/pkg/workload/schemachange/optype_string.go b/pkg/workload/schemachange/optype_string.go index f03e92e622cd..8a11ff323f77 100644 --- a/pkg/workload/schemachange/optype_string.go +++ b/pkg/workload/schemachange/optype_string.go @@ -42,12 +42,13 @@ func _() { _ = x[setColumnType-31] _ = x[survive-32] _ = x[insertRow-33] - _ = x[validate-34] + _ = x[selectStmt-34] + _ = x[validate-35] } -const _opType_name = "addColumnaddConstraintaddForeignKeyConstraintaddRegionaddUniqueConstraintalterTableLocalitycreateIndexcreateSequencecreateTablecreateTableAscreateViewcreateEnumcreateSchemadropColumndropColumnDefaultdropColumnNotNulldropColumnStoreddropConstraintdropIndexdropSequencedropTabledropViewdropSchemaprimaryRegionrenameColumnrenameIndexrenameSequencerenameTablerenameViewsetColumnDefaultsetColumnNotNullsetColumnTypesurviveinsertRowvalidate" +const _opType_name = "addColumnaddConstraintaddForeignKeyConstraintaddRegionaddUniqueConstraintalterTableLocalitycreateIndexcreateSequencecreateTablecreateTableAscreateViewcreateEnumcreateSchemadropColumndropColumnDefaultdropColumnNotNulldropColumnStoreddropConstraintdropIndexdropSequencedropTabledropViewdropSchemaprimaryRegionrenameColumnrenameIndexrenameSequencerenameTablerenameViewsetColumnDefaultsetColumnNotNullsetColumnTypesurviveinsertRowselectStmtvalidate" -var _opType_index = [...]uint16{0, 9, 22, 45, 54, 73, 91, 102, 116, 127, 140, 150, 160, 172, 182, 199, 216, 232, 246, 255, 267, 276, 284, 294, 307, 319, 330, 344, 355, 365, 381, 397, 410, 417, 426, 434} +var _opType_index = [...]uint16{0, 9, 22, 45, 54, 73, 91, 102, 116, 127, 140, 150, 160, 172, 182, 199, 216, 232, 246, 255, 267, 276, 284, 294, 307, 319, 330, 344, 355, 365, 381, 397, 410, 417, 426, 436, 444} func (i opType) String() string { if i < 0 || i >= opType(len(_opType_index)-1) { diff --git a/pkg/workload/schemachange/schemachange.go b/pkg/workload/schemachange/schemachange.go index 092712db5c30..93f3d1240e3c 100644 --- a/pkg/workload/schemachange/schemachange.go +++ b/pkg/workload/schemachange/schemachange.go @@ -321,7 +321,8 @@ func (w *schemaChangeWorker) getErrorState() string { "Executed queries for generating errors: %s"+ "==========================="+ "Previous statements %s", - w.opGen.expectedExecErrors.String(), + "", + // w.opGen.expectedExecErrors.String(), w.opGen.potentialExecErrors.String(), w.opGen.expectedCommitErrors.String(), w.opGen.potentialCommitErrors.String(), @@ -353,6 +354,9 @@ func (w *schemaChangeWorker) runInTxn(ctx context.Context, tx pgx.Tx) error { } else if err != nil && errors.Is(err, errRunInTxnRbkSentinel) { // Error was already marked for us. return err + } else if errors.Is(err, context.DeadlineExceeded) { + // Deadline was encountered while generating the operation, so bail out. + return errors.Mark(err, errRunInTxnRbkSentinel) } else if err != nil { return errors.Mark( errors.Wrapf(err, "***UNEXPECTED ERROR; Failed to generate a random operation\n OpGen log: \n%s\nStmts: \n%s\n", @@ -363,20 +367,16 @@ func (w *schemaChangeWorker) runInTxn(ctx context.Context, tx pgx.Tx) error { ) } - w.logger.addExpectedErrors(w.opGen.expectedExecErrors, w.opGen.expectedCommitErrors) - w.logger.writeLog(op) + w.logger.addExpectedErrors(op.expectedExecErrors, w.opGen.expectedCommitErrors) + w.logger.writeLog(op.String()) if !w.dryRun { start := timeutil.Now() - - if _, err = tx.Exec(ctx, op); err != nil { + err := op.executeStmt(ctx, tx, w.opGen) + if err != nil { // If the error not an instance of pgconn.PgError, then it is unexpected. pgErr := new(pgconn.PgError) if !errors.As(err, &pgErr) { - return errors.Mark( - errors.Wrapf(err, "***UNEXPECTED ERROR; Received a non pg error. %s", - w.getErrorState()), - errRunInTxnFatalSentinel, - ) + return err } // Transaction retry errors are acceptable. Allow the transaction @@ -388,33 +388,8 @@ func (w *schemaChangeWorker) runInTxn(ctx context.Context, tx pgx.Tx) error { errRunInTxnRbkSentinel, ) } - - // Screen for any unexpected errors. - if !w.opGen.expectedExecErrors.contains(pgcode.MakeCode(pgErr.Code)) && - !w.opGen.potentialExecErrors.contains(pgcode.MakeCode(pgErr.Code)) { - return errors.Mark( - errors.Wrapf(err, "***UNEXPECTED ERROR; Received an unexpected execution error. %s", - w.getErrorState()), - errRunInTxnFatalSentinel, - ) - } - - // Rollback because the error was anticipated. - w.recordInHist(timeutil.Since(start), txnRollback) - return errors.Mark( - errors.Wrapf(err, "ROLLBACK; Successfully got expected execution error. %s", - w.getErrorState()), - errRunInTxnRbkSentinel, - ) + return err } - if !w.opGen.expectedExecErrors.empty() { - return errors.Mark( - errors.Newf("***FAIL; Failed to receive an execution error when errors were expected. %s", - w.getErrorState()), - errRunInTxnFatalSentinel, - ) - } - fmt.Printf("OK: %s", op) w.recordInHist(timeutil.Since(start), operationOk) } } @@ -431,6 +406,11 @@ func (w *schemaChangeWorker) run(ctx context.Context) error { defer w.releaseLocksIfHeld() // Run between 1 and maxOpsPerWorker schema change operations. + watchDog := newSchemaChangeWatchDog(w.pool.Get(), w.logger) + if err := watchDog.Start(ctx, tx); err != nil { + return errors.Wrapf(err, "unable to start watch dog") + } + defer watchDog.Stop() start := timeutil.Now() w.opGen.resetTxnState() err = w.runInTxn(ctx, tx) @@ -459,7 +439,6 @@ func (w *schemaChangeWorker) run(ctx context.Context) error { return errors.Wrapf(err, "***UNEXPECTED ERROR") } } - w.logger.writeLog("COMMIT") if err = tx.Commit(ctx); err != nil { // If the error not an instance of pgconn.PgError, then it is unexpected. @@ -509,7 +488,6 @@ func (w *schemaChangeWorker) run(ctx context.Context) error { w.logger.flushLog(tx, "COMMIT; Successfully got expected commit error") return nil } - if !w.opGen.expectedCommitErrors.empty() { err := errors.Newf("***FAIL; Failed to receive a commit error when at least one commit error was expected %s", w.getErrorState()) w.logger.flushLog(tx, err.Error()) @@ -638,6 +616,29 @@ func (l *logger) flushLogAndLock(_ pgx.Tx, message string, stdout bool) { l.currentLogEntry.mu.entry = nil } +// logWatchDog used by the watch dog to log entries on behalf of the current +// worker. +func (l *logger) logWatchDog(entry string) { + logEntry := LogEntry{ + ClientTimestamp: timeutil.Now().Format("15:04:05.999999"), + Ops: nil, + Message: fmt.Sprintf("WATCH DOG: %s", entry), + } + jsonBytes, err := json.MarshalIndent(logEntry, "", " ") + if err != nil { + return + } + l.stdoutLog.printLn(string(jsonBytes)) + if l.artifactsLog != nil { + var jsonBuf bytes.Buffer + err = json.Compact(&jsonBuf, jsonBytes) + if err != nil { + return + } + l.artifactsLog.printLn(jsonBuf.String()) + } +} + type logger struct { verbose int currentLogEntry *struct { diff --git a/pkg/workload/schemachange/watch_dog.go b/pkg/workload/schemachange/watch_dog.go new file mode 100644 index 000000000000..da021dc5d3f6 --- /dev/null +++ b/pkg/workload/schemachange/watch_dog.go @@ -0,0 +1,141 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package schemachange + +import ( + "context" + "fmt" + "time" + + "github.com/cockroachdb/errors" + "github.com/jackc/pgx/v4" + "github.com/jackc/pgx/v4/pgxpool" +) + +// schemaChangeWatchDog connection watch dog object. +type schemaChangeWatchDog struct { + // conn used to monitor the target session. + conn *pgxpool.Pool + // logger used for logging for connections. + logger *logger + // sessionID the session we should be monitoring. + sessionID string + // cmdChannel used to communicate from thread executing commands on the session. + cmdChannel chan chan struct{} + // activeQuery last active query observed on the monitored connection. + activeQuery string + // txnID last active transaction ID observed on the target connection. + txnID string + // numRetries number of transaction retries observed. + numRetries int +} + +// newSchemaChangeWatchDog constructs a new watch dog for monitoring +// a single connection. +func newSchemaChangeWatchDog(conn *pgxpool.Pool, logger *logger) *schemaChangeWatchDog { + return &schemaChangeWatchDog{ + conn: conn, + cmdChannel: make(chan chan struct{}), + logger: logger, + } +} + +// isConnectionActive checks if a connection is alive and making progress, +// towards completing its current operations. This includes making sure +// data is being read or transactions are being retried. Returns true +// when progress is detected. +func (w *schemaChangeWatchDog) isConnectionActive(ctx context.Context) bool { + // Scan the session to make sure progress is being made first. + sessionInfo := w.conn.QueryRow(ctx, + "SELECT active_queries, kv_txn FROM crdb_internal.cluster_sessions WHERE session_id = $1", + w.sessionID) + lastTxnID := w.txnID + lastActiveQuery := w.activeQuery + if err := sessionInfo.Scan(&w.activeQuery, &w.txnID); err != nil { + w.logger.logWatchDog(fmt.Sprintf("failed to get session information: %v\n", err)) + return false + } + if w.activeQuery != lastActiveQuery { + return true + } + lastNumRetries := w.numRetries + txnInfo := w.conn.QueryRow(ctx, + "SELECT sum(num_retries) + sum(num_auto_retries) FROM crdb_internal.cluster_transactions WHERE id=$1", + &w.txnID) + if err := txnInfo.Scan(&w.numRetries); err != nil { + w.logger.logWatchDog(fmt.Sprintf("failed to get transaction information: %v\n", err)) + return false + } + if lastTxnID != w.txnID || + lastNumRetries != w.numRetries { + return true + } + return false +} + +// watchLoop monitors the connection until either observed work is finished, +// or a timeout is hit. +func (w *schemaChangeWatchDog) watchLoop(ctx context.Context) { + const maxTimeOutForDump = time.Second * 300 + var totalTimeWaited time.Duration = 0 + for { + select { + case responseChannel := <-w.cmdChannel: + // Only command is to stop. + close(responseChannel) + return + case <-ctx.Done(): + panic("dumping stacks, we failed to terminate threads on time.") + case <-time.After(time.Second): + // If the connection is making progress, the watch dog timer can be reset + // again. + if w.isConnectionActive(ctx) { + totalTimeWaited = 0 + } + totalTimeWaited += time.Second + if totalTimeWaited > maxTimeOutForDump { + panic(fmt.Sprintf("connection has timed out %v", w)) + } + } + } +} + +// Start starts monitoring the given transaction, as a part of this process, +// any required session information will be collected. +func (w *schemaChangeWatchDog) Start(ctx context.Context, tx pgx.Tx) error { + sessionInfo := tx.QueryRow(ctx, "SELECT session_id FROM [SHOW session_id]") + if sessionInfo == nil { + return errors.AssertionFailedf("unable to retrieve session id on connection") + } + err := sessionInfo.Scan(&w.sessionID) + if err != nil { + return err + } + // Start up the session watch loop. + go w.watchLoop(ctx) + return nil +} + +// reset prepares the watch dog for re-use. +func (w *schemaChangeWatchDog) reset() { + w.sessionID = "" + w.activeQuery = "" + w.txnID = "" +} + +// Stop stops monitoring the connection and waits for the watch dog thread to +// return. +func (w *schemaChangeWatchDog) Stop() { + replyChan := make(chan struct{}) + w.cmdChannel <- replyChan + <-replyChan + w.reset() +}