Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove shadow write from disperser #830

Merged
merged 1 commit into from
Oct 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 7 additions & 8 deletions disperser/apiserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,11 @@ var (
dispersalServer *apiserver.DispersalServer
dispersalServerV2 *apiserver.DispersalServerV2

dockertestPool *dockertest.Pool
dockertestResource *dockertest.Resource
UUID = uuid.New()
metadataTableName = fmt.Sprintf("test-BlobMetadata-%v", UUID)
shadowMetadataTableName = fmt.Sprintf("test-BlobMetadata-Shadow-%v", UUID)
bucketTableName = fmt.Sprintf("test-BucketStore-%v", UUID)
dockertestPool *dockertest.Pool
dockertestResource *dockertest.Resource
UUID = uuid.New()
metadataTableName = fmt.Sprintf("test-BlobMetadata-%v", UUID)
bucketTableName = fmt.Sprintf("test-BucketStore-%v", UUID)

deployLocalStack bool
localStackPort = "4568"
Expand Down Expand Up @@ -589,7 +588,7 @@ func setup() {

}

err = deploy.DeployResources(dockertestPool, localStackPort, metadataTableName, shadowMetadataTableName, bucketTableName)
err = deploy.DeployResources(dockertestPool, localStackPort, metadataTableName, bucketTableName)
if err != nil {
teardown()
panic("failed to deploy AWS resources")
Expand Down Expand Up @@ -636,7 +635,7 @@ func newTestServer(transactor core.Transactor) *apiserver.DispersalServer {
if err != nil {
panic("failed to create dynamoDB client")
}
blobMetadataStore := blobstore.NewBlobMetadataStore(dynamoClient, logger, metadataTableName, shadowMetadataTableName, time.Hour)
blobMetadataStore := blobstore.NewBlobMetadataStore(dynamoClient, logger, metadataTableName, time.Hour)

globalParams := common.GlobalRateParams{
CountFailed: false,
Expand Down
6 changes: 2 additions & 4 deletions disperser/cmd/apiserver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ type Config struct {
RateConfig apiserver.RateConfig
EnableRatelimiter bool
BucketTableName string
ShadowTableName string
BucketStoreSize int
EthClientConfig geth.EthClientConfig
MaxBlobSize int
Expand Down Expand Up @@ -70,9 +69,8 @@ func NewConfig(ctx *cli.Context) (Config, error) {
GrpcTimeout: ctx.GlobalDuration(flags.GrpcTimeoutFlag.Name),
},
BlobstoreConfig: blobstore.Config{
BucketName: ctx.GlobalString(flags.S3BucketNameFlag.Name),
TableName: ctx.GlobalString(flags.DynamoDBTableNameFlag.Name),
ShadowTableName: ctx.GlobalString(flags.ShadowTableNameFlag.Name),
BucketName: ctx.GlobalString(flags.S3BucketNameFlag.Name),
TableName: ctx.GlobalString(flags.DynamoDBTableNameFlag.Name),
},
LoggerConfig: *loggerConfig,
MetricsConfig: disperser.MetricsConfig{
Expand Down
8 changes: 0 additions & 8 deletions disperser/cmd/apiserver/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,6 @@ var (
Required: true,
EnvVar: common.PrefixEnvVar(envVarPrefix, "DYNAMODB_TABLE_NAME"),
}
ShadowTableNameFlag = cli.StringFlag{
Name: common.PrefixFlag(FlagPrefix, "shadow-table-name"),
Usage: "Name of the dynamodb table to shadow write blob metadata",
Required: false,
EnvVar: common.PrefixEnvVar(envVarPrefix, "SHADOW_TABLE_NAME"),
Value: "",
}
GrpcPortFlag = cli.StringFlag{
Name: common.PrefixFlag(FlagPrefix, "grpc-port"),
Usage: "Port at which disperser listens for grpc calls",
Expand Down Expand Up @@ -126,7 +119,6 @@ var optionalFlags = []cli.Flag{
EnableRatelimiter,
BucketStoreSize,
GrpcTimeoutFlag,
ShadowTableNameFlag,
MaxBlobSize,
}

Expand Down
2 changes: 1 addition & 1 deletion disperser/cmd/apiserver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func RunDisperserServer(ctx *cli.Context) error {

bucketName := config.BlobstoreConfig.BucketName
logger.Info("Creating blob store", "bucket", bucketName)
blobMetadataStore := blobstore.NewBlobMetadataStore(dynamoClient, logger, config.BlobstoreConfig.TableName, config.BlobstoreConfig.ShadowTableName, time.Duration((storeDurationBlocks+blockStaleMeasure)*12)*time.Second)
blobMetadataStore := blobstore.NewBlobMetadataStore(dynamoClient, logger, config.BlobstoreConfig.TableName, time.Duration((storeDurationBlocks+blockStaleMeasure)*12)*time.Second)
blobStore := blobstore.NewSharedStorage(bucketName, s3Client, blobMetadataStore, logger)

reg := prometheus.NewRegistry()
Expand Down
2 changes: 1 addition & 1 deletion disperser/cmd/batcher/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func RunBatcher(ctx *cli.Context) error {
if err != nil || storeDurationBlocks == 0 {
return fmt.Errorf("failed to get STORE_DURATION_BLOCKS: %w", err)
}
blobMetadataStore := blobstore.NewBlobMetadataStore(dynamoClient, logger, config.BlobstoreConfig.TableName, config.BlobstoreConfig.ShadowTableName, time.Duration((storeDurationBlocks+blockStaleMeasure)*12)*time.Second)
blobMetadataStore := blobstore.NewBlobMetadataStore(dynamoClient, logger, config.BlobstoreConfig.TableName, time.Duration((storeDurationBlocks+blockStaleMeasure)*12)*time.Second)
queue := blobstore.NewSharedStorage(bucketName, s3Client, blobMetadataStore, logger)

cs := coreeth.NewChainState(tx, client)
Expand Down
2 changes: 1 addition & 1 deletion disperser/cmd/dataapi/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func RunDataApi(ctx *cli.Context) error {

var (
promClient = dataapi.NewPrometheusClient(promApi, config.PrometheusConfig.Cluster)
blobMetadataStore = blobstore.NewBlobMetadataStore(dynamoClient, logger, config.BlobstoreConfig.TableName, config.BlobstoreConfig.ShadowTableName, 0)
blobMetadataStore = blobstore.NewBlobMetadataStore(dynamoClient, logger, config.BlobstoreConfig.TableName, 0)
sharedStorage = blobstore.NewSharedStorage(config.BlobstoreConfig.BucketName, s3Client, blobMetadataStore, logger)
subgraphApi = subgraph.NewApi(config.SubgraphApiBatchMetadataAddr, config.SubgraphApiOperatorStateAddr)
subgraphClient = dataapi.NewSubgraphClient(subgraphApi, logger)
Expand Down
30 changes: 9 additions & 21 deletions disperser/common/blobstore/blob_metadata_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,24 +29,19 @@ const (
// - StatusIndex: (Partition Key: Status, Sort Key: RequestedAt) -> Metadata
// - BatchIndex: (Partition Key: BatchHeaderHash, Sort Key: BlobIndex) -> Metadata
type BlobMetadataStore struct {
dynamoDBClient *commondynamodb.Client
logger logging.Logger
tableName string
shadowTableName string
ttl time.Duration
dynamoDBClient *commondynamodb.Client
logger logging.Logger
tableName string
ttl time.Duration
}

func NewBlobMetadataStore(dynamoDBClient *commondynamodb.Client, logger logging.Logger, tableName string, shadowTableName string, ttl time.Duration) *BlobMetadataStore {
func NewBlobMetadataStore(dynamoDBClient *commondynamodb.Client, logger logging.Logger, tableName string, ttl time.Duration) *BlobMetadataStore {
logger.Debugf("creating blob metadata store with table %s with TTL: %s", tableName, ttl)
if shadowTableName != "" {
logger.Debugf("shadow blob metadata will be written to table %s with TTL: %s", shadowTableName, ttl)
}
return &BlobMetadataStore{
dynamoDBClient: dynamoDBClient,
logger: logger.With("component", "BlobMetadataStore"),
tableName: tableName,
shadowTableName: shadowTableName,
ttl: ttl,
dynamoDBClient: dynamoDBClient,
logger: logger.With("component", "BlobMetadataStore"),
tableName: tableName,
ttl: ttl,
}
}

Expand All @@ -56,13 +51,6 @@ func (s *BlobMetadataStore) QueueNewBlobMetadata(ctx context.Context, blobMetada
return err
}

if s.shadowTableName != "" && s.shadowTableName != s.tableName {
err = s.dynamoDBClient.PutItem(ctx, s.shadowTableName, item)
if err != nil {
s.logger.Error("failed to put item into shadow table %s : %v", s.shadowTableName, err)
}
}

return s.dynamoDBClient.PutItem(ctx, s.tableName, item)
}

Expand Down
53 changes: 0 additions & 53 deletions disperser/common/blobstore/blob_metadata_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
commondynamodb "github.com/Layr-Labs/eigenda/common/aws/dynamodb"
"github.com/Layr-Labs/eigenda/disperser"
"github.com/Layr-Labs/eigenda/encoding"
"github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue"
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
"github.com/consensys/gnark-crypto/ecc/bn254/fp"
"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -301,58 +300,6 @@ func TestBlobMetadataStoreOperationsWithPaginationNoStoredBlob(t *testing.T) {
assert.Nil(t, lastEvaluatedKey)
}

func TestShadowWriteBlobMetadata(t *testing.T) {
ctx := context.Background()

blobKey := disperser.BlobKey{
BlobHash: "shadowblob",
MetadataHash: "shadowhash",
}
expiry := uint64(time.Now().Add(time.Hour).Unix())
metadata := &disperser.BlobMetadata{
MetadataHash: blobKey.MetadataHash,
BlobHash: blobKey.BlobHash,
BlobStatus: disperser.Processing,
Expiry: expiry,
NumRetries: 0,
RequestMetadata: &disperser.RequestMetadata{
BlobRequestHeader: blob.RequestHeader,
BlobSize: blobSize,
RequestedAt: 123,
},
ConfirmationInfo: &disperser.ConfirmationInfo{},
}

err := shadowBlobMetadataStore.QueueNewBlobMetadata(ctx, metadata)
assert.NoError(t, err)
err = blobMetadataStore.SetBlobStatus(context.Background(), blobKey, disperser.Dispersing)
assert.NoError(t, err)
primaryMetadata, err := blobMetadataStore.GetBlobMetadata(ctx, blobKey)
assert.NoError(t, err)
assert.Equal(t, disperser.Dispersing, primaryMetadata.BlobStatus)

// Check that the shadow metadata exists but status has NOT been updated
shadowMetadataItem, err := dynamoClient.GetItem(ctx, shadowMetadataTableName, map[string]types.AttributeValue{
"MetadataHash": &types.AttributeValueMemberS{
Value: blobKey.MetadataHash,
},
"BlobHash": &types.AttributeValueMemberS{
Value: blobKey.BlobHash,
},
})
assert.NoError(t, err)
shadowMetadata := disperser.BlobMetadata{}
err = attributevalue.UnmarshalMap(shadowMetadataItem, &shadowMetadata)
assert.NoError(t, err)
assert.Equal(t, disperser.Processing, shadowMetadata.BlobStatus)
deleteItems(t, []commondynamodb.Key{
{
"MetadataHash": &types.AttributeValueMemberS{Value: blobKey.MetadataHash},
"BlobHash": &types.AttributeValueMemberS{Value: blobKey.BlobHash},
},
})
}

func TestFilterOutExpiredBlobMetadata(t *testing.T) {
ctx := context.Background()

Expand Down
10 changes: 4 additions & 6 deletions disperser/common/blobstore/blobstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,9 @@ var (
deployLocalStack bool
localStackPort = "4569"

dynamoClient *dynamodb.Client
blobMetadataStore *blobstore.BlobMetadataStore
shadowBlobMetadataStore *blobstore.BlobMetadataStore
sharedStorage *blobstore.SharedBlobStore
dynamoClient *dynamodb.Client
blobMetadataStore *blobstore.BlobMetadataStore
sharedStorage *blobstore.SharedBlobStore

UUID = uuid.New()
metadataTableName = fmt.Sprintf("test-BlobMetadata-%v", UUID)
Expand Down Expand Up @@ -106,8 +105,7 @@ func setup(m *testing.M) {
panic("failed to create dynamodb client: " + err.Error())
}

blobMetadataStore = blobstore.NewBlobMetadataStore(dynamoClient, logger, metadataTableName, metadataTableName, time.Hour)
shadowBlobMetadataStore = blobstore.NewBlobMetadataStore(dynamoClient, logger, metadataTableName, shadowMetadataTableName, time.Hour)
blobMetadataStore = blobstore.NewBlobMetadataStore(dynamoClient, logger, metadataTableName, time.Hour)
sharedStorage = blobstore.NewSharedStorage(bucketName, s3Client, blobMetadataStore, logger)
}

Expand Down
5 changes: 2 additions & 3 deletions disperser/common/blobstore/shared_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,8 @@ type SharedBlobStore struct {
}

type Config struct {
BucketName string
TableName string
ShadowTableName string
BucketName string
TableName string
}

// This represents the s3 fetch result for a blob.
Expand Down
7 changes: 3 additions & 4 deletions inabox/deploy/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,8 @@ var (
localstackFlagName = "localstack-port"
deployResourcesFlagName = "deploy-resources"

metadataTableName = "test-BlobMetadata"
shadowMetadataTableName = "" // not used
bucketTableName = "test-BucketStore"
metadataTableName = "test-BlobMetadata"
bucketTableName = "test-BucketStore"

chainCmdName = "chain"
localstackCmdName = "localstack"
Expand Down Expand Up @@ -138,7 +137,7 @@ func localstack(ctx *cli.Context) error {
}

if ctx.Bool(deployResourcesFlagName) {
return deploy.DeployResources(pool, ctx.String(localstackFlagName), metadataTableName, shadowMetadataTableName, bucketTableName)
return deploy.DeployResources(pool, ctx.String(localstackFlagName), metadataTableName, bucketTableName)
}

return nil
Expand Down
2 changes: 1 addition & 1 deletion inabox/deploy/localstack.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func StartDockertestWithLocalstackContainer(localStackPort string) (*dockertest.
return pool, resource, nil
}

func DeployResources(pool *dockertest.Pool, localStackPort, metadataTableName, shadowTableName, bucketTableName string) error {
func DeployResources(pool *dockertest.Pool, localStackPort, metadataTableName, bucketTableName string) error {

if pool == nil {
var err error
Expand Down
21 changes: 10 additions & 11 deletions inabox/tests/integration_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,15 @@ var (
dockertestResource *dockertest.Resource
localStackPort string

metadataTableName = "test-BlobMetadata"
shadowMetadataTableName = ""
bucketTableName = "test-BucketStore"
logger logging.Logger
ethClient common.EthClient
rpcClient common.RPCEthClient
mockRollup *rollupbindings.ContractMockRollup
retrievalClient clients.RetrievalClient
numConfirmations int = 3
numRetries = 0
metadataTableName = "test-BlobMetadata"
bucketTableName = "test-BucketStore"
logger logging.Logger
ethClient common.EthClient
rpcClient common.RPCEthClient
mockRollup *rollupbindings.ContractMockRollup
retrievalClient clients.RetrievalClient
numConfirmations int = 3
numRetries = 0

cancel context.CancelFunc
)
Expand Down Expand Up @@ -92,7 +91,7 @@ var _ = BeforeSuite(func() {
dockertestPool = pool
dockertestResource = resource

err = deploy.DeployResources(pool, localStackPort, metadataTableName, shadowMetadataTableName, bucketTableName)
err = deploy.DeployResources(pool, localStackPort, metadataTableName, bucketTableName)
Expect(err).To(BeNil())

} else {
Expand Down
Loading