Skip to content

Commit

Permalink
go/epochtime: add WatchLatestEpoch method
Browse files Browse the repository at this point in the history
Pubsub framework is extended to support subscriptions based on bounded
ring channels. Bounded subscriptions are used in the new epochtime
WatchLatestEpoch method. The method is similar to the existing WatchEpochs
method, with the change that unread epochs get overridden with latest
epoch.

Registration worker is changed to use the WatchLatestEpoch method to
prevent trying to register for old epochs in case the worker falls
behind.
  • Loading branch information
ptrus committed Apr 29, 2020
1 parent 8642c9a commit 39a1654
Show file tree
Hide file tree
Showing 16 changed files with 176 additions and 41 deletions.
5 changes: 5 additions & 0 deletions .changelog/2876.bugfix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
worker/registration: Use WatchLatestEpoch when watching for registrations

By using WatchLatestEpoch the worker will always try to register for latest
known epoch, which should prevent cases where registration worker fell behind
and was trying to register for past epochs.
1 change: 1 addition & 0 deletions .changelog/2876.internal.1.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
go/common/pubsub: support subscriptions based on bounded ring channels
4 changes: 4 additions & 0 deletions .changelog/2876.internal.2.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
go/epochtime: add WatchLatestEpoch method

The method is similar to the existing WatchEpochs method, with the change that
unread epochs get overridden with latest epoch.
41 changes: 28 additions & 13 deletions go/common/pubsub/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ type broadcastedValue struct {
}

