Skip to content

Commit

Permalink
go/storage/client: Add WithBackendOverride option
Browse files Browse the repository at this point in the history
  • Loading branch information
kostko committed Jun 22, 2021
1 parent 3d559ed commit 113ebfc
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 6 deletions.
1 change: 1 addition & 0 deletions .changelog/4061.bugfix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
go/worker/executor: Do not route local storage applies via gRPC
40 changes: 37 additions & 3 deletions go/storage/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/oasisprotocol/oasis-core/go/common"
"github.com/oasisprotocol/oasis-core/go/common/crypto/hash"
"github.com/oasisprotocol/oasis-core/go/common/crypto/mathrand"
"github.com/oasisprotocol/oasis-core/go/common/crypto/signature"
"github.com/oasisprotocol/oasis-core/go/common/logging"
"github.com/oasisprotocol/oasis-core/go/common/node"
registry "github.com/oasisprotocol/oasis-core/go/registry/api"
Expand All @@ -39,6 +40,21 @@ const (
maxRetries = 15
)

// Option is a storage client option.
type Option func(b *storageClientBackend)

// WithBackendOverride overrides the storage backend for the specified node. The passed backend is
// used instead of the gRPC backend when performing storage requests.
func WithBackendOverride(nodeID signature.PublicKey, backend api.Backend) Option {
return func(b *storageClientBackend) {
if b.backendOverrides == nil {
b.backendOverrides = make(map[signature.PublicKey]api.Backend)
}

b.backendOverrides[nodeID] = backend
}
}

// storageClientBackend contains all information about the client storage API
// backend, including the backend state and the connected storage nodes' state.
type storageClientBackend struct {
Expand All @@ -48,6 +64,8 @@ type storageClientBackend struct {

nodesClient grpc.NodesClient
runtime registry.RuntimeDescriptorProvider

backendOverrides map[signature.PublicKey]api.Backend
}

// Implements api.StorageClient.
Expand Down Expand Up @@ -90,7 +108,7 @@ func (b *storageClientBackend) writeWithClient( // nolint: gocyclo

// Determine the minimum replication factor. In case we don't have a runtime descriptor provider
// we make the safe choice of assuming that the replication factor is the same as the number of
// conencted nodes.
// connected nodes.
minWriteReplication := n
if b.runtime != nil {
rt, err := b.runtime.ActiveDescriptor(ctx)
Expand All @@ -113,8 +131,16 @@ func (b *storageClientBackend) writeWithClient( // nolint: gocyclo
go func(conn *grpc.ConnWithNodeMeta) {
var resp interface{}
op := func() error {
// If a backend override is configured, use it instead of going through gRPC.
var backend api.Backend
if override, ok := b.backendOverrides[conn.Node.ID]; ok {
backend = override
} else {
backend = api.NewStorageClient(conn.ClientConn)
}

var rerr error
resp, rerr = fn(ctx, api.NewStorageClient(conn.ClientConn), conn.Node)
resp, rerr = fn(ctx, backend, conn.Node)
if rerr != nil {
b.logger.Debug("storage write request error",
"err", rerr,
Expand Down Expand Up @@ -347,7 +373,15 @@ func (b *storageClientBackend) readWithClient(

var err error
for _, conn := range nodes {
resp, err = fn(ctx, api.NewStorageClient(conn.ClientConn))
// If a backend override is configured, use it instead of going through gRPC.
var backend api.Backend
if override, ok := b.backendOverrides[conn.Node.ID]; ok {
backend = override
} else {
backend = api.NewStorageClient(conn.ClientConn)
}

resp, err = fn(ctx, backend)
if ctx.Err() != nil {
return backoff.Permanent(ctx.Err())
}
Expand Down
15 changes: 12 additions & 3 deletions go/storage/client/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,19 @@ func NewForNodesClient(
ctx context.Context,
client grpc.NodesClient,
runtime registry.RuntimeDescriptorProvider,
opts ...Option,
) (api.Backend, error) {
b := &storageClientBackend{
ctx: ctx,
logger: logging.GetLogger("storage/client"),
nodesClient: client,
runtime: runtime,
}

for _, opt := range opts {
opt(b)
}

return api.NewMetricsWrapper(b), nil
}

Expand All @@ -41,12 +47,13 @@ func NewForNodes(
ident *identity.Identity,
nodes nodes.NodeDescriptorLookup,
runtime registry.RuntimeDescriptorProvider,
opts ...Option,
) (api.Backend, error) {
client, err := grpc.NewNodesClient(ctx, nodes, grpc.WithClientAuthentication(ident))
if err != nil {
return nil, fmt.Errorf("storage/client: failed to create committee client: %w", err)
}
return NewForNodesClient(ctx, client, runtime)
return NewForNodesClient(ctx, client, runtime, opts...)
}

// NewForPublicStorage creates a new storage client that automatically follows a given runtime's storage nodes
Expand All @@ -57,6 +64,7 @@ func NewForPublicStorage(
ident *identity.Identity,
registryBackend registry.Backend,
runtime registry.RuntimeDescriptorProvider,
opts ...Option,
) (api.Backend, error) {
nl, err := nodes.NewRuntimeNodeLookup(
ctx,
Expand All @@ -76,7 +84,7 @@ func NewForPublicStorage(
),
)

return NewForNodes(ctx, ident, publicStorageNl, runtime)
return NewForNodes(ctx, ident, publicStorageNl, runtime, opts...)
}

// NewStatic creates a new storage client that only follows a specific storage node.
Expand All @@ -87,13 +95,14 @@ func NewStatic(
ident *identity.Identity,
registryBackend registry.Backend,
nodeID signature.PublicKey,
opts ...Option,
) (api.Backend, error) {
nw, err := nodes.NewVersionedNodeDescriptorWatcher(ctx, registryBackend)
if err != nil {
return nil, fmt.Errorf("storage/client: failed to create node descriptor watcher: %w", err)
}

client, err := NewForNodes(ctx, ident, nw, nil)
client, err := NewForNodes(ctx, ident, nw, nil, opts...)
if err != nil {
return nil, err
}
Expand Down
3 changes: 3 additions & 0 deletions go/worker/common/committee/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -552,9 +552,11 @@ func (g *Group) Start() error {
// Check if we have the local storage backend available (e.g., this node is also a storage node
// for this runtime). In this case we override the storage client's backend so that any updates
// don't go via gRPC but are redirected directly to the local backend instead.
var scOpts []storageClient.Option
var localStorageBackend storage.LocalBackend
if lsb, ok := g.runtime.Storage().(storage.LocalBackend); ok && g.runtime.HasRoles(node.RoleStorageWorker) {
localStorageBackend = lsb
scOpts = append(scOpts, storageClient.WithBackendOverride(g.identity.NodeSigner.Public(), lsb))
}

// Create the storage client.
Expand All @@ -563,6 +565,7 @@ func (g *Group) Start() error {
g.identity,
nodes.NewFilteredNodeLookup(g.nodes, nodes.TagFilter(TagForCommittee(scheduler.KindStorage))),
g.runtime,
scOpts...,
)
if err != nil {
return fmt.Errorf("group: failed to create storage client: %w", err)
Expand Down

0 comments on commit 113ebfc

Please sign in to comment.