Skip to content

Commit

Permalink
Merge pull request #8 from playnet-public/fix-event-handling
Browse files Browse the repository at this point in the history
add broker implementation for event and subscription handling
  • Loading branch information
kwiesmueller authored Mar 26, 2019
2 parents f395876 + 574699b commit b2b6b87
Show file tree
Hide file tree
Showing 17 changed files with 688 additions and 493 deletions.
11 changes: 6 additions & 5 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,16 @@ go:

env:
global:
- REPO: $TRAVIS_REPO_SLUG
- VERSION: $TRAVIS_TAG
- REPO: $TRAVIS_REPO_SLUG
- VERSION: $TRAVIS_TAG

before_script:
- make test
- make check
- go get github.com/schrej/godacov
- godacov -t $CODACY_TOKEN -r ./coverage.out -c $TRAVIS_COMMIT

script:
- make docker
- docker login -u="$DOCKER_USERNAME" -p="$DOCKER_PASSWORD" quay.io
- make upload
- make docker
- docker login -u="$DOCKER_USERNAME" -p="$DOCKER_PASSWORD" quay.io
- make upload
74 changes: 74 additions & 0 deletions pkg/event/broker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package event

import (
"context"
"errors"

"github.com/seibert-media/golibs/log"
"go.uber.org/zap"
)

var (
// ErrInputClosed is returned when the input channel for the broker gets closed
ErrInputClosed = errors.New("input channel closed")
)

// Broker for subscribing to an eventsource with multiple subscriptions automatically canceled on ctx.Close
type Broker struct {
new chan chan<- Event
active map[chan<- Event]struct{}
closed chan chan<- Event

in <-chan Event
}

// NewBroker with the provided input channel as event source
// A running broker will handle all incoming events by sending them to all active subscriptions
func NewBroker(ctx context.Context, in <-chan Event) *Broker {
return &Broker{
new: make(chan chan<- Event),
active: make(map[chan<- Event]struct{}),
closed: make(chan chan<- Event),

in: in,
}
}

// Run the broker and listen for new subscriptions, events and unsubscribes
// The broker will run until either it's parent context closes or the incoming event channel gets closed
func (b *Broker) Run(ctx context.Context) error {
defer func() {
for s := range b.active {
delete(b.active, s)
close(s)
}
}()
for {
select {
case <-ctx.Done():
log.From(ctx).Info("stopping background loop", zap.Error(ctx.Err()))
return ctx.Err()

case s := <-b.new:
b.active[s] = struct{}{}
log.From(ctx).Debug("subscribing", zap.Int("count", len(b.active)))

case s := <-b.closed:
delete(b.active, s)
close(s)
log.From(ctx).Debug("unsubscribing", zap.Int("count", len(b.active)))

case event, ok := <-b.in:
if !ok {
log.From(ctx).Info("stopping background loop")
return ErrInputClosed
}

for s := range b.active {
log.From(ctx).Debug("handling event", zap.String("data", event.Data()))
// TODO(kwiesmueller): make sure we don't leak here and need an unsubscribing timeout
go func(s chan<- Event) { s <- event }(s)
}
}
}
}
145 changes: 145 additions & 0 deletions pkg/event/broker_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
package event_test

import (
"context"
"testing"
"time"

"github.com/playnet-public/gorcon/pkg/event"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/seibert-media/golibs/log"
)

const debug = false

func TestEvent(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Event Suite")
}

type fakeEvent struct{}

func (f *fakeEvent) Timestamp() time.Time {
t, _ := time.Parse(time.RFC3339, "2006-01-02T15:04:05Z")
return t
}
func (f *fakeEvent) Kind() string { return "fake" }
func (f *fakeEvent) Data() string { return "fake" }

