diff --git a/api/clients/disperser_client.go b/api/clients/disperser_client.go index dc937db33..c2422ea00 100644 --- a/api/clients/disperser_client.go +++ b/api/clients/disperser_client.go @@ -26,6 +26,15 @@ type Config struct { // TODO: do we want to add config timeouts for those separate requests? Timeout time.Duration UseSecureGrpcFlag bool + // MaxRetrieveBlobSizeBytes is the maximum size of a blob that can be retrieved by using + // the RetrieveBlob method. This is used to set the max message size for the grpc client. + // DisperserClient uses a single underlying grpc channel shared for all methods, + // but all other methods use the default 4MiB max message size, whereas RetrieveBlob + // potentially needs a larger size. + // + // If not set, default value is 100MiB for forward compatibility. + // Check official documentation for current max blob size on mainnet. + MaxRetrieveBlobSizeBytes int } // Deprecated: Use &Config{...} directly instead @@ -64,6 +73,9 @@ type disperserClient struct { // This means a conservative estimate of 100-1000MB/sec, which should be amply sufficient. // If we ever need to increase this, we could either consider asking the disperser to increase its limit, // or to use a pool of connections here. + // TODO: we should refactor or make a new constructor which allows setting conn and/or client + // via dependency injection. This would allow for testing via https://pkg.go.dev/google.golang.org/grpc/test/bufconn + // instead of a real network connection for eg. conn *grpc.ClientConn client disperser_rpc.DisperserClient } @@ -90,12 +102,36 @@ var _ DisperserClient = &disperserClient{} // // // Subsequent calls will use the existing connection // status2, requestId2, err := client.DisperseBlob(ctx, otherData, otherQuorums) -func NewDisperserClient(config *Config, signer core.BlobRequestSigner) *disperserClient { +func NewDisperserClient(config *Config, signer core.BlobRequestSigner) (*disperserClient, error) { + if err := checkConfigAndSetDefaults(config); err != nil { + return nil, fmt.Errorf("invalid config: %w", err) + } return &disperserClient{ config: config, signer: signer, // conn and client are initialized lazily + }, nil +} + +func checkConfigAndSetDefaults(c *Config) error { + if c == nil { + return fmt.Errorf("config is nil") + } + if c.Hostname == "" { + return fmt.Errorf("config.Hostname is empty") + } + if c.Port == "" { + return fmt.Errorf("config.Port is empty") } + if c.Timeout == 0 { + return fmt.Errorf("config.Timeout is 0") + } + if c.MaxRetrieveBlobSizeBytes == 0 { + // Set to 100MiB for forward compatibility. + // Check official documentation for current max blob size on mainnet. + c.MaxRetrieveBlobSizeBytes = 100 * 1024 * 1024 + } + return nil } // Close closes the grpc connection to the disperser server. @@ -281,10 +317,12 @@ func (c *disperserClient) RetrieveBlob(ctx context.Context, batchHeaderHash []by ctxTimeout, cancel := context.WithTimeout(ctx, time.Second*60) defer cancel() - reply, err := c.client.RetrieveBlob(ctxTimeout, &disperser_rpc.RetrieveBlobRequest{ - BatchHeaderHash: batchHeaderHash, - BlobIndex: blobIndex, - }) + reply, err := c.client.RetrieveBlob(ctxTimeout, + &disperser_rpc.RetrieveBlobRequest{ + BatchHeaderHash: batchHeaderHash, + BlobIndex: blobIndex, + }, + grpc.MaxCallRecvMsgSize(c.config.MaxRetrieveBlobSizeBytes)) // for client if err != nil { return nil, err } diff --git a/api/clients/disperser_client_test.go b/api/clients/disperser_client_test.go index 28d98dea7..00277657b 100644 --- a/api/clients/disperser_client_test.go +++ b/api/clients/disperser_client_test.go @@ -14,12 +14,13 @@ import ( func TestPutBlobNoopSigner(t *testing.T) { config := clients.NewConfig("nohost", "noport", time.Second, false) - disperserClient := clients.NewDisperserClient(config, auth.NewLocalNoopSigner()) + disperserClient, err := clients.NewDisperserClient(config, auth.NewLocalNoopSigner()) + assert.NoError(t, err) test := []byte("test") test[0] = 0x00 // make sure the first byte of the requst is always 0 quorums := []uint8{0} - _, _, err := disperserClient.DisperseBlobAuthenticated(context.Background(), test, quorums) + _, _, err = disperserClient.DisperseBlobAuthenticated(context.Background(), test, quorums) st, isGRPCError := status.FromError(err) assert.True(t, isGRPCError) assert.Equal(t, codes.InvalidArgument.String(), st.Code().String()) diff --git a/api/clients/eigenda_client.go b/api/clients/eigenda_client.go index 5d12a3f54..81d771680 100644 --- a/api/clients/eigenda_client.go +++ b/api/clients/eigenda_client.go @@ -33,6 +33,8 @@ type IEigenDAClient interface { } // See the NewEigenDAClient constructor's documentation for details and usage examples. +// TODO: Refactor this struct and interface above to use same naming convention as disperser client. +// Also need to make the fields private and use the constructor in the tests. type EigenDAClient struct { // TODO: all of these should be private, to prevent users from using them directly, // which breaks encapsulation and makes it hard for us to do refactors or changes @@ -83,16 +85,16 @@ func NewEigenDAClient(log log.Logger, config EigenDAClientConfig) (*EigenDAClien var edasmCaller *edasm.ContractEigenDAServiceManagerCaller ethClient, err = ethclient.Dial(config.EthRpcUrl) if err != nil { - return nil, fmt.Errorf("failed to dial ETH RPC node: %w", err) + return nil, fmt.Errorf("dial ETH RPC node: %w", err) } edasmCaller, err = edasm.NewContractEigenDAServiceManagerCaller(common.HexToAddress(config.SvcManagerAddr), ethClient) if err != nil { - return nil, fmt.Errorf("failed to create EigenDAServiceManagerCaller: %w", err) + return nil, fmt.Errorf("new EigenDAServiceManagerCaller: %w", err) } host, port, err := net.SplitHostPort(config.RPC) if err != nil { - return nil, fmt.Errorf("failed to parse EigenDA RPC: %w", err) + return nil, fmt.Errorf("parse EigenDA RPC: %w", err) } var signer core.BlobRequestSigner @@ -106,11 +108,14 @@ func NewEigenDAClient(log log.Logger, config EigenDAClientConfig) (*EigenDAClien } disperserConfig := NewConfig(host, port, config.ResponseTimeout, !config.DisableTLS) - disperserClient := NewDisperserClient(disperserConfig, signer) + disperserClient, err := NewDisperserClient(disperserConfig, signer) + if err != nil { + return nil, fmt.Errorf("new disperser-client: %w", err) + } lowLevelCodec, err := codecs.BlobEncodingVersionToCodec(config.PutBlobEncodingVersion) if err != nil { - return nil, fmt.Errorf("error initializing EigenDA client: %w", err) + return nil, fmt.Errorf("create low level codec: %w", err) } var codec codecs.BlobCodec diff --git a/api/clients/mock/disperser_server.go b/api/clients/mock/disperser_server.go new file mode 100644 index 000000000..4ebcf44b5 --- /dev/null +++ b/api/clients/mock/disperser_server.go @@ -0,0 +1,26 @@ +package mock + +import ( + "context" + + disperser_rpc "github.com/Layr-Labs/eigenda/api/grpc/disperser" +) + +// Currently only implements the RetrieveBlob RPC +type DisperserServer struct { + disperser_rpc.UnimplementedDisperserServer +} + +// RetrieveBlob returns a ~5MiB(+header_size) blob. It is used to test that the client correctly sets the max message size, +// to be able to support large blobs (default grpc max message size is 4MiB). +func (m *DisperserServer) RetrieveBlob(ctx context.Context, req *disperser_rpc.RetrieveBlobRequest) (*disperser_rpc.RetrieveBlobReply, error) { + // Create a blob larger than default max size (4MiB) + largeBlob := make([]byte, 5*1024*1024) // 5MiB + for i := range largeBlob { + largeBlob[i] = byte(i % 256) + } + + return &disperser_rpc.RetrieveBlobReply{ + Data: largeBlob, + }, nil +} diff --git a/inabox/tests/integration_test.go b/inabox/tests/integration_test.go index 4bfdfa22f..d7dd44a7c 100644 --- a/inabox/tests/integration_test.go +++ b/inabox/tests/integration_test.go @@ -36,12 +36,12 @@ var _ = Describe("Inabox Integration", func() { privateKeyHex := "0x0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcded" signer := auth.NewLocalBlobRequestSigner(privateKeyHex) - disp := clients.NewDisperserClient(&clients.Config{ + disp, err := clients.NewDisperserClient(&clients.Config{ Hostname: "localhost", Port: "32003", Timeout: 10 * time.Second, }, signer) - + Expect(err).To(BeNil()) Expect(disp).To(Not(BeNil())) data := make([]byte, 1024) diff --git a/inabox/tests/ratelimit_test.go b/inabox/tests/ratelimit_test.go index 47a93decb..1f72cbe01 100644 --- a/inabox/tests/ratelimit_test.go +++ b/inabox/tests/ratelimit_test.go @@ -107,15 +107,16 @@ func testRatelimit(t *testing.T, testConfig *deploy.Config, c ratelimitTestCase) ctx, cancel := context.WithTimeout(context.Background(), time.Minute) defer cancel() - disp := clients.NewDisperserClient(&clients.Config{ + disp, err := clients.NewDisperserClient(&clients.Config{ Hostname: "localhost", Port: testConfig.Dispersers[0].DISPERSER_SERVER_GRPC_PORT, Timeout: 10 * time.Second, }, nil) + assert.NoError(t, err) assert.NotNil(t, disp) data := make([]byte, c.blobSize) - _, err := rand.Read(data) + _, err = rand.Read(data) assert.NoError(t, err) dispersalTicker := time.NewTicker(c.dispersalInterval) diff --git a/tools/traffic/generator.go b/tools/traffic/generator.go index 2f8731ffd..5afdf0b56 100644 --- a/tools/traffic/generator.go +++ b/tools/traffic/generator.go @@ -4,6 +4,7 @@ import ( "context" "crypto/rand" "encoding/hex" + "fmt" "os" "os/signal" "sync" @@ -27,12 +28,16 @@ func NewTrafficGenerator(config *Config, signer core.BlobRequestSigner) (*Traffi loggerConfig := common.DefaultLoggerConfig() logger, err := common.NewLogger(loggerConfig) if err != nil { - return nil, err + return nil, fmt.Errorf("new logger: %w", err) } + dispserserClient, err := clients.NewDisperserClient(&config.Config, signer) + if err != nil { + return nil, fmt.Errorf("new disperser-client: %w", err) + } return &TrafficGenerator{ Logger: logger, - DisperserClient: clients.NewDisperserClient(&config.Config, signer), + DisperserClient: dispserserClient, Config: config, }, nil } diff --git a/tools/traffic/generator_v2.go b/tools/traffic/generator_v2.go index 7b2a92159..61575b9c0 100644 --- a/tools/traffic/generator_v2.go +++ b/tools/traffic/generator_v2.go @@ -88,7 +88,11 @@ func NewTrafficGeneratorV2(config *config.Config) (*Generator, error) { unconfirmedKeyChannel := make(chan *workers.UnconfirmedKey, 100) - disperserClient := clients.NewDisperserClient(config.DisperserClientConfig, signer) + disperserClient, err := clients.NewDisperserClient(config.DisperserClientConfig, signer) + if err != nil { + cancel() + return nil, fmt.Errorf("new disperser-client: %w", err) + } statusVerifier := workers.NewBlobStatusTracker( &ctx, &waitGroup, @@ -134,7 +138,7 @@ func NewTrafficGeneratorV2(config *config.Config) (*Generator, error) { waitGroup: &waitGroup, generatorMetrics: generatorMetrics, logger: &logger, - disperserClient: clients.NewDisperserClient(config.DisperserClientConfig, signer), + disperserClient: disperserClient, eigenDAClient: client, config: config, writers: writers, diff --git a/tools/traffic/workers/blob_status_tracker.go b/tools/traffic/workers/blob_status_tracker.go index 8209b2e6e..15bcca84b 100644 --- a/tools/traffic/workers/blob_status_tracker.go +++ b/tools/traffic/workers/blob_status_tracker.go @@ -2,15 +2,16 @@ package workers import ( "context" + "math/rand" + "sync" + "time" + "github.com/Layr-Labs/eigenda/api/clients" "github.com/Layr-Labs/eigenda/api/grpc/disperser" "github.com/Layr-Labs/eigenda/tools/traffic/config" "github.com/Layr-Labs/eigenda/tools/traffic/metrics" "github.com/Layr-Labs/eigenda/tools/traffic/table" "github.com/Layr-Labs/eigensdk-go/logging" - "math/rand" - "sync" - "time" ) // BlobStatusTracker periodically polls the disperser service to verify the status of blobs that were recently written. diff --git a/tools/traffic/workers/blob_writer.go b/tools/traffic/workers/blob_writer.go index a8a8e71ad..e0add4b35 100644 --- a/tools/traffic/workers/blob_writer.go +++ b/tools/traffic/workers/blob_writer.go @@ -5,13 +5,14 @@ import ( "crypto/md5" "crypto/rand" "fmt" + "sync" + "time" + "github.com/Layr-Labs/eigenda/api/clients" "github.com/Layr-Labs/eigenda/encoding/utils/codec" "github.com/Layr-Labs/eigenda/tools/traffic/config" "github.com/Layr-Labs/eigenda/tools/traffic/metrics" "github.com/Layr-Labs/eigensdk-go/logging" - "sync" - "time" ) // BlobWriter sends blobs to a disperser at a configured rate.