diff --git a/lib/events/dynamoevents/dynamoevents.go b/lib/events/dynamoevents/dynamoevents.go index 7446fd9ef6290..e030737c9a58a 100644 --- a/lib/events/dynamoevents/dynamoevents.go +++ b/lib/events/dynamoevents/dynamoevents.go @@ -189,6 +189,9 @@ type Log struct { // readyForQuery is used to determine if all indexes are in place // for event queries. readyForQuery *atomic.Bool + + // isBillingModeProvisioned tracks if the table has provisioned capacity or not. + isBillingModeProvisioned bool } type event struct { @@ -301,6 +304,11 @@ func New(ctx context.Context, cfg Config, backend backend.Backend) (*Log, error) return nil, trace.Wrap(err) } + b.isBillingModeProvisioned, err = b.getBillingModeIsProvisioned(ctx) + if err != nil { + return nil, trace.Wrap(err) + } + // Migrate the table. go b.migrateWithRetry(ctx, []migrationTask{ {b.migrateRFD24, "migrateRFD24"}, @@ -1158,6 +1166,17 @@ func (l *Log) getTableStatus(ctx context.Context, tableName string) (tableStatus return tableStatusOK, nil } +func (l *Log) getBillingModeIsProvisioned(ctx context.Context) (bool, error) { + table, err := l.svc.DescribeTableWithContext(ctx, &dynamodb.DescribeTableInput{ + TableName: aws.String(l.Tablename), + }) + if err != nil { + return false, trace.Wrap(err) + } + + return *table.Table.BillingModeSummary.BillingMode == dynamodb.BillingModeProvisioned, nil +} + // indexExists checks if a given index exists on a given table and that it is active or updating. func (l *Log) indexExists(ctx context.Context, tableName, indexName string) (bool, error) { tableDescription, err := l.svc.DescribeTableWithContext(ctx, &dynamodb.DescribeTableInput{ @@ -1195,9 +1214,12 @@ func (l *Log) createV2GSI(ctx context.Context) error { return nil } - provisionedThroughput := dynamodb.ProvisionedThroughput{ - ReadCapacityUnits: aws.Int64(l.ReadCapacityUnits), - WriteCapacityUnits: aws.Int64(l.WriteCapacityUnits), + var provisionedThroughput *dynamodb.ProvisionedThroughput + if l.isBillingModeProvisioned { + provisionedThroughput = &dynamodb.ProvisionedThroughput{ + ReadCapacityUnits: aws.Int64(l.ReadCapacityUnits), + WriteCapacityUnits: aws.Int64(l.WriteCapacityUnits), + } } // This defines the update event we send to DynamoDB. @@ -1224,7 +1246,7 @@ func (l *Log) createV2GSI(ctx context.Context) error { Projection: &dynamodb.Projection{ ProjectionType: aws.String("ALL"), }, - ProvisionedThroughput: &provisionedThroughput, + ProvisionedThroughput: provisionedThroughput, }, }, },