Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
107825: sql: fix rare flake in TestDistSQLFlowsVirtualTables r=yuzefovich a=yuzefovich

This commit fixes a rare flake in `TestDistSQLFlowsVirtualTables`. The test previously asserted that only the target query was present in `*_distsql_flows` virtual tables, but we just saw a case where an internal query showed up in there (related to table stats cache). This assertion was too strict and unnecessary - the test really cares whether the target query is present or not, so this commit removes the assertion.

Fixes: #107769.

Release note: None

108053: sql: unskip TestSchemaChangePurgeFailure r=chengxiong-ruan a=chengxiong-ruan

Informs #51796

There has been a lot of changes since the test was first written and looks some of integer variables are not used to control the schema changer flow at all. Specifically the concept of "Async Schema Changer" was misleading. This commit changes the test to test the purging behavior by comparing the key counts before and after rollback. We don't need to test how many attempts.

Release note: None
Release justification: test only change

Co-authored-by: Yahor Yuzefovich <[email protected]>
Co-authored-by: Chengxiong Ruan <[email protected]>
  • Loading branch information
3 people committed Aug 3, 2023
3 parents 83a56ea + 5113599 + 02d937c commit 4fe2a80
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 115 deletions.
12 changes: 4 additions & 8 deletions pkg/sql/crdb_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -636,15 +636,11 @@ func TestDistSQLFlowsVirtualTables(t *testing.T) {
const clusterScope = "cluster"
const nodeScope = "node"
getNum := func(db *sqlutils.SQLRunner, scope string) int {
querySuffix := fmt.Sprintf("FROM crdb_internal.%s_distsql_flows", scope)
// Check that all remote flows (if any) correspond to the expected
// statement.
stmts := db.QueryStr(t, "SELECT stmt "+querySuffix)
for _, stmt := range stmts {
require.Equal(t, query, stmt[0])
}
// Count the number of flows matching our target query. Note that there
// could be other flows in the table for the internal operations.
countQuery := fmt.Sprintf("SELECT count(*) FROM crdb_internal.%s_distsql_flows WHERE stmt = '%s'", scope, query)
var num int
db.QueryRow(t, "SELECT count(*) "+querySuffix).Scan(&num)
db.QueryRow(t, countQuery).Scan(&num)
return num
}
for nodeID := 0; nodeID < numNodes; nodeID++ {
Expand Down
199 changes: 92 additions & 107 deletions pkg/sql/schema_changer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/gcjob"
"github.com/cockroachdb/cockroach/pkg/sql/isql"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scexec"
"github.com/cockroachdb/cockroach/pkg/sql/sqltestutils"
"github.com/cockroachdb/cockroach/pkg/sql/stats"
"github.com/cockroachdb/cockroach/pkg/testutils"
Expand Down Expand Up @@ -1587,125 +1588,109 @@ CREATE TABLE t.test (k INT PRIMARY KEY, v INT);
func TestSchemaChangePurgeFailure(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
skip.WithIssue(t, 51796)
// TODO (lucy): This test needs more complicated schema changer knobs than
// currently implemented. Previously this test disabled the async schema
// changer so that we don't retry the cleanup of the failed schema change
// until a certain point in the test.
params, _ := createTestServerParams()
const chunkSize = 200
var enableAsyncSchemaChanges uint32
var attempts int32
// attempt 1: write the first chunk of the index.
// attempt 2: write the second chunk and hit a unique constraint
// violation; purge the schema change.
// attempt 3: return an error while purging the schema change.
var expectedAttempts int32 = 3
params.Knobs = base.TestingKnobs{
SQLSchemaChanger: &sql.SchemaChangerTestingKnobs{
BackfillChunkSize: chunkSize,
},
DistSQL: &execinfra.TestingKnobs{
RunBeforeBackfillChunk: func(sp roachpb.Span) error {
// Return a deadline exceeded error during the third attempt
// which attempts to clean up the schema change.
if atomic.AddInt32(&attempts, 1) == expectedAttempts {
// Disable the async schema changer for assertions.
atomic.StoreUint32(&enableAsyncSchemaChanges, 0)
return context.DeadlineExceeded
}
return nil
},
BulkAdderFlushesEveryBatch: true,
},
}
server, sqlDB, kvDB := serverutils.StartServer(t, params)
defer server.Stopper().Stop(context.Background())
codec := server.ApplicationLayer().Codec()

// Disable strict GC TTL enforcement because we're going to shove a zero-value
// TTL into the system with AddImmediateGCZoneConfig.
defer sqltestutils.DisableGCTTLStrictEnforcement(t, sqlDB)()

if _, err := sqlDB.Exec(`
CREATE DATABASE t;
CREATE TABLE t.test (k INT PRIMARY KEY, v INT);
`); err != nil {
t.Fatal(err)
}

// Bulk insert.
const maxValue = chunkSize + 1
if err := sqltestutils.BulkInsertIntoTable(sqlDB, maxValue); err != nil {
t.Fatal(err)
}

// Add a row with a duplicate value=0 which is the same
// value as for the key maxValue.
if _, err := sqlDB.Exec(
`INSERT INTO t.test VALUES ($1, $2)`, maxValue+1, 0,
); err != nil {
t.Fatal(err)
}

// A schema change that violates integrity constraints.
if _, err := sqlDB.Exec(
"CREATE UNIQUE INDEX foo ON t.test (v)",
); !testutils.IsError(err, `violates unique constraint "foo"`) {
t.Fatal(err)
}
for _, schemaChangerSetup := range []string{
"SET use_declarative_schema_changer='off'",
"SET use_declarative_schema_changer='on'",
} {
params, _ := createTestServerParams()
const chunkSize = 200

// The index doesn't exist
if _, err := sqlDB.Query(
`SELECT v from t.test@foo`,
); !testutils.IsError(err, "index .* not found") {
t.Fatal(err)
}
var getKeyCount func() (int, error)
countBeforeRollback := 0
params.Knobs = base.TestingKnobs{
SQLSchemaChanger: &sql.SchemaChangerTestingKnobs{
RunBeforeOnFailOrCancel: func(jobID jobspb.JobID) error {
cnt, err := getKeyCount()
if err != nil {
return err
}
countBeforeRollback = cnt
return nil
},
},
DistSQL: &execinfra.TestingKnobs{
BulkAdderFlushesEveryBatch: true,
},
SQLDeclarativeSchemaChanger: &scexec.TestingKnobs{
RunBeforeMakingPostCommitPlan: func(inRollback bool) error {
if inRollback {
cnt, err := getKeyCount()
if err != nil {
return err
}
countBeforeRollback = cnt
}
return nil
},
},
}
server, sqlDB, kvDB := serverutils.StartServer(t, params)
defer server.Stopper().Stop(context.Background())
codec := server.ApplicationLayer().Codec()

// Allow async schema change purge to attempt backfill and error.
atomic.StoreUint32(&enableAsyncSchemaChanges, 1)
tableDesc := desctestutils.TestingGetPublicTableDescriptor(kvDB, keys.SystemSQLCodec, "t", "test")
// deal with schema change knob
if _, err := sqltestutils.AddImmediateGCZoneConfig(sqlDB, tableDesc.GetID()); err != nil {
t.Fatal(err)
}
_, err := sqlDB.Exec(fmt.Sprintf("SET CLUSTER SETTING bulkio.index_backfill.batch_size = %d", chunkSize))
require.NoError(t, err)

// The deadline exceeded error in the schema change purge results in no
// retry attempts of the purge.
testutils.SucceedsSoon(t, func() error {
if read := atomic.LoadInt32(&attempts); read != expectedAttempts {
return errors.Errorf("%d retries, despite allowing only (schema change + reverse) = %d", read, expectedAttempts)
getKeyCount = func() (int, error) {
return sqltestutils.GetTableKeyCount(ctx, kvDB, codec)
}
return nil
})
// Disable strict GC TTL enforcement because we're going to shove a zero-value
// TTL into the system with AddImmediateGCZoneConfig.
defer sqltestutils.DisableGCTTLStrictEnforcement(t, sqlDB)()

// There is still some garbage index data that needs to be purged. All the
// rows from k = 0 to k = chunkSize - 1 have index values.
numGarbageValues := chunkSize
_, err = sqlDB.Exec(schemaChangerSetup)
require.NoError(t, err)

ctx := context.Background()
if _, err := sqlDB.Exec(`
CREATE DATABASE t;
CREATE TABLE t.test (k INT PRIMARY KEY, v INT);
`); err != nil {
t.Fatal(err)
}

if err := sqltestutils.CheckTableKeyCount(ctx, kvDB, codec, 1, maxValue+1+numGarbageValues); err != nil {
t.Fatal(err)
}
// Bulk insert.
const maxValue = chunkSize + 1
if err := sqltestutils.BulkInsertIntoTable(sqlDB, maxValue); err != nil {
t.Fatal(err)
}

if err := sqlutils.RunScrub(sqlDB, "t", "test"); err != nil {
t.Fatal(err)
}
// Add a row with a duplicate value=0 which is the same
// value as for the key maxValue.
if _, err := sqlDB.Exec(
`INSERT INTO t.test VALUES ($1, $2)`, maxValue+1, 0,
); err != nil {
t.Fatal(err)
}

// Enable async schema change processing to ensure that it cleans up the
// above garbage left behind.
atomic.StoreUint32(&enableAsyncSchemaChanges, 1)
// A schema change that violates integrity constraints.
if _, err := sqlDB.Exec(
"CREATE UNIQUE INDEX foo ON t.test (v)",
); !testutils.IsError(err, `violates unique constraint "foo"`) {
t.Fatal(err)
}

// No garbage left behind.
testutils.SucceedsSoon(t, func() error {
numGarbageValues = 0
return sqltestutils.CheckTableKeyCount(ctx, kvDB, codec, 1, maxValue+1+numGarbageValues)
})
// The index doesn't exist
if _, err := sqlDB.Query(
`SELECT v from t.test@foo`,
); !testutils.IsError(err, "index .* not found") {
t.Fatal(err)
}

// A new attempt cleans up a chunk of data.
if attempts != expectedAttempts+1 {
t.Fatalf("%d chunk ops, despite allowing only (schema change + reverse) = %d", attempts, expectedAttempts)
// countBeforeRollback is assigned in the rollback testing knob which is
// called before rollback starts so that the first chunk (200 keys) written
// is still visible. The first chunk is visible because there is no
// duplicate keys within it. The duplicate keys only exist in the second
// chunk. Also note that we wrote maxValue+1 rows, and there is 1 extra key
// from kv.
require.Equal(t, countBeforeRollback, maxValue+2+chunkSize)

// No garbage left behind after rollback. This check should succeed pretty
// fast since we use `DelRange` in GC and `CheckTableKeyCount` cannot see
// tombstones.
testutils.SucceedsSoon(t, func() error {
return sqltestutils.CheckTableKeyCount(ctx, kvDB, codec, 1, maxValue+1)
})
}
}

Expand Down
4 changes: 4 additions & 0 deletions pkg/sql/schemachanger/scexec/testing_knobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ type TestingKnobs struct {

// RunBeforeBackfill is called just before starting the backfill.
RunBeforeBackfill func() error

// RunBeforeMakingPostCommitPlan is called just before making the post commit
// plan.
RunBeforeMakingPostCommitPlan func(inRollback bool) error
}

// ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.
Expand Down
5 changes: 5 additions & 0 deletions pkg/sql/schemachanger/scrun/scrun.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,11 @@ func RunSchemaChangesInJob(
descriptorIDs []descpb.ID,
rollbackCause error,
) error {
if knobs != nil && knobs.RunBeforeMakingPostCommitPlan != nil {
if err := knobs.RunBeforeMakingPostCommitPlan(rollbackCause != nil); err != nil {
return err
}
}
p, err := makePostCommitPlan(ctx, deps, jobID, descriptorIDs, rollbackCause)
if err != nil {
if knobs != nil && knobs.OnPostCommitPlanError != nil {
Expand Down

0 comments on commit 4fe2a80

Please sign in to comment.