type cmdCtx struct {
ch *channels.InfiniteChannel
ch channels.Channel
errCh chan error
onSubscribeHook OnSubscribeHook

Expand Down Expand Up @@ -46,7 +46,7 @@ func NewContextSubscription(ctx context.Context) (context.Context, ClosableSubsc
// Subscription is a Broker subscription instance.
type Subscription struct {
b *Broker
ch *channels.InfiniteChannel
ch channels.Channel
}

// Untyped returns the subscription's untyped output. Effort should be
Expand Down Expand Up @@ -77,37 +77,52 @@ func (s *Subscription) Close() {

// Broker is a pub/sub broker instance.
type Broker struct {
subscribers map[*channels.InfiniteChannel]bool
subscribers map[channels.Channel]bool
cmdCh chan *cmdCtx
broadcastCh *channels.InfiniteChannel
broadcastCh channels.Channel
lastBroadcasted *broadcastedValue

onSubscribeHook OnSubscribeHook
}

// OnSubscribeHook is the on-subscribe callback hook prototype.
type OnSubscribeHook func(*channels.InfiniteChannel)
type OnSubscribeHook func(channels.Channel)

// Subscribe subscribes to the Broker's broadcasts, and returns a
// subscription handle that can be used to receive broadcasts.
//
// Note: The returned subscription's channel will have an unbounded
// capacity.
// capacity, use SubscribeBuff to use a bounded ring channel.
func (b *Broker) Subscribe() *Subscription {
return b.SubscribeEx(nil)
return b.SubscribeEx(int64(channels.Infinity), nil)
}

// SubscribeBuff subscribes to the Broker's broadcasts, and returns a
// subscription handle that can be used to receive broadcasts.
//
// Buffer controlls the capacity of a ring buffer - when buffer is full the
// oldest value will be discarded. In case buffer is negative (or zero) an
// unbounded channel is used.
func (b *Broker) SubscribeBuff(buffer int64) *Subscription {
return b.SubscribeEx(buffer, nil)
}

// SubscribeEx subscribes to the Broker's broadcasts, and returns a
// subscription handle that can be used to receive broadcasts. In
// addition it also takes a per-subscription on-subscribe callback
// hook.
//
// Note: The returned subscription's channel will have an unbounded
// capacity. If there is a Broker wide hook set, it will be called
// Note: If there is a Broker wide hook set, it will be called
// after the per-subscription hook is called.
func (b *Broker) SubscribeEx(onSubscribeHook OnSubscribeHook) *Subscription {
func (b *Broker) SubscribeEx(buffer int64, onSubscribeHook OnSubscribeHook) *Subscription {
var ch channels.Channel
if buffer <= 0 {
ch = channels.NewInfiniteChannel()
} else {
ch = channels.NewRingChannel(channels.BufferCap(buffer))
}
ctx := &cmdCtx{
ch: channels.NewInfiniteChannel(),
ch: ch,
errCh: make(chan error),
onSubscribeHook: onSubscribeHook,
isSubscribe: true,
Expand Down Expand Up @@ -167,7 +182,7 @@ func (b *Broker) worker() {
func NewBroker(pubLastOnSubscribe bool) *Broker {
b := newBroker()
if pubLastOnSubscribe {
b.onSubscribeHook = func(ch *channels.InfiniteChannel) {
b.onSubscribeHook = func(ch channels.Channel) {
if b.lastBroadcasted != nil {
ch.In() <- b.lastBroadcasted.v
}
Expand All @@ -192,7 +207,7 @@ func NewBrokerEx(onSubscribeHook OnSubscribeHook) *Broker {

func newBroker() *Broker {
return &Broker{
subscribers: make(map[*channels.InfiniteChannel]bool),
subscribers: make(map[channels.Channel]bool),
cmdCh: make(chan *cmdCtx),
broadcastCh: channels.NewInfiniteChannel(),
}
Expand Down
107 changes: 87 additions & 20 deletions go/common/pubsub/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,20 @@ import (
"github.com/stretchr/testify/require"
)

const recvTimeout = 5 * time.Second
const (
recvTimeout = 5 * time.Second
bufferSize = 5
)

func TestPubSub(t *testing.T) {
t.Run("Basic", testBasic)
t.Run("BasicInfinity", testBasicInfinity)
t.Run("BasicOverwriting", testBasicOverwriting)
t.Run("PubLastOnSubscribe", testLastOnSubscribe)
t.Run("SubscribeEx", testSubscribeEx)
t.Run("NewBrokerEx", testNewBrokerEx)
}

func testBasic(t *testing.T) {
func testBasicInfinity(t *testing.T) {
broker := NewBroker(false)

sub := broker.Subscribe()
Expand Down Expand Up @@ -50,41 +54,104 @@ func testBasic(t *testing.T) {
require.Len(t, broker.subscribers, 0, "Subscriber map, post Close()")
}

func testLastOnSubscribe(t *testing.T) {
broker := NewBroker(true)
broker.Broadcast(23)
func testBasicOverwriting(t *testing.T) {
broker := NewBroker(false)

sub := broker.Subscribe()
sub := broker.SubscribeBuff(bufferSize)
typedCh := make(chan int)
sub.Unwrap(typedCh)

// Test a single broadcast/receive.
broker.Broadcast(23)
select {
case v := <-typedCh:
require.Equal(t, 23, v, "Last Broadcast()")
require.Equal(t, 23, v, "Single Broadcast())")
case <-time.After(recvTimeout):
t.Fatalf("Failed to receive value, last Broadcast() on Subscribe()")
t.Fatalf("Failed to receive value, initial Broadcast()")
}

// Test the buffered nature of the overwriting channel.
for i := 0; i < bufferSize+10; i++ {
broker.Broadcast(i)
}
// Ensure we don't start reading before all messages are processed by the
// underlying channel.
time.Sleep(100 * time.Millisecond)

// RingChannel prefers to write before buffering the items, so the first
// element will be instantly send to the output channel and removed from the
// buffer so it will not get overwritten.
expected := []int{
0,
}
for i := 10; i < bufferSize+10; i++ {
expected = append(expected, i)
}
for _, i := range expected {
select {
case v := <-typedCh:
require.Equal(t, i, v, "Buffered Broadcast()")
case <-time.After(recvTimeout):
t.Fatalf("Failed to receive value, buffered Broadcast()")
}
}

require.NotPanics(t, func() { sub.Close() }, "Close()")
require.Len(t, broker.subscribers, 0, "Subscriber map, post Close()")
}

func testLastOnSubscribe(t *testing.T) {
broker := NewBroker(true)
broker.Broadcast(23)

for _, b := range []int64{
int64(channels.Infinity),
bufferSize,
} {
sub := broker.SubscribeBuff(b)
typedCh := make(chan int)
sub.Unwrap(typedCh)

select {
case v := <-typedCh:
require.Equal(t, 23, v, "Last Broadcast()")
case <-time.After(recvTimeout):
t.Fatalf("Failed to receive value, last Broadcast() on Subscribe()")
}
}
}

func testSubscribeEx(t *testing.T) {
broker := NewBroker(false)

var callbackCh *channels.InfiniteChannel
sub := broker.SubscribeEx(func(ch *channels.InfiniteChannel) {
var callbackCh channels.Channel
callback := func(ch channels.Channel) {
callbackCh = ch
})
}

for _, b := range []int64{
int64(channels.Infinity),
bufferSize,
} {
sub := broker.SubscribeEx(b, callback)

require.NotNil(t, sub.ch, "Subscription, inner channel")
require.Equal(t, sub.ch, callbackCh, "Callback channel != Subscription, inner channel")
}

require.NotNil(t, sub.ch, "Subscription, inner channel")
require.Equal(t, sub.ch, callbackCh, "Callback channel != Subscription, inner channel")
}

func testNewBrokerEx(t *testing.T) {
var callbackCh *channels.InfiniteChannel
broker := NewBrokerEx(func(ch *channels.InfiniteChannel) {
var callbackCh channels.Channel
broker := NewBrokerEx(func(ch channels.Channel) {
callbackCh = ch
})

sub := broker.Subscribe()
require.NotNil(t, sub.ch, "Subscription, inner channel")
require.Equal(t, sub.ch, callbackCh, "Callback channel != Subscription, inner channel")
for _, b := range []int64{
int64(channels.Infinity),
bufferSize,
} {
sub := broker.SubscribeBuff(b)
require.NotNil(t, sub.ch, "Subscription, inner channel")
require.Equal(t, sub.ch, callbackCh, "Callback channel != Subscription, inner channel")
}
}
10 changes: 9 additions & 1 deletion go/consensus/tendermint/epochtime/epochtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,14 @@ func (t *tendermintBackend) WatchEpochs() (<-chan api.EpochTime, *pubsub.Subscri
return typedCh, sub
}

func (t *tendermintBackend) WatchLatestEpoch() (<-chan api.EpochTime, *pubsub.Subscription) {
typedCh := make(chan api.EpochTime)
sub := t.notifier.SubscribeBuff(1)
sub.Unwrap(typedCh)

return typedCh, sub
}

func (t *tendermintBackend) StateToGenesis(ctx context.Context, height int64) (*api.Genesis, error) {
now, err := t.GetEpoch(ctx, height)
if err != nil {
Expand Down Expand Up @@ -125,7 +133,7 @@ func New(ctx context.Context, service service.TendermintService, interval int64)
base: base,
epoch: base,
}
r.notifier = pubsub.NewBrokerEx(func(ch *channels.InfiniteChannel) {
r.notifier = pubsub.NewBrokerEx(func(ch channels.Channel) {
r.RLock()
defer r.RUnlock()

Expand Down
10 changes: 9 additions & 1 deletion go/consensus/tendermint/epochtime_mock/epochtime_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,14 @@ func (t *tendermintMockBackend) WatchEpochs() (<-chan api.EpochTime, *pubsub.Sub
return typedCh, sub
}

func (t *tendermintMockBackend) WatchLatestEpoch() (<-chan api.EpochTime, *pubsub.Subscription) {
typedCh := make(chan api.EpochTime)
sub := t.notifier.SubscribeBuff(1)
sub.Unwrap(typedCh)

return typedCh, sub
}

func (t *tendermintMockBackend) StateToGenesis(ctx context.Context, height int64) (*api.Genesis, error) {
now, err := t.GetEpoch(ctx, height)
if err != nil {
Expand Down Expand Up @@ -232,7 +240,7 @@ func New(ctx context.Context, service service.TendermintService) (api.SetableBac
service: service,
querier: a.QueryFactory().(*app.QueryFactory),
}
r.notifier = pubsub.NewBrokerEx(func(ch *channels.InfiniteChannel) {
r.notifier = pubsub.NewBrokerEx(func(ch channels.Channel) {
r.RLock()
defer r.RUnlock()

Expand Down
2 changes: 1 addition & 1 deletion go/consensus/tendermint/keymanager/keymanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func New(ctx context.Context, service service.TendermintService) (api.Backend, e
service: service,
querier: a.QueryFactory().(*app.QueryFactory),
}
tb.notifier = pubsub.NewBrokerEx(func(ch *channels.InfiniteChannel) {
tb.notifier = pubsub.NewBrokerEx(func(ch channels.Channel) {
statuses, err := tb.GetStatuses(ctx, consensus.HeightLatest)
if err != nil {
tb.logger.Error("status notifier: unable to get a list of statuses",
Expand Down
2 changes: 1 addition & 1 deletion go/consensus/tendermint/registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ func New(ctx context.Context, service service.TendermintService) (api.Backend, e
nodeNotifier: pubsub.NewBroker(false),
nodeListNotifier: pubsub.NewBroker(true),
}
tb.runtimeNotifier = pubsub.NewBrokerEx(func(ch *channels.InfiniteChannel) {
tb.runtimeNotifier = pubsub.NewBrokerEx(func(ch channels.Channel) {
wr := ch.In()
runtimes, err := tb.GetRuntimes(ctx, consensus.HeightLatest)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion go/consensus/tendermint/roothash/roothash.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func (tb *tendermintBackend) getLatestBlockAt(ctx context.Context, id common.Nam
func (tb *tendermintBackend) WatchBlocks(id common.Namespace) (<-chan *api.AnnotatedBlock, *pubsub.Subscription, error) {
notifiers := tb.getRuntimeNotifiers(id)

sub := notifiers.blockNotifier.SubscribeEx(func(ch *channels.InfiniteChannel) {
sub := notifiers.blockNotifier.SubscribeEx(-1, func(ch channels.Channel) {
// Replay the latest block if it exists.
notifiers.Lock()
defer notifiers.Unlock()
Expand Down
2 changes: 1 addition & 1 deletion go/consensus/tendermint/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func New(ctx context.Context, service service.TendermintService) (api.Backend, e
service: service,
querier: a.QueryFactory().(*app.QueryFactory),
}
tb.notifier = pubsub.NewBrokerEx(func(ch *channels.InfiniteChannel) {
tb.notifier = pubsub.NewBrokerEx(func(ch channels.Channel) {
currentCommittees, err := tb.getCurrentCommittees()
if err != nil {
tb.logger.Error("couldn't get current committees. won't send them. good luck to the subscriber",
Expand Down
7 changes: 7 additions & 0 deletions go/epochtime/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,13 @@ type Backend interface {
// Upon subscription the current epoch is sent immediately.
WatchEpochs() (<-chan EpochTime, *pubsub.Subscription)

// WatchLatestEpoch returns a channel that produces a stream of messages on
// epoch transitions. If an epoch transition hapens before previous epoch
// is read from channel, the old epochs is overwritten.
//
// Upon subscription the current epoch is sent immediately.
WatchLatestEpoch() (<-chan EpochTime, *pubsub.Subscription)

// StateToGenesis returns the genesis state at the specified block height.
StateToGenesis(ctx context.Context, height int64) (*Genesis, error)
}
Expand Down
16 changes: 16 additions & 0 deletions go/epochtime/tests/mock_tester.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,15 @@ func EpochtimeSetableImplementationTest(t *testing.T, backend api.Backend) {
t.Fatalf("failed to receive current epoch on WatchEpochs")
}

latestCh, subCh := timeSource.WatchLatestEpoch()
defer subCh.Close()
select {
case e = <-latestCh:
require.Equal(epoch, e, "WatchLatestEpoch initial")
case <-time.After(recvTimeout):
t.Fatalf("failed to receive current epoch on WatchLatestEpoch")
}

epoch++
err = timeSource.SetEpoch(context.Background(), epoch)
require.NoError(err, "SetEpoch")
Expand All @@ -48,6 +57,13 @@ func EpochtimeSetableImplementationTest(t *testing.T, backend api.Backend) {
t.Fatalf("failed to receive epoch notification after transition")
}

select {
case e = <-latestCh:
require.Equal(epoch, e, "WatchLatestEpoch after set")
case <-time.After(recvTimeout):
t.Fatalf("failed to receive latest epoch after transition")
}

e, err = timeSource.GetEpoch(context.Background(), consensus.HeightLatest)
require.NoError(err, "GetEpoch after set")
require.Equal(epoch, e, "GetEpoch after set, epoch")
Expand Down
4 changes: 4 additions & 0 deletions go/oasis-node/cmd/debug/consim/timesource.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ func (b *simTimeSource) WatchEpochs() (<-chan api.EpochTime, *pubsub.Subscriptio
panic("consim/epochtime: WatchEpochs not supported")
}

func (b *simTimeSource) WatchLatestEpoch() (<-chan api.EpochTime, *pubsub.Subscription) {
panic("consim/epochtime: WatchLatestEpoch not supported")
}

func (b *simTimeSource) StateToGenesis(ctx context.Context, height int64) (*api.Genesis, error) {
// WARNING: This ignores the height because it's only used for the final
// dump.
Expand Down
Loading

0 comments on commit 39a1654

Please sign in to comment.