Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
corverroos committed Nov 3, 2024
1 parent 8b7bd28 commit 7f5035e
Show file tree
Hide file tree
Showing 7 changed files with 197 additions and 25 deletions.
2 changes: 1 addition & 1 deletion relayer/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func Run(ctx context.Context, cfg Config) error {
pricer := newTokenPricer(ctx)
pnl := newPnlLogger(network.ID, pricer)

attestStreamer := newLeaderStreamer(cprov, network)
attestStreamer := newLeaderStreamer(cprov.StreamAttestations, network.ID)

for _, destChain := range network.EVMChains() {
// Setup send provider
Expand Down
8 changes: 4 additions & 4 deletions relayer/app/cursors_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,13 +142,13 @@ const mockValSetID = 99

type mockProvider struct {
cchain.Provider
SubscribeFn func(ctx context.Context, chainVer xchain.ChainVersion, attestOffset uint64, callback cchain.ProviderCallback)
StreamFunc func(ctx context.Context, chainVer xchain.ChainVersion, attestOffset uint64, callback cchain.ProviderCallback) error
}

func (m *mockProvider) StreamAsync(ctx context.Context, chainVer xchain.ChainVersion, attestOffset uint64,
func (m *mockProvider) StreamAttestations(ctx context.Context, chainVer xchain.ChainVersion, attestOffset uint64,
_ string, callback cchain.ProviderCallback,
) {
m.SubscribeFn(ctx, chainVer, attestOffset, callback)
) error {
return m.StreamFunc(ctx, chainVer, attestOffset, callback)
}

func (m *mockProvider) PortalValidatorSet(ctx context.Context, valSetID uint64) ([]cchain.PortalValidator, bool, error) {
Expand Down
40 changes: 28 additions & 12 deletions relayer/app/streambuf.go → relayer/app/leadstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,17 @@ package relayer

import (
"context"
"sync"
"time"

"github.com/omni-network/omni/lib/cchain"
"github.com/omni-network/omni/lib/errors"
"github.com/omni-network/omni/lib/log"
"github.com/omni-network/omni/lib/netconf"
"github.com/omni-network/omni/lib/umath"
"github.com/omni-network/omni/lib/xchain"

"github.com/tidwall/btree"
"sync"
"time"
)

// errStop is returned when the leader should stop streaming.
Expand All @@ -24,12 +27,21 @@ func getLimit(network netconf.ID) int {
return 10_000 // 10k atts & 1KB per attestation & 10 chain versions ~= 100MB
}

// getBackoff returns the duration to backoff before querying the cache again.
func getBackoff(network netconf.ID) time.Duration {
if network == netconf.Simnet {
return time.Millisecond // No backoff in tests
}

return time.Second // Default 1 second blocks otherwise
}

// leadChaosTimeout returns a function that returns true if the leader chaos timeout has been reached.
// This ensures we rotate leaders after a certain time (and test leader rotation).
func leadChaosTimeout(network netconf.ID) func() bool {
t0 := time.Now()
return func() bool {
duration := time.Hour
duration := time.Hour // Default 1 hour timeout
if network == netconf.Devnet {
duration = time.Second * 10
} else if network == netconf.Staging {
Expand All @@ -38,7 +50,6 @@ func leadChaosTimeout(network netconf.ID) func() bool {

return time.Since(t0) > duration
}

}

// leader tracks a worker actively streaming attestations and adding them to the cache.
Expand Down Expand Up @@ -95,6 +106,8 @@ type attestBuffer struct {
// Add adds an attestation to the cache if the cache is not full or
// if the attestation is not too old.
// It returns true if it was added and an existing key was replaced.
//
//nolint:nonamedreturns // Name for clarify of API.
func (b *attestBuffer) Add(att xchain.Attestation) (replaced bool) {
b.mu.Lock()
defer b.mu.Unlock()
Expand Down Expand Up @@ -175,27 +188,30 @@ type attestStreamer func(ctx context.Context, chainVer xchain.ChainVersion, atte

// newLeaderStreamer returns a new attestStreamer that avoids multiple overlapping streaming queries
// by selecting a leader to query each range of offsets.
func newLeaderStreamer(cprov cchain.Provider, network netconf.Network) attestStreamer {
func newLeaderStreamer(upstream attestStreamer, network netconf.ID) attestStreamer {
var buffers sync.Map // map[xchain.ChainVer]*attestBuffer

return func(ctx context.Context, chainVer xchain.ChainVersion, fromOffset uint64, workerName string, callback cchain.ProviderCallback) error {
anyBuffer, _ := buffers.LoadOrStore(chainVer, newAttestBuffer(getLimit(network.ID)))
buffer := anyBuffer.(*attestBuffer)
anyBuffer, _ := buffers.LoadOrStore(chainVer, newAttestBuffer(getLimit(network)))
buffer := anyBuffer.(*attestBuffer) //nolint:revive,forcetypeassert // Type is known

name := network.ChainVersionName(chainVer)
name := netconf.ChainVersionNamer(network)(chainVer)

// Track the offset of the last attestation we "processed"
prevOffset := fromOffset - 1
prevOffset, ok := umath.Subtract(fromOffset, 1)
if !ok {
return errors.New("attest from offset zero [BUG]", "from", fromOffset)
}

// lead blocks and streams attestations from the provided height using the provided leader.
// It populates the cache with fetched attestations.
// It returns nil if it detects an overlap with another leader or on chaos timeout.
lead := func(l *leader, from uint64) error {
defer l.Delete()
log.Debug(ctx, "Starting attest stream", "chain", name, "offset", from, "worker", workerName)
timeout := leadChaosTimeout(network.ID)
timeout := leadChaosTimeout(network)

err := cprov.StreamAttestations(ctx, chainVer, from, workerName, func(ctx context.Context, att xchain.Attestation) error {
err := upstream(ctx, chainVer, from, workerName, func(ctx context.Context, att xchain.Attestation) error {
l.IncRange(att.AttestOffset)
replaced := buffer.Add(att)

Expand Down Expand Up @@ -258,7 +274,7 @@ func newLeaderStreamer(cprov cchain.Provider, network netconf.Network) attestStr
}

// Otherwise, wait a bit, and try the same offset again
timer.Reset(time.Second)
timer.Reset(getBackoff(network))
}
}
}
Expand Down
147 changes: 147 additions & 0 deletions relayer/app/leadstream_internal_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
package relayer

import (
"context"
"maps"
"sync"
"testing"

"github.com/omni-network/omni/lib/cchain"
"github.com/omni-network/omni/lib/errors"
"github.com/omni-network/omni/lib/netconf"
"github.com/omni-network/omni/lib/xchain"

"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
)

// TestLeaderStreamer tests the leader streamer.
func TestLeaderStreamer(t *testing.T) {
t.Parallel()

// upstream tracks upstream requests and returns test responses
upstream := newUpstream()
upstreamFunc := func(
ctx context.Context,
chainVer xchain.ChainVersion,
attestOffset uint64,
workerName string,
callback cchain.ProviderCallback,
) error {
resps := upstream.Req(workerName, attestOffset)
for {
select {
case <-ctx.Done():
return ctx.Err()
case resp := <-resps:
err := callback(ctx, xchain.Attestation{AttestHeader: xchain.AttestHeader{ChainVersion: chainVer, AttestOffset: resp}})
if err != nil {
return err
}
}
}
}

streamer := newLeaderStreamer(upstreamFunc, netconf.Simnet)

ctx := context.Background()
var eg errgroup.Group
errDone := errors.New("done")

// startStream starts a worker stream from `from` to `to`, ensuring strictly sequential attestations.
startStream := func(worker string, from uint64, to uint64) error {
next := from
return streamer(ctx, xchain.ChainVersion{}, from, worker, func(ctx context.Context, att xchain.Attestation) error {
if att.AttestOffset != next {
return errors.New("unexpected offset")
}
next++

if att.AttestOffset == to {
return errDone
}

return nil
})
}

w1 := "worker1"
w2 := "worker2"
w3 := "worker3"

eg.Go(func() error {
// worker 1 streams from 3 to 5 as leader
return startStream(w1, 3, 5)
})
upstream.Respond(w1, 3) // w1 starts leader streaming at 3
eg.Go(func() error {
// worker 2 streams from 1 to 7, 1-4 as leader, 4-5 from cache, 6-7 as leader
return startStream(w2, 1, 7)
})
upstream.Respond(w2, 1) // w2 starts leader streaming at 1
upstream.Respond(w1, 4) // w1 continues leader streaming
upstream.Respond(w2, 2) // w2 continues leader streaming
upstream.Respond(w2, 3) // w2 overlaps w1, switch to cache
upstream.Respond(w1, 5) // w1 is done
upstream.Respond(w2, 6) // w2 switches back to lead streaming
upstream.Respond(w2, 7) // w2 is done

eg.Go(func() error {
// worker 3 streams from 8 to 8 as leader
return startStream(w3, 1, 8)
})
upstream.Respond(w3, 8) // w3 starts leader streaming at 8 and is done

require.ErrorIs(t, eg.Wait(), errDone)
require.EqualValues(t, map[string][]uint64{
w1: {3},
w2: {1, 6},
w3: {8},
}, upstream.Reqs())
}

func newUpstream() *upstream {
return &upstream{
reqs: make(map[string][]uint64),
resps: make(map[string]chan uint64),
}
}

type upstream struct {
mu sync.Mutex
reqs map[string][]uint64
resps map[string]chan uint64
}

func (u *upstream) Respond(worker string, offset uint64) {
u.mu.Lock()
resp, ok := u.resps[worker]
if !ok {
resp = make(chan uint64)
u.resps[worker] = resp
}
u.mu.Unlock()

resp <- offset
}

func (u *upstream) Reqs() map[string][]uint64 {
u.mu.Lock()
defer u.mu.Unlock()

return maps.Clone(u.reqs)
}

func (u *upstream) Req(worker string, from uint64) chan uint64 {
u.mu.Lock()
u.reqs[worker] = append(u.reqs[worker], from)

resp, ok := u.resps[worker]
if !ok {
resp = make(chan uint64)
u.resps[worker] = resp
}
u.mu.Unlock()

return resp
}
1 change: 0 additions & 1 deletion relayer/app/types_internal_test.go

This file was deleted.

4 changes: 3 additions & 1 deletion relayer/app/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package relayer

import (
"context"
"golang.org/x/sync/errgroup"
"sync/atomic"
"time"

Expand All @@ -15,6 +14,8 @@ import (
"github.com/omni-network/omni/lib/xchain"

"github.com/ethereum/go-ethereum/accounts/abi/bind"

"golang.org/x/sync/errgroup"
)

const (
Expand Down Expand Up @@ -210,6 +211,7 @@ func (w *Worker) newCallback(
var cachedValSet []cchain.PortalValidator
var prevOffset uint64

//nolint:nonamedreturns // Required for defer.
return func(ctx context.Context, att xchain.Attestation) (err error) {
// Sanity check strictly sequential offsets.
{
Expand Down
20 changes: 14 additions & 6 deletions relayer/app/worker_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"testing"

"github.com/omni-network/omni/lib/cchain"
"github.com/omni-network/omni/lib/errors"
"github.com/omni-network/omni/lib/netconf"
"github.com/omni-network/omni/lib/xchain"

Expand Down Expand Up @@ -96,12 +97,18 @@ func TestWorker_Run(t *testing.T) {

// Provider mock attestations as requested until context canceled.
mockProvider := &mockProvider{
SubscribeFn: func(ctx context.Context, chainVer xchain.ChainVersion, attestOffset uint64, callback cchain.ProviderCallback) {
StreamFunc: func(ctx context.Context, chainVer xchain.ChainVersion, attestOffset uint64, callback cchain.ProviderCallback) error {
if chainVer.ID != srcChain {
return // Only subscribe to source chain.
return errors.New("not source chain")
}
if attestOffset == 1 {
// Block other streams
<-ctx.Done()
return ctx.Err()
}

if attestOffset != destChainACursor && attestOffset != destChainBCursor {
return
return errors.New("unexpected offset", "offset", attestOffset, "chain_a", destChainACursor, "chain_b", destChainBCursor)
}

offset := attestOffset
Expand Down Expand Up @@ -132,10 +139,10 @@ func TestWorker_Run(t *testing.T) {
}
}

for ctx.Err() == nil {
for {
err := callback(ctx, nextAtt())
if ctx.Err() != nil {
return
return nil //nolint:nilerr // Return nil as per contract
}
require.NoError(t, err)
}
Expand All @@ -158,7 +165,8 @@ func TestWorker_Run(t *testing.T) {
mockXClient,
mockCreateFunc,
func() (SendAsync, error) { return mockSender.SendTransaction, nil },
noAwait)
noAwait,
mockProvider.StreamAttestations)
go w.Run(ctx)
}

Expand Down

0 comments on commit 7f5035e

Please sign in to comment.