Skip to content

Commit

Permalink
op-service: Test BasicRPCReceiptsFetcher concurrency
Browse files Browse the repository at this point in the history
  • Loading branch information
sebastianst committed Nov 30, 2023
1 parent 5267327 commit e01f9ca
Showing 1 changed file with 85 additions and 0 deletions.
85 changes: 85 additions & 0 deletions op-service/sources/receipts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -570,3 +570,88 @@ func TestVerifyReceipts(t *testing.T) {
require.ErrorContains(t, err, "must never be removed due to reorg")
})
}

func TestBasicRPCReceiptsFetcherConcurrency(t *testing.T) {
require := require.New(t)
batchSize, txCount := 4, uint64(18) // 4.5 * 4
block, receipts := randomRpcBlockAndReceipts(rand.New(rand.NewSource(123)), txCount)
txHashes := make([]common.Hash, 0, len(receipts))
recMap := make(map[common.Hash]*types.Receipt, len(receipts))
for _, rec := range receipts {
require.NotEqual((common.Hash{}), rec.TxHash)
txHashes = append(txHashes, rec.TxHash)
recMap[rec.TxHash] = rec
}
blockid := eth.BlockID{
Hash: block.Hash,
Number: uint64(block.Number),
}
mrpc := new(mockRPC)
rp := NewBasicRPCReceiptsFetcher(mrpc, batchSize)

// prepare mock
ctx, done := context.WithTimeout(context.Background(), 10*time.Second)
defer done()
var numCalls int
mrpc.On("BatchCallContext", ctx, mock.AnythingOfType("[]rpc.BatchElem")).
Run(func(args mock.Arguments) {
numCalls++
els := args.Get(1).([]rpc.BatchElem)
for _, el := range els {
if el.Method == "eth_getTransactionReceipt" {
txHash := el.Args[0].(common.Hash)
// The IterativeBatchCall expects that the values are written
// to the fields of the allocated *types.Receipt.
**(el.Result.(**types.Receipt)) = *recMap[txHash]
}
}
}).
Return([]error{nil})

// start n fetchers
const n = 32
type fetchResult struct {
rs types.Receipts
err error
}
fetchResults := make(chan fetchResult, n)
barrier := make(chan struct{})
for i := 0; i < n; i++ {
go func() {
<-barrier
recs, err := rp.FetchReceipts(ctx, blockid, txHashes)
fetchResults <- fetchResult{rs: recs, err: err}
}()
}
close(barrier) // Go!

// assert results
for i := 0; i < n; i++ {
select {
case f := <-fetchResults:
json, err := json.MarshalIndent(f.rs, "", " ")
require.NoError(err)
t.Log(string(json))
require.NoError(f.err)
require.Len(f.rs, len(receipts))
for j, r := range receipts {
requireEqualReceipt(t, r, f.rs[j])
}
case <-ctx.Done():
t.Fatal("Test timeout")
}
}

mrpc.AssertExpectations(t)
require.NotZero(numCalls, "BatchCallContext should have been called.")
require.Less(numCalls, n, "Some IterativeBatchCalls should have been shared.")
}

func requireEqualReceipt(t *testing.T, exp, act *types.Receipt) {
t.Helper()
expJson, err := json.MarshalIndent(exp, "", " ")
require.NoError(t, err)
actJson, err := json.MarshalIndent(act, "", " ")
require.NoError(t, err)
require.Equal(t, string(expJson), string(actJson))
}

0 comments on commit e01f9ca

Please sign in to comment.