Skip to content

Commit

Permalink
Merge #88537
Browse files Browse the repository at this point in the history
88537: workload/schemachanger: re-enable the schema changer workload r=fqazi a=fqazi

Fixes: #82133

Previously the schema changer workload was disabled because of two intermittent issues:

The workload itself had an issue due to a recent refactor that did not properly detect/handle potential execution errors. While the set of errors was generated it was never used properly in some cases after a recent refactor
Some scenarios involving inserts could fail binding OID's inside the resolveOID code path, specifically this code path does not properly handle dropped descriptors. These changes enforces the old behaviour where the resolution should just fail.

Note: This is the same as the previously merged PR (#88085 on 22.2)

Co-authored-by: Faizan Qazi <[email protected]>
  • Loading branch information
craig[bot] and fqazi committed Oct 28, 2022
2 parents b76537e + 85904e5 commit 068845f
Show file tree
Hide file tree
Showing 8 changed files with 41 additions and 18 deletions.
3 changes: 2 additions & 1 deletion pkg/ccl/testccl/workload/schemachange/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ load("@io_bazel_rules_go//go:def.bzl", "go_test")

go_test(
name = "schemachange_test",
size = "large",
srcs = [
"main_test.go",
"schema_change_external_test.go",
],
args = ["-test.timeout=295s"],
args = ["-test.timeout=895s"],
data = [
"//c-deps:libgeos",
],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
func TestWorkload(t *testing.T) {
defer leaktest.AfterTest(t)()
defer utilccl.TestingEnableEnterprise()()
skip.WithIssue(t, 78478)
skip.UnderRace(t, "test connections can be too slow under race option.")

dir := t.TempDir()
ctx := context.Background()
Expand Down Expand Up @@ -90,7 +90,7 @@ func TestWorkload(t *testing.T) {
ql, err := wl.Ops(ctx, []string{pgURL.String()}, reg)
require.NoError(t, err)

const N = 100
const N = 800
workerFn := func(ctx context.Context, fn func(ctx context.Context) error) func() error {
return func() error {
for i := 0; i < N; i++ {
Expand Down
3 changes: 0 additions & 3 deletions pkg/cmd/roachtest/tests/schemachange_random_load.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,6 @@ func registerSchemaChangeRandomLoad(r registry.Registry) {
spec.Zones(geoZonesStr),
),
NativeLibs: registry.LibGEOS,
// This is set while development is still happening on the workload and we
// fix (or bypass) minor schema change bugs that are discovered.
NonReleaseBlocker: true,
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
maxOps := 5000
concurrency := 20
Expand Down
8 changes: 7 additions & 1 deletion pkg/sql/resolve_oid.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"fmt"

"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
Expand Down Expand Up @@ -77,7 +78,12 @@ func resolveOID(
results, err := ie.QueryRowEx(ctx, "queryOid", txn,
sessiondata.NoSessionDataOverride, q, toResolve)
if err != nil {
if errors.HasType(err, (*tree.MultipleResultsError)(nil)) {
if catalog.HasInactiveDescriptorError(err) {
// Descriptor is either dropped or offline, so
// the OID does not exist.
return nil, true, pgerror.Newf(info.errType,
"%s %s does not exist", info.objName, toResolve)
} else if errors.HasType(err, (*tree.MultipleResultsError)(nil)) {
return nil, false, pgerror.Newf(pgcode.AmbiguousAlias,
"more than one %s named %s", info.objName, toResolve)
}
Expand Down
12 changes: 9 additions & 3 deletions pkg/workload/schemachange/error_screening.go
Original file line number Diff line number Diff line change
Expand Up @@ -562,14 +562,20 @@ SELECT count(*) > 0
return nil
}

func getValidGenerationErrors() errorCodeSet {
return errorCodeSet{
pgcode.NumericValueOutOfRange: true,
pgcode.FloatingPointException: true,
pgcode.InvalidTextRepresentation: true,
}
}

// isValidGenerationError these codes can be observed when evaluating values
// for generated expressions. These are errors are not ignored, but added into
// the expected set of errors.
func isValidGenerationError(code string) bool {
pgCode := pgcode.MakeCode(code)
return pgCode == pgcode.NumericValueOutOfRange ||
pgCode == pgcode.FloatingPointException ||
pgCode == pgcode.InvalidTextRepresentation
return getValidGenerationErrors().contains(pgCode)
}

// validateGeneratedExpressionsForInsert goes through generated expressions and
Expand Down
17 changes: 11 additions & 6 deletions pkg/workload/schemachange/operation_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ type operationGeneratorParams struct {
// The OperationBuilder has the sole responsibility of generating ops
type operationGenerator struct {
params *operationGeneratorParams
potentialExecErrors errorCodeSet
expectedCommitErrors errorCodeSet
potentialCommitErrors errorCodeSet

Expand Down Expand Up @@ -126,7 +125,6 @@ func makeOperationGenerator(params *operationGeneratorParams) *operationGenerato
return &operationGenerator{
params: params,
expectedCommitErrors: makeExpectedErrorSet(),
potentialExecErrors: makeExpectedErrorSet(),
potentialCommitErrors: makeExpectedErrorSet(),
candidateExpectedCommitErrors: makeExpectedErrorSet(),
}
Expand All @@ -135,7 +133,6 @@ func makeOperationGenerator(params *operationGeneratorParams) *operationGenerato
// Reset internal state used per operation within a transaction
func (og *operationGenerator) resetOpState() {
og.candidateExpectedCommitErrors.reset()
og.potentialExecErrors.reset()
}

// Reset internal state used per transaction
Expand Down Expand Up @@ -903,7 +900,7 @@ func (og *operationGenerator) addForeignKeyConstraint(
// perfectly if an error is expected. We can confirm post transaction with a time
// travel query.
_ = rowsSatisfyConstraint
og.potentialExecErrors.add(pgcode.ForeignKeyViolation)
stmt.potentialExecErrors.add(pgcode.ForeignKeyViolation)
og.potentialCommitErrors.add(pgcode.ForeignKeyViolation)

// It's possible for the table to be dropped concurrently, while we are running
Expand Down Expand Up @@ -1418,6 +1415,14 @@ func (og *operationGenerator) createTableAs(ctx context.Context, tx pgx.Tx) (*op
{code: pgcode.DuplicateColumn, condition: duplicateColumns},
}.add(opStmt.expectedExecErrors)

// Confirm the select itself doesn't run into any column generation errors,
// by executing it independently first until we add validation when adding
// generated columns. See issue: #81698?, which will allow us to remove this
// logic in the future.
if opStmt.expectedExecErrors.empty() {
opStmt.potentialExecErrors.merge(getValidGenerationErrors())
}

opStmt.sql = fmt.Sprintf(`CREATE TABLE %s AS %s FETCH FIRST %d ROWS ONLY`,
destTableName, selectStatement.String(), MaxRowsToConsume)
return opStmt, nil
Expand Down Expand Up @@ -2496,7 +2501,7 @@ func (og *operationGenerator) insertRow(ctx context.Context, tx pgx.Tx) (stmt *o
return nil, err
}
// We may have errors that are possible, but not guaranteed.
potentialErrors.add(og.potentialExecErrors)
potentialErrors.add(stmt.potentialExecErrors)
if invalidInsert {
generatedErrors.add(stmt.expectedExecErrors)
// We will be pessimistic and assume that other column related errors can
Expand Down Expand Up @@ -3607,7 +3612,7 @@ func (og *operationGenerator) typeFromTypeName(
return nil, errors.Wrapf(err, "typeFromTypeName: %s", typeName)
}
typ, err := tree.ResolveType(
context.Background(),
ctx,
stmt.AST.(*tree.Select).Select.(*tree.SelectClause).Exprs[0].Expr.(*tree.CastExpr).Type,
&txTypeResolver{tx: tx},
)
Expand Down
1 change: 0 additions & 1 deletion pkg/workload/schemachange/schemachange.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,6 @@ func (w *schemaChangeWorker) WrapWithErrorState(err error) error {
}
return &ErrorState{
cause: err,
PotentialErrors: w.opGen.potentialExecErrors.StringSlice(),
PotentialCommitErrors: w.opGen.potentialCommitErrors.StringSlice(),
ExpectedCommitErrors: w.opGen.expectedCommitErrors.StringSlice(),
QueriesForGeneratingErrors: w.opGen.GetOpGenLog(),
Expand Down
11 changes: 10 additions & 1 deletion pkg/workload/schemachange/watch_dog.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,16 @@ func (w *schemaChangeWatchDog) watchLoop(ctx context.Context) {
close(responseChannel)
return
case <-ctx.Done():
panic("dumping stacks, we failed to terminate threads on time.")
// Give the connections a small amount of time to clean up, if they fail
// to do so, we will dump stacks.
select {
case responseChannel := <-w.cmdChannel:
close(responseChannel)
return
case <-time.After(time.Second * 4):
panic("dumping stacks, we failed to terminate threads on time.")

}
case <-time.After(time.Second):
// If the connection is making progress, the watch dog timer can be reset
// again.
Expand Down

0 comments on commit 068845f

Please sign in to comment.