diff --git a/api/errors.go b/api/errors.go
index 876edac72b..d31620b290 100644
--- a/api/errors.go
+++ b/api/errors.go
@@ -8,32 +8,38 @@ import (
 // The canonical errors from the EigenDA gRPC API endpoints.
 //
 // Notes:
-// - Start the error space small (but sufficient), and expand when there is an important
-//   failure case to separate out.
-// - Avoid simply wrapping system-internal errors without checking if they are appropriate
-//   in user-facing errors defined here. Consider map and convert system-internal errors
-//   before return to users from APIs.
+// - We start with a small (but sufficient) subset of Google's error code convention,
+//   and expand when there is an important failure case to separate out. See:
+//   https://cloud.google.com/apis/design/errors#handling_errors
+// - Make sure that internally propagated errors are eventually wrapped in one of the
+//   user-facing errors defined here, since gRPC otherwise returns an UNKNOWN error code,
+//   which is harder for users to debug and understand.
 
-func NewGRPCError(code codes.Code, msg string) error {
+func newErrorGRPC(code codes.Code, msg string) error {
 	return status.Error(code, msg)
 }
 
 // HTTP Mapping: 400 Bad Request
-func NewInvalidArgError(msg string) error {
-	return NewGRPCError(codes.InvalidArgument, msg)
+func NewErrorInvalidArg(msg string) error {
+	return newErrorGRPC(codes.InvalidArgument, msg)
 }
 
 // HTTP Mapping: 404 Not Found
-func NewNotFoundError(msg string) error {
-	return NewGRPCError(codes.NotFound, msg)
+func NewErrorNotFound(msg string) error {
+	return newErrorGRPC(codes.NotFound, msg)
 }
 
 // HTTP Mapping: 429 Too Many Requests
-func NewResourceExhaustedError(msg string) error {
-	return NewGRPCError(codes.ResourceExhausted, msg)
+func NewErrorResourceExhausted(msg string) error {
-	return newErrorGRPC(codes.ResourceExhausted, msg)
 }
 
 // HTTP Mapping: 500 Internal Server Error
-func NewInternalError(msg string) error {
-	return NewGRPCError(codes.Internal, msg)
+func NewErrorInternal(msg string) error {
+	return newErrorGRPC(codes.Internal, msg)
+}
+
+// HTTP Mapping: 501 Not Implemented
+func NewErrorUnimplemented() error {
+	return newErrorGRPC(codes.Unimplemented, "not implemented")
 }
diff --git a/api/grpc/disperser/disperser_grpc.pb.go b/api/grpc/disperser/disperser_grpc.pb.go
index 14f8e9f93a..c6bb719a2b 100644
--- a/api/grpc/disperser/disperser_grpc.pb.go
+++ b/api/grpc/disperser/disperser_grpc.pb.go
@@ -29,10 +29,18 @@ const (
 //
 // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
 type DisperserClient interface {
-	// This API accepts blob to disperse from clients.
+	// DisperseBlob accepts a single blob to be dispersed.
 	// This executes the dispersal async, i.e. it returns once the request
-	// is accepted. The client could use GetBlobStatus() API to poll the the
+	// is accepted. The client should use GetBlobStatus() API to poll the
 	// processing status of the blob.
+	//
+	// DisperseBlob may return the following error codes:
+	// INVALID_ARGUMENT (400): request is invalid for a reason specified in the error msg.
+	// RESOURCE_EXHAUSTED (429): request is rate limited for the quorum specified in the error msg.
+	//
+	// For RESOURCE_EXHAUSTED, the user should retry after the specified duration.
+	//
+	// INTERNAL (500): serious error, the user should NOT retry.
 	DisperseBlob(ctx context.Context, in *DisperseBlobRequest, opts ...grpc.CallOption) (*DisperseBlobReply, error)
 	// DisperseBlobAuthenticated is similar to DisperseBlob, except that it requires the
 	// client to authenticate itself via the AuthenticationData message. The protoco is as follows:
@@ -124,10 +132,18 @@ func (c *disperserClient) RetrieveBlob(ctx context.Context, in *RetrieveBlobRequ
 // All implementations must embed UnimplementedDisperserServer
 // for forward compatibility
 type DisperserServer interface {
-	// This API accepts blob to disperse from clients.
+	// DisperseBlob accepts a single blob to be dispersed.
 	// This executes the dispersal async, i.e. it returns once the request
-	// is accepted. The client could use GetBlobStatus() API to poll the the
+	// is accepted. The client should use GetBlobStatus() API to poll the
 	// processing status of the blob.
+	//
+	// DisperseBlob may return the following error codes:
+	// INVALID_ARGUMENT (400): request is invalid for a reason specified in the error msg.
+	// RESOURCE_EXHAUSTED (429): request is rate limited for the quorum specified in the error msg.
+	//
+	// For RESOURCE_EXHAUSTED, the user should retry after the specified duration.
+	//
+	// INTERNAL (500): serious error, the user should NOT retry.
 	DisperseBlob(context.Context, *DisperseBlobRequest) (*DisperseBlobReply, error)
 	// DisperseBlobAuthenticated is similar to DisperseBlob, except that it requires the
 	// client to authenticate itself via the AuthenticationData message. The protoco is as follows:
diff --git a/api/proto/disperser/disperser.proto b/api/proto/disperser/disperser.proto
index 46f0afa9aa..53768ea9f9 100644
--- a/api/proto/disperser/disperser.proto
+++ b/api/proto/disperser/disperser.proto
@@ -5,10 +5,16 @@
 option go_package = "github.com/Layr-Labs/eigenda/api/grpc/disperser";
 
 // Disperser defines the public APIs for dispersing blobs.
 service Disperser {
-  // This API accepts blob to disperse from clients.
+  // DisperseBlob accepts a single blob to be dispersed.
   // This executes the dispersal async, i.e. it returns once the request
-  // is accepted. The client could use GetBlobStatus() API to poll the the
+  // is accepted. The client should use GetBlobStatus() API to poll the
   // processing status of the blob.
+  //
+  // DisperseBlob may return the following error codes:
+  // INVALID_ARGUMENT (400): request is invalid for a reason specified in the error msg.
+  // RESOURCE_EXHAUSTED (429): request is rate limited for the quorum specified in the error msg;
+  //   the user should retry after the specified duration.
+  // INTERNAL (500): serious error, the user should NOT retry.
   rpc DisperseBlob(DisperseBlobRequest) returns (DisperseBlobReply) {}
 
diff --git a/common/aws/dynamodb/client.go b/common/aws/dynamodb/client.go
index eeff546e88..4dcbc289f2 100644
--- a/common/aws/dynamodb/client.go
+++ b/common/aws/dynamodb/client.go
@@ -93,7 +93,7 @@ func (c *Client) DeleteTable(ctx context.Context, tableName string) error {
 	_, err := c.dynamoClient.DeleteTable(ctx, &dynamodb.DeleteTableInput{
 		TableName: aws.String(tableName)})
 	if err != nil {
-		return err
+		return fmt.Errorf("failed to delete table %s: %w", tableName, err)
 	}
 	return nil
 }
@@ -103,9 +103,8 @@ func (c *Client) PutItem(ctx context.Context, tableName string, item Item) (err
 		TableName: aws.String(tableName), Item: item,
 	})
 	if err != nil {
-		return err
+		return fmt.Errorf("failed to put item in table %s: %w", tableName, err)
 	}
-
 	return nil
 }
 
@@ -115,9 +114,8 @@ func (c *Client) PutItemWithCondition(ctx context.Context, tableName string, ite
 		ConditionExpression: aws.String(condition),
 	})
 	if err != nil {
-		return err
+		return fmt.Errorf("failed to put item in table %s: %w", tableName, err)
 	}
-
 	return nil
 }
 
diff --git a/common/aws/s3/client.go b/common/aws/s3/client.go
index 74abcf29ef..e6d7bd824c 100644
--- a/common/aws/s3/client.go
+++ b/common/aws/s3/client.go
@@ -98,8 +98,8 @@ func (s *client) DownloadObject(ctx context.Context, bucket string, key string)
 func (s *client) UploadObject(ctx context.Context, bucket string, key string, data []byte) error {
 	var partMiBs int64 = 10
 	uploader := manager.NewUploader(s.s3Client, func(u *manager.Uploader) {
-		u.PartSize = partMiBs * 1024 * 1024 // 10MB per part
-		u.Concurrency = 3                   //The number of goroutines to spin up in parallel per call to Upload when sending parts
+		u.PartSize = partMiBs * 1024 * 1024 // 10MiB per part
+		u.Concurrency = 3                   //The number of goroutines to spin up in parallel per call to upload when sending parts
 	})
 
 	_, err := uploader.Upload(ctx, &s3.PutObjectInput{
diff --git a/disperser/apiserver/server.go b/disperser/apiserver/server.go
index 18cd71ac1d..436ac2d8c5 100644
--- a/disperser/apiserver/server.go
+++ b/disperser/apiserver/server.go
@@ -108,14 +108,14 @@ func (s *DispersalServer) DisperseBlobAuthenticated(stream pb.Disperser_Disperse
 	if err != nil {
 		s.metrics.HandleInvalidArgRpcRequest("DisperseBlobAuthenticated")
 		s.metrics.HandleInvalidArgRequest("DisperseBlobAuthenticated")
-		return api.NewInvalidArgError(fmt.Sprintf("error receiving next message: %v", err))
+		return api.NewErrorInvalidArg(fmt.Sprintf("error receiving next message: %v", err))
 	}
 
 	request, ok := in.GetPayload().(*pb.AuthenticatedRequest_DisperseRequest)
 	if !ok {
 		s.metrics.HandleInvalidArgRpcRequest("DisperseBlobAuthenticated")
 		s.metrics.HandleInvalidArgRequest("DisperseBlobAuthenticated")
-		return api.NewInvalidArgError("missing DisperseBlobRequest")
+		return api.NewErrorInvalidArg("missing DisperseBlobRequest")
 	}
 
 	blob, err := s.validateRequestAndGetBlob(ctx, request.DisperseRequest)
@@ -124,7 +124,7 @@ func (s *DispersalServer) DisperseBlobAuthenticated(stream pb.Disperser_Disperse
 			s.metrics.HandleFailedRequest(codes.InvalidArgument.String(), fmt.Sprint(quorumID), len(request.DisperseRequest.GetData()), "DisperseBlobAuthenticated")
 		}
 		s.metrics.HandleInvalidArgRpcRequest("DisperseBlobAuthenticated")
-		return api.NewInvalidArgError(err.Error())
+		return api.NewErrorInvalidArg(err.Error())
 	}
 
 	// Get the ethereum address associated with the public key. This is just for convenience so we can put addresses instead of public keys in the allowlist.
@@ -133,14 +133,14 @@ func (s *DispersalServer) DisperseBlobAuthenticated(stream pb.Disperser_Disperse
 	if err != nil {
 		s.metrics.HandleInvalidArgRpcRequest("DisperseBlobAuthenticated")
 		s.metrics.HandleInvalidArgRequest("DisperseBlobAuthenticated")
-		return api.NewInvalidArgError(fmt.Sprintf("failed to decode account ID (%v): %v", blob.RequestHeader.AccountID, err))
+		return api.NewErrorInvalidArg(fmt.Sprintf("failed to decode account ID (%v): %v", blob.RequestHeader.AccountID, err))
 	}
 
 	pubKey, err := crypto.UnmarshalPubkey(publicKeyBytes)
 	if err != nil {
 		s.metrics.HandleInvalidArgRpcRequest("DisperseBlobAuthenticated")
 		s.metrics.HandleInvalidArgRequest("DisperseBlobAuthenticated")
-		return api.NewInvalidArgError(fmt.Sprintf("failed to decode public key (%v): %v", hexutil.Encode(publicKeyBytes), err))
+		return api.NewErrorInvalidArg(fmt.Sprintf("failed to decode public key (%v): %v", hexutil.Encode(publicKeyBytes), err))
 	}
 
 	authenticatedAddress := crypto.PubkeyToAddress(*pubKey).String()
@@ -176,18 +176,18 @@ func (s *DispersalServer) DisperseBlobAuthenticated(stream pb.Disperser_Disperse
 	case err := <-errCh:
 		s.metrics.HandleInvalidArgRpcRequest("DisperseBlobAuthenticated")
 		s.metrics.HandleInvalidArgRequest("DisperseBlobAuthenticated")
-		return api.NewInvalidArgError(fmt.Sprintf("error receiving next message: %v", err))
+		return api.NewErrorInvalidArg(fmt.Sprintf("error receiving next message: %v", err))
 	case <-ctx.Done():
 		s.metrics.HandleInvalidArgRpcRequest("DisperseBlobAuthenticated")
 		s.metrics.HandleInvalidArgRequest("DisperseBlobAuthenticated")
-		return api.NewInvalidArgError("context deadline exceeded")
+		return api.NewErrorInvalidArg("context deadline exceeded")
 	}
 
 	challengeReply, ok := in.GetPayload().(*pb.AuthenticatedRequest_AuthenticationData)
 	if !ok {
 		s.metrics.HandleInvalidArgRpcRequest("DisperseBlobAuthenticated")
 		s.metrics.HandleInvalidArgRequest("DisperseBlobAuthenticated")
-		return api.NewInvalidArgError("expected AuthenticationData")
+		return api.NewErrorInvalidArg("expected AuthenticationData")
 	}
 
 	blob.RequestHeader.Nonce = challenge
@@ -197,7 +197,7 @@ func (s *DispersalServer) DisperseBlobAuthenticated(stream pb.Disperser_Disperse
 	if err != nil {
 		s.metrics.HandleInvalidArgRpcRequest("DisperseBlobAuthenticated")
 		s.metrics.HandleInvalidArgRequest("DisperseBlobAuthenticated")
-		return api.NewInvalidArgError(fmt.Sprintf("failed to authenticate blob request: %v", err))
+		return api.NewErrorInvalidArg(fmt.Sprintf("failed to authenticate blob request: %v", err))
 	}
 
 	// Disperse the blob
@@ -230,7 +230,7 @@ func (s *DispersalServer) DisperseBlob(ctx context.Context, req *pb.DisperseBlob
 			s.metrics.HandleFailedRequest(codes.InvalidArgument.String(), fmt.Sprint(quorumID), len(req.GetData()), "DisperseBlob")
 		}
 		s.metrics.HandleInvalidArgRpcRequest("DisperseBlob")
-		return nil, api.NewInvalidArgError(err.Error())
+		return nil, api.NewErrorInvalidArg(err.Error())
 	}
 
 	reply, err := s.disperseBlob(ctx, blob, "", "DisperseBlob")
@@ -265,7 +265,7 @@ func (s *DispersalServer) disperseBlob(ctx context.Context, blob *core.Blob, aut
 			s.metrics.HandleFailedRequest(codes.InvalidArgument.String(), fmt.Sprintf("%d", param.QuorumID), blobSize, apiMethodName)
 		}
 		s.metrics.HandleInvalidArgRpcRequest(apiMethodName)
-		return nil, api.NewInvalidArgError(err.Error())
+		return nil, api.NewErrorInvalidArg(err.Error())
 	}
 	s.logger.Debug("received a new blob dispersal request", "authenticatedAddress", authenticatedAddress, "origin", origin, "blobSizeBytes", blobSize, "securityParams",
 		strings.Join(securityParamsStrings, ", "))
@@ -286,7 +286,7 @@ func (s *DispersalServer) disperseBlob(ctx context.Context, blob *core.Blob, aut
 		}
 		s.metrics.HandleStoreFailureRpcRequest(apiMethodName)
 		s.logger.Error("failed to store blob", "err", err)
-		return nil, api.NewInternalError("failed to store blob, please try again later")
+		return nil, api.NewErrorInternal(fmt.Sprintf("store blob: %v", err))
 	}
 
 	for _, param := range securityParams {
@@ -440,13 +440,13 @@ func (s *DispersalServer) checkRateLimitsAndAddRatesToHeader(ctx context.Context
 		globalRates, ok := s.rateConfig.QuorumRateInfos[param.QuorumID]
 		if !ok {
 			s.metrics.HandleInternalFailureRpcRequest(apiMethodName)
-			return api.NewInternalError(fmt.Sprintf("no configured rate exists for quorum %d", param.QuorumID))
+			return api.NewErrorInternal(fmt.Sprintf("no configured rate exists for quorum %d", param.QuorumID))
 		}
 
 		accountRates, accountKey, err := s.getAccountRate(origin, authenticatedAddress, param.QuorumID)
 		if err != nil {
 			s.metrics.HandleInternalFailureRpcRequest(apiMethodName)
-			return api.NewInternalError(err.Error())
+			return api.NewErrorInternal(err.Error())
 		}
 
 		// Note: There's an implicit assumption that an empty name means the account
@@ -524,14 +524,14 @@ func (s *DispersalServer) checkRateLimitsAndAddRatesToHeader(ctx context.Context
 	if err != nil {
 		s.metrics.HandleInternalFailureRpcRequest(apiMethodName)
 		s.metrics.HandleFailedRequest(codes.Internal.String(), "", blobSize, apiMethodName)
-		return api.NewInternalError(err.Error())
+		return api.NewErrorInternal(err.Error())
 	}
 
 	if !allowed {
 		info, ok := params.Info.(limiterInfo)
 		if !ok {
 			s.metrics.HandleInternalFailureRpcRequest(apiMethodName)
-			return api.NewInternalError("failed to cast limiterInfo")
+			return api.NewErrorInternal("failed to cast limiterInfo")
 		}
 		if info.RateType == SystemThroughputType || info.RateType == SystemBlobRateType {
 			s.metrics.HandleSystemRateLimitedRpcRequest(apiMethodName)
@@ -542,7 +542,7 @@ func (s *DispersalServer) checkRateLimitsAndAddRatesToHeader(ctx context.Context
 			s.logger.Info("request ratelimited", "requesterName", requesterName, "requesterID", params.RequesterID, "rateType", info.RateType.String(), "quorum", info.QuorumID)
 		}
 		errorString := fmt.Sprintf("request ratelimited: %s for quorum %d", info.RateType.String(), info.QuorumID)
-		return api.NewResourceExhaustedError(errorString)
+		return api.NewErrorResourceExhausted(errorString)
 	}
 
 	return nil
@@ -559,7 +559,7 @@ func (s *DispersalServer) GetBlobStatus(ctx context.Context, req *pb.BlobStatusR
 	if len(requestID) == 0 {
 		s.metrics.HandleInvalidArgRpcRequest("GetBlobStatus")
 		s.metrics.HandleInvalidArgRequest("GetBlobStatus")
-		return nil, api.NewInvalidArgError("request_id must not be empty")
+		return nil, api.NewErrorInvalidArg("request_id must not be empty")
 	}
 
 	s.logger.Info("received a new blob status request", "requestID", string(requestID))
@@ -567,7 +567,7 @@ func (s *DispersalServer) GetBlobStatus(ctx context.Context, req *pb.BlobStatusR
 	if err != nil {
 		s.metrics.HandleInvalidArgRpcRequest("GetBlobStatus")
 		s.metrics.HandleInvalidArgRequest("GetBlobStatus")
-		return nil, api.NewInvalidArgError(fmt.Sprintf("failed to parse the requestID: %s", err.Error()))
+		return nil, api.NewErrorInvalidArg(fmt.Sprintf("failed to parse the requestID: %s", err.Error()))
 	}
 
 	s.logger.Debug("metadataKey", "metadataKey", metadataKey.String())
@@ -576,16 +576,16 @@ func (s *DispersalServer) GetBlobStatus(ctx context.Context, req *pb.BlobStatusR
 		if errors.Is(err, disperser.ErrMetadataNotFound) {
 			s.metrics.HandleNotFoundRpcRequest("GetBlobStatus")
 			s.metrics.HandleNotFoundRequest("GetBlobStatus")
-			return nil, api.NewNotFoundError("no metadata found for the requestID")
+			return nil, api.NewErrorNotFound("no metadata found for the requestID")
 		}
 		s.metrics.HandleInternalFailureRpcRequest("GetBlobStatus")
-		return nil, api.NewInternalError(fmt.Sprintf("failed to get blob metadata, blobkey: %s", metadataKey.String()))
+		return nil, api.NewErrorInternal(fmt.Sprintf("failed to get blob metadata, blobkey: %s", metadataKey.String()))
 	}
 
 	isConfirmed, err := metadata.IsConfirmed()
 	if err != nil {
 		s.metrics.HandleInternalFailureRpcRequest("GetBlobStatus")
-		return nil, api.NewInternalError(fmt.Sprintf("missing confirmation information: %s", err.Error()))
+		return nil, api.NewErrorInternal(fmt.Sprintf("missing confirmation information: %s", err.Error()))
 	}
 
 	s.metrics.HandleSuccessfulRpcRequest("GetBlobStatus")
@@ -673,7 +673,7 @@ func (s *DispersalServer) RetrieveBlob(ctx context.Context, req *pb.RetrieveBlob
 	if err != nil {
 		s.metrics.HandleInvalidArgRpcRequest("RetrieveBlob")
 		s.metrics.HandleInvalidArgRequest("RetrieveBlob")
-		return nil, api.NewInvalidArgError(err.Error())
+		return nil, api.NewErrorInvalidArg(err.Error())
 	}
 
 	stageTimer := time.Now()
@@ -689,7 +689,7 @@ func (s *DispersalServer) RetrieveBlob(ctx context.Context, req *pb.RetrieveBlob
 		})
 		if err != nil {
 			s.metrics.HandleInternalFailureRpcRequest("RetrieveBlob")
-			return nil, api.NewInternalError(fmt.Sprintf("ratelimiter error: %v", err))
+			return nil, api.NewErrorInternal(fmt.Sprintf("ratelimiter error: %v", err))
 		}
 		if !allowed {
 			s.metrics.HandleRateLimitedRpcRequest("RetrieveBlob")
@@ -699,7 +699,7 @@ func (s *DispersalServer) RetrieveBlob(ctx context.Context, req *pb.RetrieveBlob
 			if ok {
 				errorString += ": " + info
 			}
-			return nil, api.NewResourceExhaustedError(errorString)
+			return nil, api.NewErrorResourceExhausted(errorString)
 		}
 	}
 	s.logger.Debug("checked retrieval blob rate limiting", "requesterID", fmt.Sprintf("%s:%s", origin, RetrievalBlobRateType.Plug()), "duration", time.Since(stageTimer).String())
@@ -719,17 +719,17 @@ func (s *DispersalServer) RetrieveBlob(ctx context.Context, req *pb.RetrieveBlob
 		if errors.Is(err, disperser.ErrMetadataNotFound) {
 			s.metrics.HandleNotFoundRpcRequest("RetrieveBlob")
 			s.metrics.HandleNotFoundRequest("RetrieveBlob")
-			return nil, api.NewNotFoundError("no metadata found for the given batch header hash and blob index")
+			return nil, api.NewErrorNotFound("no metadata found for the given batch header hash and blob index")
 		}
 		s.metrics.HandleInternalFailureRpcRequest("RetrieveBlob")
 		s.metrics.IncrementFailedBlobRequestNum(codes.Internal.String(), "", "RetrieveBlob")
-		return nil, api.NewInternalError("failed to get blob metadata, please retry")
+		return nil, api.NewErrorInternal("failed to get blob metadata, please retry")
 	}
 
 	if blobMetadata.Expiry < uint64(time.Now().Unix()) {
 		s.metrics.HandleNotFoundRpcRequest("RetrieveBlob")
 		s.metrics.HandleNotFoundRequest("RetrieveBlob")
-		return nil, api.NewNotFoundError("no metadata found for the given batch header hash and blob index")
+		return nil, api.NewErrorNotFound("no metadata found for the given batch header hash and blob index")
 	}
 
 	s.logger.Debug("fetched blob metadata", "batchHeaderHash", req.BatchHeaderHash, "blobIndex", req.BlobIndex, "duration", time.Since(stageTimer).String())
@@ -749,7 +749,7 @@ func (s *DispersalServer) RetrieveBlob(ctx context.Context, req *pb.RetrieveBlob
 		})
 		if err != nil {
 			s.metrics.HandleInternalFailureRpcRequest("RetrieveBlob")
-			return nil, api.NewInternalError(fmt.Sprintf("ratelimiter error: %v", err))
+			return nil, api.NewErrorInternal(fmt.Sprintf("ratelimiter error: %v", err))
 		}
 		if !allowed {
 			s.metrics.HandleRateLimitedRpcRequest("RetrieveBlob")
@@ -759,7 +759,7 @@ func (s *DispersalServer) RetrieveBlob(ctx context.Context, req *pb.RetrieveBlob
 			if ok {
 				errorString += ": " + info
 			}
-			return nil, api.NewResourceExhaustedError(errorString)
+			return nil, api.NewErrorResourceExhausted(errorString)
 		}
 	}
 	s.logger.Debug("checked retrieval throughput rate limiting", "requesterID", fmt.Sprintf("%s:%s", origin, RetrievalThroughputType.Plug()), "duration (ms)", time.Since(stageTimer).String())
@@ -770,7 +770,7 @@ func (s *DispersalServer) RetrieveBlob(ctx context.Context, req *pb.RetrieveBlob
 		s.logger.Error("Failed to retrieve blob", "err", err)
 		s.metrics.HandleInternalFailureRpcRequest("RetrieveBlob")
 		s.metrics.HandleFailedRequest(codes.Internal.String(), "", len(data), "RetrieveBlob")
-		return nil, api.NewInternalError("failed to get blob data, please retry")
+		return nil, api.NewErrorInternal("failed to get blob data, please retry")
 	}
 	s.metrics.HandleSuccessfulRpcRequest("RetrieveBlob")
 	s.metrics.HandleSuccessfulRequest("", len(data), "RetrieveBlob")
@@ -939,7 +939,7 @@ func (s *DispersalServer) validateRequestAndGetBlob(ctx context.Context, req *pb
 	_, err := rs.ToFrArray(data)
 	if err != nil {
 		s.logger.Error("failed to convert a 32bytes as a field element", "err", err)
-		return nil, api.NewInvalidArgError("encountered an error to convert a 32-bytes into a valid field element, please use the correct format where every 32bytes(big-endian) is less than 21888242871839275222246405745257275088548364400416034343698204186575808495617")
+		return nil, api.NewErrorInvalidArg("encountered an error to convert a 32-bytes into a valid field element, please use the correct format where every 32bytes(big-endian) is less than 21888242871839275222246405745257275088548364400416034343698204186575808495617")
 	}
 
 	quorumConfig, err := s.updateQuorumConfig(ctx)
diff --git a/disperser/apiserver/server_v2.go b/disperser/apiserver/server_v2.go
index 5ecebadfb6..1489b4e4c0 100644
--- a/disperser/apiserver/server_v2.go
+++ b/disperser/apiserver/server_v2.go
@@ -12,7 +12,6 @@ import (
 	"github.com/Layr-Labs/eigenda/disperser"
 	"github.com/Layr-Labs/eigensdk-go/logging"
 	"google.golang.org/grpc"
-	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/reflection"
 )
 
@@ -37,15 +36,15 @@ func NewDispersalServerV2(
 }
 
 func (s *DispersalServerV2) DisperseBlob(ctx context.Context, req *pb.DisperseBlobRequest) (*pb.DisperseBlobReply, error) {
-	return &pb.DisperseBlobReply{}, api.NewGRPCError(codes.Unimplemented, "not implemented")
+	return &pb.DisperseBlobReply{}, api.NewErrorUnimplemented()
 }
 
 func (s *DispersalServerV2) GetBlobStatus(ctx context.Context, req *pb.BlobStatusRequest) (*pb.BlobStatusReply, error) {
-	return &pb.BlobStatusReply{}, api.NewGRPCError(codes.Unimplemented, "not implemented")
+	return &pb.BlobStatusReply{}, api.NewErrorUnimplemented()
 }
 
 func (s *DispersalServerV2) GetBlobCommitment(ctx context.Context, req *pb.BlobCommitmentRequest) (*pb.BlobCommitmentReply, error) {
-	return &pb.BlobCommitmentReply{}, api.NewGRPCError(codes.Unimplemented, "not implemented")
+	return &pb.BlobCommitmentReply{}, api.NewErrorUnimplemented()
 }
 
 func (s *DispersalServerV2) Start(ctx context.Context) error {
diff --git a/node/grpc/server.go b/node/grpc/server.go
index 0cba1b1877..8394951f6f 100644
--- a/node/grpc/server.go
+++ b/node/grpc/server.go
@@ -99,34 +99,34 @@ func (s *Server) handleStoreChunksRequest(ctx context.Context, in *pb.StoreChunk
 func (s *Server) validateStoreChunkRequest(in *pb.StoreChunksRequest) error {
 	if in.GetBatchHeader() == nil {
-		return api.NewInvalidArgError("missing batch_header in request")
+		return api.NewErrorInvalidArg("missing batch_header in request")
 	}
 	if in.GetBatchHeader().GetBatchRoot() == nil {
-		return api.NewInvalidArgError("missing batch_root in request")
+		return api.NewErrorInvalidArg("missing batch_root in request")
 	}
 	if in.GetBatchHeader().GetReferenceBlockNumber() == 0 {
-		return api.NewInvalidArgError("missing reference_block_number in request")
+		return api.NewErrorInvalidArg("missing reference_block_number in request")
 	}
 
 	if len(in.GetBlobs()) == 0 {
-		return api.NewInvalidArgError("missing blobs in request")
+		return api.NewErrorInvalidArg("missing blobs in request")
 	}
 	for _, blob := range in.Blobs {
 		if blob.GetHeader() == nil {
-			return api.NewInvalidArgError("missing blob header in request")
+			return api.NewErrorInvalidArg("missing blob header in request")
 		}
 		if node.ValidatePointsFromBlobHeader(blob.GetHeader()) != nil {
-			return api.NewInvalidArgError("invalid points contained in the blob header in request")
+			return api.NewErrorInvalidArg("invalid points contained in the blob header in request")
 		}
 		if len(blob.GetHeader().GetQuorumHeaders()) == 0 {
-			return api.NewInvalidArgError("missing quorum headers in request")
+			return api.NewErrorInvalidArg("missing quorum headers in request")
 		}
 		if len(blob.GetHeader().GetQuorumHeaders()) != len(blob.GetBundles()) {
-			return api.NewInvalidArgError("the number of quorums must be the same as the number of bundles")
+			return api.NewErrorInvalidArg("the number of quorums must be the same as the number of bundles")
 		}
 		for _, q := range blob.GetHeader().GetQuorumHeaders() {
 			if q.GetQuorumId() > core.MaxQuorumID {
-				return api.NewInvalidArgError(fmt.Sprintf("quorum ID must be in range [0, %d], but found %d", core.MaxQuorumID, q.GetQuorumId()))
+				return api.NewErrorInvalidArg(fmt.Sprintf("quorum ID must be in range [0, %d], but found %d", core.MaxQuorumID, q.GetQuorumId()))
 			}
 			if err := core.ValidateSecurityParam(q.GetConfirmationThreshold(), q.GetAdversaryThreshold()); err != nil {
 				return err
@@ -172,35 +172,35 @@ func (s *Server) StoreChunks(ctx context.Context, in *pb.StoreChunksRequest) (*p
 
 func (s *Server) validateStoreBlobsRequest(in *pb.StoreBlobsRequest) error {
 	if in.GetReferenceBlockNumber() == 0 {
-		return api.NewInvalidArgError("missing reference_block_number in request")
+		return api.NewErrorInvalidArg("missing reference_block_number in request")
 	}
 
 	if len(in.GetBlobs()) == 0 {
-		return api.NewInvalidArgError("missing blobs in request")
+		return api.NewErrorInvalidArg("missing blobs in request")
 	}
 	for _, blob := range in.Blobs {
 		if blob.GetHeader() == nil {
-			return api.NewInvalidArgError("missing blob header in request")
+			return api.NewErrorInvalidArg("missing blob header in request")
 		}
 		if node.ValidatePointsFromBlobHeader(blob.GetHeader()) != nil {
-			return api.NewInvalidArgError("invalid points contained in the blob header in request")
+			return api.NewErrorInvalidArg("invalid points contained in the blob header in request")
 		}
 		if len(blob.GetHeader().GetQuorumHeaders()) == 0 {
-			return api.NewInvalidArgError("missing quorum headers in request")
+			return api.NewErrorInvalidArg("missing quorum headers in request")
 		}
 		if len(blob.GetHeader().GetQuorumHeaders()) != len(blob.GetBundles()) {
-			return api.NewInvalidArgError("the number of quorums must be the same as the number of bundles")
+			return api.NewErrorInvalidArg("the number of quorums must be the same as the number of bundles")
 		}
 		for _, q := range blob.GetHeader().GetQuorumHeaders() {
 			if q.GetQuorumId() > core.MaxQuorumID {
-				return api.NewInvalidArgError(fmt.Sprintf("quorum ID must be in range [0, %d], but found %d", core.MaxQuorumID, q.GetQuorumId()))
+				return api.NewErrorInvalidArg(fmt.Sprintf("quorum ID must be in range [0, %d], but found %d", core.MaxQuorumID, q.GetQuorumId()))
 			}
 			if err := core.ValidateSecurityParam(q.GetConfirmationThreshold(), q.GetAdversaryThreshold()); err != nil {
 				return err
 			}
 		}
 		if in.GetReferenceBlockNumber() != blob.GetHeader().GetReferenceBlockNumber() {
-			return api.NewInvalidArgError("reference_block_number must be the same for all blobs")
+			return api.NewErrorInvalidArg("reference_block_number must be the same for all blobs")
 		}
 	}
 	return nil
@@ -257,7 +257,7 @@ func (s *Server) AttestBatch(ctx context.Context, in *pb.AttestBatchRequest) (*p
 	blobHeaderHashes := make([][32]byte, len(in.GetBlobHeaderHashes()))
 	for i, hash := range in.GetBlobHeaderHashes() {
 		if len(hash) != 32 {
-			return nil, api.NewInvalidArgError("invalid blob header hash")
+			return nil, api.NewErrorInvalidArg("invalid blob header hash")
 		}
 		var h [32]byte
 		copy(h[:], hash)
diff --git a/node/grpc/server_v2.go b/node/grpc/server_v2.go
index ca690e2943..b2e02027a1 100644
--- a/node/grpc/server_v2.go
+++ b/node/grpc/server_v2.go
@@ -11,7 +11,6 @@ import (
 	"github.com/Layr-Labs/eigenda/node"
 	"github.com/Layr-Labs/eigensdk-go/logging"
 	"github.com/shirou/gopsutil/mem"
-	"google.golang.org/grpc/codes"
 )
 
 // ServerV2 implements the Node v2 proto APIs.
@@ -54,9 +53,9 @@ func (s *ServerV2) NodeInfo(ctx context.Context, in *pb.NodeInfoRequest) (*pb.No
 }
 
 func (s *ServerV2) StoreChunks(ctx context.Context, in *pb.StoreChunksRequest) (*pb.StoreChunksReply, error) {
-	return &pb.StoreChunksReply{}, api.NewGRPCError(codes.Unimplemented, "not implemented")
+	return &pb.StoreChunksReply{}, api.NewErrorUnimplemented()
 }
 
 func (s *ServerV2) GetChunks(context.Context, *pb.GetChunksRequest) (*pb.GetChunksReply, error) {
-	return &pb.GetChunksReply{}, api.NewGRPCError(codes.Unimplemented, "not implemented")
+	return &pb.GetChunksReply{}, api.NewErrorUnimplemented()
 }
diff --git a/node/utils.go b/node/utils.go
index 91722e6055..2559dd2e80 100644
--- a/node/utils.go
+++ b/node/utils.go
@@ -20,7 +20,7 @@ import (
 // interface, see grpc.Server.validateStoreChunkRequest.
 func GetBatchHeader(in *pb.BatchHeader) (*core.BatchHeader, error) {
 	if in == nil || len(in.GetBatchRoot()) == 0 {
-		return nil, api.NewInvalidArgError("batch header is nil or empty")
+		return nil, api.NewErrorInvalidArg("batch header is nil or empty")
 	}
 	var batchRoot [32]byte
 	copy(batchRoot[:], in.GetBatchRoot())
@@ -143,7 +143,7 @@ func ValidatePointsFromBlobHeader(h *pb.BlobHeader) error {
 
 func GetBlobHeaderFromProto(h *pb.BlobHeader) (*core.BlobHeader, error) {
 	if h == nil {
-		return nil, api.NewInvalidArgError("GetBlobHeaderFromProto: blob header is nil")
+		return nil, api.NewErrorInvalidArg("GetBlobHeaderFromProto: blob header is nil")
 	}
 
@@ -184,7 +184,7 @@ func GetBlobHeaderFromProto(h *pb.BlobHeader) (*core.BlobHeader, error) {
 	quorumHeaders := make([]*core.BlobQuorumInfo, len(h.GetQuorumHeaders()))
 	for i, header := range h.GetQuorumHeaders() {
 		if header.GetQuorumId() > core.MaxQuorumID {
-			return nil, api.NewInvalidArgError(fmt.Sprintf("quorum ID must be in range [0, %d], but found %d", core.MaxQuorumID, header.GetQuorumId()))
+			return nil, api.NewErrorInvalidArg(fmt.Sprintf("quorum ID must be in range [0, %d], but found %d", core.MaxQuorumID, header.GetQuorumId()))
 		}
 		if err := core.ValidateSecurityParam(header.GetConfirmationThreshold(), header.GetAdversaryThreshold()); err != nil {
 			return nil, err
diff --git a/operators/churner/churner.go b/operators/churner/churner.go
index dd857fbba1..4a6c5453db 100644
--- a/operators/churner/churner.go
+++ b/operators/churner/churner.go
@@ -113,7 +113,7 @@ func (c *churner) ProcessChurnRequest(ctx context.Context, operatorToRegisterAdd
 	for _, quorumID := range churnRequest.QuorumIDs {
 		for _, quorumIDAlreadyRegisteredFor := range quorumIDsAlreadyRegisteredFor {
 			if quorumIDAlreadyRegisteredFor == quorumID {
-				return nil, api.NewInvalidArgError("operator is already registered in quorum")
+				return nil, api.NewErrorInvalidArg("operator is already registered in quorum")
 			}
 		}
 	}
@@ -238,7 +238,7 @@ func (c *churner) getOperatorsToChurn(ctx context.Context, quorumIDs []uint8, op
 				"registering operator address: %s, registering operator stake: %d, " +
 				"stake of lowest-stake operator: %d, operatorId of lowest-stake operator: " +
 				"%x, quorum ID: %d"
-			return nil, api.NewInvalidArgError(fmt.Sprintf(msg, float64(operatorSetParams.ChurnBIPsOfOperatorStake)/100.0-100.0, currentBlockNumber, operatorToRegisterAddress.Hex(), operatorToRegisterStake, lowestStake, lowestStakeOperatorId, quorumID))
+			return nil, api.NewErrorInvalidArg(fmt.Sprintf(msg, float64(operatorSetParams.ChurnBIPsOfOperatorStake)/100.0-100.0, currentBlockNumber, operatorToRegisterAddress.Hex(), operatorToRegisterStake, lowestStake, lowestStakeOperatorId, quorumID))
 		}
 
 		// verify the lowest stake against the total stake
@@ -254,7 +254,7 @@ func (c *churner) getOperatorsToChurn(ctx context.Context, quorumIDs []uint8, op
 				"Block number used for this decision: %d, operatorId of the operator " +
 				"to churn: %x, stake of the operator to churn: %d, total stake in " +
 				"quorum: %d, quorum ID: %d"
-			return nil, api.NewInvalidArgError(fmt.Sprintf(msg, float64(operatorSetParams.ChurnBIPsOfTotalStake)/100.0, currentBlockNumber, lowestStakeOperatorId.Hex(), lowestStake, totalStake, quorumID))
+			return nil, api.NewErrorInvalidArg(fmt.Sprintf(msg, float64(operatorSetParams.ChurnBIPsOfTotalStake)/100.0, currentBlockNumber, lowestStakeOperatorId.Hex(), lowestStake, totalStake, quorumID))
 		}
 
 		operatorToChurnAddress, err := c.Transactor.OperatorIDToAddress(ctx, lowestStakeOperatorId)
diff --git a/operators/churner/server.go b/operators/churner/server.go
index dc8e26fc90..fc8ba93103 100644
--- a/operators/churner/server.go
+++ b/operators/churner/server.go
@@ -58,7 +58,7 @@ func (s *Server) Churn(ctx context.Context, req *pb.ChurnRequest) (*pb.ChurnRepl
 	err := s.validateChurnRequest(ctx, req)
 	if err != nil {
 		s.metrics.IncrementFailedRequestNum("Churn", FailReasonInvalidRequest)
-		return nil, api.NewInvalidArgError(fmt.Sprintf("invalid request: %s", err.Error()))
+		return nil, api.NewErrorInvalidArg(fmt.Sprintf("invalid request: %s", err.Error()))
 	}
 
 	timer := prometheus.NewTimer(prometheus.ObserverFunc(func(f float64) {
@@ -71,26 +71,26 @@ func (s *Server) Churn(ctx context.Context, req *pb.ChurnRequest) (*pb.ChurnRepl
 	// Global rate limiting: check that we are after the previous approval's expiry
 	if now.Unix() < s.latestExpiry {
 		s.metrics.IncrementFailedRequestNum("Churn", FailReasonPrevApprovalNotExpired)
-		return nil, api.NewResourceExhaustedError(fmt.Sprintf("previous approval not expired, retry in %d seconds", s.latestExpiry-now.Unix()))
+		return nil, api.NewErrorResourceExhausted(fmt.Sprintf("previous approval not expired, retry in %d seconds", s.latestExpiry-now.Unix()))
 	}
 
 	request, err := createChurnRequest(req)
 	if err != nil {
 		s.metrics.IncrementFailedRequestNum("Churn", FailReasonInvalidRequest)
-		return nil, api.NewInvalidArgError(err.Error())
+		return nil, api.NewErrorInvalidArg(err.Error())
 	}
 
 	operatorToRegisterAddress, err := s.churner.VerifyRequestSignature(ctx, request)
 	if err != nil {
 		s.metrics.IncrementFailedRequestNum("Churn", FailReasonInvalidSignature)
-		return nil, api.NewInvalidArgError(fmt.Sprintf("failed to verify request signature: %s", err.Error()))
+		return nil, api.NewErrorInvalidArg(fmt.Sprintf("failed to verify request signature: %s", err.Error()))
 	}
 
 	// Per-operator rate limiting: check if the request should be rate limited
 	err = s.checkShouldBeRateLimited(now, *request)
 	if err != nil {
 		s.metrics.IncrementFailedRequestNum("Churn", FailReasonRateLimitExceeded)
-		return nil, api.NewResourceExhaustedError(fmt.Sprintf("rate limiter error: %s", err.Error()))
+		return nil, api.NewErrorResourceExhausted(fmt.Sprintf("rate limiter error: %s", err.Error()))
 	}
 
 	response, err := s.churner.ProcessChurnRequest(ctx, operatorToRegisterAddress, request)
@@ -99,7 +99,7 @@ func (s *Server) Churn(ctx context.Context, req *pb.ChurnRequest) (*pb.ChurnRepl
 			return nil, err
 		}
 		s.metrics.IncrementFailedRequestNum("Churn", FailReasonProcessChurnRequestFailed)
-		return nil, api.NewInternalError(fmt.Sprintf("failed to process churn request: %s", err.Error()))
+		return nil, api.NewErrorInternal(fmt.Sprintf("failed to process churn request: %s", err.Error()))
 	}
 
 	// update the latest expiry
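
For callers, here is a minimal sketch of how the error codes documented on `DisperseBlob` can be consumed with grpc-go's `status`/`codes` packages. The `disperseWithRetryHint` helper, its package name, and the wrapped error messages are illustrative assumptions and not part of this change; only `DisperserClient`, `DisperseBlobRequest`/`DisperseBlobReply`, and the `status`/`codes` packages come from the generated API and grpc-go.

```go
package example

import (
	"context"
	"fmt"

	disperser "github.com/Layr-Labs/eigenda/api/grpc/disperser"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// disperseWithRetryHint (hypothetical helper) issues a single DisperseBlob call and
// classifies the error per the codes documented in disperser.proto. The boolean
// result reports whether the caller should retry (only for RESOURCE_EXHAUSTED).
func disperseWithRetryHint(ctx context.Context, client disperser.DisperserClient, req *disperser.DisperseBlobRequest) (*disperser.DisperseBlobReply, bool, error) {
	reply, err := client.DisperseBlob(ctx, req)
	if err == nil {
		return reply, false, nil
	}

	// status.Code yields codes.Unknown for errors that were not wrapped in one of
	// the api.NewError* constructors -- the situation the note in api/errors.go warns about.
	switch status.Code(err) {
	case codes.InvalidArgument: // 400: fix the request, do not retry as-is
		return nil, false, fmt.Errorf("invalid dispersal request: %w", err)
	case codes.ResourceExhausted: // 429: rate limited; retry after the duration named in the message
		return nil, true, fmt.Errorf("rate limited: %w", err)
	case codes.Internal: // 500: serious server-side error, do not retry
		return nil, false, fmt.Errorf("disperser internal error: %w", err)
	default:
		return nil, false, err
	}
}
```

Anything not produced by the `api.NewError*` constructors surfaces to clients as `codes.Unknown`, which is why the notes in `api/errors.go` ask for internally propagated errors to be wrapped before they cross the API boundary.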