Skip to content

Commit

Permalink
Modify max incarnation fallback (#460)
Browse files Browse the repository at this point in the history
## Describe your changes and provide context

## Testing performed to validate your change

---------

Co-authored-by: Steven Landers <[email protected]>
  • Loading branch information
udpatil and stevenlanders authored Mar 14, 2024
1 parent db786e7 commit 7f668bd
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 29 deletions.
2 changes: 2 additions & 0 deletions store/multiversion/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,9 +364,11 @@ func (s *Store) checkReadsetAtIndex(index int) (bool, []int) {
if value != nil {
// conflict
// TODO: would we want to return early?
conflictSet[latestValue.Index()] = struct{}{}
valid = false
}
} else if !bytes.Equal(latestValue.Value(), value) {
conflictSet[latestValue.Index()] = struct{}{}
valid = false
}
}
Expand Down
14 changes: 7 additions & 7 deletions store/multiversion/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,20 +207,20 @@ func TestMultiVersionStoreValidateState(t *testing.T) {
"key3": []byte("value6"),
})

// expect failure with empty conflicts
// expect failure with conflict of tx 2
valid, conflicts = mvs.ValidateTransactionState(5)
require.False(t, valid)
require.Empty(t, conflicts)
require.Equal(t, []int{2}, conflicts)

// add a conflict due to deletion
mvs.SetWriteset(3, 1, map[string][]byte{
"key1": nil,
})

// expect failure with empty conflicts
// expect failure with conflict of tx 2 and 3
valid, conflicts = mvs.ValidateTransactionState(5)
require.False(t, valid)
require.Empty(t, conflicts)
require.Equal(t, []int{2, 3}, conflicts)

