Skip to content

Commit

Permalink
Merge pull request #2669 from oasislabs/kostko/fix/storage-client-retry
Browse files Browse the repository at this point in the history
go/storage/client: Retry storage ops on specific errors
  • Loading branch information
kostko authored Feb 12, 2020
2 parents f4fc2c3 + 6ab29f8 commit 933e53e
Show file tree
Hide file tree
Showing 6 changed files with 86 additions and 58 deletions.
1 change: 1 addition & 0 deletions .changelog/1865.bugfix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
go/storage/client: Retry storage ops on specific errors
12 changes: 6 additions & 6 deletions go/storage/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,23 +207,23 @@ type Proof = syncer.Proof
// ApplyOp is an apply operation within a batch of apply operations.
type ApplyOp struct {
// SrcRound is the source root round.
SrcRound uint64
SrcRound uint64 `json:"src_round"`
// SrcRoot is the merkle root to apply the operations against. It may
// refer to a nil node (empty hash) in which case a new root will be
// created.
SrcRoot hash.Hash
SrcRoot hash.Hash `json:"src_root"`
// DstRoot is the expected merkle root after applying the write log.
DstRoot hash.Hash
DstRoot hash.Hash `json:"dst_root"`
// WriteLog is a write log of operations to apply.
WriteLog WriteLog
WriteLog WriteLog `json:"writelog"`
}

// MergeOps is a merge operation within a batch of merge operations.
type MergeOp struct {
// Base is the base root for the merge.
Base hash.Hash
Base hash.Hash `json:"base"`
// Others is a list of roots derived from base that should be merged.
Others []hash.Hash
Others []hash.Hash `json:"others"`
}

// ApplyRequest is an Apply request.
Expand Down
11 changes: 5 additions & 6 deletions go/storage/api/root_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,9 @@ package api

