Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
92345: changefeedccl: sqlsmith test r=[mgartner,jayshrivastava] a=HonoreDB

Informs cockroachdb#83591

Adds a test generating random predicates and
verifies that (except for a few known issues)
if a changefeed runs successfully it will output
the same rows as a regular query.

Release note: None

95850: rules: avoid O(columns^2) behavior with a small change r=ajwerner a=ajwerner

Epic: None

Release note (performance improvement): Fixed a bug which could lead to very slow drop when tables or views have a very large number of columns >1000.

Co-authored-by: Aaron Zinger <[email protected]>
Co-authored-by: Andrew Werner <[email protected]>
  • Loading branch information
3 people committed Jan 26, 2023
3 parents f49b07c + 469222f + 22dc2fd commit 2ad8df3
Show file tree
Hide file tree
Showing 10 changed files with 192 additions and 4 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ go_test(
"//pkg/cloud/impl:cloudimpl",
"//pkg/clusterversion",
"//pkg/gossip",
"//pkg/internal/sqlsmith",
"//pkg/jobs",
"//pkg/jobs/jobspb",
"//pkg/jobs/jobstest",
Expand Down
134 changes: 134 additions & 0 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/ccl/utilccl"
_ "github.com/cockroachdb/cockroach/pkg/cloud/impl" // registers cloud storage providers
"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/internal/sqlsmith"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
Expand All @@ -65,8 +66,11 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/flowinfra"
"github.com/cockroachdb/cockroach/pkg/sql/parser"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/randgen"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/sqllivenesstestutils"
"github.com/cockroachdb/cockroach/pkg/storage"
Expand Down Expand Up @@ -857,6 +861,136 @@ func TestChangefeedResolvedFrequency(t *testing.T) {
cdcTest(t, testFn)
}

func TestChangefeedRandomExpressions(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
skip.UnderStress(t)
skip.UnderRace(t)

testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
sqlDB := sqlutils.MakeSQLRunner(s.DB)
rng, _ := randutil.NewTestRand()
tblName := "seed"
defer s.DB.Close()

sqlDB.ExecMultiple(t, sqlsmith.Setups[tblName](rng)...)

// TODO: PopulateTableWithRandData doesn't work with enums
sqlDB.Exec(t, "ALTER TABLE seed DROP COLUMN _enum")

for rows := 0; rows < 100; {
var err error
var newRows int
if newRows, err = randgen.PopulateTableWithRandData(rng, s.DB, tblName, 200); err != nil {
t.Fatal(err)
}
rows += newRows
}

sqlDB.Exec(t, `DELETE FROM seed WHERE rowid NOT IN (SELECT rowid FROM seed LIMIT 100)`)

// Put the enums back. enum_range('hi'::greeting)[rowid%7] will give nulls when rowid%7=0 or 6.
sqlDB.Exec(t, `ALTER TABLE seed ADD COLUMN _enum greeting`)
sqlDB.Exec(t, `UPDATE seed SET _enum = enum_range('hi'::greeting)[rowid%7]`)

queryGen, err := sqlsmith.NewSmither(s.DB, rng,
sqlsmith.DisableWith(),
sqlsmith.DisableMutations(),
sqlsmith.DisableLimits(),
sqlsmith.DisableAggregateFuncs(),
sqlsmith.DisableWindowFuncs(),
sqlsmith.DisableJoins(),
sqlsmith.DisableLimits(),
sqlsmith.DisableIndexHints(),
sqlsmith.SetScalarComplexity(0.5),
sqlsmith.SetComplexity(0.5),
)
require.NoError(t, err)
defer queryGen.Close()
numNonTrivialTestRuns := 0
whereClausesChecked := make(map[string]struct{}, 1000)
for i := 0; i < 1000; i++ {
query := queryGen.Generate()
where, ok := getWhereClause(query)
if !ok {
continue
}
if _, alreadyChecked := whereClausesChecked[where]; alreadyChecked {
continue
}
whereClausesChecked[where] = struct{}{}
query = "SELECT array_to_string(IFNULL(array_agg(distinct rowid),'{}'),'|') FROM seed WHERE " + where
t.Log(query)
rows := s.DB.QueryRow(query)
var expectedRowIDsStr string
if err := rows.Scan(&expectedRowIDsStr); err != nil {
t.Logf("Skipping query %s because error %s", query, err)
continue
}
expectedRowIDs := strings.Split(expectedRowIDsStr, "|")
if expectedRowIDsStr == "" {
t.Logf("Skipping predicate %s because it returned no rows", where)
continue
}
createStmt := `CREATE CHANGEFEED WITH schema_change_policy='stop' AS SELECT rowid FROM seed WHERE ` + where
t.Logf("Expecting statement %s to emit %d events", createStmt, len(expectedRowIDs))
seedFeed, err := f.Feed(createStmt)
if err != nil {
t.Logf("Test tolerating create changefeed error: %s", err.Error())
closeFeedIgnoreError(t, seedFeed)
continue
}
numNonTrivialTestRuns++
assertedPayloads := make([]string, len(expectedRowIDs))
for i, id := range expectedRowIDs {
assertedPayloads[i] = fmt.Sprintf(`seed: [%s]->{"rowid": %s}`, id, id)
}
err = assertPayloadsBaseErr(context.Background(), seedFeed, assertedPayloads, false, false)
closeFeedIgnoreError(t, seedFeed)
if err != nil {
t.Error(err)
}
}
require.Greater(t, numNonTrivialTestRuns, 1)
t.Logf("%d predicates checked: all had the same result in SELECT and CHANGEFEED", numNonTrivialTestRuns)

}

cdcTest(t, testFn, feedTestForceSink(`kafka`))
}

