diff --git a/plugins/outputs/all/opensearch.go b/plugins/outputs/all/opensearch.go new file mode 100644 index 0000000000000..3a0f964e5efa1 --- /dev/null +++ b/plugins/outputs/all/opensearch.go @@ -0,0 +1,5 @@ +//go:build !custom || outputs || outputs.opensearch + +package all + +import _ "github.com/influxdata/telegraf/plugins/outputs/opensearch" // register plugin diff --git a/plugins/outputs/opensearch/README.md b/plugins/outputs/opensearch/README.md new file mode 100644 index 0000000000000..5572b3ae30dba --- /dev/null +++ b/plugins/outputs/opensearch/README.md @@ -0,0 +1,352 @@ +# OpenSearch Output Plugin + +This plugin writes to [OpenSearch](https://opensearch.org/) via HTTP + +It supports OpenSearch releases from 1 and 2. Future comparability with 1.x is +not guaranteed and instead will focus on 2.x support. Consider using the +existing Elasticsearch plugin for 1.x. + +## Global configuration options + +In addition to the plugin-specific configuration settings, plugins support +additional global and plugin configuration settings. These settings are used to +modify metrics, tags, and field or create aliases and configure ordering, etc. +See the [CONFIGURATION.md][CONFIGURATION.md] for more details. + +[CONFIGURATION.md]: ../../../docs/CONFIGURATION.md#plugins + +## Configuration + +```toml @sample.conf +# Configuration for OpenSearch to send metrics to. +[[outputs.OpenSearch]] + ## URLs + ## The full HTTP endpoint URL for your OpenSearch instance. Multiple URLs can + ## be specified as part of the same cluster, but only one URLs is used to + ## write during each interval. + urls = ["http://node1.os.example.com:9200"] + + ## Index Name + ## Target index name for metrics (OpenSearch will create if it not exists). + ## This is a Golang template (see https://pkg.go.dev/text/template) + ## You can also specify + ## metric name (`{{.Name}}`), tag value (`{{.Tag "tag_name"}}`), field value (`{{.Field "feild_name"}}`) + ## If the tag does not exist, the default tag value will be empty string "". + ## the timestamp (`{{.Time.Format "xxxxxxxxx"}}`). + ## For example: "telegraf-{{.Time.Format "2006-01-02"}}-{{.Tag "host"}}" would set it to telegraf-2023-07-27-HostName + index_name = "" + + ## Timeout + ## OpenSearch client timeout + # timeout = "5s" + + ## Sniffer + ## Set to true to ask OpenSearch a list of all cluster nodes, + ## thus it is not necessary to list all nodes in the urls config option + # enable_sniffer = false + + ## GZIP Compression + ## Set to true to enable gzip compression + # enable_gzip = false + + ## Health Check Interval + ## Set the interval to check if the OpenSearch nodes are available + ## Setting to "0s" will disable the health check (not recommended in production) + # health_check_interval = "10s" + + ## Set the timeout for periodic health checks. + # health_check_timeout = "1s" + ## HTTP basic authentication details. + # username = "" + # password = "" + ## HTTP bearer token authentication details + # auth_bearer_token = "" + + ## Optional TLS Config + # tls_ca = "/etc/telegraf/ca.pem" + # tls_cert = "/etc/telegraf/cert.pem" + # tls_key = "/etc/telegraf/key.pem" + ## Use TLS but skip chain & host verification + # insecure_skip_verify = false + + ## Template Config + ## Manage templates + ## Set to true if you want telegraf to manage its index template. + ## If enabled it will create a recommended index template for telegraf indexes + # manage_template = true + + ## Template Name + ## The template name used for telegraf indexes + # template_name = "telegraf" + + ## Overwrite Templates + ## Set to true if you want telegraf to overwrite an existing template + # overwrite_template = false + + ## Document ID + ## If set to true a unique ID hash will be sent as + ## sha256(concat(timestamp,measurement,series-hash)) string. It will enable + ## data resend and update metric points avoiding duplicated metrics with + ## different id's + # force_document_id = false + + ## Value Handling + ## Specifies the handling of NaN and Inf values. + ## This option can have the following values: + ## none -- do not modify field-values (default); will produce an error + ## if NaNs or infs are encountered + ## drop -- drop fields containing NaNs or infs + ## replace -- replace with the value in "float_replacement_value" (default: 0.0) + ## NaNs and inf will be replaced with the given number, -inf with the negative of that number + # float_handling = "none" + # float_replacement_value = 0.0 + + ## Pipeline Config + ## To use a ingest pipeline, set this to the name of the pipeline you want to use. + # use_pipeline = "my_pipeline" + + ## Pipeline Name + ## Additionally, you can specify a tag name using the notation (`{{.Tag "tag_name"}}`) + ## which will be used as the pipeline name (e.g. "{{.Tag "os_pipeline"}}"). + ## If the tag does not exist, the default pipeline will be used as the pipeline. + ## If no default pipeline is set, no pipeline is used for the metric. + # default_pipeline = "" +``` + +### Required parameters + +* `urls`: A list containing the full HTTP URL of one or more nodes from your + OpenSearch instance. +* `index_name`: The target index for metrics. You can use the date format + +For example: "telegraf-{{.Time.Format "2006-01-02"}}" would set it to +"telegraf-2023-07-27". You can also specify metric name (`{{ .Name }}`), tag +value (`{{ .Tag "tag_name" }}`), and field value (`{{ .Field "field_name" }}`). + +If the tag does not exist, the default tag value will be empty string "" + +## Permissions + +If you are using authentication within your OpenSearch cluster, you need to +create an account and create a role with at least the manage role in the Cluster +Privileges category. Otherwise, your account will not be able to connect to your +OpenSearch cluster and send logs to your cluster. After that, you need to +add "create_index" and "write" permission to your specific index pattern. + +## OpenSearch indexes and templates + +### Indexes per time-frame + +This plugin can manage indexes per time-frame, as commonly done in other tools +with OpenSearch. The timestamp of the metric collected will be used to decide +the index destination. For more information about this usage on OpenSearch, +check [the docs][1]. + +[1]: https://opensearch.org/docs/latest/ + +### Template management + +Index templates are used in OpenSearch to define settings and mappings for +the indexes and how the fields should be analyzed. For more information on how +this works, see [the docs][2]. + +This plugin can create a working template for use with telegraf metrics. It uses +OpenSearch dynamic templates feature to set proper types for the tags and +metrics fields. If the template specified already exists, it will not overwrite +unless you configure this plugin to do so. Thus you can customize this template +after its creation if necessary. + +Example of an index template created by telegraf on OpenSearch 2.x: + +```json +{ + "telegraf-2022.10.02" : { + "aliases" : { }, + "mappings" : { + "properties" : { + "@timestamp" : { + "type" : "date" + }, + "disk" : { + "properties" : { + "free" : { + "type" : "long" + }, + "inodes_free" : { + "type" : "long" + }, + "inodes_total" : { + "type" : "long" + }, + "inodes_used" : { + "type" : "long" + }, + "total" : { + "type" : "long" + }, + "used" : { + "type" : "long" + }, + "used_percent" : { + "type" : "float" + } + } + }, + "measurement_name" : { + "type" : "text", + "fields" : { + "keyword" : { + "type" : "keyword", + "ignore_above" : 256 + } + } + }, + "tag" : { + "properties" : { + "cpu" : { + "type" : "text", + "fields" : { + "keyword" : { + "type" : "keyword", + "ignore_above" : 256 + } + } + }, + "device" : { + "type" : "text", + "fields" : { + "keyword" : { + "type" : "keyword", + "ignore_above" : 256 + } + } + }, + "host" : { + "type" : "text", + "fields" : { + "keyword" : { + "type" : "keyword", + "ignore_above" : 256 + } + } + }, + "mode" : { + "type" : "text", + "fields" : { + "keyword" : { + "type" : "keyword", + "ignore_above" : 256 + } + } + }, + "path" : { + "type" : "text", + "fields" : { + "keyword" : { + "type" : "keyword", + "ignore_above" : 256 + } + } + } + } + } + } + }, + "settings" : { + "index" : { + "creation_date" : "1664693522789", + "number_of_shards" : "1", + "number_of_replicas" : "1", + "uuid" : "TYugdmvsQfmxjzbGRJ8FIw", + "version" : { + "created" : "136247827" + }, + "provided_name" : "telegraf-2022.10.02" + } + } + } +} + +``` + +[2]: https://opensearch.org/docs/latest/opensearch/index-templates/ + +### Example events + +This plugin will format the events in the following way: + +```json +{ + "@timestamp": "2017-01-01T00:00:00+00:00", + "measurement_name": "cpu", + "cpu": { + "usage_guest": 0, + "usage_guest_nice": 0, + "usage_idle": 71.85413456197966, + "usage_iowait": 0.256805341656516, + "usage_irq": 0, + "usage_nice": 0, + "usage_softirq": 0.2054442732579466, + "usage_steal": 0, + "usage_system": 15.04879301548127, + "usage_user": 12.634822807288275 + }, + "tag": { + "cpu": "cpu-total", + "host": "opensearhhost", + "dc": "datacenter1" + } +} +``` + +```json +{ + "@timestamp": "2017-01-01T00:00:00+00:00", + "measurement_name": "system", + "system": { + "load1": 0.78, + "load15": 0.8, + "load5": 0.8, + "n_cpus": 2, + "n_users": 2 + }, + "tag": { + "host": "opensearhhost", + "dc": "datacenter1" + } +} +``` + +## Known issues + +Integer values collected that are bigger than 2^63 and smaller than 1e21 (or in +this exact same window of their negative counterparts) are encoded by golang +JSON encoder in decimal format and that is not fully supported by OpenSearch +dynamic field mapping. This causes the metrics with such values to be dropped in +case a field mapping has not been created yet on the telegraf index. If that's +the case you will see an exception on OpenSearch side like this: + +```json +{ + "error": { + "root_cause": [ + {"type": "mapper_parsing_exception", "reason": "failed to parse"} + ], + "type": "mapper_parsing_exception", + "reason": "failed to parse", + "caused_by": { + "type": "illegal_state_exception", + "reason": "No matching token for number_type [BIG_INTEGER]" + } + }, + "status": 400 +} +``` + +The correct field mapping will be created on the telegraf index as soon as a +supported JSON value is received by OpenSearch, and subsequent insertions +will work because the field mapping will already exist. + +This issue is caused by the way OpenSearch tries to detect integer fields, +and by how golang encodes numbers in JSON. There is no clear workaround for this +at the moment. diff --git a/plugins/outputs/opensearch/opensearch.go b/plugins/outputs/opensearch/opensearch.go new file mode 100644 index 0000000000000..02623e36a5576 --- /dev/null +++ b/plugins/outputs/opensearch/opensearch.go @@ -0,0 +1,445 @@ +//go:generate ../../../tools/readme_config_includer/generator +package opensearch + +import ( + "bytes" + "context" + "crypto/sha256" + "crypto/tls" + _ "embed" + "encoding/json" + "fmt" + "math" + "net/http" + "strconv" + "strings" + "text/template" + "time" + + "github.com/opensearch-project/opensearch-go/v2" + "github.com/opensearch-project/opensearch-go/v2/opensearchapi" + "github.com/opensearch-project/opensearch-go/v2/opensearchutil" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/internal/choice" + httpconfig "github.com/influxdata/telegraf/plugins/common/http" + "github.com/influxdata/telegraf/plugins/outputs" +) + +//go:embed sample.conf +var sampleConfig string + +type Opensearch struct { + Username config.Secret `toml:"username"` + Password config.Secret `toml:"password"` + AuthBearerToken config.Secret `toml:"auth_bearer_token"` + EnableGzip bool `toml:"enable_gzip"` + EnableSniffer bool `toml:"enable_sniffer"` + FloatHandling string `toml:"float_handling"` + FloatReplacement float64 `toml:"float_replacement_value"` + ForceDocumentID bool `toml:"force_document_id"` + IndexName string `toml:"index_name"` + TemplateName string `toml:"template_name"` + ManageTemplate bool `toml:"manage_template"` + OverwriteTemplate bool `toml:"overwrite_template"` + pipelineName string + DefaultPipeline string `toml:"default_pipeline"` + UsePipeline string `toml:"use_pipeline"` + Timeout config.Duration `toml:"timeout"` + HealthCheckInterval config.Duration `toml:"health_check_interval"` + HealthCheckTimeout config.Duration `toml:"health_check_timeout"` + URLs []string `toml:"urls"` + Log telegraf.Logger `toml:"-"` + indexTmpl *template.Template + pipelineTmpl *template.Template + onSucc func(context.Context, opensearchutil.BulkIndexerItem, opensearchutil.BulkIndexerResponseItem) + onFail func(context.Context, opensearchutil.BulkIndexerItem, opensearchutil.BulkIndexerResponseItem, error) + configOptions httpconfig.HTTPClientConfig + osClient *opensearch.Client +} + +//go:embed template.json +var indexTemplate string + +type templatePart struct { + TemplatePattern string +} + +func (*Opensearch) SampleConfig() string { + return sampleConfig +} + +func (o *Opensearch) Init() error { + if len(o.URLs) == 0 || o.IndexName == "" { + return fmt.Errorf("opensearch urls or index_name is not defined") + } + + // Determine if we should process NaN and inf values + valOptions := []string{"", "none", "drop", "replace"} + if err := choice.Check(o.FloatHandling, valOptions); err != nil { + return fmt.Errorf("config float_handling type: %w", err) + } + + if o.FloatHandling == "" { + o.FloatHandling = "none" + } + + indexTmpl, err := template.New("index").Parse(o.IndexName) + if err != nil { + return fmt.Errorf("error parsing index_name template: %w", err) + } + o.indexTmpl = indexTmpl + + pipelineTmpl, err := template.New("index").Parse(o.UsePipeline) + if err != nil { + return fmt.Errorf("error parsing use_pipeline template: %w", err) + } + o.pipelineTmpl = pipelineTmpl + + o.onSucc = func(ctx context.Context, item opensearchutil.BulkIndexerItem, res opensearchutil.BulkIndexerResponseItem) { + o.Log.Debugf("Indexed to OpenSearch with status- [%d] Result- %s DocumentID- %s ", res.Status, res.Result, res.DocumentID) + } + o.onFail = func(ctx context.Context, item opensearchutil.BulkIndexerItem, res opensearchutil.BulkIndexerResponseItem, err error) { + if err != nil { + o.Log.Errorf("error while OpenSearch bulkIndexing: %v", err) + } else { + o.Log.Errorf("error while OpenSearch bulkIndexing: %s: %s", res.Error.Type, res.Error.Reason) + } + } + + if o.TemplateName == "" { + return fmt.Errorf("template_name configuration not defined") + } + + return nil +} + +func init() { + outputs.Add("opensearch", func() telegraf.Output { + return &Opensearch{ + Timeout: config.Duration(time.Second * 5), + HealthCheckInterval: config.Duration(time.Second * 10), + HealthCheckTimeout: config.Duration(time.Second * 1), + } + }) +} + +func (o *Opensearch) Connect() error { + ctx, cancel := context.WithTimeout(context.Background(), time.Duration(o.Timeout)) + defer cancel() + + err := o.newClient() + if err != nil { + o.Log.Errorf("error creating OpenSearch client: %v", err) + } + + if o.ManageTemplate { + err := o.manageTemplate(ctx) + if err != nil { + return err + } + } + + return nil +} + +func (o *Opensearch) newClient() error { + username, err := o.Username.Get() + if err != nil { + return fmt.Errorf("getting username failed: %w", err) + } + defer config.ReleaseSecret(username) + + password, err := o.Password.Get() + if err != nil { + return fmt.Errorf("getting password failed: %w", err) + } + defer config.ReleaseSecret(password) + + clientConfig := opensearch.Config{ + Addresses: o.URLs, + Username: string(username), + Password: string(password), + } + + if o.configOptions.InsecureSkipVerify { + clientConfig.Transport = &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + } + } + + header := http.Header{} + if o.EnableGzip { + header.Add("Content-Encoding", "gzip") + header.Add("Content-Type", "application/json") + header.Add("Accept-Encoding", "gzip") + } + + if !o.AuthBearerToken.Empty() { + token, err := o.AuthBearerToken.Get() + if err != nil { + return fmt.Errorf("getting token failed: %w", err) + } + if string(token) != "" { + header.Add("Authorization", "Bearer "+string(token)) + } + } + clientConfig.Header = header + + client, err := opensearch.NewClient(clientConfig) + o.osClient = client + + return err +} + +// getPointID generates a unique ID for a Metric Point +// Timestamp(ns),measurement name and Series Hash for compute the final +// SHA256 based hash ID +func getPointID(m telegraf.Metric) string { + var buffer bytes.Buffer + buffer.WriteString(strconv.FormatInt(m.Time().Local().UnixNano(), 10)) + buffer.WriteString(m.Name()) + buffer.WriteString(strconv.FormatUint(m.HashID(), 10)) + + return fmt.Sprintf("%x", sha256.Sum256(buffer.Bytes())) +} + +func (o *Opensearch) Write(metrics []telegraf.Metric) error { + // get indexers based on unique pipeline values + indexers := getTargetIndexers(metrics, o) + if len(indexers) == 0 { + return fmt.Errorf("failed to instantiate OpenSearch bulkindexer") + } + + ctx, cancel := context.WithTimeout(context.Background(), time.Duration(o.Timeout)) + defer cancel() + + for _, metric := range metrics { + var name = metric.Name() + + // index name has to be re-evaluated each time for telegraf + // to send the metric to the correct time-based index + indexName, err := o.GetIndexName(metric) + if err != nil { + return fmt.Errorf("generating indexname failed: %w", err) + } + + // Handle NaN and inf field-values + fields := make(map[string]interface{}) + for k, value := range metric.Fields() { + v, ok := value.(float64) + if !ok || o.FloatHandling == "none" || !(math.IsNaN(v) || math.IsInf(v, 0)) { + fields[k] = value + continue + } + if o.FloatHandling == "drop" { + continue + } + + if math.IsNaN(v) || math.IsInf(v, 1) { + fields[k] = o.FloatReplacement + } else { + fields[k] = -o.FloatReplacement + } + } + + m := make(map[string]interface{}) + + m["@timestamp"] = metric.Time() + m["measurement_name"] = name + m["tag"] = metric.Tags() + m[name] = fields + + body, err := json.Marshal(m) + if err != nil { + return fmt.Errorf("failed to marshal body: %w", err) + } + + bulkIndxrItem := opensearchutil.BulkIndexerItem{ + Action: "index", + Index: indexName, + Body: strings.NewReader(string(body)), + OnSuccess: o.onSucc, + OnFailure: o.onFail, + } + if o.ForceDocumentID { + bulkIndxrItem.DocumentID = getPointID(metric) + } + + if o.UsePipeline != "" { + pipelineName, err := o.getPipelineName(metric) + if err != nil { + return fmt.Errorf("failed to evaluate pipeline name: %w", err) + } + + if pipelineName != "" { + if indexers[pipelineName] != nil { + if err := indexers[pipelineName].Add(ctx, bulkIndxrItem); err != nil { + o.Log.Errorf("error adding metric entry to OpenSearch bulkIndexer: %v for pipeline %s", err, pipelineName) + } + continue + } + } + } + + if err := indexers["default"].Add(ctx, bulkIndxrItem); err != nil { + o.Log.Errorf("error adding metric entry to OpenSearch default bulkIndexer: %v", err) + } + } + + for _, bulkIndxr := range indexers { + if err := bulkIndxr.Close(ctx); err != nil { + return fmt.Errorf("error sending bulk request to OpenSearch: %w", err) + } + + // Report the indexer statistics + stats := bulkIndxr.Stats() + if stats.NumAdded < uint64(len(metrics)) { + return fmt.Errorf("indexed [%d] documents with [%d] errors", stats.NumAdded, stats.NumFailed) + } + o.Log.Debugf("Successfully indexed [%d] documents", stats.NumAdded) + } + + return nil +} + +// BulkIndexer supports pipeline at config level so seperate indexer instance for each unique pipeline +func getTargetIndexers(metrics []telegraf.Metric, osInst *Opensearch) map[string]opensearchutil.BulkIndexer { + var indexers = make(map[string]opensearchutil.BulkIndexer) + + if osInst.UsePipeline != "" { + for _, metric := range metrics { + pipelineName, err := osInst.getPipelineName(metric) + if err != nil { + osInst.Log.Errorf("error while evaluating pipeline name: %v for pipeline %s", err, pipelineName) + } + + if pipelineName != "" { + // BulkIndexer supports pipeline at config level not metric level + if _, ok := indexers[osInst.pipelineName]; ok { + continue + } + bulkIndxr, err := createBulkIndexer(osInst, pipelineName) + if err != nil { + osInst.Log.Errorf("error while intantiating OpenSearch NewBulkIndexer: %v for pipeline: %s", err, pipelineName) + } else { + indexers[pipelineName] = bulkIndxr + } + } + } + } + + bulkIndxr, err := createBulkIndexer(osInst, "") + if err != nil { + osInst.Log.Errorf("error while intantiating OpenSearch NewBulkIndexer: %v for default pipeline", err) + } else { + indexers["default"] = bulkIndxr + } + return indexers +} + +func createBulkIndexer(osInst *Opensearch, pipelineName string) (opensearchutil.BulkIndexer, error) { + var bulkIndexerConfig = opensearchutil.BulkIndexerConfig{ + Client: osInst.osClient, + NumWorkers: 4, // The number of worker goroutines (default: number of CPUs) + FlushBytes: 5e+6, // The flush threshold in bytes (default: 5M) + } + if pipelineName != "" { + bulkIndexerConfig.Pipeline = pipelineName + } + + return opensearchutil.NewBulkIndexer(bulkIndexerConfig) +} + +func (o *Opensearch) GetIndexName(metric telegraf.Metric) (string, error) { + var buf bytes.Buffer + err := o.indexTmpl.Execute(&buf, metric) + + if err != nil { + return "", fmt.Errorf("creating index name failed: %w", err) + } + var indexName = buf.String() + if strings.Contains(indexName, "{{") { + return "", fmt.Errorf("failed to evaluate valid indexname: %s", indexName) + } + o.Log.Debugf("indexName- %s", indexName) + return indexName, nil +} + +func (o *Opensearch) getPipelineName(metric telegraf.Metric) (string, error) { + if o.UsePipeline == "" || !strings.Contains(o.UsePipeline, "{{") { + return o.UsePipeline, nil + } + + var buf bytes.Buffer + err := o.pipelineTmpl.Execute(&buf, metric) + if err != nil { + return "", fmt.Errorf("creating pipeline name failed: %w", err) + } + var pipelineName = buf.String() + if strings.Contains(pipelineName, "{{") { + return "", fmt.Errorf("failed to evaluate valid pipelineName: %s", pipelineName) + } + o.Log.Debugf("PipelineTemplate- %s", pipelineName) + + if pipelineName == "" { + pipelineName = o.DefaultPipeline + } + return pipelineName, nil +} + +func (o *Opensearch) manageTemplate(ctx context.Context) error { + tempReq := opensearchapi.CatTemplatesRequest{ + Name: o.TemplateName, + } + + resp, err := tempReq.Do(ctx, o.osClient.Transport) + if err != nil { + return fmt.Errorf("template check failed, template name: %s, error: %w", o.TemplateName, err) + } + + templateExists := resp.Body != http.NoBody + templatePattern := o.IndexName + + if strings.Contains(templatePattern, "{{") { + templatePattern = templatePattern[0:strings.Index(templatePattern, "{{")] + } + + if templatePattern == "" { + return fmt.Errorf("template cannot be created for dynamic index names without an index prefix") + } + + if o.OverwriteTemplate || !templateExists || templatePattern != "" { + tp := templatePart{ + TemplatePattern: templatePattern + "*", + } + + t := template.Must(template.New("template").Parse(indexTemplate)) + var tmpl bytes.Buffer + + if err := t.Execute(&tmpl, tp); err != nil { + return err + } + + indexTempReq := opensearchapi.IndicesPutTemplateRequest{ + Name: o.TemplateName, + Body: strings.NewReader(tmpl.String()), + } + indexTempResp, err := indexTempReq.Do(ctx, o.osClient.Transport) + + if err != nil || indexTempResp.StatusCode != 200 { + return fmt.Errorf("creating index template %q failed: %w", o.TemplateName, err) + } + + o.Log.Debugf("Template %s created or updated", o.TemplateName) + } else { + o.Log.Debug("Found existing OpenSearch template. Skipping template management") + } + return nil +} + +func (o *Opensearch) Close() error { + o.osClient = nil + return nil +} diff --git a/plugins/outputs/opensearch/opensearch_test.go b/plugins/outputs/opensearch/opensearch_test.go new file mode 100644 index 0000000000000..0659e5469f2ed --- /dev/null +++ b/plugins/outputs/opensearch/opensearch_test.go @@ -0,0 +1,254 @@ +package opensearch + +import ( + "net/http" + "net/http/httptest" + "testing" + "text/template" + "time" + + "github.com/docker/go-connections/nat" + "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/require" + "github.com/testcontainers/testcontainers-go/wait" +) + +const servicePort = "9200" +const imageVersion1 = "1.1.0" +const imageVersion2 = "2.8.0" + +func launchTestContainer(t *testing.T, imageVersion string) *testutil.Container { + container := testutil.Container{ + Image: "opensearchproject/opensearch:" + imageVersion, + ExposedPorts: []string{servicePort}, + Env: map[string]string{ + "discovery.type": "single-node", + "DISABLE_INSTALL_DEMO_CONFIG": "true", + "DISABLE_SECURITY_PLUGIN": "true", + }, + WaitingFor: wait.ForAll( + wait.ForListeningPort(nat.Port(servicePort)), + wait.ForLog("Init AD version hash ring successfully"), + ), + } + require.NoError(t, container.Start(), "failed to start container") + + return &container +} + +func TestGetIndexName(t *testing.T) { + e := &Opensearch{ + Log: testutil.Logger{}, + } + + tests := []struct { + EventTime time.Time + Tags map[string]string + IndexName string + Expected string + }{ + { + time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC), + map[string]string{"tag1": "value1", "tag2": "value2"}, + "indexname", + "indexname", + }, + { + time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC), + map[string]string{}, + `indexname-{{.Time.Format "2006-01-02"}}`, + "indexname-2014-12-01", + }, + { + time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC), + map[string]string{}, + `indexname-{{.Tag "tag2"}}-{{.Time.Format "2006-01-02"}}`, + "indexname--2014-12-01", + }, + { + time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC), + map[string]string{"tag1": "value1"}, + `indexname-{{.Tag "tag1"}}-{{.Time.Format "2006-01-02"}}`, + "indexname-value1-2014-12-01", + }, + { + time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC), + map[string]string{"tag1": "value1", "tag2": "value2"}, + `indexname-{{.Tag "tag1"}}-{{.Tag "tag2"}}-{{.Tag "tag3"}}-{{.Time.Format "2006-01-02"}}`, + "indexname-value1-value2--2014-12-01", + }, + } + for _, test := range tests { + mockMetric := testutil.MockMetrics()[0] + mockMetric.SetTime(test.EventTime) + for key, val := range test.Tags { + mockMetric.AddTag(key, val) + } + e.indexTmpl, _ = template.New("index").Parse(test.IndexName) + indexName, err := e.GetIndexName(mockMetric) + require.NoError(t, err) + if indexName != test.Expected { + t.Errorf("Expected indexname %s, got %s\n", test.Expected, indexName) + } + } +} + +func TestGetPipelineName(t *testing.T) { + e := &Opensearch{ + DefaultPipeline: "myDefaultPipeline", + Log: testutil.Logger{}, + } + + tests := []struct { + Tags map[string]string + PipelineTagKeys []string + UsePipeline string + Expected string + }{ + { + map[string]string{"tag1": "value1", "tag2": "value2"}, + []string{}, + `{{.Tag "es-pipeline"}}`, + "myDefaultPipeline", + }, + { + map[string]string{"tag1": "value1", "tag2": "value2"}, + []string{}, + ``, + "", + }, + { + map[string]string{"tag1": "value1", "es-pipeline": "myOtherPipeline"}, + []string{}, + `{{.Tag "es-pipeline"}}`, + "myOtherPipeline", + }, + { + map[string]string{"tag1": "pipeline2", "es-pipeline": "myOtherPipeline"}, + []string{}, + `{{.Tag "tag1"}}`, + "pipeline2", + }, + } + for _, test := range tests { + e.UsePipeline = test.UsePipeline + e.pipelineTmpl, _ = template.New("index").Parse(test.UsePipeline) + mockMetric := testutil.MockMetrics()[0] + for key, val := range test.Tags { + mockMetric.AddTag(key, val) + } + + pipelineName, err := e.getPipelineName(mockMetric) + require.NoError(t, err) + require.Equal(t, test.Expected, pipelineName) + } +} + +func TestRequestHeaderWhenGzipIsEnabled(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/_bulk": + require.Equal(t, "gzip", r.Header.Get("Content-Encoding")) + require.Equal(t, "gzip", r.Header.Get("Accept-Encoding")) + _, err := w.Write([]byte("{}")) + require.NoError(t, err) + return + default: + _, err := w.Write([]byte(`{"version": {"number": "7.8"}}`)) + require.NoError(t, err) + return + } + })) + defer ts.Close() + + urls := []string{"http://" + ts.Listener.Addr().String()} + + e := &Opensearch{ + URLs: urls, + IndexName: `test-{{.Tag "tag1"}}-{{.Time.Format "2006-01-02"}}`, + Timeout: config.Duration(time.Second * 5), + EnableGzip: true, + ManageTemplate: false, + Log: testutil.Logger{}, + } + e.indexTmpl, _ = template.New("index").Parse(e.IndexName) + var indexName, err = e.GetIndexName(testutil.MockMetrics()[0]) + require.NoError(t, err) + e.IndexName = indexName + + err = e.Connect() + require.NoError(t, err) + + err = e.Write(testutil.MockMetrics()) + require.NoError(t, err) +} + +func TestRequestHeaderWhenGzipIsDisabled(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/_bulk": + require.NotEqual(t, "gzip", r.Header.Get("Content-Encoding")) + _, err := w.Write([]byte("{}")) + require.NoError(t, err) + return + default: + _, err := w.Write([]byte(`{"version": {"number": "7.8"}}`)) + require.NoError(t, err) + return + } + })) + defer ts.Close() + + urls := []string{"http://" + ts.Listener.Addr().String()} + + e := &Opensearch{ + URLs: urls, + IndexName: `test-{{.Tag "tag1"}}-{{.Time.Format "2006-01-02"}}`, + Timeout: config.Duration(time.Second * 5), + EnableGzip: false, + ManageTemplate: false, + Log: testutil.Logger{}, + } + e.indexTmpl, _ = template.New("index").Parse(e.IndexName) + err := e.Connect() + require.NoError(t, err) + + err = e.Write(testutil.MockMetrics()) + require.NoError(t, err) +} + +func TestAuthorizationHeaderWhenBearerTokenIsPresent(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/_bulk": + require.Equal(t, "Bearer 0123456789abcdef", r.Header.Get("Authorization")) + _, err := w.Write([]byte("{}")) + require.NoError(t, err) + return + default: + _, err := w.Write([]byte(`{"version": {"number": "7.8"}}`)) + require.NoError(t, err) + return + } + })) + defer ts.Close() + + urls := []string{"http://" + ts.Listener.Addr().String()} + + e := &Opensearch{ + URLs: urls, + IndexName: `{{.Tag "tag1"}}-{{.Time.Format "2006-01-02"}}`, + Timeout: config.Duration(time.Second * 5), + EnableGzip: false, + ManageTemplate: false, + Log: testutil.Logger{}, + AuthBearerToken: config.NewSecret([]byte("0123456789abcdef")), + } + e.indexTmpl, _ = template.New("index").Parse(e.IndexName) + err := e.Connect() + require.NoError(t, err) + + err = e.Write(testutil.MockMetrics()) + require.NoError(t, err) +} diff --git a/plugins/outputs/opensearch/opensearch_v1_test.go b/plugins/outputs/opensearch/opensearch_v1_test.go new file mode 100644 index 0000000000000..47c7110709a5e --- /dev/null +++ b/plugins/outputs/opensearch/opensearch_v1_test.go @@ -0,0 +1,330 @@ +package opensearch + +import ( + "context" + "fmt" + "math" + "testing" + "text/template" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/require" +) + +func TestConnectAndWriteIntegration(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + container := launchTestContainer(t, imageVersion1) + defer container.Terminate() + + urls := []string{ + fmt.Sprintf("http://%s:%s", container.Address, container.Ports[servicePort]), + } + + e := &Opensearch{ + URLs: urls, + IndexName: `test-{{.Time.Format "2006-01-02"}}`, + Timeout: config.Duration(time.Second * 5), + EnableGzip: false, + ManageTemplate: true, + TemplateName: "telegraf", + OverwriteTemplate: false, + HealthCheckInterval: config.Duration(time.Second * 10), + HealthCheckTimeout: config.Duration(time.Second * 1), + Log: testutil.Logger{}, + } + e.indexTmpl, _ = template.New("index").Parse(e.IndexName) + // Verify that we can connect to Opensearch + require.NoError(t, e.Connect()) + + // Verify that we can successfully write data to Opensearch + require.NoError(t, e.Write(testutil.MockMetrics())) +} + +func TestConnectAndWriteMetricWithNaNValueEmptyIntegration(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + container := launchTestContainer(t, imageVersion1) + defer container.Terminate() + + urls := []string{ + fmt.Sprintf("http://%s:%s", container.Address, container.Ports[servicePort]), + } + + e := &Opensearch{ + URLs: urls, + IndexName: `test-{{.Time.Format "2006-01-02"}}`, + Timeout: config.Duration(time.Second * 5), + ManageTemplate: true, + TemplateName: "telegraf", + OverwriteTemplate: false, + HealthCheckInterval: config.Duration(time.Second * 10), + HealthCheckTimeout: config.Duration(time.Second * 1), + Log: testutil.Logger{}, + } + + metrics := []telegraf.Metric{ + testutil.TestMetric(math.NaN()), + testutil.TestMetric(math.Inf(1)), + testutil.TestMetric(math.Inf(-1)), + } + + // Verify that we can connect to Opensearch + require.NoError(t, e.Init()) + require.NoError(t, e.Connect()) + + // Verify that we can fail for metric with unhandled NaN/inf/-inf values + for _, m := range metrics { + require.Error(t, e.Write([]telegraf.Metric{m}), "error sending bulk request to Opensearch: "+ + "json: unsupported value: NaN") + } +} + +func TestConnectAndWriteMetricWithNaNValueNoneIntegration(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + container := launchTestContainer(t, imageVersion1) + defer container.Terminate() + + urls := []string{ + fmt.Sprintf("http://%s:%s", container.Address, container.Ports[servicePort]), + } + + e := &Opensearch{ + URLs: urls, + IndexName: `test-{{.Time.Format "2006-01-02"}}`, + Timeout: config.Duration(time.Second * 5), + ManageTemplate: true, + TemplateName: "telegraf", + OverwriteTemplate: false, + HealthCheckInterval: config.Duration(time.Second * 10), + HealthCheckTimeout: config.Duration(time.Second * 1), + FloatHandling: "none", + Log: testutil.Logger{}, + } + + metrics := []telegraf.Metric{ + testutil.TestMetric(math.NaN()), + testutil.TestMetric(math.Inf(1)), + testutil.TestMetric(math.Inf(-1)), + } + e.indexTmpl, _ = template.New("index").Parse(e.IndexName) + // Verify that we can connect to Opensearch + err := e.Connect() + require.NoError(t, err) + + // Verify that we can fail for metric with unhandled NaN/inf/-inf values + for _, m := range metrics { + err = e.Write([]telegraf.Metric{m}) + require.Error(t, err, "error sending bulk request to Opensearch: json: unsupported value: NaN") + } +} + +func TestConnectAndWriteMetricWithNaNValueDropIntegration(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + container := launchTestContainer(t, imageVersion1) + defer container.Terminate() + + urls := []string{ + fmt.Sprintf("http://%s:%s", container.Address, container.Ports[servicePort]), + } + + e := &Opensearch{ + URLs: urls, + IndexName: `test-{{.Time.Format "2006-01-02"}}`, + Timeout: config.Duration(time.Second * 5), + ManageTemplate: true, + TemplateName: "telegraf", + OverwriteTemplate: false, + HealthCheckInterval: config.Duration(time.Second * 10), + HealthCheckTimeout: config.Duration(time.Second * 1), + FloatHandling: "drop", + Log: testutil.Logger{}, + } + + metrics := []telegraf.Metric{ + testutil.TestMetric(math.NaN()), + testutil.TestMetric(math.Inf(1)), + testutil.TestMetric(math.Inf(-1)), + } + e.indexTmpl, _ = template.New("index").Parse(e.IndexName) + // Verify that we can connect to Opensearch + err := e.Connect() + require.NoError(t, err) + + // Verify that we can fail for metric with unhandled NaN/inf/-inf values + for _, m := range metrics { + err = e.Write([]telegraf.Metric{m}) + require.NoError(t, err) + } +} + +func TestConnectAndWriteMetricWithNaNValueReplacementIntegration(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + tests := []struct { + floatHandle string + floatReplacement float64 + expectError bool + }{ + { + "none", + 0.0, + true, + }, + { + "drop", + 0.0, + false, + }, + { + "replace", + 0.0, + false, + }, + } + + container := launchTestContainer(t, imageVersion1) + defer container.Terminate() + + urls := []string{ + fmt.Sprintf("http://%s:%s", container.Address, container.Ports[servicePort]), + } + + for _, test := range tests { + e := &Opensearch{ + URLs: urls, + IndexName: `test-{{.Time.Format "2006-01-02"}}`, + Timeout: config.Duration(time.Second * 5), + ManageTemplate: true, + TemplateName: "telegraf", + OverwriteTemplate: false, + HealthCheckInterval: config.Duration(time.Second * 10), + HealthCheckTimeout: config.Duration(time.Second * 1), + FloatHandling: test.floatHandle, + FloatReplacement: test.floatReplacement, + Log: testutil.Logger{}, + } + + metrics := []telegraf.Metric{ + testutil.TestMetric(math.NaN()), + testutil.TestMetric(math.Inf(1)), + testutil.TestMetric(math.Inf(-1)), + } + e.indexTmpl, _ = template.New("index").Parse(e.IndexName) + err := e.Connect() + require.NoError(t, err) + + for _, m := range metrics { + err = e.Write([]telegraf.Metric{m}) + + if test.expectError { + require.Error(t, err) + } else { + require.NoError(t, err) + } + } + } +} + +func TestTemplateManagementEmptyTemplateIntegration(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + container := launchTestContainer(t, imageVersion1) + defer container.Terminate() + + urls := []string{ + fmt.Sprintf("http://%s:%s", container.Address, container.Ports[servicePort]), + } + + e := &Opensearch{ + URLs: urls, + IndexName: `test-{{.Time.Format "2006-01-02"}}`, + Timeout: config.Duration(time.Second * 5), + EnableGzip: false, + ManageTemplate: true, + TemplateName: "", + OverwriteTemplate: true, + Log: testutil.Logger{}, + } + + err := e.Init() + require.Error(t, err) +} + +func TestTemplateManagementIntegration(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + container := launchTestContainer(t, imageVersion1) + defer container.Terminate() + + urls := []string{ + fmt.Sprintf("http://%s:%s", container.Address, container.Ports[servicePort]), + } + + e := &Opensearch{ + URLs: urls, + IndexName: `test-{{.Time.Format "2006-01-02"}}`, + Timeout: config.Duration(time.Second * 5), + EnableGzip: false, + ManageTemplate: true, + TemplateName: "telegraf", + OverwriteTemplate: true, + Log: testutil.Logger{}, + } + + ctx, cancel := context.WithTimeout(context.Background(), time.Duration(e.Timeout)) + defer cancel() + e.indexTmpl, _ = template.New("index").Parse(e.IndexName) + + err := e.Connect() + require.NoError(t, err) + + err = e.manageTemplate(ctx) + require.NoError(t, err) +} + +func TestTemplateInvalidIndexPatternIntegration(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + container := launchTestContainer(t, imageVersion1) + defer container.Terminate() + + urls := []string{ + fmt.Sprintf("http://%s:%s", container.Address, container.Ports[servicePort]), + } + + e := &Opensearch{ + URLs: urls, + IndexName: `{{.Tag "tag1"}}-{{.Time.Format "2006-01-02"}}`, + Timeout: config.Duration(time.Second * 5), + EnableGzip: false, + ManageTemplate: true, + TemplateName: "telegraf", + OverwriteTemplate: true, + Log: testutil.Logger{}, + } + e.indexTmpl, _ = template.New("index").Parse(e.IndexName) + err := e.Connect() + require.Error(t, err) +} diff --git a/plugins/outputs/opensearch/opensearch_v2_test.go b/plugins/outputs/opensearch/opensearch_v2_test.go new file mode 100644 index 0000000000000..2371afb0138ce --- /dev/null +++ b/plugins/outputs/opensearch/opensearch_v2_test.go @@ -0,0 +1,330 @@ +package opensearch + +import ( + "context" + "fmt" + "math" + "testing" + "text/template" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/require" +) + +func TestConnectAndWriteIntegrationV2(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + container := launchTestContainer(t, imageVersion2) + defer container.Terminate() + + urls := []string{ + fmt.Sprintf("http://%s:%s", container.Address, container.Ports[servicePort]), + } + + e := &Opensearch{ + URLs: urls, + IndexName: `test-{{.Time.Format "2006-01-02"}}`, + Timeout: config.Duration(time.Second * 5), + EnableGzip: false, + ManageTemplate: true, + TemplateName: "telegraf", + OverwriteTemplate: false, + HealthCheckInterval: config.Duration(time.Second * 10), + HealthCheckTimeout: config.Duration(time.Second * 1), + Log: testutil.Logger{}, + } + e.indexTmpl, _ = template.New("index").Parse(e.IndexName) + // Verify that we can connect to Opensearch + require.NoError(t, e.Connect()) + + // Verify that we can successfully write data to Opensearch + require.NoError(t, e.Write(testutil.MockMetrics())) +} + +func TestConnectAndWriteMetricWithNaNValueEmptyIntegrationV2(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + container := launchTestContainer(t, imageVersion2) + defer container.Terminate() + + urls := []string{ + fmt.Sprintf("http://%s:%s", container.Address, container.Ports[servicePort]), + } + + e := &Opensearch{ + URLs: urls, + IndexName: `test-{{.Time.Format "2006-01-02"}}`, + Timeout: config.Duration(time.Second * 5), + ManageTemplate: true, + TemplateName: "telegraf", + OverwriteTemplate: false, + HealthCheckInterval: config.Duration(time.Second * 10), + HealthCheckTimeout: config.Duration(time.Second * 1), + Log: testutil.Logger{}, + } + + metrics := []telegraf.Metric{ + testutil.TestMetric(math.NaN()), + testutil.TestMetric(math.Inf(1)), + testutil.TestMetric(math.Inf(-1)), + } + + // Verify that we can connect to Opensearch + require.NoError(t, e.Init()) + require.NoError(t, e.Connect()) + + // Verify that we can fail for metric with unhandled NaN/inf/-inf values + for _, m := range metrics { + require.Error(t, e.Write([]telegraf.Metric{m}), "error sending bulk request to Opensearch: "+ + "json: unsupported value: NaN") + } +} + +func TestConnectAndWriteMetricWithNaNValueNoneIntegrationV2(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + container := launchTestContainer(t, imageVersion2) + defer container.Terminate() + + urls := []string{ + fmt.Sprintf("http://%s:%s", container.Address, container.Ports[servicePort]), + } + + e := &Opensearch{ + URLs: urls, + IndexName: `test-{{.Time.Format "2006-01-02"}}`, + Timeout: config.Duration(time.Second * 5), + ManageTemplate: true, + TemplateName: "telegraf", + OverwriteTemplate: false, + HealthCheckInterval: config.Duration(time.Second * 10), + HealthCheckTimeout: config.Duration(time.Second * 1), + FloatHandling: "none", + Log: testutil.Logger{}, + } + + metrics := []telegraf.Metric{ + testutil.TestMetric(math.NaN()), + testutil.TestMetric(math.Inf(1)), + testutil.TestMetric(math.Inf(-1)), + } + e.indexTmpl, _ = template.New("index").Parse(e.IndexName) + // Verify that we can connect to Opensearch + err := e.Connect() + require.NoError(t, err) + + // Verify that we can fail for metric with unhandled NaN/inf/-inf values + for _, m := range metrics { + err = e.Write([]telegraf.Metric{m}) + require.Error(t, err, "error sending bulk request to Opensearch: json: unsupported value: NaN") + } +} + +func TestConnectAndWriteMetricWithNaNValueDropIntegrationV2(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + container := launchTestContainer(t, imageVersion2) + defer container.Terminate() + + urls := []string{ + fmt.Sprintf("http://%s:%s", container.Address, container.Ports[servicePort]), + } + + e := &Opensearch{ + URLs: urls, + IndexName: `test-{{.Time.Format "2006-01-02"}}`, + Timeout: config.Duration(time.Second * 5), + ManageTemplate: true, + TemplateName: "telegraf", + OverwriteTemplate: false, + HealthCheckInterval: config.Duration(time.Second * 10), + HealthCheckTimeout: config.Duration(time.Second * 1), + FloatHandling: "drop", + Log: testutil.Logger{}, + } + + metrics := []telegraf.Metric{ + testutil.TestMetric(math.NaN()), + testutil.TestMetric(math.Inf(1)), + testutil.TestMetric(math.Inf(-1)), + } + e.indexTmpl, _ = template.New("index").Parse(e.IndexName) + // Verify that we can connect to Opensearch + err := e.Connect() + require.NoError(t, err) + + // Verify that we can fail for metric with unhandled NaN/inf/-inf values + for _, m := range metrics { + err = e.Write([]telegraf.Metric{m}) + require.NoError(t, err) + } +} + +func TestConnectAndWriteMetricWithNaNValueReplacementIntegrationV2(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + tests := []struct { + floatHandle string + floatReplacement float64 + expectError bool + }{ + { + "none", + 0.0, + true, + }, + { + "drop", + 0.0, + false, + }, + { + "replace", + 0.0, + false, + }, + } + + container := launchTestContainer(t, imageVersion2) + defer container.Terminate() + + urls := []string{ + fmt.Sprintf("http://%s:%s", container.Address, container.Ports[servicePort]), + } + + for _, test := range tests { + e := &Opensearch{ + URLs: urls, + IndexName: `test-{{.Time.Format "2006-01-02"}}`, + Timeout: config.Duration(time.Second * 5), + ManageTemplate: true, + TemplateName: "telegraf", + OverwriteTemplate: false, + HealthCheckInterval: config.Duration(time.Second * 10), + HealthCheckTimeout: config.Duration(time.Second * 1), + FloatHandling: test.floatHandle, + FloatReplacement: test.floatReplacement, + Log: testutil.Logger{}, + } + + metrics := []telegraf.Metric{ + testutil.TestMetric(math.NaN()), + testutil.TestMetric(math.Inf(1)), + testutil.TestMetric(math.Inf(-1)), + } + e.indexTmpl, _ = template.New("index").Parse(e.IndexName) + err := e.Connect() + require.NoError(t, err) + + for _, m := range metrics { + err = e.Write([]telegraf.Metric{m}) + + if test.expectError { + require.Error(t, err) + } else { + require.NoError(t, err) + } + } + } +} + +func TestTemplateManagementEmptyTemplateIntegrationV2(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + container := launchTestContainer(t, imageVersion2) + defer container.Terminate() + + urls := []string{ + fmt.Sprintf("http://%s:%s", container.Address, container.Ports[servicePort]), + } + + e := &Opensearch{ + URLs: urls, + IndexName: `test-{{.Time.Format "2006-01-02"}}`, + Timeout: config.Duration(time.Second * 5), + EnableGzip: false, + ManageTemplate: true, + TemplateName: "", + OverwriteTemplate: true, + Log: testutil.Logger{}, + } + + err := e.Init() + require.Error(t, err) +} + +func TestTemplateManagementIntegrationV2(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + container := launchTestContainer(t, imageVersion2) + defer container.Terminate() + + urls := []string{ + fmt.Sprintf("http://%s:%s", container.Address, container.Ports[servicePort]), + } + + e := &Opensearch{ + URLs: urls, + IndexName: `test-{{.Time.Format "2006-01-02"}}`, + Timeout: config.Duration(time.Second * 5), + EnableGzip: false, + ManageTemplate: true, + TemplateName: "telegraf", + OverwriteTemplate: true, + Log: testutil.Logger{}, + } + + ctx, cancel := context.WithTimeout(context.Background(), time.Duration(e.Timeout)) + defer cancel() + e.indexTmpl, _ = template.New("index").Parse(e.IndexName) + + err := e.Connect() + require.NoError(t, err) + + err = e.manageTemplate(ctx) + require.NoError(t, err) +} + +func TestTemplateInvalidIndexPatternIntegrationV2(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + container := launchTestContainer(t, imageVersion2) + defer container.Terminate() + + urls := []string{ + fmt.Sprintf("http://%s:%s", container.Address, container.Ports[servicePort]), + } + + e := &Opensearch{ + URLs: urls, + IndexName: `{{.Tag "tag1"}}-{{.Time.Format "2006-01-02"}}`, + Timeout: config.Duration(time.Second * 5), + EnableGzip: false, + ManageTemplate: true, + TemplateName: "telegraf", + OverwriteTemplate: true, + Log: testutil.Logger{}, + } + e.indexTmpl, _ = template.New("index").Parse(e.IndexName) + err := e.Connect() + require.Error(t, err) +} diff --git a/plugins/outputs/opensearch/sample.conf b/plugins/outputs/opensearch/sample.conf new file mode 100644 index 0000000000000..c4a5451c7f41d --- /dev/null +++ b/plugins/outputs/opensearch/sample.conf @@ -0,0 +1,93 @@ +# Configuration for OpenSearch to send metrics to. +[[outputs.OpenSearch]] + ## URLs + ## The full HTTP endpoint URL for your OpenSearch instance. Multiple URLs can + ## be specified as part of the same cluster, but only one URLs is used to + ## write during each interval. + urls = ["http://node1.os.example.com:9200"] + + ## Index Name + ## Target index name for metrics (OpenSearch will create if it not exists). + ## This is a Golang template (see https://pkg.go.dev/text/template) + ## You can also specify + ## metric name (`{{.Name}}`), tag value (`{{.Tag "tag_name"}}`), field value (`{{.Field "feild_name"}}`) + ## If the tag does not exist, the default tag value will be empty string "". + ## the timestamp (`{{.Time.Format "xxxxxxxxx"}}`). + ## For example: "telegraf-{{.Time.Format "2006-01-02"}}-{{.Tag "host"}}" would set it to telegraf-2023-07-27-HostName + index_name = "" + + ## Timeout + ## OpenSearch client timeout + # timeout = "5s" + + ## Sniffer + ## Set to true to ask OpenSearch a list of all cluster nodes, + ## thus it is not necessary to list all nodes in the urls config option + # enable_sniffer = false + + ## GZIP Compression + ## Set to true to enable gzip compression + # enable_gzip = false + + ## Health Check Interval + ## Set the interval to check if the OpenSearch nodes are available + ## Setting to "0s" will disable the health check (not recommended in production) + # health_check_interval = "10s" + + ## Set the timeout for periodic health checks. + # health_check_timeout = "1s" + ## HTTP basic authentication details. + # username = "" + # password = "" + ## HTTP bearer token authentication details + # auth_bearer_token = "" + + ## Optional TLS Config + # tls_ca = "/etc/telegraf/ca.pem" + # tls_cert = "/etc/telegraf/cert.pem" + # tls_key = "/etc/telegraf/key.pem" + ## Use TLS but skip chain & host verification + # insecure_skip_verify = false + + ## Template Config + ## Manage templates + ## Set to true if you want telegraf to manage its index template. + ## If enabled it will create a recommended index template for telegraf indexes + # manage_template = true + + ## Template Name + ## The template name used for telegraf indexes + # template_name = "telegraf" + + ## Overwrite Templates + ## Set to true if you want telegraf to overwrite an existing template + # overwrite_template = false + + ## Document ID + ## If set to true a unique ID hash will be sent as + ## sha256(concat(timestamp,measurement,series-hash)) string. It will enable + ## data resend and update metric points avoiding duplicated metrics with + ## different id's + # force_document_id = false + + ## Value Handling + ## Specifies the handling of NaN and Inf values. + ## This option can have the following values: + ## none -- do not modify field-values (default); will produce an error + ## if NaNs or infs are encountered + ## drop -- drop fields containing NaNs or infs + ## replace -- replace with the value in "float_replacement_value" (default: 0.0) + ## NaNs and inf will be replaced with the given number, -inf with the negative of that number + # float_handling = "none" + # float_replacement_value = 0.0 + + ## Pipeline Config + ## To use a ingest pipeline, set this to the name of the pipeline you want to use. + # use_pipeline = "my_pipeline" + + ## Pipeline Name + ## Additionally, you can specify a tag name using the notation (`{{.Tag "tag_name"}}`) + ## which will be used as the pipeline name (e.g. "{{.Tag "os_pipeline"}}"). + ## If the tag does not exist, the default pipeline will be used as the pipeline. + ## If no default pipeline is set, no pipeline is used for the metric. + # default_pipeline = "" diff --git a/plugins/outputs/opensearch/template.json b/plugins/outputs/opensearch/template.json new file mode 100644 index 0000000000000..ba1dcb71bc144 --- /dev/null +++ b/plugins/outputs/opensearch/template.json @@ -0,0 +1,55 @@ +{ + "index_patterns" : [ "{{.TemplatePattern}}" ], + "settings": { + "index": { + "refresh_interval": "10s", + "mapping.total_fields.limit": 5000, + "auto_expand_replicas" : "0-1", + "codec" : "best_compression" + } + }, + "mappings" : { + "properties" : { + "@timestamp" : { "type" : "date" }, + "measurement_name" : { "type" : "keyword" } + }, + "dynamic_templates": [ + { + "tags": { + "match_mapping_type": "string", + "path_match": "tag.*", + "mapping": { + "ignore_above": 512, + "type": "keyword" + } + } + }, + { + "metrics_long": { + "match_mapping_type": "long", + "mapping": { + "type": "float", + "index": false + } + } + }, + { + "metrics_double": { + "match_mapping_type": "double", + "mapping": { + "type": "float", + "index": false + } + } + }, + { + "text_fields": { + "match": "*", + "mapping": { + "norms": false + } + } + } + ] + } +} \ No newline at end of file