import (
"context"
"fmt"
"sync"

"github.com/pkg/errors"

"github.com/oasislabs/oasis-core/go/common"
"github.com/oasislabs/oasis-core/go/common/cache/lru"
"github.com/oasislabs/oasis-core/go/common/crypto/hash"
Expand Down Expand Up @@ -73,18 +72,18 @@ func (rc *RootCache) Merge(
for _, rootHash := range others[1:] {
it, err := rc.localDB.GetWriteLog(ctx, baseRoot, Root{Namespace: ns, Round: round + 1, Hash: rootHash})
if err != nil {
return nil, errors.Wrap(err, "storage/rootcache: failed to read write log")
return nil, fmt.Errorf("storage/rootcache: failed to read write log: %w", err)
}

if err = tree.ApplyWriteLog(ctx, it); err != nil {
return nil, errors.Wrap(err, "storage/rootcache: failed to apply write log")
return nil, fmt.Errorf("storage/rootcache: failed to apply write log: %w", err)
}
}

var mergedRoot hash.Hash
var err error
if _, mergedRoot, err = tree.Commit(ctx, ns, round+1); err != nil {
return nil, errors.Wrap(err, "storage/rootcache: failed to commit write log")
return nil, fmt.Errorf("storage/rootcache: failed to commit write log: %w", err)
}

return &mergedRoot, nil
Expand Down Expand Up @@ -189,7 +188,7 @@ func NewRootCache(
) (*RootCache, error) {
applyLocks, err := lru.New(lru.Capacity(applyLockLRUSlots, false))
if err != nil {
return nil, errors.Wrap(err, "storage/rootcache: failed to create applyLocks")
return nil, fmt.Errorf("storage/rootcache: failed to create applyLocks: %w", err)
}

// When we implement a caching client again, we want to persist
Expand Down
79 changes: 52 additions & 27 deletions go/storage/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ package client
import (
"context"
cryptorand "crypto/rand"
"errors"
"math/rand"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/pkg/errors"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

Expand Down Expand Up @@ -93,11 +93,25 @@ func (b *storageClientBackend) writeWithClient(
op := func() error {
var rerr error
resp, rerr = fn(ctx, api.NewStorageClient(conn.ClientConn), conn.Node)
if status.Code(rerr) == codes.PermissionDenied && numRetries < maxRetries {
// Writes can fail around an epoch transition due to policy errors,
// make sure to retry in this case (up to maxRetries).
if numRetries < maxRetries {
numRetries++
return rerr

switch {
case status.Code(rerr) == codes.Unavailable:
// Storage node may be temporarily unavailable.
return rerr
case status.Code(rerr) == codes.PermissionDenied:
// Writes can fail around an epoch transition due to policy errors.
return rerr
case errors.Is(rerr, api.ErrPreviousRoundMismatch):
// Storage node may not have yet processed the epoch transition.
return rerr
case errors.Is(rerr, api.ErrRootNotFound):
// Storage node may not have yet processed the epoch transition.
return rerr
default:
// All other errors are permanent.
}
}
return backoff.Permanent(rerr)
}
Expand Down Expand Up @@ -284,33 +298,44 @@ func (b *storageClientBackend) readWithClient(
return nil, ErrStorageNotAvailable
}

// TODO: Use a more clever approach to choose the order in which to read
// from the connected nodes:
// https://github.com/oasislabs/oasis-core/issues/1815.
rng := rand.New(mathrand.New(cryptorand.Reader))

var (
err error
resp interface{}
numRetries int
resp interface{}
)
for _, randIndex := range rng.Perm(n) {
conn := conns[randIndex]

resp, err = fn(ctx, api.NewStorageClient(conn.ClientConn))
if ctx.Err() != nil {
return nil, ctx.Err()
op := func() error {
// TODO: Use a more clever approach to choose the order in which to read
// from the connected nodes:
// https://github.com/oasislabs/oasis-core/issues/1815.
rng := rand.New(mathrand.New(cryptorand.Reader))

var err error
for _, randIndex := range rng.Perm(n) {
conn := conns[randIndex]

resp, err = fn(ctx, api.NewStorageClient(conn.ClientConn))
if ctx.Err() != nil {
return backoff.Permanent(ctx.Err())
}
if err != nil {
b.logger.Error("failed to get response from a storage node",
"node", conn.Node,
"err", err,
"runtime_id", ns,
)
continue
}
return nil
}
if err != nil {
b.logger.Error("failed to get response from a storage node",
"node", conn.Node,
"err", err,
"runtime_id", ns,
)
continue
if numRetries < maxRetries {
numRetries++
return err
}
return resp, err
return backoff.Permanent(err)
}
return nil, err

sched := backoff.NewConstantBackOff(retryInterval)
err := backoff.Retry(op, backoff.WithContext(sched, ctx))
return resp, err
}

func (b *storageClientBackend) SyncGet(ctx context.Context, request *api.GetRequest) (*api.ProofResponse, error) {
Expand Down
16 changes: 8 additions & 8 deletions go/storage/database/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ package database

import (
"context"

"github.com/pkg/errors"
"errors"
"fmt"

"github.com/oasislabs/oasis-core/go/common"
"github.com/oasislabs/oasis-core/go/common/crypto/hash"
Expand Down Expand Up @@ -56,13 +56,13 @@ func New(cfg *api.Config) (api.Backend, error) {
err = errors.New("storage/database: unsupported backend")
}
if err != nil {
return nil, errors.Wrap(err, "storage/database: failed to create node database")
return nil, fmt.Errorf("storage/database: failed to create node database: %w", err)
}

rootCache, err := api.NewRootCache(ndb, nil, cfg.ApplyLockLRUSlots, cfg.InsecureSkipChecks)
if err != nil {
ndb.Close()
return nil, errors.Wrap(err, "storage/database: failed to create root cache")
return nil, fmt.Errorf("storage/database: failed to create root cache: %w", err)
}

// Satisfy the interface....
Expand All @@ -88,7 +88,7 @@ func (ba *databaseBackend) Apply(ctx context.Context, request *api.ApplyRequest)
request.WriteLog,
)
if err != nil {
return nil, errors.Wrap(err, "storage/database: failed to Apply")
return nil, fmt.Errorf("storage/database: failed to Apply: %w", err)
}

receipt, err := api.SignReceipt(ba.signer, request.Namespace, request.DstRound, []hash.Hash{*newRoot})
Expand All @@ -100,7 +100,7 @@ func (ba *databaseBackend) ApplyBatch(ctx context.Context, request *api.ApplyBat
for _, op := range request.Ops {
newRoot, err := ba.rootCache.Apply(ctx, request.Namespace, op.SrcRound, op.SrcRoot, request.DstRound, op.DstRoot, op.WriteLog)
if err != nil {
return nil, errors.Wrap(err, "storage/database: failed to Apply, op")
return nil, fmt.Errorf("storage/database: failed to Apply, op: %w", err)
}
newRoots = append(newRoots, *newRoot)
}
Expand All @@ -112,7 +112,7 @@ func (ba *databaseBackend) ApplyBatch(ctx context.Context, request *api.ApplyBat
func (ba *databaseBackend) Merge(ctx context.Context, request *api.MergeRequest) ([]*api.Receipt, error) {
newRoot, err := ba.rootCache.Merge(ctx, request.Namespace, request.Round, request.Base, request.Others)
if err != nil {
return nil, errors.Wrap(err, "storage/database: failed to Merge")
return nil, fmt.Errorf("storage/database: failed to Merge: %w", err)
}

receipt, err := api.SignReceipt(ba.signer, request.Namespace, request.Round+1, []hash.Hash{*newRoot})
Expand All @@ -124,7 +124,7 @@ func (ba *databaseBackend) MergeBatch(ctx context.Context, request *api.MergeBat
for _, op := range request.Ops {
newRoot, err := ba.rootCache.Merge(ctx, request.Namespace, request.Round, op.Base, op.Others)
if err != nil {
return nil, errors.Wrap(err, "storage/database: failed to Merge, op")
return nil, fmt.Errorf("storage/database: failed to Merge, op: %w", err)
}
newRoots = append(newRoots, *newRoot)
}
Expand Down
25 changes: 14 additions & 11 deletions go/storage/mkvs/urkel/db/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,41 +3,44 @@ package api

import (
"context"
"errors"

"github.com/oasislabs/oasis-core/go/common"
"github.com/oasislabs/oasis-core/go/common/crypto/hash"
"github.com/oasislabs/oasis-core/go/common/errors"
"github.com/oasislabs/oasis-core/go/storage/mkvs/urkel/node"
"github.com/oasislabs/oasis-core/go/storage/mkvs/urkel/writelog"
)

// ModuleName is the module name.
const ModuleName = "storage/mkvs/db"

var (
// ErrNodeNotFound indicates that a node with the specified hash couldn't be found
// in the database.
ErrNodeNotFound = errors.New("urkel: node not found in node db")
ErrNodeNotFound = errors.New(ModuleName, 1, "urkel: node not found in node db")
// ErrWriteLogNotFound indicates that a write log for the specified storage hashes
// couldn't be found.
ErrWriteLogNotFound = errors.New("urkel: write log not found in node db")
ErrWriteLogNotFound = errors.New(ModuleName, 2, "urkel: write log not found in node db")
// ErrNotFinalized indicates that the operation requires a round to be finalized
// but the round is not yet finalized.
ErrNotFinalized = errors.New("urkel: round is not yet finalized")
ErrNotFinalized = errors.New(ModuleName, 3, "urkel: round is not yet finalized")
// ErrAlreadyFinalized indicates that the given round has already been finalized.
ErrAlreadyFinalized = errors.New("urkel: round has already been finalized")
ErrAlreadyFinalized = errors.New(ModuleName, 4, "urkel: round has already been finalized")
// ErrRoundNotFound indicates that the given round cannot be found.
ErrRoundNotFound = errors.New("urkel: round not found")
ErrRoundNotFound = errors.New(ModuleName, 5, "urkel: round not found")
// ErrPreviousRoundMismatch indicates that the round given for the old root does
// not match the previous round.
ErrPreviousRoundMismatch = errors.New("urkel: previous round mismatch")
ErrPreviousRoundMismatch = errors.New(ModuleName, 6, "urkel: previous round mismatch")
// ErrRoundWentBackwards indicates that the new round is earlier than an already
// inserted round.
ErrRoundWentBackwards = errors.New("urkel: round went backwards")
ErrRoundWentBackwards = errors.New(ModuleName, 7, "urkel: round went backwards")
// ErrRootNotFound indicates that the given root cannot be found.
ErrRootNotFound = errors.New("urkel: root not found")
ErrRootNotFound = errors.New(ModuleName, 8, "urkel: root not found")
// ErrRootMustFollowOld indicates that the passed new root does not follow old root.
ErrRootMustFollowOld = errors.New("urkel: root must follow old root")
ErrRootMustFollowOld = errors.New(ModuleName, 9, "urkel: root must follow old root")
// ErrBadNamespace indicates that the passed namespace does not match what is
// actually contained within the database.
ErrBadNamespace = errors.New("urkel: bad namespace")
ErrBadNamespace = errors.New(ModuleName, 10, "urkel: bad namespace")
)

// Config is the node database backend configuration.
Expand Down

0 comments on commit 933e53e

Please sign in to comment.