diff --git a/pkg/vm/engine/tae/db/test/db_test.go b/pkg/vm/engine/tae/db/test/db_test.go index 805c094418b47..f36cd3c546d39 100644 --- a/pkg/vm/engine/tae/db/test/db_test.go +++ b/pkg/vm/engine/tae/db/test/db_test.go @@ -10408,3 +10408,25 @@ func TestDedup5(t *testing.T) { assert.Error(t, err) assert.NoError(t, insertTxn.Commit(ctx)) } + +func TestReplayDebugLog(t *testing.T) { + ctx := context.Background() + + opts := config.WithLongScanAndCKPOpts(nil) + tae := testutil.NewTestEngine(ctx, ModuleName, t, opts) + defer tae.Close() + schema := catalog.MockSchemaAll(3, 2) + schema.Extra.BlockMaxRows = 5 + schema.Extra.ObjectMaxBlocks = 256 + tae.BindSchema(schema) + bat := catalog.MockBatch(schema, 5) + tae.CreateRelAndAppend(bat, true) + + fault.Enable() + defer fault.Disable() + fault.AddFaultPoint(ctx, "replay debug log", ":::", "echo", 0, "debug") + defer fault.RemoveFaultPoint(ctx, "replay debug log") + + tae.Restart(ctx) + +} diff --git a/pkg/vm/engine/tae/txn/txnimpl/appendcmd.go b/pkg/vm/engine/tae/txn/txnimpl/appendcmd.go index e15bdf31362dd..ec23e59a08ebf 100644 --- a/pkg/vm/engine/tae/txn/txnimpl/appendcmd.go +++ b/pkg/vm/engine/tae/txn/txnimpl/appendcmd.go @@ -93,7 +93,7 @@ func NewAppendCmd(id uint32, node *anode, data *containers.Batch, isTombstone bo return impl } func (c *AppendCmd) Desc() string { - s := fmt.Sprintf("CmdName=InsertNode;ID=%d;TS=%d;Dests=[", c.ID, c.Ts) + s := fmt.Sprintf("CmdName=InsertNode;ID=%d;TS=%v;Dests=[", c.ID, c.Ts.ToString()) for _, info := range c.Infos { s = fmt.Sprintf("%s %s", s, info.Desc()) } @@ -104,7 +104,7 @@ func (c *AppendCmd) Desc() string { return s } func (c *AppendCmd) String() string { - s := fmt.Sprintf("CmdName=InsertNode;ID=%d;TS=%d;Dests=[", c.ID, c.Ts) + s := fmt.Sprintf("CmdName=InsertNode;ID=%d;TS=%v;Dests=[", c.ID, c.Ts.ToString()) for _, info := range c.Infos { s = fmt.Sprintf("%s%s", s, info.String()) } diff --git a/pkg/vm/engine/tae/txn/txnimpl/replaystore.go b/pkg/vm/engine/tae/txn/txnimpl/replaystore.go index 4c298abd75489..bf9cba31e581d 100644 --- a/pkg/vm/engine/tae/txn/txnimpl/replaystore.go +++ b/pkg/vm/engine/tae/txn/txnimpl/replaystore.go @@ -19,6 +19,7 @@ import ( "github.com/matrixorigin/matrixone/pkg/common/moerr" "github.com/matrixorigin/matrixone/pkg/logutil" + "github.com/matrixorigin/matrixone/pkg/util/fault" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/catalog" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/iface/txnif" @@ -28,6 +29,8 @@ import ( "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/wal" ) +var ErrDebugReplay = moerr.NewInternalErrorNoCtx("debug") + type replayTxnStore struct { txnbase.NoopTxnStore Cmd *txnbase.TxnCmd @@ -112,23 +115,38 @@ func (store *replayTxnStore) prepareCmd(txncmd txnif.TxnCmd) { *catalog.EntryCommand[*catalog.MetadataMVCCNode, *catalog.BlockNode]: store.catalog.ReplayCmd(txncmd, store.dataFactory, store.Observer) case *AppendCmd: - store.replayAppendData(cmd, store.Observer) + store.replayAppendData( + cmd, store.Observer) case *updates.UpdateCmd: - store.replayDataCmds(cmd, store.Observer) + store.replayDataCmds( + cmd, store.Observer) } } func (store *replayTxnStore) replayAppendData(cmd *AppendCmd, observer wal.ReplayObserver) { hasActive := false + _, sarg, _ := fault.TriggerFault("replay debug log") for _, info := range cmd.Infos { id := info.GetDest() database, err := store.catalog.GetDatabaseByID(id.DbID) + if sarg != "" { + err = ErrDebugReplay + } if err != nil { - panic(err) + logutil.Infof("cmd %v\ncatalog: %v", cmd.String(), store.catalog.SimplePPString(3)) + if err != ErrDebugReplay { + panic(err) + } } blk, err := database.GetObjectEntryByID(id, cmd.IsTombstone) + if sarg != "" { + err = ErrDebugReplay + } if err != nil { - panic(err) + logutil.Infof("cmd %v\ncatalog: %v", cmd.String(), store.catalog.SimplePPString(3)) + if err != ErrDebugReplay { + panic(err) + } } if !blk.IsActive() { continue @@ -151,12 +169,24 @@ func (store *replayTxnStore) replayAppendData(cmd *AppendCmd, observer wal.Repla for _, info := range cmd.Infos { id := info.GetDest() database, err := store.catalog.GetDatabaseByID(id.DbID) + if sarg != "" { + err = ErrDebugReplay + } if err != nil { - panic(err) + logutil.Infof("cmd %v\ncatalog: %v", cmd.String(), store.catalog.SimplePPString(3)) + if err != ErrDebugReplay { + panic(err) + } } blk, err := database.GetObjectEntryByID(id, cmd.IsTombstone) + if sarg != "" { + err = ErrDebugReplay + } if err != nil { - panic(err) + logutil.Infof("cmd %v\ncatalog: %v", cmd.String(), store.catalog.SimplePPString(3)) + if err != ErrDebugReplay { + panic(err) + } } if !blk.IsActive() { continue @@ -168,8 +198,11 @@ func (store *replayTxnStore) replayAppendData(cmd *AppendCmd, observer wal.Repla bat := data.CloneWindow(int(start), int(info.GetSrcLen())) bat.Compact() defer bat.Close() - if err = blk.GetObjectData().OnReplayAppendPayload(bat); err != nil { - panic(err) + if err = blk.GetObjectData().OnReplayAppendPayload(bat); err != nil || sarg != "" { + logutil.Infof("cmd %v\ncatalog: %v", cmd.String(), store.catalog.SimplePPString(3)) + if sarg == "" { + panic(err) + } } } } @@ -187,12 +220,25 @@ func (store *replayTxnStore) replayAppend(cmd *updates.UpdateCmd, observer wal.R appendNode := cmd.GetAppendNode() id := appendNode.GetID() database, err := store.catalog.GetDatabaseByID(id.DbID) + _, sarg, _ := fault.TriggerFault("replay debug log") + if sarg != "" { + err = ErrDebugReplay + } if err != nil { - panic(err) + logutil.Infof("cmd %v\ncatalog: %v", cmd.String(), store.catalog.SimplePPString(3)) + if err != ErrDebugReplay { + panic(err) + } } obj, err := database.GetObjectEntryByID(id, cmd.GetAppendNode().IsTombstone()) + if sarg != "" { + err = ErrDebugReplay + } if err != nil { - panic(err) + logutil.Infof("cmd %v\ncatalog: %v", cmd.String(), store.catalog.SimplePPString(3)) + if err != ErrDebugReplay { + panic(err) + } } if !obj.IsActive() { return @@ -200,7 +246,10 @@ func (store *replayTxnStore) replayAppend(cmd *updates.UpdateCmd, observer wal.R if obj.ObjectPersisted() { return } - if err = obj.GetObjectData().OnReplayAppend(appendNode); err != nil { - panic(err) + if err = obj.GetObjectData().OnReplayAppend(appendNode); err != nil || sarg != "" { + logutil.Infof("cmd %v\ncatalog: %v", cmd.String(), store.catalog.SimplePPString(3)) + if sarg == "" { + panic(err) + } } }