Skip to content

Commit

Permalink
Do not retry dial attempts in active l2 providers
Browse files Browse the repository at this point in the history
  • Loading branch information
BrianBland committed May 31, 2024
1 parent 0dcb1b2 commit df8ee62
Show file tree
Hide file tree
Showing 7 changed files with 73 additions and 51 deletions.
2 changes: 1 addition & 1 deletion op-proposer/proposer/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func (ps *ProposerService) initRPCClients(ctx context.Context, cfg *CLIConfig) e
var rollupProvider dial.RollupProvider
if strings.Contains(cfg.RollupRpc, ",") {
rollupUrls := strings.Split(cfg.RollupRpc, ",")
rollupProvider, err = dial.NewActiveL2RollupProvider(ctx, rollupUrls, cfg.ActiveSequencerCheckDuration, dial.DefaultDialTimeout, ps.Log)
rollupProvider, err = dial.NewActiveL2RollupProvider(ctx, rollupUrls, cfg.ActiveSequencerCheckDuration, 2*time.Second, ps.Log)
} else {
rollupProvider, err = dial.NewStaticL2RollupProvider(ctx, ps.Log, cfg.RollupRpc)
}
Expand Down
41 changes: 21 additions & 20 deletions op-service/client/dial_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package client

import (
"context"
"fmt"
"net"
"strings"
Expand All @@ -19,38 +20,38 @@ func TestIsURLAvailableLocal(t *testing.T) {
addr := fmt.Sprintf("http://localhost:%s", parts[1])

// True & False with ports
require.True(t, IsURLAvailable(addr))
require.False(t, IsURLAvailable("http://localhost:0"))
require.True(t, IsURLAvailable(context.Background(), addr))
require.False(t, IsURLAvailable(context.Background(), "http://localhost:0"))

// Fail open if we don't recognize the scheme
require.True(t, IsURLAvailable("mailto://example.com"))
require.True(t, IsURLAvailable(context.Background(), "mailto://example.com"))

}

func TestIsURLAvailableNonLocal(t *testing.T) {
if !IsURLAvailable("http://example.com") {
if !IsURLAvailable(context.Background(), "http://example.com") {
t.Skip("No internet connection found, skipping this test")
}

// True without ports. http & https
require.True(t, IsURLAvailable("http://example.com"))
require.True(t, IsURLAvailable("http://example.com/hello"))
require.True(t, IsURLAvailable("https://example.com"))
require.True(t, IsURLAvailable("https://example.com/hello"))
require.True(t, IsURLAvailable(context.Background(), "http://example.com"))
require.True(t, IsURLAvailable(context.Background(), "http://example.com/hello"))
require.True(t, IsURLAvailable(context.Background(), "https://example.com"))
require.True(t, IsURLAvailable(context.Background(), "https://example.com/hello"))

// True without ports. ws & wss
require.True(t, IsURLAvailable("ws://example.com"))
require.True(t, IsURLAvailable("ws://example.com/hello"))
require.True(t, IsURLAvailable("wss://example.com"))
require.True(t, IsURLAvailable("wss://example.com/hello"))
require.True(t, IsURLAvailable(context.Background(), "ws://example.com"))
require.True(t, IsURLAvailable(context.Background(), "ws://example.com/hello"))
require.True(t, IsURLAvailable(context.Background(), "wss://example.com"))
require.True(t, IsURLAvailable(context.Background(), "wss://example.com/hello"))

// False without ports
require.False(t, IsURLAvailable("http://fakedomainnamethatdoesnotexistandshouldneverexist.com"))
require.False(t, IsURLAvailable("http://fakedomainnamethatdoesnotexistandshouldneverexist.com/hello"))
require.False(t, IsURLAvailable("https://fakedomainnamethatdoesnotexistandshouldneverexist.com"))
require.False(t, IsURLAvailable("https://fakedomainnamethatdoesnotexistandshouldneverexist.com/hello"))
require.False(t, IsURLAvailable("ws://fakedomainnamethatdoesnotexistandshouldneverexist.com"))
require.False(t, IsURLAvailable("ws://fakedomainnamethatdoesnotexistandshouldneverexist.com/hello"))
require.False(t, IsURLAvailable("wss://fakedomainnamethatdoesnotexistandshouldneverexist.com"))
require.False(t, IsURLAvailable("wss://fakedomainnamethatdoesnotexistandshouldneverexist.com/hello"))
require.False(t, IsURLAvailable(context.Background(), "http://fakedomainnamethatdoesnotexistandshouldneverexist.com"))
require.False(t, IsURLAvailable(context.Background(), "http://fakedomainnamethatdoesnotexistandshouldneverexist.com/hello"))
require.False(t, IsURLAvailable(context.Background(), "https://fakedomainnamethatdoesnotexistandshouldneverexist.com"))
require.False(t, IsURLAvailable(context.Background(), "https://fakedomainnamethatdoesnotexistandshouldneverexist.com/hello"))
require.False(t, IsURLAvailable(context.Background(), "ws://fakedomainnamethatdoesnotexistandshouldneverexist.com"))
require.False(t, IsURLAvailable(context.Background(), "ws://fakedomainnamethatdoesnotexistandshouldneverexist.com/hello"))
require.False(t, IsURLAvailable(context.Background(), "wss://fakedomainnamethatdoesnotexistandshouldneverexist.com"))
require.False(t, IsURLAvailable(context.Background(), "wss://fakedomainnamethatdoesnotexistandshouldneverexist.com/hello"))
}
7 changes: 4 additions & 3 deletions op-service/client/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func NewRPCWithClient(ctx context.Context, lgr log.Logger, addr string, underlyi
func dialRPCClientWithBackoff(ctx context.Context, log log.Logger, addr string, attempts int, opts ...rpc.ClientOption) (*rpc.Client, error) {
bOff := retry.Exponential()
return retry.Do(ctx, attempts, bOff, func() (*rpc.Client, error) {
if !IsURLAvailable(addr) {
if !IsURLAvailable(ctx, addr) {
log.Warn("failed to dial address, but may connect later", "addr", addr)
return nil, fmt.Errorf("address unavailable (%s)", addr)
}
Expand All @@ -125,7 +125,7 @@ func dialRPCClientWithBackoff(ctx context.Context, log log.Logger, addr string,
})
}

func IsURLAvailable(address string) bool {
func IsURLAvailable(ctx context.Context, address string) bool {
u, err := url.Parse(address)
if err != nil {
return false
Expand All @@ -142,7 +142,8 @@ func IsURLAvailable(address string) bool {
return true
}
}
conn, err := net.DialTimeout("tcp", addr, 5*time.Second)
dialer := net.Dialer{Timeout: 5 * time.Second}
conn, err := dialer.DialContext(ctx, "tcp", addr)
if err != nil {
return false
}
Expand Down
29 changes: 19 additions & 10 deletions op-service/dial/active_l2_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,15 @@ import (
"fmt"
"time"

"github.com/ethereum-optimism/optimism/op-service/client"
"github.com/ethereum-optimism/optimism/op-service/sources"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/log"
)

const DefaultActiveSequencerFollowerCheckDuration = 2 * DefaultDialTimeout

type ethDialer func(ctx context.Context, timeout time.Duration, log log.Logger, url string) (EthClientInterface, error)
type ethDialer func(ctx context.Context, log log.Logger, url string) (EthClientInterface, error)

// ActiveL2EndpointProvider is an interface for providing a RollupClient and l2 eth client
// It manages the lifecycle of the RollupClient and eth client for callers
Expand All @@ -33,15 +36,21 @@ func NewActiveL2EndpointProvider(ctx context.Context,
networkTimeout time.Duration,
logger log.Logger,
) (*ActiveL2EndpointProvider, error) {
ethDialer := func(ctx context.Context, timeout time.Duration,
log log.Logger, url string,
) (EthClientInterface, error) {
return DialEthClientWithTimeout(ctx, timeout, log, url)
ethDialer := func(ctx context.Context, log log.Logger, url string) (EthClientInterface, error) {
rpcCl, err := dialRPCClient(ctx, log, url)
if err != nil {
return nil, err
}

return ethclient.NewClient(rpcCl), nil
}
rollupDialer := func(ctx context.Context, timeout time.Duration,
log log.Logger, url string,
) (RollupClientInterface, error) {
return DialRollupClientWithTimeout(ctx, timeout, log, url)
rollupDialer := func(ctx context.Context, log log.Logger, url string) (RollupClientInterface, error) {
rpcCl, err := dialRPCClient(ctx, log, url)
if err != nil {
return nil, err
}

return sources.NewRollupClient(client.NewBaseRPCClient(rpcCl)), nil
}
return newActiveL2EndpointProvider(ctx, ethUrls, rollupUrls, checkDuration, networkTimeout, logger, ethDialer, rollupDialer)
}
Expand Down Expand Up @@ -93,7 +102,7 @@ func (p *ActiveL2EndpointProvider) EthClient(ctx context.Context) (EthClientInte
idx := p.rollupIndex
ep := p.ethUrls[idx]
log.Info("sequencer changed (or ethClient was nil due to startup), dialing new eth client", "new_index", idx, "new_url", ep)
ethClient, err := p.ethDialer(cctx, p.networkTimeout, p.log, ep)
ethClient, err := p.ethDialer(cctx, p.log, ep)
if err != nil {
return nil, fmt.Errorf("dialing eth client: %w", err)
}
Expand Down
6 changes: 3 additions & 3 deletions op-service/dial/active_l2_provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func setupEndpointProviderTest(t *testing.T, numSequencers int) *endpointProvide

// newActiveL2EndpointProvider constructs a new ActiveL2RollupProvider using the test harness setup.
func (et *endpointProviderTest) newActiveL2RollupProvider(checkDuration time.Duration) (*ActiveL2RollupProvider, error) {
mockRollupDialer := func(ctx context.Context, timeout time.Duration, log log.Logger, url string) (RollupClientInterface, error) {
mockRollupDialer := func(ctx context.Context, log log.Logger, url string) (RollupClientInterface, error) {
for i, client := range et.rollupClients {
if url == fmt.Sprintf("rollup%d", i) {
if !et.rollupDialOutcomes[i] {
Expand Down Expand Up @@ -74,7 +74,7 @@ func (et *endpointProviderTest) newActiveL2RollupProvider(checkDuration time.Dur

// newActiveL2EndpointProvider constructs a new ActiveL2EndpointProvider using the test harness setup.
func (et *endpointProviderTest) newActiveL2EndpointProvider(checkDuration time.Duration) (*ActiveL2EndpointProvider, error) {
mockRollupDialer := func(ctx context.Context, timeout time.Duration, log log.Logger, url string) (RollupClientInterface, error) {
mockRollupDialer := func(ctx context.Context, log log.Logger, url string) (RollupClientInterface, error) {
for i, client := range et.rollupClients {
if url == fmt.Sprintf("rollup%d", i) {
if !et.rollupDialOutcomes[i] {
Expand All @@ -86,7 +86,7 @@ func (et *endpointProviderTest) newActiveL2EndpointProvider(checkDuration time.D
return nil, fmt.Errorf("unknown test url: %s", url)
}

mockEthDialer := func(ctx context.Context, timeout time.Duration, log log.Logger, url string) (EthClientInterface, error) {
mockEthDialer := func(ctx context.Context, log log.Logger, url string) (EthClientInterface, error) {
for i, client := range et.ethClients {
if url == fmt.Sprintf("eth%d", i) {
if !et.ethDialOutcomes[i] {
Expand Down
16 changes: 11 additions & 5 deletions op-service/dial/active_rollup_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ import (
"sync"
"time"

"github.com/ethereum-optimism/optimism/op-service/client"
"github.com/ethereum-optimism/optimism/op-service/sources"
"github.com/ethereum/go-ethereum/log"
)

type rollupDialer func(ctx context.Context, timeout time.Duration, log log.Logger, url string) (RollupClientInterface, error)
type rollupDialer func(ctx context.Context, log log.Logger, url string) (RollupClientInterface, error)

// ActiveL2EndpointProvider is an interface for providing a RollupClient
// It manages the lifecycle of the RollupClient for callers
Expand Down Expand Up @@ -39,10 +41,14 @@ func NewActiveL2RollupProvider(
networkTimeout time.Duration,
logger log.Logger,
) (*ActiveL2RollupProvider, error) {
rollupDialer := func(ctx context.Context, timeout time.Duration,
log log.Logger, url string,
rollupDialer := func(ctx context.Context, log log.Logger, url string,
) (RollupClientInterface, error) {
return DialRollupClientWithTimeout(ctx, timeout, log, url)
rpcCl, err := dialRPCClient(ctx, log, url)
if err != nil {
return nil, err
}

return sources.NewRollupClient(client.NewBaseRPCClient(rpcCl)), nil
}
return newActiveL2RollupProvider(ctx, rollupUrls, checkDuration, networkTimeout, logger, rollupDialer)
}
Expand Down Expand Up @@ -150,7 +156,7 @@ func (p *ActiveL2RollupProvider) dialSequencer(ctx context.Context, idx int) err

ep := p.rollupUrls[idx]
p.log.Info("Dialing next sequencer.", "index", idx, "url", ep)
rollupClient, err := p.rollupDialer(cctx, p.networkTimeout, p.log, ep)
rollupClient, err := p.rollupDialer(cctx, p.log, ep)
if err != nil {
return fmt.Errorf("dialing rollup client: %w", err)
}
Expand Down
23 changes: 14 additions & 9 deletions op-service/dial/dial.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,19 @@ func DialRPCClientWithTimeout(ctx context.Context, timeout time.Duration, log lo
func dialRPCClientWithBackoff(ctx context.Context, log log.Logger, addr string) (*rpc.Client, error) {
bOff := retry.Fixed(defaultRetryTime)
return retry.Do(ctx, defaultRetryCount, bOff, func() (*rpc.Client, error) {
if !client.IsURLAvailable(addr) {
log.Warn("failed to dial address, but may connect later", "addr", addr)
return nil, fmt.Errorf("address unavailable (%s)", addr)
}
client, err := rpc.DialOptions(ctx, addr)
if err != nil {
return nil, fmt.Errorf("failed to dial address (%s): %w", addr, err)
}
return client, nil
return dialRPCClient(ctx, log, addr)
})
}

// Dials a JSON-RPC endpoint once.
func dialRPCClient(ctx context.Context, log log.Logger, addr string) (*rpc.Client, error) {
if !client.IsURLAvailable(ctx, addr) {
log.Warn("failed to dial address, but may connect later", "addr", addr)
return nil, fmt.Errorf("address unavailable (%s)", addr)
}
client, err := rpc.DialOptions(ctx, addr)
if err != nil {
return nil, fmt.Errorf("failed to dial address (%s): %w", addr, err)
}
return client, nil
}

0 comments on commit df8ee62

Please sign in to comment.