Skip to content
This repository has been archived by the owner on Mar 14, 2024. It is now read-only.

Refactor 'evented data' so that it is the saga that is evented, rather than the data. #62

Merged
merged 2 commits into from
May 26, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 16 additions & 14 deletions examples/banking/account/account.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,6 @@ func (a *Account) InstanceDescription() string {
)
}

// ApplyEvent updates the data to reflect the fact that ev has occurred.
func (a *Account) ApplyEvent(env ax.Envelope) {
switch ev := env.Message.(type) {
case *messages.AccountOpened:
a.AccountId = ev.AccountId
a.Name = ev.Name
a.IsOpen = true
case *messages.AccountCredited:
a.Balance += ev.Cents
case *messages.AccountDebited:
a.Balance -= ev.Cents
}
}

// AggregateRoot is a saga that implements the Account aggregate.
var AggregateRoot saga.Saga = &aggregateRoot{}

Expand Down Expand Up @@ -114,3 +100,19 @@ func (aggregateRoot) HandleMessage(

return
}

// ApplyEvent updates the data to reflect the fact that ev has occurred.
func (aggregateRoot) ApplyEvent(d saga.Data, env ax.Envelope) {
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that this method is now on the aggregateRoot (the saga.Saga implementation), not the saga data itself.

acct := d.(*Account)

switch ev := env.Message.(type) {
case *messages.AccountOpened:
acct.AccountId = ev.AccountId
acct.Name = ev.Name
acct.IsOpen = true
case *messages.AccountCredited:
acct.Balance += ev.Cents
case *messages.AccountDebited:
acct.Balance -= ev.Cents
}
}
8 changes: 5 additions & 3 deletions src/ax/saga/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@ import (
)

// Applier is an implementation of ax.Sender that applies published
// events to an EventedData instance.
// events to saga data for evented sagas.
type Applier struct {
Data EventedData
Saga EventedSaga
Data Data

Next ax.Sender
}

Expand All @@ -31,7 +33,7 @@ func (s *Applier) PublishEvent(ctx context.Context, m ax.Event) (ax.Envelope, er
return ax.Envelope{}, err
}

s.Data.ApplyEvent(env)
s.Saga.ApplyEvent(s.Data, env)

return env, nil
}
13 changes: 0 additions & 13 deletions src/ax/saga/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package saga

import (
"github.com/golang/protobuf/proto"
"github.com/jmalloc/ax/src/ax"
)

// Data is an interface for application-defined data associated with a saga
Expand All @@ -20,15 +19,3 @@ type Data interface {
// https://github.com/golang/go/wiki/CodeReviewComments#error-strings
InstanceDescription() string
}

// EventedData is a specialization of Data for sagas that use events to update
// their state. Event-sourced sagas always use EventedData.
type EventedData interface {
Data

// ApplyEvent updates the data to reflect the fact that an event has
// occurred.
//
// It may panic if env.Message does not implement ax.Event.
ApplyEvent(env ax.Envelope)
}
5 changes: 2 additions & 3 deletions src/ax/saga/eventsourcing/messagestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ func applyEvents(
ctx context.Context,
tx persistence.Tx,
ms persistence.MessageStore,
sg saga.EventedSaga,
i *saga.Instance,
) error {
s, err := ms.OpenStream(
Expand All @@ -57,8 +58,6 @@ func applyEvents(
return err
}

data := i.Data.(saga.EventedData)

for {
ok, err := s.Next(ctx)
if !ok || err != nil {
Expand All @@ -78,7 +77,7 @@ func applyEvents(
)
}

data.ApplyEvent(env)
sg.ApplyEvent(i.Data, env)
i.Revision++
}
}
8 changes: 7 additions & 1 deletion src/ax/saga/eventsourcing/persister.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,13 @@ func (p *Persister) BeginUpdate(
}
}

if err := applyEvents(ctx, tx, p.MessageStore, &i); err != nil {
if err := applyEvents(
ctx,
tx,
p.MessageStore,
sg.(saga.EventedSaga),
&i,
); err != nil {
return nil, err
}

Expand Down
4 changes: 2 additions & 2 deletions src/ax/saga/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,8 @@ func (h *MessageHandler) handleMessage(
panic("unit-of-work contains saga instance with nil data")
}

if d, ok := i.Data.(EventedData); ok {
s = &Applier{d, s}
if es, ok := h.Saga.(EventedSaga); ok {
s = &Applier{es, i.Data, s}
}

if err := h.Saga.HandleMessage(ctx, s, env, i); err != nil {
Expand Down
16 changes: 16 additions & 0 deletions src/ax/saga/saga.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,3 +85,19 @@ type Saga interface {
// that could not be found.
HandleNotFound(context.Context, ax.Sender, ax.Envelope) error
}

// EventedSaga is a saga that only mutates its data when an event occurs.
//
// CRUD sagas may be evented or non-evented, but eventsourced sagas are always
// evented.
//
// Implementors should take care not to mutate the saga data directly inside the
// saga HandleMessage() method, only in ApplyEvent().
type EventedSaga interface {
Saga

// ApplyEvent updates d to reflect the fact that an event has occurred.
//
// It may panic if env.Message does not implement ax.Event.
ApplyEvent(d Data, env ax.Envelope)
}