diff --git a/go.mod b/go.mod
index fc2c4d0f3..b578df7d8 100644
--- a/go.mod
+++ b/go.mod
@@ -149,7 +149,7 @@ require (
 	github.com/golang/mock v1.6.0 // indirect
 	github.com/golang/protobuf v1.5.4 // indirect
 	github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb // indirect
-	github.com/google/btree v1.1.2 // indirect
+	github.com/google/btree v1.1.3 // indirect
 	github.com/google/cel-go v0.21.0 // indirect
 	github.com/google/go-containerregistry v0.20.2 // indirect
 	github.com/google/orderedcode v0.0.1 // indirect
@@ -249,7 +249,7 @@ require (
 	github.com/supranational/blst v0.3.13 // indirect
 	github.com/syndtr/goleveldb v1.0.1 // indirect
 	github.com/tendermint/go-amino v0.16.0 // indirect
-	github.com/tidwall/btree v1.7.0 // indirect
+	github.com/tidwall/btree v1.7.0
 	github.com/tklauser/go-sysconf v0.3.12 // indirect
 	github.com/tklauser/numcpus v0.6.1 // indirect
 	github.com/ulikunitz/xz v0.5.11 // indirect
diff --git a/go.sum b/go.sum
index cb622e792..c72d3edb1 100644
--- a/go.sum
+++ b/go.sum
@@ -682,8 +682,8 @@ github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb h1:PBC98N2aIaM3XXi
 github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
 github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
 github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
-github.com/google/btree v1.1.2 h1:xf4v41cLI2Z6FxbKm+8Bu+m8ifhj15JuZ9sa0jZCMUU=
-github.com/google/btree v1.1.2/go.mod h1:qOPhT0dTNdNzV6Z/lhRX0YXUafgPLFUh+gZMl761Gm4=
+github.com/google/btree v1.1.3 h1:CVpQJjYgC4VbzxeGVHfvZrv1ctoYCAI8vbl07Fcxlyg=
+github.com/google/btree v1.1.3/go.mod h1:qOPhT0dTNdNzV6Z/lhRX0YXUafgPLFUh+gZMl761Gm4=
 github.com/google/cel-go v0.21.0 h1:cl6uW/gxN+Hy50tNYvI691+sXxioCnstFzLp2WO4GCI=
 github.com/google/cel-go v0.21.0/go.mod h1:rHUlWCcBKgyEk+eV03RPdZUekPp6YcJwV0FxuUksYxc=
 github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
diff --git a/relayer/app/app.go b/relayer/app/app.go
index 3ee34a107..6cc439d29 100644
--- a/relayer/app/app.go
+++ b/relayer/app/app.go
@@ -67,6 +67,8 @@ func Run(ctx context.Context, cfg Config) error {
 	pricer := newTokenPricer(ctx)
 	pnl := newPnlLogger(network.ID, pricer)
 
+	attestStreamer := newLeaderStreamer(cprov, network)
+
 	for _, destChain := range network.EVMChains() {
 		// Setup send provider
 		sendProvider := func() (SendAsync, error) {
@@ -100,7 +102,8 @@ func Run(ctx context.Context, cfg Config) error {
 			xprov,
 			CreateSubmissions,
 			sendProvider,
-			awaitValSet)
+			awaitValSet,
+			attestStreamer)
 
 		go worker.Run(ctx)
 	}
diff --git a/relayer/app/streambuf.go b/relayer/app/streambuf.go
new file mode 100644
index 000000000..67e4792ae
--- /dev/null
+++ b/relayer/app/streambuf.go
@@ -0,0 +1,265 @@
+package relayer
+
+import (
+	"context"
+	"github.com/omni-network/omni/lib/cchain"
+	"github.com/omni-network/omni/lib/errors"
+	"github.com/omni-network/omni/lib/log"
+	"github.com/omni-network/omni/lib/netconf"
+	"github.com/omni-network/omni/lib/xchain"
+	"github.com/tidwall/btree"
+	"sync"
+	"time"
+)
+
+// errStop is returned when the leader should stop streaming.
+var errStop = errors.New("stop")
+
+// getLimit returns the limit of attestations to cache per chain version.
+func getLimit(network netconf.ID) int {
+	if network == netconf.Devnet {
+		return 20 // Tiny buffer in devnet
+	}
+
+	return 10_000 // 10k atts * 1KB per attestation * 10 chain versions ~= 100MB
+}
+
+// leadChaosTimeout returns a function that returns true if the leader chaos timeout has been reached.
+// This ensures we rotate leaders after a certain time (and test leader rotation).
+func leadChaosTimeout(network netconf.ID) func() bool {
+	t0 := time.Now()
+	return func() bool {
+		duration := time.Hour
+		if network == netconf.Devnet {
+			duration = time.Second * 10
+		} else if network == netconf.Staging {
+			duration = time.Minute
+		}
+
+		return time.Since(t0) > duration
+	}
+}
+
+// leader tracks a worker actively streaming attestations and adding them to the cache.
+// It "locks" a range of offsets as leader of that range.
+type leader struct {
+	mu     sync.RWMutex
+	from   uint64
+	latest uint64
+	delete func()
+}
+
+// contains returns true if the offset is within the leader's range.
+func (s *leader) contains(offset uint64) bool {
+	s.mu.RLock()
+	defer s.mu.RUnlock()
+
+	return s.from <= offset && offset <= s.latest
+}
+
+// IncRange increases the range to the provided height plus one,
+// since the leader will move on to that height next.
+func (s *leader) IncRange(height uint64) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	s.latest = height + 1
+}
+
+// Delete removes the leader from the buffer.
+func (s *leader) Delete() {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	s.delete()
+}
+
+func newAttestBuffer(limit int) *attestBuffer {
+	return &attestBuffer{
+		limit:   limit,
+		leaders: make(map[*leader]struct{}),
+		atts:    btree.NewMap[uint64, xchain.Attestation](32), // Degree of 32 is a good default
+	}
+}
+
+// attestBuffer is a cache of attestations populated by leaders.
+// The goal is to avoid overlapping streams.
+type attestBuffer struct {
+	limit   int
+	mu      sync.RWMutex
+	leaders map[*leader]struct{}
+	atts    *btree.Map[uint64, xchain.Attestation]
+}
+
+// Add adds the attestation to the cache, trimming the oldest (lowest offset)
+// entries if the cache grows over its limit.
+// It returns true if an attestation was already cached at the same offset and was replaced.
+func (b *attestBuffer) Add(att xchain.Attestation) (replaced bool) {
+	b.mu.Lock()
+	defer b.mu.Unlock()
+
+	defer func() {
+		// Maybe trim if we grew over limit
+		for b.atts.Len() > b.limit {
+			height, _, ok := b.atts.PopMin()
+			if ok && height == att.AttestOffset {
+				replaced = false // Not added, so not replaced
+			}
+		}
+	}()
+
+	_, replaced = b.atts.Set(att.AttestOffset, att)
+
+	return replaced
+}
+
+// Get returns the attestation at the provided offset or false.
+func (b *attestBuffer) Get(offset uint64) (xchain.Attestation, bool) {
+	b.mu.RLock()
+	defer b.mu.RUnlock()
+
+	return b.atts.Get(offset)
+}
+
+// containsStreamerUnsafe returns true if any leader contains the offset.
+// It is unsafe since it assumes the lock is held.
+func (b *attestBuffer) containsStreamerUnsafe(offset uint64) bool {
+	for s := range b.leaders {
+		if s.contains(offset) {
+			return true
+		}
+	}
+
+	return false
+}
+
+// MaybeNewLeader returns a new leader for the offset,
+// or false if another leader is already streaming it.
+func (b *attestBuffer) MaybeNewLeader(offset uint64) (*leader, bool) {
+	b.mu.RLock()
+	contains := b.containsStreamerUnsafe(offset)
+	b.mu.RUnlock()
+	if contains {
+		return nil, false
+	}
+
+	b.mu.Lock()
+	defer b.mu.Unlock()
+
+	// Double check in case another goroutine added a leader
+	for s := range b.leaders {
+		if s.contains(offset) {
+			return nil, false
+		}
+	}
+
+	s := &leader{
+		from:   offset,
+		latest: offset,
+	}
+	s.delete = func() {
+		b.mu.Lock()
+		defer b.mu.Unlock()
+
+		delete(b.leaders, s)
+	}
+
+	b.leaders[s] = struct{}{}
+
+	return s, true
+}
+
+// attestStreamer is a function that streams attestations from a specific offset.
+// It abstracts cchain.Provider.StreamAttestations.
+type attestStreamer func(ctx context.Context, chainVer xchain.ChainVersion, attestOffset uint64, workerName string, callback cchain.ProviderCallback) error
+
+// newLeaderStreamer returns a new attestStreamer that avoids multiple overlapping streaming queries
+// by selecting a leader to query each range of offsets.
+func newLeaderStreamer(cprov cchain.Provider, network netconf.Network) attestStreamer {
+	var buffers sync.Map // map[xchain.ChainVersion]*attestBuffer
+
+	return func(ctx context.Context, chainVer xchain.ChainVersion, fromOffset uint64, workerName string, callback cchain.ProviderCallback) error {
+		anyBuffer, _ := buffers.LoadOrStore(chainVer, newAttestBuffer(getLimit(network.ID)))
+		buffer := anyBuffer.(*attestBuffer)
+
+		name := network.ChainVersionName(chainVer)
+
+		// Track the offset of the last attestation we "processed"
+		prevOffset := fromOffset - 1
+
+		// lead blocks and streams attestations from the provided height using the provided leader.
+		// It populates the cache with fetched attestations.
+		// It returns nil if it detects an overlap with another leader or on chaos timeout.
+		lead := func(l *leader, from uint64) error {
+			defer l.Delete()
+			log.Debug(ctx, "Starting attest stream", "chain", name, "offset", from, "worker", workerName)
+			timeout := leadChaosTimeout(network.ID)
+
+			err := cprov.StreamAttestations(ctx, chainVer, from, workerName, func(ctx context.Context, att xchain.Attestation) error {
+				l.IncRange(att.AttestOffset)
+				replaced := buffer.Add(att)
+
+				if err := callback(ctx, att); err != nil {
+					return err
+				}
+
+				prevOffset = att.AttestOffset
+
+				if replaced {
+					// Another leader already cached this att, so switch to reading from cache
+					log.Debug(ctx, "Stopping overlapping attest stream", "chain", name, "offset", att.AttestOffset, "worker", workerName)
+					return errStop
+				} else if timeout() {
+					log.Debug(ctx, "Stopping timed-out attest stream", "chain", name, "offset", att.AttestOffset, "worker", workerName)
+					return errStop
+				}
+
+				return nil
+			})
+			if errors.Is(err, errStop) {
+				return nil
+			}
+
+			return err
+		}
+
+		timer := time.NewTimer(0)
+		defer timer.Stop()
+
+		// Loop until the context is closed or an error is encountered
+		for {
+			select {
+			case <-ctx.Done():
+				return ctx.Err()
+			case <-timer.C:
+				// Check if the cache is populated
+				if att, ok := buffer.Get(prevOffset + 1); ok {
+					// Got it, process the attestation
+					if err := callback(ctx, att); err != nil {
+						return err
+					}
+
+					prevOffset = att.AttestOffset
+					timer.Reset(0) // Immediately go to next
+
+					continue
+				}
+
+				// Cache isn't populated, check if we need to start streaming as a new leader
+				if l, ok := buffer.MaybeNewLeader(prevOffset + 1); ok {
+					if err := lead(l, prevOffset+1); err != nil {
+						return err
+					}
+
+					// Leader stopped gracefully, so try cache immediately (with a likely updated prevOffset)
+					timer.Reset(0)
+
+					continue
+				}
+
+				// Otherwise, wait a bit and try the same offset again
+				timer.Reset(time.Second)
+			}
+		}
+	}
+}
diff --git a/relayer/app/worker.go b/relayer/app/worker.go
index 7818c2f4e..1188c2cbb 100644
--- a/relayer/app/worker.go
+++ b/relayer/app/worker.go
@@ -2,6 +2,7 @@ package relayer
 
 import (
 	"context"
+	"golang.org/x/sync/errgroup"
 	"sync/atomic"
 	"time"
 
@@ -22,28 +23,30 @@ const (
 )
 
 type Worker struct {
-	destChain    netconf.Chain // Destination chain
-	network      netconf.Network
-	cProvider    cchain.Provider
-	xProvider    xchain.Provider
-	creator      CreateFunc
-	sendProvider func() (SendAsync, error)
-	awaitValSet  awaitValSet
+	destChain      netconf.Chain // Destination chain
+	network        netconf.Network
+	cProvider      cchain.Provider
+	xProvider      xchain.Provider
+	attestStreamer attestStreamer
+	creator        CreateFunc
+	sendProvider   func() (SendAsync, error)
+	awaitValSet    awaitValSet
 }
 
 // NewWorker creates a new worker for a single destination chain.
 func NewWorker(destChain netconf.Chain, network netconf.Network, cProvider cchain.Provider,
 	xProvider xchain.Provider, creator CreateFunc, sendProvider func() (SendAsync, error),
-	awaitValSet awaitValSet,
+	awaitValSet awaitValSet, attestStreamer attestStreamer,
 ) *Worker {
 	return &Worker{
-		destChain:    destChain,
-		network:      network,
-		cProvider:    cProvider,
-		xProvider:    xProvider,
-		creator:      creator,
-		sendProvider: sendProvider,
-		awaitValSet:  awaitValSet,
+		destChain:      destChain,
+		network:        network,
+		cProvider:      cProvider,
+		xProvider:      xProvider,
+		creator:        creator,
+		sendProvider:   sendProvider,
+		awaitValSet:    awaitValSet,
+		attestStreamer: attestStreamer,
 	}
 }
 
@@ -99,6 +102,9 @@ func (w *Worker) runOnce(ctx context.Context) error {
 		return err
 	}
 
+	// Errgroup to manage all goroutines: streams and buffer
+	eg, ctx := errgroup.WithContext(ctx)
+
 	var logAttrs []any //nolint:prealloc // Not worth it
 	for chainVer, fromOffset := range attestOffsets {
 		if chainVer.ID == w.destChain.ID { // Sanity check
@@ -107,14 +113,24 @@
 
 		callback := w.newCallback(msgFilter, buf.AddInput, newMsgStreamMapper(w.network))
 
-		w.cProvider.StreamAsync(ctx, chainVer, fromOffset, w.destChain.Name, callback)
+		eg.Go(func() error {
+			return w.attestStreamer(ctx, chainVer, fromOffset, w.destChain.Name, callback)
+		})
 
 		logAttrs = append(logAttrs, w.network.ChainVersionName(chainVer), fromOffset)
 	}
 
+	eg.Go(func() error {
+		return buf.Run(ctx)
+	})
+
 	log.Info(ctx, "Worker subscribed to chains", logAttrs...)
 
-	return buf.Run(ctx)
+	if err := eg.Wait(); err != nil {
+		return errors.Wrap(err, "worker error")
+	}
+
+	return nil
 }
 
 // awaitValSet blocks until the portal is aware of this validator set ID.
@@ -192,8 +208,21 @@
 ) cchain.ProviderCallback {
 	var cachedValSetID uint64
 	var cachedValSet []cchain.PortalValidator
+	var prevOffset uint64
+
+	return func(ctx context.Context, att xchain.Attestation) (err error) {
+		// Sanity check strictly sequential offsets.
+		{
+			if prevOffset != 0 && att.AttestOffset != prevOffset+1 {
+				return errors.New("non-sequential attestation offset [BUG]", "prev", prevOffset, "curr", att.AttestOffset)
+			}
+			defer func() {
+				if err == nil {
+					prevOffset = att.AttestOffset
+				}
+			}()
+		}
 
-	return func(ctx context.Context, att xchain.Attestation) error {
 		block, ok, err := fetchXBlock(ctx, w.xProvider, att)
 		if err != nil {
 			return err
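
Reviewer note (an addition, not part of the patch): the leader handoff in streambuf.go hinges on attestBuffer.Add reporting whether an offset was already cached, which is what makes an overlapping leader return errStop and fall back to reading from the cache. Below is a minimal, self-contained Go sketch of that eviction/replaced behaviour using a plain map instead of the btree; the names cache and add are illustrative only and nothing here is omni API.

// Standalone sketch only: mirrors attestBuffer.Add semantics with a plain map.
package main

import (
	"fmt"
	"sync"
)

// cache is a bounded offset->value map, trimmed from the lowest offset,
// mirroring attestBuffer's eviction policy (which uses a btree instead).
type cache struct {
	mu    sync.Mutex
	limit int
	vals  map[uint64]string
}

// add stores v at offset and reports whether an entry at that offset was replaced.
// A true result is the signal an overlapping leader uses to stop streaming.
func (c *cache) add(offset uint64, v string) (replaced bool) {
	c.mu.Lock()
	defer c.mu.Unlock()

	_, replaced = c.vals[offset]
	c.vals[offset] = v

	// Trim the lowest offsets while over the limit.
	for len(c.vals) > c.limit {
		lowest, first := uint64(0), true
		for k := range c.vals {
			if first || k < lowest {
				lowest, first = k, false
			}
		}
		delete(c.vals, lowest)
		if lowest == offset {
			replaced = false // The new entry itself was evicted, so it was never usable.
		}
	}

	return replaced
}

func main() {
	c := &cache{limit: 2, vals: map[uint64]string{}}
	fmt.Println(c.add(5, "a")) // false: new offset cached
	fmt.Println(c.add(5, "b")) // true: offset already cached; a second leader would stop here
	fmt.Println(c.add(6, "c")) // false
	fmt.Println(c.add(7, "d")) // false: offset 5 evicted to respect the limit
}

In the patch itself this boolean feeds the errStop path inside lead, which is how multiple workers streaming the same chain version collapse to a single leader per offset range.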