feat(relayer): add leader streamer
corverroos committed Nov 1, 2024
1 parent ac1f411 commit 8b7bd28
Showing 5 changed files with 320 additions and 23 deletions.
4 changes: 2 additions & 2 deletions go.mod
@@ -149,7 +149,7 @@ require (
github.com/golang/mock v1.6.0 // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb // indirect
github.com/google/btree v1.1.2 // indirect
github.com/google/btree v1.1.3 // indirect
github.com/google/cel-go v0.21.0 // indirect
github.com/google/go-containerregistry v0.20.2 // indirect
github.com/google/orderedcode v0.0.1 // indirect
@@ -249,7 +249,7 @@ require (
github.com/supranational/blst v0.3.13 // indirect
github.com/syndtr/goleveldb v1.0.1 // indirect
github.com/tendermint/go-amino v0.16.0 // indirect
github.com/tidwall/btree v1.7.0 // indirect
github.com/tidwall/btree v1.7.0
github.com/tklauser/go-sysconf v0.3.12 // indirect
github.com/tklauser/numcpus v0.6.1 // indirect
github.com/ulikunitz/xz v0.5.11 // indirect
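The go.mod change above promotes github.com/tidwall/btree from an indirect to a direct dependency, since the new relayer/app/streambuf.go below imports it for the ordered attestation cache. As rough orientation, a minimal standalone sketch of the btree.Map calls the buffer relies on (NewMap, Set, Get, PopMin, Len); the keys and string values here are illustrative only:

package main

import (
	"fmt"

	"github.com/tidwall/btree"
)

func main() {
	// Ordered map keyed by offset; degree 32 as used in streambuf.go.
	m := btree.NewMap[uint64, string](32)

	m.Set(7, "att-7") // insert or replace
	m.Set(5, "att-5")

	if v, ok := m.Get(5); ok {
		fmt.Println("got", v) // lookup by exact key
	}

	// Pop the smallest key, as the buffer does when it grows over its limit.
	if k, v, ok := m.PopMin(); ok {
		fmt.Println("trimmed", k, v)
	}

	fmt.Println("len", m.Len())
}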
4 changes: 2 additions & 2 deletions go.sum
@@ -682,8 +682,8 @@ github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb h1:PBC98N2aIaM3XXi
github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/btree v1.1.2 h1:xf4v41cLI2Z6FxbKm+8Bu+m8ifhj15JuZ9sa0jZCMUU=
github.com/google/btree v1.1.2/go.mod h1:qOPhT0dTNdNzV6Z/lhRX0YXUafgPLFUh+gZMl761Gm4=
github.com/google/btree v1.1.3 h1:CVpQJjYgC4VbzxeGVHfvZrv1ctoYCAI8vbl07Fcxlyg=
github.com/google/btree v1.1.3/go.mod h1:qOPhT0dTNdNzV6Z/lhRX0YXUafgPLFUh+gZMl761Gm4=
github.com/google/cel-go v0.21.0 h1:cl6uW/gxN+Hy50tNYvI691+sXxioCnstFzLp2WO4GCI=
github.com/google/cel-go v0.21.0/go.mod h1:rHUlWCcBKgyEk+eV03RPdZUekPp6YcJwV0FxuUksYxc=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
5 changes: 4 additions & 1 deletion relayer/app/app.go
@@ -67,6 +67,8 @@ func Run(ctx context.Context, cfg Config) error {
pricer := newTokenPricer(ctx)
pnl := newPnlLogger(network.ID, pricer)

attestStreamer := newLeaderStreamer(cprov, network)

for _, destChain := range network.EVMChains() {
// Setup send provider
sendProvider := func() (SendAsync, error) {
@@ -100,7 +102,8 @@ func Run(ctx context.Context, cfg Config) error {
xprov,
CreateSubmissions,
sendProvider,
awaitValSet)
awaitValSet,
attestStreamer)

go worker.Run(ctx)
}
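For context, a hedged sketch of how a worker might invoke the injected streamer; only the attestStreamer signature and the callback shape come from streambuf.go below, while the function name, worker name, and logging are hypothetical:

// Hypothetical worker loop driving the injected attestStreamer.
func runWorker(ctx context.Context, chainVer xchain.ChainVersion, fromOffset uint64, streamFn attestStreamer) error {
	return streamFn(ctx, chainVer, fromOffset, "example-worker", func(ctx context.Context, att xchain.Attestation) error {
		// Build and submit the cross-chain submissions for this attestation (omitted).
		log.Debug(ctx, "Processing attestation", "offset", att.AttestOffset)
		return nil
	})
}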
265 changes: 265 additions & 0 deletions relayer/app/streambuf.go
@@ -0,0 +1,265 @@
package relayer

import (
"context"
"sync"
"time"

"github.com/omni-network/omni/lib/cchain"
"github.com/omni-network/omni/lib/errors"
"github.com/omni-network/omni/lib/log"
"github.com/omni-network/omni/lib/netconf"
"github.com/omni-network/omni/lib/xchain"

"github.com/tidwall/btree"
)

// errStop is returned when the leader should stop streaming.
var errStop = errors.New("stop")

// getLimit returns the limit of attestations to cache per chain version.
func getLimit(network netconf.ID) int {
if network == netconf.Devnet {
return 20 // Tiny buffer in devnet
}

return 10_000 // 10k atts & 1KB per attestation & 10 chain versions ~= 100MB
}

// leadChaosTimeout returns a function that returns true if the leader chaos timeout has been reached.
// This ensures we rotate leaders after a certain time (and test leader rotation).
func leadChaosTimeout(network netconf.ID) func() bool {
t0 := time.Now()
return func() bool {
duration := time.Hour
if network == netconf.Devnet {
duration = time.Second * 10
} else if network == netconf.Staging {
duration = time.Minute
}

return time.Since(t0) > duration
}
}

// leader tracks a worker actively streaming attestations and adding them to the cache.
// It "locks" a range of offsets as leader of that range.
type leader struct {
mu sync.RWMutex
from uint64
latest uint64
delete func()
}

// contains returns true if the offset is within the leader's range.
func (s *leader) contains(offset uint64) bool {
s.mu.RLock()
defer s.mu.RUnlock()

return s.from <= offset && offset <= s.latest
}

// IncRange increases the range to the provided height plus one
// since the leader will move on to that height next.
func (s *leader) IncRange(height uint64) {
s.mu.Lock()
defer s.mu.Unlock()

s.latest = height + 1
}

// Delete removes the leader from the buffer.
func (s *leader) Delete() {
s.mu.Lock()
defer s.mu.Unlock()

s.delete()
}

func newAttestBuffer(limit int) *attestBuffer {
return &attestBuffer{
limit: limit,
leaders: make(map[*leader]struct{}),
atts: btree.NewMap[uint64, xchain.Attestation](32), // Degree of 32 is a good default
}
}

// attestBuffer is an attestation cache populated by leaders.
// The goal is to avoid overlapping streams.
type attestBuffer struct {
limit int
mu sync.RWMutex
leaders map[*leader]struct{}
atts *btree.Map[uint64, xchain.Attestation]
}

// Add adds an attestation to the cache, trimming the oldest entries if the cache grows over the limit,
// so attestations older than everything in a full cache are effectively dropped.
// It returns true if an existing entry at the same offset was replaced (and the new entry itself was not trimmed).
func (b *attestBuffer) Add(att xchain.Attestation) (replaced bool) {
b.mu.Lock()
defer b.mu.Unlock()

defer func() {
// Maybe trim if we grew over limit
for b.atts.Len() > b.limit {
height, _, ok := b.atts.PopMin()
if ok && height == att.AttestOffset {
replaced = false // Not added, so not replaced
}
}
}()

_, replaced = b.atts.Set(att.AttestOffset, att)

return replaced
}

// Get returns the attestation at the provided offset or false.
func (b *attestBuffer) Get(offset uint64) (xchain.Attestation, bool) {
b.mu.RLock()
defer b.mu.RUnlock()

return b.atts.Get(offset)
}

// containsStreamerUnsafe returns true if any leader contains the offset.
// It is unsafe since it assumes the lock is held.
func (b *attestBuffer) containsStreamerUnsafe(offset uint64) bool {
for s := range b.leaders {
if s.contains(offset) {
return true
}
}

return false
}

// MaybeNewLeader returns a new leader if no other leader is already streaming the offset, or false otherwise.
func (b *attestBuffer) MaybeNewLeader(offset uint64) (*leader, bool) {
b.mu.RLock()
contains := b.containsStreamerUnsafe(offset)
b.mu.RUnlock()
if contains {
return nil, false
}

b.mu.Lock()
defer b.mu.Unlock()

// Double check in case another goroutine added a leader
for s := range b.leaders {
if s.contains(offset) {
return nil, false
}
}

s := &leader{
from: offset,
latest: offset,
}
s.delete = func() {
b.mu.Lock()
defer b.mu.Unlock()

delete(b.leaders, s)
}

b.leaders[s] = struct{}{}

return s, true
}

// attestStreamer is a function that streams attestations from a specific offset.
// It abstracts cchain.Provider.StreamAttestations.
type attestStreamer func(ctx context.Context, chainVer xchain.ChainVersion, attestOffset uint64, workerName string, callback cchain.ProviderCallback) error

// newLeaderStreamer returns a new attestStreamer that avoids multiple overlapping streaming queries
// by selecting a leader to query each range of offsets.
func newLeaderStreamer(cprov cchain.Provider, network netconf.Network) attestStreamer {
var buffers sync.Map // map[xchain.ChainVer]*attestBuffer

return func(ctx context.Context, chainVer xchain.ChainVersion, fromOffset uint64, workerName string, callback cchain.ProviderCallback) error {
anyBuffer, _ := buffers.LoadOrStore(chainVer, newAttestBuffer(getLimit(network.ID)))
buffer := anyBuffer.(*attestBuffer)

name := network.ChainVersionName(chainVer)

// Track the offset of the last attestation we "processed"
prevOffset := fromOffset - 1

[GitHub Actions / pre-commit annotation — check failure on line 188 in relayer/app/streambuf.go: uint subtraction]
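This flags the `fromOffset - 1` just above: fromOffset is an unsigned integer, so a value of 0 would underflow. Presumably attestation offsets start at 1, making the subtraction safe in practice, but the linter cannot verify that.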

// lead blocks and streams attestations from the provided height using the provided leader.
// It populates the cache with fetched attestations.
// It returns nil if it detects an overlap with another leader or on chaos timeout.
lead := func(l *leader, from uint64) error {
defer l.Delete()
log.Debug(ctx, "Starting attest stream", "chain", name, "offset", from, "worker", workerName)
timeout := leadChaosTimeout(network.ID)

err := cprov.StreamAttestations(ctx, chainVer, from, workerName, func(ctx context.Context, att xchain.Attestation) error {
l.IncRange(att.AttestOffset)
replaced := buffer.Add(att)

if err := callback(ctx, att); err != nil {
return err
}

prevOffset = att.AttestOffset

if replaced {
// Another leader already cached this att, so switch to reading from cache
log.Debug(ctx, "Stopping overlapping attest stream", "chain", name, "offset", att.AttestOffset, "worker", workerName)
return errStop
} else if timeout() {
log.Debug(ctx, "Stopping timed-out attest stream", "chain", name, "offset", att.AttestOffset, "worker", workerName)
return errStop
}

return nil
})
if errors.Is(err, errStop) {
return nil
}

return err
}

timer := time.NewTimer(0)
defer timer.Stop()

// Loop until the context is closed or error is encountered
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-timer.C:
// Check if the cache is populated
if att, ok := buffer.Get(prevOffset + 1); ok {
// Got it, process the attestation
if err := callback(ctx, att); err != nil {
return err
}

prevOffset = att.AttestOffset
timer.Reset(0) // Immediately go to next

continue
}

// Cache isn't populated, check if we need to start streaming as a new leader
if l, ok := buffer.MaybeNewLeader(prevOffset + 1); ok {
if err := lead(l, prevOffset+1); err != nil {
return err
}

// Leader stopped gracefully, so try the cache immediately (prevOffset was probably updated)
timer.Reset(0)

continue
}

// Otherwise, wait a bit, and try the same offset again
timer.Reset(time.Second)
}
}
}
}
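To make the buffer and leader semantics concrete, a minimal usage sketch assuming it lives in the same relayer package as streambuf.go above; the offsets, limit, and function name are illustrative only:

// Sketch: exercises attestBuffer trimming and the leader range "lock".
func exampleBufferAndLeader() {
	mkAtt := func(offset uint64) xchain.Attestation {
		var a xchain.Attestation
		a.AttestOffset = offset
		return a
	}

	buf := newAttestBuffer(2) // tiny limit so trimming is visible

	buf.Add(mkAtt(1))
	buf.Add(mkAtt(2))
	buf.Add(mkAtt(3)) // grows over the limit, so offset 1 is trimmed

	if _, ok := buf.Get(1); !ok {
		// Offset 1 was trimmed; offsets 2 and 3 remain cached.
	}

	// The first caller becomes leader for offset 4 and extends its range as it streams.
	if l, ok := buf.MaybeNewLeader(4); ok {
		l.IncRange(4) // range is now [4, 5]

		if _, ok := buf.MaybeNewLeader(5); !ok {
			// An active leader already covers offset 5, so other callers poll the cache instead.
		}

		l.Delete() // release the range once streaming stops
	}
}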