Skip to content

Commit

Permalink
fixups
Browse files Browse the repository at this point in the history
- fix change requests
- add tests
  • Loading branch information
bbergshaven committed May 3, 2023
1 parent 674dd1c commit fce840e
Show file tree
Hide file tree
Showing 4 changed files with 422 additions and 75 deletions.
14 changes: 8 additions & 6 deletions plugins/outputs/clarify/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,18 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
## Configuration

```toml @sample.conf
[[outputs.clarify]]
## Credentials File (OAuth 2.0 from Clarify integration).
## Credentials File (Oauth 2.0 from Clarify integration)
credentials_file = "/path/to/clarify/credentials.json"

## Clarify username password (Basic Auth from Clarify integration).
## Clarify username password (Basic Auth from Clarify integration)
username = "i-am-bob"
password = "secret-password"

## Tags to be included when generating the unique ID for a signal in Clarify.
id_tags = ['sensor']
## Timeout for Clarify operations
# timeout = "20s"

## Optional tags to be included when generating the unique ID for a signal in Clarify
# id_tags = ['sensor']
```

You can use either a credentials file or username/password.
Expand Down Expand Up @@ -52,7 +54,7 @@ Strings and invalid numbers are ignored.
[clarifydoc]: https://docs.clarify.io
[credentials]: https://docs.clarify.io/users/admin/integrations/credentials

## Example parsing
## Example

The following input would be stored in Clarify with the values shown below:

Expand Down
167 changes: 102 additions & 65 deletions plugins/outputs/clarify/clarify.go
Original file line number Diff line number Diff line change
@@ -1,141 +1,176 @@
//go:generate ../../../tools/readme_config_includer/generator

package clarify

import (
"context"
_ "embed"
"errors"
"fmt"
"strings"
"time"

"github.com/clarify/clarify-go"
"github.com/clarify/clarify-go/fields"
"github.com/clarify/clarify-go/views"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/outputs"
)

type Clarify struct {
Username string `toml:"username"`
Password string `toml:"password"`
Username config.Secret `toml:"username"`
Password config.Secret `toml:"password"`
CredentialsFile string `toml:"credentials_file"`
Timeout config.Duration `toml:"timeout"`
IDTags []string `toml:"id_tags"`
Log telegraf.Logger `toml:"-"`

client *clarify.Client
}

var errIdTooLong = errors.New("id too long (>128)")

const allowedIDRunes = `abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890-_:.#+/`

//go:embed sample.conf
var sampleConfig string

