From 71e14e33f186d1a7972df645badc82a10c4391c6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kevin=20Wiesm=C3=BCller?= Date: Tue, 26 Mar 2019 21:28:53 +0100 Subject: [PATCH 1/2] add broker implementation for event and subscription handling --- pkg/event/broker.go | 74 ++++++++ pkg/event/broker_test.go | 145 +++++++++++++++ pkg/event/event.go | 10 ++ pkg/event/subscription.go | 28 +++ pkg/mocks/rcon_connection.go | 11 +- pkg/rcon/battleye/battleye.go | 51 +++--- pkg/rcon/battleye/battleye_internal_test.go | 87 ++------- pkg/rcon/battleye/battleye_test.go | 167 ++++++++++++------ pkg/rcon/battleye/helpers_test.go | 9 +- pkg/rcon/battleye/reader.go | 15 +- pkg/rcon/battleye/reader_test.go | 25 +-- pkg/rcon/rcon.go | 35 +++- pkg/watcher/event.go | 21 ++- pkg/watcher/watcher.go | 186 ++++++++++---------- pkg/watcher/watcher_internal_test.go | 124 +++++-------- pkg/watcher/watcher_test.go | 182 +++++++------------ 16 files changed, 682 insertions(+), 488 deletions(-) create mode 100644 pkg/event/broker.go create mode 100644 pkg/event/broker_test.go create mode 100644 pkg/event/event.go create mode 100644 pkg/event/subscription.go diff --git a/pkg/event/broker.go b/pkg/event/broker.go new file mode 100644 index 0000000..ca4817e --- /dev/null +++ b/pkg/event/broker.go @@ -0,0 +1,74 @@ +package event + +import ( + "context" + "errors" + + "github.com/seibert-media/golibs/log" + "go.uber.org/zap" +) + +var ( + // ErrInputClosed is returned when the input channel for the broker gets closed + ErrInputClosed = errors.New("input channel closed") +) + +// Broker for subscribing to an eventsource with multiple subscriptions automatically canceled on ctx.Close +type Broker struct { + new chan chan<- Event + active map[chan<- Event]struct{} + closed chan chan<- Event + + in <-chan Event +} + +// NewBroker with the provided input channel as event source +// A running broker will handle all incoming events by sending them to all active subscriptions +func NewBroker(ctx context.Context, in <-chan Event) *Broker { + return &Broker{ + new: make(chan chan<- Event), + active: make(map[chan<- Event]struct{}), + closed: make(chan chan<- Event), + + in: in, + } +} + +// Run the broker and listen for new subscriptions, events and unsubscribes +// The broker will run until either it's parent context closes or the incoming event channel gets closed +func (b *Broker) Run(ctx context.Context) error { + defer func() { + for s := range b.active { + delete(b.active, s) + close(s) + } + }() + for { + select { + case <-ctx.Done(): + log.From(ctx).Info("stopping background loop", zap.Error(ctx.Err())) + return ctx.Err() + + case s := <-b.new: + b.active[s] = struct{}{} + log.From(ctx).Debug("subscribing", zap.Int("count", len(b.active))) + + case s := <-b.closed: + delete(b.active, s) + close(s) + log.From(ctx).Debug("unsubscribing", zap.Int("count", len(b.active))) + + case event, ok := <-b.in: + if !ok { + log.From(ctx).Info("stopping background loop") + return ErrInputClosed + } + + for s := range b.active { + log.From(ctx).Debug("handling event", zap.String("data", event.Data())) + // TODO(kwiesmueller): make sure we don't leak here and need an unsubscribing timeout + go func(s chan<- Event) { s <- event }(s) + } + } + } +} diff --git a/pkg/event/broker_test.go b/pkg/event/broker_test.go new file mode 100644 index 0000000..55c032d --- /dev/null +++ b/pkg/event/broker_test.go @@ -0,0 +1,145 @@ +package event_test + +import ( + "context" + "testing" + "time" + + "github.com/playnet-public/gorcon/pkg/event" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + "github.com/seibert-media/golibs/log" +) + +const debug = false + +func TestEvent(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Event Suite") +} + +type fakeEvent struct{} + +func (f *fakeEvent) Timestamp() time.Time { + t, _ := time.Parse(time.RFC3339, "2006-01-02T15:04:05Z") + return t +} +func (f *fakeEvent) Kind() string { return "fake" } +func (f *fakeEvent) Data() string { return "fake" } + +var _ = Describe("Event", func() { + + setup := func() (ctx context.Context, in chan event.Event, b *event.Broker) { + ctx = context.Background() + l := log.New("", debug) + ctx = log.WithLogger(ctx, l) + in = make(chan event.Event) + b = event.NewBroker(ctx, in) + return + } + + Describe("Run", func() { + It("does exit on closed context", func() { + ctx, _, b := setup() + + ctx, close := context.WithCancel(ctx) + close() + Expect(b.Run(ctx)).To(BeEquivalentTo(context.Canceled)) + }) + It("does return error on closed input", func() { + ctx, in, b := setup() + + close(in) + Expect(b.Run(ctx)).To(BeEquivalentTo(event.ErrInputClosed)) + }) + It("does cleanup and close all subscriptions on exit", func() { + ctx, in, b := setup() + + go b.Run(ctx) + + c1 := make(chan event.Event) + b.Subscribe(ctx, c1) + + c2 := make(chan event.Event) + b.Subscribe(ctx, c2) + + close(in) + + e1, ok1 := <-c1 + e2, ok2 := <-c2 + Expect(e1).To(BeNil()) + Expect(e2).To(BeNil()) + Expect(ok1).To(BeFalse()) + Expect(ok2).To(BeFalse()) + }) + It("does not block on inactive subscriptions", func() { + ctx, in, b := setup() + + go b.Run(ctx) + + c1 := make(chan event.Event) + b.Subscribe(ctx, c1) + + c2 := make(chan event.Event) + b.Subscribe(ctx, c2) + + go func() { in <- &fakeEvent{} }() + + Expect(<-c1).NotTo(BeNil()) + + go func() { in <- &fakeEvent{} }() + + Expect(<-c1).NotTo(BeNil()) + }) + }) + + Describe("Subscribe", func() { + It("does forward events to new subscriptions", func() { + ctx, in, b := setup() + + go b.Run(ctx) + c := make(chan event.Event) + b.Subscribe(ctx, c) + go func() { in <- &fakeEvent{} }() + Expect(<-c).NotTo(BeNil()) + }) + It("does unsubscribe on closed context", func() { + ctx, in, b := setup() + + go b.Run(ctx) + ctx, close := context.WithCancel(ctx) + c := make(chan event.Event) + b.Subscribe(ctx, c) + go func() { in <- &fakeEvent{} }() + Expect(<-c).NotTo(BeNil()) + + close() + // wait until we check, as it might take one event + <-time.After(1 * time.Millisecond) + + go func() { in <- &fakeEvent{} }() + + e, ok := <-c + Expect(ok).To(BeFalse()) + Expect(e).To(BeNil()) + }) + }) + + Describe("Unsubscribe", func() { + It("does not forward events to closed subscriptions", func() { + ctx, in, b := setup() + + go b.Run(ctx) + c := make(chan event.Event) + b.Subscribe(ctx, c) + b.Unsubscribe(ctx, c) + go func() { in <- &fakeEvent{} }() + select { + case e := <-c: + Expect(e).To(BeNil()) + case <-time.After(5 * time.Millisecond): + } + }) + }) +}) diff --git a/pkg/event/event.go b/pkg/event/event.go new file mode 100644 index 0000000..82273e0 --- /dev/null +++ b/pkg/event/event.go @@ -0,0 +1,10 @@ +package event + +import "time" + +// Event is the generic interface for events handled by the broker +type Event interface { + Timestamp() time.Time + Kind() string + Data() string +} diff --git a/pkg/event/subscription.go b/pkg/event/subscription.go new file mode 100644 index 0000000..ca87f67 --- /dev/null +++ b/pkg/event/subscription.go @@ -0,0 +1,28 @@ +package event + +import ( + "context" + "errors" + "time" + + "github.com/seibert-media/golibs/log" + "go.uber.org/zap" +) + +// Subscribe adds a new channel as receiver for events and unsubscribes on a closed ctx +func (b *Broker) Subscribe(ctx context.Context, out chan<- Event) { + b.new <- out + go func() { + <-ctx.Done() + select { + case b.closed <- out: + case <-time.After(1 * time.Second): + log.From(ctx).Warn("closing subscription", zap.String("reason", "ctx closed"), zap.Error(errors.New("timeout"))) + } + }() +} + +// Unsubscribe removes the provided channel from the active listeners and tells the broker to clean up +func (b *Broker) Unsubscribe(ctx context.Context, out chan<- Event) { + b.closed <- out +} diff --git a/pkg/mocks/rcon_connection.go b/pkg/mocks/rcon_connection.go index e51d65a..8092de9 100644 --- a/pkg/mocks/rcon_connection.go +++ b/pkg/mocks/rcon_connection.go @@ -5,6 +5,7 @@ import ( "context" "sync" + "github.com/playnet-public/gorcon/pkg/event" "github.com/playnet-public/gorcon/pkg/rcon" ) @@ -45,11 +46,11 @@ type RconConnection struct { result1 rcon.Transmission result2 error } - SubscribeStub func(context.Context, chan *rcon.Event) + SubscribeStub func(context.Context, chan<- event.Event) subscribeMutex sync.RWMutex subscribeArgsForCall []struct { arg1 context.Context - arg2 chan *rcon.Event + arg2 chan<- event.Event } invocations map[string][][]interface{} invocationsMutex sync.RWMutex @@ -203,11 +204,11 @@ func (fake *RconConnection) WriteReturnsOnCall(i int, result1 rcon.Transmission, }{result1, result2} } -func (fake *RconConnection) Subscribe(arg1 context.Context, arg2 chan *rcon.Event) { +func (fake *RconConnection) Subscribe(arg1 context.Context, arg2 chan<- event.Event) { fake.subscribeMutex.Lock() fake.subscribeArgsForCall = append(fake.subscribeArgsForCall, struct { arg1 context.Context - arg2 chan *rcon.Event + arg2 chan<- event.Event }{arg1, arg2}) fake.recordInvocation("Subscribe", []interface{}{arg1, arg2}) fake.subscribeMutex.Unlock() @@ -222,7 +223,7 @@ func (fake *RconConnection) SubscribeCallCount() int { return len(fake.subscribeArgsForCall) } -func (fake *RconConnection) SubscribeArgsForCall(i int) (context.Context, chan *rcon.Event) { +func (fake *RconConnection) SubscribeArgsForCall(i int) (context.Context, chan<- event.Event) { fake.subscribeMutex.RLock() defer fake.subscribeMutex.RUnlock() return fake.subscribeArgsForCall[i].arg1, fake.subscribeArgsForCall[i].arg2 diff --git a/pkg/rcon/battleye/battleye.go b/pkg/rcon/battleye/battleye.go index 031d8f1..592e543 100644 --- a/pkg/rcon/battleye/battleye.go +++ b/pkg/rcon/battleye/battleye.go @@ -7,23 +7,34 @@ import ( "sync/atomic" "time" - "go.uber.org/zap" + "github.com/playnet-public/gorcon/pkg/event" + "github.com/playnet-public/gorcon/pkg/rcon" "github.com/pkg/errors" + be_proto "github.com/playnet-public/battleye/battleye" "github.com/seibert-media/golibs/log" + "go.uber.org/zap" tomb "gopkg.in/tomb.v2" - - be_proto "github.com/playnet-public/battleye/battleye" - "github.com/playnet-public/gorcon/pkg/rcon" ) // Client is a BattlEye specific implementation of rcon.Client to create new BattlEye rcon connections type Client struct { + *event.Broker + events chan event.Event +} + +// New battleye client +func New(ctx context.Context) *Client { + e := make(chan event.Event) + return &Client{ + Broker: event.NewBroker(ctx, e), + events: e, + } } // NewConnection from the current client's configuration func (c *Client) NewConnection(ctx context.Context) rcon.Connection { - return NewConnection(ctx) + return NewConnection(ctx, c.Broker, c.events) } // Connection is a BattlEye specific implementation of rcon.Connection offering all required rcon generics @@ -42,17 +53,18 @@ type Connection struct { transmissions map[be_proto.Sequence]*Transmission transmissionsMutext sync.RWMutex - subscriptions []chan *rcon.Event - subscriptionsMutex sync.RWMutex + *event.Broker + events chan event.Event Tomb *tomb.Tomb } // NewConnection from the passed in configuration -func NewConnection(ctx context.Context) *Connection { +func NewConnection(ctx context.Context, broker *event.Broker, events chan event.Event) *Connection { c := &Connection{ - Protocol: be_proto.New(), - subscriptions: []chan *rcon.Event{}, + Protocol: be_proto.New(), + Broker: broker, + events: events, } atomic.StoreUint32(&c.seq, 0) atomic.StoreInt64(&c.keepAliveCount, 0) @@ -191,22 +203,3 @@ func (c *Connection) Write(ctx context.Context, cmd string) (rcon.Transmission, } return trm, nil } - -// Subscribe for events on the connection until the context ends -func (c *Connection) Subscribe(ctx context.Context, to chan *rcon.Event) { - c.subscriptionsMutex.Lock() - defer c.subscriptionsMutex.Unlock() - c.subscriptions = append(c.subscriptions, to) - go func() { - <-ctx.Done() - c.subscriptionsMutex.Lock() - defer c.subscriptionsMutex.Unlock() - for i, s := range c.subscriptions { - if s == to { - c.subscriptions[i] = c.subscriptions[len(c.subscriptions)-1] - c.subscriptions[len(c.subscriptions)-1] = nil - c.subscriptions = c.subscriptions[:len(c.subscriptions)-1] - } - } - }() -} diff --git a/pkg/rcon/battleye/battleye_internal_test.go b/pkg/rcon/battleye/battleye_internal_test.go index 419a7bf..2a9152d 100644 --- a/pkg/rcon/battleye/battleye_internal_test.go +++ b/pkg/rcon/battleye/battleye_internal_test.go @@ -2,103 +2,36 @@ package battleye import ( "context" - "time" be_mocks "github.com/playnet-public/battleye/mocks" + "github.com/playnet-public/gorcon/pkg/event" . "github.com/onsi/ginkgo" - . "github.com/onsi/gomega" - "github.com/playnet-public/gorcon/pkg/rcon" + //. "github.com/onsi/gomega" ) var _ = Describe("Connection", func() { var ( + ctx context.Context + c chan event.Event + b *event.Broker con *Connection proto *be_mocks.Protocol - ctx context.Context ) BeforeEach(func() { - proto = &be_mocks.Protocol{} ctx = context.Background() - con = NewConnection(ctx) + c = make(chan event.Event) + b = event.NewBroker(ctx, c) + con = NewConnection(ctx, b, c) + proto = &be_mocks.Protocol{} con.Protocol = proto }) Describe("Subscribe", func() { BeforeEach(func() { - con = NewConnection(ctx) + con = NewConnection(ctx, b, c) ctx = context.Background() }) - It("does add subscription on Listen", func() { - l := len(con.subscriptions) - ctx, _ := context.WithCancel(ctx) - con.Subscribe(ctx, make(chan *rcon.Event)) - time.Sleep(1 * time.Second) - con.subscriptionsMutex.RLock() - Expect(len(con.subscriptions)).To(BeEquivalentTo(l + 1)) - con.subscriptionsMutex.RUnlock() - }) - It("does end subscription on closed context", func() { - ctx, close := context.WithCancel(ctx) - l := len(con.subscriptions) - con.Subscribe(ctx, make(chan *rcon.Event)) - con.subscriptionsMutex.RLock() - Expect(len(con.subscriptions)).To(BeEquivalentTo(l + 1)) - con.subscriptionsMutex.RUnlock() - close() - time.Sleep(1 * time.Second) - con.subscriptionsMutex.RLock() - Expect(len(con.subscriptions)).To(BeEquivalentTo(l)) - con.subscriptionsMutex.RUnlock() - }) - It("does remove the correct subscription on closed context", func(done Done) { - ctx, _ := context.WithCancel(ctx) - c1 := make(chan *rcon.Event) - c2 := make(chan *rcon.Event) - ctx2, close2 := context.WithCancel(ctx) - c3 := make(chan *rcon.Event) - con.Subscribe(ctx, c1) - con.Subscribe(ctx2, c2) - con.Subscribe(ctx, c3) - - go func() { - Expect(<-c1).NotTo(BeNil()) - }() - go func() { - Expect(<-c2).NotTo(BeNil()) - }() - go func() { - Expect(<-c3).NotTo(BeNil()) - }() - e := &rcon.Event{} - con.subscriptionsMutex.RLock() - defer con.subscriptionsMutex.RUnlock() - for _, l := range con.subscriptions { - go func(l chan *rcon.Event) { l <- e }(l) - } - close2() - go func() { - Expect(<-c1).NotTo(BeNil()) - }() - go func() { - defer GinkgoRecover() - select { - case <-c2: - Fail("should not receive on c2") - case <-time.After(300 * time.Millisecond): - Expect(true).To(BeTrue()) - } - close(done) - }() - go func() { - Expect(<-c3).NotTo(BeNil()) - }() - con.subscriptionsMutex.RLock() - defer con.subscriptionsMutex.RUnlock() - for _, l := range con.subscriptions { - go func(l chan *rcon.Event) { l <- e }(l) - } - }) }) }) diff --git a/pkg/rcon/battleye/battleye_test.go b/pkg/rcon/battleye/battleye_test.go index 7771513..b0af5fd 100644 --- a/pkg/rcon/battleye/battleye_test.go +++ b/pkg/rcon/battleye/battleye_test.go @@ -11,11 +11,11 @@ import ( be_proto "github.com/playnet-public/battleye/battleye" be_mocks "github.com/playnet-public/battleye/mocks" + "github.com/playnet-public/gorcon/pkg/mocks" + be "github.com/playnet-public/gorcon/pkg/rcon/battleye" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" - "github.com/playnet-public/gorcon/pkg/mocks" - be "github.com/playnet-public/gorcon/pkg/rcon/battleye" ) func TestBattlEye(t *testing.T) { @@ -25,13 +25,13 @@ func TestBattlEye(t *testing.T) { var _ = Describe("Client", func() { var ( - c *be.Client ctx context.Context + c *be.Client ) BeforeEach(func() { - c = &be.Client{} ctx = context.Background() + c = be.New(ctx) }) Describe("NewConnection", func() { @@ -42,105 +42,126 @@ var _ = Describe("Client", func() { }) var _ = Describe("Connection", func() { - var ( - con *be.Connection - dial *mocks.UDPDialer - udp *mocks.UDPConnection - proto *be_mocks.Protocol - ctx context.Context - ) + setup := func() (ctx context.Context, c *be.Client, con *be.Connection, dial *mocks.UDPDialer, proto *be_mocks.Protocol, udp *mocks.UDPConnection) { + ctx = context.Background() + c = be.New(ctx) + con = c.NewConnection(ctx).(*be.Connection) - BeforeEach(func() { dial = &mocks.UDPDialer{} - proto = &be_mocks.Protocol{} - ctx = context.Background() - con = be.NewConnection(ctx) con.Dialer = dial + + proto = &be_mocks.Protocol{} con.Protocol = proto udp = &mocks.UDPConnection{} dial.DialUDPReturns(udp, nil) - }) + con.UDP = udp + + return + } Describe("Open", func() { - BeforeEach(func() { - con = be.NewConnection(ctx) - con.Dialer = dial - con.Protocol = proto - con.Password = "test" - udp = &mocks.UDPConnection{} - dial.DialUDPReturns(udp, nil) - proto.VerifyLoginReturns(nil) - }) It("does not return error", func() { + ctx, _, con, dial, _, _ := setup() + con.UDP = nil + con.Dialer = dial Expect(con.Open(ctx)).To(BeNil()) }) It("returns an error if there already is a udp connection", func() { + ctx, _, con, _, _, _ := setup() con.UDP = &net.UDPConn{} Expect(con.Open(ctx)).NotTo(BeNil()) }) It("calls DialUDP once", func() { - con.Open(ctx) + ctx, _, con, dial, _, _ := setup() + con.UDP = nil + con.Dialer = dial + Expect(con.Open(ctx)).To(BeNil()) Expect(dial.DialUDPCallCount()).To(BeEquivalentTo(1)) }) It("calls DialUDP with the correct address", func() { + ctx, _, con, dial, _, _ := setup() + con.UDP = nil + con.Dialer = dial con.Addr, _ = net.ResolveUDPAddr("udp", "127.0.0.1:8080") - con.Open(ctx) + Expect(con.Open(ctx)).To(BeNil()) _, _, addr := dial.DialUDPArgsForCall(0) Expect(addr).To(BeEquivalentTo(con.Addr)) }) It("is setting the udp connection", func() { - con.Open(ctx) + ctx, _, con, dial, _, _ := setup() + con.UDP = nil + con.Dialer = dial + Expect(con.Open(ctx)).To(BeNil()) Expect(con.UDP).NotTo(BeNil()) }) It("calls deadline setters", func() { - con.Open(ctx) + ctx, _, con, dial, _, udp := setup() + con.UDP = nil + con.Dialer = dial + Expect(con.Open(ctx)).To(BeNil()) Expect(udp.SetReadDeadlineCallCount()).To(BeEquivalentTo(1)) Expect(udp.SetWriteDeadlineCallCount()).To(BeEquivalentTo(1)) }) It("returns error if dial fails", func() { + ctx, _, con, dial, _, _ := setup() + con.UDP = nil + con.Dialer = dial dial.DialUDPReturns(nil, errors.New("test")) Expect(con.Open(ctx)).NotTo(BeNil()) }) It("does send a login packet", func() { - con.Open(ctx) + ctx, _, con, dial, _, udp := setup() + con.UDP = nil + con.Dialer = dial + Expect(con.Open(ctx)).To(BeNil()) args := udp.WriteArgsForCall(0) Expect(args).To(BeEquivalentTo(con.Protocol.BuildLoginPacket("test"))) }) It("does use the stored credentials for building login packets", func() { + ctx, _, con, dial, _, udp := setup() + con.UDP = nil + con.Dialer = dial con.Password = "password" - con.Open(ctx) + Expect(con.Open(ctx)).To(BeNil()) args := udp.WriteArgsForCall(0) Expect(args).To(BeEquivalentTo(con.Protocol.BuildLoginPacket("password"))) }) It("does return error if sending login packet fails", func() { + ctx, _, con, dial, _, udp := setup() + con.UDP = nil + con.Dialer = dial udp.WriteReturns(0, errors.New("test")) Expect(con.Open(ctx)).NotTo(BeNil()) }) It("does call read after sending login", func() { - con.Open(ctx) + ctx, _, con, dial, _, udp := setup() + con.UDP = nil + con.Dialer = dial + Expect(con.Open(ctx)).To(BeNil()) Expect(udp.ReadCallCount()).To(BeNumerically(">", 0)) }) It("does return error if reading from udp fails", func() { + ctx, _, con, _, _, udp := setup() udp.ReadReturns(0, errors.New("test")) Expect(con.Open(ctx)).NotTo(BeNil()) }) It("does return error on invalid login response", func() { + ctx, _, con, _, proto, _ := setup() proto.VerifyLoginReturns(errors.New("test")) Expect(con.Open(ctx)).NotTo(BeNil()) }) It("does return error on invalid login credentials", func() { + ctx, _, con, _, proto, _ := setup() proto.VerifyLoginReturns(errors.New("test")) Expect(con.Open(ctx)).NotTo(BeNil()) }) }) Describe("WriterLoop", func() { - BeforeEach(func() { - con.UDP = udp - con.KeepAliveTimeout = 0 - }) It("does send at least one keepAlive packet", func() { + ctx, _, con, _, _, udp := setup() + con.UDP = udp con.KeepAliveTimeout = 0 con.Tomb.Go(con.WriterLoop(ctx)) <-time.After(time.Second * 1) @@ -149,6 +170,8 @@ var _ = Describe("Connection", func() { con.Close(ctx) }) It("does exit on close", func() { + ctx, _, con, _, _, udp := setup() + con.UDP = udp con.KeepAliveTimeout = 100 go func() { time.Sleep(time.Second * 1) @@ -158,21 +181,25 @@ var _ = Describe("Connection", func() { }) It("does return error if udp is nil", func() { + ctx, _, con, _, _, _ := setup() con.UDP = nil + con.KeepAliveTimeout = 0 Expect(con.WriterLoop(ctx)()).NotTo(BeNil()) }) }) Describe("ReaderLoop", func() { - BeforeEach(func() { - con.UDP = udp - con.KeepAliveTimeout = 0 - }) It("does return error if udp is nil", func() { + ctx, _, con, _, _, _ := setup() + con.KeepAliveTimeout = 0 con.UDP = nil Expect(con.ReaderLoop(ctx)()).NotTo(BeNil()) }) It("does not return on timeout", func() { + ctx, _, con, _, _, udp := setup() + con.UDP = udp + con.KeepAliveTimeout = 0 + udp.ReadReturns(0, &timeoutError{}) con.Tomb.Go(con.ReaderLoop(ctx)) <-time.After(time.Second * 1) @@ -181,24 +208,25 @@ var _ = Describe("Connection", func() { con.Close(ctx) }) It("does return on non-timeout error", func() { + ctx, _, con, _, _, udp := setup() + con.UDP = udp + con.KeepAliveTimeout = 0 + udp.ReadReturns(0, errors.New("test")) Expect(con.ReaderLoop(ctx)()).NotTo(BeNil()) }) }) Describe("Close", func() { - BeforeEach(func() { - con = be.NewConnection(ctx) - con.Dialer = dial - con.Protocol = proto - udp = &mocks.UDPConnection{} - con.UDP = udp - }) It("does not return error", func() { + ctx, _, con, _, _, udp := setup() + con.UDP = udp con.Hold(ctx) Expect(con.Close(ctx)).To(BeNil()) }) It("does return error if udp connection is nil", func() { + ctx, _, con, _, _, udp := setup() + con.UDP = udp con.Tomb.Go(func() error { for { <-con.Tomb.Dying() @@ -209,16 +237,22 @@ var _ = Describe("Connection", func() { Expect(con.Close(ctx)).NotTo(BeNil()) }) It("calls close on the udp connection", func() { + ctx, _, con, _, _, udp := setup() + con.UDP = udp con.Hold(ctx) con.Close(ctx) Expect(udp.CloseCallCount()).To(BeEquivalentTo(1)) }) It("does return error if udp close fails", func() { + ctx, _, con, _, _, udp := setup() + con.UDP = udp con.Hold(ctx) udp.CloseReturns(errors.New("test")) Expect(con.Close(ctx)).NotTo(BeNil()) }) It("does reset the udp after closing", func() { + ctx, _, con, _, _, udp := setup() + con.UDP = udp con.Hold(ctx) con.Close(ctx) Expect(con.UDP).To(BeNil()) @@ -226,40 +260,63 @@ var _ = Describe("Connection", func() { }) Describe("Write", func() { - BeforeEach(func() { - con.UDP = udp + It("does not return error", func() { + ctx, _, con, _, proto, _ := setup() proto.BuildCmdPacketStub = be_proto.New().BuildCmdPacket con.ResetSequence() - }) - It("does not return error", func() { _, err := con.Write(ctx, "") Expect(err).To(BeNil()) }) It("does return error if udp connection is nil", func() { + ctx, _, con, _, proto, _ := setup() + proto.BuildCmdPacketStub = be_proto.New().BuildCmdPacket + con.ResetSequence() con.UDP = nil _, err := con.Write(ctx, "") Expect(err).NotTo(BeNil()) }) It("does call con.Write", func() { - con.Write(ctx, "test") + ctx, _, con, _, proto, udp := setup() + proto.BuildCmdPacketStub = be_proto.New().BuildCmdPacket + con.ResetSequence() + _, err := con.Write(ctx, "test") + Expect(err).To(BeNil()) Expect(udp.WriteCallCount()).To(BeEquivalentTo(1)) }) It("does return error on failed write", func() { + ctx, _, con, _, proto, udp := setup() + proto.BuildCmdPacketStub = be_proto.New().BuildCmdPacket + con.ResetSequence() udp.WriteReturns(0, errors.New("test")) _, err := con.Write(ctx, "") Expect(err).NotTo(BeNil()) }) It("does write correct command packet", func() { - con.Write(ctx, "test") + ctx, _, con, _, proto, udp := setup() + con.UDP = udp + proto.BuildCmdPacketStub = be_proto.New().BuildCmdPacket + con.ResetSequence() + _, err := con.Write(ctx, "test") + Expect(err).To(BeNil()) Expect(udp.WriteArgsForCall(0)).To(BeEquivalentTo(con.Protocol.BuildCmdPacket([]byte("test"), 1))) }) It("does increase sequence after write", func() { + ctx, _, con, _, proto, udp := setup() + con.UDP = udp + proto.BuildCmdPacketStub = be_proto.New().BuildCmdPacket + con.ResetSequence() seq := con.Sequence() - con.Write(ctx, "") + _, err := con.Write(ctx, "") + Expect(err).To(BeNil()) Expect(con.Sequence() == seq+1).To(BeTrue()) }) It("does add transmission to connection at write", func() { - con.Write(ctx, "test") + ctx, _, con, _, proto, udp := setup() + con.UDP = udp + proto.BuildCmdPacketStub = be_proto.New().BuildCmdPacket + con.ResetSequence() + _, err := con.Write(ctx, "test") + Expect(err).To(BeNil()) Expect(con.GetTransmission(1)).NotTo(BeNil()) }) }) diff --git a/pkg/rcon/battleye/helpers_test.go b/pkg/rcon/battleye/helpers_test.go index a5a18f8..3c38aa0 100644 --- a/pkg/rcon/battleye/helpers_test.go +++ b/pkg/rcon/battleye/helpers_test.go @@ -3,18 +3,23 @@ package battleye_test import ( "context" + be "github.com/playnet-public/gorcon/pkg/rcon/battleye" + . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" - be "github.com/playnet-public/gorcon/pkg/rcon/battleye" ) var _ = Describe("Connection Helpers", func() { var ( + ctx context.Context + c *be.Client con *be.Connection ) BeforeEach(func() { - con = be.NewConnection(context.Background()) + ctx = context.Background() + c = be.New(ctx) + con = c.NewConnection(ctx).(*be.Connection) }) Describe("Sequence", func() { diff --git a/pkg/rcon/battleye/reader.go b/pkg/rcon/battleye/reader.go index 31f57b4..c951b32 100644 --- a/pkg/rcon/battleye/reader.go +++ b/pkg/rcon/battleye/reader.go @@ -7,9 +7,10 @@ import ( "go.uber.org/zap" - "github.com/pkg/errors" be_proto "github.com/playnet-public/battleye/battleye" "github.com/playnet-public/gorcon/pkg/rcon" + + "github.com/pkg/errors" "github.com/seibert-media/golibs/log" ) @@ -118,22 +119,14 @@ func (c *Connection) HandleServerMessage(ctx context.Context, p be_proto.Packet) } } - event := &rcon.Event{ - Timestamp: time.Now(), - Type: t, - Payload: string(p), - } + event := rcon.NewEvent(t, string(p)) _, err = c.UDP.Write(c.Protocol.BuildMsgAckPacket(s)) if err != nil { return errors.Wrap(err, "handling server message") } - c.subscriptionsMutex.RLock() - defer c.subscriptionsMutex.RUnlock() - for _, l := range c.subscriptions { - go func(l chan *rcon.Event) { l <- event }(l) - } + go func(e *rcon.Event) { c.events <- e }(event) return nil } diff --git a/pkg/rcon/battleye/reader_test.go b/pkg/rcon/battleye/reader_test.go index c22d1ba..cbe8b5d 100644 --- a/pkg/rcon/battleye/reader_test.go +++ b/pkg/rcon/battleye/reader_test.go @@ -3,32 +3,35 @@ package battleye_test import ( "context" + be_proto "github.com/playnet-public/battleye/battleye" + be_mocks "github.com/playnet-public/battleye/mocks" + "github.com/playnet-public/gorcon/pkg/event" "github.com/playnet-public/gorcon/pkg/mocks" "github.com/playnet-public/gorcon/pkg/rcon" + be "github.com/playnet-public/gorcon/pkg/rcon/battleye" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" "github.com/pkg/errors" - be_proto "github.com/playnet-public/battleye/battleye" - be_mocks "github.com/playnet-public/battleye/mocks" - be "github.com/playnet-public/gorcon/pkg/rcon/battleye" ) var _ = Describe("Reader", func() { var ( + ctx context.Context + c *be.Client con *be.Connection pr *be_mocks.Protocol udp *mocks.UDPConnection - ctx context.Context ) BeforeEach(func() { - con = be.NewConnection(context.Background()) + ctx = context.Background() + c = be.New(ctx) + con = c.NewConnection(ctx).(*be.Connection) pr = &be_mocks.Protocol{} udp = &mocks.UDPConnection{} con.Protocol = pr con.UDP = udp - ctx = context.Background() }) Describe("HandlePacket", func() { @@ -200,18 +203,20 @@ var _ = Describe("Reader", func() { Expect(con.HandleServerMessage(ctx, nil)).NotTo(BeNil()) }) It("does send event to channel", func() { - c := make(chan *rcon.Event) + c := make(chan event.Event) + go con.Broker.Run(ctx) con.Subscribe(ctx, c) con.HandleServerMessage(ctx, []byte("test")) event := <-c - Expect(event.Payload).NotTo(BeEquivalentTo("")) + Expect(event.Data()).NotTo(BeEquivalentTo("")) }) It("does set correct type when handling chat event", func() { - c := make(chan *rcon.Event) + c := make(chan event.Event) + go con.Broker.Run(ctx) con.Subscribe(ctx, c) con.HandleServerMessage(ctx, []byte("(Group) Test")) event := <-c - Expect(event.Type).To(BeEquivalentTo(rcon.TypeChat)) + Expect(event.Kind()).To(BeEquivalentTo(string(rcon.TypeChat))) }) It("does return error if UDP.Write fails", func() { udp.WriteReturns(0, errors.New("test")) diff --git a/pkg/rcon/rcon.go b/pkg/rcon/rcon.go index 82e0636..4d26ae0 100644 --- a/pkg/rcon/rcon.go +++ b/pkg/rcon/rcon.go @@ -6,6 +6,8 @@ import ( "sync" "time" + "github.com/playnet-public/gorcon/pkg/event" + "github.com/pkg/errors" ) @@ -27,7 +29,7 @@ type Connection interface { // Write a command to the connection and return the resulting transmission Write(context.Context, string) (Transmission, error) // Listen for events on the connection. - Subscribe(context.Context, chan *Event) + Subscribe(context.Context, chan<- event.Event) } // Client is the interface for specific rcon implementations which provides connections or acts as connection pool @@ -46,13 +48,36 @@ type Transmission interface { Response() string } -// Event describes an rcon event happening on the server and being received by the connection +// Event describes a log event emitted by the process +// TODO(kwiesmueller): rework this to a new Event interface used by all broker dependents type Event struct { - Timestamp time.Time - Type byte - Payload string + timestamp time.Time + kind byte + payload string } +// NewEvent of kind with data +func NewEvent(kind byte, data string) *Event { + return &Event{ + timestamp: time.Now().UTC(), + kind: kind, + payload: data, + } +} + +// Timestamp when the event occurred +func (e Event) Timestamp() time.Time { + return e.timestamp +} + +// Kind of the event +func (e Event) Kind() string { + return string(e.kind) +} + +// Data of the real event +func (e Event) Data() string { return e.payload } + // TypeEvent identifies default rcon events var TypeEvent byte = 0x00 diff --git a/pkg/watcher/event.go b/pkg/watcher/event.go index b7fcc0a..2df9179 100644 --- a/pkg/watcher/event.go +++ b/pkg/watcher/event.go @@ -3,8 +3,23 @@ package watcher import "time" // Event describes a log event emitted by the process +// TODO(kwiesmueller): rework this to a new Event interface used by all broker dependents type Event struct { - Timestamp time.Time - Type string - Payload string + timestamp time.Time + kind string + payload string } + +// Timestamp when the event occurred +func (e Event) Timestamp() time.Time { + return e.timestamp +} + +// Kind of the event +func (e Event) Kind() string { + return e.kind +} + +// Data of the real event +func (e Event) Data() string { return e.payload } + diff --git a/pkg/watcher/watcher.go b/pkg/watcher/watcher.go index 5d02ab3..3b662c7 100644 --- a/pkg/watcher/watcher.go +++ b/pkg/watcher/watcher.go @@ -5,21 +5,22 @@ import ( "context" "errors" "io" - "sync" "time" - "gopkg.in/tomb.v2" + "github.com/playnet-public/gorcon/pkg/event" + + "github.com/seibert-media/golibs/log" + "go.uber.org/zap" ) // Watcher is responsible for starting and keeping a process alive type Watcher struct { - Process Process - - Tomb *tomb.Tomb - TLock sync.RWMutex + Process Process + closeFunc func() + close chan error - subscriptions []chan *Event - subscriptionsMutex sync.RWMutex + *event.Broker + events chan event.Event StopTimeout time.Duration } @@ -30,123 +31,114 @@ var ErrStopEvent = errors.New("received external stop event") // NewWatcher responsible for starting and keeping a process alive, restarting if necessary func NewWatcher(ctx context.Context, path string, args ...string) *Watcher { w := &Watcher{ - Process: nil, - subscriptions: []chan *Event{}, - StopTimeout: 5 * time.Second, + Process: nil, + close: make(chan error), + events: make(chan event.Event), + StopTimeout: 5 * time.Second, } - w.TLock.Lock() - defer w.TLock.Unlock() - w.Tomb, _ = tomb.WithContext(ctx) + return w } // Start the underlying process and handle events func (w *Watcher) Start(ctx context.Context) error { - w.TLock.Lock() - if !w.Tomb.Alive() { - w.Tomb, _ = tomb.WithContext(ctx) - } - w.TLock.Unlock() + ctx, w.closeFunc = context.WithCancel(ctx) + + go func() { + log.From(ctx).Debug("waiting for ctx to close", zap.String("span", "Watcher.Start")) + <-ctx.Done() + log.From(ctx).Debug("handling ctx close", zap.String("span", "Watcher.Start")) + go func() { w.close <- ctx.Err() }() + }() + rerr, stderr := io.Pipe() rout, stdout := io.Pipe() - w.TLock.RLock() - defer w.TLock.RUnlock() - - w.Tomb.Go(w.OutputHandler(rerr, "StdErr")) - w.Tomb.Go(func() error { - w.TLock.RLock() - defer w.TLock.RUnlock() - <-w.Tomb.Dying() - stderr.CloseWithError(w.Tomb.Err()) - return nil - }) - w.Tomb.Go(w.OutputHandler(rout, "StdOut")) - w.Tomb.Go(func() error { - w.TLock.RLock() - defer w.TLock.RUnlock() - <-w.Tomb.Dying() - stdout.CloseWithError(w.Tomb.Err()) - return nil - }) + go w.OutputHandler(ctx, rerr, "StdErr") + go func() { + log.From(ctx).Debug("waiting for ctx to close", zap.String("span", "OutputHandler.StdErr")) + <-ctx.Done() + log.From(ctx).Debug("handling ctx close", zap.String("span", "OutputHandler.StdErr")) + stderr.CloseWithError(ctx.Err()) + }() + go w.OutputHandler(ctx, rout, "StdOut") + go func() { + log.From(ctx).Debug("waiting for ctx to close", zap.String("span", "OutputHandler.StdOut")) + <-ctx.Done() + log.From(ctx).Debug("handling ctx close", zap.String("span", "OutputHandler.StdOut")) + stdout.CloseWithError(ctx.Err()) + }() + + go func() { + w.Broker = event.NewBroker(ctx, w.events) + log.From(ctx).Debug("running broker") + if err := w.Broker.Run(ctx); err != nil { + log.From(ctx).Error("running broker", zap.Error(err)) + w.close <- err + } + }() w.Process.SetOut(stderr, stdout) - w.Tomb.Go(w.Process.Run) - w.Tomb.Go(func() error { - w.TLock.RLock() - defer w.TLock.RUnlock() - <-w.Tomb.Dying() - return w.Process.Stop() - }) - - return nil + go func() { + log.From(ctx).Debug("waiting for ctx to close", zap.String("span", "Process.Stop")) + <-ctx.Done() + log.From(ctx).Debug("stopping process") + if err := w.Process.Stop(); err != nil { + log.From(ctx).Error("stopping process", zap.Error(err)) + } + }() + + log.From(ctx).Debug("running process") + if err := w.Process.Run(); err != nil { + log.From(ctx).Error("running process", zap.Error(err)) + w.close <- err + return err + } + + return ctx.Err() } // Stop the underlying process and all event handling routines func (w *Watcher) Stop(ctx context.Context) error { - w.TLock.RLock() - defer w.TLock.RUnlock() - w.Tomb.Kill(ErrStopEvent) - select { - case <-w.Tomb.Dead(): - return nil - case <-time.After(w.StopTimeout): - return errors.New("timeout stopping watcher") - } + return w.Process.Stop() } // KeepAlive starts a go routine responsible for reviving the process once it dies func (w *Watcher) KeepAlive(ctx context.Context) { go func() { - w.TLock.RLock() - defer w.TLock.RUnlock() - <-w.Tomb.Dead() - if w.Tomb.Err() != ErrStopEvent { - go w.Start(ctx) + if err := <-w.close; err != ErrStopEvent { + log.From(ctx).Info("handling close event", zap.Error(err)) w.KeepAlive(ctx) - } - }() -} - -// Subscribe for events on the process until the context ends -func (w *Watcher) Subscribe(ctx context.Context, to chan *Event) { - w.subscriptionsMutex.Lock() - defer w.subscriptionsMutex.Unlock() - w.subscriptions = append(w.subscriptions, to) - go func() { - <-ctx.Done() - w.subscriptionsMutex.Lock() - defer w.subscriptionsMutex.Unlock() - for i, s := range w.subscriptions { - if s == to { - w.subscriptions[i] = w.subscriptions[len(w.subscriptions)-1] - w.subscriptions[len(w.subscriptions)-1] = nil - w.subscriptions = w.subscriptions[:len(w.subscriptions)-1] - } + go func() { + log.From(ctx).Debug("running process") + if err := w.Process.Run(); err != nil { + log.From(ctx).Error("running process", zap.Error(err)) + w.close <- err + } + }() } }() } // OutputHandler returns a function reading from io.Reader and creating events -func (w *Watcher) OutputHandler(r io.Reader, eventType string) func() error { +func (w *Watcher) OutputHandler(ctx context.Context, r io.Reader, eventType string) func() error { return func() error { scn := bufio.NewScanner(r) - for scn.Scan() { - w.publishEvent(&Event{ - Timestamp: time.Now(), - Type: eventType, - Payload: scn.Text(), - }) - continue + for { + select { + case <-ctx.Done(): + return ctx.Err() + default: + if scn.Scan() { + w.events <- &Event{ + timestamp: time.Now(), + kind: eventType, + payload: scn.Text(), + } + continue + } + return errors.New("end of stream") + } } - return errors.New("end of stream") - } -} - -func (w *Watcher) publishEvent(e *Event) { - w.subscriptionsMutex.RLock() - defer w.subscriptionsMutex.RUnlock() - for _, l := range w.subscriptions { - go func(l chan *Event) { l <- e }(l) } } diff --git a/pkg/watcher/watcher_internal_test.go b/pkg/watcher/watcher_internal_test.go index b05d996..0e4557e 100644 --- a/pkg/watcher/watcher_internal_test.go +++ b/pkg/watcher/watcher_internal_test.go @@ -2,97 +2,69 @@ package watcher import ( "context" - "time" + "fmt" + "io" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" ) var _ = Describe("Watcher", func() { - var ( - w *Watcher - ctx context.Context - ) + // var ( + // w *Watcher + // ctx context.Context + // ) - BeforeEach(func() { + // BeforeEach(func() { + // ctx = context.Background() + // w = &Watcher{} + // }) + + setup := func() (ctx context.Context, w *Watcher) { ctx = context.Background() - w = &Watcher{} - }) + w = NewWatcher(ctx, "") + + return + } Describe("Subscribe", func() { + // BeforeEach(func() { + // w = &Watcher{ + // events: make(chan event.Event), + // } + // ctx = context.Background() + // }) + }) + + Describe("OutputHandler", func() { + var ( + re *io.PipeReader + wr *io.PipeWriter + ) BeforeEach(func() { - w = &Watcher{} - ctx = context.Background() + re, wr = io.Pipe() }) - It("does add subscription on Listen", func() { - l := len(w.subscriptions) - ctx, _ := context.WithCancel(ctx) - w.Subscribe(ctx, make(chan *Event)) - time.Sleep(1 * time.Second) - w.subscriptionsMutex.RLock() - Expect(len(w.subscriptions)).To(BeEquivalentTo(l + 1)) - w.subscriptionsMutex.RUnlock() + It("does return error on end of stream", func() { + ctx, w := setup() + + wr.Close() + Expect(w.OutputHandler(ctx, re, "test")()).NotTo(BeNil()) }) - It("does end subscription on closed context", func() { - ctx, close := context.WithCancel(ctx) - l := len(w.subscriptions) - w.Subscribe(ctx, make(chan *Event)) - w.subscriptionsMutex.RLock() - Expect(len(w.subscriptions)).To(BeEquivalentTo(l + 1)) - w.subscriptionsMutex.RUnlock() - close() - time.Sleep(1 * time.Second) - w.subscriptionsMutex.RLock() - Expect(len(w.subscriptions)).To(BeEquivalentTo(l)) - w.subscriptionsMutex.RUnlock() + It("does send event to subscriptions for each line", func() { + ctx, w := setup() + + go w.OutputHandler(ctx, re, "test")() + wr.Write([]byte(fmt.Sprintln("testLine1"))) + ev := <-w.events + Expect(ev.Kind()).To(BeEquivalentTo("test")) }) - It("does remove the correct subscription on closed context", func(done Done) { - ctx, _ := context.WithCancel(ctx) - c1 := make(chan *Event) - c2 := make(chan *Event) - ctx2, close2 := context.WithCancel(ctx) - c3 := make(chan *Event) - w.Subscribe(ctx, c1) - w.Subscribe(ctx2, c2) - w.Subscribe(ctx, c3) + It("does send event to subscriptions for each line", func() { + ctx, w := setup() - go func() { - Expect(<-c1).NotTo(BeNil()) - }() - go func() { - Expect(<-c2).NotTo(BeNil()) - }() - go func() { - Expect(<-c3).NotTo(BeNil()) - }() - e := &Event{} - w.subscriptionsMutex.RLock() - defer w.subscriptionsMutex.RUnlock() - for _, l := range w.subscriptions { - go func(l chan *Event) { l <- e }(l) - } - close2() - go func() { - Expect(<-c1).NotTo(BeNil()) - }() - go func() { - defer GinkgoRecover() - select { - case <-c2: - Fail("should not receive on c2") - case <-time.After(300 * time.Millisecond): - Expect(true).To(BeTrue()) - } - close(done) - }() - go func() { - Expect(<-c3).NotTo(BeNil()) - }() - w.subscriptionsMutex.RLock() - defer w.subscriptionsMutex.RUnlock() - for _, l := range w.subscriptions { - go func(l chan *Event) { l <- e }(l) - } + go w.OutputHandler(ctx, re, "test")() + wr.Write([]byte(fmt.Sprintln("testLine1"))) + ev := <-w.events + Expect(ev.Kind()).To(BeEquivalentTo("test")) }) }) }) diff --git a/pkg/watcher/watcher_test.go b/pkg/watcher/watcher_test.go index 7704448..aa1a503 100644 --- a/pkg/watcher/watcher_test.go +++ b/pkg/watcher/watcher_test.go @@ -3,183 +3,129 @@ package watcher_test import ( "context" "errors" - "fmt" - "io" "testing" "time" "github.com/playnet-public/gorcon/pkg/mocks" + "github.com/playnet-public/gorcon/pkg/watcher" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" - "github.com/playnet-public/gorcon/pkg/watcher" + "github.com/seibert-media/golibs/log" + "go.uber.org/zap" ) +const debug = false + func TestWatcher(t *testing.T) { RegisterFailHandler(Fail) RunSpecs(t, "Watcher Suite") } var _ = Describe("Watcher", func() { - var ( - w *watcher.Watcher - ctx context.Context - p *mocks.Process - ) - - BeforeEach(func() { + setup := func(name string) (ctx context.Context, w *watcher.Watcher, p *mocks.Process) { ctx = context.Background() + l := log.New("", debug) + ctx = log.WithLogger(ctx, l.WithFields(zap.String("test", name))) w = watcher.NewWatcher(ctx, "") p = &mocks.Process{} w.Process = p - }) + + return + } Describe("Start", func() { - BeforeEach(func() { - w = watcher.NewWatcher(ctx, "") - p = &mocks.Process{} - w.Process = p - }) It("does not return error", func() { + ctx, w, _ := setup("Start.does not return error") + Expect(w.Start(ctx)).To(BeNil()) }) It("does set process outputs", func() { + ctx, w, p := setup("Start.does set process outputs") + w.Start(ctx) time.Sleep(100 * time.Millisecond) Expect(p.SetOutCallCount()).To(BeEquivalentTo(1)) }) It("does start process", func() { + ctx, w, p := setup("Start.does start process") + w.Start(ctx) time.Sleep(100 * time.Millisecond) Expect(p.RunCallCount()).To(BeEquivalentTo(1)) }) - It("does end OutputHandlers on tomb dying", func() { + It("does end OutputHandlers on closed context", func() { + ctx, w, p := setup("Start.does end OutputHandlers on closed context") + + // TODO: check for running functions to verify there are no leaks p.RunReturns(nil) + w.Stop(ctx) w.Start(ctx) - time.Sleep(100 * time.Millisecond) - w.TLock.RLock() - defer w.TLock.RUnlock() - w.Tomb.Kill(nil) - select { - case <-time.After(1 * time.Second): - Fail("did not exit") - case <-w.Tomb.Dead(): - return - } }) - }) - - Describe("Stop", func() { - BeforeEach(func() { + It("does exit all functions on ctx close", func() { + ctx, w, p := setup("Start.does exit all functions on ctx close") + ctx, close := context.WithCancel(ctx) w = watcher.NewWatcher(ctx, "") - p = &mocks.Process{} w.Process = p + + w.Start(ctx) + close() + <-time.After(10 * time.Millisecond) + Expect(p.StopCallCount()).To(BeEquivalentTo(1)) }) - It("does return nil on successful stop", func() { - w.TLock.RLock() - defer w.TLock.RUnlock() - w.Tomb.Go(func() error { - w.TLock.RLock() - defer w.TLock.RUnlock() - <-w.Tomb.Dying() - return nil - }) + }) + + Describe("Stop", func() { + It("does return nil", func() { + ctx, w, p := setup("Stop.does return nil") + Expect(w.Stop(ctx)).To(BeNil()) - }) - It("does return error on kill timeout", func() { - w := watcher.NewWatcher(ctx, "") - p := &mocks.Process{} - w.Process = p - w.TLock.RLock() - defer w.TLock.RUnlock() - w.Tomb.Go(func() error { - w.TLock.RLock() - defer w.TLock.RUnlock() - <-w.Tomb.Dying() - time.Sleep(100 * time.Millisecond) - return nil - }) - w.StopTimeout = 1 * time.Millisecond - Expect(w.Stop(ctx)).NotTo(BeNil()) + Expect(p.StopCallCount()).To(BeEquivalentTo(1)) }) }) Describe("KeepAlive", func() { It("does revive the watcher once dead", func() { + ctx, w, p := setup("KeepAlive.does revive the watcher once dead") + w.Start(ctx) w.KeepAlive(ctx) - w.TLock.RLock() - w.Tomb.Kill(errors.New("some test crash")) - <-w.Tomb.Dead() - w.TLock.RUnlock() - time.Sleep(100 * time.Millisecond) - w.TLock.RLock() - defer w.TLock.RUnlock() - Expect(w.Tomb.Alive()).To(BeTrue()) + w.Stop(ctx) + + Expect(p.StopCallCount()).To(BeEquivalentTo(1)) }) It("does revive the watcher on multiple deaths", func() { + ctx, w, p := setup("KeepAlive.does revive the watcher on multiple deaths") + w.Start(ctx) w.KeepAlive(ctx) - w.TLock.RLock() - w.Tomb.Kill(errors.New("some test crash")) - <-w.Tomb.Dead() - w.TLock.RUnlock() - time.Sleep(100 * time.Millisecond) - w.TLock.RLock() - w.Tomb.Kill(errors.New("some test crash")) - w.TLock.RUnlock() - time.Sleep(100 * time.Millisecond) - w.TLock.RLock() - defer w.TLock.RUnlock() - Expect(w.Tomb.Alive()).To(BeTrue()) + w.Stop(ctx) + Expect(p.StopCallCount()).To(BeEquivalentTo(1)) + time.Sleep(10 * time.Millisecond) + w.Stop(ctx) + Expect(p.StopCallCount()).To(BeEquivalentTo(2)) + }) It("does not revive the wacher on ordered death (Stop)", func() { + ctx, w, p := setup("KeepAlive.does not revive the wacher on ordered death (Stop)") + w.Start(ctx) w.KeepAlive(ctx) w.Stop(ctx) - w.TLock.RLock() - <-w.Tomb.Dead() - w.TLock.RUnlock() - time.Sleep(100 * time.Millisecond) - w.TLock.RLock() - defer w.TLock.RUnlock() - Expect(w.Tomb.Alive()).To(BeFalse()) + Expect(p.StopCallCount()).To(BeEquivalentTo(1)) }) - }) + It("does restart process on stop", func() { + ctx, w, p := setup("KeepAlive.does restart process on stop") - Describe("OutputHandler", func() { - var ( - re *io.PipeReader - wr *io.PipeWriter - ) - BeforeEach(func() { - re, wr = io.Pipe() - }) - It("does return error on end of stream", func() { - wr.Close() - Expect(w.OutputHandler(re, "test")()).NotTo(BeNil()) - }) - It("does send event to subscriptions for each line", func() { - w.TLock.RLock() - defer w.TLock.RUnlock() - w.Tomb.Go(w.OutputHandler(re, "test")) - c := make(chan *watcher.Event) - w.Subscribe(ctx, c) - time.Sleep(500 * time.Millisecond) - wr.Write([]byte(fmt.Sprintln("testLine1"))) - ev := <-c - Expect(ev.Type).To(BeEquivalentTo("test")) - }) - It("does send event to subscriptions for each line", func() { - w.TLock.RLock() - defer w.TLock.RUnlock() - w.Tomb.Go(w.OutputHandler(re, "test")) - c := make(chan *watcher.Event) - w.Subscribe(ctx, c) - time.Sleep(500 * time.Millisecond) - wr.Write([]byte(fmt.Sprintln("testLine1"))) - ev := <-c - Expect(ev.Type).To(BeEquivalentTo("test")) + p.RunReturns(nil) + // p.RunReturnsOnCall(0, nil) + p.RunReturnsOnCall(0, nil) + p.RunReturnsOnCall(1, errors.New("test err")) + p.RunReturnsOnCall(2, nil) + + w.KeepAlive(ctx) + Expect(w.Start(ctx)).To(BeNil()) + Expect(p.RunCallCount()).To(BeEquivalentTo(1)) }) }) }) From 574699be8c92851b14529901300500ab99e862d1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kevin=20Wiesm=C3=BCller?= Date: Tue, 26 Mar 2019 21:42:58 +0100 Subject: [PATCH 2/2] add godacov to travis.yaml --- .travis.yml | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/.travis.yml b/.travis.yml index a32c16e..4d3b849 100755 --- a/.travis.yml +++ b/.travis.yml @@ -5,15 +5,16 @@ go: env: global: - - REPO: $TRAVIS_REPO_SLUG - - VERSION: $TRAVIS_TAG + - REPO: $TRAVIS_REPO_SLUG + - VERSION: $TRAVIS_TAG before_script: - make test - make check + - go get github.com/schrej/godacov - godacov -t $CODACY_TOKEN -r ./coverage.out -c $TRAVIS_COMMIT script: -- make docker -- docker login -u="$DOCKER_USERNAME" -p="$DOCKER_PASSWORD" quay.io -- make upload + - make docker + - docker login -u="$DOCKER_USERNAME" -p="$DOCKER_PASSWORD" quay.io + - make upload