Skip to content

Commit

Permalink
fix(bigtable): retry on grpc internal err
Browse files Browse the repository at this point in the history
  • Loading branch information
Tangui-Bitfly committed Nov 26, 2024
1 parent caf4ce3 commit 5b7b9d2
Showing 1 changed file with 26 additions and 12 deletions.
38 changes: 26 additions & 12 deletions db2/store/bigtable.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"slices"
"strings"
"time"

"cloud.google.com/go/bigtable"
Expand All @@ -15,7 +16,8 @@ import (
var ErrNotFound = fmt.Errorf("not found")

const (
timeout = time.Minute // Timeout duration for Bigtable operations
timeout = time.Minute // Timeout duration for Bigtable operations
maxRetries = 5
)

type TableWrapper struct {
Expand Down Expand Up @@ -64,6 +66,8 @@ func (w TableWrapper) GetRowsRange(high, low string) (map[string]map[string][]by
type BigTableStore struct {
client *bigtable.Client
admin *bigtable.AdminClient

maxRetries int
}

func NewBigTableWithClient(ctx context.Context, client *bigtable.Client, adminClient *bigtable.AdminClient, tablesAndFamilies map[string][]string) (*BigTableStore, error) {
Expand All @@ -72,7 +76,7 @@ func NewBigTableWithClient(ctx context.Context, client *bigtable.Client, adminCl
return nil, err
}

return &BigTableStore{client: client, admin: adminClient}, nil
return &BigTableStore{client: client, admin: adminClient, maxRetries: maxRetries}, nil
}

// NewBigTable initializes a new BigTableStore
Expand Down Expand Up @@ -260,6 +264,25 @@ func (b BigTableStore) GetRow(table, key string) (map[string][]byte, error) {
}

func (b BigTableStore) GetRowsRange(table, high, low string) (map[string]map[string][]byte, error) {
var err error
var data map[string]map[string][]byte
for i := 0; i < b.maxRetries; i++ {
data, err = b.getRowsRange(table, high, low)
if err == nil {
if len(data) == 0 {
return nil, ErrNotFound
}
return data, nil
}
// return directly if error is not grpc Internal
if !strings.Contains(err.Error(), codes.Internal.String()) {
return nil, fmt.Errorf("could not read rows: %v", err)
}
}
return nil, fmt.Errorf("could not get rows after %d tries: %v", b.maxRetries, err)
}

func (b BigTableStore) getRowsRange(table, high, low string) (map[string]map[string][]byte, error) {
tbl := b.client.Open(table)
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
Expand All @@ -276,24 +299,15 @@ func (b BigTableStore) GetRowsRange(table, high, low string) (map[string]map[str
return true
})

if err != nil {
return nil, fmt.Errorf("could not read rows: %v", err)
}
if len(data) == 0 {
return nil, ErrNotFound
}

return data, nil
return data, err
}

func (b BigTableStore) GetRowKeys(table, prefix string) ([]string, error) {
// Open the transfer table for reading
tbl := b.client.Open(table)
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

var data []string
// Read all rows from the table and collect all the row keys
err := tbl.ReadRows(ctx, bigtable.PrefixRange(prefix), func(row bigtable.Row) bool {
data = append(data, row.Key())
return true
Expand Down

0 comments on commit 5b7b9d2

Please sign in to comment.