// getWhereClause extracts the predicate from a randomly generated SQL statement.
func getWhereClause(query string) (string, bool) {
var p parser.Parser
stmts, err := p.Parse(query)
if err != nil {
return "", false
}
if len(stmts) != 1 {
return "", false
}
selectStmt, ok := stmts[0].AST.(*tree.Select).Select.(*tree.SelectClause)
if !ok {
return "", false
}
if selectStmt.Where == nil || len(selectStmt.From.Tables) == 0 {
return "", false
}
// Replace all table references with "seed" because we're not using the FROM clause so we can't reference aliases.
replaced, err := tree.SimpleVisit(selectStmt.Where.Expr, func(expr tree.Expr) (recurse bool, newExpr tree.Expr, err error) {
if ci, ok := expr.(*tree.ColumnItem); ok {
newCI := *ci
newCI.TableName = &tree.UnresolvedObjectName{NumParts: 1, Parts: [3]string{``, ``, `seed`}}
expr = &newCI
}
if un, ok := expr.(*tree.UnresolvedName); ok && un.NumParts > 1 {
un.Parts[un.NumParts-1] = `seed`
}
return true, expr, nil
})
return replaced.String(), err == nil
}

// Test how Changefeeds react to schema changes that do not require a backfill
// operation.
func TestChangefeedInitialScan(t *testing.T) {
Expand Down
14 changes: 13 additions & 1 deletion pkg/ccl/changefeedccl/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ func assertPayloadsBase(

require.NoError(t,
withTimeout(f, timeout,
func(ctx context.Context) error {
func(ctx context.Context) (err error) {
return assertPayloadsBaseErr(ctx, f, expected, stripTs, perKeyOrdered)
},
))
Expand Down Expand Up @@ -697,6 +697,18 @@ func closeFeed(t testing.TB, f cdctest.TestFeed) {
}
}

func closeFeedIgnoreError(t testing.TB, f cdctest.TestFeed) {
defer func() {
if e := recover(); e != nil {
t.Log(e)
}
}()
t.Helper()
if err := f.Close(); err != nil {
t.Log(err)
}
}

// TestServer is a struct to allow tests to operate on a shared API regardless
// of a test running as the system tenant or a secondary tenant
type TestServer struct {
Expand Down
12 changes: 10 additions & 2 deletions pkg/internal/sqlsmith/relational.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,9 @@ var joinTypes = []string{
}

func makeJoinExpr(s *Smither, refs colRefs, forJoin bool) (tree.TableExpr, colRefs, bool) {
if s.disableJoins {
return nil, nil, false
}
left, leftRefs, ok := makeTableExpr(s, refs, true)
if !ok {
return nil, nil, false
Expand Down Expand Up @@ -280,7 +283,8 @@ func makeAndedJoinCond(
v := available[0]
available = available[1:]
var expr *tree.ComparisonExpr
useEQ := cond == nil || onlyEqualityPreds || s.coin()
_, expressionsAreComparable := tree.CmpOps[treecmp.LT].LookupImpl(v[0].ResolvedType(), v[1].ResolvedType())
useEQ := cond == nil || onlyEqualityPreds || !expressionsAreComparable || s.coin()
if useEQ {
expr = tree.NewTypedComparisonExpr(treecmp.MakeComparisonOperator(treecmp.EQ), v[0], v[1])
} else {
Expand Down Expand Up @@ -645,7 +649,11 @@ func (s *Smither) makeSelectClause(
}
clause.From.Tables = append(clause.From.Tables, from)
// Restrict so that we don't have a crazy amount of rows due to many joins.
if len(clause.From.Tables) >= 4 {
tableLimit := 4
if s.disableJoins {
tableLimit = 1
}
if len(clause.From.Tables) >= tableLimit {
break
}
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/internal/sqlsmith/sqlsmith.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ type Smither struct {
unlikelyConstantPredicate bool
favorCommonData bool
unlikelyRandomNulls bool
disableJoins bool
disableCrossJoins bool
disableIndexHints bool
lowProbWhereWithJoinTables bool
Expand Down Expand Up @@ -434,6 +435,11 @@ var UnlikelyRandomNulls = simpleOption("unlikely random nulls", func(s *Smither)
s.unlikelyRandomNulls = true
})

// DisableJoins causes the Smither to disable joins.
var DisableJoins = simpleOption("disable joins", func(s *Smither) {
s.disableJoins = true
})

// DisableCrossJoins causes the Smither to disable cross joins.
var DisableCrossJoins = simpleOption("disable cross joins", func(s *Smither) {
s.disableCrossJoins = true
Expand Down
5 changes: 4 additions & 1 deletion pkg/sql/randgen/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,10 @@ func PopulateTableWithRandData(
// them to the list of columns to insert data into.
continue
}
colTypes = append(colTypes, tree.MustBeStaticallyKnownType(col.Type.(*types.T)))
if _, ok := col.Type.(*types.T); !ok {
return 0, errors.Newf("No type for %v", col)
}
colTypes = append(colTypes, tree.MustBeStaticallyKnownType(col.Type))
nullable = append(nullable, col.Nullable.Nullability == tree.Null)

colNameBuilder.WriteString(comma)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,17 @@ func init() {
status := rel.Var("status")
return rel.Clauses{
from.Type((*scpb.Column)(nil)),
// Join first on the target and node to only explore all columns
// which are being added as opposed to all columns. If we joined
// first on the columns, we'd be filtering the cross product of
// table columns. If a relation has a lot of columns, this can hurt.
// It's less likely that we have a very large number of columns which
// are being added. We'll want to do something else here when we start
// creating tables and all the columns are being added.
//
// The "right" answer is to push ordering predicates into rel; it also
// is maintaining sorted data structures.
from.JoinTargetNode(),
to.Type((*scpb.Column)(nil)),
JoinOnDescID(from, to, "table-id"),
ToPublicOrTransient(from, to),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2130,6 +2130,7 @@ deprules
to: earlier-column-Node
query:
- $later-column[Type] = '*scpb.Column'
- joinTargetNode($later-column, $later-column-Target, $later-column-Node)
- $earlier-column[Type] = '*scpb.Column'
- joinOnDescID($later-column, $earlier-column, $table-id)
- ToPublicOrTransient($later-column-Target, $earlier-column-Target)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,17 @@ func init() {
status := rel.Var("status")
return rel.Clauses{
from.Type((*scpb.Column)(nil)),
// Join first on the target and node to only explore all columns
// which are being added as opposed to all columns. If we joined
// first on the columns, we'd be filtering the cross product of
// table columns. If a relation has a lot of columns, this can hurt.
// It's less likely that we have a very large number of columns which
// are being added. We'll want to do something else here when we start
// creating tables and all the columns are being added.
//
// The "right" answer is to push ordering predicates into rel; it also
// is maintaining sorted data structures.
from.JoinTargetNode(),
to.Type((*scpb.Column)(nil)),
JoinOnDescID(from, to, "table-id"),
ToPublicOrTransient(from, to),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2036,6 +2036,7 @@ deprules
to: earlier-column-Node
query:
- $later-column[Type] = '*scpb.Column'
- joinTargetNode($later-column, $later-column-Target, $later-column-Node)
- $earlier-column[Type] = '*scpb.Column'
- joinOnDescID($later-column, $earlier-column, $table-id)
- ToPublicOrTransient($later-column-Target, $earlier-column-Target)
Expand Down

0 comments on commit 2ad8df3

Please sign in to comment.