Skip to content

Commit

Permalink
Randomize ordering of rollupUrl failover
Browse files Browse the repository at this point in the history
  • Loading branch information
BrianBland committed May 30, 2024
1 parent 0dcb1b2 commit ca92b66
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 33 deletions.
23 changes: 11 additions & 12 deletions op-service/dial/active_l2_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,8 @@ type ethDialer func(ctx context.Context, timeout time.Duration, log log.Logger,
type ActiveL2EndpointProvider struct {
ActiveL2RollupProvider
currentEthClient EthClientInterface
ethClientIndex int
ethDialer ethDialer
ethUrls []string
ethUrls indexedIterable[string]
}

// NewActiveL2EndpointProvider creates a new ActiveL2EndpointProvider
Expand All @@ -43,26 +42,26 @@ func NewActiveL2EndpointProvider(ctx context.Context,
) (RollupClientInterface, error) {
return DialRollupClientWithTimeout(ctx, timeout, log, url)
}
return newActiveL2EndpointProvider(ctx, ethUrls, rollupUrls, checkDuration, networkTimeout, logger, ethDialer, rollupDialer)
return newActiveL2EndpointProvider(ctx, newRandomizedIterable(ethUrls), newRandomizedIterable(rollupUrls), checkDuration, networkTimeout, logger, ethDialer, rollupDialer)
}