func (c *Clarify) Connect() error {
// No blocking as it doesn't do any http requests, just sets up the necessarry Oauth2 client.
ctx := context.Background()
if c.CredentialsFile != "" {
switch {
case c.CredentialsFile != "":
creds, err := clarify.CredentialsFromFile(c.CredentialsFile)
if err != nil {
return err
}
c.client = creds.Client(ctx)
return nil
}
if c.Username != "" && c.Password != "" {
creds := clarify.BasicAuthCredentials(c.Username, c.Password)
case !c.Username.Empty() && !c.Password.Empty():
username, err := c.Username.Get()
if err != nil {
return fmt.Errorf("getting username failed: %w", err)
}
password, err := c.Password.Get()
if err != nil {
config.ReleaseSecret(username)
return fmt.Errorf("getting password failed: %w", err)
}
creds := clarify.BasicAuthCredentials(string(username), string(password))
c.client = creds.Client(ctx)
config.ReleaseSecret(username)
config.ReleaseSecret(password)
return nil
}
return fmt.Errorf("no Clarify credentials provided")
}

func verifyValue(v interface{}) (float64, error) {
func toFloat64(v interface{}) (float64, error) {
var value float64
switch v := v.(type) {
case bool:
value = float64(0)
if v {
value = float64(1)
}
case uint8:
value = float64(v)
case uint16:
value = float64(v)
case uint32:
value = float64(v)
case uint64:
value = float64(v)
case int8:
value = float64(v)
case int16:
value = float64(v)
case int32:
value = float64(v)
case int64:
value = float64(v)
case float32:
value = float64(v)
case float64:
value = v
return value, nil
default:
return value, fmt.Errorf("unsupported field type: %T", v)
return internal.ToFloat64(v)
}
return value, nil
}

func (c *Clarify) Write(metrics []telegraf.Metric) error {

frame, signals := c.processMetrics(metrics)

ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, time.Duration(c.Timeout))
defer cancel()

if _, err := c.client.Insert(frame).Do(ctx); err != nil {
return err
}

if _, err := c.client.SaveSignals(signals).Do(ctx); err != nil {
return err
}

return nil
}

func (c *Clarify) processMetrics(metrics []telegraf.Metric) (views.DataFrame, map[string]views.SignalSave) {
signals := make(map[string]views.SignalSave)
frame := views.DataFrame{}

for _, m := range metrics {
for _, f := range m.FieldList() {
if value, err := verifyValue(f.Value); err == nil {
id := c.generateID(m, f)
ts := fields.AsTimestamp(m.Time())

if _, ok := frame[id]; ok {
frame[id][ts] = value
} else {
frame[id] = views.DataSeries{ts: value}
}

s := views.SignalSave{}
s.Name = fmt.Sprintf("%s.%s", m.Name(), f.Key)

for _, t := range m.TagList() {
labelName := strings.ReplaceAll(t.Key, " ", "-")
labelName = strings.ReplaceAll(labelName, "_", "-")
labelName = strings.ToLower(labelName)
s.Labels.Add(labelName, t.Value)
}

signals[id] = s
value, err := toFloat64(f.Value)
if err != nil {
c.Log.Warnf("Unable to add field `%s` for metric `%s` due to error '%v', skipping", f.Key, m.Name(), err)
continue
}
id, err := c.generateID(m, f)
if err != nil {
c.Log.Warnf("Unable to add field `%s` for metric `%s` due to error '%v', skipping", f.Key, m.Name(), err)
continue
}
ts := fields.AsTimestamp(m.Time())

if _, ok := frame[id]; ok {
frame[id][ts] = value
} else {
c.Log.Infof("Unable to add field `%s` for metric `%s` due to error '%v', skipping", f.Key, m.Name(), err)
frame[id] = views.DataSeries{ts: value}
}
}
}

if _, err := c.client.Insert(frame).Do(context.Background()); err != nil {
return err
}
s := views.SignalSave{}
s.Name = m.Name() + "." + f.Key

if _, err := c.client.SaveSignals(signals).Do(context.Background()); err != nil {
return err
for _, t := range m.TagList() {
labelName := strings.ReplaceAll(t.Key, " ", "-")
labelName = strings.ReplaceAll(labelName, "_", "-")
labelName = strings.ToLower(labelName)
s.Labels.Add(labelName, t.Value)
}

signals[id] = s

}
}
return frame, signals
}

return nil
func normalizeID(id string) string {
return strings.Map(func(r rune) rune {
if strings.ContainsRune(allowedIDRunes, r) {
return r
}
return '_'
}, id)
}

func (c *Clarify) generateID(m telegraf.Metric, f *telegraf.Field) string {
func (c *Clarify) generateID(m telegraf.Metric, f *telegraf.Field) (string, error) {
var id string
cid, exist := m.GetTag("clarify_input_id")
if exist && len(m.FieldList()) == 1 {
id = cid
} else {
id = fmt.Sprintf("%s.%s", m.Name(), f.Key)
parts := make([]string, 0, len(c.IDTags)+2)
parts = append(parts, m.Name(), f.Key)

for _, idTag := range c.IDTags {
if m.HasTag(idTag) {
id = fmt.Sprintf("%s.%s", id, m.Tags()[idTag])
if k, found := m.GetTag(idTag); found {
parts = append(parts, k)
}
}
id = strings.Join(parts, ".")
}
return strings.ToLower(id)
id = normalizeID(id)
if len(id) > 128 {
return id, errIdTooLong
}
return id, nil
}

func (c *Clarify) SampleConfig() string {
Expand All @@ -149,6 +184,8 @@ func (c *Clarify) Close() error {

func init() {
outputs.Add("clarify", func() telegraf.Output {
return &Clarify{}
return &Clarify{
Timeout: config.Duration(20 * time.Second),
}
})
}
Loading

0 comments on commit fce840e

Please sign in to comment.