Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(disperser-client): RetrieveBlob grpc max size regression bug #849

Merged
Merged
48 changes: 43 additions & 5 deletions api/clients/disperser_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,15 @@ type Config struct {
// TODO: do we want to add config timeouts for those separate requests?
Timeout time.Duration
UseSecureGrpcFlag bool
// MaxRetrieveBlobSizeBytes is the maximum size of a blob that can be retrieved by using
// the RetrieveBlob method. This is used to set the max message size for the grpc client.
// DisperserClient uses a single underlying grpc channel shared for all methods,
// but all other methods use the default 4MiB max message size, whereas RetrieveBlob
bxue-l2 marked this conversation as resolved.
Show resolved Hide resolved
// potentially needs a larger size.
//
// If not set, default value is 100MiB for forward compatibility.
// Check official documentation for current max blob size on mainnet.
MaxRetrieveBlobSizeBytes int
}

// Deprecated: Use &Config{...} directly instead
Expand Down Expand Up @@ -64,6 +73,9 @@ type disperserClient struct {
// This means a conservative estimate of 100-1000MB/sec, which should be amply sufficient.
// If we ever need to increase this, we could either consider asking the disperser to increase its limit,
// or to use a pool of connections here.
// TODO: we should refactor or make a new constructor which allows setting conn and/or client
// via dependency injection. This would allow for testing via https://pkg.go.dev/google.golang.org/grpc/test/bufconn
// instead of a real network connection for eg.
conn *grpc.ClientConn
client disperser_rpc.DisperserClient
}
Expand All @@ -90,12 +102,36 @@ var _ DisperserClient = &disperserClient{}
//
// // Subsequent calls will use the existing connection
// status2, requestId2, err := client.DisperseBlob(ctx, otherData, otherQuorums)
func NewDisperserClient(config *Config, signer core.BlobRequestSigner) *disperserClient {
func NewDisperserClient(config *Config, signer core.BlobRequestSigner) (*disperserClient, error) {
if err := checkConfigAndSetDefaults(config); err != nil {
return nil, fmt.Errorf("invalid config: %w", err)
}
return &disperserClient{
config: config,
signer: signer,
// conn and client are initialized lazily
}, nil
}

func checkConfigAndSetDefaults(c *Config) error {
if c == nil {
return fmt.Errorf("config is nil")
}
if c.Hostname == "" {
return fmt.Errorf("config.Hostname is empty")
}
if c.Port == "" {
return fmt.Errorf("config.Port is empty")
}
if c.Timeout == 0 {
return fmt.Errorf("config.Timeout is 0")
}
if c.MaxRetrieveBlobSizeBytes == 0 {
// Set to 100MiB for forward compatibility.
// Check official documentation for current max blob size on mainnet.
c.MaxRetrieveBlobSizeBytes = 100 * 1024 * 1024
}
return nil
}

// Close closes the grpc connection to the disperser server.
Expand Down Expand Up @@ -281,10 +317,12 @@ func (c *disperserClient) RetrieveBlob(ctx context.Context, batchHeaderHash []by

ctxTimeout, cancel := context.WithTimeout(ctx, time.Second*60)
defer cancel()
reply, err := c.client.RetrieveBlob(ctxTimeout, &disperser_rpc.RetrieveBlobRequest{
BatchHeaderHash: batchHeaderHash,
BlobIndex: blobIndex,
})
reply, err := c.client.RetrieveBlob(ctxTimeout,
&disperser_rpc.RetrieveBlobRequest{
BatchHeaderHash: batchHeaderHash,
BlobIndex: blobIndex,
},
grpc.MaxCallRecvMsgSize(c.config.MaxRetrieveBlobSizeBytes)) // for client
if err != nil {
return nil, err
}
Expand Down
5 changes: 3 additions & 2 deletions api/clients/disperser_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@ import (

func TestPutBlobNoopSigner(t *testing.T) {
config := clients.NewConfig("nohost", "noport", time.Second, false)
disperserClient := clients.NewDisperserClient(config, auth.NewLocalNoopSigner())
disperserClient, err := clients.NewDisperserClient(config, auth.NewLocalNoopSigner())
assert.NoError(t, err)

test := []byte("test")
test[0] = 0x00 // make sure the first byte of the requst is always 0
quorums := []uint8{0}
_, _, err := disperserClient.DisperseBlobAuthenticated(context.Background(), test, quorums)
_, _, err = disperserClient.DisperseBlobAuthenticated(context.Background(), test, quorums)
st, isGRPCError := status.FromError(err)
assert.True(t, isGRPCError)
assert.Equal(t, codes.InvalidArgument.String(), st.Code().String())
Expand Down
15 changes: 10 additions & 5 deletions api/clients/eigenda_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ type IEigenDAClient interface {
}

// See the NewEigenDAClient constructor's documentation for details and usage examples.
// TODO: Refactor this struct and interface above to use same naming convention as disperser client.
// Also need to make the fields private and use the constructor in the tests.
type EigenDAClient struct {
// TODO: all of these should be private, to prevent users from using them directly,
// which breaks encapsulation and makes it hard for us to do refactors or changes
Expand Down Expand Up @@ -83,16 +85,16 @@ func NewEigenDAClient(log log.Logger, config EigenDAClientConfig) (*EigenDAClien
var edasmCaller *edasm.ContractEigenDAServiceManagerCaller
ethClient, err = ethclient.Dial(config.EthRpcUrl)
if err != nil {
return nil, fmt.Errorf("failed to dial ETH RPC node: %w", err)
return nil, fmt.Errorf("dial ETH RPC node: %w", err)
}
edasmCaller, err = edasm.NewContractEigenDAServiceManagerCaller(common.HexToAddress(config.SvcManagerAddr), ethClient)
if err != nil {
return nil, fmt.Errorf("failed to create EigenDAServiceManagerCaller: %w", err)
return nil, fmt.Errorf("new EigenDAServiceManagerCaller: %w", err)
}

host, port, err := net.SplitHostPort(config.RPC)
if err != nil {
return nil, fmt.Errorf("failed to parse EigenDA RPC: %w", err)
return nil, fmt.Errorf("parse EigenDA RPC: %w", err)
}

var signer core.BlobRequestSigner
Expand All @@ -106,11 +108,14 @@ func NewEigenDAClient(log log.Logger, config EigenDAClientConfig) (*EigenDAClien
}

disperserConfig := NewConfig(host, port, config.ResponseTimeout, !config.DisableTLS)
disperserClient := NewDisperserClient(disperserConfig, signer)
disperserClient, err := NewDisperserClient(disperserConfig, signer)
if err != nil {
return nil, fmt.Errorf("new disperser-client: %w", err)
}

lowLevelCodec, err := codecs.BlobEncodingVersionToCodec(config.PutBlobEncodingVersion)
if err != nil {
return nil, fmt.Errorf("error initializing EigenDA client: %w", err)
return nil, fmt.Errorf("create low level codec: %w", err)
}

var codec codecs.BlobCodec
Expand Down
26 changes: 26 additions & 0 deletions api/clients/mock/disperser_server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package mock

import (
"context"

disperser_rpc "github.com/Layr-Labs/eigenda/api/grpc/disperser"
)

// Currently only implements the RetrieveBlob RPC
type DisperserServer struct {
disperser_rpc.UnimplementedDisperserServer
}

// RetrieveBlob returns a ~5MiB(+header_size) blob. It is used to test that the client correctly sets the max message size,
// to be able to support large blobs (default grpc max message size is 4MiB).
func (m *DisperserServer) RetrieveBlob(ctx context.Context, req *disperser_rpc.RetrieveBlobRequest) (*disperser_rpc.RetrieveBlobReply, error) {
// Create a blob larger than default max size (4MiB)
largeBlob := make([]byte, 5*1024*1024) // 5MiB
for i := range largeBlob {
largeBlob[i] = byte(i % 256)
}

return &disperser_rpc.RetrieveBlobReply{
Data: largeBlob,
}, nil
}
4 changes: 2 additions & 2 deletions inabox/tests/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@ var _ = Describe("Inabox Integration", func() {
privateKeyHex := "0x0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcded"
signer := auth.NewLocalBlobRequestSigner(privateKeyHex)

disp := clients.NewDisperserClient(&clients.Config{
disp, err := clients.NewDisperserClient(&clients.Config{
Hostname: "localhost",
Port: "32003",
Timeout: 10 * time.Second,
}, signer)

Expect(err).To(BeNil())
Expect(disp).To(Not(BeNil()))

data := make([]byte, 1024)
Expand Down
5 changes: 3 additions & 2 deletions inabox/tests/ratelimit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,15 +107,16 @@ func testRatelimit(t *testing.T, testConfig *deploy.Config, c ratelimitTestCase)
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()

disp := clients.NewDisperserClient(&clients.Config{
disp, err := clients.NewDisperserClient(&clients.Config{
Hostname: "localhost",
Port: testConfig.Dispersers[0].DISPERSER_SERVER_GRPC_PORT,
Timeout: 10 * time.Second,
}, nil)
assert.NoError(t, err)
assert.NotNil(t, disp)

data := make([]byte, c.blobSize)
_, err := rand.Read(data)
_, err = rand.Read(data)
assert.NoError(t, err)

dispersalTicker := time.NewTicker(c.dispersalInterval)
Expand Down
9 changes: 7 additions & 2 deletions tools/traffic/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"crypto/rand"
"encoding/hex"
"fmt"
"os"
"os/signal"
"sync"
Expand All @@ -27,12 +28,16 @@ func NewTrafficGenerator(config *Config, signer core.BlobRequestSigner) (*Traffi
loggerConfig := common.DefaultLoggerConfig()
logger, err := common.NewLogger(loggerConfig)
if err != nil {
return nil, err
return nil, fmt.Errorf("new logger: %w", err)
}

dispserserClient, err := clients.NewDisperserClient(&config.Config, signer)
if err != nil {
return nil, fmt.Errorf("new disperser-client: %w", err)
}
return &TrafficGenerator{
Logger: logger,
DisperserClient: clients.NewDisperserClient(&config.Config, signer),
DisperserClient: dispserserClient,
Config: config,
}, nil
}
Expand Down
8 changes: 6 additions & 2 deletions tools/traffic/generator_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,11 @@ func NewTrafficGeneratorV2(config *config.Config) (*Generator, error) {

unconfirmedKeyChannel := make(chan *workers.UnconfirmedKey, 100)

disperserClient := clients.NewDisperserClient(config.DisperserClientConfig, signer)
disperserClient, err := clients.NewDisperserClient(config.DisperserClientConfig, signer)
if err != nil {
cancel()
return nil, fmt.Errorf("new disperser-client: %w", err)
}
statusVerifier := workers.NewBlobStatusTracker(
&ctx,
&waitGroup,
Expand Down Expand Up @@ -134,7 +138,7 @@ func NewTrafficGeneratorV2(config *config.Config) (*Generator, error) {
waitGroup: &waitGroup,
generatorMetrics: generatorMetrics,
logger: &logger,
disperserClient: clients.NewDisperserClient(config.DisperserClientConfig, signer),
disperserClient: disperserClient,
eigenDAClient: client,
config: config,
writers: writers,
Expand Down
7 changes: 4 additions & 3 deletions tools/traffic/workers/blob_status_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,16 @@ package workers

import (
"context"
"math/rand"
"sync"
"time"

"github.com/Layr-Labs/eigenda/api/clients"
"github.com/Layr-Labs/eigenda/api/grpc/disperser"
"github.com/Layr-Labs/eigenda/tools/traffic/config"
"github.com/Layr-Labs/eigenda/tools/traffic/metrics"
"github.com/Layr-Labs/eigenda/tools/traffic/table"
"github.com/Layr-Labs/eigensdk-go/logging"
"math/rand"
"sync"
"time"
)

// BlobStatusTracker periodically polls the disperser service to verify the status of blobs that were recently written.
Expand Down
5 changes: 3 additions & 2 deletions tools/traffic/workers/blob_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@ import (
"crypto/md5"
"crypto/rand"
"fmt"
"sync"
"time"

"github.com/Layr-Labs/eigenda/api/clients"
"github.com/Layr-Labs/eigenda/encoding/utils/codec"
"github.com/Layr-Labs/eigenda/tools/traffic/config"
"github.com/Layr-Labs/eigenda/tools/traffic/metrics"
"github.com/Layr-Labs/eigensdk-go/logging"
"sync"
"time"
)

// BlobWriter sends blobs to a disperser at a configured rate.
Expand Down
Loading