From c82d570486da31bcb06e864d63653a22a19b1c99 Mon Sep 17 00:00:00 2001 From: Aayush Shah Date: Tue, 22 Oct 2019 23:19:33 -0400 Subject: [PATCH] changefeedccl: add schema change events to nemeses Currently, the changefeed nemeses system does not support schema changes. This has previously allowed some fairly critical bugs to creep past our testing. This PR adds support for schema change events to the changefeed nemeses. Release note: None. --- pkg/ccl/changefeedccl/cdctest/nemeses.go | 566 +++++++++++++----- pkg/ccl/changefeedccl/cdctest/validator.go | 265 +++++--- .../changefeedccl/cdctest/validator_test.go | 57 +- pkg/cmd/roachtest/cdc.go | 2 +- pkg/util/fsm/fsm.go | 5 + 5 files changed, 618 insertions(+), 277 deletions(-) diff --git a/pkg/ccl/changefeedccl/cdctest/nemeses.go b/pkg/ccl/changefeedccl/cdctest/nemeses.go index c621072874ec..e52e66bc1086 100644 --- a/pkg/ccl/changefeedccl/cdctest/nemeses.go +++ b/pkg/ccl/changefeedccl/cdctest/nemeses.go @@ -9,15 +9,17 @@ package cdctest import ( + "bytes" "context" gosql "database/sql" + "fmt" "math/rand" "strings" "github.com/cockroachdb/cockroach/pkg/util/fsm" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/randutil" - "github.com/pkg/errors" + "github.com/cockroachdb/errors" ) // RunNemesis runs a jepsen-style validation of whether a changefeed meets our @@ -52,34 +54,66 @@ func RunNemesis(f TestFeedFactory, db *gosql.DB, isSinkless bool) (Validator, er eventPauseCount = 0 } ns := &nemeses{ - rowCount: 4, - db: db, + maxTestColumnCount: 10, + rowCount: 4, + db: db, // eventMix does not have to add to 100 eventMix: map[fsm.Event]int{ - // eventTransact opens an UPSERT transaction is there is not one open. If - // there is one open, it either commits it or rolls it back. - eventTransact{}: 50, + // We don't want `eventFinished` to ever be returned by `nextEvent` so we set + // its weight to 0. 
+ eventFinished{}: 0, // eventFeedMessage reads a message from the feed, or if the state machine - // thinks there will be no message available, it falls back to - // eventTransact. + // thinks there will be no message available, it falls back to eventOpenTxn or + // eventCommit (if there is already a txn open). eventFeedMessage{}: 50, - // eventPause PAUSEs the changefeed. The state machine will handle - // RESUMEing it. - eventPause{}: eventPauseCount, + // eventSplit splits between two random rows (the split is a no-op if it + // already exists). + eventSplit{}: 5, - // eventPush pushes every open transaction by running a high priority - // SELECT. + // TRANSACTIONS + // eventOpenTxn opens an UPSERT or DELETE transaction. + eventOpenTxn{}: 10, + + // eventCommit commits the outstanding transaction. + eventCommit{}: 5, + + // eventRollback simply rolls the outstanding transaction back. + eventRollback{}: 5, + + // eventPush pushes every open transaction by running a high priority SELECT. eventPush{}: 5, - // eventAbort aborts every open transaction by running a high priority - // DELETE. + // eventAbort aborts every open transaction by running a high priority DELETE. eventAbort{}: 5, - // eventSplit splits between two random rows (the split is a no-op if it - // already exists). - eventSplit{}: 5, + // PAUSE / RESUME + // eventPause PAUSEs the changefeed. + eventPause{}: eventPauseCount, + + // eventResume RESUMEs the changefeed. + eventResume{}: 50, + + // SCHEMA CHANGES + // eventAddColumn performs a schema change by adding a new column with a default + // value in order to trigger a backfill. + eventAddColumn{ + CanAddColumnAfter: fsm.True, + }: 5, + + eventAddColumn{ + CanAddColumnAfter: fsm.False, + }: 5, + + // eventRemoveColumn performs a schema change by removing a column. 
+ eventRemoveColumn{ + CanRemoveColumnAfter: fsm.True, + }: 5, + + eventRemoveColumn{ + CanRemoveColumnAfter: fsm.False, + }: 5, }, } @@ -94,14 +128,23 @@ func RunNemesis(f TestFeedFactory, db *gosql.DB, isSinkless bool) (Validator, er return nil, err } - // Initialize table rows by repeatedly running the `transact` transition, - // which randomly starts, commits, and rolls back transactions. This will - // leave some committed rows and maybe an outstanding intent for the initial - // scan. + // Initialize table rows by repeatedly running the `openTxn` transition, + // then randomly either committing or rolling back transactions. This will + // leave some committed rows. for i := 0; i < ns.rowCount*5; i++ { - if err := transact(fsm.Args{Ctx: ctx, Extended: ns}); err != nil { + if err := openTxn(fsm.Args{Ctx: ctx, Extended: ns}); err != nil { return nil, err } + // Randomly commit or rollback, but commit at least one row to the table. + if rand.Intn(3) < 2 || i == 0 { + if err := commit(fsm.Args{Ctx: ctx, Extended: ns}); err != nil { + return nil, err + } + } else { + if err := rollback(fsm.Args{Ctx: ctx, Extended: ns}); err != nil { + return nil, err + } + } } foo, err := f.Feed(`CREATE CHANGEFEED FOR foo WITH updated, resolved`) @@ -111,10 +154,15 @@ func RunNemesis(f TestFeedFactory, db *gosql.DB, isSinkless bool) (Validator, er ns.f = foo defer func() { _ = foo.Close() }() - if _, err := db.Exec(`CREATE TABLE fprint (id INT PRIMARY KEY, ts STRING)`); err != nil { + // Create scratch table with a pre-specified set of test columns to avoid having to + // accommodate schema changes on-the-fly. 
+ scratchTableName := `fprint` + var createFprintStmtBuf bytes.Buffer + fmt.Fprintf(&createFprintStmtBuf, `CREATE TABLE %s (id INT PRIMARY KEY, ts STRING)`, scratchTableName) + if _, err := db.Exec(createFprintStmtBuf.String()); err != nil { return nil, err } - fprintV, err := NewFingerprintValidator(db, `foo`, `fprint`, foo.Partitions()) + fprintV, err := NewFingerprintValidator(db, `foo`, scratchTableName, foo.Partitions(), ns.maxTestColumnCount) if err != nil { return nil, err } @@ -123,38 +171,39 @@ func RunNemesis(f TestFeedFactory, db *gosql.DB, isSinkless bool) (Validator, er fprintV, }) - // Initialize the actual row count, overwriting what `transact` did. - // `transact` has set this to the number of modified rows, which is correct - // during changefeed operation, but not for the initial scan, because some of - // the rows may have had the same primary key. + // Initialize the actual row count, overwriting what the initialization loop did. That + // loop has set this to the number of modified rows, which is correct during + // changefeed operation, but not for the initial scan, because some of the rows may + // have had the same primary key. if err := db.QueryRow(`SELECT count(*) FROM foo`).Scan(&ns.availableRows); err != nil { return nil, err } - // Kick everything off by reading the first message. This accomplishes two - // things. First, it maximizes the chance that we hit an unresolved intent - // during the initial scan. Second, it guarantees that the feed is running - // before anything else commits, which could mess up the availableRows count - // we just set. - if err := noteFeedMessage(fsm.Args{Ctx: ctx, Extended: ns}); err != nil { - return nil, err - } - // Now push everything to make sure the initial scan can complete, otherwise - // we may deadlock. - if err := push(fsm.Args{Ctx: ctx, Extended: ns}); err != nil { - return nil, err + txnOpenBeforeInitialScan := false + // Maybe open an intent. 
+ if rand.Intn(2) < 1 { + txnOpenBeforeInitialScan = true + if err := openTxn(fsm.Args{Ctx: ctx, Extended: ns}); err != nil { + return nil, err + } } // Run the state machine until it finishes. Exit criteria is in `nextEvent` // and is based on the number of rows that have been resolved and the number // of resolved timestamp messages. - m := fsm.MakeMachine(txnStateTransitions, stateRunning{fsm.False}, ns) + initialState := stateRunning{ + FeedPaused: fsm.False, + TxnOpen: fsm.FromBool(txnOpenBeforeInitialScan), + CanAddColumn: fsm.True, + CanRemoveColumn: fsm.False, + } + m := fsm.MakeMachine(compiledStateTransitions, initialState, ns) for { state := m.CurState() if _, ok := state.(stateDone); ok { return ns.v, nil } - event, err := ns.nextEvent(rng, state, foo) + event, err := ns.nextEvent(rng, state, foo, &m) if err != nil { return nil, err } @@ -172,129 +221,276 @@ const ( ) type nemeses struct { - rowCount int - eventMix map[fsm.Event]int - mixTotal int + rowCount int + maxTestColumnCount int + eventMix map[fsm.Event]int v *CountValidator db *gosql.DB f TestFeed - availableRows int - txn *gosql.Tx - openTxnType openTxnType - openTxnID int - openTxnTs string + availableRows int + currentTestColumnCount int + txn *gosql.Tx + openTxnType openTxnType + openTxnID int + openTxnTs string } // nextEvent selects the next state transition. 
-func (ns *nemeses) nextEvent(rng *rand.Rand, state fsm.State, f TestFeed) (fsm.Event, error) { +func (ns *nemeses) nextEvent( + rng *rand.Rand, state fsm.State, f TestFeed, m *fsm.Machine, +) (se fsm.Event, err error) { if ns.v.NumResolvedWithRows >= 6 && ns.v.NumResolvedRows >= 10 { return eventFinished{}, nil } - - if ns.mixTotal == 0 { - for _, weight := range ns.eventMix { - ns.mixTotal += weight + possibleEvents, ok := compiledStateTransitions.GetExpanded()[state] + if !ok { + return nil, errors.Errorf(`unknown state: %T %s`, state, state) + } + mixTotal := 0 + for event := range possibleEvents { + weight, ok := ns.eventMix[event] + if !ok { + return nil, errors.Errorf(`unknown event: %T`, event) } + mixTotal += weight } - - switch state { - case stateRunning{Paused: fsm.True}: - return eventResume{}, nil - case stateRunning{Paused: fsm.False}: - r, t := rng.Intn(ns.mixTotal), 0 - for event, weight := range ns.eventMix { - t += weight - if r >= t { - continue - } - if _, ok := event.(eventFeedMessage); ok { - break + r, t := rng.Intn(mixTotal), 0 + for event := range possibleEvents { + t += ns.eventMix[event] + if r >= t { + continue + } + if _, ok := event.(eventFeedMessage); ok { + // If there are no available rows, openTxn or commit outstanding txn instead + // of reading. + if ns.availableRows < 1 { + s := state.(stateRunning) + if s.TxnOpen.Get() { + return eventCommit{}, nil + } + return eventOpenTxn{}, nil } - return event, nil + return eventFeedMessage{}, nil } - - // If there are no available rows, transact instead of reading. 
- if ns.availableRows < 1 { - return eventTransact{}, nil + if e, ok := event.(eventAddColumn); ok { + e.CanAddColumnAfter = fsm.FromBool(ns.currentTestColumnCount < ns.maxTestColumnCount-1) + return e, nil } - return eventFeedMessage{}, nil - default: - return nil, errors.Errorf(`unknown state: %T %s`, state, state) + if e, ok := event.(eventRemoveColumn); ok { + e.CanRemoveColumnAfter = fsm.FromBool(ns.currentTestColumnCount > 1) + return e, nil + } + return event, nil } + + panic(`unreachable`) } type stateRunning struct { - Paused fsm.Bool + FeedPaused fsm.Bool + TxnOpen fsm.Bool + CanRemoveColumn fsm.Bool + CanAddColumn fsm.Bool } type stateDone struct{} func (stateRunning) State() {} func (stateDone) State() {} -type eventTransact struct{} +type eventOpenTxn struct{} type eventFeedMessage struct{} type eventPause struct{} type eventResume struct{} +type eventCommit struct{} type eventPush struct{} type eventAbort struct{} +type eventRollback struct{} type eventSplit struct{} +type eventAddColumn struct { + CanAddColumnAfter fsm.Bool +} +type eventRemoveColumn struct { + CanRemoveColumnAfter fsm.Bool +} type eventFinished struct{} -func (eventTransact) Event() {} -func (eventFeedMessage) Event() {} -func (eventPause) Event() {} -func (eventResume) Event() {} -func (eventPush) Event() {} -func (eventAbort) Event() {} -func (eventSplit) Event() {} -func (eventFinished) Event() {} - -var txnStateTransitions = fsm.Compile(fsm.Pattern{ - stateRunning{Paused: fsm.Any}: { +func (eventOpenTxn) Event() {} +func (eventFeedMessage) Event() {} +func (eventPause) Event() {} +func (eventResume) Event() {} +func (eventCommit) Event() {} +func (eventPush) Event() {} +func (eventAbort) Event() {} +func (eventRollback) Event() {} +func (eventSplit) Event() {} +func (eventAddColumn) Event() {} +func (eventRemoveColumn) Event() {} +func (eventFinished) Event() {} + +var stateTransitions = fsm.Pattern{ + stateRunning{ + FeedPaused: fsm.Var("FeedPaused"), + TxnOpen: 
fsm.Var("TxnOpen"), + CanAddColumn: fsm.Var("CanAddColumn"), + CanRemoveColumn: fsm.Var("CanRemoveColumn"), + }: { + eventSplit{}: { + Next: stateRunning{ + FeedPaused: fsm.Var("FeedPaused"), + TxnOpen: fsm.Var("TxnOpen"), + CanAddColumn: fsm.Var("CanAddColumn"), + CanRemoveColumn: fsm.Var("CanRemoveColumn")}, + Action: logEvent(split), + }, eventFinished{}: { Next: stateDone{}, Action: logEvent(cleanup), }, }, - stateRunning{Paused: fsm.False}: { - eventPause{}: { - Next: stateRunning{Paused: fsm.True}, - Action: logEvent(pause), + stateRunning{ + FeedPaused: fsm.Var("FeedPaused"), + TxnOpen: fsm.False, + CanAddColumn: fsm.True, + CanRemoveColumn: fsm.Any, + }: { + eventAddColumn{ + CanAddColumnAfter: fsm.Var("CanAddColumnAfter"), + }: { + Next: stateRunning{ + FeedPaused: fsm.Var("FeedPaused"), + TxnOpen: fsm.False, + CanAddColumn: fsm.Var("CanAddColumnAfter"), + CanRemoveColumn: fsm.True}, + Action: logEvent(addColumn), }, - eventTransact{}: { - Next: stateRunning{Paused: fsm.False}, - Action: logEvent(transact), + }, + stateRunning{ + FeedPaused: fsm.Var("FeedPaused"), + TxnOpen: fsm.False, + CanAddColumn: fsm.Any, + CanRemoveColumn: fsm.True, + }: { + eventRemoveColumn{ + CanRemoveColumnAfter: fsm.Var("CanRemoveColumnAfter"), + }: { + Next: stateRunning{ + FeedPaused: fsm.Var("FeedPaused"), + TxnOpen: fsm.False, + CanAddColumn: fsm.True, + CanRemoveColumn: fsm.Var("CanRemoveColumnAfter")}, + Action: logEvent(removeColumn), }, + }, + stateRunning{ + FeedPaused: fsm.False, + TxnOpen: fsm.False, + CanAddColumn: fsm.Var("CanAddColumn"), + CanRemoveColumn: fsm.Var("CanRemoveColumn"), + }: { eventFeedMessage{}: { - Next: stateRunning{Paused: fsm.False}, + Next: stateRunning{ + FeedPaused: fsm.False, + TxnOpen: fsm.False, + CanAddColumn: fsm.Var("CanAddColumn"), + CanRemoveColumn: fsm.Var("CanRemoveColumn")}, Action: logEvent(noteFeedMessage), }, - eventPush{}: { - Next: stateRunning{Paused: fsm.False}, - Action: logEvent(push), + }, + stateRunning{ + FeedPaused: 
fsm.Var("FeedPaused"), + TxnOpen: fsm.False, + CanAddColumn: fsm.Var("CanAddColumn"), + CanRemoveColumn: fsm.Var("CanRemoveColumn"), + }: { + eventOpenTxn{}: { + Next: stateRunning{ + FeedPaused: fsm.Var("FeedPaused"), + TxnOpen: fsm.True, + CanAddColumn: fsm.Var("CanAddColumn"), + CanRemoveColumn: fsm.Var("CanRemoveColumn")}, + Action: logEvent(openTxn), + }, + }, + stateRunning{ + FeedPaused: fsm.Var("FeedPaused"), + TxnOpen: fsm.True, + CanAddColumn: fsm.Var("CanAddColumn"), + CanRemoveColumn: fsm.Var("CanRemoveColumn"), + }: { + eventCommit{}: { + Next: stateRunning{ + FeedPaused: fsm.Var("FeedPaused"), + TxnOpen: fsm.False, + CanAddColumn: fsm.Var("CanAddColumn"), + CanRemoveColumn: fsm.Var("CanRemoveColumn")}, + Action: logEvent(commit), + }, + eventRollback{}: { + Next: stateRunning{ + FeedPaused: fsm.Var("FeedPaused"), + TxnOpen: fsm.False, + CanAddColumn: fsm.Var("CanAddColumn"), + CanRemoveColumn: fsm.Var("CanRemoveColumn")}, + Action: logEvent(rollback), }, eventAbort{}: { - Next: stateRunning{Paused: fsm.False}, + Next: stateRunning{ + FeedPaused: fsm.Var("FeedPaused"), + TxnOpen: fsm.True, + CanAddColumn: fsm.Var("CanAddColumn"), + CanRemoveColumn: fsm.Var("CanRemoveColumn")}, Action: logEvent(abort), }, - eventSplit{}: { - Next: stateRunning{Paused: fsm.False}, - Action: logEvent(split), + eventPush{}: { + Next: stateRunning{ + FeedPaused: fsm.Var("FeedPaused"), + TxnOpen: fsm.True, + CanAddColumn: fsm.Var("CanAddColumn"), + CanRemoveColumn: fsm.Var("CanRemoveColumn")}, + Action: logEvent(push), }, }, - stateRunning{Paused: fsm.True}: { + stateRunning{ + FeedPaused: fsm.False, + TxnOpen: fsm.Var("TxnOpen"), + CanAddColumn: fsm.Var("CanAddColumn"), + CanRemoveColumn: fsm.Var("CanRemoveColumn"), + }: { + eventPause{}: { + Next: stateRunning{ + FeedPaused: fsm.True, + TxnOpen: fsm.Var("TxnOpen"), + CanAddColumn: fsm.Var("CanAddColumn"), + CanRemoveColumn: fsm.Var("CanRemoveColumn")}, + Action: logEvent(pause), + }, + }, + stateRunning{ + FeedPaused: 
fsm.True, + TxnOpen: fsm.Var("TxnOpen"), + CanAddColumn: fsm.Var("CanAddColumn"), + CanRemoveColumn: fsm.Var("CanRemoveColumn"), + }: { eventResume{}: { - Next: stateRunning{Paused: fsm.False}, + Next: stateRunning{ + FeedPaused: fsm.False, + TxnOpen: fsm.Var("TxnOpen"), + CanAddColumn: fsm.Var("CanAddColumn"), + CanRemoveColumn: fsm.Var("CanRemoveColumn")}, Action: logEvent(resume), }, }, -}) +} + +var compiledStateTransitions = fsm.Compile(stateTransitions) func logEvent(fn func(fsm.Args) error) func(fsm.Args) error { return func(a fsm.Args) error { - log.Infof(a.Ctx, "%#v\n", a.Event) + if log.V(1) { + log.Infof(a.Ctx, "%#v\n", a.Event) + } return fn(a) } } @@ -306,76 +502,124 @@ func cleanup(a fsm.Args) error { return nil } -func transact(a fsm.Args) error { +func openTxn(a fsm.Args) error { ns := a.Extended.(*nemeses) - // If there are no transactions, create one. - if ns.txn == nil { - const noDeleteSentinel = int(-1) - // 10% of the time attempt a DELETE. - deleteID := noDeleteSentinel - if rand.Intn(10) == 0 { - rows, err := ns.db.Query(`SELECT id FROM foo ORDER BY random() LIMIT 1`) - if err != nil { - return err - } - defer func() { _ = rows.Close() }() - if rows.Next() { - if err := rows.Scan(&deleteID); err != nil { - return err - } - } - // If there aren't any rows, skip the DELETE this time. - } - - txn, err := ns.db.Begin() + const noDeleteSentinel = int(-1) + // 10% of the time attempt a DELETE. 
+ deleteID := noDeleteSentinel + if rand.Intn(10) == 0 { + rows, err := ns.db.Query(`SELECT id FROM foo ORDER BY random() LIMIT 1`) if err != nil { return err } - if deleteID == noDeleteSentinel { - if err := txn.QueryRow( - `UPSERT INTO foo VALUES ((random() * $1)::int, cluster_logical_timestamp()::string) RETURNING *`, - ns.rowCount, - ).Scan(&ns.openTxnID, &ns.openTxnTs); err != nil { - return err - } - ns.openTxnType = openTxnTypeUpsert - } else { - if err := txn.QueryRow( - `DELETE FROM foo WHERE id = $1 RETURNING *`, deleteID, - ).Scan(&ns.openTxnID, &ns.openTxnTs); err != nil { + defer func() { _ = rows.Close() }() + if rows.Next() { + if err := rows.Scan(&deleteID); err != nil { return err } - ns.openTxnType = openTxnTypeDelete } - ns.txn = txn - return nil + // If there aren't any rows, skip the DELETE this time. } - // If there is an outstanding transaction, roll it back half the time and - // commit it the other half. - txn := ns.txn - ns.txn = nil - - if rand.Intn(2) < 1 { - return txn.Rollback() + txn, err := ns.db.Begin() + if err != nil { + return err } - if err := txn.Commit(); err != nil { + if deleteID == noDeleteSentinel { + if err := txn.QueryRow( + `UPSERT INTO foo VALUES ((random() * $1)::int, cluster_logical_timestamp()::string) RETURNING id, ts`, + ns.rowCount, + ).Scan(&ns.openTxnID, &ns.openTxnTs); err != nil { + return err + } + ns.openTxnType = openTxnTypeUpsert + } else { + if err := txn.QueryRow( + `DELETE FROM foo WHERE id = $1 RETURNING id, ts`, deleteID, + ).Scan(&ns.openTxnID, &ns.openTxnTs); err != nil { + return err + } + ns.openTxnType = openTxnTypeDelete + } + ns.txn = txn + return nil +} + +func commit(a fsm.Args) error { + ns := a.Extended.(*nemeses) + defer func() { ns.txn = nil }() + if err := ns.txn.Commit(); err != nil { // Don't error out if we got pushed, but don't increment availableRows no // matter what error was hit. 
if strings.Contains(err.Error(), `restart transaction`) { return nil } - return err } - log.Infof(a.Ctx, "%s (%d, %s)", ns.openTxnType, ns.openTxnID, ns.openTxnTs) ns.availableRows++ return nil } +func rollback(a fsm.Args) error { + ns := a.Extended.(*nemeses) + defer func() { ns.txn = nil }() + return ns.txn.Rollback() +} + +func addColumn(a fsm.Args) error { + ns := a.Extended.(*nemeses) + + if ns.currentTestColumnCount >= ns.maxTestColumnCount { + return errors.AssertionFailedf(`addColumn should be called when`+ + `there are less than %d columns.`, ns.maxTestColumnCount) + } + + if _, err := ns.db.Exec(fmt.Sprintf(`ALTER TABLE foo ADD COLUMN test%d STRING DEFAULT 'x'`, + ns.currentTestColumnCount)); err != nil { + return err + } + ns.currentTestColumnCount++ + var rows int + // Adding a column should trigger a full table scan. + if err := ns.db.QueryRow(`SELECT count(*) FROM foo`).Scan(&rows); err != nil { + return err + } + // We expect one table scan that corresponds to the schema change backfill, and one + // scan that corresponds to the changefeed level backfill. + ns.availableRows += 2 * rows + return nil +} + +func removeColumn(a fsm.Args) error { + ns := a.Extended.(*nemeses) + + if ns.currentTestColumnCount == 0 { + return errors.AssertionFailedf(`removeColumn should be called with` + + `at least one test column.`) + } + if _, err := ns.db.Exec(fmt.Sprintf(`ALTER TABLE foo DROP COLUMN test%d`, + ns.currentTestColumnCount-1)); err != nil { + return err + } + ns.currentTestColumnCount-- + var rows int + // Dropping a column should trigger a full table scan. + if err := ns.db.QueryRow(`SELECT count(*) FROM foo`).Scan(&rows); err != nil { + return err + } + // We expect one table scan that corresponds to the schema change backfill, and one + // scan that corresponds to the changefeed level backfill. 
+ ns.availableRows += 2 * rows + return nil +} + func noteFeedMessage(a fsm.Args) error { ns := a.Extended.(*nemeses) + if ns.availableRows <= 0 { + return errors.AssertionFailedf(`noteFeedMessage should be called with at` + + `least one available row.`) + } m, err := ns.f.Next() if err != nil { return err @@ -410,12 +654,6 @@ func resume(a fsm.Args) error { return a.Extended.(*nemeses).f.Resume() } -func push(a fsm.Args) error { - ns := a.Extended.(*nemeses) - _, err := ns.db.Exec(`BEGIN TRANSACTION PRIORITY HIGH; SELECT * FROM foo; COMMIT`) - return err -} - func abort(a fsm.Args) error { ns := a.Extended.(*nemeses) const delete = `BEGIN TRANSACTION PRIORITY HIGH; ` + @@ -429,6 +667,12 @@ func abort(a fsm.Args) error { return nil } +func push(a fsm.Args) error { + ns := a.Extended.(*nemeses) + _, err := ns.db.Exec(`BEGIN TRANSACTION PRIORITY HIGH; SELECT * FROM foo; COMMIT`) + return err +} + func split(a fsm.Args) error { ns := a.Extended.(*nemeses) _, err := ns.db.Exec(`ALTER TABLE foo SPLIT AT VALUES ((random() * $1)::int)`, ns.rowCount) diff --git a/pkg/ccl/changefeedccl/cdctest/validator.go b/pkg/ccl/changefeedccl/cdctest/validator.go index bb34f0405444..115972ca45f5 100644 --- a/pkg/ccl/changefeedccl/cdctest/validator.go +++ b/pkg/ccl/changefeedccl/cdctest/validator.go @@ -126,17 +126,29 @@ type fingerprintValidator struct { // exists, which is valid but complicates the way fingerprintValidator works. // Don't create a fingerprint earlier than the first seen row. firstRowTimestamp hlc.Timestamp - - buffer []validatorRow + // previousRowUpdateTs keeps track of the timestamp of the most recently processed row + // update. Before starting to process row updates belonging to a particular timestamp + // X, we want to fingerprint at `X.Prev()` to catch any "missed" row updates. + // Maintaining `previousRowUpdateTs` allows us to do this. See `NoteResolved()` for + // more details. 
+ previousRowUpdateTs hlc.Timestamp + + // `fprintOrigColumns` keeps track of the number of non test columns in `fprint`. + fprintOrigColumns int + fprintTestColumns int + buffer []validatorRow failures []string } -// NewFingerprintValidator returns a new FingerprintValidator that uses -// `fprintTable` as scratch space to recreate `origTable`. `fprintTable` must -// exist before calling this constructor. +// NewFingerprintValidator returns a new FingerprintValidator that uses `fprintTable` as +// scratch space to recreate `origTable`. `fprintTable` must exist before calling this +// constructor. `maxTestColumnCount` indicates the maximum number of columns that can be +// expected in `origTable` due to test-related schema changes. This fingerprint validator +// will modify `fprint`'s schema to add `maxTestColumnCount` columns to avoid having to +// accommodate schema changes on the fly. func NewFingerprintValidator( - sqlDB *gosql.DB, origTable, fprintTable string, partitions []string, + sqlDB *gosql.DB, origTable, fprintTable string, partitions []string, maxTestColumnCount int, ) (Validator, error) { // Fetch the primary keys though information_schema schema inspections so we // can use them to construct the SQL for DELETEs and also so we can verify @@ -153,6 +165,16 @@ func NewFingerprintValidator( if err != nil { return nil, err } + // Record the non-test%d columns in `fprint`. + var fprintOrigColumns int + if err := sqlDB.QueryRow(` + SELECT count(column_name) + FROM information_schema.columns + WHERE table_name=$1 + `, fprintTable).Scan(&fprintOrigColumns); err != nil { + return nil, err + } + defer func() { _ = rows.Close() }() for rows.Next() { var primaryKeyCol string @@ -165,11 +187,28 @@ func NewFingerprintValidator( return nil, errors.Errorf("no primary key information found for %s", fprintTable) } + // Add test columns to fprint. 
+ if maxTestColumnCount > 0 { + var addColumnStmt bytes.Buffer + addColumnStmt.WriteString(`ALTER TABLE fprint `) + for i := 0; i < maxTestColumnCount; i++ { + if i != 0 { + addColumnStmt.WriteString(`, `) + } + fmt.Fprintf(&addColumnStmt, `ADD COLUMN test%d STRING`, i) + } + if _, err := sqlDB.Query(addColumnStmt.String()); err != nil { + return nil, err + } + } + v := &fingerprintValidator{ - sqlDB: sqlDB, - origTable: origTable, - fprintTable: fprintTable, - primaryKeyCols: primaryKeyCols, + sqlDB: sqlDB, + origTable: origTable, + fprintTable: fprintTable, + primaryKeyCols: primaryKeyCols, + fprintOrigColumns: fprintOrigColumns, + fprintTestColumns: maxTestColumnCount, } v.partitionResolved = make(map[string]hlc.Timestamp) for _, partition := range partitions { @@ -192,6 +231,93 @@ func (v *fingerprintValidator) NoteRow( }) } +// applyRowUpdate applies the update represented by `row` to the scratch table. +func (v *fingerprintValidator) applyRowUpdate(row validatorRow) error { + txn, err := v.sqlDB.Begin() + if err != nil { + return err + } + var args []interface{} + + var primaryKeyDatums []interface{} + if err := gojson.Unmarshal([]byte(row.key), &primaryKeyDatums); err != nil { + return err + } + if len(primaryKeyDatums) != len(v.primaryKeyCols) { + return errors.Errorf(`expected primary key columns %s got datums %s`, + v.primaryKeyCols, primaryKeyDatums) + } + + var stmtBuf bytes.Buffer + type wrapper struct { + After map[string]interface{} `json:"after"` + } + var value wrapper + if err := gojson.Unmarshal([]byte(row.value), &value); err != nil { + return err + } + if value.After != nil { + // UPDATE or INSERT + fmt.Fprintf(&stmtBuf, `UPSERT INTO %s (`, v.fprintTable) + for col, colValue := range value.After { + if len(args) != 0 { + stmtBuf.WriteString(`,`) + } + stmtBuf.WriteString(col) + args = append(args, colValue) + } + for i := len(value.After) - v.fprintOrigColumns; i < v.fprintTestColumns; i++ { + fmt.Fprintf(&stmtBuf, `, test%d`, i) + args = 
append(args, nil) + } + stmtBuf.WriteString(`) VALUES (`) + for i := range args { + if i != 0 { + stmtBuf.WriteString(`,`) + } + fmt.Fprintf(&stmtBuf, `$%d`, i+1) + } + stmtBuf.WriteString(`)`) + + // Also verify that the key matches the value. + primaryKeyDatums = make([]interface{}, len(v.primaryKeyCols)) + for idx, primaryKeyCol := range v.primaryKeyCols { + primaryKeyDatums[idx] = value.After[primaryKeyCol] + } + primaryKeyJSON, err := gojson.Marshal(primaryKeyDatums) + if err != nil { + return err + } + + if string(primaryKeyJSON) != row.key { + v.failures = append(v.failures, + fmt.Sprintf(`key %s did not match expected key %s for value %s`, + row.key, primaryKeyJSON, row.value)) + } + + if _, err := txn.Exec(stmtBuf.String(), args...); err != nil { + return err + } + } else { + // DELETE + fmt.Fprintf(&stmtBuf, `DELETE FROM %s WHERE `, v.fprintTable) + for i, datum := range primaryKeyDatums { + if len(args) != 0 { + stmtBuf.WriteString(`,`) + } + fmt.Fprintf(&stmtBuf, `%s = $%d`, v.primaryKeyCols[i], i+1) + args = append(args, datum) + } + if _, err := txn.Exec(stmtBuf.String(), args...); err != nil { + return err + } + } + if err := txn.Commit(); err != nil { + return err + } + return nil +} + // NoteResolved implements the Validator interface. func (v *fingerprintValidator) NoteResolved(partition string, resolved hlc.Timestamp) error { if r, ok := v.partitionResolved[partition]; !ok { @@ -213,7 +339,6 @@ func (v *fingerprintValidator) NoteResolved(partition string, resolved hlc.Times if !v.resolved.Less(newResolved) { return nil } - initialScanComplete := v.resolved != (hlc.Timestamp{}) v.resolved = newResolved // NB: Intentionally not stable sort because it shouldn't matter. 
@@ -221,7 +346,11 @@ func (v *fingerprintValidator) NoteResolved(partition string, resolved hlc.Times return v.buffer[i].updated.Less(v.buffer[j].updated) }) - var lastUpdated hlc.Timestamp + var lastFingerprintedAt hlc.Timestamp + // We apply all the row updates we received in the time window between the last + // resolved timestamp and this one. We process all row updates belonging to a given + // timestamp and then `fingerprint` to ensure the scratch table and the original table + // match. for len(v.buffer) > 0 { if v.resolved.Less(v.buffer[0].updated) { break @@ -229,93 +358,32 @@ func (v *fingerprintValidator) NoteResolved(partition string, resolved hlc.Times row := v.buffer[0] v.buffer = v.buffer[1:] - // If we have already completed the initial scan, verify the fingerprint at - // every point in time. Before the initial scan is complete, the fingerprint - // table might not have the earliest version of every row present in the - // table. - if initialScanComplete { - if row.updated != lastUpdated { - if lastUpdated != (hlc.Timestamp{}) { - if err := v.fingerprint(lastUpdated); err != nil { - return err - } - } - if err := v.fingerprint(row.updated.Prev()); err != nil { - return err - } + // If we've processed all row updates belonging to the previous row's timestamp, + // we fingerprint at `updated.Prev()` since we want to catch cases where one or + // more row updates are missed. For example: If k1 was written at t1, t2, t3 and + // the update for t2 was missed. 
+ if v.previousRowUpdateTs != (hlc.Timestamp{}) && v.previousRowUpdateTs.Less(row.updated) { + if err := v.fingerprint(row.updated.Prev()); err != nil { + return err } - lastUpdated = row.updated - } - - type wrapper struct { - After map[string]interface{} `json:"after"` } - var value wrapper - if err := gojson.Unmarshal([]byte(row.value), &value); err != nil { + if err := v.applyRowUpdate(row); err != nil { return err } - var stmtBuf bytes.Buffer - var args []interface{} - if value.After != nil { - // UPDATE or INSERT - fmt.Fprintf(&stmtBuf, `UPSERT INTO %s (`, v.fprintTable) - for col, colValue := range value.After { - if len(args) != 0 { - stmtBuf.WriteString(`,`) - } - stmtBuf.WriteString(col) - args = append(args, colValue) - } - stmtBuf.WriteString(`) VALUES (`) - for i := range args { - if i != 0 { - stmtBuf.WriteString(`,`) - } - fmt.Fprintf(&stmtBuf, `$%d`, i+1) - } - stmtBuf.WriteString(`)`) - - // Also verify that the key matches the value. - primaryKeyDatums := make([]interface{}, len(v.primaryKeyCols)) - for idx, primaryKeyCol := range v.primaryKeyCols { - primaryKeyDatums[idx] = value.After[primaryKeyCol] - } - primaryKeyJSON, err := gojson.Marshal(primaryKeyDatums) - if err != nil { - return err - } - if string(primaryKeyJSON) != row.key { - v.failures = append(v.failures, fmt.Sprintf( - `key %s did not match expected key %s for value %s`, row.key, primaryKeyJSON, row.value)) - } - } else { - // DELETE - var primaryKeyDatums []interface{} - if err := gojson.Unmarshal([]byte(row.key), &primaryKeyDatums); err != nil { + // If any updates have exactly the same timestamp, we have to apply them all + // before fingerprinting. 
+ if len(v.buffer) == 0 || v.buffer[0].updated != row.updated { + lastFingerprintedAt = row.updated + if err := v.fingerprint(row.updated); err != nil { return err } - if len(primaryKeyDatums) != len(v.primaryKeyCols) { - return errors.Errorf( - `expected primary key columns %s got datums %s`, v.primaryKeyCols, primaryKeyDatums) - } - fmt.Fprintf(&stmtBuf, `DELETE FROM %s WHERE `, v.fprintTable) - for i, datum := range primaryKeyDatums { - if len(args) != 0 { - stmtBuf.WriteString(`,`) - } - fmt.Fprintf(&stmtBuf, `%s = $%d`, v.primaryKeyCols[i], i+1) - args = append(args, datum) - } - } - if len(args) > 0 { - if _, err := v.sqlDB.Exec(stmtBuf.String(), args...); err != nil { - return errors.Wrap(err, stmtBuf.String()) - } } + v.previousRowUpdateTs = row.updated } - if !v.firstRowTimestamp.IsEmpty() && !resolved.Less(v.firstRowTimestamp) { + if !v.firstRowTimestamp.IsEmpty() && !resolved.Less(v.firstRowTimestamp) && + lastFingerprintedAt != resolved { return v.fingerprint(resolved) } return nil @@ -335,8 +403,23 @@ func (v *fingerprintValidator) fingerprint(ts hlc.Timestamp) error { return err } if orig != check { - v.failures = append(v.failures, fmt.Sprintf( - `fingerprints did not match at %s: %s vs %s`, ts.AsOfSystemTime(), orig, check)) + // Ignore the fingerprint mismatch if there was an in-progress schema change job + // on the table. + // TODO(aayush): We currently need to have this hack here since we emit changefeed + // level backfill row updates at the wrong time in the `DROP COLUMN` case. See + // issue #41961 for more details. 
+		var pendingJobs int
+		var countJobsStmt bytes.Buffer
+		fmt.Fprintf(&countJobsStmt, `SELECT count(*) from [show jobs] AS OF SYSTEM TIME '%s' `+
+			`where job_type = 'SCHEMA CHANGE' and (status = 'running' or status = 'pending')`,
+			ts.AsOfSystemTime())
+		if err := v.sqlDB.QueryRow(countJobsStmt.String()).Scan(&pendingJobs); err != nil {
+			return err
+		}
+		if pendingJobs == 0 {
+			v.failures = append(v.failures, fmt.Sprintf(
+				`fingerprints did not match at %s: %s vs %s`, ts.AsOfSystemTime(), orig, check))
+		}
 	}
 	return nil
 }
diff --git a/pkg/ccl/changefeedccl/cdctest/validator_test.go b/pkg/ccl/changefeedccl/cdctest/validator_test.go
index 32ea8dd6fe85..2869d7ef2774 100644
--- a/pkg/ccl/changefeedccl/cdctest/validator_test.go
+++ b/pkg/ccl/changefeedccl/cdctest/validator_test.go
@@ -10,6 +10,7 @@ package cdctest
 
 import (
 	"context"
+	"fmt"
 	"reflect"
 	"testing"
 
@@ -126,16 +127,21 @@ func TestFingerprintValidator(t *testing.T) {
 		}
 	}
 
+	createTableStmt := func(tableName string) string {
+		return fmt.Sprintf(`CREATE TABLE %s (k INT PRIMARY KEY, v INT)`, tableName)
+	}
+	testColumns := 0
+
 	t.Run(`empty`, func(t *testing.T) {
-		sqlDB.Exec(t, `CREATE TABLE empty (k INT PRIMARY KEY, v INT)`)
-		v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `empty`, []string{`p`})
+		sqlDB.Exec(t, createTableStmt(`empty`))
+		v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `empty`, []string{`p`}, testColumns)
 		require.NoError(t, err)
 		noteResolved(t, v, `p`, ts[0])
 		assertValidatorFailures(t, v)
 	})
 	t.Run(`wrong_data`, func(t *testing.T) {
-		sqlDB.Exec(t, `CREATE TABLE wrong_data (k INT PRIMARY KEY, v INT)`)
-		v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `wrong_data`, []string{`p`})
+		sqlDB.Exec(t, createTableStmt(`wrong_data`))
+		v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `wrong_data`, []string{`p`}, testColumns)
 		require.NoError(t, err)
 		v.NoteRow(ignored, `[1]`, `{"after": {"k":1,"v":10}}`, ts[1])
 		noteResolved(t, v, `p`, ts[1])
@@ -145,8 +151,8 @@ func TestFingerprintValidator(t 
*testing.T) { ) }) t.Run(`all_resolved`, func(t *testing.T) { - sqlDB.Exec(t, `CREATE TABLE all_resolved (k INT PRIMARY KEY, v INT)`) - v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `all_resolved`, []string{`p`}) + sqlDB.Exec(t, createTableStmt(`all_resolved`)) + v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `all_resolved`, []string{`p`}, testColumns) require.NoError(t, err) if err := v.NoteResolved(`p`, ts[0]); err != nil { t.Fatal(err) @@ -164,8 +170,8 @@ func TestFingerprintValidator(t *testing.T) { assertValidatorFailures(t, v) }) t.Run(`rows_unsorted`, func(t *testing.T) { - sqlDB.Exec(t, `CREATE TABLE rows_unsorted (k INT PRIMARY KEY, v INT)`) - v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `rows_unsorted`, []string{`p`}) + sqlDB.Exec(t, createTableStmt(`rows_unsorted`)) + v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `rows_unsorted`, []string{`p`}, testColumns) require.NoError(t, err) v.NoteRow(ignored, `[1]`, `{"after": {"k":1,"v":3}}`, ts[3]) v.NoteRow(ignored, `[1]`, `{"after": {"k":1,"v":2}}`, ts[2]) @@ -176,27 +182,30 @@ func TestFingerprintValidator(t *testing.T) { assertValidatorFailures(t, v) }) t.Run(`missed_initial`, func(t *testing.T) { - sqlDB.Exec(t, `CREATE TABLE missed_initial (k INT PRIMARY KEY, v INT)`) - v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `missed_initial`, []string{`p`}) + sqlDB.Exec(t, createTableStmt(`missed_initial`)) + v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `missed_initial`, []string{`p`}, testColumns) require.NoError(t, err) noteResolved(t, v, `p`, ts[0]) // Intentionally missing {"k":1,"v":1} at ts[1]. + // Insert a fake row since we don't fingerprint earlier than the first seen row. 
+ v.NoteRow(ignored, `[2]`, `{"after": {"k":2,"v":2}}`, ts[2].Prev()) v.NoteRow(ignored, `[1]`, `{"after": {"k":1,"v":2}}`, ts[2]) v.NoteRow(ignored, `[2]`, `{"after": {"k":2,"v":2}}`, ts[2]) - noteResolved(t, v, `p`, ts[2]) + noteResolved(t, v, `p`, ts[2].Prev()) assertValidatorFailures(t, v, `fingerprints did not match at `+ts[2].Prev().AsOfSystemTime()+ - `: 590700560494856539 vs EMPTY`, + `: 590700560494856539 vs 590699460983228293`, ) }) t.Run(`missed_middle`, func(t *testing.T) { - sqlDB.Exec(t, `CREATE TABLE missed_middle (k INT PRIMARY KEY, v INT)`) - v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `missed_middle`, []string{`p`}) + sqlDB.Exec(t, createTableStmt(`missed_middle`)) + v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `missed_middle`, []string{`p`}, testColumns) require.NoError(t, err) noteResolved(t, v, `p`, ts[0]) v.NoteRow(ignored, `[1]`, `{"after": {"k":1,"v":1}}`, ts[1]) // Intentionally missing {"k":1,"v":2} at ts[2]. v.NoteRow(ignored, `[2]`, `{"after": {"k":2,"v":2}}`, ts[2]) + noteResolved(t, v, `p`, ts[2]) v.NoteRow(ignored, `[1]`, `{"after": {"k":1,"v":3}}`, ts[3]) noteResolved(t, v, `p`, ts[3]) assertValidatorFailures(t, v, @@ -207,8 +216,8 @@ func TestFingerprintValidator(t *testing.T) { ) }) t.Run(`missed_end`, func(t *testing.T) { - sqlDB.Exec(t, `CREATE TABLE missed_end (k INT PRIMARY KEY, v INT)`) - v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `missed_end`, []string{`p`}) + sqlDB.Exec(t, createTableStmt(`missed_end`)) + v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `missed_end`, []string{`p`}, testColumns) require.NoError(t, err) noteResolved(t, v, `p`, ts[0]) v.NoteRow(ignored, `[1]`, `{"after": {"k":1,"v":1}}`, ts[1]) @@ -222,8 +231,8 @@ func TestFingerprintValidator(t *testing.T) { ) }) t.Run(`initial_scan`, func(t *testing.T) { - sqlDB.Exec(t, `CREATE TABLE initial_scan (k INT PRIMARY KEY, v INT)`) - v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `initial_scan`, []string{`p`}) + sqlDB.Exec(t, 
createTableStmt(`initial_scan`)) + v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `initial_scan`, []string{`p`}, testColumns) require.NoError(t, err) v.NoteRow(ignored, `[1]`, `{"after": {"k":1,"v":3}}`, ts[3]) v.NoteRow(ignored, `[2]`, `{"after": {"k":2,"v":2}}`, ts[3]) @@ -231,16 +240,16 @@ func TestFingerprintValidator(t *testing.T) { assertValidatorFailures(t, v) }) t.Run(`unknown_partition`, func(t *testing.T) { - sqlDB.Exec(t, `CREATE TABLE unknown_partition (k INT PRIMARY KEY, v INT)`) - v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `unknown_partition`, []string{`p`}) + sqlDB.Exec(t, createTableStmt(`unknown_partition`)) + v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `unknown_partition`, []string{`p`}, testColumns) require.NoError(t, err) if err := v.NoteResolved(`nope`, ts[1]); !testutils.IsError(err, `unknown partition`) { t.Fatalf(`expected "unknown partition" error got: %+v`, err) } }) t.Run(`resolved_unsorted`, func(t *testing.T) { - sqlDB.Exec(t, `CREATE TABLE resolved_unsorted (k INT PRIMARY KEY, v INT)`) - v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `resolved_unsorted`, []string{`p`}) + sqlDB.Exec(t, createTableStmt(`resolved_unsorted`)) + v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `resolved_unsorted`, []string{`p`}, testColumns) require.NoError(t, err) v.NoteRow(ignored, `[1]`, `{"after": {"k":1,"v":1}}`, ts[1]) noteResolved(t, v, `p`, ts[1]) @@ -249,8 +258,8 @@ func TestFingerprintValidator(t *testing.T) { assertValidatorFailures(t, v) }) t.Run(`two_partitions`, func(t *testing.T) { - sqlDB.Exec(t, `CREATE TABLE two_partitions (k INT PRIMARY KEY, v INT)`) - v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `two_partitions`, []string{`p0`, `p1`}) + sqlDB.Exec(t, createTableStmt(`two_partitions`)) + v, err := NewFingerprintValidator(sqlDBRaw, `foo`, `two_partitions`, []string{`p0`, `p1`}, testColumns) require.NoError(t, err) v.NoteRow(ignored, `[1]`, `{"after": {"k":1,"v":1}}`, ts[1]) v.NoteRow(ignored, `[1]`, 
`{"after": {"k":1,"v":2}}`, ts[2]) diff --git a/pkg/cmd/roachtest/cdc.go b/pkg/cmd/roachtest/cdc.go index 384d7972ec6d..a3f7077b77d1 100644 --- a/pkg/cmd/roachtest/cdc.go +++ b/pkg/cmd/roachtest/cdc.go @@ -314,7 +314,7 @@ func runCDCBank(ctx context.Context, t *test, c *cluster) { } const requestedResolved = 100 - fprintV, err := cdctest.NewFingerprintValidator(db, `bank.bank`, `fprint`, tc.partitions) + fprintV, err := cdctest.NewFingerprintValidator(db, `bank.bank`, `fprint`, tc.partitions, 0) if err != nil { return err } diff --git a/pkg/util/fsm/fsm.go b/pkg/util/fsm/fsm.go index 4ca8f9b86fae..10dc41e52abe 100644 --- a/pkg/util/fsm/fsm.go +++ b/pkg/util/fsm/fsm.go @@ -80,6 +80,11 @@ type Transitions struct { expanded Pattern } +// GetExpanded returns the expanded map of transitions. +func (t Transitions) GetExpanded() Pattern { + return t.expanded +} + // Compile creates a set of state Transitions from a Pattern. This is relatively // expensive so it's expected that Compile is called once for each transition // graph and assigned to a static variable. This variable can then be given to