Skip to content

Commit

Permalink
Merge #65325 #65712
Browse files Browse the repository at this point in the history
65325: changefeedccl: set groundwork for nemeses replay r=HonoreDB a=stevendanna

This contains #53771 but with the enumCreate event mix set to 0, effectively
disabling it until we can fix the issue it uncovered.

Further, it sets the groundwork for making ChangefeedNemeses something we
can record and play back by moving various random choices into the event payload.

Subsequent PRs will add the ability to record events and their payload such that they
can be replayed.

See the individual commits for more details.

65712: CODEOWNERS: add dev-inf as owner of `.github` r=rail a=rail

Keep the Developer Infrastructure team aware of the changes to the
`.github` directory, which may contain CI-related changes.

Release note: None

Co-authored-by: Rohan Yadav <[email protected]>
Co-authored-by: Steven Danna <[email protected]>
Co-authored-by: Rail Aliiev <[email protected]>
  • Loading branch information
4 people committed May 26, 2021
3 parents 4e3437f + 5b38f36 + d2a0781 commit 821d315
Show file tree
Hide file tree
Showing 2 changed files with 171 additions and 67 deletions.
2 changes: 2 additions & 0 deletions .github/CODEOWNERS
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
# [2]: https://help.github.com/articles/about-codeowners/
# [3]: pkg/internal/codeowners

/.github/ @cockroachdb/dev-inf

/docs/RFCS/ @cockroachdb/rfc-prs

/pkg/sql/opt/ @cockroachdb/sql-opt-prs
Expand Down
236 changes: 169 additions & 67 deletions pkg/ccl/changefeedccl/cdctest/nemeses.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,9 @@ func RunNemesis(f TestFeedFactory, db *gosql.DB, isSinkless bool) (Validator, er
eventRemoveColumn{
CanRemoveColumnAfter: fsm.False,
}: 5,

// eventCreateEnum creates a new enum type.
eventCreateEnum{}: 5,
},
}

