diff --git a/tasks/scheduler.go b/tasks/scheduler.go index a1906f5a2..2a40ee4f2 100644 --- a/tasks/scheduler.go +++ b/tasks/scheduler.go @@ -54,6 +54,7 @@ type deliverTxTask struct { AbsoluteIndex int Response *types.ResponseDeliverTx VersionStores map[sdk.StoreKey]*multiversion.VersionIndexedStore + TxTracer sdk.TxTracer } // AppendDependencies appends the given indexes to the task's dependencies @@ -83,6 +84,10 @@ func (dt *deliverTxTask) Reset() { dt.Abort = nil dt.AbortCh = nil dt.VersionStores = nil + + if dt.TxTracer != nil { + dt.TxTracer.Reset() + } } func (dt *deliverTxTask) Increment() { @@ -187,7 +192,9 @@ func toTasks(reqs []*sdk.DeliverTxEntry) ([]*deliverTxTask, map[int]*deliverTxTa AbsoluteIndex: r.AbsoluteIndex, Status: statusPending, Dependencies: map[int]struct{}{}, + TxTracer: r.TxTracer, } + tasksMap[r.AbsoluteIndex] = task allTasks = append(allTasks, task) } @@ -198,6 +205,10 @@ func (s *scheduler) collectResponses(tasks []*deliverTxTask) []types.ResponseDel res := make([]types.ResponseDeliverTx, 0, len(tasks)) for _, t := range tasks { res = append(res, *t.Response) + + if t.TxTracer != nil { + t.TxTracer.Commit() + } } return res } @@ -508,6 +519,10 @@ func (s *scheduler) prepareTask(task *deliverTxTask) { ctx = ctx.WithMultiStore(ms) } + if task.TxTracer != nil { + ctx = task.TxTracer.InjectInContext(ctx) + } + task.AbortCh = abortCh task.Ctx = ctx } @@ -518,14 +533,21 @@ func (s *scheduler) executeTask(task *deliverTxTask) { task.Ctx = dCtx // in the synchronous case, we only want to re-execute tasks that need re-executing - // if already validated, then this does another validation - if s.synchronous && task.IsStatus(statusValidated) { - s.shouldRerun(task) + if s.synchronous { + // if already validated, then this does another validation if task.IsStatus(statusValidated) { - return + s.shouldRerun(task) + if task.IsStatus(statusValidated) { + return + } + } + + // waiting transactions may not yet have been reset + // this ensures a task has been reset and incremented + if !task.IsStatus(statusPending) { + task.Reset() + task.Increment() } - task.Reset() - task.Increment() } s.prepareTask(task) diff --git a/tasks/scheduler_test.go b/tasks/scheduler_test.go index 84f3daae0..09edaaad1 100644 --- a/tasks/scheduler_test.go +++ b/tasks/scheduler_test.go @@ -395,6 +395,45 @@ func TestProcessAll(t *testing.T) { }, expectedErr: nil, }, + { + name: "Test tx Reset properly before re-execution via tracer", + workers: 10, + runs: 1, + addStores: true, + requests: addTxTracerToTxEntries(requestList(250)), + deliverTxFunc: func(ctx sdk.Context, req types.RequestDeliverTx, tx sdk.Tx, checksum [32]byte) (res types.ResponseDeliverTx) { + defer abortRecoveryFunc(&res) + wait := rand.Intn(10) + time.Sleep(time.Duration(wait) * time.Millisecond) + // all txs read and write to the same key to maximize conflicts + kv := ctx.MultiStore().GetKVStore(testStoreKey) + val := string(kv.Get(itemKey)) + time.Sleep(time.Duration(wait) * time.Millisecond) + // write to the store with this tx's index + newVal := val + fmt.Sprintf("%d", ctx.TxIndex()) + kv.Set(itemKey, []byte(newVal)) + + if v, ok := ctx.Context().Value("test_tracer").(*testTxTracer); ok { + v.OnTxExecute() + } + + // return what was read from the store (final attempt should be index-1) + return types.ResponseDeliverTx{ + Info: newVal, + } + }, + assertions: func(t *testing.T, ctx sdk.Context, res []types.ResponseDeliverTx) { + expected := "" + for idx, response := range res { + expected = expected + fmt.Sprintf("%d", idx) + require.Equal(t, expected, response.Info) + } + // confirm last write made it to the parent store + latest := ctx.MultiStore().GetKVStore(testStoreKey).Get(itemKey) + require.Equal(t, expected, string(latest)) + }, + expectedErr: nil, + }, } for _, tt := range tests { @@ -428,3 +467,42 @@ func TestProcessAll(t *testing.T) { }) } } + +func addTxTracerToTxEntries(txEntries []*sdk.DeliverTxEntry) []*sdk.DeliverTxEntry { + for _, txEntry := range txEntries { + txEntry.TxTracer = newTestTxTracer(txEntry.AbsoluteIndex) + } + + return txEntries +} + +var _ sdk.TxTracer = &testTxTracer{} + +func newTestTxTracer(txIndex int) *testTxTracer { + return &testTxTracer{txIndex: txIndex, canExecute: true} +} + +type testTxTracer struct { + txIndex int + canExecute bool +} + +func (t *testTxTracer) Commit() { + t.canExecute = false +} + +func (t *testTxTracer) InjectInContext(ctx sdk.Context) sdk.Context { + return ctx.WithContext(context.WithValue(ctx.Context(), "test_tracer", t)) +} + +func (t *testTxTracer) Reset() { + t.canExecute = true +} + +func (t *testTxTracer) OnTxExecute() { + if !t.canExecute { + panic(fmt.Errorf("task #%d was asked to execute but the tracer is not in the correct state, most probably due to missing Reset call or over execution", t.txIndex)) + } + + t.canExecute = false +} diff --git a/types/tx_batch.go b/types/tx_batch.go index 3a835715e..aade739c9 100644 --- a/types/tx_batch.go +++ b/types/tx_batch.go @@ -6,13 +6,14 @@ import ( ) // DeliverTxEntry represents an individual transaction's request within a batch. -// This can be extended to include tx-level tracing or metadata +// This can be extended to include tx-level metadata type DeliverTxEntry struct { Request abci.RequestDeliverTx SdkTx Tx Checksum [32]byte AbsoluteIndex int EstimatedWritesets MappedWritesets + TxTracer TxTracer } // EstimatedWritesets represents an estimated writeset for a transaction mapped by storekey to the writeset estimate. diff --git a/types/tx_tracer.go b/types/tx_tracer.go new file mode 100644 index 000000000..6f3a074c1 --- /dev/null +++ b/types/tx_tracer.go @@ -0,0 +1,44 @@ +package types + +// TxTracer is an interface for tracing transactions generic +// enough to be used by any transaction processing engine be it +// CoWasm or EVM. +// +// The TxTracer responsibility is to inject itself in the context +// that will be used to process the transaction. How the context +// will be used afterward is up to the transaction processing engine. +// +// Today, only EVM transaction processing engine do something with the +// TxTracer (it inject itself into the EVM execution context for +// go-ethereum level tracing). +// +// The TxTracer receives signals from the scheduler when the tracer +// should be reset because the transaction is being re-executed and +// when the transaction is committed. +type TxTracer interface { + // InjectInContext injects the transaction specific tracer in the context + // that will be used to process the transaction. + // + // For now only the EVM transaction processing engine uses the tracer + // so it only make sense to inject an EVM tracer. Future updates might + // add the possibility to inject a tracer for other transaction kind. + // + // Which tracer implementation to provied and how will be retrieved later on + // from the context is dependent on the transaction processing engine. + InjectInContext(ctx Context) Context + + // Reset is called when the transaction is being re-executed and the tracer + // should be reset. A transaction executed by the OCC parallel engine might + // be re-executed multiple times before being committed, each time `Reset` + // will be called. + // + // When Reset is received, it means everything that was traced before should + // be discarded. + Reset() + + // Commit is called when the transaction is committed. This is the last signal + // the tracer will receive for a given transaction. After this call, the tracer + // should do whatever it needs to forward the tracing information to the + // appropriate place/collector. + Commit() +}