From 5fa72e98f11dd9ab17b9bbcf5f6b240d4de60cbd Mon Sep 17 00:00:00 2001 From: ptrus Date: Mon, 8 Jul 2019 13:36:45 +0200 Subject: [PATCH] go/registry: decouple from 'global epoch' --- go/grpc/registry/entity.proto | 3 +- go/registry/api/api.go | 4 +- go/registry/grpc.go | 3 +- go/registry/tendermint/tendermint.go | 197 +++--------------------- go/registry/tests/tester.go | 47 +----- go/storage/client/client.go | 7 +- go/tendermint/apps/registry/api.go | 3 + go/tendermint/apps/registry/registry.go | 20 ++- 8 files changed, 52 insertions(+), 232 deletions(-) diff --git a/go/grpc/registry/entity.proto b/go/grpc/registry/entity.proto index 92748796efe..d442dd9786e 100644 --- a/go/grpc/registry/entity.proto +++ b/go/grpc/registry/entity.proto @@ -116,6 +116,5 @@ message WatchNodeListRequest { } message WatchNodeListResponse { - uint64 epoch = 1; - repeated common.Node node = 2; + repeated common.Node node = 1; } diff --git a/go/registry/api/api.go b/go/registry/api/api.go index cd16cf72ae6..c41abfd121c 100644 --- a/go/registry/api/api.go +++ b/go/registry/api/api.go @@ -14,7 +14,6 @@ import ( "github.com/oasislabs/ekiden/go/common/logging" "github.com/oasislabs/ekiden/go/common/node" "github.com/oasislabs/ekiden/go/common/pubsub" - epochtime "github.com/oasislabs/ekiden/go/epochtime/api" ) const ( @@ -155,7 +154,7 @@ type Backend interface { // all runtimes will be sent immediately. WatchRuntimes() (<-chan *Runtime, *pubsub.Subscription) - // Cleanup cleans up the regsitry backend. + // Cleanup cleans up the registry backend. Cleanup() } @@ -182,7 +181,6 @@ type NodeEvent struct { // NodeList is a per-epoch immutable node list. type NodeList struct { - Epoch epochtime.EpochTime Nodes []*node.Node } diff --git a/go/registry/grpc.go b/go/registry/grpc.go index 2e7e1917173..cbc8f8cc36f 100644 --- a/go/registry/grpc.go +++ b/go/registry/grpc.go @@ -253,8 +253,7 @@ func (s *grpcServer) WatchNodeList(req *pb.WatchNodeListRequest, stream pb.Entit nodes = append(nodes, n.ToProto()) } resp := &pb.WatchNodeListResponse{ - Epoch: uint64(nl.Epoch), - Node: nodes, + Node: nodes, } if err := stream.Send(resp); err != nil { return err diff --git a/go/registry/tendermint/tendermint.go b/go/registry/tendermint/tendermint.go index e05be42819f..e285c5e43f6 100644 --- a/go/registry/tendermint/tendermint.go +++ b/go/registry/tendermint/tendermint.go @@ -5,7 +5,6 @@ import ( "bytes" "context" "encoding/hex" - "sync" "github.com/eapache/channels" "github.com/pkg/errors" @@ -35,8 +34,7 @@ var ( type tendermintBackend struct { logger *logging.Logger - timeSource epochtime.Backend - service service.TendermintService + service service.TendermintService cfg *api.Config @@ -44,16 +42,6 @@ type tendermintBackend struct { nodeNotifier *pubsub.Broker nodeListNotifier *pubsub.Broker runtimeNotifier *pubsub.Broker - - cached struct { - sync.Mutex - nodeLists map[epochtime.EpochTime]*api.NodeList - runtimes map[epochtime.EpochTime][]*api.Runtime - } - lastEpoch epochtime.EpochTime - - closeOnce sync.Once - closedWg sync.WaitGroup } func (r *tendermintBackend) RegisterEntity(ctx context.Context, sigEnt *entity.SignedEntity) error { @@ -248,18 +236,10 @@ func (r *tendermintBackend) WatchRuntimes() (<-chan *api.Runtime, *pubsub.Subscr } func (r *tendermintBackend) GetNodeList(ctx context.Context, height int64) (*api.NodeList, error) { - epoch, err := r.timeSource.GetEpoch(ctx, height) - if err != nil { - return nil, err - } - - return r.getNodeList(ctx, epoch) + return r.getNodeList(ctx, height) } func (r *tendermintBackend) Cleanup() { - r.closeOnce.Do(func() { - r.closedWg.Wait() - }) } func (r *tendermintBackend) GetRuntimes(ctx context.Context, height int64) ([]*api.Runtime, error) { @@ -276,9 +256,7 @@ func (r *tendermintBackend) GetRuntimes(ctx context.Context, height int64) ([]*a return runtimes, nil } -func (r *tendermintBackend) workerEvents(ctx context.Context) { - defer r.closedWg.Done() - +func (r *tendermintBackend) worker(ctx context.Context) { // Subscribe to transactions which modify state. sub, err := r.service.Subscribe("registry-worker", app.QueryApp) if err != nil { @@ -316,7 +294,6 @@ func (r *tendermintBackend) workerEvents(ctx context.Context) { func (r *tendermintBackend) onEventDataNewBlock(ctx context.Context, ev tmtypes.EventDataNewBlock) { events := ev.ResultBeginBlock.GetEvents() events = append(events, ev.ResultEndBlock.GetEvents()...) - for _, tmEv := range events { if tmEv.GetType() != tmapi.EventTypeEkiden { continue @@ -378,6 +355,16 @@ func (r *tendermintBackend) onEventDataNewBlock(ctx context.Context, ev tmtypes. Entity: ent, IsRegistration: true, }) + } else if bytes.Equal(pair.GetKey(), app.TagRegistryNodeListEpoch) { + nl, err := r.getNodeList(ctx, ev.Block.Header.Height) + if err != nil { + r.logger.Error("worker: failed to get node list", + "height", ev.Block.Header.Height, + "err", err, + ) + continue + } + r.nodeListNotifier.Broadcast(nl) } } } @@ -425,163 +412,23 @@ func (r *tendermintBackend) onEventDataTx(tx tmtypes.EventDataTx) { } } -func (r *tendermintBackend) workerPerEpochList(ctx context.Context) { - defer r.closedWg.Done() - - epochEvents, sub := r.timeSource.WatchEpochs() - defer sub.Close() - for { - var newEpoch epochtime.EpochTime - var ok bool - - select { - case newEpoch, ok = <-epochEvents: - if !ok { - r.logger.Debug("worker: terminating") - return - } - case <-ctx.Done(): - return - } - - r.logger.Debug("worker: epoch transition", - "prev_epoch", r.lastEpoch, - "epoch", newEpoch, - ) - - if newEpoch == r.lastEpoch { - continue - } - - nl, err := r.getNodeList(ctx, newEpoch) - if err != nil { - r.logger.Error("worker: failed to generate node list for epoch", - "err", err, - "epoch", newEpoch, - ) - continue - } - - r.logger.Debug("worker: built node list", - "new_epoch", newEpoch, - "nodes_len", len(nl.Nodes), - ) - r.nodeListNotifier.Broadcast(nl) - - rl, err := r.getRuntimes(ctx, newEpoch) - if err != nil { - r.logger.Error("worker: failed to generate runtime list for epoch", - "err", err, - "epoch", newEpoch, - ) - continue - } - - r.logger.Debug("worker: built runtime list", - "new_epoch", newEpoch, - "runtimes_len", len(rl), - ) - - r.sweepCache(newEpoch) - r.lastEpoch = newEpoch - } -} - -func (r *tendermintBackend) getNodeList(ctx context.Context, epoch epochtime.EpochTime) (*api.NodeList, error) { - r.cached.Lock() - defer r.cached.Unlock() - - // Service the request from the cache if possible. - nl, ok := r.cached.nodeLists[epoch] - if ok { - return nl, nil - } - +func (r *tendermintBackend) getNodeList(ctx context.Context, height int64) (*api.NodeList, error) { // Generate the nodelist. - height, err := r.timeSource.GetEpochBlock(ctx, epoch) - if err != nil { - return nil, errors.Wrap(err, "registry: failed to query block height") - } - response, err := r.service.Query(app.QueryGetNodes, nil, height) if err != nil { return nil, errors.Wrap(err, "registry: failed to query nodes") } - var nodes, tmp []*node.Node - if err := cbor.Unmarshal(response, &tmp); err != nil { + var nodes []*node.Node + if err := cbor.Unmarshal(response, &nodes); err != nil { return nil, errors.Wrap(err, "registry: failed node deserialization") } - for _, v := range tmp { - if epochtime.EpochTime(v.Expiration) < epoch { - continue - } - nodes = append(nodes, v) - } api.SortNodeList(nodes) - nl = &api.NodeList{ - Epoch: epoch, + return &api.NodeList{ Nodes: nodes, - } - - r.cached.nodeLists[epoch] = nl - - return nl, nil -} - -func (r *tendermintBackend) getRuntimes(ctx context.Context, epoch epochtime.EpochTime) ([]*api.Runtime, error) { - r.cached.Lock() - defer r.cached.Unlock() - - // Service the request from the cache if possible. - rl, ok := r.cached.runtimes[epoch] - if ok { - return rl, nil - } - - // Generate the runtime list. - height, err := r.timeSource.GetEpochBlock(ctx, epoch) - if err != nil { - return nil, errors.Wrap(err, "registry: failed to query block height") - } - - response, err := r.service.Query(app.QueryGetRuntimes, nil, height) - if err != nil { - return nil, errors.Wrap(err, "registry: failed to query runtimes") - } - - var runtimes []*api.Runtime - if err := cbor.Unmarshal(response, &runtimes); err != nil { - return nil, errors.Wrap(err, "registry: get runtimes malformed response") - } - - r.cached.runtimes[epoch] = runtimes - - return runtimes, nil -} - -func (r *tendermintBackend) sweepCache(epoch epochtime.EpochTime) { - const nrKept = 3 - - if epoch < nrKept { - return - } - - r.cached.Lock() - defer r.cached.Unlock() - - for k := range r.cached.nodeLists { - if k < epoch-nrKept { - delete(r.cached.nodeLists, k) - } - } - for k := range r.cached.runtimes { - if k < epoch-nrKept { - delete(r.cached.runtimes, k) - } - } + }, nil } // New constructs a new tendermint backed registry Backend instance. @@ -594,16 +441,12 @@ func New(ctx context.Context, timeSource epochtime.Backend, service service.Tend r := &tendermintBackend{ logger: logging.GetLogger("registry/tendermint"), - timeSource: timeSource, service: service, cfg: cfg, entityNotifier: pubsub.NewBroker(false), nodeNotifier: pubsub.NewBroker(false), nodeListNotifier: pubsub.NewBroker(true), - lastEpoch: epochtime.EpochInvalid, } - r.cached.nodeLists = make(map[epochtime.EpochTime]*api.NodeList) - r.cached.runtimes = make(map[epochtime.EpochTime][]*api.Runtime) r.runtimeNotifier = pubsub.NewBrokerEx(func(ch *channels.InfiniteChannel) { wr := ch.In() runtimes, err := r.GetRuntimes(ctx, 0) @@ -619,9 +462,7 @@ func New(ctx context.Context, timeSource epochtime.Backend, service service.Tend } }) - r.closedWg.Add(2) - go r.workerEvents(ctx) - go r.workerPerEpochList(ctx) + go r.worker(ctx) return r, nil } diff --git a/go/registry/tests/tester.go b/go/registry/tests/tester.go index 282fdbde494..ad9b876ad90 100644 --- a/go/registry/tests/tester.go +++ b/go/registry/tests/tester.go @@ -27,10 +27,10 @@ const recvTimeout = 1 * time.Second // // WARNING: This assumes that the registry is empty, and will leave // a Runtime registered. -func RegistryImplementationTests(t *testing.T, backend api.Backend, epochtime epochtime.SetableBackend) { +func RegistryImplementationTests(t *testing.T, backend api.Backend, timeSource epochtime.SetableBackend) { EnsureRegistryEmpty(t, backend) - testRegistryEntityNodes(t, backend, epochtime) + testRegistryEntityNodes(t, backend, timeSource) // Runtime registry tests are after the entity/node tests to avoid // interacting with the scheduler as much as possible. @@ -155,27 +155,11 @@ func testRegistryEntityNodes(t *testing.T, backend api.Backend, timeSource epoch require := require.New(t) expectedNodeList := getExpectedNodeList() - ch, sub := backend.WatchNodeList() - defer sub.Close() - epoch = epochtimeTests.MustAdvanceEpoch(t, timeSource, 1) - recvLoop: - for { - select { - case ev := <-ch: - // Skip the old node list. - if ev.Epoch < epoch { - continue - } - - require.Equal(epoch, ev.Epoch, "node list epoch") - require.EqualValues(expectedNodeList, ev.Nodes, "node list") - break recvLoop - case <-time.After(recvTimeout): - t.Fatalf("failed to recevive node list event") - } - } + registeredNodes, nerr := backend.GetNodes(context.Background()) + require.NoError(nerr, "GetNodes") + require.EqualValues(expectedNodeList, registeredNodes, "node list") }) t.Run("NodeExpiration", func(t *testing.T) { @@ -186,9 +170,6 @@ func testRegistryEntityNodes(t *testing.T, backend api.Backend, timeSource epoch expectedDeregEvents := len(nodes[0]) deregisteredNodes := make(map[signature.MapKey]*node.Node) - ch, sub := backend.WatchNodeList() - defer sub.Close() - epoch = epochtimeTests.MustAdvanceEpoch(t, timeSource, 1) for i := 0; i < expectedDeregEvents; i++ { @@ -216,21 +197,9 @@ func testRegistryEntityNodes(t *testing.T, backend api.Backend, timeSource epoch // Ensure the node list doesn't have the expired nodes. expectedNodeList := getExpectedNodeList() - recvLoop: - for { - select { - case ev := <-ch: - if ev.Epoch < epoch { - continue - } - - require.Equal(epoch, ev.Epoch, "node list epoch") - require.EqualValues(expectedNodeList, ev.Nodes, "node list") - break recvLoop - case <-time.After(recvTimeout): - t.Fatalf("failed to recevive node list event") - } - } + registeredNodes, nerr := backend.GetNodes(context.Background()) + require.NoError(nerr, "GetNodes") + require.EqualValues(expectedNodeList, registeredNodes, "node list") // Ensure that registering an expired node will fail. err := backend.RegisterNode(context.Background(), expiredNode.SignedRegistration) diff --git a/go/storage/client/client.go b/go/storage/client/client.go index 2f7f6e55ad4..fa5b390860d 100644 --- a/go/storage/client/client.go +++ b/go/storage/client/client.go @@ -660,13 +660,18 @@ func (b *storageClientBackend) watcher(ctx context.Context) { if ev == nil { continue } - b.logger.Debug("got new storage node list") + b.logger.Debug("got new storage node list", ev.Nodes) if err := b.state.updateStorageNodeList(ctx, ev.Nodes); err != nil { b.logger.Error("worker: failed to update storage list", "err", err, ) continue } + // Update storage node connection. + b.updateNodeConnections() + + b.logger.Debug("updated connections to nodes") + case committee := <-schedCh: b.logger.Debug("worker: scheduler committee for epoch", "committee", committee, diff --git a/go/tendermint/apps/registry/api.go b/go/tendermint/apps/registry/api.go index e1c1ed2e9de..bae2ef0d818 100644 --- a/go/tendermint/apps/registry/api.go +++ b/go/tendermint/apps/registry/api.go @@ -31,6 +31,9 @@ var ( // descriptors). TagNodesExpired = []byte("registry.nodes.expired") + // TagRegistryNodeListEpoch is an ABCI tag for registry epochs. + TagRegistryNodeListEpoch = []byte("registry.nodes.epoch") + // QueryApp is a query for filtering events processed by // the registry application. QueryApp = api.QueryForEvent([]byte(AppName), api.TagAppNameValue) diff --git a/go/tendermint/apps/registry/registry.go b/go/tendermint/apps/registry/registry.go index c2ce7c66b4b..f66db89e4bc 100644 --- a/go/tendermint/apps/registry/registry.go +++ b/go/tendermint/apps/registry/registry.go @@ -176,8 +176,9 @@ func (app *registryApplication) InitChain(ctx *abci.Context, request types.Reque } func (app *registryApplication) BeginBlock(ctx *abci.Context, request types.RequestBeginBlock) error { - if changed, epoch := app.state.EpochChanged(app.timeSource); changed { - return app.onEpochChange(ctx, epoch) + // XXX: With PR#1889 this can be a differnet interval. + if changed, registryEpoch := app.state.EpochChanged(app.timeSource); changed { + return app.onRegistryEpochChanged(ctx, registryEpoch) } return nil } @@ -205,32 +206,37 @@ func (app *registryApplication) EndBlock(request types.RequestEndBlock) (types.R func (app *registryApplication) FireTimer(*abci.Context, *abci.Timer) { } -func (app *registryApplication) onEpochChange(ctx *abci.Context, epoch epochtime.EpochTime) error { +func (app *registryApplication) onRegistryEpochChanged(ctx *abci.Context, registryEpoch epochtime.EpochTime) error { state := NewMutableState(app.state.DeliverTxTree()) nodes, err := state.GetNodes() if err != nil { - app.logger.Error("onEpochChange: failed to get nodes", + app.logger.Error("onRegistryEpochChanged: failed to get nodes", "err", err, ) - return errors.Wrap(err, "registry: onEpochChange: failed to get nodes") + return errors.Wrap(err, "registry: onRegistryEpochChanged: failed to get nodes") } var expiredNodes []*node.Node for _, node := range nodes { - if epochtime.EpochTime(node.Expiration) >= epoch { + if epochtime.EpochTime(node.Expiration) >= registryEpoch { continue } expiredNodes = append(expiredNodes, node) state.removeNode(node) } + + // Emit the TagRegistryNodeListEpoch notification. + ctx.EmitTag([]byte(app.Name()), api.TagAppNameValue) + // Dummy value, should be ignored. + ctx.EmitTag(TagRegistryNodeListEpoch, []byte("1")) + if len(expiredNodes) == 0 { return nil } // Iff any nodes have expired, force-emit the application tag so // the change is picked up. - ctx.EmitTag([]byte(app.Name()), api.TagAppNameValue) ctx.EmitTag(TagNodesExpired, cbor.Marshal(expiredNodes)) return nil