Skip to content

Commit

Permalink
feat(handywares): nats middleware stack
Browse files Browse the repository at this point in the history
* panic recover
* otel

Signed-off-by: Pouyan Heyratpour <[email protected]>
  • Loading branch information
pouyanh committed Sep 28, 2024
1 parent 74b414b commit c22f71e
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 0 deletions.
5 changes: 5 additions & 0 deletions handywares/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ require (
github.com/hibiken/asynq v0.24.1
github.com/janstoon/toolbox/bricks v0.7.2
github.com/janstoon/toolbox/tricks v0.10.0
github.com/nats-io/nats.go v1.37.0
github.com/rs/cors v1.11.1
github.com/stretchr/testify v1.9.0
go.opentelemetry.io/otel v1.29.0
Expand All @@ -33,14 +34,18 @@ require (
github.com/golang/protobuf v1.5.4 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/klauspost/compress v1.17.2 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/nats-io/nkeys v0.4.7 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/oklog/ulid v1.3.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/redis/go-redis/v9 v9.6.1 // indirect
github.com/robfig/cron/v3 v3.0.1 // indirect
github.com/spf13/cast v1.7.0 // indirect
go.mongodb.org/mongo-driver v1.16.1 // indirect
golang.org/x/crypto v0.27.0 // indirect
golang.org/x/net v0.29.0 // indirect
golang.org/x/sync v0.8.0 // indirect
golang.org/x/sys v0.25.0 // indirect
Expand Down
10 changes: 10 additions & 0 deletions handywares/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ github.com/janstoon/toolbox/tricks v0.10.0 h1:cz+y7f6OWm7MU5alsd2s+jEtReDSPNbBG2
github.com/janstoon/toolbox/tricks v0.10.0/go.mod h1:G5vKiYk5opiGDy9aYLmIVZ5pZYNwI5UGrw9gCtI7WlU=
github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY=
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4=
github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
Expand All @@ -66,6 +68,12 @@ github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0
github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY=
github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/nats-io/nats.go v1.37.0 h1:07rauXbVnnJvv1gfIyghFEo6lUcYRY0WXc3x7x0vUxE=
github.com/nats-io/nats.go v1.37.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI=
github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4=
github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
Expand Down Expand Up @@ -98,6 +106,8 @@ go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA=
go.uber.org/goleak v1.1.12/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.27.0 h1:GXm2NjJrPaiv/h1tb2UH8QfgC/hOf/+z0p6PT8o1w7A=
golang.org/x/crypto v0.27.0/go.mod h1:1Xngt8kV6Dvbssa53Ziq6Eqn0HqbZi5Z6R0ZpwQzt70=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
Expand Down
111 changes: 111 additions & 0 deletions handywares/nats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package handywares

import (
"context"
"errors"
"fmt"
"runtime/debug"
"strings"

"github.com/janstoon/toolbox/bricks"
"github.com/janstoon/toolbox/tricks"
"github.com/nats-io/nats.go"
"go.opentelemetry.io/otel/trace"
)

type NatsMsgHandler func(ctx context.Context, msg *nats.Msg) error

type NatsMiddlewareStack = tricks.MiddlewareStack[NatsMsgHandler]

type PanicRecoverNatsMiddlewareOpt = tricks.InPlaceOption[any]

func NatsPanicRecoverMiddleware(options ...PanicRecoverAsynqMiddlewareOpt) tricks.Middleware[NatsMsgHandler] {
return func(next NatsMsgHandler) NatsMsgHandler {
return func(ctx context.Context, msg *nats.Msg) error {
defer func() {
if r := recover(); r != nil {
span := trace.SpanFromContext(ctx)
span.AddEvent("panic recovered", trace.WithAttributes(
oaPanicValue.String(fmt.Sprintf("%+v", r)),
oaDebugStack.String(string(debug.Stack())),
))
}
}()

return next(ctx, msg)
}
}
}

type CompensatorNatsMiddlewareOpt = tricks.InPlaceOption[any]

// NatsCompensatorMiddleware tries to compensate the task if error is not bricks.ErrRetryable.
// It searches for a bricks.Compensator in returned error by msg handler and runs the first one.
func NatsCompensatorMiddleware(options ...CompensatorNatsMiddlewareOpt) tricks.Middleware[NatsMsgHandler] {
return func(next NatsMsgHandler) NatsMsgHandler {
return func(ctx context.Context, msg *nats.Msg) error {
err := next(ctx, msg)
if err == nil {
return nil
}

if !errors.Is(err, bricks.ErrRetryable) {
var c bricks.Compensator
if errors.As(err, &c) {
err = errors.Join(err, c.Compensate(ctx, err))
}
}

return err
}
}
}

type OtelNmw struct {
tracer trace.Tracer

namePrefix string
}

type OpenTelemetryNatsMiddlewareOpt = tricks.Option[OtelNmw]

func OtelNatsSpanNamePrefix(prefix string) OpenTelemetryNatsMiddlewareOpt {
return tricks.OutOfPlaceOption[OtelNmw](func(nmw OtelNmw) OtelNmw {
nmw.namePrefix = prefix

return nmw
})
}

func NatsOpenTelemetryMiddleware(
tracer trace.Tracer, options ...OpenTelemetryNatsMiddlewareOpt,
) tricks.Middleware[NatsMsgHandler] {
amw := &OtelNmw{
tracer: tracer,
}
amw = tricks.ApplyOptions(amw, options...)

return amw.builder
}

func (nmw OtelNmw) builder(next NatsMsgHandler) NatsMsgHandler {
return func(ctx context.Context, msg *nats.Msg) error {
var span trace.Span
ctx, span = nmw.tracer.Start(ctx, nmw.spanName(msg.Subject), trace.WithSpanKind(trace.SpanKindConsumer))
defer span.End()

return next(ctx, msg)
}
}

func (nmw OtelNmw) spanName(subject string) string {
sb := strings.Builder{}
if len(strings.TrimSpace(nmw.namePrefix)) > 0 {
sb.WriteString(nmw.namePrefix)
sb.WriteRune('/')
}

sb.WriteString(subject)

return sb.String()
}

0 comments on commit c22f71e

Please sign in to comment.