Skip to content

Commit

Permalink
go/storage: Propagate node identity to client and skip this node on read
Browse files Browse the repository at this point in the history
  • Loading branch information
jberci committed Aug 22, 2019
1 parent 83e4171 commit 5a62dec
Show file tree
Hide file tree
Showing 7 changed files with 26 additions and 20 deletions.
4 changes: 2 additions & 2 deletions go/ekiden/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ func testStorageClient(t *testing.T, node *testNode) {
ctx := context.Background()

// Storage client tests.
storageClientTests.ClientWorkerTests(t, node.Identity.TLSCertificate, node.Beacon, timeSource, node.Registry, node.Scheduler)
storageClientTests.ClientWorkerTests(t, node.Identity, node.Beacon, timeSource, node.Registry, node.Scheduler)

// Client storage implementation tests.
config := []struct {
Expand All @@ -348,7 +348,7 @@ func testStorageClient(t *testing.T, node *testNode) {
for _, kv := range config {
viper.Set(kv.key, kv.value)
}
debugClient, err := storageClient.New(ctx, node.Identity.TLSCertificate, nil, nil)
debugClient, err := storageClient.New(ctx, node.Identity, nil, nil)
require.NoError(t, err, "NewDebugStorageClient")
storageTests.StorageImplementationTests(t, debugClient, testNamespace)
}
Expand Down
7 changes: 4 additions & 3 deletions go/storage/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package client
import (
"context"
cryptorand "crypto/rand"
"crypto/tls"
"io"
"math/rand"
"sync"
Expand All @@ -21,6 +20,7 @@ import (
"github.com/oasislabs/ekiden/go/common/crypto/hash"
"github.com/oasislabs/ekiden/go/common/crypto/mathrand"
"github.com/oasislabs/ekiden/go/common/crypto/signature"
"github.com/oasislabs/ekiden/go/common/identity"
"github.com/oasislabs/ekiden/go/common/logging"
"github.com/oasislabs/ekiden/go/common/node"
"github.com/oasislabs/ekiden/go/grpc/storage"
Expand Down Expand Up @@ -67,7 +67,7 @@ type storageClientBackend struct {
runtimeWatchersLock sync.RWMutex
runtimeWatchers map[signature.MapKey]storageWatcher

tlsCertificate *tls.Certificate
identity *identity.Identity

haltCtx context.Context
cancelFn context.CancelFunc
Expand Down Expand Up @@ -111,7 +111,7 @@ func (b *storageClientBackend) WatchRuntime(id signature.PublicKey) error {
}

// Watcher doesn't exist. Start new watcher.
watcher = newWatcher(b.ctx, id, b.tlsCertificate, b.scheduler, b.registry)
watcher = newWatcher(b.ctx, id, b.identity, b.scheduler, b.registry)
b.runtimeWatchers[id.ToMapKey()] = watcher

// Signal init when the first registered runtime is initialized.
Expand Down Expand Up @@ -496,6 +496,7 @@ func (b *storageClientBackend) readWithClient(
var resp interface{}
for _, randIndex := range rng.Perm(n) {
state := clientStates[randIndex]

resp, err = fn(ctx, state.client)
if ctx.Err() != nil {
return nil, ctx.Err()
Expand Down
9 changes: 5 additions & 4 deletions go/storage/client/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/oasislabs/ekiden/go/common/crypto/signature"
memorySigner "github.com/oasislabs/ekiden/go/common/crypto/signature/signers/memory"
commonGrpc "github.com/oasislabs/ekiden/go/common/grpc"
"github.com/oasislabs/ekiden/go/common/identity"
"github.com/oasislabs/ekiden/go/common/logging"
"github.com/oasislabs/ekiden/go/grpc/storage"
registry "github.com/oasislabs/ekiden/go/registry/api"
Expand All @@ -36,7 +37,7 @@ const debugModeFakeRuntimeSeed = "ekiden storage client debug runtime"
// New creates a new storage client.
func New(
ctx context.Context,
tlsCertificate *tls.Certificate,
identity *identity.Identity,
schedulerBackend scheduler.Backend,
registryBackend registry.Backend,
) (api.Backend, error) {
Expand All @@ -58,7 +59,7 @@ func New(
return nil, err
}
// Set client certificate.
tlsConfig.Certificates = []tls.Certificate{*tlsCertificate}
tlsConfig.Certificates = []tls.Certificate{*identity.TLSCertificate}
opts = grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig))
} else {
opts = grpc.WithInsecure()
Expand All @@ -82,7 +83,7 @@ func New(
scheduler: schedulerBackend,
registry: registryBackend,
runtimeWatchers: make(map[signature.MapKey]storageWatcher),
tlsCertificate: tlsCertificate,
identity: identity,
}
state := &clientState{
client: client,
Expand All @@ -100,7 +101,7 @@ func New(
scheduler: schedulerBackend,
registry: registryBackend,
runtimeWatchers: make(map[signature.MapKey]storageWatcher),
tlsCertificate: tlsCertificate,
identity: identity,
}

b.haltCtx, b.cancelFn = context.WithCancel(ctx)
Expand Down
6 changes: 3 additions & 3 deletions go/storage/client/tests/tests.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package tests

import (
"context"
"crypto/tls"
"testing"
"time"

Expand All @@ -14,6 +13,7 @@ import (
"github.com/oasislabs/ekiden/go/common"
"github.com/oasislabs/ekiden/go/common/crypto/hash"
"github.com/oasislabs/ekiden/go/common/crypto/signature"
"github.com/oasislabs/ekiden/go/common/identity"
"github.com/oasislabs/ekiden/go/common/node"
epochtime "github.com/oasislabs/ekiden/go/epochtime/api"
epochtimeTests "github.com/oasislabs/ekiden/go/epochtime/tests"
Expand All @@ -35,7 +35,7 @@ func runtimeIDToNamespace(t *testing.T, runtimeID signature.PublicKey) (ns commo
// ClientWorkerTests implements tests for client worker.
func ClientWorkerTests(
t *testing.T,
tlsCertificate *tls.Certificate,
identity *identity.Identity,
beacon beacon.Backend,
timeSource epochtime.SetableBackend,
registry registry.Backend,
Expand All @@ -53,7 +53,7 @@ func ClientWorkerTests(

rt.MustRegister(t, registry)
// Initialize storage client
client, err := storageClient.New(ctx, tlsCertificate, schedulerBackend, registry)
client, err := storageClient.New(ctx, identity, schedulerBackend, registry)
require.NoError(err, "NewStorageClient")
err = client.(api.ClientBackend).WatchRuntime(rt.Runtime.ID)
require.NoError(err, "NewStorageClient")
Expand Down
13 changes: 9 additions & 4 deletions go/storage/client/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/oasislabs/ekiden/go/common/crypto/signature"
"github.com/oasislabs/ekiden/go/common/grpc/resolver/manual"
"github.com/oasislabs/ekiden/go/common/identity"
"github.com/oasislabs/ekiden/go/common/logging"
"github.com/oasislabs/ekiden/go/common/node"
"github.com/oasislabs/ekiden/go/grpc/storage"
Expand Down Expand Up @@ -66,7 +67,7 @@ type watcherState struct {

runtimeID signature.MapKey

tlsCertificate *tls.Certificate
identity *identity.Identity

registeredStorageNodes []*node.Node
scheduledNodes map[signature.MapKey]bool
Expand Down Expand Up @@ -154,6 +155,10 @@ func (w *watcherState) updateStorageNodeConnections() {

// Connect to nodes.
for _, node := range nodeList {
if w.identity != nil && node.ID.Equal(w.identity.NodeSigner.Public()) {
continue
}

var opts grpc.DialOption
if node.Committee.Certificate == nil {
// NOTE: This should only happen in tests, where nodes register without a certificate.
Expand All @@ -172,7 +177,7 @@ func (w *watcherState) updateStorageNodeConnections() {
certPool := x509.NewCertPool()
certPool.AddCert(nodeCert)
creds := credentials.NewTLS(&tls.Config{
Certificates: []tls.Certificate{*w.tlsCertificate},
Certificates: []tls.Certificate{*w.identity.TLSCertificate},
RootCAs: certPool,
ServerName: "ekiden-node",
})
Expand Down Expand Up @@ -313,7 +318,7 @@ func (w *watcherState) watch(ctx context.Context) {
func newWatcher(
ctx context.Context,
runtimeID signature.PublicKey,
tlsCertificate *tls.Certificate,
identity *identity.Identity,
schedulerBackend scheduler.Backend,
registryBackend registry.Backend,
) storageWatcher {
Expand All @@ -323,7 +328,7 @@ func newWatcher(
initCh: make(chan struct{}),
logger: logger,
runtimeID: runtimeID.ToMapKey(),
tlsCertificate: tlsCertificate,
identity: identity,
scheduler: schedulerBackend,
registry: registryBackend,
registeredStorageNodes: []*node.Node{},
Expand Down
5 changes: 2 additions & 3 deletions go/storage/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ func New(
return nil, err
}
}
tlsCert := identity.TLSCertificate

backend := viper.GetString(cfgBackend)
applyLockLRUSlots := uint64(viper.GetInt(cfgLRUSlots))
Expand All @@ -62,10 +61,10 @@ func New(
dbDir := filepath.Join(dataDir, leveldb.DBFile)
impl, err = leveldb.New(dbDir, signer, applyLockLRUSlots, insecureSkipChecks)
case client.BackendName:
impl, err = client.New(ctx, tlsCert, schedulerBackend, registryBackend)
impl, err = client.New(ctx, identity, schedulerBackend, registryBackend)
case cachingclient.BackendName:
var remote api.Backend
remote, err = client.New(ctx, tlsCert, schedulerBackend, registryBackend)
remote, err = client.New(ctx, identity, schedulerBackend, registryBackend)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion go/worker/storage/committee/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ func NewNode(
node.ctx, node.ctxCancel = context.WithCancel(context.Background())

// Create a new storage client that will be used for remote sync.
scl, err := client.New(node.ctx, node.commonNode.Identity.TLSCertificate, node.commonNode.Scheduler, node.commonNode.Registry)
scl, err := client.New(node.ctx, node.commonNode.Identity, node.commonNode.Scheduler, node.commonNode.Registry)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 5a62dec

Please sign in to comment.