From 61623c3b9a092cd7d6569a7f247d71854fa6374e Mon Sep 17 00:00:00 2001 From: Peter Wood Date: Sun, 31 Dec 2023 14:33:44 +0000 Subject: [PATCH] Add handling for sending and receiving events from the da. --- da_events.go | 24 +++++++++++++++++------- da_events_test.go | 37 ++++++++++++++++++++++++++++++++++++- gateway.go | 4 ++++ 3 files changed, 57 insertions(+), 8 deletions(-) diff --git a/da_events.go b/da_events.go index de7560a..6b16750 100644 --- a/da_events.go +++ b/da_events.go @@ -1,17 +1,27 @@ package zda -import "context" +import ( + "context" + "github.com/shimmeringbee/logwrap" +) type eventSender interface { sendEvent(event interface{}) } -func (g *gateway) sendEvent(event interface{}) { - //TODO implement me - panic("implement me") +func (g *gateway) sendEvent(e interface{}) { + select { + case g.events <- e: + default: + g.logger.LogError(g.ctx, "failed to send event, channel buffer is full", logwrap.Datum("event", e)) + } } -func (g *gateway) ReadEvent(_ context.Context) (interface{}, error) { - //TODO implement me - panic("implement me") +func (g *gateway) ReadEvent(ctx context.Context) (interface{}, error) { + select { + case e := <-g.events: + return e, nil + case <-ctx.Done(): + return nil, context.DeadlineExceeded + } } diff --git a/da_events_test.go b/da_events_test.go index bbf50b2..9948671 100644 --- a/da_events_test.go +++ b/da_events_test.go @@ -1,6 +1,12 @@ package zda -import "github.com/stretchr/testify/mock" +import ( + "context" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "testing" + "time" +) type mockEventSender struct { mock.Mock @@ -9,3 +15,32 @@ type mockEventSender struct { func (m *mockEventSender) sendEvent(event interface{}) { m.Called(event) } + +func TestZigbeeGateway_ReadEvent(t *testing.T) { + t.Run("context which expires should result in error", func(t *testing.T) { + zgw := New(context.Background(), nil, nil) + + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Millisecond) + defer cancel() + + _, err := zgw.ReadEvent(ctx) + assert.ErrorIs(t, err, context.DeadlineExceeded) + }) + + t.Run("sent events are received through ReadEvent", func(t *testing.T) { + zgw := New(context.Background(), nil, nil).(*gateway) + + ctx, cancel := context.WithTimeout(context.Background(), 250*time.Millisecond) + defer cancel() + + expectedEvent := struct{}{} + + go func() { + zgw.sendEvent(expectedEvent) + }() + + actualEvent, err := zgw.ReadEvent(ctx) + assert.NoError(t, err) + assert.Equal(t, expectedEvent, actualEvent) + }) +} diff --git a/gateway.go b/gateway.go index 4ae4d3c..6bd9fd5 100644 --- a/gateway.go +++ b/gateway.go @@ -33,6 +33,8 @@ func New(baseCtx context.Context, p zigbee.Provider, r ruleExecutor) da.Gateway callbacks: callbacks.Create(), ruleExecutor: r, + + events: make(chan interface{}, 0xffff), } gw.WithGoLogger(log.New(os.Stderr, "", log.LstdFlags)) @@ -57,6 +59,8 @@ type gateway struct { callbacks callbacks.AdderCaller ruleExecutor ruleExecutor + + events chan interface{} } func (g *gateway) Capabilities() []da.Capability {