Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

go/epochtime: add WatchLatestEpoch method #2876

Merged
merged 1 commit into from
Apr 30, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changelog/2876.bugfix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
worker/registration: use WatchLatestEpoch when watching for registrations

By using WatchLatestEpoch the worker will always try to register for latest
known epoch, which should prevent cases where registration worker fell behind
and was trying to register for past epochs.
1 change: 1 addition & 0 deletions .changelog/2876.internal.1.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
go/common/pubsub: support subscriptions based on bounded ring channels
4 changes: 4 additions & 0 deletions .changelog/2876.internal.2.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
go/epochtime: add WatchLatestEpoch method

The method is similar to the existing WatchEpochs method, with the change that
unread epochs get overridden with latest epoch.
41 changes: 28 additions & 13 deletions go/common/pubsub/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ type broadcastedValue struct {
}

type cmdCtx struct {
ch *channels.InfiniteChannel
ch channels.Channel
errCh chan error
onSubscribeHook OnSubscribeHook

Expand Down Expand Up @@ -46,7 +46,7 @@ func NewContextSubscription(ctx context.Context) (context.Context, ClosableSubsc
// Subscription is a Broker subscription instance.
type Subscription struct {
b *Broker
ch *channels.InfiniteChannel
ch channels.Channel
}

// Untyped returns the subscription's untyped output. Effort should be
Expand Down Expand Up @@ -77,37 +77,52 @@ func (s *Subscription) Close() {

// Broker is a pub/sub broker instance.
type Broker struct {
subscribers map[*channels.InfiniteChannel]bool
subscribers map[channels.Channel]bool
cmdCh chan *cmdCtx
broadcastCh *channels.InfiniteChannel
broadcastCh channels.Channel
lastBroadcasted *broadcastedValue

onSubscribeHook OnSubscribeHook
}

// OnSubscribeHook is the on-subscribe callback hook prototype.
type OnSubscribeHook func(*channels.InfiniteChannel)
type OnSubscribeHook func(channels.Channel)

// Subscribe subscribes to the Broker's broadcasts, and returns a
// subscription handle that can be used to receive broadcasts.
//
// Note: The returned subscription's channel will have an unbounded
// capacity.
// capacity, use SubscribeBuffered to use a bounded ring channel.
func (b *Broker) Subscribe() *Subscription {
return b.SubscribeEx(nil)
return b.SubscribeEx(int64(channels.Infinity), nil)
}

// SubscribeBuffered subscribes to the Broker's broadcasts, and returns a
// subscription handle that can be used to receive broadcasts.
//
// Buffer controlls the capacity of a ring buffer - when buffer is full the
// oldest value will be discarded. In case buffer is negative (or zero) an
// unbounded channel is used.
func (b *Broker) SubscribeBuffered(buffer int64) *Subscription {
return b.SubscribeEx(buffer, nil)
}

// SubscribeEx subscribes to the Broker's broadcasts, and returns a
// subscription handle that can be used to receive broadcasts. In
// addition it also takes a per-subscription on-subscribe callback
// hook.
//
// Note: The returned subscription's channel will have an unbounded
// capacity. If there is a Broker wide hook set, it will be called
// Note: If there is a Broker wide hook set, it will be called
// after the per-subscription hook is called.
func (b *Broker) SubscribeEx(onSubscribeHook OnSubscribeHook) *Subscription {
func (b *Broker) SubscribeEx(buffer int64, onSubscribeHook OnSubscribeHook) *Subscription {
var ch channels.Channel
if buffer <= 0 {
ch = channels.NewInfiniteChannel()
} else {
ch = channels.NewRingChannel(channels.BufferCap(buffer))
}
ctx := &cmdCtx{
ch: channels.NewInfiniteChannel(),
ch: ch,
errCh: make(chan error),
onSubscribeHook: onSubscribeHook,
isSubscribe: true,
Expand Down Expand Up @@ -167,7 +182,7 @@ func (b *Broker) worker() {
func NewBroker(pubLastOnSubscribe bool) *Broker {
b := newBroker()
if pubLastOnSubscribe {
b.onSubscribeHook = func(ch *channels.InfiniteChannel) {
b.onSubscribeHook = func(ch channels.Channel) {
if b.lastBroadcasted != nil {
ch.In() <- b.lastBroadcasted.v
}
Expand All @@ -192,7 +207,7 @@ func NewBrokerEx(onSubscribeHook OnSubscribeHook) *Broker {

func newBroker() *Broker {
return &Broker{
subscribers: make(map[*channels.InfiniteChannel]bool),
subscribers: make(map[channels.Channel]bool),
cmdCh: make(chan *cmdCtx),
broadcastCh: channels.NewInfiniteChannel(),
}
Expand Down
107 changes: 87 additions & 20 deletions go/common/pubsub/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,20 @@ import (
"github.com/stretchr/testify/require"
)

const recvTimeout = 5 * time.Second
const (
recvTimeout = 5 * time.Second
bufferSize = 5
)

func TestPubSub(t *testing.T) {
t.Run("Basic", testBasic)
t.Run("BasicInfinity", testBasicInfinity)
t.Run("BasicOverwriting", testBasicOverwriting)
t.Run("PubLastOnSubscribe", testLastOnSubscribe)
t.Run("SubscribeEx", testSubscribeEx)
t.Run("NewBrokerEx", testNewBrokerEx)
}

func testBasic(t *testing.T) {
func testBasicInfinity(t *testing.T) {
broker := NewBroker(false)

sub := broker.Subscribe()
Expand Down Expand Up @@ -50,41 +54,104 @@ func testBasic(t *testing.T) {
require.Len(t, broker.subscribers, 0, "Subscriber map, post Close()")
}

func testLastOnSubscribe(t *testing.T) {
broker := NewBroker(true)
broker.Broadcast(23)
func testBasicOverwriting(t *testing.T) {
broker := NewBroker(false)

sub := broker.Subscribe()
sub := broker.SubscribeBuffered(bufferSize)
typedCh := make(chan int)
sub.Unwrap(typedCh)

// Test a single broadcast/receive.
broker.Broadcast(23)
select {
case v := <-typedCh:
require.Equal(t, 23, v, "Last Broadcast()")
require.Equal(t, 23, v, "Single Broadcast())")
case <-time.After(recvTimeout):
t.Fatalf("Failed to receive value, last Broadcast() on Subscribe()")
t.Fatalf("Failed to receive value, initial Broadcast()")
}

// Test the buffered nature of the overwriting channel.
for i := 0; i < bufferSize+10; i++ {
broker.Broadcast(i)
}
// Ensure we don't start reading before all messages are processed by the
// underlying channel.
time.Sleep(100 * time.Millisecond)

// RingChannel prefers to write before buffering the items, so the first
// element will be instantly send to the output channel and removed from the
// buffer so it will not get overwritten.
expected := []int{
0,
}
for i := 10; i < bufferSize+10; i++ {
expected = append(expected, i)
}
for _, i := range expected {
select {
case v := <-typedCh:
require.Equal(t, i, v, "Buffered Broadcast()")
case <-time.After(recvTimeout):
t.Fatalf("Failed to receive value, buffered Broadcast()")
}
}

require.NotPanics(t, func() { sub.Close() }, "Close()")
require.Len(t, broker.subscribers, 0, "Subscriber map, post Close()")
}

func testLastOnSubscribe(t *testing.T) {
broker := NewBroker(true)
broker.Broadcast(23)

for _, b := range []int64{
int64(channels.Infinity),
bufferSize,
} {
sub := broker.SubscribeBuffered(b)
typedCh := make(chan int)
sub.Unwrap(typedCh)

select {
case v := <-typedCh:
require.Equal(t, 23, v, "Last Broadcast()")
case <-time.After(recvTimeout):
t.Fatalf("Failed to receive value, last Broadcast() on Subscribe()")
}
}
}

func testSubscribeEx(t *testing.T) {
broker := NewBroker(false)

var callbackCh *channels.InfiniteChannel
sub := broker.SubscribeEx(func(ch *channels.InfiniteChannel) {
var callbackCh channels.Channel
callback := func(ch channels.Channel) {
callbackCh = ch
})
}

for _, b := range []int64{
int64(channels.Infinity),
bufferSize,
} {
sub := broker.SubscribeEx(b, callback)

require.NotNil(t, sub.ch, "Subscription, inner channel")
require.Equal(t, sub.ch, callbackCh, "Callback channel != Subscription, inner channel")
}

require.NotNil(t, sub.ch, "Subscription, inner channel")
require.Equal(t, sub.ch, callbackCh, "Callback channel != Subscription, inner channel")
}

func testNewBrokerEx(t *testing.T) {
var callbackCh *channels.InfiniteChannel
broker := NewBrokerEx(func(ch *channels.InfiniteChannel) {
var callbackCh channels.Channel
broker := NewBrokerEx(func(ch channels.Channel) {
callbackCh = ch
})

sub := broker.Subscribe()
require.NotNil(t, sub.ch, "Subscription, inner channel")
require.Equal(t, sub.ch, callbackCh, "Callback channel != Subscription, inner channel")
for _, b := range []int64{
int64(channels.Infinity),
bufferSize,
} {
sub := broker.SubscribeBuffered(b)
require.NotNil(t, sub.ch, "Subscription, inner channel")
require.Equal(t, sub.ch, callbackCh, "Callback channel != Subscription, inner channel")
}
}
10 changes: 9 additions & 1 deletion go/consensus/tendermint/epochtime/epochtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,14 @@ func (t *tendermintBackend) WatchEpochs() (<-chan api.EpochTime, *pubsub.Subscri
return typedCh, sub
}

func (t *tendermintBackend) WatchLatestEpoch() (<-chan api.EpochTime, *pubsub.Subscription) {
typedCh := make(chan api.EpochTime)
sub := t.notifier.SubscribeBuffered(1)
sub.Unwrap(typedCh)

return typedCh, sub
}

func (t *tendermintBackend) StateToGenesis(ctx context.Context, height int64) (*api.Genesis, error) {
now, err := t.GetEpoch(ctx, height)
if err != nil {
Expand Down Expand Up @@ -125,7 +133,7 @@ func New(ctx context.Context, service service.TendermintService, interval int64)
base: base,
epoch: base,
}
r.notifier = pubsub.NewBrokerEx(func(ch *channels.InfiniteChannel) {
r.notifier = pubsub.NewBrokerEx(func(ch channels.Channel) {
r.RLock()
defer r.RUnlock()

Expand Down
10 changes: 9 additions & 1 deletion go/consensus/tendermint/epochtime_mock/epochtime_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,14 @@ func (t *tendermintMockBackend) WatchEpochs() (<-chan api.EpochTime, *pubsub.Sub
return typedCh, sub
}

func (t *tendermintMockBackend) WatchLatestEpoch() (<-chan api.EpochTime, *pubsub.Subscription) {
typedCh := make(chan api.EpochTime)
sub := t.notifier.SubscribeBuffered(1)
sub.Unwrap(typedCh)

return typedCh, sub
}

func (t *tendermintMockBackend) StateToGenesis(ctx context.Context, height int64) (*api.Genesis, error) {
now, err := t.GetEpoch(ctx, height)
if err != nil {
Expand Down Expand Up @@ -232,7 +240,7 @@ func New(ctx context.Context, service service.TendermintService) (api.SetableBac
service: service,
querier: a.QueryFactory().(*app.QueryFactory),
}
r.notifier = pubsub.NewBrokerEx(func(ch *channels.InfiniteChannel) {
r.notifier = pubsub.NewBrokerEx(func(ch channels.Channel) {
r.RLock()
defer r.RUnlock()

Expand Down
2 changes: 1 addition & 1 deletion go/consensus/tendermint/keymanager/keymanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func New(ctx context.Context, service service.TendermintService) (api.Backend, e
service: service,
querier: a.QueryFactory().(*app.QueryFactory),
}
tb.notifier = pubsub.NewBrokerEx(func(ch *channels.InfiniteChannel) {
tb.notifier = pubsub.NewBrokerEx(func(ch channels.Channel) {
statuses, err := tb.GetStatuses(ctx, consensus.HeightLatest)
if err != nil {
tb.logger.Error("status notifier: unable to get a list of statuses",
Expand Down
2 changes: 1 addition & 1 deletion go/consensus/tendermint/registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ func New(ctx context.Context, service service.TendermintService) (api.Backend, e
nodeNotifier: pubsub.NewBroker(false),
nodeListNotifier: pubsub.NewBroker(true),
}
tb.runtimeNotifier = pubsub.NewBrokerEx(func(ch *channels.InfiniteChannel) {
tb.runtimeNotifier = pubsub.NewBrokerEx(func(ch channels.Channel) {
wr := ch.In()
runtimes, err := tb.GetRuntimes(ctx, consensus.HeightLatest)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion go/consensus/tendermint/roothash/roothash.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func (tb *tendermintBackend) getLatestBlockAt(ctx context.Context, id common.Nam
func (tb *tendermintBackend) WatchBlocks(id common.Namespace) (<-chan *api.AnnotatedBlock, *pubsub.Subscription, error) {
notifiers := tb.getRuntimeNotifiers(id)

sub := notifiers.blockNotifier.SubscribeEx(func(ch *channels.InfiniteChannel) {
sub := notifiers.blockNotifier.SubscribeEx(-1, func(ch channels.Channel) {
// Replay the latest block if it exists.
notifiers.Lock()
defer notifiers.Unlock()
Expand Down
2 changes: 1 addition & 1 deletion go/consensus/tendermint/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func New(ctx context.Context, service service.TendermintService) (api.Backend, e
service: service,
querier: a.QueryFactory().(*app.QueryFactory),
}
tb.notifier = pubsub.NewBrokerEx(func(ch *channels.InfiniteChannel) {
tb.notifier = pubsub.NewBrokerEx(func(ch channels.Channel) {
currentCommittees, err := tb.getCurrentCommittees()
if err != nil {
tb.logger.Error("couldn't get current committees. won't send them. good luck to the subscriber",
Expand Down
7 changes: 7 additions & 0 deletions go/epochtime/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,13 @@ type Backend interface {
// Upon subscription the current epoch is sent immediately.
WatchEpochs() (<-chan EpochTime, *pubsub.Subscription)

// WatchLatestEpoch returns a channel that produces a stream of messages on
// epoch transitions. If an epoch transition hapens before previous epoch
// is read from channel, the old epochs is overwritten.
//
// Upon subscription the current epoch is sent immediately.
WatchLatestEpoch() (<-chan EpochTime, *pubsub.Subscription)

// StateToGenesis returns the genesis state at the specified block height.
StateToGenesis(ctx context.Context, height int64) (*Genesis, error)
}
Expand Down
16 changes: 16 additions & 0 deletions go/epochtime/tests/mock_tester.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,15 @@ func EpochtimeSetableImplementationTest(t *testing.T, backend api.Backend) {
t.Fatalf("failed to receive current epoch on WatchEpochs")
}

latestCh, subCh := timeSource.WatchLatestEpoch()
defer subCh.Close()
select {
case e = <-latestCh:
require.Equal(epoch, e, "WatchLatestEpoch initial")
case <-time.After(recvTimeout):
t.Fatalf("failed to receive current epoch on WatchLatestEpoch")
}

epoch++
err = timeSource.SetEpoch(context.Background(), epoch)
require.NoError(err, "SetEpoch")
Expand All @@ -48,6 +57,13 @@ func EpochtimeSetableImplementationTest(t *testing.T, backend api.Backend) {
t.Fatalf("failed to receive epoch notification after transition")
}

select {
case e = <-latestCh:
require.Equal(epoch, e, "WatchLatestEpoch after set")
case <-time.After(recvTimeout):
t.Fatalf("failed to receive latest epoch after transition")
}

e, err = timeSource.GetEpoch(context.Background(), consensus.HeightLatest)
require.NoError(err, "GetEpoch after set")
require.Equal(epoch, e, "GetEpoch after set, epoch")
Expand Down
4 changes: 4 additions & 0 deletions go/oasis-node/cmd/debug/consim/timesource.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ func (b *simTimeSource) WatchEpochs() (<-chan api.EpochTime, *pubsub.Subscriptio
panic("consim/epochtime: WatchEpochs not supported")
}

func (b *simTimeSource) WatchLatestEpoch() (<-chan api.EpochTime, *pubsub.Subscription) {
panic("consim/epochtime: WatchLatestEpoch not supported")
}

func (b *simTimeSource) StateToGenesis(ctx context.Context, height int64) (*api.Genesis, error) {
// WARNING: This ignores the height because it's only used for the final
// dump.
Expand Down
Loading