Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

op-service: add configurable client timeout #12074

Merged
merged 2 commits into from
Sep 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion op-service/client/lazy_dial.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (l *LazyRPC) dial(ctx context.Context) error {
if err != nil {
return fmt.Errorf("failed to dial: %w", err)
}
l.inner = &BaseRPCClient{c: underlying}
l.inner = NewBaseRPCClient(underlying)
return nil
}

Expand All @@ -66,6 +66,7 @@ func (l *LazyRPC) CallContext(ctx context.Context, result any, method string, ar
if err := l.dial(ctx); err != nil {
return err
}
fmt.Println("checkpoin 1")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops yea good catch. Cleanup pr here: #12083

return l.inner.CallContext(ctx, result, method, args...)
}

Expand Down
34 changes: 29 additions & 5 deletions op-service/client/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,26 @@ type rpcConfig struct {
limit float64
burst int
lazy bool
callTimeout time.Duration
batchCallTimeout time.Duration
}

type RPCOption func(cfg *rpcConfig) error

func WithCallTimeout(d time.Duration) RPCOption {
return func(cfg *rpcConfig) error {
cfg.callTimeout = d
return nil
}
}

func WithBatchCallTimeout(d time.Duration) RPCOption {
return func(cfg *rpcConfig) error {
cfg.batchCallTimeout = d
return nil
}
}
Comment on lines +45 to +57
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do I see correctly that these aren't used anywhere (yet)?

Copy link
Contributor Author

@bitwiseguy bitwiseguy Sep 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea the new NewEngineAPIClientWithTimeout function is used by the replayor tool here for benchmark testing.

These two functions also used in the replayor here. Not currently used in any monorepo code


// WithDialBackoff configures the number of attempts for the initial dial to the RPC,
// attempts are executed with an exponential backoff strategy.
func WithDialBackoff(attempts int) RPCOption {
Expand Down Expand Up @@ -98,6 +114,12 @@ func NewRPC(ctx context.Context, lgr log.Logger, addr string, opts ...RPCOption)
if cfg.backoffAttempts < 1 { // default to at least 1 attempt, or it always fails to dial.
cfg.backoffAttempts = 1
}
if cfg.callTimeout == 0 {
cfg.callTimeout = 10 * time.Second
}
if cfg.batchCallTimeout == 0 {
cfg.batchCallTimeout = 20 * time.Second
}

var wrapped RPC
if cfg.lazy {
Expand All @@ -107,7 +129,7 @@ func NewRPC(ctx context.Context, lgr log.Logger, addr string, opts ...RPCOption)
if err != nil {
return nil, err
}
wrapped = &BaseRPCClient{c: underlying}
wrapped = &BaseRPCClient{c: underlying, callTimeout: cfg.callTimeout, batchCallTimeout: cfg.batchCallTimeout}
}

if cfg.limit != 0 {
Expand Down Expand Up @@ -171,25 +193,27 @@ func IsURLAvailable(ctx context.Context, address string) bool {
// with the client.RPC interface.
// It sets a timeout of 10s on CallContext & 20s on BatchCallContext made through it.
type BaseRPCClient struct {
c *rpc.Client
c *rpc.Client
batchCallTimeout time.Duration
callTimeout time.Duration
}

func NewBaseRPCClient(c *rpc.Client) *BaseRPCClient {
return &BaseRPCClient{c: c}
return &BaseRPCClient{c: c, callTimeout: 10 * time.Second, batchCallTimeout: 20 * time.Second}
}

func (b *BaseRPCClient) Close() {
b.c.Close()
}

func (b *BaseRPCClient) CallContext(ctx context.Context, result any, method string, args ...any) error {
cCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
cCtx, cancel := context.WithTimeout(ctx, b.callTimeout)
defer cancel()
return b.c.CallContext(cCtx, result, method, args...)
}

func (b *BaseRPCClient) BatchCallContext(ctx context.Context, batch []rpc.BatchElem) error {
cCtx, cancel := context.WithTimeout(ctx, 20*time.Second)
cCtx, cancel := context.WithTimeout(ctx, b.batchCallTimeout)
defer cancel()
return b.c.BatchCallContext(cCtx, batch)
}
Expand Down
27 changes: 19 additions & 8 deletions op-service/sources/engine_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,10 @@ func NewEngineClient(client client.RPC, log log.Logger, metrics caching.Metrics,

// EngineAPIClient is an RPC client for the Engine API functions.
type EngineAPIClient struct {
RPC client.RPC
log log.Logger
evp EngineVersionProvider
RPC client.RPC
log log.Logger
evp EngineVersionProvider
timeout time.Duration
}

type EngineVersionProvider interface {
Expand All @@ -63,9 +64,19 @@ type EngineVersionProvider interface {

func NewEngineAPIClient(rpc client.RPC, l log.Logger, evp EngineVersionProvider) *EngineAPIClient {
return &EngineAPIClient{
RPC: rpc,
log: l,
evp: evp,
RPC: rpc,
log: l,
evp: evp,
timeout: time.Second * 5,
}
}

func NewEngineAPIClientWithTimeout(rpc client.RPC, l log.Logger, evp EngineVersionProvider, timeout time.Duration) *EngineAPIClient {
return &EngineAPIClient{
RPC: rpc,
log: l,
evp: evp,
timeout: timeout,
}
}
Comment on lines 65 to 81
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could use the options pattern here instead, by creating a type EngineAPIClientOption func(*EngineAPIClient) and then receiving a variadic parameter of options and applying them. Then you could create a WithTimeout(time.Duration) option.


Expand All @@ -84,7 +95,7 @@ func (s *EngineAPIClient) ForkchoiceUpdate(ctx context.Context, fc *eth.Forkchoi
llog := s.log.New("state", fc) // local logger
tlog := llog.New("attr", attributes) // trace logger
tlog.Trace("Sharing forkchoice-updated signal")
fcCtx, cancel := context.WithTimeout(ctx, time.Second*5)
fcCtx, cancel := context.WithTimeout(ctx, s.timeout)
defer cancel()
var result eth.ForkchoiceUpdatedResult
method := s.evp.ForkchoiceUpdatedVersion(attributes)
Expand Down Expand Up @@ -120,7 +131,7 @@ func (s *EngineAPIClient) NewPayload(ctx context.Context, payload *eth.Execution
e := s.log.New("block_hash", payload.BlockHash)
e.Trace("sending payload for execution")

execCtx, cancel := context.WithTimeout(ctx, time.Second*5)
execCtx, cancel := context.WithTimeout(ctx, s.timeout)
defer cancel()
var result eth.PayloadStatusV1

Expand Down