// add a conflict due to estimate
mvs.SetEstimatedWriteset(4, 1, map[string][]byte{
Expand All @@ -230,7 +230,7 @@ func TestMultiVersionStoreValidateState(t *testing.T) {
// expect index 4 to be returned
valid, conflicts = mvs.ValidateTransactionState(5)
require.False(t, valid)
require.Equal(t, []int{4}, conflicts)
require.Equal(t, []int{2, 3, 4}, conflicts)
}

func TestMultiVersionStoreParentValidationMismatch(t *testing.T) {
Expand Down Expand Up @@ -436,10 +436,10 @@ func TestMVSIteratorValidationWithKeySwitch(t *testing.T) {
writeset2["key3"] = []byte("valueX")
mvs.SetWriteset(2, 2, writeset2)

// should be invalid
// should be invalid with conflict of 2
valid, conflicts := mvs.ValidateTransactionState(5)
require.False(t, valid)
require.Empty(t, conflicts)
require.Equal(t, []int{2}, conflicts)
}

func TestMVSIteratorValidationWithKeyAdded(t *testing.T) {
Expand Down
58 changes: 37 additions & 21 deletions tasks/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ const (
statusValidated status = "validated"
// statusWaiting tasks are waiting for another tx to complete
statusWaiting status = "waiting"
// maximumIncarnation before we revert to sequential (for high conflict rates)
maximumIncarnation = 5
// maximumIterations before we revert to sequential (for high conflict rates)
maximumIterations = 10
)

type deliverTxTask struct {
Expand All @@ -48,7 +48,6 @@ type deliverTxTask struct {
Status status
Dependencies map[int]struct{}
Abort *occ.Abort
Index int
Incarnation int
Request types.RequestDeliverTx
SdkTx sdk.Tx
Expand Down Expand Up @@ -181,13 +180,12 @@ func (s *scheduler) findConflicts(task *deliverTxTask) (bool, []int) {
func toTasks(reqs []*sdk.DeliverTxEntry) ([]*deliverTxTask, map[int]*deliverTxTask) {
tasksMap := make(map[int]*deliverTxTask)
allTasks := make([]*deliverTxTask, 0, len(reqs))
for idx, r := range reqs {
for _, r := range reqs {
task := &deliverTxTask{
Request: r.Request,
SdkTx: r.SdkTx,
Checksum: r.Checksum,
AbsoluteIndex: r.AbsoluteIndex,
Index: idx,
Status: statusPending,
Dependencies: map[int]struct{}{},
}
Expand Down Expand Up @@ -273,6 +271,8 @@ func (s *scheduler) emitMetrics() {
}

func (s *scheduler) ProcessAll(ctx sdk.Context, reqs []*sdk.DeliverTxEntry) ([]types.ResponseDeliverTx, error) {
var iterations int

// initialize multi-version stores if they haven't been initialized yet
s.tryInitMultiVersionStore(ctx)
// prefill estimates
Expand Down Expand Up @@ -302,35 +302,33 @@ func (s *scheduler) ProcessAll(ctx sdk.Context, reqs []*sdk.DeliverTxEntry) ([]t

toExecute := tasks
for !allValidated(tasks) {
// if the max incarnation >= 5, we should revert to synchronous
if s.maxIncarnation >= maximumIncarnation {
// if we have iterated too many times, we should revert to synchronous
if iterations >= maximumIterations {
// process synchronously
s.synchronous = true
// execute all non-validated tasks (no more "waiting" status)
toExecute = filterTasks(tasks, func(t *deliverTxTask) bool {
return !t.IsStatus(statusValidated)
})
startIdx, anyLeft := s.findFirstNonValidated()
if !anyLeft {
break
}
toExecute = tasks[startIdx:]
}

var err error

// execute sets statuses of tasks to either executed or aborted
if len(toExecute) > 0 {
err = s.executeAll(ctx, toExecute)
if err != nil {
return nil, err
}
if err := s.executeAll(ctx, toExecute); err != nil {
return nil, err
}

// validate returns any that should be re-executed
// note this processes ALL tasks, not just those recently executed
toExecute, err = s.validateAll(ctx, tasks)
toExecute, err := s.validateAll(ctx, tasks)
if err != nil {
return nil, err
}
// these are retries which apply to metrics
s.metrics.retries += len(toExecute)
iterations++
}

for _, mv := range s.multiVersionStores {
mv.WriteLatestToStore()
}
Expand Down Expand Up @@ -434,7 +432,12 @@ func (s *scheduler) validateAll(ctx sdk.Context, tasks []*deliverTxTask) ([]*del

// ExecuteAll executes all tasks concurrently
func (s *scheduler) executeAll(ctx sdk.Context, tasks []*deliverTxTask) error {
if len(tasks) == 0 {
return nil
}

ctx, span := s.traceSpan(ctx, "SchedulerExecuteAll", nil)
span.SetAttributes(attribute.Bool("synchronous", s.synchronous))
defer span.End()

// validationWg waits for all validations to complete
Expand Down Expand Up @@ -467,7 +470,6 @@ func (s *scheduler) traceSpan(ctx sdk.Context, name string, task *deliverTxTask)
spanCtx, span := s.tracingInfo.StartWithContext(name, ctx.TraceSpanContext())
if task != nil {
span.SetAttributes(attribute.String("txHash", fmt.Sprintf("%X", sha256.Sum256(task.Request.Tx))))
span.SetAttributes(attribute.Int("txIndex", task.Index))
span.SetAttributes(attribute.Int("absoluteIndex", task.AbsoluteIndex))
span.SetAttributes(attribute.Int("txIncarnation", task.Incarnation))
}
Expand Down Expand Up @@ -514,6 +516,17 @@ func (s *scheduler) executeTask(task *deliverTxTask) {
defer dSpan.End()
task.Ctx = dCtx

// in the synchronous case, we only want to re-execute tasks that need re-executing
// if already validated, then this does another validation
if s.synchronous && task.IsStatus(statusValidated) {
s.shouldRerun(task)
if task.IsStatus(statusValidated) {
return
}
task.Reset()
task.Increment()
}

s.prepareTask(task)

resp := s.deliverTx(task.Ctx, task.Request, task.SdkTx, task.Checksum)
Expand All @@ -528,10 +541,13 @@ func (s *scheduler) executeTask(task *deliverTxTask) {
abort, ok := <-task.AbortCh
if ok {
// if there is an abort item that means we need to wait on the dependent tx
task.SetStatus(statusWaiting)
task.SetStatus(statusAborted)
task.Abort = &abort
task.AppendDependencies([]int{abort.DependentTxIdx})
}
for _, v := range task.VersionStores {
v.WriteEstimatesToMultiVersionStore()
}
return
}

Expand Down
75 changes: 74 additions & 1 deletion tasks/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ import (
"context"
"errors"
"fmt"
"math/rand"
"net/http"
_ "net/http/pprof"
"runtime"
"testing"
"time"

"github.com/stretchr/testify/require"
"github.com/tendermint/tendermint/abci/types"
Expand Down Expand Up @@ -322,6 +324,77 @@ func TestProcessAll(t *testing.T) {
},
expectedErr: nil,
},
{
name: "Test every tx accesses same key with estimated writesets",
workers: 50,
runs: 1,
addStores: true,
requests: requestListWithEstimatedWritesets(1000),
deliverTxFunc: func(ctx sdk.Context, req types.RequestDeliverTx, tx sdk.Tx, checksum [32]byte) (res types.ResponseDeliverTx) {
defer abortRecoveryFunc(&res)
// all txs read and write to the same key to maximize conflicts
kv := ctx.MultiStore().GetKVStore(testStoreKey)
val := string(kv.Get(itemKey))

// write to the store with this tx's index
kv.Set(itemKey, req.Tx)

// return what was read from the store (final attempt should be index-1)
return types.ResponseDeliverTx{
Info: val,
}
},
assertions: func(t *testing.T, ctx sdk.Context, res []types.ResponseDeliverTx) {
for idx, response := range res {
if idx == 0 {
require.Equal(t, "", response.Info)
} else {
// the info is what was read from the kv store by the tx
// each tx writes its own index, so the info should be the index of the previous tx
require.Equal(t, fmt.Sprintf("%d", idx-1), response.Info)
}
}
// confirm last write made it to the parent store
latest := ctx.MultiStore().GetKVStore(testStoreKey).Get(itemKey)
require.Equal(t, []byte(fmt.Sprintf("%d", len(res)-1)), latest)
},
expectedErr: nil,
},
{
name: "Test every tx accesses same key with delays",
workers: 50,
runs: 1,
addStores: true,
requests: requestList(1000),
deliverTxFunc: func(ctx sdk.Context, req types.RequestDeliverTx, tx sdk.Tx, checksum [32]byte) (res types.ResponseDeliverTx) {
defer abortRecoveryFunc(&res)
wait := rand.Intn(10)
time.Sleep(time.Duration(wait) * time.Millisecond)
// all txs read and write to the same key to maximize conflicts
kv := ctx.MultiStore().GetKVStore(testStoreKey)
val := string(kv.Get(itemKey))
time.Sleep(time.Duration(wait) * time.Millisecond)
// write to the store with this tx's index
newVal := val + fmt.Sprintf("%d", ctx.TxIndex())
kv.Set(itemKey, []byte(newVal))

// return the accumulated value written to the store (indices of this tx and all prior txs)
return types.ResponseDeliverTx{
Info: newVal,
}
},
assertions: func(t *testing.T, ctx sdk.Context, res []types.ResponseDeliverTx) {
expected := ""
for idx, response := range res {
expected = expected + fmt.Sprintf("%d", idx)
require.Equal(t, expected, response.Info)
}
// confirm last write made it to the parent store
latest := ctx.MultiStore().GetKVStore(testStoreKey).Get(itemKey)
require.Equal(t, expected, string(latest))
},
expectedErr: nil,
},
}

for _, tt := range tests {
Expand All @@ -343,7 +416,7 @@ func TestProcessAll(t *testing.T) {
}

res, err := s.ProcessAll(ctx, tt.requests)
require.LessOrEqual(t, s.(*scheduler).maxIncarnation, maximumIncarnation)
require.LessOrEqual(t, s.(*scheduler).maxIncarnation, maximumIterations)
require.Len(t, res, len(tt.requests))

if !errors.Is(err, tt.expectedErr) {
Expand Down

0 comments on commit 7f668bd

Please sign in to comment.