Skip to content

Commit

Permalink
feat(failover): return 503 to batcher when eigenda is down (#193)
Browse files Browse the repository at this point in the history
* feat(failover): return 503 to batcher when eigenda is down

chore: go mod tidy to generate go.mod

feat: dealing with new eigenda-client grpc errors + ErrorFailover convention

comment: fix typo

feat(handlers): postShared returns 429 when disperser rate limited client

flag(eigenda): rename RetriesBeforeFailover -> PutRetries

reviewer correctly pointed out that retrying was more general than only for failovers

lint: nolint exhaustive switch check for Put case

* flag(eigenda-client): add cli flag for new config ConfirmationTimeout

* tests(handlers): rename servers_test.go -> handlers_test.go + some small refactors

* tests(handlers): add PUT failure tests for all modes

* test(handlers): remove unneeded expectedError in TestHandlerPut

* dep: update eigenda to master head (contains ErrorFailover fix)

* tests(handlers): add tests for error types (including failover)

* fix: errors after rebase

* flags: clearer usage string for eigenda-client ResponseTimeoutFlag

* style: define is503 function to follow isABC pattern

* style: make lint
  • Loading branch information
samlaf authored Nov 11, 2024
1 parent f263439 commit 9f04e56
Show file tree
Hide file tree
Showing 12 changed files with 396 additions and 212 deletions.
1 change: 1 addition & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
"go.testFlags": [
"-test.parallel",
"4",
// Comment the following 2 lines to run unit tests.
"-deploy-config",
"../.devnet/devnetL1.json"
]
Expand Down
3 changes: 1 addition & 2 deletions common/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ const (
)

var (
ErrProxyOversizedBlob = fmt.Errorf("encoded blob is larger than max blob size")
ErrEigenDAOversizedBlob = fmt.Errorf("blob size cannot exceed")
ErrProxyOversizedBlob = fmt.Errorf("encoded blob is larger than max blob size")
)

func (b BackendType) String() string {
Expand Down
44 changes: 35 additions & 9 deletions flags/eigendaflags/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@ import (

var (
DisperserRPCFlagName = withFlagPrefix("disperser-rpc")
ResponseTimeoutFlagName = withFlagPrefix("response-timeout")
ConfirmationTimeoutFlagName = withFlagPrefix("confirmation-timeout")
StatusQueryRetryIntervalFlagName = withFlagPrefix("status-query-retry-interval")
StatusQueryTimeoutFlagName = withFlagPrefix("status-query-timeout")
DisableTLSFlagName = withFlagPrefix("disable-tls")
ResponseTimeoutFlagName = withFlagPrefix("response-timeout")
CustomQuorumIDsFlagName = withFlagPrefix("custom-quorum-ids")
SignerPrivateKeyHexFlagName = withFlagPrefix("signer-private-key-hex")
PutBlobEncodingVersionFlagName = withFlagPrefix("put-blob-encoding-version")
Expand All @@ -27,6 +28,8 @@ var (
ConfirmationDepthFlagName = withFlagPrefix("confirmation-depth")
EthRPCURLFlagName = withFlagPrefix("eth-rpc")
SvcManagerAddrFlagName = withFlagPrefix("svc-manager-addr")
// Flags that are proxy specific, and not used by the eigenda-client
PutRetriesFlagName = withFlagPrefix("put-retries")
)

func withFlagPrefix(s string) string {
Expand All @@ -46,6 +49,26 @@ func CLIFlags(envPrefix, category string) []cli.Flag {
EnvVars: []string{withEnvPrefix(envPrefix, "DISPERSER_RPC")},
Category: category,
},
&cli.DurationFlag{
Name: ResponseTimeoutFlagName,
Usage: "Flag used to configure the underlying disperser-client. Total time to wait for the disperseBlob call to return or disperseAuthenticatedBlob stream to finish and close.",
Value: 60 * time.Second,
EnvVars: []string{withEnvPrefix(envPrefix, "RESPONSE_TIMEOUT")},
Category: category,
},
&cli.DurationFlag{
Name: ConfirmationTimeoutFlagName,
Usage: `The total amount of time that the client will spend waiting for EigenDA
to "confirm" (include onchain) a blob after it has been dispersed. Note that
we stick to "confirm" here but this really means InclusionTimeout,
not confirmation in the sense of confirmation depth.
If ConfirmationTimeout time passes and the blob is not yet confirmed,
the client will return an api.ErrorFailover to let the caller failover to EthDA.`,
Value: 15 * time.Minute,
EnvVars: []string{withEnvPrefix(envPrefix, "CONFIRMATION_TIMEOUT")},
Category: category,
},
&cli.DurationFlag{
Name: StatusQueryTimeoutFlagName,
Usage: "Duration to wait for a blob to finalize after being sent for dispersal. Default is 30 minutes.",
Expand All @@ -67,13 +90,6 @@ func CLIFlags(envPrefix, category string) []cli.Flag {
EnvVars: []string{withEnvPrefix(envPrefix, "GRPC_DISABLE_TLS")},
Category: category,
},
&cli.DurationFlag{
Name: ResponseTimeoutFlagName,
Usage: "Total time to wait for a response from the EigenDA disperser. Default is 60 seconds.",
Value: 60 * time.Second,
EnvVars: []string{withEnvPrefix(envPrefix, "RESPONSE_TIMEOUT")},
Category: category,
},
&cli.UintSliceFlag{
Name: CustomQuorumIDsFlagName,
Usage: "Custom quorum IDs for writing blobs. Should not include default quorums 0 or 1.",
Expand Down Expand Up @@ -137,17 +153,27 @@ func CLIFlags(envPrefix, category string) []cli.Flag {
Category: category,
Required: true,
},
// Flags that are proxy specific, and not used by the eigenda-client
// TODO: should we move this to a more specific category, like EIGENDA_STORE?
&cli.UintFlag{
Name: PutRetriesFlagName,
Usage: "Number of times to retry blob dispersals.",
Value: 3,
EnvVars: []string{withEnvPrefix(envPrefix, "PUT_RETRIES")},
Category: category,
},
}
}

func ReadConfig(ctx *cli.Context) clients.EigenDAClientConfig {
waitForFinalization, confirmationDepth := parseConfirmationFlag(ctx.String(ConfirmationDepthFlagName))
return clients.EigenDAClientConfig{
RPC: ctx.String(DisperserRPCFlagName),
ResponseTimeout: ctx.Duration(ResponseTimeoutFlagName),
ConfirmationTimeout: ctx.Duration(ConfirmationTimeoutFlagName),
StatusQueryRetryInterval: ctx.Duration(StatusQueryRetryIntervalFlagName),
StatusQueryTimeout: ctx.Duration(StatusQueryTimeoutFlagName),
DisableTLS: ctx.Bool(DisableTLSFlagName),
ResponseTimeout: ctx.Duration(ResponseTimeoutFlagName),
CustomQuorumIDs: ctx.UintSlice(CustomQuorumIDsFlagName),
SignerPrivateKeyHex: ctx.String(SignerPrivateKeyHexFlagName),
PutBlobEncodingVersion: codecs.BlobEncodingVersion(ctx.Uint(PutBlobEncodingVersionFlagName)),
Expand Down
5 changes: 3 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ go 1.22
toolchain go1.22.0

require (
github.com/Layr-Labs/eigenda v0.8.5-0.20241031144746-e2ead56a306d
github.com/Layr-Labs/eigenda v0.8.5-rc.0.0.20241101212705-fa8776ae648c
github.com/avast/retry-go/v4 v4.6.0
github.com/consensys/gnark-crypto v0.12.1
github.com/ethereum-optimism/optimism v1.9.4-0.20240927020138-a9c7f349d10b
github.com/ethereum/go-ethereum v1.14.11
Expand All @@ -20,6 +21,7 @@ require (
github.com/testcontainers/testcontainers-go/modules/redis v0.33.0
github.com/urfave/cli/v2 v2.27.4
golang.org/x/exp v0.0.0-20240808152545-0cdaa3abc0fa
google.golang.org/grpc v1.64.1
)

require (
Expand Down Expand Up @@ -283,7 +285,6 @@ require (
golang.org/x/time v0.6.0 // indirect
golang.org/x/tools v0.24.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237 // indirect
google.golang.org/grpc v1.64.1 // indirect
google.golang.org/protobuf v1.34.2 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
Expand Down
6 changes: 4 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ github.com/BurntSushi/toml v1.4.0/go.mod h1:ukJfTF/6rtPPRCnwkur4qwRxa8vTRFBF0uk2
github.com/DataDog/datadog-go v3.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ=
github.com/DataDog/zstd v1.5.6-0.20230824185856-869dae002e5e h1:ZIWapoIRN1VqT8GR8jAwb1Ie9GyehWjVcGh32Y2MznE=
github.com/DataDog/zstd v1.5.6-0.20230824185856-869dae002e5e/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw=
github.com/Layr-Labs/eigenda v0.8.5-0.20241031144746-e2ead56a306d h1:2JtVArkLjW61kilkvvLyFHXBMp0ClF8PYCAxWqRnoDQ=
github.com/Layr-Labs/eigenda v0.8.5-0.20241031144746-e2ead56a306d/go.mod h1:sqUNf9Ak+EfAX82jDxrb4QbT/g3DViWD3b7YIk36skk=
github.com/Layr-Labs/eigenda v0.8.5-rc.0.0.20241101212705-fa8776ae648c h1:TuvZlhWSrwpG6EPl+xjOo5UCp2QcVGl+EOY+BalqOXg=
github.com/Layr-Labs/eigenda v0.8.5-rc.0.0.20241101212705-fa8776ae648c/go.mod h1:sqUNf9Ak+EfAX82jDxrb4QbT/g3DViWD3b7YIk36skk=
github.com/Layr-Labs/eigensdk-go v0.1.7-0.20240507215523-7e4891d5099a h1:L/UsJFw9M31FD/WgXTPFB0oxbq9Cu4Urea1xWPMQS7Y=
github.com/Layr-Labs/eigensdk-go v0.1.7-0.20240507215523-7e4891d5099a/go.mod h1:OF9lmS/57MKxS0xpSpX0qHZl0SKkDRpvJIvsGvMN1y8=
github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY=
Expand All @@ -45,6 +45,8 @@ github.com/andybalholm/brotli v1.1.0/go.mod h1:sms7XGricyQI9K10gOSf56VKKWS4oLer5
github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239/go.mod h1:2FmKhYUyUczH0OGQWaF5ceTx0UBShxjsH6f8oGKYe2c=
github.com/armon/go-metrics v0.4.1 h1:hR91U9KYmb6bLBYLQjyM+3j+rcd/UhE+G78SFnF8gJA=
github.com/armon/go-metrics v0.4.1/go.mod h1:E6amYzXo6aW1tqzoZGT755KkbgrJsSdpwZ+3JqfkOG4=
github.com/avast/retry-go/v4 v4.6.0 h1:K9xNA+KeB8HHc2aWFuLb25Offp+0iVRXEvFx8IinRJA=
github.com/avast/retry-go/v4 v4.6.0/go.mod h1:gvWlPhBVsvBbLkVGDg/KwvBv0bEkCOLRRSHKIr2PyOE=
github.com/aws/aws-sdk-go-v2 v1.26.1 h1:5554eUqIYVWpU0YmeeYZ0wU64H2VLBs8TlhRB2L+EkA=
github.com/aws/aws-sdk-go-v2 v1.26.1/go.mod h1:ffIFB97e2yNsv4aTSGkqtHnppsIJzw7G7BReUZ3jCXM=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.1 h1:gTK2uhtAPtFcdRRJilZPx8uJLL2J85xK11nKtWL0wfU=
Expand Down
7 changes: 4 additions & 3 deletions server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type Config struct {
MemstoreConfig memstore.Config
StorageConfig store.Config
VerifierConfig verify.Config
PutRetries uint

MemstoreEnabled bool
}
Expand All @@ -28,11 +29,11 @@ func ReadConfig(ctx *cli.Context) Config {
edaClientConfig := eigendaflags.ReadConfig(ctx)
return Config{
EdaClientConfig: edaClientConfig,
MemstoreConfig: memstore.ReadConfig(ctx),
StorageConfig: store.ReadConfig(ctx),
VerifierConfig: verify.ReadConfig(ctx, edaClientConfig),

PutRetries: ctx.Uint(eigendaflags.PutRetriesFlagName),
MemstoreEnabled: ctx.Bool(memstore.EnabledFlagName),
MemstoreConfig: memstore.ReadConfig(ctx),
StorageConfig: store.ReadConfig(ctx),
}
}

Expand Down
29 changes: 29 additions & 0 deletions server/errors.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
package server

import (
"errors"
"fmt"

"github.com/Layr-Labs/eigenda-proxy/commitments"
"github.com/Layr-Labs/eigenda-proxy/common"
"github.com/Layr-Labs/eigenda/api"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

// MetaError includes both an error and commitment metadata
Expand All @@ -22,3 +27,27 @@ func (me MetaError) Error() string {
func (me MetaError) Unwrap() error {
return me.Err
}

func is400(err error) bool {
// proxy requests are super simple (clients basically only pass bytes), so the only 400 possible
// is passing a blob that's too big.
//
// Any 400s returned by the disperser are due to formatting bugs in proxy code, for eg. badly
// IFFT'ing or encoding the blob, so we shouldn't return a 400 to the client.
// See https://github.com/Layr-Labs/eigenda/blob/bee55ed9207f16153c3fd8ebf73c219e68685def/api/errors.go#L22
// for the 400s returned by the disperser server (currently only INVALID_ARGUMENT).
return errors.Is(err, common.ErrProxyOversizedBlob)
}

func is429(err error) bool {
// grpc RESOURCE_EXHAUSTED is returned by the disperser server when the client has sent too many requests
// in a short period of time. This is a client-side issue, so we should return the 429 to the client.
st, isGRPCError := status.FromError(err)
return isGRPCError && st.Code() == codes.ResourceExhausted
}

// 503 is returned to tell the caller (batcher) to failover to ethda b/c eigenda is temporarily down
func is503(err error) bool {
// TODO: would be cleaner to define a sentinel error in eigenda-core and use that instead
return errors.Is(err, &api.ErrorFailover{})
}
13 changes: 8 additions & 5 deletions server/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"net/http"

"github.com/Layr-Labs/eigenda-proxy/commitments"
"github.com/Layr-Labs/eigenda-proxy/common"
"github.com/gorilla/mux"
)

Expand Down Expand Up @@ -181,11 +180,15 @@ func (svr *Server) handlePostShared(w http.ResponseWriter, r *http.Request, comm
Err: fmt.Errorf("put request failed with commitment %v (commitment mode %v): %w", comm, meta.Mode, err),
Meta: meta,
}
if errors.Is(err, common.ErrEigenDAOversizedBlob) || errors.Is(err, common.ErrProxyOversizedBlob) {
// we add here any error that should be returned as a 400 instead of a 500.
// currently only includes oversized blob requests
switch {
case is400(err):
http.Error(w, err.Error(), http.StatusBadRequest)
} else {
case is429(err):
http.Error(w, err.Error(), http.StatusTooManyRequests)
case is503(err):
// this tells the caller (batcher) to failover to ethda b/c eigenda is temporarily down
http.Error(w, err.Error(), http.StatusServiceUnavailable)
default:
http.Error(w, err.Error(), http.StatusInternalServerError)
}
return err
Expand Down
Loading

0 comments on commit 9f04e56

Please sign in to comment.