Skip to content

Commit

Permalink
fix(disperser-client): RetrieveBlob grpc max size regression bug (#849)
Browse files Browse the repository at this point in the history
  • Loading branch information
samlaf authored Oct 31, 2024
1 parent b22ddf2 commit e2ead56
Show file tree
Hide file tree
Showing 10 changed files with 107 additions and 25 deletions.
48 changes: 43 additions & 5 deletions api/clients/disperser_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,15 @@ type Config struct {
// TODO: do we want to add config timeouts for those separate requests?
Timeout time.Duration
UseSecureGrpcFlag bool
// MaxRetrieveBlobSizeBytes is the maximum size of a blob that can be retrieved by using
// the RetrieveBlob method. This is used to set the max message size for the grpc client.
// DisperserClient uses a single underlying grpc channel shared for all methods,
// but all other methods use the default 4MiB max message size, whereas RetrieveBlob
// potentially needs a larger size.
//
// If not set, default value is 100MiB for forward compatibility.
// Check official documentation for current max blob size on mainnet.
MaxRetrieveBlobSizeBytes int
}

// Deprecated: Use &Config{...} directly instead
Expand Down Expand Up @@ -64,6 +73,9 @@ type disperserClient struct {
// This means a conservative estimate of 100-1000MB/sec, which should be amply sufficient.
// If we ever need to increase this, we could either consider asking the disperser to increase its limit,
// or to use a pool of connections here.
// TODO: we should refactor or make a new constructor which allows setting conn and/or client
// via dependency injection. This would allow for testing via https://pkg.go.dev/google.golang.org/grpc/test/bufconn
// instead of a real network connection for eg.
conn *grpc.ClientConn
client disperser_rpc.DisperserClient
}
Expand All @@ -90,12 +102,36 @@ var _ DisperserClient = &disperserClient{}
//
// // Subsequent calls will use the existing connection
// status2, requestId2, err := client.DisperseBlob(ctx, otherData, otherQuorums)
func NewDisperserClient(config *Config, signer core.BlobRequestSigner) *disperserClient {
func NewDisperserClient(config *Config, signer core.BlobRequestSigner) (*disperserClient, error) {
if err := checkConfigAndSetDefaults(config); err != nil {
return nil, fmt.Errorf("invalid config: %w", err)
}
return &disperserClient{
config: config,
signer: signer,
// conn and client are initialized lazily
}, nil
}

func checkConfigAndSetDefaults(c *Config) error {
if c == nil {
return fmt.Errorf("config is nil")
}
if c.Hostname == "" {
return fmt.Errorf("config.Hostname is empty")
}
if c.Port == "" {
return fmt.Errorf("config.Port is empty")
}
if c.Timeout == 0 {
return fmt.Errorf("config.Timeout is 0")
}
if c.MaxRetrieveBlobSizeBytes == 0 {
// Set to 100MiB for forward compatibility.
// Check official documentation for current max blob size on mainnet.
c.MaxRetrieveBlobSizeBytes = 100 * 1024 * 1024
}
return nil
}

// Close closes the grpc connection to the disperser server.
Expand Down Expand Up @@ -281,10 +317,12 @@ func (c *disperserClient) RetrieveBlob(ctx context.Context, batchHeaderHash []by

ctxTimeout, cancel := context.WithTimeout(ctx, time.Second*60)
defer cancel()
reply, err := c.client.RetrieveBlob(ctxTimeout, &disperser_rpc.RetrieveBlobRequest{
BatchHeaderHash: batchHeaderHash,
BlobIndex: blobIndex,
})
reply, err := c.client.RetrieveBlob(ctxTimeout,
&disperser_rpc.RetrieveBlobRequest{
BatchHeaderHash: batchHeaderHash,
BlobIndex: blobIndex,
},
grpc.MaxCallRecvMsgSize(c.config.MaxRetrieveBlobSizeBytes)) // for client
if err != nil {
return nil, err
}
Expand Down
5 changes: 3 additions & 2 deletions api/clients/disperser_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@ import (

func TestPutBlobNoopSigner(t *testing.T) {
config := clients.NewConfig("nohost", "noport", time.Second, false)
disperserClient := clients.NewDisperserClient(config, auth.NewLocalNoopSigner())
disperserClient, err := clients.NewDisperserClient(config, auth.NewLocalNoopSigner())
assert.NoError(t, err)

test := []byte("test")
test[0] = 0x00 // make sure the first byte of the requst is always 0
quorums := []uint8{0}
_, _, err := disperserClient.DisperseBlobAuthenticated(context.Background(), test, quorums)
_, _, err = disperserClient.DisperseBlobAuthenticated(context.Background(), test, quorums)
st, isGRPCError := status.FromError(err)
assert.True(t, isGRPCError)
assert.Equal(t, codes.InvalidArgument.String(), st.Code().String())
Expand Down
15 changes: 10 additions & 5 deletions api/clients/eigenda_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ type IEigenDAClient interface {
}

// See the NewEigenDAClient constructor's documentation for details and usage examples.
// TODO: Refactor this struct and interface above to use same naming convention as disperser client.
// Also need to make the fields private and use the constructor in the tests.
type EigenDAClient struct {
// TODO: all of these should be private, to prevent users from using them directly,
// which breaks encapsulation and makes it hard for us to do refactors or changes
Expand Down Expand Up @@ -83,16 +85,16 @@ func NewEigenDAClient(log log.Logger, config EigenDAClientConfig) (*EigenDAClien
var edasmCaller *edasm.ContractEigenDAServiceManagerCaller
ethClient, err = ethclient.Dial(config.EthRpcUrl)
if err != nil {
return nil, fmt.Errorf("failed to dial ETH RPC node: %w", err)
return nil, fmt.Errorf("dial ETH RPC node: %w", err)
}
edasmCaller, err = edasm.NewContractEigenDAServiceManagerCaller(common.HexToAddress(config.SvcManagerAddr), ethClient)
if err != nil {
return nil, fmt.Errorf("failed to create EigenDAServiceManagerCaller: %w", err)
return nil, fmt.Errorf("new EigenDAServiceManagerCaller: %w", err)
}

host, port, err := net.SplitHostPort(config.RPC)
if err != nil {
return nil, fmt.Errorf("failed to parse EigenDA RPC: %w", err)
return nil, fmt.Errorf("parse EigenDA RPC: %w", err)
}

var signer core.BlobRequestSigner
Expand All @@ -106,11 +108,14 @@ func NewEigenDAClient(log log.Logger, config EigenDAClientConfig) (*EigenDAClien
}

disperserConfig := NewConfig(host, port, config.ResponseTimeout, !config.DisableTLS)
disperserClient := NewDisperserClient(disperserConfig, signer)
disperserClient, err := NewDisperserClient(disperserConfig, signer)
if err != nil {
return nil, fmt.Errorf("new disperser-client: %w", err)
}

lowLevelCodec, err := codecs.BlobEncodingVersionToCodec(config.PutBlobEncodingVersion)
if err != nil {
return nil, fmt.Errorf("error initializing EigenDA client: %w", err)
return nil, fmt.Errorf("create low level codec: %w", err)
}

var codec codecs.BlobCodec
Expand Down
26 changes: 26 additions & 0 deletions api/clients/mock/disperser_server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package mock

import (
"context"

disperser_rpc "github.com/Layr-Labs/eigenda/api/grpc/disperser"
)

// Currently only implements the RetrieveBlob RPC
type DisperserServer struct {
disperser_rpc.UnimplementedDisperserServer
}

// RetrieveBlob returns a ~5MiB(+header_size) blob. It is used to test that the client correctly sets the max message size,
// to be able to support large blobs (default grpc max message size is 4MiB).
func (m *DisperserServer) RetrieveBlob(ctx context.Context, req *disperser_rpc.RetrieveBlobRequest) (*disperser_rpc.RetrieveBlobReply, error) {
// Create a blob larger than default max size (4MiB)
largeBlob := make([]byte, 5*1024*1024) // 5MiB
for i := range largeBlob {
largeBlob[i] = byte(i % 256)
}

return &disperser_rpc.RetrieveBlobReply{
Data: largeBlob,
}, nil
}
4 changes: 2 additions & 2 deletions inabox/tests/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@ var _ = Describe("Inabox Integration", func() {
privateKeyHex := "0x0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcded"
signer := auth.NewLocalBlobRequestSigner(privateKeyHex)

disp := clients.NewDisperserClient(&clients.Config{
disp, err := clients.NewDisperserClient(&clients.Config{
Hostname: "localhost",
Port: "32003",
Timeout: 10 * time.Second,
}, signer)

Expect(err).To(BeNil())
Expect(disp).To(Not(BeNil()))

data := make([]byte, 1024)
Expand Down
5 changes: 3 additions & 2 deletions inabox/tests/ratelimit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,15 +107,16 @@ func testRatelimit(t *testing.T, testConfig *deploy.Config, c ratelimitTestCase)
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()

disp := clients.NewDisperserClient(&clients.Config{
disp, err := clients.NewDisperserClient(&clients.Config{
Hostname: "localhost",
Port: testConfig.Dispersers[0].DISPERSER_SERVER_GRPC_PORT,
Timeout: 10 * time.Second,
}, nil)
assert.NoError(t, err)
assert.NotNil(t, disp)

data := make([]byte, c.blobSize)
_, err := rand.Read(data)
_, err = rand.Read(data)
assert.NoError(t, err)

dispersalTicker := time.NewTicker(c.dispersalInterval)
Expand Down
9 changes: 7 additions & 2 deletions tools/traffic/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"crypto/rand"
"encoding/hex"
"fmt"
"os"
"os/signal"
"sync"
Expand All @@ -27,12 +28,16 @@ func NewTrafficGenerator(config *Config, signer core.BlobRequestSigner) (*Traffi
loggerConfig := common.DefaultLoggerConfig()
logger, err := common.NewLogger(loggerConfig)
if err != nil {
return nil, err
return nil, fmt.Errorf("new logger: %w", err)
}

dispserserClient, err := clients.NewDisperserClient(&config.Config, signer)
if err != nil {
return nil, fmt.Errorf("new disperser-client: %w", err)
}
return &TrafficGenerator{
Logger: logger,
DisperserClient: clients.NewDisperserClient(&config.Config, signer),
DisperserClient: dispserserClient,
Config: config,
}, nil
}
Expand Down
8 changes: 6 additions & 2 deletions tools/traffic/generator_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,11 @@ func NewTrafficGeneratorV2(config *config.Config) (*Generator, error) {

unconfirmedKeyChannel := make(chan *workers.UnconfirmedKey, 100)

disperserClient := clients.NewDisperserClient(config.DisperserClientConfig, signer)
disperserClient, err := clients.NewDisperserClient(config.DisperserClientConfig, signer)
if err != nil {
cancel()
return nil, fmt.Errorf("new disperser-client: %w", err)
}
statusVerifier := workers.NewBlobStatusTracker(
&ctx,
&waitGroup,
Expand Down Expand Up @@ -134,7 +138,7 @@ func NewTrafficGeneratorV2(config *config.Config) (*Generator, error) {
waitGroup: &waitGroup,
generatorMetrics: generatorMetrics,
logger: &logger,
disperserClient: clients.NewDisperserClient(config.DisperserClientConfig, signer),
disperserClient: disperserClient,
eigenDAClient: client,
config: config,
writers: writers,
Expand Down
7 changes: 4 additions & 3 deletions tools/traffic/workers/blob_status_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,16 @@ package workers

import (
"context"
"math/rand"
"sync"
"time"

"github.com/Layr-Labs/eigenda/api/clients"
"github.com/Layr-Labs/eigenda/api/grpc/disperser"
"github.com/Layr-Labs/eigenda/tools/traffic/config"
"github.com/Layr-Labs/eigenda/tools/traffic/metrics"
"github.com/Layr-Labs/eigenda/tools/traffic/table"
"github.com/Layr-Labs/eigensdk-go/logging"
"math/rand"
"sync"
"time"
)

// BlobStatusTracker periodically polls the disperser service to verify the status of blobs that were recently written.
Expand Down
5 changes: 3 additions & 2 deletions tools/traffic/workers/blob_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@ import (
"crypto/md5"
"crypto/rand"
"fmt"
"sync"
"time"

"github.com/Layr-Labs/eigenda/api/clients"
"github.com/Layr-Labs/eigenda/encoding/utils/codec"
"github.com/Layr-Labs/eigenda/tools/traffic/config"
"github.com/Layr-Labs/eigenda/tools/traffic/metrics"
"github.com/Layr-Labs/eigensdk-go/logging"
"sync"
"time"
)

// BlobWriter sends blobs to a disperser at a configured rate.
Expand Down

0 comments on commit e2ead56

Please sign in to comment.