var _ = Describe("Event", func() {

setup := func() (ctx context.Context, in chan event.Event, b *event.Broker) {
ctx = context.Background()
l := log.New("", debug)
ctx = log.WithLogger(ctx, l)
in = make(chan event.Event)
b = event.NewBroker(ctx, in)
return
}

Describe("Run", func() {
It("does exit on closed context", func() {
ctx, _, b := setup()

ctx, close := context.WithCancel(ctx)
close()
Expect(b.Run(ctx)).To(BeEquivalentTo(context.Canceled))
})
It("does return error on closed input", func() {
ctx, in, b := setup()

close(in)
Expect(b.Run(ctx)).To(BeEquivalentTo(event.ErrInputClosed))
})
It("does cleanup and close all subscriptions on exit", func() {
ctx, in, b := setup()

go b.Run(ctx)

c1 := make(chan event.Event)
b.Subscribe(ctx, c1)

c2 := make(chan event.Event)
b.Subscribe(ctx, c2)

close(in)

e1, ok1 := <-c1
e2, ok2 := <-c2
Expect(e1).To(BeNil())
Expect(e2).To(BeNil())
Expect(ok1).To(BeFalse())
Expect(ok2).To(BeFalse())
})
It("does not block on inactive subscriptions", func() {
ctx, in, b := setup()

go b.Run(ctx)

c1 := make(chan event.Event)
b.Subscribe(ctx, c1)

c2 := make(chan event.Event)
b.Subscribe(ctx, c2)

go func() { in <- &fakeEvent{} }()

Expect(<-c1).NotTo(BeNil())

go func() { in <- &fakeEvent{} }()

Expect(<-c1).NotTo(BeNil())
})
})

Describe("Subscribe", func() {
It("does forward events to new subscriptions", func() {
ctx, in, b := setup()

go b.Run(ctx)
c := make(chan event.Event)
b.Subscribe(ctx, c)
go func() { in <- &fakeEvent{} }()
Expect(<-c).NotTo(BeNil())
})
It("does unsubscribe on closed context", func() {
ctx, in, b := setup()

go b.Run(ctx)
ctx, close := context.WithCancel(ctx)
c := make(chan event.Event)
b.Subscribe(ctx, c)
go func() { in <- &fakeEvent{} }()
Expect(<-c).NotTo(BeNil())

close()
// wait until we check, as it might take one event
<-time.After(1 * time.Millisecond)

go func() { in <- &fakeEvent{} }()

e, ok := <-c
Expect(ok).To(BeFalse())
Expect(e).To(BeNil())
})
})

Describe("Unsubscribe", func() {
It("does not forward events to closed subscriptions", func() {
ctx, in, b := setup()

go b.Run(ctx)
c := make(chan event.Event)
b.Subscribe(ctx, c)
b.Unsubscribe(ctx, c)
go func() { in <- &fakeEvent{} }()
select {
case e := <-c:
Expect(e).To(BeNil())
case <-time.After(5 * time.Millisecond):
}
})
})
})
10 changes: 10 additions & 0 deletions pkg/event/event.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package event

import "time"

// Event is the generic interface for events handled by the broker
type Event interface {
Timestamp() time.Time
Kind() string
Data() string
}
28 changes: 28 additions & 0 deletions pkg/event/subscription.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package event

import (
"context"
"errors"
"time"

"github.com/seibert-media/golibs/log"
"go.uber.org/zap"
)

// Subscribe adds a new channel as receiver for events and unsubscribes on a closed ctx
func (b *Broker) Subscribe(ctx context.Context, out chan<- Event) {
b.new <- out
go func() {
<-ctx.Done()
select {
case b.closed <- out:
case <-time.After(1 * time.Second):
log.From(ctx).Warn("closing subscription", zap.String("reason", "ctx closed"), zap.Error(errors.New("timeout")))
}
}()
}

// Unsubscribe removes the provided channel from the active listeners and tells the broker to clean up
func (b *Broker) Unsubscribe(ctx context.Context, out chan<- Event) {
b.closed <- out
}
11 changes: 6 additions & 5 deletions pkg/mocks/rcon_connection.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit b2b6b87

Please sign in to comment.