Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: metrics #1442

Merged
merged 5 commits into from
Aug 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions openmeter/entitlement/balanceworker/recalculate.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ const (

defaultLRUCacheSize = 10_000

metricNameRecalculationTime = "balance_worker_entitlement_recalculation_time"
metricNameRecalculationTime = "balance_worker.entitlement_recalculation_time_ms"
)

var (
Expand Down Expand Up @@ -70,7 +70,7 @@ type Recalculator struct {
featureCache *lru.Cache[string, productcatalog.Feature]
subjectCache *lru.Cache[string, models.Subject]

metricRecalculationTime metric.Float64Histogram
metricRecalculationTime metric.Int64Histogram
}

func NewRecalculator(opts RecalculatorOptions) (*Recalculator, error) {
Expand All @@ -88,7 +88,7 @@ func NewRecalculator(opts RecalculatorOptions) (*Recalculator, error) {
return nil, fmt.Errorf("failed to create subject ID cache: %w", err)
}

metricRecalculationTime, err := opts.MetricMeter.Float64Histogram(
metricRecalculationTime, err := opts.MetricMeter.Int64Histogram(
metricNameRecalculationTime,
metric.WithDescription("Entitlement recalculation time"),
metric.WithExplicitBucketBoundaries(metricRecalculationBuckets...),
Expand Down Expand Up @@ -135,7 +135,7 @@ func (r *Recalculator) processEntitlements(ctx context.Context, entitlements []e
}

r.metricRecalculationTime.Record(ctx,
time.Since(start).Seconds(),
time.Since(start).Milliseconds(),
metric.WithAttributes(recalculationTimeDeleteAttribute))
} else {
err := r.sendEntitlementUpdatedEvent(ctx, ent)
Expand All @@ -144,7 +144,7 @@ func (r *Recalculator) processEntitlements(ctx context.Context, entitlements []e
}

r.metricRecalculationTime.Record(ctx,
time.Since(start).Seconds(),
time.Since(start).Milliseconds(),
metric.WithAttributes(recalculationTimeUpdateAttribute))
}
}
Expand Down
10 changes: 8 additions & 2 deletions openmeter/entitlement/balanceworker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/ThreeDotsLabs/watermill/message"
lru "github.com/hashicorp/golang-lru/v2"
"go.opentelemetry.io/otel/metric"

"github.com/openmeterio/openmeter/openmeter/credit/grant"
"github.com/openmeterio/openmeter/openmeter/entitlement"
Expand Down Expand Up @@ -85,7 +86,11 @@ func New(opts WorkerOptions) (*Worker, error) {

worker.router = router

eventHandler := worker.eventHandler()
eventHandler, err := worker.eventHandler(opts.Router.MetricMeter)
if err != nil {
return nil, err
}

router.AddNoPublisherHandler(
"balance_worker_system_events",
opts.SystemEventsTopic,
Expand All @@ -105,9 +110,10 @@ func New(opts WorkerOptions) (*Worker, error) {
return worker, nil
}

func (w *Worker) eventHandler() message.NoPublishHandlerFunc {
func (w *Worker) eventHandler(metricMeter metric.Meter) (message.NoPublishHandlerFunc, error) {
return grouphandler.NewNoPublishingHandler(
w.opts.EventBus.Marshaler(),
metricMeter,

// Entitlement created event
grouphandler.NewGroupEventHandler(func(ctx context.Context, event *entitlement.EntitlementCreatedEvent) error {
Expand Down
23 changes: 14 additions & 9 deletions openmeter/notification/consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,19 +50,24 @@ func New(opts Options) (*Consumer, error) {
balanceThresholdHandler: balanceThresholdEventHandler,
}

handler, err := grouphandler.NewNoPublishingHandler(opts.Marshaler, opts.Router.MetricMeter,
grouphandler.NewGroupEventHandler(func(ctx context.Context, event *snapshot.SnapshotEvent) error {
if event == nil {
return nil
}

return consumer.balanceThresholdHandler.Handle(ctx, *event)
}),
)
if err != nil {
return nil, err
}

_ = router.AddNoPublisherHandler(
"balance_consumer_system_events",
opts.SystemEventsTopic,
opts.Router.Subscriber,
grouphandler.NewNoPublishingHandler(opts.Marshaler,
grouphandler.NewGroupEventHandler(func(ctx context.Context, event *snapshot.SnapshotEvent) error {
if event == nil {
return nil
}

return consumer.balanceThresholdHandler.Handle(ctx, *event)
}),
),
handler,
)

return consumer, nil
Expand Down
2 changes: 1 addition & 1 deletion openmeter/sink/flushhandler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func (f *flushEventHandler) invokeCallback(ctx context.Context, events []models.
return err
}

f.metrics.eventProcessingTime.Record(ctx, time.Since(startTime).Seconds())
f.metrics.eventProcessingTime.Record(ctx, time.Since(startTime).Milliseconds())
f.metrics.eventsProcessed.Add(ctx, 1)

return nil
Expand Down
4 changes: 2 additions & 2 deletions openmeter/sink/flushhandler/meters.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ type metrics struct {
eventsReceived metric.Int64Counter
eventsProcessed metric.Int64Counter
eventsFailed metric.Int64Counter
eventProcessingTime metric.Float64Histogram
eventProcessingTime metric.Int64Histogram
eventChannelFull metric.Int64Counter
}

Expand All @@ -35,7 +35,7 @@ func newMetrics(handlerName string, meter metric.Meter) (*metrics, error) {
return nil, err
}

if r.eventProcessingTime, err = meter.Float64Histogram(fmt.Sprintf("sink.flush_handler.%s.event_processing_time", handlerName)); err != nil {
if r.eventProcessingTime, err = meter.Int64Histogram(fmt.Sprintf("sink.flush_handler.%s.event_processing_time_ms", handlerName)); err != nil {
return nil, err
}

Expand Down
2 changes: 1 addition & 1 deletion openmeter/watermill/driver/kafka/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
)

const (
defaultMeterPrefix = "sarama.publisher."
defaultMeterPrefix = "sarama."
defaultKeepalive = time.Minute
)

Expand Down
75 changes: 73 additions & 2 deletions openmeter/watermill/grouphandler/grouphandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,23 @@ package grouphandler

import (
"context"
"time"

"github.com/ThreeDotsLabs/watermill/components/cqrs"
"github.com/ThreeDotsLabs/watermill/message"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
)

const (
meterNameHandlerMessageCount = "watermill.grouphandler.message_count"
meterNameHandlerProcessingTime = "watermill.grouphandler.processing_time_ms"
)

var (
meterAttributeStatusIgnored = attribute.String("status", "ignored")
meterAttributeStatusFailed = attribute.String("status", "failed")
meterAttributeStatusSuccess = attribute.String("status", "success")
)

type GroupEventHandler = cqrs.GroupEventHandler
Expand All @@ -14,7 +28,12 @@ func NewGroupEventHandler[T any](handleFunc func(ctx context.Context, event *T)
}

// NewNoPublishingHandler creates a NoPublishHandlerFunc that will handle events with the provided GroupEventHandlers.
func NewNoPublishingHandler(marshaler cqrs.CommandEventMarshaler, groupHandlers ...GroupEventHandler) message.NoPublishHandlerFunc {
func NewNoPublishingHandler(marshaler cqrs.CommandEventMarshaler, metricMeter metric.Meter, groupHandlers ...GroupEventHandler) (message.NoPublishHandlerFunc, error) {
meters, err := getMeters(metricMeter)
if err != nil {
return nil, err
}

typeHandlerMap := make(map[string]cqrs.GroupEventHandler)
for _, groupHandler := range groupHandlers {
event := groupHandler.NewEvent()
Expand All @@ -24,8 +43,14 @@ func NewNoPublishingHandler(marshaler cqrs.CommandEventMarshaler, groupHandlers
return func(msg *message.Message) error {
eventName := marshaler.NameFromMessage(msg)

meterAttributeCEType := attribute.String("ce_type", eventName)

groupHandler, ok := typeHandlerMap[eventName]
if !ok {
meters.handlerMessageCount.Add(msg.Context(), 1, metric.WithAttributes(
meterAttributeCEType,
meterAttributeStatusIgnored,
))
return nil
}

Expand All @@ -35,6 +60,52 @@ func NewNoPublishingHandler(marshaler cqrs.CommandEventMarshaler, groupHandlers
return err
}

return groupHandler.Handle(msg.Context(), event)
startedAt := time.Now()
err := groupHandler.Handle(msg.Context(), event)
if err != nil {
meters.handlerMessageCount.Add(msg.Context(), 1, metric.WithAttributes(
meterAttributeCEType,
meterAttributeStatusFailed,
))
meters.handlerProcessingTime.Record(msg.Context(), time.Since(startedAt).Milliseconds(), metric.WithAttributes(
meterAttributeCEType,
meterAttributeStatusFailed,
))

return err
}

meters.handlerProcessingTime.Record(msg.Context(), time.Since(startedAt).Milliseconds(), metric.WithAttributes(
meterAttributeCEType,
meterAttributeStatusSuccess,
))
meters.handlerMessageCount.Add(msg.Context(), 1, metric.WithAttributes(
meterAttributeCEType,
meterAttributeStatusSuccess,
))

return nil
}, nil
}

type meters struct {
handlerMessageCount metric.Int64Counter
handlerProcessingTime metric.Int64Histogram
}

func getMeters(meter metric.Meter) (*meters, error) {
handlerMessageCount, err := meter.Int64Counter(meterNameHandlerMessageCount)
if err != nil {
return nil, err
}

handlerProcessingTime, err := meter.Int64Histogram(meterNameHandlerProcessingTime)
if err != nil {
return nil, err
}

return &meters{
handlerMessageCount: handlerMessageCount,
handlerProcessingTime: handlerProcessingTime,
}, nil
}
Loading
Loading