diff --git a/Makefile b/Makefile index be610e0..2b73a19 100644 --- a/Makefile +++ b/Makefile @@ -2,4 +2,5 @@ test-example: @go run github.com/onsi/ginkgo/v2/ginkgo -v ./example/testing test: + @go test -v ./internal/... ./pkg/... @go run github.com/onsi/ginkgo/v2/ginkgo -v -p --race ./tests diff --git a/go.mod b/go.mod index 6597cf1..207365e 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,8 @@ module github.com/gothunder/thunder -go 1.19 +go 1.21 + +toolchain go1.21.5 require ( entgo.io/ent v0.12.4 @@ -8,7 +10,7 @@ require ( github.com/TheRafaBonin/roxy v0.6.1 github.com/cenkalti/backoff/v4 v4.2.0 github.com/go-chi/chi/v5 v5.0.8 - github.com/google/uuid v1.3.0 + github.com/google/uuid v1.3.1 github.com/mattn/go-sqlite3 v1.14.16 github.com/onsi/ginkgo/v2 v2.8.0 github.com/onsi/gomega v1.25.0 @@ -33,13 +35,16 @@ require ( github.com/go-openapi/inflect v0.19.0 // indirect github.com/golang/protobuf v1.5.3 // indirect github.com/hashicorp/hcl/v2 v2.13.0 // indirect + github.com/lithammer/shortuuid/v3 v3.0.7 // indirect github.com/mitchellh/go-wordwrap v0.0.0-20150314170334-ad45545899c7 // indirect + github.com/oklog/ulid v1.3.1 // indirect github.com/zclconf/go-cty v1.8.0 // indirect google.golang.org/genproto v0.0.0-20230306155012-7f2fa6fef1f4 // indirect google.golang.org/protobuf v1.30.0 // indirect ) require ( + github.com/ThreeDotsLabs/watermill v1.3.5 github.com/agnivade/levenshtein v1.1.1 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/fsnotify/fsnotify v1.5.4 // indirect diff --git a/go.sum b/go.sum index a8c3be4..124c5f8 100644 --- a/go.sum +++ b/go.sum @@ -48,6 +48,8 @@ github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym github.com/DATA-DOG/go-sqlmock v1.5.0 h1:Shsta01QNfFxHCfpW6YH2STWB0MudeXXEWMr20OEh60= github.com/TheRafaBonin/roxy v0.6.1 h1:uW7ttgqtww4ipH82smosk6SZDz3GjJM5mC/GRWsi+2Q= github.com/TheRafaBonin/roxy v0.6.1/go.mod h1:wLE6Upm7KEkwpYA6LzPBMHlyXSXiMe/fWwHi/+5Fk1w= +github.com/ThreeDotsLabs/watermill v1.3.5 h1:50JEPEhMGZQMh08ct0tfO1PsgMOAOhV3zxK2WofkbXg= +github.com/ThreeDotsLabs/watermill v1.3.5/go.mod h1:O/u/Ptyrk5MPTxSeWM5vzTtZcZfxXfO9PK9eXTYiFZY= github.com/agext/levenshtein v1.2.1 h1:QmvMAjj2aEICytGiWzmxoE0x2KZvE0fvmqMOfy2tjT8= github.com/agext/levenshtein v1.2.1/go.mod h1:JEDfjyjHDjOF/1e4FlBE/PkbqA9OfWu2ki2W0IB5558= github.com/agnivade/levenshtein v1.0.1/go.mod h1:CURSv5d9Uaml+FovSIICkLbAUZ9S4RqaHDIsdSBg7lM= @@ -164,8 +166,11 @@ github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38 h1:yAJXTCF9TqKcTiHJAE github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.3.1 h1:KjJaJ9iWZ3jOFZIf1Lqf4laDRCasjl0BCmnEGxkdLb4= +github.com/google/uuid v1.3.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk= github.com/googleapis/google-cloud-go-testing v0.0.0-20200911160855-bcd43fbb19e8/go.mod h1:dvDLG8qkwmyD9a/MJJN3XJcT3xFxOKAvTZGvuZmac9g= @@ -194,6 +199,8 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kylelemons/godebug v0.0.0-20170820004349-d65d576e9348 h1:MtvEpTB6LX3vkb4ax0b5D2DHbNAUsen0Gx5wZoq3lV4= +github.com/lithammer/shortuuid/v3 v3.0.7 h1:trX0KTHy4Pbwo/6ia8fscyHoGA+mf1jWbPJVuvyJQQ8= +github.com/lithammer/shortuuid/v3 v3.0.7/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts= github.com/logrusorgru/aurora/v3 v3.0.0/go.mod h1:vsR12bk5grlLvLXAYrBsb5Oc/N+LxAlxggSjiwMnCUc= github.com/magiconair/properties v1.8.6 h1:5ibWZ6iY0NctNGWo87LalDlEZ6R41TqbbDamhfG/Qzo= github.com/magiconair/properties v1.8.6/go.mod h1:y3VJvCyxH9uVvJTWEGAELF3aiYNyPKd5NZ3oSwXrF60= @@ -204,6 +211,8 @@ github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovk github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94= github.com/mattn/go-isatty v0.0.16 h1:bq3VjFmv/sOjHtdEhmkEV4x1AJtvUvOJ2PFAZ5+peKQ= github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= +github.com/mattn/go-runewidth v0.0.9 h1:Lm995f3rfxdpd6TSmuVCHVb/QhupuXlYr8sCI/QdE+0= +github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI= github.com/mattn/go-sqlite3 v1.14.16 h1:yOQRA0RpS5PFz/oikGwBEqvAWhWg5ufRz4ETLjwpU1Y= github.com/mattn/go-sqlite3 v1.14.16/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg= github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y= @@ -212,6 +221,10 @@ github.com/mitchellh/go-wordwrap v0.0.0-20150314170334-ad45545899c7 h1:DpOJ2HYzC github.com/mitchellh/go-wordwrap v0.0.0-20150314170334-ad45545899c7/go.mod h1:ZXFpozHsX6DPmq2I0TCekCxypsnAUbP2oI0UX1GXzOo= github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY= github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= +github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4= +github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U= +github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec= +github.com/olekukonko/tablewriter v0.0.5/go.mod h1:hPp6KlRPjbx+hW8ykQs1w3UBbZlj6HuIJcUGPhkA7kY= github.com/onsi/ginkgo/v2 v2.8.0 h1:pAM+oBNPrpXRs+E/8spkeGx9QgekbRVyr74EUvRVOUI= github.com/onsi/ginkgo/v2 v2.8.0/go.mod h1:6JsQiECmxCa3V5st74AL/AmsV482EDdVrGaVW6z3oYU= github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y= diff --git a/tests/entInit/client.go b/internal/events/outbox/ent/entInit/client.go similarity index 98% rename from tests/entInit/client.go rename to internal/events/outbox/ent/entInit/client.go index 26f545b..1ef8054 100644 --- a/tests/entInit/client.go +++ b/internal/events/outbox/ent/entInit/client.go @@ -10,12 +10,12 @@ import ( "reflect" "github.com/google/uuid" - "github.com/gothunder/thunder/tests/entInit/migrate" + "github.com/gothunder/thunder/internal/events/outbox/ent/entInit/migrate" "entgo.io/ent" "entgo.io/ent/dialect" "entgo.io/ent/dialect/sql" - "github.com/gothunder/thunder/tests/entInit/outboxmessage" + "github.com/gothunder/thunder/internal/events/outbox/ent/entInit/outboxmessage" ) // Client is the client that holds all ent builders. diff --git a/tests/entInit/ent.go b/internal/events/outbox/ent/entInit/ent.go similarity index 99% rename from tests/entInit/ent.go rename to internal/events/outbox/ent/entInit/ent.go index f30a075..be1eb9c 100644 --- a/tests/entInit/ent.go +++ b/internal/events/outbox/ent/entInit/ent.go @@ -12,7 +12,7 @@ import ( "entgo.io/ent" "entgo.io/ent/dialect/sql" "entgo.io/ent/dialect/sql/sqlgraph" - "github.com/gothunder/thunder/tests/entInit/outboxmessage" + "github.com/gothunder/thunder/internal/events/outbox/ent/entInit/outboxmessage" ) // ent aliases to avoid import conflicts in user's code. diff --git a/tests/entInit/enttest/enttest.go b/internal/events/outbox/ent/entInit/enttest/enttest.go similarity index 89% rename from tests/entInit/enttest/enttest.go rename to internal/events/outbox/ent/entInit/enttest/enttest.go index 4ff2494..00fc432 100644 --- a/tests/entInit/enttest/enttest.go +++ b/internal/events/outbox/ent/entInit/enttest/enttest.go @@ -5,12 +5,12 @@ package enttest import ( "context" - "github.com/gothunder/thunder/tests/entInit" + "github.com/gothunder/thunder/internal/events/outbox/ent/entInit" // required by schema hooks. - _ "github.com/gothunder/thunder/tests/entInit/runtime" + _ "github.com/gothunder/thunder/internal/events/outbox/ent/entInit/runtime" "entgo.io/ent/dialect/sql/schema" - "github.com/gothunder/thunder/tests/entInit/migrate" + "github.com/gothunder/thunder/internal/events/outbox/ent/entInit/migrate" ) type ( diff --git a/internal/events/outbox/ent/entInit/generate.go b/internal/events/outbox/ent/entInit/generate.go new file mode 100644 index 0000000..84b7f43 --- /dev/null +++ b/internal/events/outbox/ent/entInit/generate.go @@ -0,0 +1,3 @@ +package entInit + +//go:generate go run -mod=mod entgo.io/ent/cmd/ent generate --feature intercept ./schema diff --git a/tests/entInit/hook/hook.go b/internal/events/outbox/ent/entInit/hook/hook.go similarity index 98% rename from tests/entInit/hook/hook.go rename to internal/events/outbox/ent/entInit/hook/hook.go index 900e12e..e5346b1 100644 --- a/tests/entInit/hook/hook.go +++ b/internal/events/outbox/ent/entInit/hook/hook.go @@ -6,7 +6,7 @@ import ( "context" "fmt" - "github.com/gothunder/thunder/tests/entInit" + "github.com/gothunder/thunder/internal/events/outbox/ent/entInit" ) // The OutboxMessageFunc type is an adapter to allow the use of ordinary diff --git a/internal/events/outbox/ent/entInit/intercept/intercept.go b/internal/events/outbox/ent/entInit/intercept/intercept.go new file mode 100644 index 0000000..8e72c41 --- /dev/null +++ b/internal/events/outbox/ent/entInit/intercept/intercept.go @@ -0,0 +1,149 @@ +// Code generated by ent, DO NOT EDIT. + +package intercept + +import ( + "context" + "fmt" + + "entgo.io/ent/dialect/sql" + "github.com/gothunder/thunder/internal/events/outbox/ent/entInit" + "github.com/gothunder/thunder/internal/events/outbox/ent/entInit/outboxmessage" + "github.com/gothunder/thunder/internal/events/outbox/ent/entInit/predicate" +) + +// The Query interface represents an operation that queries a graph. +// By using this interface, users can write generic code that manipulates +// query builders of different types. +type Query interface { + // Type returns the string representation of the query type. + Type() string + // Limit the number of records to be returned by this query. + Limit(int) + // Offset to start from. + Offset(int) + // Unique configures the query builder to filter duplicate records. + Unique(bool) + // Order specifies how the records should be ordered. + Order(...func(*sql.Selector)) + // WhereP appends storage-level predicates to the query builder. Using this method, users + // can use type-assertion to append predicates that do not depend on any generated package. + WhereP(...func(*sql.Selector)) +} + +// The Func type is an adapter that allows ordinary functions to be used as interceptors. +// Unlike traversal functions, interceptors are skipped during graph traversals. Note that the +// implementation of Func is different from the one defined in entgo.io/ent.InterceptFunc. +type Func func(context.Context, Query) error + +// Intercept calls f(ctx, q) and then applied the next Querier. +func (f Func) Intercept(next entInit.Querier) entInit.Querier { + return entInit.QuerierFunc(func(ctx context.Context, q entInit.Query) (entInit.Value, error) { + query, err := NewQuery(q) + if err != nil { + return nil, err + } + if err := f(ctx, query); err != nil { + return nil, err + } + return next.Query(ctx, q) + }) +} + +// The TraverseFunc type is an adapter to allow the use of ordinary function as Traverser. +// If f is a function with the appropriate signature, TraverseFunc(f) is a Traverser that calls f. +type TraverseFunc func(context.Context, Query) error + +// Intercept is a dummy implementation of Intercept that returns the next Querier in the pipeline. +func (f TraverseFunc) Intercept(next entInit.Querier) entInit.Querier { + return next +} + +// Traverse calls f(ctx, q). +func (f TraverseFunc) Traverse(ctx context.Context, q entInit.Query) error { + query, err := NewQuery(q) + if err != nil { + return err + } + return f(ctx, query) +} + +// The OutboxMessageFunc type is an adapter to allow the use of ordinary function as a Querier. +type OutboxMessageFunc func(context.Context, *entInit.OutboxMessageQuery) (entInit.Value, error) + +// Query calls f(ctx, q). +func (f OutboxMessageFunc) Query(ctx context.Context, q entInit.Query) (entInit.Value, error) { + if q, ok := q.(*entInit.OutboxMessageQuery); ok { + return f(ctx, q) + } + return nil, fmt.Errorf("unexpected query type %T. expect *entInit.OutboxMessageQuery", q) +} + +// The TraverseOutboxMessage type is an adapter to allow the use of ordinary function as Traverser. +type TraverseOutboxMessage func(context.Context, *entInit.OutboxMessageQuery) error + +// Intercept is a dummy implementation of Intercept that returns the next Querier in the pipeline. +func (f TraverseOutboxMessage) Intercept(next entInit.Querier) entInit.Querier { + return next +} + +// Traverse calls f(ctx, q). +func (f TraverseOutboxMessage) Traverse(ctx context.Context, q entInit.Query) error { + if q, ok := q.(*entInit.OutboxMessageQuery); ok { + return f(ctx, q) + } + return fmt.Errorf("unexpected query type %T. expect *entInit.OutboxMessageQuery", q) +} + +// NewQuery returns the generic Query interface for the given typed query. +func NewQuery(q entInit.Query) (Query, error) { + switch q := q.(type) { + case *entInit.OutboxMessageQuery: + return &query[*entInit.OutboxMessageQuery, predicate.OutboxMessage, outboxmessage.OrderOption]{typ: entInit.TypeOutboxMessage, tq: q}, nil + default: + return nil, fmt.Errorf("unknown query type %T", q) + } +} + +type query[T any, P ~func(*sql.Selector), R ~func(*sql.Selector)] struct { + typ string + tq interface { + Limit(int) T + Offset(int) T + Unique(bool) T + Order(...R) T + Where(...P) T + } +} + +func (q query[T, P, R]) Type() string { + return q.typ +} + +func (q query[T, P, R]) Limit(limit int) { + q.tq.Limit(limit) +} + +func (q query[T, P, R]) Offset(offset int) { + q.tq.Offset(offset) +} + +func (q query[T, P, R]) Unique(unique bool) { + q.tq.Unique(unique) +} + +func (q query[T, P, R]) Order(orders ...func(*sql.Selector)) { + rs := make([]R, len(orders)) + for i := range orders { + rs[i] = orders[i] + } + q.tq.Order(rs...) +} + +func (q query[T, P, R]) WhereP(ps ...func(*sql.Selector)) { + p := make([]P, len(ps)) + for i := range ps { + p[i] = ps[i] + } + q.tq.Where(p...) +} diff --git a/tests/entInit/migrate/migrate.go b/internal/events/outbox/ent/entInit/migrate/migrate.go similarity index 100% rename from tests/entInit/migrate/migrate.go rename to internal/events/outbox/ent/entInit/migrate/migrate.go diff --git a/tests/entInit/migrate/schema.go b/internal/events/outbox/ent/entInit/migrate/schema.go similarity index 100% rename from tests/entInit/migrate/schema.go rename to internal/events/outbox/ent/entInit/migrate/schema.go diff --git a/tests/entInit/mutation.go b/internal/events/outbox/ent/entInit/mutation.go similarity index 99% rename from tests/entInit/mutation.go rename to internal/events/outbox/ent/entInit/mutation.go index f200633..c00598f 100644 --- a/tests/entInit/mutation.go +++ b/internal/events/outbox/ent/entInit/mutation.go @@ -12,8 +12,8 @@ import ( "entgo.io/ent" "entgo.io/ent/dialect/sql" "github.com/google/uuid" - "github.com/gothunder/thunder/tests/entInit/outboxmessage" - "github.com/gothunder/thunder/tests/entInit/predicate" + "github.com/gothunder/thunder/internal/events/outbox/ent/entInit/outboxmessage" + "github.com/gothunder/thunder/internal/events/outbox/ent/entInit/predicate" ) const ( diff --git a/tests/entInit/outboxmessage.go b/internal/events/outbox/ent/entInit/outboxmessage.go similarity index 98% rename from tests/entInit/outboxmessage.go rename to internal/events/outbox/ent/entInit/outboxmessage.go index 8b623fe..c481e10 100644 --- a/tests/entInit/outboxmessage.go +++ b/internal/events/outbox/ent/entInit/outboxmessage.go @@ -11,7 +11,7 @@ import ( "entgo.io/ent" "entgo.io/ent/dialect/sql" "github.com/google/uuid" - "github.com/gothunder/thunder/tests/entInit/outboxmessage" + "github.com/gothunder/thunder/internal/events/outbox/ent/entInit/outboxmessage" ) // OutboxMessage is the model entity for the OutboxMessage schema. diff --git a/tests/entInit/outboxmessage/outboxmessage.go b/internal/events/outbox/ent/entInit/outboxmessage/outboxmessage.go similarity index 100% rename from tests/entInit/outboxmessage/outboxmessage.go rename to internal/events/outbox/ent/entInit/outboxmessage/outboxmessage.go diff --git a/tests/entInit/outboxmessage/where.go b/internal/events/outbox/ent/entInit/outboxmessage/where.go similarity index 99% rename from tests/entInit/outboxmessage/where.go rename to internal/events/outbox/ent/entInit/outboxmessage/where.go index e480012..26da587 100644 --- a/tests/entInit/outboxmessage/where.go +++ b/internal/events/outbox/ent/entInit/outboxmessage/where.go @@ -7,7 +7,7 @@ import ( "entgo.io/ent/dialect/sql" "github.com/google/uuid" - "github.com/gothunder/thunder/tests/entInit/predicate" + "github.com/gothunder/thunder/internal/events/outbox/ent/entInit/predicate" ) // ID filters vertices based on their ID field. diff --git a/tests/entInit/outboxmessage_create.go b/internal/events/outbox/ent/entInit/outboxmessage_create.go similarity index 99% rename from tests/entInit/outboxmessage_create.go rename to internal/events/outbox/ent/entInit/outboxmessage_create.go index 9729cbc..58323ab 100644 --- a/tests/entInit/outboxmessage_create.go +++ b/internal/events/outbox/ent/entInit/outboxmessage_create.go @@ -11,7 +11,7 @@ import ( "entgo.io/ent/dialect/sql/sqlgraph" "entgo.io/ent/schema/field" "github.com/google/uuid" - "github.com/gothunder/thunder/tests/entInit/outboxmessage" + "github.com/gothunder/thunder/internal/events/outbox/ent/entInit/outboxmessage" ) // OutboxMessageCreate is the builder for creating a OutboxMessage entity. diff --git a/tests/entInit/outboxmessage_delete.go b/internal/events/outbox/ent/entInit/outboxmessage_delete.go similarity index 93% rename from tests/entInit/outboxmessage_delete.go rename to internal/events/outbox/ent/entInit/outboxmessage_delete.go index a3969d5..73ea159 100644 --- a/tests/entInit/outboxmessage_delete.go +++ b/internal/events/outbox/ent/entInit/outboxmessage_delete.go @@ -8,8 +8,8 @@ import ( "entgo.io/ent/dialect/sql" "entgo.io/ent/dialect/sql/sqlgraph" "entgo.io/ent/schema/field" - "github.com/gothunder/thunder/tests/entInit/outboxmessage" - "github.com/gothunder/thunder/tests/entInit/predicate" + "github.com/gothunder/thunder/internal/events/outbox/ent/entInit/outboxmessage" + "github.com/gothunder/thunder/internal/events/outbox/ent/entInit/predicate" ) // OutboxMessageDelete is the builder for deleting a OutboxMessage entity. diff --git a/tests/entInit/outboxmessage_query.go b/internal/events/outbox/ent/entInit/outboxmessage_query.go similarity index 98% rename from tests/entInit/outboxmessage_query.go rename to internal/events/outbox/ent/entInit/outboxmessage_query.go index 6c62ccc..60cc3ad 100644 --- a/tests/entInit/outboxmessage_query.go +++ b/internal/events/outbox/ent/entInit/outboxmessage_query.go @@ -11,8 +11,8 @@ import ( "entgo.io/ent/dialect/sql/sqlgraph" "entgo.io/ent/schema/field" "github.com/google/uuid" - "github.com/gothunder/thunder/tests/entInit/outboxmessage" - "github.com/gothunder/thunder/tests/entInit/predicate" + "github.com/gothunder/thunder/internal/events/outbox/ent/entInit/outboxmessage" + "github.com/gothunder/thunder/internal/events/outbox/ent/entInit/predicate" ) // OutboxMessageQuery is the builder for querying OutboxMessage entities. diff --git a/tests/entInit/outboxmessage_update.go b/internal/events/outbox/ent/entInit/outboxmessage_update.go similarity index 98% rename from tests/entInit/outboxmessage_update.go rename to internal/events/outbox/ent/entInit/outboxmessage_update.go index bc643b7..ae2ef5f 100644 --- a/tests/entInit/outboxmessage_update.go +++ b/internal/events/outbox/ent/entInit/outboxmessage_update.go @@ -11,8 +11,8 @@ import ( "entgo.io/ent/dialect/sql" "entgo.io/ent/dialect/sql/sqlgraph" "entgo.io/ent/schema/field" - "github.com/gothunder/thunder/tests/entInit/outboxmessage" - "github.com/gothunder/thunder/tests/entInit/predicate" + "github.com/gothunder/thunder/internal/events/outbox/ent/entInit/outboxmessage" + "github.com/gothunder/thunder/internal/events/outbox/ent/entInit/predicate" ) // OutboxMessageUpdate is the builder for updating OutboxMessage entities. diff --git a/tests/entInit/predicate/predicate.go b/internal/events/outbox/ent/entInit/predicate/predicate.go similarity index 100% rename from tests/entInit/predicate/predicate.go rename to internal/events/outbox/ent/entInit/predicate/predicate.go diff --git a/tests/entInit/runtime.go b/internal/events/outbox/ent/entInit/runtime.go similarity index 92% rename from tests/entInit/runtime.go rename to internal/events/outbox/ent/entInit/runtime.go index 4ddf892..1743425 100644 --- a/tests/entInit/runtime.go +++ b/internal/events/outbox/ent/entInit/runtime.go @@ -6,8 +6,8 @@ import ( "time" "github.com/google/uuid" - "github.com/gothunder/thunder/tests/entInit/outboxmessage" - "github.com/gothunder/thunder/tests/entInit/schema" + "github.com/gothunder/thunder/internal/events/outbox/ent/entInit/outboxmessage" + "github.com/gothunder/thunder/internal/events/outbox/ent/entInit/schema" ) // The init function reads all schema descriptors with runtime code diff --git a/tests/entInit/runtime/runtime.go b/internal/events/outbox/ent/entInit/runtime/runtime.go similarity index 84% rename from tests/entInit/runtime/runtime.go rename to internal/events/outbox/ent/entInit/runtime/runtime.go index c01e83e..7cbad17 100644 --- a/tests/entInit/runtime/runtime.go +++ b/internal/events/outbox/ent/entInit/runtime/runtime.go @@ -2,7 +2,7 @@ package runtime -// The schema-stitching logic is generated in github.com/gothunder/thunder/tests/entInit/runtime.go +// The schema-stitching logic is generated in github.com/gothunder/thunder/internal/events/outbox/ent/entInit/runtime.go const ( Version = "v0.12.4" // Version of ent codegen. diff --git a/tests/entInit/schema/OutboxMessage.go b/internal/events/outbox/ent/entInit/schema/OutboxMessage.go similarity index 100% rename from tests/entInit/schema/OutboxMessage.go rename to internal/events/outbox/ent/entInit/schema/OutboxMessage.go diff --git a/tests/entInit/tx.go b/internal/events/outbox/ent/entInit/tx.go similarity index 100% rename from tests/entInit/tx.go rename to internal/events/outbox/ent/entInit/tx.go diff --git a/internal/events/outbox/ent/ent_test.go b/internal/events/outbox/ent/ent_test.go new file mode 100644 index 0000000..e471383 --- /dev/null +++ b/internal/events/outbox/ent/ent_test.go @@ -0,0 +1,28 @@ +package outboxent + +import ( + "context" + "testing" + + "github.com/gothunder/thunder/internal/events/outbox/ent/entInit" + "github.com/gothunder/thunder/internal/events/outbox/ent/entInit/enttest" + + "entgo.io/ent/dialect" + _ "github.com/mattn/go-sqlite3" +) + +func setupEnt(t *testing.T) *entInit.Client { + return enttest.Open(t, dialect.SQLite, "file:ent?mode=memory&cache=shared&_fk=1") +} + +func populateOutboxMessages(ctx context.Context, client *entInit.Client, messages []entInit.OutboxMessage) error { + return client.OutboxMessage.MapCreateBulk(messages, func(omc *entInit.OutboxMessageCreate, i int) { + omc.SetID(messages[i].ID) + omc.SetPayload(messages[i].Payload) + omc.SetTopic(messages[i].Topic) + omc.SetCreatedAt(messages[i].CreatedAt) + if !messages[i].DeliveredAt.IsZero() { + omc.SetDeliveredAt(messages[i].DeliveredAt) + } + }).Exec(ctx) +} diff --git a/internal/events/outbox/ent/messageMarker.go b/internal/events/outbox/ent/messageMarker.go new file mode 100644 index 0000000..aa41977 --- /dev/null +++ b/internal/events/outbox/ent/messageMarker.go @@ -0,0 +1,128 @@ +package outboxent + +import ( + "context" + "reflect" + "time" + + "entgo.io/ent/dialect/sql" + "github.com/TheRafaBonin/roxy" + "github.com/google/uuid" + "github.com/gothunder/thunder/internal/events/outbox" + "github.com/gothunder/thunder/internal/utils" +) + +const ( + // fieldID is the id field for the OutboxMessage entity. + fieldID = "id" + // fieldDeliveredAt holds the string denoting the delivered_at field in the database. + fieldDeliveredAt = "delivered_at" + + // methods + methodUpdate = "Update" + methodSetDeliveredAt = "SetDeliveredAt" + methodUpdateWhere = "Where" + methodUpdateExec = "Exec" +) + +func NewEntMessageMarker(outboxMessageClient interface{}) (outbox.MessageMarker, error) { + if outboxMessageClient == nil || !utils.HasMethod(outboxMessageClient, "Update") { + return nil, roxy.Wrap(utils.ErrMethodNotFound, "creating ent message marker") + } + + return &entMessageMarker{ + client: outboxMessageClient, + }, nil +} + +type entMessageMarker struct { + client interface{} +} + +// MarkAsPublished implements outbox.MessageMarker. +func (e entMessageMarker) MarkAsPublished(ctx context.Context, msgPack []outbox.Message) error { + ids := make([]uuid.UUID, len(msgPack)) + for i, msg := range msgPack { + ids[i] = msg.ID + } + + updateBuilder, err := newUpdateBuilder(e.client) + if err != nil { + return roxy.Wrap(err, "creating update builder") + } + + if err = updateBuilder.SetDeliveredAt(time.Now()); err != nil { + return roxy.Wrap(err, "setting delivered_at field") + } + + if err = updateBuilder.WhereDeliveryAtIsNilAndIDIn(ids); err != nil { + return roxy.Wrap(err, "setting where clause") + } + + return updateBuilder.Exec(ctx) +} + +type outboxMessageUpdateBuilder struct { + builder interface{} +} + +func newUpdateBuilder(client interface{}) (*outboxMessageUpdateBuilder, error) { + // initialize query builder + updateBuilder, err := utils.SafeCallMethod(client, methodUpdate, []reflect.Value{}) + if err != nil { + return nil, roxy.Wrap(err, "calling update method on ent client") + } + + return &outboxMessageUpdateBuilder{ + builder: updateBuilder[0].Interface(), + }, nil +} + +func (q *outboxMessageUpdateBuilder) SetDeliveredAt(deliveredAt time.Time) error { + _, err := utils.SafeCallMethod(q.builder, methodSetDeliveredAt, []reflect.Value{ + reflect.ValueOf(deliveredAt), + }) + if err != nil { + return roxy.Wrap(err, "calling SetDeliveredAt method on OutboxMessageUpdate") + } + return nil +} + +func (q *outboxMessageUpdateBuilder) WhereDeliveryAtIsNilAndIDIn(ids []uuid.UUID) error { + method, ok := reflect.TypeOf(q.builder).MethodByName(methodUpdateWhere) + if !ok { + return roxy.Wrap(utils.ErrMethodNotFound, "getting method Where of OutboxMessageUpdate") + } + + // Where + elemType := method.Type.In(1).Elem() + IDInClause := sql.FieldIn(fieldID, ids...) + deliveredAtNilClause := sql.FieldIsNull(fieldDeliveredAt) + whereIDInAndDeliveredAtNil := reflect.ValueOf( + sql.AndPredicates(IDInClause, deliveredAtNilClause), + ).Convert(elemType) + + _, err := utils.SafeCallMethod(q.builder, methodUpdateWhere, []reflect.Value{ + whereIDInAndDeliveredAtNil, + }) + if err != nil { + return roxy.Wrap(err, "calling Where method on OutboxMessageUpdate") + } + + return nil +} + +func (q *outboxMessageUpdateBuilder) Exec(ctx context.Context) error { + result, err := utils.SafeCallMethod(q.builder, methodUpdateExec, []reflect.Value{ + reflect.ValueOf(ctx), + }) + if err != nil { + return roxy.Wrap(err, "calling Exec method on OutboxMessageUpdate") + } + + if result[0].Interface() != nil { + return result[0].Interface().(error) + } + + return nil +} diff --git a/internal/events/outbox/ent/messageMarker_test.go b/internal/events/outbox/ent/messageMarker_test.go new file mode 100644 index 0000000..055830c --- /dev/null +++ b/internal/events/outbox/ent/messageMarker_test.go @@ -0,0 +1,138 @@ +package outboxent + +import ( + "context" + "testing" + "time" + + "github.com/google/uuid" + "github.com/gothunder/thunder/internal/events/outbox" + "github.com/gothunder/thunder/internal/events/outbox/ent/entInit" + "github.com/gothunder/thunder/internal/events/outbox/ent/entInit/outboxmessage" +) + +func TestNewEntMessageMarker(t *testing.T) { + entClient := setupEnt(t) + + type args struct { + outboxMessageClient interface{} + } + tests := []struct { + name string + args args + wantErr bool + }{ + { + name: "success", + args: args{ + outboxMessageClient: entClient.OutboxMessage, + }, + wantErr: false, + }, + { + name: "error", + args: args{ + outboxMessageClient: nil, + }, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := NewEntMessageMarker(tt.args.outboxMessageClient) + if (err != nil) != tt.wantErr { + t.Errorf("NewEntMessageMarker() error = %v, wantErr %v", err, tt.wantErr) + return + } + if tt.wantErr && got != nil { + t.Errorf("NewEntMessageMarker() got = %v, want nil", got) + } + }) + } +} + +func TestEntMessageMarker_MarkAsPublished(t *testing.T) { + type fields struct { + client *entInit.Client + } + type args struct { + ctx context.Context + msgPack []outbox.Message + } + tests := []struct { + name string + fields fields + args args + popuation []entInit.OutboxMessage + wantErr bool + }{ + { + name: "success", + fields: fields{ + client: setupEnt(t), + }, + args: args{ + ctx: context.Background(), + msgPack: []outbox.Message{ + { + ID: uuid.MustParse("14d8e114-71c0-4309-81aa-351d64dd9d74"), + }, + { + ID: uuid.MustParse("934997bd-eee8-4e1f-810a-4fd601ad8b9c"), + }, + }, + }, + popuation: []entInit.OutboxMessage{ + { + ID: uuid.MustParse("14d8e114-71c0-4309-81aa-351d64dd9d74"), + Topic: "topic", + Payload: []byte("payload"), + Headers: map[string]string{ + "key": "value", + }, + CreatedAt: time.Now(), + DeliveredAt: time.Now(), + }, + { + ID: uuid.MustParse("934997bd-eee8-4e1f-810a-4fd601ad8b9c"), + Topic: "topic", + Payload: []byte("payload"), + Headers: map[string]string{ + "key": "value", + }, + CreatedAt: time.Now(), + }, + }, + wantErr: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + populateOutboxMessages(tt.args.ctx, tt.fields.client, tt.popuation) + marker, err := NewEntMessageMarker(tt.fields.client.OutboxMessage) + if err != nil { + t.Fatal(err) + } + + if err := marker.MarkAsPublished(tt.args.ctx, tt.args.msgPack); (err != nil) != tt.wantErr { + t.Errorf("EntMessageMarker.MarkAsPublished() error = %v, wantErr %v", err, tt.wantErr) + } + + // check if the messages were marked as delivered + for _, msg := range tt.args.msgPack { + entMsg, err := tt.fields.client.OutboxMessage. + Query(). + Where(outboxmessage.ID(msg.ID)). + First(context.Background()) + if err != nil { + t.Fatal(err) + } + + if entMsg.DeliveredAt.IsZero() { + t.Errorf("EntMessageMarker.MarkAsPublished() message %s was not marked as delivered", msg.ID) + } + } + }) + } +} diff --git a/internal/events/outbox/ent/messagePoller.go b/internal/events/outbox/ent/messagePoller.go new file mode 100644 index 0000000..65c90ed --- /dev/null +++ b/internal/events/outbox/ent/messagePoller.go @@ -0,0 +1,273 @@ +package outboxent + +import ( + "context" + "errors" + "reflect" + "time" + + "entgo.io/ent/dialect/sql" + "github.com/TheRafaBonin/roxy" + "github.com/google/uuid" + "github.com/gothunder/thunder/internal/events/outbox" + "github.com/gothunder/thunder/internal/utils" + "github.com/rs/zerolog" +) + +var ( + ErrMessagePollerClosed = errors.New("message poller closed") + ErrInvalidMessagePack = errors.New("invalid message pack") + ErrInvalidPollInterval = errors.New("invalid poll interval") + ErrInvalidBatchSize = errors.New("invalid batch size") +) + +const ( + // methods + methodQuery = "Query" + methodQueryWhere = "Where" + methodQueryAll = "All" + methodLimit = "Limit" + methodOrder = "Order" + + // fields + fieldCreatedAt = "created_at" +) + +func NewEntMessagePoller( + outboxMessageClient interface{}, + pollInterval time.Duration, + batchSize int, +) (outbox.MessagePoller, error) { + if outboxMessageClient == nil || !utils.HasMethod(outboxMessageClient, methodQuery) { + return nil, roxy.Wrap(utils.ErrMethodNotFound, methodQuery) + } + if pollInterval <= 0 { + return nil, ErrInvalidPollInterval + } + if batchSize <= 0 { + return nil, ErrInvalidBatchSize + } + + return &entMessagePoller{ + client: outboxMessageClient, + pollInterval: pollInterval, + batchSize: batchSize, + closeChan: make(chan struct{}), + nextChan: make(chan struct{}), + closed: false, + }, nil +} + +type entMessagePoller struct { + client interface{} + + pollInterval time.Duration + batchSize int + + closed bool + closeChan chan struct{} + // nextChan is used to notify the poller to load more messages. + // It is used to avoid duplicate messages due to eaguer loading + // of one batch until the message channel blocks awaiting consumer. + // This way it awaits the consumer to tell it already processed that + // batch of messages. + nextChan chan struct{} +} + +// Close implements outbox.MessagePoller. +func (e *entMessagePoller) Close() error { + if e.closed { + return nil + } + close(e.closeChan) + close(e.nextChan) + e.closed = true + return nil +} + +// Poll implements outbox.MessagePoller. +func (e *entMessagePoller) Poll(ctx context.Context) (<-chan []outbox.Message, func(), error) { + if e.closed { + return nil, func() {}, ErrMessagePollerClosed + } + + messageChan := make(chan []outbox.Message) + logger := zerolog.Ctx(ctx) + + go func() { + defer close(messageChan) + + for { + select { + case <-ctx.Done(): + return + case <-e.closeChan: + return + default: + if err := e.forwardMessages(ctx, messageChan); err != nil { + if errors.Is(err, ErrMessagePollerClosed) || errors.Is(err, context.Canceled) { + return + } + logger.Err(err).Msg("error polling outbox messages") + panic(err) + } + + time.Sleep(e.pollInterval) + } + } + }() + + return messageChan, func() { + if e.closed { + return + } + e.nextChan <- struct{}{} + }, nil +} + +func (e *entMessagePoller) forwardMessages(ctx context.Context, msgChan chan<- []outbox.Message) error { + var err error + var msgPack []outbox.Message + for msgPack, err = e.readBatch(ctx); len(msgPack) > 0 && err == nil; msgPack, err = e.readBatch(ctx) { + select { + case <-ctx.Done(): + return ctx.Err() + case <-e.closeChan: + return ErrMessagePollerClosed + default: + msgChan <- msgPack + // await notification to load more messages + <-e.nextChan + } + } + + return roxy.Wrap(err, "reading batch of messages") +} + +func (e *entMessagePoller) readBatch(ctx context.Context) ([]outbox.Message, error) { + queryBuilder, err := newQueryBuilder(e.client) + if err != nil { + return nil, roxy.Wrap(err, "creating query builder") + } + + if err := queryBuilder.WhereDeliveryAtIsNil(); err != nil { + return nil, roxy.Wrap(err, "setting where clause") + } + + if err := queryBuilder.Limit(e.batchSize); err != nil { + return nil, roxy.Wrap(err, "setting limit clause") + } + + if err := queryBuilder.OrderByCreatedAt(); err != nil { + return nil, roxy.Wrap(err, "setting order clause") + } + + return queryBuilder.All(ctx) +} + +type outboxMessageQueryBuilder struct { + builder interface{} +} + +func newQueryBuilder(client interface{}) (*outboxMessageQueryBuilder, error) { + // initialize query builder + queryBuilder, err := utils.SafeCallMethod(client, methodQuery, []reflect.Value{}) + if err != nil { + return nil, roxy.Wrap(err, "calling query method on ent client") + } + + return &outboxMessageQueryBuilder{ + builder: queryBuilder[0].Interface(), + }, nil +} + +func (q *outboxMessageQueryBuilder) WhereDeliveryAtIsNil() error { + method, ok := reflect.TypeOf(q.builder).MethodByName(methodQueryWhere) + if !ok { + return roxy.Wrap(utils.ErrMethodNotFound, "getting method Where of OutboxMessageQuery") + } + + elemType := method.Type.In(1).Elem() + deliveredAtNilClause := reflect.ValueOf(sql.FieldIsNull(fieldDeliveredAt)).Convert(elemType) + + _, err := utils.SafeCallMethod(q.builder, methodQueryWhere, []reflect.Value{ + deliveredAtNilClause, + }) + if err != nil { + return roxy.Wrap(err, "calling Where method on OutboxMessageQuery") + } + return nil +} + +func (q *outboxMessageQueryBuilder) Limit(limit int) error { + _, err := utils.SafeCallMethod(q.builder, methodLimit, []reflect.Value{ + reflect.ValueOf(limit), + }) + if err != nil { + return roxy.Wrap(err, "calling Limit method on OutboxMessageQuery") + } + + return nil +} + +func (q *outboxMessageQueryBuilder) OrderByCreatedAt() error { + method, ok := reflect.TypeOf(q.builder).MethodByName(methodOrder) + if !ok { + return roxy.Wrap(utils.ErrMethodNotFound, "getting method Order of OutboxMessageQuery") + } + + elemType := method.Type.In(1).Elem() + orderClause := reflect.ValueOf(sql.OrderByField(fieldCreatedAt, sql.OrderAsc()).ToFunc()).Convert(elemType) + + _, err := utils.SafeCallMethod(q.builder, methodOrder, []reflect.Value{ + orderClause, + }) + if err != nil { + return roxy.Wrap(err, "calling Order method on OutboxMessageQuery") + } + + return nil +} + +func (q *outboxMessageQueryBuilder) All(ctx context.Context) ([]outbox.Message, error) { + // All + result, err := utils.SafeCallMethod(q.builder, methodQueryAll, []reflect.Value{ + reflect.ValueOf(ctx), + }) + if err != nil { + return nil, roxy.Wrap(err, "calling All method on OutboxMessageQuery") + } + if result[1].Interface() != nil { + return nil, roxy.Wrap(result[1].Interface().(error), "getting result of All method on OutboxMessageQuery") + } + + return parseEntMessages(result[0]) +} + +func parseEntMessages(result reflect.Value) ([]outbox.Message, error) { + msgPack := result + if msgPack.Kind() != reflect.Slice { + return nil, roxy.Wrap(ErrInvalidMessagePack, "getting slice of messages") + } + + messages := make([]outbox.Message, msgPack.Len()) + for i := 0; i < msgPack.Len(); i++ { + msg := msgPack.Index(i) + if msg.Kind() == reflect.Ptr { + msg = msg.Elem() + } + if msg.Kind() != reflect.Struct { + return nil, roxy.Wrap(ErrInvalidMessagePack, "getting message struct") + } + + messages[i] = outbox.Message{ + ID: msg.FieldByName("ID").Interface().(uuid.UUID), + CreatedAt: msg.FieldByName("CreatedAt").Interface().(time.Time), + Topic: msg.FieldByName("Topic").Interface().(string), + Payload: msg.FieldByName("Payload").Interface().([]byte), + Headers: msg.FieldByName("Headers").Interface().(map[string]string), + } + } + + return messages, nil +} diff --git a/internal/events/outbox/ent/messagePoller_test.go b/internal/events/outbox/ent/messagePoller_test.go new file mode 100644 index 0000000..9f3c130 --- /dev/null +++ b/internal/events/outbox/ent/messagePoller_test.go @@ -0,0 +1,273 @@ +package outboxent + +import ( + "context" + "testing" + "time" + + "github.com/google/uuid" + "github.com/gothunder/thunder/internal/events/outbox" + "github.com/gothunder/thunder/internal/events/outbox/ent/entInit" + "github.com/gothunder/thunder/internal/events/outbox/ent/entInit/outboxmessage" +) + +func TestNewEntMessagePoller(t *testing.T) { + entClient := setupEnt(t) + + type args struct { + outboxMessageClient interface{} + pollInterval time.Duration + batchSize int + } + tests := []struct { + name string + args args + want *entMessagePoller + wantErr bool + }{ + { + name: "success", + args: args{ + outboxMessageClient: entClient.OutboxMessage, + pollInterval: 1, + batchSize: 1, + }, + want: &entMessagePoller{ + client: entClient.OutboxMessage, + pollInterval: 1, + batchSize: 1, + }, + wantErr: false, + }, + { + name: "error", + args: args{ + outboxMessageClient: nil, + pollInterval: 1, + batchSize: 1, + }, + want: nil, + wantErr: true, + }, + { + name: "error", + args: args{ + outboxMessageClient: entClient.OutboxMessage, + pollInterval: 0, + batchSize: 1, + }, + want: nil, + wantErr: true, + }, + { + name: "error", + args: args{ + outboxMessageClient: entClient.OutboxMessage, + pollInterval: 1, + batchSize: 0, + }, + want: nil, + wantErr: true, + }, + { + name: "error", + args: args{ + outboxMessageClient: entClient, + pollInterval: 1, + batchSize: 1, + }, + want: nil, + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := NewEntMessagePoller(tt.args.outboxMessageClient, tt.args.pollInterval, tt.args.batchSize) + if (err != nil) != tt.wantErr { + t.Errorf("NewEntMessagePoller() error = %v, wantErr %v", err, tt.wantErr) + return + } + if tt.want == nil && got != nil { + t.Errorf("NewEntMessagePoller() got = %v, want %v", got, tt.want) + } + if tt.want != nil && got == nil { + t.Errorf("NewEntMessagePoller() got = %v, want %v", got, tt.want) + } + if tt.want != nil && got != nil { + if got.(*entMessagePoller).client != tt.want.client { + t.Errorf("NewEntMessagePoller() got client = %v, want %v", got, tt.want) + } + if got.(*entMessagePoller).pollInterval != tt.want.pollInterval { + t.Errorf("NewEntMessagePoller() got pollInterval = %v, want %v", got, tt.want) + } + if got.(*entMessagePoller).batchSize != tt.want.batchSize { + t.Errorf("NewEntMessagePoller() got batchSize = %v, want %v", got, tt.want) + } + close(got.(*entMessagePoller).closeChan) + close(got.(*entMessagePoller).nextChan) + } + }) + } +} + +func TestEntMessagePoller_Close(t *testing.T) { + t.Run("polling already closed", func(t *testing.T) { + // Arrange + entClient := setupEnt(t) + e, err := NewEntMessagePoller(entClient.OutboxMessage, 1, 1) + if err != nil { + t.Errorf("NewEntMessagePoller error = %v, wantErr %v", err, nil) + return + } + e = outbox.WrapPollerWithTracing(e) + + // Act + if err := e.Close(); err != nil { + t.Errorf("entMessagePoller.Close() error = %v, wantErr %v", err, nil) + } + messages, _, err := e.Poll(context.Background()) + + if messages != nil { + t.Errorf("entMessagePoller.Close() messages = %v, want %v", messages, nil) + } + if err == nil { + t.Errorf("entMessagePoller.Close() error = %v, wantErr %v", err, ErrMessagePollerClosed) + } + }) + + t.Run("close a polling", func(t *testing.T) { + // Arrange + entClient := setupEnt(t) + e, err := NewEntMessagePoller(entClient.OutboxMessage, 1, 1) + if err != nil { + t.Errorf("NewEntMessagePoller error = %v, wantErr %v", err, nil) + return + } + e = outbox.WrapPollerWithTracing(e) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + messagesChan, _, err := e.Poll(ctx) + if err != nil { + t.Errorf("entMessagePoller.Close() error = %v, wantErr %v", err, nil) + return + } + + // Act + if err := e.Close(); err != nil { + t.Errorf("entMessagePoller.Close() error = %v, wantErr %v", err, nil) + } + messages := <-messagesChan + + if messages != nil { + t.Errorf("entMessagePoller.Close() messages = %v, want %v", messages, nil) + } + }) +} + +func TestEntMessagePoller_Poll(t *testing.T) { + // Arrange + population := []entInit.OutboxMessage{ + { + ID: uuid.MustParse("6d1559ea-4a68-4c10-9646-b1a42cb9c6cd"), + Payload: []byte("payload"), + Topic: "delivered", + CreatedAt: time.Now(), + DeliveredAt: time.Now(), + }, + { + ID: uuid.MustParse("592ea815-34a9-4924-9097-82e59476f14a"), + Payload: []byte("payload"), + Topic: "notDelivered", + CreatedAt: time.Now(), + }, + } + insertion := []entInit.OutboxMessage{ + { + ID: uuid.MustParse("6d5b845b-dd40-41f8-a6d6-b43639697abc"), + Payload: []byte("payload"), + Topic: "notDeliveredToo", + CreatedAt: time.Now(), + }, + } + + entClient := setupEnt(t) + err := populateOutboxMessages(context.Background(), entClient, population) + if err != nil { + t.Errorf("populateOutboxMessages error = %v, wantErr %v", err, nil) + return + } + + wantMessages := []outbox.Message{ + { + ID: population[1].ID, + Payload: population[1].Payload, + Topic: population[1].Topic, + CreatedAt: population[1].CreatedAt, + }, + { + ID: insertion[0].ID, + Payload: insertion[0].Payload, + Topic: insertion[0].Topic, + CreatedAt: insertion[0].CreatedAt, + }, + } + + e, err := NewEntMessagePoller(entClient.OutboxMessage, 1, 1) + if err != nil { + t.Errorf("NewEntMessagePoller error = %v, wantErr %v", err, nil) + return + } + e = outbox.WrapPollerWithTracing(e) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Act + messagesChan, next, err := e.Poll(ctx) + + messages := make([]outbox.Message, 0) + go func() { + for msgPack := range messagesChan { + messages = append(messages, msgPack...) + ids := make([]uuid.UUID, len(msgPack)) + for i, msg := range msgPack { + ids[i] = msg.ID + } + entClient.OutboxMessage.Update().SetDeliveredAt(time.Now()).Where(outboxmessage.IDIn(ids...)).ExecX(ctx) + next() + } + }() + + time.Sleep(10 * time.Millisecond) + err = populateOutboxMessages(context.Background(), entClient, insertion) + if err != nil { + t.Errorf("populateOutboxMessages error = %v, wantErr %v", err, nil) + return + } + time.Sleep(10 * time.Millisecond) + cancel() + + // Assert + if messages == nil { + t.Errorf("entMessagePoller.Poll() messages = %v, want %v", messages, wantMessages) + } + if len(messages) != len(wantMessages) { + t.Errorf("entMessagePoller.Poll() len messages = %v, want %v", len(messages), len(wantMessages)) + return + } + for i := range messages { + if messages[i].ID != wantMessages[i].ID { + t.Errorf("entMessagePoller.Poll() message.ID = %v, want %v", messages[i].ID, wantMessages[i].ID) + } + if string(messages[i].Payload) != string(wantMessages[i].Payload) { + t.Errorf("entMessagePoller.Poll() message.Payload = %v, want %v", string(messages[i].Payload), string(wantMessages[i].Payload)) + } + if messages[i].Topic != wantMessages[i].Topic { + t.Errorf("entMessagePoller.Poll() message.Topic = %v, want %v", messages[i].Topic, wantMessages[i].Topic) + } + if !messages[i].CreatedAt.Equal(wantMessages[i].CreatedAt) { + t.Errorf("entMessagePoller.Poll() message.CreatedAt = %v, want %v", messages[i].CreatedAt, wantMessages[i].CreatedAt) + } + } +} diff --git a/internal/events/outbox/message.go b/internal/events/outbox/message.go index 1fb9265..a06408d 100644 --- a/internal/events/outbox/message.go +++ b/internal/events/outbox/message.go @@ -2,6 +2,9 @@ package outbox import ( "errors" + "time" + + "github.com/google/uuid" ) var ( @@ -10,11 +13,28 @@ var ( ) type Message struct { + // Fields to be used by the outbox + ID uuid.UUID + CreatedAt time.Time + + // Fields to be used during the storage of the message Topic string Payload []byte Headers map[string]string } +func NewMessage(topic string, payload []byte, headers map[string]string) Message { + if headers == nil { + headers = make(map[string]string) + } + return Message{ + ID: uuid.New(), + Topic: topic, + Payload: payload, + Headers: headers, + } +} + func (m Message) BuildEntMessage(creator MessageCreator) MessageCreator { return creator. SetTopic(m.Topic). diff --git a/internal/events/outbox/relayer.go b/internal/events/outbox/relayer.go index 0cb2ab6..0ec8937 100644 --- a/internal/events/outbox/relayer.go +++ b/internal/events/outbox/relayer.go @@ -1,5 +1,147 @@ package outbox +import ( + "context" + "errors" + + "github.com/TheRafaBonin/roxy" + "github.com/ThreeDotsLabs/watermill/message" + "github.com/cenkalti/backoff/v4" +) + +var ( + ErrRelayerClosed = errors.New("relayer is closed") + ErrNilBackoff = errors.New("backoff is nil") + ErrNilPublisher = errors.New("publisher is nil") + ErrNilPoller = errors.New("poller is nil") + ErrNilMarker = errors.New("marker is nil") +) + type Relayer interface { - Start() + Start(ctx context.Context) error + Close(ctx context.Context) error + relay(ctx context.Context, msgPack []Message) error + prepareMessages(msgPack []Message) map[string][]*message.Message +} + +type MessagePoller interface { + Poll(ctx context.Context) (<-chan []Message, func(), error) + Close() error +} + +type MessageMarker interface { + MarkAsPublished(ctx context.Context, msgPack []Message) error +} + +type relayer struct { + publisher message.Publisher + poller MessagePoller + marker MessageMarker + backOff backoff.BackOff + + closedChan chan struct{} +} + +func NewRelayer( + backOff backoff.BackOff, + publisher message.Publisher, + poller MessagePoller, + marker MessageMarker, +) (Relayer, error) { + if backOff == nil { + return nil, roxy.Wrap(ErrNilBackoff, "creating relayer") + } + + if publisher == nil { + return nil, roxy.Wrap(ErrNilPublisher, "creating relayer") + } + + if poller == nil { + return nil, roxy.Wrap(ErrNilPoller, "creating relayer") + } + + if marker == nil { + return nil, roxy.Wrap(ErrNilMarker, "creating relayer") + } + + return &relayer{ + backOff: backOff, + poller: poller, + marker: marker, + publisher: publisher, + closedChan: make(chan struct{}), + }, nil +} + +func (r *relayer) Start(ctx context.Context) error { + msgPacks, next, _ := r.poller.Poll(ctx) + + for msgPack := range msgPacks { + select { + case <-r.closedChan: + return ErrRelayerClosed + case <-ctx.Done(): + return ctx.Err() + default: + err := backoff.Retry(func() error { + return r.relay(ctx, msgPack) + }, backoff.WithContext(r.backOff, ctx)) + + if err != nil { + return roxy.Wrap(err, "relaying messages") + } + + next() + } + } + + return nil +} + +func (r *relayer) relay(ctx context.Context, msgPack []Message) error { + select { + case <-r.closedChan: + return backoff.Permanent(roxy.Wrap(ErrRelayerClosed, "relaying messages")) + default: + msgMap := r.prepareMessages(msgPack) + + for topic, msgs := range msgMap { + if err := r.publisher.Publish(topic, msgs...); err != nil { + return roxy.Wrap(err, "publishing messages") + } + } + + if err := r.marker.MarkAsPublished(ctx, msgPack); err != nil { + return roxy.Wrap(err, "marking messages as published") + } + + return nil + } +} + +func (r *relayer) prepareMessages(msgPack []Message) map[string][]*message.Message { + msgMap := make(map[string][]*message.Message) + + for _, msg := range msgPack { + if _, ok := msgMap[msg.Topic]; !ok { + msgMap[msg.Topic] = make([]*message.Message, 0) + } + wMsg := message.NewMessage(msg.ID.String(), msg.Payload) + if msg.Headers != nil { + wMsg.Metadata = msg.Headers + } + + msgMap[msg.Topic] = append(msgMap[msg.Topic], wMsg) + } + + return msgMap +} + +func (r *relayer) Close(ctx context.Context) error { + close(r.closedChan) + errs := make([]error, 2) + errs[0] = r.poller.Close() + errs[1] = r.publisher.Close() + + return errors.Join(errs...) } diff --git a/internal/events/outbox/relayer_logging.go b/internal/events/outbox/relayer_logging.go new file mode 100644 index 0000000..cd61d0e --- /dev/null +++ b/internal/events/outbox/relayer_logging.go @@ -0,0 +1,100 @@ +package outbox + +import ( + "context" + "time" + + "github.com/ThreeDotsLabs/watermill/message" + "github.com/rs/zerolog" +) + +type withLoggingReayer struct { + next Relayer +} + +// Close implements Relayer. +func (w *withLoggingReayer) Close(ctx context.Context) error { + logger := zerolog.Ctx(ctx) + + logger.Debug(). + Str("context", "thunder.outbox.relayer"). + Str("op", "Close").Msg("closing relayer") + + defer func() { + logger.Info(). + Str("context", "thunder.outbox.relayer"). + Str("op", "Close").Msg("reayer closed") + }() + + return w.next.Close(ctx) +} + +// Start implements Relayer. +func (w *withLoggingReayer) Start(ctx context.Context) (err error) { + logger := zerolog.Ctx(ctx) + + logger.Debug(). + Str("context", "thunder.outbox.relayer"). + Str("op", "Start").Msg("starting relaying messages") + + defer func() { + if err != nil { + logger.Error(). + Str("context", "thunder.outbox.relayer"). + Str("op", "Start"). + Err(err). + Msg("relaying messages failed to start") + } else { + logger.Info(). + Str("context", "thunder.outbox.relayer"). + Str("op", "Start").Msg("relaying messages started") + } + }() + + err = w.next.Start(ctx) + return +} + +// prepareMessages implements Relayer. +func (w *withLoggingReayer) prepareMessages(msgPack []Message) map[string][]*message.Message { + return w.next.prepareMessages(msgPack) +} + +// relay implements Relayer. +func (w *withLoggingReayer) relay(ctx context.Context, msgPack []Message) (err error) { + logger := zerolog.Ctx(ctx) + start := time.Now() + + logger.Debug(). + Str("context", "thunder.outbox.relayer"). + Str("op", "relay"). + Msg("starting relaying messages") + + defer func() { + if err != nil { + logger.Info(). + Str("context", "thunder.outbox.relayer"). + Str("op", "relay"). + Int("messages_num", len(msgPack)). + Dur("latency", time.Since(start)). + Err(err). + Msg("relaying messages failed... it will be retried") + } else { + logger.Info(). + Str("context", "thunder.outbox.relayer"). + Str("op", "relay"). + Int("messages_num", len(msgPack)). + Dur("latency", time.Since(start)). + Msg("messages relayed") + } + }() + + err = w.next.relay(ctx, msgPack) + return +} + +func WrapRelayerWithLogging(next Relayer) Relayer { + return &withLoggingReayer{ + next: next, + } +} diff --git a/internal/events/outbox/relayer_metrics.go b/internal/events/outbox/relayer_metrics.go new file mode 100644 index 0000000..2900453 --- /dev/null +++ b/internal/events/outbox/relayer_metrics.go @@ -0,0 +1,107 @@ +package outbox + +import ( + "context" + "time" + + "github.com/TheRafaBonin/roxy" + "github.com/ThreeDotsLabs/watermill/message" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/metric" + semconv "go.opentelemetry.io/otel/semconv/v1.21.0" +) + +const ( + outboxRelayerMeterName = "thunder.outbox.relayer" +) + +type withMetricsReayer struct { + next Relayer + relayerCounter metric.Int64Counter + relayerCounterErr metric.Int64Counter + relayerLatencyHistogram metric.Float64Histogram +} + +// Close implements Relayer. +func (w *withMetricsReayer) Close(ctx context.Context) error { + return w.next.Close(ctx) +} + +// Start implements Relayer. +func (w *withMetricsReayer) Start(ctx context.Context) error { + return w.next.Start(ctx) +} + +// prepareMessages implements Relayer. +func (w *withMetricsReayer) prepareMessages(msgPack []Message) map[string][]*message.Message { + return w.next.prepareMessages(msgPack) +} + +// relay implements Relayer. +func (w *withMetricsReayer) relay(ctx context.Context, msgPack []Message) error { + err := w.next.relay(ctx, msgPack) + + defer func() { + topicCounter := make(map[string]int64) + for _, msg := range msgPack { + if _, ok := topicCounter[msg.Topic]; !ok { + topicCounter[msg.Topic] = 0 + } + topicCounter[msg.Topic]++ + } + for topic, count := range topicCounter { + w.relayerCounter.Add(ctx, count, metric.WithAttributes( + semconv.MessagingDestinationName(topic), + )) + if err != nil { + w.relayerCounterErr.Add(ctx, count, metric.WithAttributes( + semconv.MessagingDestinationName(topic), + )) + } + } + + for _, msg := range msgPack { + w.relayerLatencyHistogram.Record(ctx, time.Since(msg.CreatedAt).Seconds(), + metric.WithAttributes( + semconv.MessagingDestinationName(msg.Topic), + )) + } + }() + + return err +} + +func WrapRelayerWithMetrics(next Relayer) (Relayer, error) { + meterProvider := otel.GetMeterProvider() + + meter := meterProvider.Meter( + outboxRelayerMeterName, + ) + + relayerCounter, err := meter.Int64Counter("thunder.outbox.relayer.message.total", + metric.WithDescription("Total number of messages relayed"), + metric.WithUnit("1")) + if err != nil { + return nil, roxy.Wrap(err, "creating relayer total counter metrics") + } + + relayerCounterErr, err := meter.Int64Counter("thunder.outbox.relayer.message.error", + metric.WithDescription("Total number of messages relayed with error"), + metric.WithUnit("1")) + if err != nil { + return nil, roxy.Wrap(err, "creating relayer error counter metrics") + } + + relayerLatencyHistogram, err := meter.Float64Histogram("thunder.outbox.relayer.message.latency", + metric.WithDescription("Latency of messages relayed in seconds")) + if err != nil { + return nil, roxy.Wrap(err, "creating relayer latency histogram metrics") + } + + return &withMetricsReayer{ + next: next, + relayerCounter: relayerCounter, + relayerCounterErr: relayerCounterErr, + relayerLatencyHistogram: relayerLatencyHistogram, + }, nil +} diff --git a/internal/events/outbox/relayer_test.go b/internal/events/outbox/relayer_test.go new file mode 100644 index 0000000..189bec3 --- /dev/null +++ b/internal/events/outbox/relayer_test.go @@ -0,0 +1,506 @@ +package outbox + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/ThreeDotsLabs/watermill/message" + "github.com/cenkalti/backoff/v4" + "github.com/google/uuid" +) + +func newBackoffMock(d time.Duration) backoff.BackOff { + return backoff.NewConstantBackOff(d) +} + +type messagePollerMock struct { + msgPacks [][]Message + pollCalls int + closeResults []error + closeCalls int +} + +func (m *messagePollerMock) Poll(ctx context.Context) (<-chan []Message, func(), error) { + m.pollCalls++ + msgPacks := make(chan []Message, len(m.msgPacks)) + for _, msgPack := range m.msgPacks { + msgPacks <- msgPack + } + close(msgPacks) + return msgPacks, func() {}, nil +} + +func (m *messagePollerMock) Close() error { + m.closeCalls++ + if len(m.closeResults) == 0 { + return nil + } + ret := m.closeResults[0] + m.closeResults = m.closeResults[1:] + return ret +} + +func newMessagePullerMock(msgPacks [][]Message, closeResults []error) MessagePoller { + return &messagePollerMock{ + msgPacks: msgPacks, + closeResults: closeResults, + } +} + +type messageMarkerMock struct { + markResults []error + markCalls [][]Message +} + +func (m *messageMarkerMock) MarkAsPublished(ctx context.Context, msgPack []Message) error { + m.markCalls = append(m.markCalls, msgPack) + if len(m.markResults) == 0 { + return nil + } + ret := m.markResults[0] + m.markResults = m.markResults[1:] + return ret +} + +func newMessageMarkerMock(markResults []error) MessageMarker { + return &messageMarkerMock{ + markResults: markResults, + } +} + +type publisherMock struct { + publishResults []error + publisherCalls []struct { + topic string + messages []*message.Message + } + closeResults []error + closeCalls int + closed bool +} + +func (p *publisherMock) Publish(topic string, messages ...*message.Message) error { + p.publisherCalls = append(p.publisherCalls, struct { + topic string + messages []*message.Message + }{ + topic: topic, + messages: messages, + }) + if p.closed { + return errors.New("closed") + } + if len(p.publishResults) == 0 { + return nil + } + + ret := p.publishResults[0] + p.publishResults = p.publishResults[1:] + return ret +} + +func (p *publisherMock) Close() error { + p.closeCalls++ + p.closed = true + if len(p.closeResults) == 0 { + return nil + } + ret := p.closeResults[0] + p.closeResults = p.closeResults[1:] + return ret +} + +func newPublisherMock(publishResults []error) message.Publisher { + return &publisherMock{ + publishResults: publishResults, + } +} + +func TestNewRelayer(t *testing.T) { + tests := []struct { + name string + backoff backoff.BackOff + publisher message.Publisher + poller MessagePoller + marker MessageMarker + expectedErr error + }{ + { + name: "success", + backoff: nil, + publisher: newPublisherMock(nil), + poller: newMessagePullerMock(nil, nil), + marker: newMessageMarkerMock(nil), + expectedErr: ErrNilBackoff, + }, + { + name: "nil publisher", + backoff: newBackoffMock(0), + publisher: nil, + poller: newMessagePullerMock(nil, nil), + marker: newMessageMarkerMock(nil), + expectedErr: ErrNilPublisher, + }, + { + name: "nil poller", + backoff: newBackoffMock(0), + publisher: newPublisherMock(nil), + poller: nil, + marker: newMessageMarkerMock(nil), + expectedErr: ErrNilPoller, + }, + { + name: "nil marker", + backoff: newBackoffMock(0), + publisher: newPublisherMock(nil), + poller: newMessagePullerMock(nil, nil), + marker: nil, + expectedErr: ErrNilMarker, + }, + { + name: "success", + backoff: newBackoffMock(0), + publisher: newPublisherMock(nil), + poller: newMessagePullerMock(nil, nil), + marker: newMessageMarkerMock(nil), + expectedErr: nil, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + relayer, err := NewRelayer(test.backoff, test.publisher, test.poller, test.marker) + if test.expectedErr != nil && !errors.Is(err, test.expectedErr) { + t.Errorf("expected error %v, got %v", test.expectedErr, err) + } + if test.expectedErr != nil && relayer != nil { + t.Errorf("expected nil relayer, got %v", relayer) + } + + if test.expectedErr == nil && err != nil { + t.Errorf("expected no error, got %v", err) + } + if test.expectedErr == nil && relayer == nil { + t.Errorf("expected non-nil relayer, got %v", relayer) + } + }) + } +} + +func TestRelayer_Start(t *testing.T) { + tests := []struct { + name string + publisher message.Publisher + publishCalls []struct { + topic string + messages []*message.Message + } + poller MessagePoller + marker MessageMarker + markerCalls [][]Message + expectedErr error + }{ + { + name: "success", + publisher: newPublisherMock([]error{}), + publishCalls: []struct { + topic string + messages []*message.Message + }{ + { + topic: "topic", + messages: []*message.Message{ + { + UUID: "8171478b-fece-4093-aa7e-342c4d816a21", + Payload: []byte("payload"), + }, + }, + }, + }, + poller: newMessagePullerMock([][]Message{ + { + { + ID: uuid.MustParse("8171478b-fece-4093-aa7e-342c4d816a21"), + Topic: "topic", + Payload: []byte("payload"), + }, + }, + }, []error{}), + marker: newMessageMarkerMock([]error{}), + markerCalls: [][]Message{{ + { + ID: uuid.MustParse("8171478b-fece-4093-aa7e-342c4d816a21"), + Topic: "topic", + Payload: []byte("payload"), + }, + }}, + expectedErr: nil, + }, + { + name: "publsher error once, then retry success", + publisher: newPublisherMock([]error{errors.New("error")}), + publishCalls: []struct { + topic string + messages []*message.Message + }{ + { + topic: "topic", + messages: []*message.Message{ + { + UUID: "8171478b-fece-4093-aa7e-342c4d816a21", + Payload: []byte("payload"), + }, + }, + }, + { + topic: "topic", + messages: []*message.Message{ + { + UUID: "8171478b-fece-4093-aa7e-342c4d816a21", + Payload: []byte("payload"), + }, + }, + }, + }, + poller: newMessagePullerMock([][]Message{ + { + { + ID: uuid.MustParse("8171478b-fece-4093-aa7e-342c4d816a21"), + Topic: "topic", + Payload: []byte("payload"), + }, + }, + }, []error{}), + marker: newMessageMarkerMock([]error{}), + markerCalls: [][]Message{{ + { + ID: uuid.MustParse("8171478b-fece-4093-aa7e-342c4d816a21"), + Topic: "topic", + Payload: []byte("payload"), + }, + }}, + expectedErr: nil, + }, + { + name: "marker error once, then retry success", + publisher: newPublisherMock([]error{}), + publishCalls: []struct { + topic string + messages []*message.Message + }{ + { + topic: "topic", + messages: []*message.Message{ + { + UUID: "8171478b-fece-4093-aa7e-342c4d816a21", + Payload: []byte("payload"), + }, + }, + }, + { + topic: "topic", + messages: []*message.Message{ + { + UUID: "8171478b-fece-4093-aa7e-342c4d816a21", + Payload: []byte("payload"), + }, + }, + }, + }, + poller: newMessagePullerMock([][]Message{ + { + { + ID: uuid.MustParse("8171478b-fece-4093-aa7e-342c4d816a21"), + Topic: "topic", + Payload: []byte("payload"), + }, + }, + }, []error{}), + marker: newMessageMarkerMock([]error{errors.New("error")}), + markerCalls: [][]Message{{ + { + ID: uuid.MustParse("8171478b-fece-4093-aa7e-342c4d816a21"), + Topic: "topic", + Payload: []byte("payload"), + }, + }, { + { + ID: uuid.MustParse("8171478b-fece-4093-aa7e-342c4d816a21"), + Topic: "topic", + Payload: []byte("payload"), + }, + }}, + expectedErr: nil, + }, + { + name: "publisher error once, then marker error once, then retry success", + publisher: newPublisherMock([]error{errors.New("error")}), + publishCalls: []struct { + topic string + messages []*message.Message + }{ + { + topic: "topic", + messages: []*message.Message{ + { + UUID: "8171478b-fece-4093-aa7e-342c4d816a21", + Payload: []byte("payload"), + }, + }, + }, + { + topic: "topic", + messages: []*message.Message{ + { + UUID: "8171478b-fece-4093-aa7e-342c4d816a21", + Payload: []byte("payload"), + }, + }, + }, + { + topic: "topic", + messages: []*message.Message{ + { + UUID: "8171478b-fece-4093-aa7e-342c4d816a21", + Payload: []byte("payload"), + }, + }, + }, + }, + poller: newMessagePullerMock([][]Message{ + { + { + ID: uuid.MustParse("8171478b-fece-4093-aa7e-342c4d816a21"), + Topic: "topic", + Payload: []byte("payload"), + }, + }, + }, []error{}), + marker: newMessageMarkerMock([]error{errors.New("error")}), + markerCalls: [][]Message{{ + { + ID: uuid.MustParse("8171478b-fece-4093-aa7e-342c4d816a21"), + Topic: "topic", + Payload: []byte("payload"), + }, + }, { + { + ID: uuid.MustParse("8171478b-fece-4093-aa7e-342c4d816a21"), + Topic: "topic", + Payload: []byte("payload"), + }, + }}, + expectedErr: nil, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + relayer, err := NewRelayer(newBackoffMock(0), test.publisher, test.poller, test.marker) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + relayer = WrapRelayerWithTracing(relayer) + relayer, _ = WrapRelayerWithMetrics(relayer) + relayer = WrapRelayerWithLogging(relayer) + + err = relayer.Start(context.Background()) + + if test.expectedErr != err { + t.Errorf("expected error %v, got %v", test.expectedErr, err) + } + + if test.poller.(*messagePollerMock).pollCalls != 1 { + t.Errorf("expected poller to be called once, got %d", test.poller.(*messagePollerMock).pollCalls) + } + + if len(test.publisher.(*publisherMock).publisherCalls) != len(test.publishCalls) { + t.Errorf("expected publisher to be called %d times, got %d", len(test.publishCalls), len(test.publisher.(*publisherMock).publisherCalls)) + } + + for i, call := range test.publishCalls { + if test.publisher.(*publisherMock).publisherCalls[i].topic != call.topic { + t.Errorf("expected publisher to be called with topic %s, got %s", call.topic, test.publisher.(*publisherMock).publisherCalls[i].topic) + } + if len(test.publisher.(*publisherMock).publisherCalls[i].messages) != len(call.messages) { + t.Errorf("expected publisher to be called with %d messages, got %d", len(call.messages), len(test.publisher.(*publisherMock).publisherCalls[i].messages)) + } + for j, msg := range call.messages { + if test.publisher.(*publisherMock).publisherCalls[i].messages[j].UUID != msg.UUID { + t.Errorf("expected publisher to be called with message %s, got %s", msg.UUID, test.publisher.(*publisherMock).publisherCalls[i].messages[j].UUID) + } + if string(test.publisher.(*publisherMock).publisherCalls[i].messages[j].Payload) != string(msg.Payload) { + t.Errorf("expected publisher to be called with message payload %s, got %s", msg.Payload, test.publisher.(*publisherMock).publisherCalls[i].messages[j].Payload) + } + } + } + + if len(test.marker.(*messageMarkerMock).markCalls) != len(test.markerCalls) { + t.Errorf("expected marker to be called %d times, got %d", len(test.markerCalls), len(test.marker.(*messageMarkerMock).markCalls)) + } + + for i, call := range test.markerCalls { + if len(test.marker.(*messageMarkerMock).markCalls[i]) != len(call) { + t.Errorf("expected marker to be called with %d messages, got %d", len(call), len(test.marker.(*messageMarkerMock).markCalls[i])) + } + for j, msg := range call { + if test.marker.(*messageMarkerMock).markCalls[i][j].ID != msg.ID { + t.Errorf("expected marker to be called with message %s, got %s", msg.ID, test.marker.(*messageMarkerMock).markCalls[i][j].ID) + } + if test.marker.(*messageMarkerMock).markCalls[i][j].Topic != msg.Topic { + t.Errorf("expected marker to be called with message topic %s, got %s", msg.Topic, test.marker.(*messageMarkerMock).markCalls[i][j].Topic) + } + if string(test.marker.(*messageMarkerMock).markCalls[i][j].Payload) != string(msg.Payload) { + t.Errorf("expected marker to be called with message payload %s, got %s", msg.Payload, test.marker.(*messageMarkerMock).markCalls[i][j].Payload) + } + } + } + }) + } +} + +func TestRelayer_Cancel(t *testing.T) { + t.Run("Relayer cancel", func(t *testing.T) { + publisher := newPublisherMock([]error{}) + poller := newMessagePullerMock([][]Message{ + { + { + ID: uuid.MustParse("8171478b-fece-4093-aa7e-342c4d816a21"), + Topic: "topic", + Payload: []byte("payload"), + }, + }, + }, []error{}) + marker := newMessageMarkerMock([]error{}) + + relayer, err := NewRelayer( + newBackoffMock(0), + publisher, + poller, + marker) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + relayer = WrapRelayerWithTracing(relayer) + relayer, _ = WrapRelayerWithMetrics(relayer) + relayer = WrapRelayerWithLogging(relayer) + + publisher.Close() // close publisher before starting relayer + + stop := make(chan struct{}) + + go func() { + err = relayer.Start(context.Background()) + close(stop) + }() + + relayer.Close(context.Background()) + <-stop + + if !errors.Is(err, ErrRelayerClosed) { + t.Errorf("expected ErrRelayerClosed, got %v", err) + } + }) +} diff --git a/internal/events/outbox/relayer_tracing.go b/internal/events/outbox/relayer_tracing.go new file mode 100644 index 0000000..7b39ffe --- /dev/null +++ b/internal/events/outbox/relayer_tracing.go @@ -0,0 +1,136 @@ +package outbox + +import ( + "context" + + "github.com/ThreeDotsLabs/watermill/message" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/propagation" + "go.opentelemetry.io/otel/trace" +) + +const ( + SpanNameRelay = "thunder.outbox.relayer.Relay" + TracerNameRelayer = "thunder.outbox.relayer" + + SpanNamePoll = "thunder.outbox.relayer.Poll" + TracerNamePoll = "thunder.outbox.poller" +) + +type withTracingRelayer struct { + next Relayer + prop propagation.TextMapPropagator + tracer trace.Tracer +} + +// Close implements Relayer. +func (w *withTracingRelayer) Close(ctx context.Context) error { + return w.next.Close(ctx) +} + +// Start implements Relayer. +func (w *withTracingRelayer) Start(ctx context.Context) error { + return w.next.Start(ctx) +} + +// prepareMessages implements Relayer. +func (w *withTracingRelayer) prepareMessages(msgPack []Message) map[string][]*message.Message { + return w.next.prepareMessages(msgPack) +} + +// relay implements Relayer. +func (w *withTracingRelayer) relay(ctx context.Context, msgPack []Message) error { + spans := make([]trace.Span, len(msgPack)) + + for i := range msgPack { + tctx := w.prop.Extract(ctx, propagation.MapCarrier(msgPack[i].Headers)) + ntctx, span := w.tracer.Start( + tctx, SpanNameStore, + trace.WithSpanKind(trace.SpanKindProducer)) + defer span.End() + spans[i] = span + + w.prop.Inject(ntctx, propagation.MapCarrier(msgPack[i].Headers)) + } + + err := w.next.relay(ctx, msgPack) + if err != nil { + for i := range spans { + spans[i].RecordError(err) + } + } + + return err +} + +func WrapRelayerWithTracing(next Relayer) Relayer { + return &withTracingRelayer{ + next: next, + prop: propagation.NewCompositeTextMapPropagator( + propagation.TraceContext{}, + propagation.Baggage{}, + ), + tracer: otel.Tracer(TracerNameRelayer)} +} + +type withTracingMessagePoller struct { + next MessagePoller + prop propagation.TextMapPropagator + tracer trace.Tracer +} + +// Close implements MessagePoller. +func (w *withTracingMessagePoller) Close() error { + return w.next.Close() +} + +// Poll implements MessagePoller. +func (w *withTracingMessagePoller) Poll(ctx context.Context) (<-chan []Message, func(), error) { + tMessageChan := make(chan []Message) + + messages, next, err := w.next.Poll(ctx) + if err != nil { + return messages, next, err + } + + go func() { + defer close(tMessageChan) + + endSpans := func() {} + defer endSpans() + + for msgPack := range messages { + endSpans() + spans := make([]trace.Span, len(msgPack)) + for i := range msgPack { + tctx := w.prop.Extract(ctx, propagation.MapCarrier(msgPack[i].Headers)) + ntctx, span := w.tracer.Start( + tctx, SpanNamePoll, + trace.WithSpanKind(trace.SpanKindConsumer)) + spans[i] = span + w.prop.Inject(ntctx, propagation.MapCarrier(msgPack[i].Headers)) + + endSpans = func() { + for i := range spans { + spans[i].End() + } + } + } + + tMessageChan <- msgPack + } + + }() + + return tMessageChan, next, err +} + +func WrapPollerWithTracing(next MessagePoller) MessagePoller { + return &withTracingMessagePoller{ + next: next, + prop: propagation.NewCompositeTextMapPropagator( + propagation.TraceContext{}, + propagation.Baggage{}, + ), + tracer: otel.Tracer(TracerNamePoll)} +} diff --git a/internal/events/outbox/storer_logging.go b/internal/events/outbox/storer_logging.go index a1187eb..61cb43d 100644 --- a/internal/events/outbox/storer_logging.go +++ b/internal/events/outbox/storer_logging.go @@ -9,7 +9,7 @@ import ( const ( opLabel = "op" - op = "thunder.outbox.storer.Store" + opStore = "thunder.outbox.storer.Store" latencyLabel = "latency" messagesNumLabel = "messages_num" ) @@ -21,16 +21,16 @@ type withLoggingStorer struct { // Store implements Storer. func (wts withLoggingStorer) Store(ctx context.Context, tx interface{}, messages []Message) error { logger := zerolog.Ctx(ctx) - logger.Debug().Str(opLabel, op).Msg("starting storing messages") + logger.Debug().Str(opLabel, opStore).Msg("starting storing messages") start := time.Now() defer func() { - logger.Debug().Str(opLabel, op).Msg("finished storing messages") + logger.Debug().Str(opLabel, opStore).Msg("finished storing messages") logger. Info(). Dur(latencyLabel, time.Since(start)). - Str(opLabel, op). + Str(opLabel, opStore). Int(messagesNumLabel, len(messages)). Msg("messages stored") }() diff --git a/internal/events/outbox/storer_tracing.go b/internal/events/outbox/storer_tracing.go index eca738d..7ebfaaf 100644 --- a/internal/events/outbox/storer_tracing.go +++ b/internal/events/outbox/storer_tracing.go @@ -9,8 +9,8 @@ import ( ) const ( - SpanNameStore = "thunder.outbox.storer.Store" - TracerName = "thunder.outbox.storer" + SpanNameStore = "thunder.outbox.storer.Store" + TracerNameStorer = "thunder.outbox.storer" ) type withTracingStorer struct { @@ -52,5 +52,5 @@ func WrapStorerWithTracing(next Storer) Storer { propagation.TraceContext{}, propagation.Baggage{}, ), - tracer: otel.Tracer(TracerName)} + tracer: otel.Tracer(TracerNameStorer)} } diff --git a/internal/utils/reflection.go b/internal/utils/reflection.go index 1d6ace0..3c5bacb 100644 --- a/internal/utils/reflection.go +++ b/internal/utils/reflection.go @@ -31,27 +31,7 @@ func SafeCallMethod(i interface{}, methodName string, args []reflect.Value) ([]r in := make([]reflect.Value, len(args)+1) in[0] = reflect.ValueOf(i) - // check if the arguments are of the correct type for k, arg := range args { - // if is greater than or equal last argument and is variadic - if k+1 >= method.Type.NumIn()-1 && method.Type.IsVariadic() { - if arg.Type() != method.Type.In(method.Type.NumIn()-1).Elem() { - return nil, roxy.Wrapf( - ErrInvalidArgumentType, - "argument %d is of type %s, expected %s", - k+1, arg.Type(), method.Type.In(method.Type.NumIn()-1).Elem(), - ) - } - } else { - if arg.Type() != method.Type.In(k+1) { - return nil, roxy.Wrapf( - ErrInvalidArgumentType, - "argument %d is of type %s, expected %s", - k+1, arg.Type(), method.Type.In(k+1), - ) - } - } - in[k+1] = arg } diff --git a/tests/entInit/generate.go b/tests/entInit/generate.go deleted file mode 100644 index 26f8333..0000000 --- a/tests/entInit/generate.go +++ /dev/null @@ -1,3 +0,0 @@ -package entInit - -//go:generate go run -mod=mod entgo.io/ent/cmd/ent generate ./schema diff --git a/tests/outbox_Storer_test.go b/tests/outbox_Storer_test.go index 6aee2ec..c5db47e 100644 --- a/tests/outbox_Storer_test.go +++ b/tests/outbox_Storer_test.go @@ -4,9 +4,9 @@ import ( "context" "time" + "github.com/gothunder/thunder/internal/events/outbox/ent/entInit" + "github.com/gothunder/thunder/internal/events/outbox/ent/entInit/enttest" "github.com/gothunder/thunder/pkg/events/outbox" - "github.com/gothunder/thunder/tests/entInit" - "github.com/gothunder/thunder/tests/entInit/enttest" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" diff --git a/tests/thunder_suite_test.go b/tests/thunder_suite_test.go index ce529d5..4d8c410 100644 --- a/tests/thunder_suite_test.go +++ b/tests/thunder_suite_test.go @@ -4,7 +4,7 @@ import ( "os" "testing" - "github.com/gothunder/thunder/tests/entInit" + "github.com/gothunder/thunder/internal/events/outbox/ent/entInit" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" )