Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

workload/schemachanger: re-enable the schema changer workload #88085

Merged
merged 5 commits into from
Sep 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pkg/ccl/testccl/workload/schemachange/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ load("@io_bazel_rules_go//go:def.bzl", "go_test")

go_test(
name = "schemachange_test",
size = "large",
srcs = [
"main_test.go",
"schema_change_external_test.go",
],
args = ["-test.timeout=295s"],
args = ["-test.timeout=895s"],
data = [
"//c-deps:libgeos",
],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
func TestWorkload(t *testing.T) {
defer leaktest.AfterTest(t)()
defer utilccl.TestingEnableEnterprise()()
skip.WithIssue(t, 78478)
skip.UnderRace(t, "test connections can be too slow under race option.")

dir := t.TempDir()
ctx := context.Background()
Expand Down Expand Up @@ -90,7 +90,7 @@ func TestWorkload(t *testing.T) {
ql, err := wl.Ops(ctx, []string{pgURL.String()}, reg)
require.NoError(t, err)

const N = 100
const N = 800
workerFn := func(ctx context.Context, fn func(ctx context.Context) error) func() error {
return func() error {
for i := 0; i < N; i++ {
Expand Down
3 changes: 0 additions & 3 deletions pkg/cmd/roachtest/tests/schemachange_random_load.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,6 @@ func registerSchemaChangeRandomLoad(r registry.Registry) {
spec.Geo(),
spec.Zones(geoZonesStr),
),
// This is set while development is still happening on the workload and we
// fix (or bypass) minor schema change bugs that are discovered.
NonReleaseBlocker: true,
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
maxOps := 5000
concurrency := 20
Expand Down
8 changes: 7 additions & 1 deletion pkg/sql/resolve_oid.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"fmt"

"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
Expand Down Expand Up @@ -77,7 +78,12 @@ func resolveOID(
results, err := ie.QueryRowEx(ctx, "queryOid", txn,
sessiondata.NoSessionDataOverride, q, toResolve)
if err != nil {
if errors.HasType(err, (*tree.MultipleResultsError)(nil)) {
if catalog.HasInactiveDescriptorError(err) {
// Descriptor is either dropped or offline, so
// the OID does not exist.
return nil, true, pgerror.Newf(info.errType,
"%s %s does not exist", info.objName, toResolve)
} else if errors.HasType(err, (*tree.MultipleResultsError)(nil)) {
return nil, false, pgerror.Newf(pgcode.AmbiguousAlias,
"more than one %s named %s", info.objName, toResolve)
}
Expand Down
12 changes: 9 additions & 3 deletions pkg/workload/schemachange/error_screening.go
Original file line number Diff line number Diff line change
Expand Up @@ -560,14 +560,20 @@ SELECT count(*) > 0
return nil
}

func getValidGenerationErrors() errorCodeSet {
return errorCodeSet{
pgcode.NumericValueOutOfRange: true,
pgcode.FloatingPointException: true,
pgcode.InvalidTextRepresentation: true,
}
}

// isValidGenerationError these codes can be observed when evaluating values
// for generated expressions. These are errors are not ignored, but added into
// the expected set of errors.
func isValidGenerationError(code string) bool {
pgCode := pgcode.MakeCode(code)
return pgCode == pgcode.NumericValueOutOfRange ||
pgCode == pgcode.FloatingPointException ||
pgCode == pgcode.InvalidTextRepresentation
return getValidGenerationErrors().contains(pgCode)
}

// validateGeneratedExpressionsForInsert goes through generated expressions and
Expand Down
15 changes: 10 additions & 5 deletions pkg/workload/schemachange/operation_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ type operationGeneratorParams struct {
// The OperationBuilder has the sole responsibility of generating ops
type operationGenerator struct {
params *operationGeneratorParams
potentialExecErrors errorCodeSet
expectedCommitErrors errorCodeSet
potentialCommitErrors errorCodeSet

Expand Down Expand Up @@ -103,7 +102,6 @@ func makeOperationGenerator(params *operationGeneratorParams) *operationGenerato
return &operationGenerator{
params: params,
expectedCommitErrors: makeExpectedErrorSet(),
potentialExecErrors: makeExpectedErrorSet(),
potentialCommitErrors: makeExpectedErrorSet(),
candidateExpectedCommitErrors: makeExpectedErrorSet(),
}
Expand All @@ -112,7 +110,6 @@ func makeOperationGenerator(params *operationGeneratorParams) *operationGenerato
// Reset internal state used per operation within a transaction
func (og *operationGenerator) resetOpState() {
og.candidateExpectedCommitErrors.reset()
og.potentialExecErrors.reset()
og.opGenLog = strings.Builder{}
}

Expand Down Expand Up @@ -881,7 +878,7 @@ func (og *operationGenerator) addForeignKeyConstraint(
// perfectly if an error is expected. We can confirm post transaction with a time
// travel query.
_ = rowsSatisfyConstraint
og.potentialExecErrors.add(pgcode.ForeignKeyViolation)
stmt.potentialExecErrors.add(pgcode.ForeignKeyViolation)
og.potentialCommitErrors.add(pgcode.ForeignKeyViolation)

// It's possible for the table to be dropped concurrently, while we are running
Expand Down Expand Up @@ -1338,6 +1335,14 @@ func (og *operationGenerator) createTableAs(ctx context.Context, tx pgx.Tx) (*op
{code: pgcode.DuplicateColumn, condition: duplicateColumns},
}.add(opStmt.expectedExecErrors)

// Confirm the select itself doesn't run into any column generation errors,
// by executing it independently first until we add validation when adding
// generated columns. See issue: #81698?, which will allow us to remove this
// logic in the future.
if opStmt.expectedExecErrors.empty() {
opStmt.potentialExecErrors.merge(getValidGenerationErrors())
}

opStmt.sql = fmt.Sprintf(`CREATE TABLE %s AS %s FETCH FIRST %d ROWS ONLY`,
destTableName, selectStatement.String(), MaxRowsToConsume)
return opStmt, nil
Expand Down Expand Up @@ -2404,7 +2409,7 @@ func (og *operationGenerator) insertRow(ctx context.Context, tx pgx.Tx) (stmt *o
return nil, err
}
// We may have errors that are possible, but not guaranteed.
potentialErrors.add(og.potentialExecErrors)
potentialErrors.add(stmt.potentialExecErrors)
if invalidInsert {
generatedErrors.add(stmt.expectedExecErrors)
// We will be pessimistic and assume that other column related errors can
Expand Down
5 changes: 0 additions & 5 deletions pkg/workload/schemachange/schemachange.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,17 +313,12 @@ func (w *schemaChangeWorker) recordInHist(elapsed time.Duration, bin histBin) {

func (w *schemaChangeWorker) getErrorState() string {
return fmt.Sprintf("Dumping state before death:\n"+
"Expected errors: %s"+
"Potential exec errors: %s"+
"Expected commit errors: %s"+
"Potential commit errors: %s"+
"==========================="+
"Executed queries for generating errors: %s"+
"==========================="+
"Previous statements %s",
"",
// w.opGen.expectedExecErrors.String(),
w.opGen.potentialExecErrors.String(),
w.opGen.expectedCommitErrors.String(),
w.opGen.potentialCommitErrors.String(),
w.opGen.GetOpGenLog(),
Expand Down
10 changes: 9 additions & 1 deletion pkg/workload/schemachange/watch_dog.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,15 @@ func (w *schemaChangeWatchDog) watchLoop(ctx context.Context) {
close(responseChannel)
return
case <-ctx.Done():
panic("dumping stacks, we failed to terminate threads on time.")
// Give the connections a small amount of time to clean up, if they fail
// to do so, we will dump stacks.
select {
case <-w.cmdChannel:
return
case <-time.After(time.Second * 4):
panic("dumping stacks, we failed to terminate threads on time.")

}
case <-time.After(time.Second):
// If the connection is making progress, the watch dog timer can be reset
// again.
Expand Down