Skip to content

Commit

Permalink
Remove minibatcher (#783)
Browse files Browse the repository at this point in the history
  • Loading branch information
ian-shim authored and hopeyen committed Oct 7, 2024
1 parent ff498a4 commit 03900ad
Show file tree
Hide file tree
Showing 15 changed files with 81 additions and 3,850 deletions.
78 changes: 74 additions & 4 deletions common/aws/dynamodb/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"math"
"strconv"
"sync"

commonaws "github.com/Layr-Labs/eigenda/common/aws"
Expand Down Expand Up @@ -126,12 +127,13 @@ func (c *Client) PutItems(ctx context.Context, tableName string, items []Item) (
}

func (c *Client) UpdateItem(ctx context.Context, tableName string, key Key, item Item) (Item, error) {
err := ensureKeyAttributes(key, item)
if err != nil {
return nil, err
}

update := expression.UpdateBuilder{}
for itemKey, itemValue := range item {
if _, ok := key[itemKey]; ok {
// Cannot update the key
continue
}
update = update.Set(expression.Name(itemKey), expression.Value(itemValue))
}

Expand All @@ -156,6 +158,46 @@ func (c *Client) UpdateItem(ctx context.Context, tableName string, key Key, item
return resp.Attributes, err
}

func (c *Client) UpdateItemIncrement(ctx context.Context, tableName string, key Key, item Item) (Item, error) {
err := ensureKeyAttributes(key, item)
if err != nil {
return nil, err
}

update := expression.UpdateBuilder{}
for itemKey, itemValue := range item {
// ADD numeric values
if n, ok := itemValue.(*types.AttributeValueMemberN); ok {
f, _ := strconv.ParseFloat(n.Value, 64)
update = update.Add(expression.Name(itemKey), expression.Value(aws.Float64(f)))

} else {
// For non-numeric values, use SET as before
update = update.Set(expression.Name(itemKey), expression.Value(itemValue))
}
}

expr, err := expression.NewBuilder().WithUpdate(update).Build()
if err != nil {
return nil, err
}

resp, err := c.dynamoClient.UpdateItem(ctx, &dynamodb.UpdateItemInput{
TableName: aws.String(tableName),
Key: key,
ExpressionAttributeNames: expr.Names(),
ExpressionAttributeValues: expr.Values(),
UpdateExpression: expr.Update(),
ReturnValues: types.ReturnValueUpdatedNew,
})
if err != nil {
fmt.Println("error updating item", err)
return nil, err
}

return resp.Attributes, nil
}

func (c *Client) GetItem(ctx context.Context, tableName string, key Key) (Item, error) {
resp, err := c.dynamoClient.GetItem(ctx, &dynamodb.GetItemInput{Key: key, TableName: aws.String(tableName)})
if err != nil {
Expand Down Expand Up @@ -191,6 +233,23 @@ func (c *Client) QueryIndex(ctx context.Context, tableName string, indexName str
return response.Items, nil
}

// QueryIndexOrderWithLimit returns all items in the index that match the given key
func (c *Client) QueryIndexOrderWithLimit(ctx context.Context, tableName string, indexName string, keyCondition string, expAttributeValues ExpresseionValues, forward bool, limit int32) ([]Item, error) {
response, err := c.dynamoClient.Query(ctx, &dynamodb.QueryInput{
TableName: aws.String(tableName),
IndexName: aws.String(indexName),
KeyConditionExpression: aws.String(keyCondition),
ExpressionAttributeValues: expAttributeValues,
ScanIndexForward: &forward,
Limit: aws.Int32(limit),
})
if err != nil {
return nil, err
}

return response.Items, nil
}

// Query returns all items in the primary index that match the given expression
func (c *Client) Query(ctx context.Context, tableName string, keyCondition string, expAttributeValues ExpresseionValues) ([]Item, error) {
response, err := c.dynamoClient.Query(ctx, &dynamodb.QueryInput{
Expand Down Expand Up @@ -359,3 +418,14 @@ func (c *Client) readItems(ctx context.Context, tableName string, keys []Key) ([

return items, nil
}

func ensureKeyAttributes(key Key, item Item) error {
for itemKey := range item {
if _, ok := key[itemKey]; !ok {
// Cannot update the key
return fmt.Errorf("cannot update key %s", itemKey)
}
}

return nil
}
Loading

0 comments on commit 03900ad

Please sign in to comment.