func newActiveL2EndpointProvider(
ctx context.Context,
ethUrls, rollupUrls []string,
ethUrls, rollupUrls indexedIterable[string],
checkDuration time.Duration,
networkTimeout time.Duration,
logger log.Logger,
ethDialer ethDialer,
rollupDialer rollupDialer,
) (*ActiveL2EndpointProvider, error) {
if len(rollupUrls) == 0 {
if rollupUrls.Len() == 0 {
return nil, errors.New("empty rollup urls list, expected at least one URL")
}
if len(ethUrls) != len(rollupUrls) {
return nil, fmt.Errorf("number of eth urls (%d) and rollup urls (%d) mismatch", len(ethUrls), len(rollupUrls))
if ethUrls.Len() != rollupUrls.Len() {
return nil, fmt.Errorf("number of eth urls (%d) and rollup urls (%d) mismatch", ethUrls.Len(), rollupUrls.Len())
}

rollupProvider, err := newActiveL2RollupProvider(ctx, rollupUrls, checkDuration, networkTimeout, logger, rollupDialer)
rollupProvider, err := newActiveL2RollupProvider(ctx, (rollupUrls), checkDuration, networkTimeout, logger, rollupDialer)
if err != nil {
return nil, err
}
Expand All @@ -86,12 +85,12 @@ func (p *ActiveL2EndpointProvider) EthClient(ctx context.Context) (EthClientInte
if err != nil {
return nil, err
}
if p.ethClientIndex != p.rollupIndex || p.currentEthClient == nil {
if p.ethUrls.CurrentIndex() != p.rollupUrls.CurrentIndex() || p.currentEthClient == nil {
// we changed sequencers, dial a new EthClient
cctx, cancel := context.WithTimeout(ctx, p.networkTimeout)
defer cancel()
idx := p.rollupIndex
ep := p.ethUrls[idx]
idx := p.rollupUrls.CurrentIndex()
ep := p.ethUrls.Get(idx)
log.Info("sequencer changed (or ethClient was nil due to startup), dialing new eth client", "new_index", idx, "new_url", ep)
ethClient, err := p.ethDialer(cctx, p.networkTimeout, p.log, ep)
if err != nil {
Expand All @@ -100,7 +99,7 @@ func (p *ActiveL2EndpointProvider) EthClient(ctx context.Context) (EthClientInte
if p.currentEthClient != nil {
p.currentEthClient.Close()
}
p.ethClientIndex = idx
p.ethUrls.SetCurrentIndex(idx)
p.currentEthClient = ethClient
}
return p.currentEthClient, nil
Expand Down
6 changes: 3 additions & 3 deletions op-service/dial/active_l2_provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (et *endpointProviderTest) newActiveL2RollupProvider(checkDuration time.Dur

return newActiveL2RollupProvider(
context.Background(),
rollupUrls,
newOrderedIterable(rollupUrls),
checkDuration,
1*time.Minute,
testlog.Logger(et.t, log.LevelDebug),
Expand Down Expand Up @@ -112,8 +112,8 @@ func (et *endpointProviderTest) newActiveL2EndpointProvider(checkDuration time.D

return newActiveL2EndpointProvider(
context.Background(),
ethUrls,
rollupUrls,
newOrderedIterable(ethUrls),
newOrderedIterable(rollupUrls),
checkDuration,
1*time.Minute,
testlog.Logger(et.t, log.LevelDebug),
Expand Down
122 changes: 104 additions & 18 deletions op-service/dial/active_rollup_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,98 @@ import (

type rollupDialer func(ctx context.Context, timeout time.Duration, log log.Logger, url string) (RollupClientInterface, error)

type indexedIterable[T any] interface {
Len() int
CurrentIndex() int
SetCurrentIndex(int)
Get(i int) T
Iter() <-chan int
}

func newOrderedIterable[T any](list []T) indexedIterable[T] {
return &orderedIterableList[T]{list: list}
}

type orderedIterableList[T any] struct {
list []T
idx int
}

func (r *orderedIterableList[T]) Len() int {
return len(r.list)
}

func (r *orderedIterableList[T]) CurrentIndex() int {
return r.idx
}

func (r *orderedIterableList[T]) SetCurrentIndex(i int) {
r.idx = i
}

func (r *orderedIterableList[T]) Get(i int) T {
return r.list[i]
}

func (r *orderedIterableList[T]) Iter() <-chan int {
ch := make(chan int, r.Len())
go func() {
for offset := range r.list {
idx := (r.CurrentIndex() + offset) % r.Len()
ch <- idx
}
close(ch)
}()
return ch
}

func newRandomizedIterable[T any](list []T) indexedIterable[T] {
set := make(map[int]T, len(list))
for idx, v := range list {
set[idx] = v
}
idx := 0
for i := range set {
idx = i
break
}
return &randomizedIterableList[T]{set: set, idx: idx}
}

type randomizedIterableList[T any] struct {
set map[int]T
idx int
}

func (r randomizedIterableList[T]) Len() int {
return len(r.set)
}

func (r *randomizedIterableList[T]) CurrentIndex() int {
return r.idx
}

func (r *randomizedIterableList[T]) SetCurrentIndex(i int) {
r.idx = i
}

func (r *randomizedIterableList[T]) Get(i int) T {
return r.set[i]
}
func (r randomizedIterableList[T]) Iter() <-chan int {
ch := make(chan int, r.Len())
go func() {
ch <- r.CurrentIndex()
for idx, _ := range r.set {
if idx != r.CurrentIndex() {
ch <- idx
}
}
close(ch)
}()
return ch
}

// ActiveL2EndpointProvider is an interface for providing a RollupClient
// It manages the lifecycle of the RollupClient for callers
// It does this by failing over down the list of rollupUrls if the current one is inactive or broken
Expand All @@ -22,10 +114,9 @@ type ActiveL2RollupProvider struct {

activeTimeout time.Time

rollupUrls []string
rollupUrls indexedIterable[string]
rollupDialer rollupDialer
currentRollupClient RollupClientInterface
rollupIndex int
clientLock *sync.Mutex
}

Expand All @@ -34,7 +125,7 @@ type ActiveL2RollupProvider struct {
// provide a checkDuration of 0 to check every time
func NewActiveL2RollupProvider(
ctx context.Context,
rollupUrls []string,
rollupUrls indexedIterable[string],
checkDuration time.Duration,
networkTimeout time.Duration,
logger log.Logger,
Expand All @@ -49,13 +140,13 @@ func NewActiveL2RollupProvider(

func newActiveL2RollupProvider(
ctx context.Context,
rollupUrls []string,
rollupUrls indexedIterable[string],
checkDuration time.Duration,
networkTimeout time.Duration,
logger log.Logger,
dialer rollupDialer,
) (*ActiveL2RollupProvider, error) {
if len(rollupUrls) == 0 {
if rollupUrls.Len() == 0 {
return nil, errors.New("empty rollup urls list")
}
p := &ActiveL2RollupProvider{
Expand Down Expand Up @@ -101,24 +192,23 @@ func (p *ActiveL2RollupProvider) shouldCheck() bool {
}

func (p *ActiveL2RollupProvider) findActiveEndpoints(ctx context.Context) error {
startIdx := p.rollupIndex
var errs error
for offset := range p.rollupUrls {
idx := (startIdx + offset) % p.numEndpoints()
if offset != 0 || p.currentRollupClient == nil {
for idx := range p.rollupUrls.Iter() {
ep := p.rollupUrls.Get(idx)

if idx != p.rollupUrls.CurrentIndex() || p.currentRollupClient == nil {
if err := p.dialSequencer(ctx, idx); err != nil {
errs = errors.Join(errs, err)
p.log.Warn("Error dialing next sequencer.", "err", err, "index", p.rollupIndex)
p.log.Warn("Error dialing next sequencer.", "err", err, "index", p.rollupUrls.CurrentIndex(), "url", ep)
continue
}
}

ep := p.rollupUrls[idx]
if active, err := p.checkCurrentSequencer(ctx); err != nil {
errs = errors.Join(errs, err)
p.log.Warn("Error querying active sequencer, trying next.", "err", err, "index", idx, "url", ep)
} else if active {
if offset == 0 {
if idx == p.rollupUrls.CurrentIndex() {
p.log.Debug("Current sequencer active.", "index", idx, "url", ep)
} else {
p.log.Info("Found new active sequencer.", "index", idx, "url", ep)
Expand All @@ -137,18 +227,14 @@ func (p *ActiveL2RollupProvider) checkCurrentSequencer(ctx context.Context) (boo
return p.currentRollupClient.SequencerActive(cctx)
}

func (p *ActiveL2RollupProvider) numEndpoints() int {
return len(p.rollupUrls)
}

// dialSequencer dials the sequencer for the url at the given index.
// If successful, the currentRollupClient and rollupIndex are updated and the
// old rollup client is closed.
func (p *ActiveL2RollupProvider) dialSequencer(ctx context.Context, idx int) error {
cctx, cancel := context.WithTimeout(ctx, p.networkTimeout)
defer cancel()

ep := p.rollupUrls[idx]
ep := p.rollupUrls.Get(idx)
p.log.Info("Dialing next sequencer.", "index", idx, "url", ep)
rollupClient, err := p.rollupDialer(cctx, p.networkTimeout, p.log, ep)
if err != nil {
Expand All @@ -157,7 +243,7 @@ func (p *ActiveL2RollupProvider) dialSequencer(ctx context.Context, idx int) err
if p.currentRollupClient != nil {
p.currentRollupClient.Close()
}
p.rollupIndex = idx
p.rollupUrls.SetCurrentIndex(idx)
p.currentRollupClient = rollupClient
return nil
}
Expand Down

0 comments on commit ca92b66

Please sign in to comment.