Skip to content

Commit

Permalink
Merge pull request #1899 from oasislabs/ptrus/feature/registry-watchn…
Browse files Browse the repository at this point in the history
…odelist-cleanup

go/registry: epochtime cleanup
  • Loading branch information
ptrus authored Jul 15, 2019
2 parents 5c82ac4 + 5fa72e9 commit 5bb535f
Show file tree
Hide file tree
Showing 8 changed files with 52 additions and 232 deletions.
3 changes: 1 addition & 2 deletions go/grpc/registry/entity.proto
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,5 @@ message WatchNodeListRequest {
}

message WatchNodeListResponse {
uint64 epoch = 1;
repeated common.Node node = 2;
repeated common.Node node = 1;
}
4 changes: 1 addition & 3 deletions go/registry/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/oasislabs/ekiden/go/common/logging"
"github.com/oasislabs/ekiden/go/common/node"
"github.com/oasislabs/ekiden/go/common/pubsub"
epochtime "github.com/oasislabs/ekiden/go/epochtime/api"
)

const (
Expand Down Expand Up @@ -155,7 +154,7 @@ type Backend interface {
// all runtimes will be sent immediately.
WatchRuntimes() (<-chan *Runtime, *pubsub.Subscription)

// Cleanup cleans up the regsitry backend.
// Cleanup cleans up the registry backend.
Cleanup()
}

Expand All @@ -182,7 +181,6 @@ type NodeEvent struct {

// NodeList is a per-epoch immutable node list.
type NodeList struct {
Epoch epochtime.EpochTime
Nodes []*node.Node
}

Expand Down
3 changes: 1 addition & 2 deletions go/registry/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,8 +253,7 @@ func (s *grpcServer) WatchNodeList(req *pb.WatchNodeListRequest, stream pb.Entit
nodes = append(nodes, n.ToProto())
}
resp := &pb.WatchNodeListResponse{
Epoch: uint64(nl.Epoch),
Node: nodes,
Node: nodes,
}
if err := stream.Send(resp); err != nil {
return err
Expand Down
197 changes: 19 additions & 178 deletions go/registry/tendermint/tendermint.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"bytes"
"context"
"encoding/hex"
"sync"

"github.com/eapache/channels"
"github.com/pkg/errors"
Expand Down Expand Up @@ -35,25 +34,14 @@ var (
type tendermintBackend struct {
logger *logging.Logger

timeSource epochtime.Backend
service service.TendermintService
service service.TendermintService

cfg *api.Config

entityNotifier *pubsub.Broker
nodeNotifier *pubsub.Broker
nodeListNotifier *pubsub.Broker
runtimeNotifier *pubsub.Broker

cached struct {
sync.Mutex
nodeLists map[epochtime.EpochTime]*api.NodeList
runtimes map[epochtime.EpochTime][]*api.Runtime
}
lastEpoch epochtime.EpochTime

closeOnce sync.Once
closedWg sync.WaitGroup
}

func (r *tendermintBackend) RegisterEntity(ctx context.Context, sigEnt *entity.SignedEntity) error {
Expand Down Expand Up @@ -248,18 +236,10 @@ func (r *tendermintBackend) WatchRuntimes() (<-chan *api.Runtime, *pubsub.Subscr
}

func (r *tendermintBackend) GetNodeList(ctx context.Context, height int64) (*api.NodeList, error) {
epoch, err := r.timeSource.GetEpoch(ctx, height)
if err != nil {
return nil, err
}

return r.getNodeList(ctx, epoch)
return r.getNodeList(ctx, height)
}

func (r *tendermintBackend) Cleanup() {
r.closeOnce.Do(func() {
r.closedWg.Wait()
})
}

func (r *tendermintBackend) GetRuntimes(ctx context.Context, height int64) ([]*api.Runtime, error) {
Expand All @@ -276,9 +256,7 @@ func (r *tendermintBackend) GetRuntimes(ctx context.Context, height int64) ([]*a
return runtimes, nil
}

func (r *tendermintBackend) workerEvents(ctx context.Context) {
defer r.closedWg.Done()

func (r *tendermintBackend) worker(ctx context.Context) {
// Subscribe to transactions which modify state.
sub, err := r.service.Subscribe("registry-worker", app.QueryApp)
if err != nil {
Expand Down Expand Up @@ -316,7 +294,6 @@ func (r *tendermintBackend) workerEvents(ctx context.Context) {
func (r *tendermintBackend) onEventDataNewBlock(ctx context.Context, ev tmtypes.EventDataNewBlock) {
events := ev.ResultBeginBlock.GetEvents()
events = append(events, ev.ResultEndBlock.GetEvents()...)

for _, tmEv := range events {
if tmEv.GetType() != tmapi.EventTypeEkiden {
continue
Expand Down Expand Up @@ -378,6 +355,16 @@ func (r *tendermintBackend) onEventDataNewBlock(ctx context.Context, ev tmtypes.
Entity: ent,
IsRegistration: true,
})
} else if bytes.Equal(pair.GetKey(), app.TagRegistryNodeListEpoch) {
nl, err := r.getNodeList(ctx, ev.Block.Header.Height)
if err != nil {
r.logger.Error("worker: failed to get node list",
"height", ev.Block.Header.Height,
"err", err,
)
continue
}
r.nodeListNotifier.Broadcast(nl)
}
}
}
Expand Down Expand Up @@ -425,163 +412,23 @@ func (r *tendermintBackend) onEventDataTx(tx tmtypes.EventDataTx) {
}
}

func (r *tendermintBackend) workerPerEpochList(ctx context.Context) {
defer r.closedWg.Done()

epochEvents, sub := r.timeSource.WatchEpochs()
defer sub.Close()
for {
var newEpoch epochtime.EpochTime
var ok bool

select {
case newEpoch, ok = <-epochEvents:
if !ok {
r.logger.Debug("worker: terminating")
return
}
case <-ctx.Done():
return
}

r.logger.Debug("worker: epoch transition",
"prev_epoch", r.lastEpoch,
"epoch", newEpoch,
)

if newEpoch == r.lastEpoch {
continue
}

nl, err := r.getNodeList(ctx, newEpoch)
if err != nil {
r.logger.Error("worker: failed to generate node list for epoch",
"err", err,
"epoch", newEpoch,
)
continue
}

r.logger.Debug("worker: built node list",
"new_epoch", newEpoch,
"nodes_len", len(nl.Nodes),
)
r.nodeListNotifier.Broadcast(nl)

rl, err := r.getRuntimes(ctx, newEpoch)
if err != nil {
r.logger.Error("worker: failed to generate runtime list for epoch",
"err", err,
"epoch", newEpoch,
)
continue
}

r.logger.Debug("worker: built runtime list",
"new_epoch", newEpoch,
"runtimes_len", len(rl),
)

r.sweepCache(newEpoch)
r.lastEpoch = newEpoch
}
}

func (r *tendermintBackend) getNodeList(ctx context.Context, epoch epochtime.EpochTime) (*api.NodeList, error) {
r.cached.Lock()
defer r.cached.Unlock()

// Service the request from the cache if possible.
nl, ok := r.cached.nodeLists[epoch]
if ok {
return nl, nil
}

func (r *tendermintBackend) getNodeList(ctx context.Context, height int64) (*api.NodeList, error) {
// Generate the nodelist.
height, err := r.timeSource.GetEpochBlock(ctx, epoch)
if err != nil {
return nil, errors.Wrap(err, "registry: failed to query block height")
}

response, err := r.service.Query(app.QueryGetNodes, nil, height)
if err != nil {
return nil, errors.Wrap(err, "registry: failed to query nodes")
}

var nodes, tmp []*node.Node
if err := cbor.Unmarshal(response, &tmp); err != nil {
var nodes []*node.Node
if err := cbor.Unmarshal(response, &nodes); err != nil {
return nil, errors.Wrap(err, "registry: failed node deserialization")
}
for _, v := range tmp {
if epochtime.EpochTime(v.Expiration) < epoch {
continue
}
nodes = append(nodes, v)
}

api.SortNodeList(nodes)

nl = &api.NodeList{
Epoch: epoch,
return &api.NodeList{
Nodes: nodes,
}

r.cached.nodeLists[epoch] = nl

return nl, nil
}

func (r *tendermintBackend) getRuntimes(ctx context.Context, epoch epochtime.EpochTime) ([]*api.Runtime, error) {
r.cached.Lock()
defer r.cached.Unlock()

// Service the request from the cache if possible.
rl, ok := r.cached.runtimes[epoch]
if ok {
return rl, nil
}

// Generate the runtime list.
height, err := r.timeSource.GetEpochBlock(ctx, epoch)
if err != nil {
return nil, errors.Wrap(err, "registry: failed to query block height")
}

response, err := r.service.Query(app.QueryGetRuntimes, nil, height)
if err != nil {
return nil, errors.Wrap(err, "registry: failed to query runtimes")
}

var runtimes []*api.Runtime
if err := cbor.Unmarshal(response, &runtimes); err != nil {
return nil, errors.Wrap(err, "registry: get runtimes malformed response")
}

r.cached.runtimes[epoch] = runtimes

return runtimes, nil
}

func (r *tendermintBackend) sweepCache(epoch epochtime.EpochTime) {
const nrKept = 3

if epoch < nrKept {
return
}

r.cached.Lock()
defer r.cached.Unlock()

for k := range r.cached.nodeLists {
if k < epoch-nrKept {
delete(r.cached.nodeLists, k)
}
}
for k := range r.cached.runtimes {
if k < epoch-nrKept {
delete(r.cached.runtimes, k)
}
}
}, nil
}

// New constructs a new tendermint backed registry Backend instance.
Expand All @@ -594,16 +441,12 @@ func New(ctx context.Context, timeSource epochtime.Backend, service service.Tend

r := &tendermintBackend{
logger: logging.GetLogger("registry/tendermint"),
timeSource: timeSource,
service: service,
cfg: cfg,
entityNotifier: pubsub.NewBroker(false),
nodeNotifier: pubsub.NewBroker(false),
nodeListNotifier: pubsub.NewBroker(true),
lastEpoch: epochtime.EpochInvalid,
}
r.cached.nodeLists = make(map[epochtime.EpochTime]*api.NodeList)
r.cached.runtimes = make(map[epochtime.EpochTime][]*api.Runtime)
r.runtimeNotifier = pubsub.NewBrokerEx(func(ch *channels.InfiniteChannel) {
wr := ch.In()
runtimes, err := r.GetRuntimes(ctx, 0)
Expand All @@ -619,9 +462,7 @@ func New(ctx context.Context, timeSource epochtime.Backend, service service.Tend
}
})

r.closedWg.Add(2)
go r.workerEvents(ctx)
go r.workerPerEpochList(ctx)
go r.worker(ctx)

return r, nil
}
Loading

0 comments on commit 5bb535f

Please sign in to comment.