From fce840e5b959b064655853a3347078fa3a23eeb7 Mon Sep 17 00:00:00 2001 From: Bernt-Johan Bergshaven Date: Wed, 3 May 2023 13:22:27 +0200 Subject: [PATCH] fixups - fix change requests - add tests --- plugins/outputs/clarify/README.md | 14 +- plugins/outputs/clarify/clarify.go | 167 ++++++++----- plugins/outputs/clarify/clarify_test.go | 305 ++++++++++++++++++++++++ plugins/outputs/clarify/sample.conf | 11 +- 4 files changed, 422 insertions(+), 75 deletions(-) create mode 100644 plugins/outputs/clarify/clarify_test.go diff --git a/plugins/outputs/clarify/README.md b/plugins/outputs/clarify/README.md index 5ad331db9ec0f..dd400b3271914 100644 --- a/plugins/outputs/clarify/README.md +++ b/plugins/outputs/clarify/README.md @@ -15,16 +15,18 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details. ## Configuration ```toml @sample.conf -[[outputs.clarify]] -## Credentials File (OAuth 2.0 from Clarify integration). +## Credentials File (Oauth 2.0 from Clarify integration) credentials_file = "/path/to/clarify/credentials.json" -## Clarify username password (Basic Auth from Clarify integration). +## Clarify username password (Basic Auth from Clarify integration) username = "i-am-bob" password = "secret-password" -## Tags to be included when generating the unique ID for a signal in Clarify. -id_tags = ['sensor'] +## Timeout for Clarify operations +# timeout = "20s" + +## Optional tags to be included when generating the unique ID for a signal in Clarify +# id_tags = ['sensor'] ``` You can use either a credentials file or username/password. @@ -52,7 +54,7 @@ Strings and invalid numbers are ignored. [clarifydoc]: https://docs.clarify.io [credentials]: https://docs.clarify.io/users/admin/integrations/credentials -## Example parsing +## Example The following input would be stored in Clarify with the values shown below: diff --git a/plugins/outputs/clarify/clarify.go b/plugins/outputs/clarify/clarify.go index 8a0d32094ef8f..c46a465dd30bb 100644 --- a/plugins/outputs/clarify/clarify.go +++ b/plugins/outputs/clarify/clarify.go @@ -1,50 +1,73 @@ +//go:generate ../../../tools/readme_config_includer/generator + package clarify import ( "context" _ "embed" + "errors" "fmt" "strings" + "time" "github.com/clarify/clarify-go" "github.com/clarify/clarify-go/fields" "github.com/clarify/clarify-go/views" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/outputs" ) type Clarify struct { - Username string `toml:"username"` - Password string `toml:"password"` + Username config.Secret `toml:"username"` + Password config.Secret `toml:"password"` CredentialsFile string `toml:"credentials_file"` + Timeout config.Duration `toml:"timeout"` IDTags []string `toml:"id_tags"` Log telegraf.Logger `toml:"-"` client *clarify.Client } +var errIdTooLong = errors.New("id too long (>128)") + +const allowedIDRunes = `abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890-_:.#+/` + //go:embed sample.conf var sampleConfig string func (c *Clarify) Connect() error { + // No blocking as it doesn't do any http requests, just sets up the necessarry Oauth2 client. ctx := context.Background() - if c.CredentialsFile != "" { + switch { + case c.CredentialsFile != "": creds, err := clarify.CredentialsFromFile(c.CredentialsFile) if err != nil { return err } c.client = creds.Client(ctx) return nil - } - if c.Username != "" && c.Password != "" { - creds := clarify.BasicAuthCredentials(c.Username, c.Password) + case !c.Username.Empty() && !c.Password.Empty(): + username, err := c.Username.Get() + if err != nil { + return fmt.Errorf("getting username failed: %w", err) + } + password, err := c.Password.Get() + if err != nil { + config.ReleaseSecret(username) + return fmt.Errorf("getting password failed: %w", err) + } + creds := clarify.BasicAuthCredentials(string(username), string(password)) c.client = creds.Client(ctx) + config.ReleaseSecret(username) + config.ReleaseSecret(password) return nil } return fmt.Errorf("no Clarify credentials provided") } -func verifyValue(v interface{}) (float64, error) { +func toFloat64(v interface{}) (float64, error) { var value float64 switch v := v.(type) { case bool: @@ -52,90 +75,102 @@ func verifyValue(v interface{}) (float64, error) { if v { value = float64(1) } - case uint8: - value = float64(v) - case uint16: - value = float64(v) - case uint32: - value = float64(v) - case uint64: - value = float64(v) - case int8: - value = float64(v) - case int16: - value = float64(v) - case int32: - value = float64(v) - case int64: - value = float64(v) - case float32: - value = float64(v) - case float64: - value = v + return value, nil default: - return value, fmt.Errorf("unsupported field type: %T", v) + return internal.ToFloat64(v) } - return value, nil } func (c *Clarify) Write(metrics []telegraf.Metric) error { + + frame, signals := c.processMetrics(metrics) + + ctx := context.Background() + ctx, cancel := context.WithTimeout(ctx, time.Duration(c.Timeout)) + defer cancel() + + if _, err := c.client.Insert(frame).Do(ctx); err != nil { + return err + } + + if _, err := c.client.SaveSignals(signals).Do(ctx); err != nil { + return err + } + + return nil +} + +func (c *Clarify) processMetrics(metrics []telegraf.Metric) (views.DataFrame, map[string]views.SignalSave) { signals := make(map[string]views.SignalSave) frame := views.DataFrame{} for _, m := range metrics { for _, f := range m.FieldList() { - if value, err := verifyValue(f.Value); err == nil { - id := c.generateID(m, f) - ts := fields.AsTimestamp(m.Time()) - - if _, ok := frame[id]; ok { - frame[id][ts] = value - } else { - frame[id] = views.DataSeries{ts: value} - } - - s := views.SignalSave{} - s.Name = fmt.Sprintf("%s.%s", m.Name(), f.Key) - - for _, t := range m.TagList() { - labelName := strings.ReplaceAll(t.Key, " ", "-") - labelName = strings.ReplaceAll(labelName, "_", "-") - labelName = strings.ToLower(labelName) - s.Labels.Add(labelName, t.Value) - } - - signals[id] = s + value, err := toFloat64(f.Value) + if err != nil { + c.Log.Warnf("Unable to add field `%s` for metric `%s` due to error '%v', skipping", f.Key, m.Name(), err) + continue + } + id, err := c.generateID(m, f) + if err != nil { + c.Log.Warnf("Unable to add field `%s` for metric `%s` due to error '%v', skipping", f.Key, m.Name(), err) + continue + } + ts := fields.AsTimestamp(m.Time()) + + if _, ok := frame[id]; ok { + frame[id][ts] = value } else { - c.Log.Infof("Unable to add field `%s` for metric `%s` due to error '%v', skipping", f.Key, m.Name(), err) + frame[id] = views.DataSeries{ts: value} } - } - } - if _, err := c.client.Insert(frame).Do(context.Background()); err != nil { - return err - } + s := views.SignalSave{} + s.Name = m.Name() + "." + f.Key - if _, err := c.client.SaveSignals(signals).Do(context.Background()); err != nil { - return err + for _, t := range m.TagList() { + labelName := strings.ReplaceAll(t.Key, " ", "-") + labelName = strings.ReplaceAll(labelName, "_", "-") + labelName = strings.ToLower(labelName) + s.Labels.Add(labelName, t.Value) + } + + signals[id] = s + + } } + return frame, signals +} - return nil +func normalizeID(id string) string { + return strings.Map(func(r rune) rune { + if strings.ContainsRune(allowedIDRunes, r) { + return r + } + return '_' + }, id) } -func (c *Clarify) generateID(m telegraf.Metric, f *telegraf.Field) string { +func (c *Clarify) generateID(m telegraf.Metric, f *telegraf.Field) (string, error) { var id string cid, exist := m.GetTag("clarify_input_id") if exist && len(m.FieldList()) == 1 { id = cid } else { - id = fmt.Sprintf("%s.%s", m.Name(), f.Key) + parts := make([]string, 0, len(c.IDTags)+2) + parts = append(parts, m.Name(), f.Key) + for _, idTag := range c.IDTags { - if m.HasTag(idTag) { - id = fmt.Sprintf("%s.%s", id, m.Tags()[idTag]) + if k, found := m.GetTag(idTag); found { + parts = append(parts, k) } } + id = strings.Join(parts, ".") } - return strings.ToLower(id) + id = normalizeID(id) + if len(id) > 128 { + return id, errIdTooLong + } + return id, nil } func (c *Clarify) SampleConfig() string { @@ -149,6 +184,8 @@ func (c *Clarify) Close() error { func init() { outputs.Add("clarify", func() telegraf.Output { - return &Clarify{} + return &Clarify{ + Timeout: config.Duration(20 * time.Second), + } }) } diff --git a/plugins/outputs/clarify/clarify_test.go b/plugins/outputs/clarify/clarify_test.go new file mode 100644 index 0000000000000..0dc4c5e2c3e67 --- /dev/null +++ b/plugins/outputs/clarify/clarify_test.go @@ -0,0 +1,305 @@ +package clarify + +import ( + "context" + "encoding/json" + "errors" + "math" + "reflect" + "testing" + "time" + + "github.com/clarify/clarify-go" + "github.com/clarify/clarify-go/jsonrpc" + "github.com/clarify/clarify-go/views" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/require" +) + +var errTimeout = errors.New("timeout: operation timed out") + +const validResponse = `{ + "signalsByInput" : { + "test1.value" : { + "id": "c8bvu9fqfsjctpv7b6fg", + "created" : true + } + } +}` + +type MockHandler struct { + jsonResult string + sleep time.Duration +} + +func (m *MockHandler) Do(ctx context.Context, req jsonrpc.Request, result any) error { + err := json.Unmarshal([]byte(m.jsonResult), result) + if m.sleep > 0 { + timer1 := time.NewTimer(m.sleep) + select { + case <-ctx.Done(): + return errTimeout + case <-timer1.C: + timer1.Stop() + return nil + } + } + return err +} + +func TestGenerateID(t *testing.T) { + clfy := &Clarify{ + Log: testutil.Logger{}, + IDTags: []string{"tag1", "tag2"}, + } + var idTests = []struct { + inMetric telegraf.Metric + outId []string + err error + }{ + { + testutil.MustMetric( + "cpu+='''..2!@#$abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890", + map[string]string{ + "tag1": "78sx", + }, + map[string]interface{}{ + "time_idle": math.NaN(), + }, + time.Now()), + []string{"cpu.time_idle.78sx"}, + errIdTooLong, + }, + { + testutil.MustMetric( + "cpu@@", + map[string]string{ + "tag1": "78sx", + "tag2": "33t2", + }, + map[string]interface{}{ + "time_idle": math.NaN(), + }, + time.Now()), + []string{"cpu__.time_idle.78sx.33t2"}, + nil, + }, + { + testutil.MustMetric( + "temperature", + map[string]string{}, + map[string]interface{}{ + "cpu1": 12, + "cpu2": 13, + }, + time.Now()), + []string{"temperature.cpu1", "temperature.cpu2"}, + nil, + }, + { + testutil.MustMetric( + "legacy_measurement", + map[string]string{ + "clarify_input_id": "e5e82f63-3700-4997-835d-eb366b7294a2", + "xid": "78sx", + }, + map[string]interface{}{ + "value": 1337, + }, + time.Now()), + []string{"e5e82f63-3700-4997-835d-eb366b7294a2"}, + nil, + }, + } + for _, tt := range idTests { + for n, f := range tt.inMetric.FieldList() { + id, err := clfy.generateID(tt.inMetric, f) + if tt.err != nil { + require.ErrorIs(t, err, tt.err) + } else { + require.NoError(t, err) + if !reflect.DeepEqual(id, tt.outId[n]) { + t.Errorf("\nexpected %+v\ngot %+v\n", tt.outId[n], id) + } + } + } + } +} + +func TestProcessMetrics(t *testing.T) { + clfy := &Clarify{ + Log: testutil.Logger{}, + IDTags: []string{"tag1", "tag2", "node_id"}, + } + var idTests = []struct { + inMetric telegraf.Metric + outFrame views.DataFrame + outSignals map[string]views.SignalSave + }{ + { + testutil.MustMetric( + "cpu1", + map[string]string{ + "tag1": "78sx", + }, + map[string]interface{}{ + "time_idle": 1337.3, + }, + time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC)), + views.DataFrame{ + "cpu1.time_idle.78sx": views.DataSeries{ + 1257894000000000: 1337.3, + }, + }, + map[string]views.SignalSave{ + "cpu1.time_idle.78sx": { + SignalSaveAttributes: views.SignalSaveAttributes{ + Name: "cpu1.time_idle", + Labels: map[string][]string{ + "tag1": {"78sx"}, + }, + }, + }, + }, + }, + { + testutil.MustMetric( + "cpu2", + map[string]string{ + "tag1": "78sx", + "tag2": "33t2", + }, + map[string]interface{}{ + "time_idle": 200, + }, + time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC)), + views.DataFrame{ + "cpu2.time_idle.78sx.33t2": views.DataSeries{ + 1257894000000000: 200, + }, + }, + map[string]views.SignalSave{ + "cpu2.time_idle.78sx.33t2": { + SignalSaveAttributes: views.SignalSaveAttributes{ + Name: "cpu2.time_idle", + Labels: map[string][]string{ + "tag1": {"78sx"}, + "tag2": {"33t2"}, + }, + }, + }, + }, + }, + { + testutil.MustMetric( + "temperature", + map[string]string{}, + map[string]interface{}{ + "cpu1": 12, + "cpu2": 13, + }, + time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC)), + views.DataFrame{ + "temperature.cpu1": views.DataSeries{ + 1257894000000000: 12, + }, + "temperature.cpu2": views.DataSeries{ + 1257894000000000: 13, + }, + }, + map[string]views.SignalSave{ + "temperature.cpu1": { + SignalSaveAttributes: views.SignalSaveAttributes{ + Name: "temperature.cpu1", + }, + }, + "temperature.cpu2": { + SignalSaveAttributes: views.SignalSaveAttributes{ + Name: "temperature.cpu2", + }, + }, + }, + }, + { + testutil.MustMetric( + "legacy_measurement", + map[string]string{ + "clarify_input_id": "e5e82f63-3700-4997-835d-eb366b7294a2", + "xid": "78sx", + }, + map[string]interface{}{ + "value": 123.333, + }, + time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC)), + views.DataFrame{ + "e5e82f63-3700-4997-835d-eb366b7294a2": views.DataSeries{ + 1257894000000000: 123.333, + }, + }, + map[string]views.SignalSave{ + "e5e82f63-3700-4997-835d-eb366b7294a2": { + SignalSaveAttributes: views.SignalSaveAttributes{ + Name: "legacy_measurement.value", + Labels: map[string][]string{ + "clarify-input-id": {"e5e82f63-3700-4997-835d-eb366b7294a2"}, + "xid": {"78sx"}, + }, + }, + }, + }, + }, + { + testutil.MustMetric( + "opc_metric", + map[string]string{ + "node_id": "ns=1;s=Omron PLC.Objects.new_Controller_0.GlobalVars.counter1", + }, + map[string]interface{}{ + "value": 12345.6789, + "quality": "GOOD", + }, + time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC)), + views.DataFrame{ + "opc_metric.value.ns_1_s_Omron_PLC.Objects.new_Controller_0.GlobalVars.counter1": views.DataSeries{ + 1257894000000000: 12345.6789, + }, + }, + map[string]views.SignalSave{ + "opc_metric.value.ns_1_s_Omron_PLC.Objects.new_Controller_0.GlobalVars.counter1": { + SignalSaveAttributes: views.SignalSaveAttributes{ + Name: "opc_metric.value", + Labels: map[string][]string{ + "node-id": {"ns=1;s=Omron PLC.Objects.new_Controller_0.GlobalVars.counter1"}, + }, + }, + }, + }, + }, + } + for _, tt := range idTests { + of, os := clfy.processMetrics([]telegraf.Metric{tt.inMetric}) + if !reflect.DeepEqual(of, tt.outFrame) { + t.Errorf("\nexpected %+v\ngot %+v\n", tt.outFrame, of) + } + if !reflect.DeepEqual(os, tt.outSignals) { + t.Errorf("\nexpected %+v\ngot %+v\n", tt.outSignals, os) + } + } +} + +func TestTimeout(t *testing.T) { + clfy := &Clarify{ + Log: testutil.Logger{}, + Timeout: config.Duration(1 * time.Millisecond), + client: clarify.NewClient("c8bvu9fqfsjctpv7b6fg", &MockHandler{ + sleep: 6 * time.Millisecond, + jsonResult: validResponse, + }), + } + + metrics := []telegraf.Metric{} + err := clfy.Write(metrics) + require.ErrorIs(t, err, errTimeout) +} diff --git a/plugins/outputs/clarify/sample.conf b/plugins/outputs/clarify/sample.conf index 803474c4b81ce..9b65109948f4f 100644 --- a/plugins/outputs/clarify/sample.conf +++ b/plugins/outputs/clarify/sample.conf @@ -1,9 +1,12 @@ -## Credentials File (Oauth2 from Clarify Integration) +## Credentials File (Oauth 2.0 from Clarify integration) credentials_file = "/path/to/clarify/credentials.json" -## Clarify username password (Basic Auth from Clarify Integration) +## Clarify username password (Basic Auth from Clarify integration) username = "i-am-bob" password = "secret-password" -## Tags to be included when generating the unique ID for a signal in Clarify -id_tags = ['sensor'] +## Timeout for Clarify operations +# timeout = "20s" + +## Optional tags to be included when generating the unique ID for a signal in Clarify +# id_tags = ['sensor']