Skip to content

Commit

Permalink
Handle DynamoDB pay-per-request mode correctly (#12295)
Browse files Browse the repository at this point in the history
  • Loading branch information
xacrimon committed May 5, 2022
1 parent 96d0638 commit 682b7a0
Showing 1 changed file with 26 additions and 4 deletions.
30 changes: 26 additions & 4 deletions lib/events/dynamoevents/dynamoevents.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,9 @@ type Log struct {
// readyForQuery is used to determine if all indexes are in place
// for event queries.
readyForQuery *atomic.Bool

// isBillingModeProvisioned tracks if the table has provisioned capacity or not.
isBillingModeProvisioned bool
}

type event struct {
Expand Down Expand Up @@ -301,6 +304,11 @@ func New(ctx context.Context, cfg Config, backend backend.Backend) (*Log, error)
return nil, trace.Wrap(err)
}

b.isBillingModeProvisioned, err = b.getBillingModeIsProvisioned(ctx)
if err != nil {
return nil, trace.Wrap(err)
}

// Migrate the table.
go b.migrateWithRetry(ctx, []migrationTask{
{b.migrateRFD24, "migrateRFD24"},
Expand Down Expand Up @@ -1158,6 +1166,17 @@ func (l *Log) getTableStatus(ctx context.Context, tableName string) (tableStatus
return tableStatusOK, nil
}

func (l *Log) getBillingModeIsProvisioned(ctx context.Context) (bool, error) {
table, err := l.svc.DescribeTableWithContext(ctx, &dynamodb.DescribeTableInput{
TableName: aws.String(l.Tablename),
})
if err != nil {
return false, trace.Wrap(err)
}

return *table.Table.BillingModeSummary.BillingMode == dynamodb.BillingModeProvisioned, nil
}

// indexExists checks if a given index exists on a given table and that it is active or updating.
func (l *Log) indexExists(ctx context.Context, tableName, indexName string) (bool, error) {
tableDescription, err := l.svc.DescribeTableWithContext(ctx, &dynamodb.DescribeTableInput{
Expand Down Expand Up @@ -1195,9 +1214,12 @@ func (l *Log) createV2GSI(ctx context.Context) error {
return nil
}

provisionedThroughput := dynamodb.ProvisionedThroughput{
ReadCapacityUnits: aws.Int64(l.ReadCapacityUnits),
WriteCapacityUnits: aws.Int64(l.WriteCapacityUnits),
var provisionedThroughput *dynamodb.ProvisionedThroughput
if l.isBillingModeProvisioned {
provisionedThroughput = &dynamodb.ProvisionedThroughput{
ReadCapacityUnits: aws.Int64(l.ReadCapacityUnits),
WriteCapacityUnits: aws.Int64(l.WriteCapacityUnits),
}
}

// This defines the update event we send to DynamoDB.
Expand All @@ -1224,7 +1246,7 @@ func (l *Log) createV2GSI(ctx context.Context) error {
Projection: &dynamodb.Projection{
ProjectionType: aws.String("ALL"),
},
ProvisionedThroughput: &provisionedThroughput,
ProvisionedThroughput: provisionedThroughput,
},
},
},
Expand Down

0 comments on commit 682b7a0

Please sign in to comment.