Skip to content

Commit

Permalink
feat(transform): Add Metrics Bytes Transform (#142)
Browse files Browse the repository at this point in the history
* feat(transform): Add utilMetricBytes

* docs(examples): Add utilMetricBytes

* docs(examples): Comments
  • Loading branch information
jshlbrd authored Mar 7, 2024
1 parent fe90269 commit d708580
Show file tree
Hide file tree
Showing 5 changed files with 113 additions and 0 deletions.
8 changes: 8 additions & 0 deletions build/config/substation.libsonnet
Original file line number Diff line number Diff line change
Expand Up @@ -979,6 +979,14 @@
settings: std.prune(std.mergePatch(default, $.helpers.abbv(settings))),
},
metric: {
bytes(settings={}): {
local default = {
metric: $.config.metric,
},

type: 'utility_metric_bytes',
settings: std.prune(std.mergePatch(default, $.helpers.abbv(settings))),
},
count(settings={}): {
local default = {
metric: $.config.metric,
Expand Down
19 changes: 19 additions & 0 deletions examples/config/transform/utility/message_bytes/config.jsonnet
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// This example shows how to use the `utility_metric_bytes` transform to
// sum the amount of data received and transformed by Substation.
local sub = import '../../../../../build/config/substation.libsonnet';

local attr = { AppName: 'example' };
local dest = { type: 'aws_cloudwatch_embedded_metrics' };

{
transforms: [
// If the transform is configured first, then the metric reflects
// the sum of bytes received by Substation.
sub.transform.utility.metric.bytes({ metric: { name: 'BytesReceived', attributes: attr, destination: dest } }),
// This inserts a value into the object so that the message size increases.
sub.transform.object.insert({obj: {target_key: '_'}, value: 1}),
// If the transform is configured last, then the metric reflects
// the sum of bytes transformed by Substation.
sub.transform.utility.metric.bytes({ metric: { name: 'BytesTransformed', attributes: attr, destination: dest } }),
],
}
13 changes: 13 additions & 0 deletions examples/config/transform/utility/message_bytes/data.jsonl
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
{"a":"b"}
{"c":"d"}
{"e":"f"}
{"g":"h"}
{"i":"j"}
{"k":"l"}
{"m":"n"}
{"o":"p"}
{"q":"r"}
{"s":"t"}
{"u":"v"}
{"w":"x"}
{"y":"z"}
2 changes: 2 additions & 0 deletions transform/transform.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,8 @@ func New(ctx context.Context, cfg config.Config) (Transformer, error) { //nolint
return newUtilityDrop(ctx, cfg)
case "utility_err":
return newUtilityErr(ctx, cfg)
case "utility_metric_bytes":
return newUtilityMetricBytes(ctx, cfg)
case "utility_metric_count":
return newUtilityMetricCount(ctx, cfg)
case "utility_secret":
Expand Down
71 changes: 71 additions & 0 deletions transform/utility_metric_bytes.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package transform

import (
"context"
"encoding/json"
"fmt"
"sync/atomic"

"github.com/brexhq/substation/config"
iconfig "github.com/brexhq/substation/internal/config"
"github.com/brexhq/substation/internal/metrics"
"github.com/brexhq/substation/message"
)

type utilityMetricBytesConfig struct {
Metric iconfig.Metric `json:"metric"`
}

func (c *utilityMetricBytesConfig) Decode(in interface{}) error {
return iconfig.Decode(in, c)
}

func newUtilityMetricBytes(ctx context.Context, cfg config.Config) (*utilityMetricBytes, error) {
// conf gets validated when calling metrics.New.
conf := utilityMetricBytesConfig{}
if err := conf.Decode(cfg.Settings); err != nil {
return nil, fmt.Errorf("transform: utility_metric_bytes: %v", err)
}

m, err := metrics.New(ctx, conf.Metric.Destination)
if err != nil {
return nil, fmt.Errorf("transform: utility_metric_bytes: %v", err)
}

tf := utilityMetricBytes{
conf: conf,
metric: m,
}

return &tf, nil
}

type utilityMetricBytes struct {
conf utilityMetricBytesConfig

metric metrics.Generator
bytes uint32
}

func (tf *utilityMetricBytes) Transform(ctx context.Context, msg *message.Message) ([]*message.Message, error) {
if msg.IsControl() {
if err := tf.metric.Generate(ctx, metrics.Data{
Name: tf.conf.Metric.Name,
Value: tf.bytes,
Attributes: tf.conf.Metric.Attributes,
}); err != nil {
return nil, fmt.Errorf("transform: utility_metric_bytes: %v", err)
}

atomic.StoreUint32(&tf.bytes, 0)
return []*message.Message{msg}, nil
}

atomic.AddUint32(&tf.bytes, uint32(len(msg.Data())))
return []*message.Message{msg}, nil
}

func (tf *utilityMetricBytes) String() string {
b, _ := json.Marshal(tf.conf)
return string(b)
}

0 comments on commit d708580

Please sign in to comment.