Skip to content

Commit

Permalink
Merge #88085
Browse files Browse the repository at this point in the history
88085: workload/schemachanger: re-enable the schema changer workload r=fqazi a=fqazi

Previously the schema changer workload was disabled because of two intermittent issues:

1.  The workload itself had an issue due to a recent refactor that did not properly detect/handle potential execution errors.  While the set of errors was generated it was never used properly in some cases after a recent refactor
2. Some scenarios involving inserts could fail binding OID's inside the resolveOID code path, specifically this code path does not properly handle dropped descriptors. These changes enforces the old behaviour where the resolution should just *fail*.

Co-authored-by: Faizan Qazi <[email protected]>
  • Loading branch information
craig[bot] and fqazi committed Sep 22, 2022
2 parents ae2278a + 2abce71 commit a33d71d
Show file tree
Hide file tree
Showing 8 changed files with 39 additions and 21 deletions.
3 changes: 2 additions & 1 deletion pkg/ccl/testccl/workload/schemachange/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ load("@io_bazel_rules_go//go:def.bzl", "go_test")

go_test(
name = "schemachange_test",
size = "large",
srcs = [
"main_test.go",
"schema_change_external_test.go",
],
args = ["-test.timeout=295s"],
args = ["-test.timeout=895s"],
data = [
"//c-deps:libgeos",
],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
func TestWorkload(t *testing.T) {
defer leaktest.AfterTest(t)()
defer utilccl.TestingEnableEnterprise()()
skip.WithIssue(t, 78478)
skip.UnderRace(t, "test connections can be too slow under race option.")

dir := t.TempDir()
ctx := context.Background()
Expand Down Expand Up @@ -90,7 +90,7 @@ func TestWorkload(t *testing.T) {
ql, err := wl.Ops(ctx, []string{pgURL.String()}, reg)
require.NoError(t, err)

const N = 100
const N = 800
workerFn := func(ctx context.Context, fn func(ctx context.Context) error) func() error {
return func() error {
for i := 0; i < N; i++ {
Expand Down
3 changes: 0 additions & 3 deletions pkg/cmd/roachtest/tests/schemachange_random_load.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,6 @@ func registerSchemaChangeRandomLoad(r registry.Registry) {
spec.Geo(),
spec.Zones(geoZonesStr),
),
// This is set while development is still happening on the workload and we
// fix (or bypass) minor schema change bugs that are discovered.
NonReleaseBlocker: true,
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
maxOps := 5000
concurrency := 20
Expand Down
8 changes: 7 additions & 1 deletion pkg/sql/resolve_oid.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"fmt"

"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
Expand Down Expand Up @@ -77,7 +78,12 @@ func resolveOID(
results, err := ie.QueryRowEx(ctx, "queryOid", txn,
sessiondata.NoSessionDataOverride, q, toResolve)
if err != nil {
if errors.HasType(err, (*tree.MultipleResultsError)(nil)) {
if catalog.HasInactiveDescriptorError(err) {
// Descriptor is either dropped or offline, so
// the OID does not exist.
return nil, true, pgerror.Newf(info.errType,
"%s %s does not exist", info.objName, toResolve)
} else if errors.HasType(err, (*tree.MultipleResultsError)(nil)) {
return nil, false, pgerror.Newf(pgcode.AmbiguousAlias,
"more than one %s named %s", info.objName, toResolve)
}
Expand Down
12 changes: 9 additions & 3 deletions pkg/workload/schemachange/error_screening.go
Original file line number Diff line number Diff line change
Expand Up @@ -560,14 +560,20 @@ SELECT count(*) > 0
return nil
}

func getValidGenerationErrors() errorCodeSet {
return errorCodeSet{
pgcode.NumericValueOutOfRange: true,
pgcode.FloatingPointException: true,
pgcode.InvalidTextRepresentation: true,
}
}

// isValidGenerationError these codes can be observed when evaluating values
// for generated expressions. These are errors are not ignored, but added into
// the expected set of errors.
func isValidGenerationError(code string) bool {
pgCode := pgcode.MakeCode(code)
return pgCode == pgcode.NumericValueOutOfRange ||
pgCode == pgcode.FloatingPointException ||
pgCode == pgcode.InvalidTextRepresentation
return getValidGenerationErrors().contains(pgCode)
}

// validateGeneratedExpressionsForInsert goes through generated expressions and
Expand Down
15 changes: 10 additions & 5 deletions pkg/workload/schemachange/operation_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ type operationGeneratorParams struct {
// The OperationBuilder has the sole responsibility of generating ops
type operationGenerator struct {
params *operationGeneratorParams
potentialExecErrors errorCodeSet
expectedCommitErrors errorCodeSet
potentialCommitErrors errorCodeSet

Expand Down Expand Up @@ -103,7 +102,6 @@ func makeOperationGenerator(params *operationGeneratorParams) *operationGenerato
return &operationGenerator{
params: params,
expectedCommitErrors: makeExpectedErrorSet(),
potentialExecErrors: makeExpectedErrorSet(),
potentialCommitErrors: makeExpectedErrorSet(),
candidateExpectedCommitErrors: makeExpectedErrorSet(),
}
Expand All @@ -112,7 +110,6 @@ func makeOperationGenerator(params *operationGeneratorParams) *operationGenerato
// Reset internal state used per operation within a transaction
func (og *operationGenerator) resetOpState() {
og.candidateExpectedCommitErrors.reset()
og.potentialExecErrors.reset()
og.opGenLog = strings.Builder{}
}

Expand Down Expand Up @@ -881,7 +878,7 @@ func (og *operationGenerator) addForeignKeyConstraint(
// perfectly if an error is expected. We can confirm post transaction with a time
// travel query.
_ = rowsSatisfyConstraint
og.potentialExecErrors.add(pgcode.ForeignKeyViolation)
stmt.potentialExecErrors.add(pgcode.ForeignKeyViolation)
og.potentialCommitErrors.add(pgcode.ForeignKeyViolation)

// It's possible for the table to be dropped concurrently, while we are running
Expand Down Expand Up @@ -1338,6 +1335,14 @@ func (og *operationGenerator) createTableAs(ctx context.Context, tx pgx.Tx) (*op
{code: pgcode.DuplicateColumn, condition: duplicateColumns},
}.add(opStmt.expectedExecErrors)

// Confirm the select itself doesn't run into any column generation errors,
// by executing it independently first until we add validation when adding
// generated columns. See issue: #81698?, which will allow us to remove this
// logic in the future.
if opStmt.expectedExecErrors.empty() {
opStmt.potentialExecErrors.merge(getValidGenerationErrors())
}

opStmt.sql = fmt.Sprintf(`CREATE TABLE %s AS %s FETCH FIRST %d ROWS ONLY`,
destTableName, selectStatement.String(), MaxRowsToConsume)
return opStmt, nil
Expand Down Expand Up @@ -2404,7 +2409,7 @@ func (og *operationGenerator) insertRow(ctx context.Context, tx pgx.Tx) (stmt *o
return nil, err
}
// We may have errors that are possible, but not guaranteed.
potentialErrors.add(og.potentialExecErrors)
potentialErrors.add(stmt.potentialExecErrors)
if invalidInsert {
generatedErrors.add(stmt.expectedExecErrors)
// We will be pessimistic and assume that other column related errors can
Expand Down
5 changes: 0 additions & 5 deletions pkg/workload/schemachange/schemachange.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,17 +313,12 @@ func (w *schemaChangeWorker) recordInHist(elapsed time.Duration, bin histBin) {

func (w *schemaChangeWorker) getErrorState() string {
return fmt.Sprintf("Dumping state before death:\n"+
"Expected errors: %s"+
"Potential exec errors: %s"+
"Expected commit errors: %s"+
"Potential commit errors: %s"+
"==========================="+
"Executed queries for generating errors: %s"+
"==========================="+
"Previous statements %s",
"",
// w.opGen.expectedExecErrors.String(),
w.opGen.potentialExecErrors.String(),
w.opGen.expectedCommitErrors.String(),
w.opGen.potentialCommitErrors.String(),
w.opGen.GetOpGenLog(),
Expand Down
10 changes: 9 additions & 1 deletion pkg/workload/schemachange/watch_dog.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,15 @@ func (w *schemaChangeWatchDog) watchLoop(ctx context.Context) {
close(responseChannel)
return
case <-ctx.Done():
panic("dumping stacks, we failed to terminate threads on time.")
// Give the connections a small amount of time to clean up, if they fail
// to do so, we will dump stacks.
select {
case <-w.cmdChannel:
return
case <-time.After(time.Second * 4):
panic("dumping stacks, we failed to terminate threads on time.")

}
case <-time.After(time.Second):
// If the connection is making progress, the watch dog timer can be reset
// again.
Expand Down

0 comments on commit a33d71d

Please sign in to comment.