Skip to content

Commit

Permalink
feat(event): add handler.StartupQuery() option (closes #124)
Browse files Browse the repository at this point in the history
feat(handler.go): add DefaultStartupQuery function to construct default query for event handler startup
feat(handler.go): add startupQuery field to Handler struct to allow customization of startup query
feat(handler.go): modify Startup function to accept query options and merge with default query
feat(handler.go): add StartupQuery function to configure Handler's startup query
feat(handler.go): modify New function to set default startupQuery if not provided
feat(handler.go): modify startup function to use startupQuery when querying events from startupStore
test(handler_test.go): add tests for new Startup and StartupQuery functions and their effects on event handling
  • Loading branch information
bounoable committed Sep 20, 2023
1 parent 65e0bfd commit 5b8eb52
Show file tree
Hide file tree
Showing 2 changed files with 220 additions and 11 deletions.
56 changes: 47 additions & 9 deletions event/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,15 @@ import (
// handler.
var ErrRunning = errors.New("event handler is already running")

// DefaultStartupQuery constructs a default query for the startup of an event
// handler. It uses the provided slice of event names to create a query that
// sorts events by their timestamps. This function is typically used to
// determine which events should be processed during the startup of an
// event handler.
func DefaultStartupQuery(events []string) query.Query {
return query.New(query.Name(events...), query.SortByTime())
}

// Handler is a type that processes events from an event bus. It associates
// event names with specific functions, which are called whenever their
// respective event occurs. Handler uses multiple workers to process events
Expand All @@ -30,6 +39,7 @@ var ErrRunning = errors.New("event handler is already running")
type Handler struct {
bus event.Bus
startupStore event.Store
startupQuery func(event.Query) event.Query
workers int

mux sync.RWMutex
Expand All @@ -44,13 +54,37 @@ type Handler struct {
// constructing a new [Handler] using the New function.
type Option func(*Handler)

// Startup sets the startup event store for a [Handler]. This store is used to
// handle events when the [Handler] starts up. The Startup option is typically
// used to initialize the system with initial event handling on startup or
// implement a "catch-up" mechanism for their event handlers.
func Startup(store event.Store) Option {
// Startup configures a [Handler] with a specified event store and options for
// querying events. It is used to setup the event store that the [Handler] will
// use to fetch events during startup. This can be used to initialize the system
// with initial event handling on startup or implement a "catch-up" mechanism
// for their event handlers. The query options allow customization of how the
// events are fetched from the store. The returned [Option] can be used when
// creating a new [Handler].
//
// If [query.Option]s are provided, they will be merged with the default query
// using [query.Merge]. If you want to _replace_ the default query, use the
// [StartupQuery] option instead of providing [query.Option]s to [Startup].
func Startup(store event.Store, opts ...query.Option) Option {
return func(h *Handler) {
h.startupStore = store
if len(opts) > 0 {
StartupQuery(func(q event.Query) event.Query {
return query.Merge(q, query.New(opts...))
})(h)
}
}
}

// StartupQuery is a function that configures a [Handler]'s startup query. It
// accepts a function that takes and returns an event.Query as its argument. The
// provided function will be used by the [Handler] to modify the default query
// used when fetching events from the event store during startup. The resulting
// [Option] can be used when constructing a new [Handler], allowing
// customization of the startup behavior of the [Handler].
func StartupQuery(fn func(event.Query) event.Query) Option {
return func(h *Handler) {
h.startupQuery = fn
}
}

Expand Down Expand Up @@ -89,6 +123,11 @@ func New(bus event.Bus, opts ...Option) *Handler {
if h.workers < 1 {
h.workers = 1
}

if h.startupQuery == nil && h.startupStore != nil {
h.startupQuery = func(q event.Query) event.Query { return q }
}

return h
}

Expand Down Expand Up @@ -205,10 +244,9 @@ func (h *Handler) handleEvents(ctx context.Context, events <-chan event.Event) <
}

func (h *Handler) startup(ctx context.Context, eventNames []string) error {
str, errs, err := h.startupStore.Query(ctx, query.New(
query.Name(eventNames...),
query.SortByTime(),
))
q := h.startupQuery(DefaultStartupQuery(eventNames))

str, errs, err := h.startupStore.Query(ctx, q)
if err != nil {
return fmt.Errorf("query events %v: %w", eventNames, err)
}
Expand Down
175 changes: 173 additions & 2 deletions event/handler/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ import (
"testing"
"time"

"github.com/google/uuid"
"github.com/modernice/goes/event"
"github.com/modernice/goes/event/eventbus"
"github.com/modernice/goes/event/eventstore"
"github.com/modernice/goes/event/handler"
"github.com/modernice/goes/event/query"
"github.com/modernice/goes/event/test"
)

Expand Down Expand Up @@ -57,13 +59,13 @@ func TestHandler(t *testing.T) {
}
}

func TestWithStore(t *testing.T) {
func TestStartup(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

bus := eventbus.New()
store := eventstore.New()
h := handler.New(bus, handler.WithStore(store))
h := handler.New(bus, handler.Startup(store))

fooHandled := make(chan event.Of[test.FooEventData])
barHandled := make(chan event.Of[test.BarEventData])
Expand Down Expand Up @@ -102,3 +104,172 @@ func TestWithStore(t *testing.T) {
case <-barHandled:
}
}

func TestStartupQuery(t *testing.T) {
bus := eventbus.New()
store := eventstore.New()

h := handler.New(bus, handler.Startup(store), handler.StartupQuery(func(event.Query) event.Query {
return query.New(query.Name("bar"))
}))

fooHandled := make(chan event.Of[test.FooEventData])
barHandled := make(chan event.Of[test.BarEventData])

h.RegisterEventHandler("foo", func(evt event.Event) { fooHandled <- event.Cast[test.FooEventData](evt) })
h.RegisterEventHandler("bar", func(evt event.Event) { barHandled <- event.Cast[test.BarEventData](evt) })

if err := store.Insert(
context.Background(),
event.New("foo", test.FooEventData{}).Any(),
event.New("bar", test.BarEventData{}).Any(),
); err != nil {
t.Fatalf("Insert() failed with %q", err)
}

errs, err := h.Run(context.Background())
if err != nil {
t.Fatalf("Run() failed with %q", err)
}

go func() {
for err := range errs {
panic(err)
}
}()

select {
case <-time.After(time.Second):
t.Fatalf("bar event was not handled")
case <-barHandled:
}

select {
case <-time.After(50 * time.Millisecond):
case <-fooHandled:
t.Fatalf("foo event was handled")
}
}

func TestStartup_withQuery_merges_names(t *testing.T) {
bus := eventbus.New()
store := eventstore.New()

testID := uuid.New()

h := handler.New(bus, handler.Startup(store, query.Name("bar")))

fooHandled := make(chan event.Of[test.FooEventData])
barHandled := make(chan event.Of[test.BarEventData])

h.RegisterEventHandler("foo", func(evt event.Event) {
t.Log("Handling foo event")
fooHandled <- event.Cast[test.FooEventData](evt)
})
h.RegisterEventHandler("bar", func(evt event.Event) {
t.Log("Handling bar event")
barHandled <- event.Cast[test.BarEventData](evt)
})

t1 := time.Now().Add(time.Minute)
t2 := t1.Add(time.Second)

if err := store.Insert(
context.Background(),
event.New("foo", test.FooEventData{}).Any(),
event.New("bar", test.BarEventData{}, event.Time(t2)).Any(),
event.New("bar", test.BarEventData{}, event.Time(t1), event.ID(testID)).Any(),
); err != nil {
t.Fatalf("Insert() failed with %q", err)
}

errs, err := h.Run(context.Background())
if err != nil {
t.Fatalf("Run() failed with %q", err)
}

go func() {
for err := range errs {
panic(err)
}
}()

select {
case <-time.After(time.Second):
t.Fatalf("foo event was not handled")
case <-fooHandled:
}

select {
case <-time.After(time.Second):
t.Fatalf("bar event was not handled #1")
case evt := <-barHandled:
if evt.ID() != testID {
t.Fatalf("expected event ID %q; got %q", testID, evt.ID())
}
}

select {
case <-time.After(time.Second):
t.Fatalf("bar event was not handled #2")
case evt := <-barHandled:
if evt.ID() == testID {
t.Fatalf("expected event ID not to be %q; got %q", testID, evt.ID())
}
}
}

func TestStartup_withQuery_merges_ids(t *testing.T) {
bus := eventbus.New()
store := eventstore.New()

testID := uuid.New()

h := handler.New(bus, handler.Startup(store, query.ID(testID)))

fooHandled := make(chan event.Of[test.FooEventData])
barHandled := make(chan event.Of[test.BarEventData])

h.RegisterEventHandler("foo", func(evt event.Event) {
t.Log("Handling foo event")
fooHandled <- event.Cast[test.FooEventData](evt)
})
h.RegisterEventHandler("bar", func(evt event.Event) {
t.Log("Handling bar event")
barHandled <- event.Cast[test.BarEventData](evt)
})

if err := store.Insert(
context.Background(),
event.New("foo", test.FooEventData{}).Any(),
event.New("bar", test.BarEventData{}, event.ID(testID)).Any(),
); err != nil {
t.Fatalf("Insert() failed with %q", err)
}

errs, err := h.Run(context.Background())
if err != nil {
t.Fatalf("Run() failed with %q", err)
}

go func() {
for err := range errs {
panic(err)
}
}()

select {
case <-time.After(50 * time.Millisecond):
case <-fooHandled:
t.Fatalf("foo event was handled")
}

select {
case <-time.After(time.Second):
t.Fatalf("bar event was not handled")
case evt := <-barHandled:
if evt.ID() != testID {
t.Fatalf("expected event ID %q; got %q", testID, evt.ID())
}
}
}

0 comments on commit 5b8eb52

Please sign in to comment.