Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added TxTracer to DeliverTxEntry which is the first step to support tracing when using OCC #478

Merged
merged 2 commits into from
Mar 29, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions tasks/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type deliverTxTask struct {
AbsoluteIndex int
Response *types.ResponseDeliverTx
VersionStores map[sdk.StoreKey]*multiversion.VersionIndexedStore
TxTracer sdk.TxTracer
}

// AppendDependencies appends the given indexes to the task's dependencies
Expand Down Expand Up @@ -83,6 +84,10 @@ func (dt *deliverTxTask) Reset() {
dt.Abort = nil
dt.AbortCh = nil
dt.VersionStores = nil

if dt.TxTracer != nil {
dt.TxTracer.Reset()
}
}

func (dt *deliverTxTask) Increment() {
Expand Down Expand Up @@ -187,7 +192,9 @@ func toTasks(reqs []*sdk.DeliverTxEntry) ([]*deliverTxTask, map[int]*deliverTxTa
AbsoluteIndex: r.AbsoluteIndex,
Status: statusPending,
Dependencies: map[int]struct{}{},
TxTracer: r.TxTracer,
}

tasksMap[r.AbsoluteIndex] = task
allTasks = append(allTasks, task)
}
Expand All @@ -198,6 +205,10 @@ func (s *scheduler) collectResponses(tasks []*deliverTxTask) []types.ResponseDel
res := make([]types.ResponseDeliverTx, 0, len(tasks))
for _, t := range tasks {
res = append(res, *t.Response)

if t.TxTracer != nil {
t.TxTracer.Commit()
}
}
return res
}
Expand Down Expand Up @@ -508,6 +519,10 @@ func (s *scheduler) prepareTask(task *deliverTxTask) {
ctx = ctx.WithMultiStore(ms)
}

if task.TxTracer != nil {
ctx = task.TxTracer.InjectInContext(ctx)
}

task.AbortCh = abortCh
task.Ctx = ctx
}
Expand Down
78 changes: 78 additions & 0 deletions tasks/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,45 @@ func TestProcessAll(t *testing.T) {
},
expectedErr: nil,
},
{
name: "Test tx Reset properly before re-execution via tracer",
workers: 10,
runs: 1,
addStores: true,
requests: addTxTracerToTxEntries(requestList(250)),
deliverTxFunc: func(ctx sdk.Context, req types.RequestDeliverTx, tx sdk.Tx, checksum [32]byte) (res types.ResponseDeliverTx) {
defer abortRecoveryFunc(&res)
wait := rand.Intn(10)
time.Sleep(time.Duration(wait) * time.Millisecond)
// all txs read and write to the same key to maximize conflicts
kv := ctx.MultiStore().GetKVStore(testStoreKey)
val := string(kv.Get(itemKey))
time.Sleep(time.Duration(wait) * time.Millisecond)
// write to the store with this tx's index
newVal := val + fmt.Sprintf("%d", ctx.TxIndex())
kv.Set(itemKey, []byte(newVal))

if v, ok := ctx.Context().Value("test_tracer").(*testTxTracer); ok {
v.OnTxExecute()
}

// return what was read from the store (final attempt should be index-1)
return types.ResponseDeliverTx{
Info: newVal,
}
},
assertions: func(t *testing.T, ctx sdk.Context, res []types.ResponseDeliverTx) {
expected := ""
for idx, response := range res {
expected = expected + fmt.Sprintf("%d", idx)
require.Equal(t, expected, response.Info)
}
// confirm last write made it to the parent store
latest := ctx.MultiStore().GetKVStore(testStoreKey).Get(itemKey)
require.Equal(t, expected, string(latest))
},
expectedErr: nil,
},
}

for _, tt := range tests {
Expand Down Expand Up @@ -428,3 +467,42 @@ func TestProcessAll(t *testing.T) {
})
}
}

func addTxTracerToTxEntries(txEntries []*sdk.DeliverTxEntry) []*sdk.DeliverTxEntry {
for _, txEntry := range txEntries {
txEntry.TxTracer = newTestTxTracer(txEntry.AbsoluteIndex)
}

return txEntries
}

var _ sdk.TxTracer = &testTxTracer{}

func newTestTxTracer(txIndex int) *testTxTracer {
return &testTxTracer{txIndex: txIndex, canExecute: true}
}

type testTxTracer struct {
txIndex int
canExecute bool
}

func (t *testTxTracer) Commit() {
t.canExecute = false
}

func (t *testTxTracer) InjectInContext(ctx sdk.Context) sdk.Context {
return ctx.WithContext(context.WithValue(ctx.Context(), "test_tracer", t))
}

func (t *testTxTracer) Reset() {
t.canExecute = true
}

func (t *testTxTracer) OnTxExecute() {
if !t.canExecute {
panic(fmt.Errorf("task #%d was asked to execute but the tracer is not in the correct state, most probably due to missing Reset call or over execution", t.txIndex))
}

t.canExecute = false
}
3 changes: 2 additions & 1 deletion types/tx_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@ import (
)

// DeliverTxEntry represents an individual transaction's request within a batch.
// This can be extended to include tx-level tracing or metadata
// This can be extended to include tx-level metadata
type DeliverTxEntry struct {
Request abci.RequestDeliverTx
SdkTx Tx
Checksum [32]byte
AbsoluteIndex int
EstimatedWritesets MappedWritesets
TxTracer TxTracer
}

// EstimatedWritesets represents an estimated writeset for a transaction mapped by storekey to the writeset estimate.
Expand Down
44 changes: 44 additions & 0 deletions types/tx_tracer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package types

// TxTracer is an interface for tracing transactions generic
// enough to be used by any transaction processing engine be it
// CoWasm or EVM.
//
// The TxTracer responsibility is to inject itself in the context
// that will be used to process the transaction. How the context
// will be used afterward is up to the transaction processing engine.
//
// Today, only EVM transaction processing engine do something with the
// TxTracer (it inject itself into the EVM execution context for
// go-ethereum level tracing).
//
// The TxTracer receives signals from the scheduler when the tracer
// should be reset because the transaction is being re-executed and
// when the transaction is committed.
type TxTracer interface {
// InjectInContext injects the transaction specific tracer in the context
// that will be used to process the transaction.
//
// For now only the EVM transaction processing engine uses the tracer
// so it only make sense to inject an EVM tracer. Future updates might
// add the possibility to inject a tracer for other transaction kind.
//
// Which tracer implementation to provied and how will be retrieved later on
// from the context is dependent on the transaction processing engine.
InjectInContext(ctx Context) Context

// Reset is called when the transaction is being re-executed and the tracer
// should be reset. A transaction executed by the OCC parallel engine might
// be re-executed multiple times before being committed, each time `Reset`
// will be called.
//
// When Reset is received, it means everything that was traced before should
// be discarded.
Reset()

// Commit is called when the transaction is committed. This is the last signal
// the tracer will receive for a given transaction. After this call, the tracer
// should do whatever it needs to forward the tracing information to the
// appropriate place/collector.
Commit()
}
Loading