Expand All @@ -132,7 +135,11 @@ func RunNemesis(f TestFeedFactory, db *gosql.DB, isSinkless bool) (Validator, er
// then randomly either committing or rolling back transactions. This will
// leave some committed rows.
for i := 0; i < ns.rowCount*5; i++ {
if err := openTxn(fsm.Args{Ctx: ctx, Extended: ns}); err != nil {
payload, err := newOpenTxnPayload(ns)
if err != nil {
return nil, err
}
if err := openTxn(fsm.Args{Ctx: ctx, Extended: ns, Payload: payload}); err != nil {
return nil, err
}
// Randomly commit or rollback, but commit at least one row to the table.
Expand Down Expand Up @@ -188,7 +195,11 @@ func RunNemesis(f TestFeedFactory, db *gosql.DB, isSinkless bool) (Validator, er
// Maybe open an intent.
if rand.Intn(2) < 1 {
txnOpenBeforeInitialScan = true
if err := openTxn(fsm.Args{Ctx: ctx, Extended: ns}); err != nil {
payload, err := newOpenTxnPayload(ns)
if err != nil {
return nil, err
}
if err := openTxn(fsm.Args{Ctx: ctx, Extended: ns, Payload: payload}); err != nil {
return nil, err
}
}
Expand All @@ -208,11 +219,11 @@ func RunNemesis(f TestFeedFactory, db *gosql.DB, isSinkless bool) (Validator, er
if _, ok := state.(stateDone); ok {
return ns.v, nil
}
event, err := ns.nextEvent(rng, state, foo, &m)
event, eventPayload, err := ns.nextEvent(rng, state, &m)
if err != nil {
return nil, err
}
if err := m.Apply(ctx, event); err != nil {
if err := m.ApplyWithPayload(ctx, event, eventPayload); err != nil {
return nil, err
}
}
Expand All @@ -225,6 +236,27 @@ const (
openTxnTypeDelete openTxnType = `DELETE`
)

type openTxnPayload struct {
openTxnType openTxnType

// rowID is the column ID to operate on.
rowID int
}

type addColumnType string

const (
addColumnTypeString addColumnType = "string"
addColumnTypeEnum addColumnType = "enum"
)

type addColumnPayload struct {
columnType addColumnType

// if columnType is enumColumn, which enum to add
enum int
}

type nemeses struct {
rowCount int
maxTestColumnCount int
Expand All @@ -240,24 +272,27 @@ type nemeses struct {
openTxnType openTxnType
openTxnID int
openTxnTs string

enumCount int
}

// nextEvent selects the next state transition.
func (ns *nemeses) nextEvent(
rng *rand.Rand, state fsm.State, f TestFeed, m *fsm.Machine,
) (se fsm.Event, err error) {
rng *rand.Rand, state fsm.State, m *fsm.Machine,
) (se fsm.Event, payload fsm.EventPayload, err error) {
var noPayload interface{}
if ns.v.NumResolvedWithRows >= 6 && ns.v.NumResolvedRows >= 10 {
return eventFinished{}, nil
return eventFinished{}, noPayload, nil
}
possibleEvents, ok := compiledStateTransitions.GetExpanded()[state]
if !ok {
return nil, errors.Errorf(`unknown state: %T %s`, state, state)
return nil, noPayload, errors.Errorf(`unknown state: %T %s`, state, state)
}
mixTotal := 0
for event := range possibleEvents {
weight, ok := ns.eventMix[event]
if !ok {
return nil, errors.Errorf(`unknown event: %T`, event)
return nil, noPayload, errors.Errorf(`unknown event: %T`, event)
}
mixTotal += weight
}
Expand All @@ -273,26 +308,72 @@ func (ns *nemeses) nextEvent(
if ns.availableRows < 1 {
s := state.(stateRunning)
if s.TxnOpen.Get() {
return eventCommit{}, nil
return eventCommit{}, noPayload, nil
}
return eventOpenTxn{}, nil
payload, err := newOpenTxnPayload(ns)
if err != nil {
return eventOpenTxn{}, noPayload, err
}
return eventOpenTxn{}, payload, nil
}
return eventFeedMessage{}, noPayload, nil
}
if _, ok := event.(eventOpenTxn); ok {
payload, err := newOpenTxnPayload(ns)
if err != nil {
return eventOpenTxn{}, noPayload, err
}
return eventFeedMessage{}, nil
return eventOpenTxn{}, payload, nil
}
if e, ok := event.(eventAddColumn); ok {
e.CanAddColumnAfter = fsm.FromBool(ns.currentTestColumnCount < ns.maxTestColumnCount-1)
return e, nil
payload := addColumnPayload{}
if ns.enumCount > 0 && rng.Intn(4) < 1 {
payload.columnType = addColumnTypeEnum
payload.enum = rng.Intn(ns.enumCount)
} else {
payload.columnType = addColumnTypeString
}
return e, payload, nil
}
if e, ok := event.(eventRemoveColumn); ok {
e.CanRemoveColumnAfter = fsm.FromBool(ns.currentTestColumnCount > 1)
return e, nil
return e, noPayload, nil
}
return event, nil
return event, noPayload, nil
}

panic(`unreachable`)
}

func newOpenTxnPayload(ns *nemeses) (openTxnPayload, error) {
payload := openTxnPayload{}
if rand.Intn(10) == 0 {
rows, err := ns.db.Query(`SELECT id FROM foo ORDER BY random() LIMIT 1`)
if err != nil {
return payload, err
}
defer func() { _ = rows.Close() }()
if rows.Next() {
var deleteID int
if err := rows.Scan(&deleteID); err != nil {
return payload, err
}
payload.rowID = deleteID
payload.openTxnType = openTxnTypeDelete
return payload, nil
}
if err := rows.Err(); err != nil {
return payload, err
}
// No rows to delete, do an upsert
}

payload.rowID = rand.Intn(ns.rowCount)
payload.openTxnType = openTxnTypeUpsert
return payload, nil
}

type stateRunning struct {
FeedPaused fsm.Bool
TxnOpen fsm.Bool
Expand All @@ -319,6 +400,7 @@ type eventAddColumn struct {
type eventRemoveColumn struct {
CanRemoveColumnAfter fsm.Bool
}
type eventCreateEnum struct{}
type eventFinished struct{}

func (eventOpenTxn) Event() {}
Expand All @@ -332,6 +414,7 @@ func (eventRollback) Event() {}
func (eventSplit) Event() {}
func (eventAddColumn) Event() {}
func (eventRemoveColumn) Event() {}
func (eventCreateEnum) Event() {}
func (eventFinished) Event() {}

var stateTransitions = fsm.Pattern{
Expand Down Expand Up @@ -417,6 +500,15 @@ var stateTransitions = fsm.Pattern{
CanRemoveColumn: fsm.Var("CanRemoveColumn")},
Action: logEvent(openTxn),
},
eventCreateEnum{}: {
Next: stateRunning{
FeedPaused: fsm.Var("FeedPaused"),
TxnOpen: fsm.False,
CanAddColumn: fsm.Var("CanAddColumn"),
CanRemoveColumn: fsm.Var("CanRemoveColumn"),
},
Action: logEvent(createEnum),
},
},
stateRunning{
FeedPaused: fsm.Var("FeedPaused"),
Expand Down Expand Up @@ -493,9 +585,7 @@ var compiledStateTransitions = fsm.Compile(stateTransitions)

func logEvent(fn func(fsm.Args) error) func(fsm.Args) error {
return func(a fsm.Args) error {
if log.V(1) {
log.Infof(a.Ctx, "%#v\n", a.Event)
}
log.Infof(a.Ctx, "Event: %#v, Payload: %#v\n", a.Event, a.Payload)
return fn(a)
}
}
Expand All @@ -509,47 +599,31 @@ func cleanup(a fsm.Args) error {

func openTxn(a fsm.Args) error {
ns := a.Extended.(*nemeses)

const noDeleteSentinel = int(-1)
// 10% of the time attempt a DELETE.
deleteID := noDeleteSentinel
if rand.Intn(10) == 0 {
rows, err := ns.db.Query(`SELECT id FROM foo ORDER BY random() LIMIT 1`)
if err != nil {
return err
}
defer func() { _ = rows.Close() }()
if rows.Next() {
if err := rows.Scan(&deleteID); err != nil {
return err
}
}
if err := rows.Err(); err != nil {
return err
}
// If there aren't any rows, skip the DELETE this time.
}
payload := a.Payload.(openTxnPayload)

txn, err := ns.db.Begin()
if err != nil {
return err
}
if deleteID == noDeleteSentinel {
switch payload.openTxnType {
case openTxnTypeUpsert:
if err := txn.QueryRow(
`UPSERT INTO foo VALUES ((random() * $1)::int, cluster_logical_timestamp()::string) RETURNING id, ts`,
ns.rowCount,
`UPSERT INTO foo VALUES ($1, cluster_logical_timestamp()::string) RETURNING id, ts`,
payload.rowID,
).Scan(&ns.openTxnID, &ns.openTxnTs); err != nil {
return err
}
ns.openTxnType = openTxnTypeUpsert
} else {
case openTxnTypeDelete:
if err := txn.QueryRow(
`DELETE FROM foo WHERE id = $1 RETURNING id, ts`, deleteID,
`DELETE FROM foo WHERE id = $1 RETURNING id, ts`,
payload.rowID,
).Scan(&ns.openTxnID, &ns.openTxnTs); err != nil {
return err
}
ns.openTxnType = openTxnTypeDelete
default:
panic("unreachable")
}
ns.openTxnType = payload.openTxnType
ns.txn = txn
return nil
}
Expand All @@ -574,18 +648,40 @@ func rollback(a fsm.Args) error {
return ns.txn.Rollback()
}

func createEnum(a fsm.Args) error {
ns := a.Extended.(*nemeses)

if _, err := ns.db.Exec(fmt.Sprintf(`CREATE TYPE enum%d AS ENUM ('hello')`, ns.enumCount)); err != nil {
return err
}
ns.enumCount++
return nil
}

func addColumn(a fsm.Args) error {
ns := a.Extended.(*nemeses)
payload := a.Payload.(addColumnPayload)

if ns.currentTestColumnCount >= ns.maxTestColumnCount {
return errors.AssertionFailedf(`addColumn should be called when`+
`there are less than %d columns.`, ns.maxTestColumnCount)
}

if _, err := ns.db.Exec(fmt.Sprintf(`ALTER TABLE foo ADD COLUMN test%d STRING DEFAULT 'x'`,
ns.currentTestColumnCount)); err != nil {
return err
switch payload.columnType {
case addColumnTypeEnum:
// Pick a random enum to add.
enum := payload.enum
if _, err := ns.db.Exec(fmt.Sprintf(`ALTER TABLE foo ADD COLUMN test%d enum%d DEFAULT 'hello'`,
ns.currentTestColumnCount, enum)); err != nil {
return err
}
case addColumnTypeString:
if _, err := ns.db.Exec(fmt.Sprintf(`ALTER TABLE foo ADD COLUMN test%d STRING DEFAULT 'x'`,
ns.currentTestColumnCount)); err != nil {
return err
}
}

ns.currentTestColumnCount++
var rows int
// Adding a column should trigger a full table scan.
Expand Down Expand Up @@ -628,29 +724,35 @@ func noteFeedMessage(a fsm.Args) error {
return errors.AssertionFailedf(`noteFeedMessage should be called with at` +
`least one available row.`)
}
m, err := ns.f.Next()
if err != nil {
return err
} else if m == nil {
return errors.Errorf(`expected another message`)
}

if len(m.Resolved) > 0 {
_, ts, err := ParseJSONValueTimestamps(m.Resolved)
for {
m, err := ns.f.Next()
if err != nil {
return err
} else if m == nil {
return errors.Errorf(`expected another message`)
}
log.Infof(a.Ctx, "%v", string(m.Resolved))
return ns.v.NoteResolved(m.Partition, ts)
}
ts, _, err := ParseJSONValueTimestamps(m.Value)
if err != nil {
return err
}

ns.availableRows--
log.Infof(a.Ctx, "%s->%s", m.Key, m.Value)
return ns.v.NoteRow(m.Partition, string(m.Key), string(m.Value), ts)
if len(m.Resolved) > 0 {
_, ts, err := ParseJSONValueTimestamps(m.Resolved)
if err != nil {
return err
}
log.Infof(a.Ctx, "%v", string(m.Resolved))
err = ns.v.NoteResolved(m.Partition, ts)
if err != nil {
return err
}
// Keep consuming until we hit a row
} else {
ts, _, err := ParseJSONValueTimestamps(m.Value)
if err != nil {
return err
}
ns.availableRows--
log.Infof(a.Ctx, "%s->%s", m.Key, m.Value)
return ns.v.NoteRow(m.Partition, string(m.Key), string(m.Value), ts)
}
}
}

func pause(a fsm.Args) error {
Expand Down

0 comments on commit 821d315

Please sign in to comment.