From a29b331977d6891face3df2b0eb0299cc69a5a4a Mon Sep 17 00:00:00 2001 From: Pawel Zak Date: Sat, 19 Oct 2024 11:17:22 +0200 Subject: [PATCH] chore: Fix linter findings for `revive:exported` in `plugins/inputs/e*` --- plugins/inputs/ecs/client.go | 37 +-- plugins/inputs/ecs/client_test.go | 52 ++--- plugins/inputs/ecs/ecs.go | 32 +-- plugins/inputs/ecs/ecs_test.go | 8 +- plugins/inputs/ecs/stats.go | 4 +- plugins/inputs/ecs/types.go | 24 +- plugins/inputs/elasticsearch/elasticsearch.go | 156 +++++++------ .../elasticsearch/elasticsearch_test.go | 4 +- .../elasticsearch_query.go | 73 +++--- plugins/inputs/ethtool/ethtool.go | 16 +- plugins/inputs/ethtool/ethtool_linux.go | 102 ++++----- plugins/inputs/ethtool/ethtool_test.go | 214 +++++++++--------- plugins/inputs/ethtool/namespace.go | 16 +- plugins/inputs/ethtool/namespace_linux.go | 76 +++---- .../eventhub_consumer/eventhub_consumer.go | 48 ++-- plugins/inputs/exec/exec.go | 176 +++++++------- plugins/inputs/exec/exec_test.go | 28 +-- plugins/inputs/exec/run_notwinodws.go | 2 +- plugins/inputs/exec/run_windows.go | 2 +- plugins/inputs/execd/execd.go | 14 +- plugins/inputs/execd/shim/goshim.go | 97 ++++---- plugins/inputs/execd/shim/input.go | 3 + 22 files changed, 582 insertions(+), 602 deletions(-) diff --git a/plugins/inputs/ecs/client.go b/plugins/inputs/ecs/client.go index 12f4bf2e2023a..203f3e57f6d9b 100644 --- a/plugins/inputs/ecs/client.go +++ b/plugins/inputs/ecs/client.go @@ -21,18 +21,18 @@ var ( ecsMetaStatsPath = "/task/stats" ) -// Client is the ECS client contract -type Client interface { - Task() (*Task, error) - ContainerStats() (map[string]*container.StatsResponse, error) +// client is the ECS client contract +type client interface { + task() (*ecsTask, error) + containerStats() (map[string]*container.StatsResponse, error) } type httpClient interface { Do(req *http.Request) (*http.Response, error) } -// NewClient constructs an ECS client with the passed configuration params -func NewClient(timeout time.Duration, endpoint string, version int) (*EcsClient, error) { +// newClient constructs an ECS client with the passed configuration params +func newClient(timeout time.Duration, endpoint string, version int) (*ecsClient, error) { if version < 2 || version > 4 { const msg = "expected metadata version 2, 3 or 4, got %d" return nil, fmt.Errorf(msg, version) @@ -47,7 +47,7 @@ func NewClient(timeout time.Duration, endpoint string, version int) (*EcsClient, Timeout: timeout, } - return &EcsClient{ + return &ecsClient{ client: c, baseURL: baseURL, taskURL: resolveTaskURL(baseURL, version), @@ -96,8 +96,8 @@ func resolveURL(base *url.URL, path string) string { return base.String() + path } -// EcsClient contains ECS connection config -type EcsClient struct { +// ecsClient contains ECS connection config +type ecsClient struct { client httpClient version int baseURL *url.URL @@ -105,8 +105,8 @@ type EcsClient struct { statsURL string } -// Task calls the ECS metadata endpoint and returns a populated Task -func (c *EcsClient) Task() (*Task, error) { +// task calls the ECS metadata endpoint and returns a populated task +func (c *ecsClient) task() (*ecsTask, error) { req, err := http.NewRequest("GET", c.taskURL, nil) if err != nil { return nil, err @@ -131,8 +131,8 @@ func (c *EcsClient) Task() (*Task, error) { return task, nil } -// ContainerStats calls the ECS stats endpoint and returns a populated container stats map -func (c *EcsClient) ContainerStats() (map[string]*container.StatsResponse, error) { +// containerStats calls the ECS stats endpoint and returns a populated container stats map +func (c *ecsClient) containerStats() (map[string]*container.StatsResponse, error) { req, err := http.NewRequest("GET", c.statsURL, nil) if err != nil { return nil, err @@ -153,18 +153,19 @@ func (c *EcsClient) ContainerStats() (map[string]*container.StatsResponse, error return unmarshalStats(resp.Body) } -// PollSync executes Task and ContainerStats in parallel. If both succeed, both structs are returned. +// pollSync executes task and containerStats in parallel. +// If both succeed, both structs are returned. // If either errors, a single error is returned. -func PollSync(c Client) (*Task, map[string]*container.StatsResponse, error) { - var task *Task +func pollSync(c client) (*ecsTask, map[string]*container.StatsResponse, error) { + var task *ecsTask var stats map[string]*container.StatsResponse var err error - if stats, err = c.ContainerStats(); err != nil { + if stats, err = c.containerStats(); err != nil { return nil, nil, err } - if task, err = c.Task(); err != nil { + if task, err = c.task(); err != nil { return nil, nil, err } diff --git a/plugins/inputs/ecs/client_test.go b/plugins/inputs/ecs/client_test.go index f451768fdccf1..84ca95656f16e 100644 --- a/plugins/inputs/ecs/client_test.go +++ b/plugins/inputs/ecs/client_test.go @@ -14,33 +14,33 @@ import ( ) type pollMock struct { - task func() (*Task, error) - stats func() (map[string]*container.StatsResponse, error) + getTask func() (*ecsTask, error) + getStats func() (map[string]*container.StatsResponse, error) } -func (p *pollMock) Task() (*Task, error) { - return p.task() +func (p *pollMock) task() (*ecsTask, error) { + return p.getTask() } -func (p *pollMock) ContainerStats() (map[string]*container.StatsResponse, error) { - return p.stats() +func (p *pollMock) containerStats() (map[string]*container.StatsResponse, error) { + return p.getStats() } func TestEcsClient_PollSync(t *testing.T) { tests := []struct { name string mock *pollMock - want *Task + want *ecsTask want1 map[string]*container.StatsResponse wantErr bool }{ { name: "success", mock: &pollMock{ - task: func() (*Task, error) { + getTask: func() (*ecsTask, error) { return &validMeta, nil }, - stats: func() (map[string]*container.StatsResponse, error) { + getStats: func() (map[string]*container.StatsResponse, error) { return validStats, nil }, }, @@ -50,10 +50,10 @@ func TestEcsClient_PollSync(t *testing.T) { { name: "task err", mock: &pollMock{ - task: func() (*Task, error) { + getTask: func() (*ecsTask, error) { return nil, errors.New("err") }, - stats: func() (map[string]*container.StatsResponse, error) { + getStats: func() (map[string]*container.StatsResponse, error) { return validStats, nil }, }, @@ -62,10 +62,10 @@ func TestEcsClient_PollSync(t *testing.T) { { name: "stats err", mock: &pollMock{ - task: func() (*Task, error) { + getTask: func() (*ecsTask, error) { return &validMeta, nil }, - stats: func() (map[string]*container.StatsResponse, error) { + getStats: func() (map[string]*container.StatsResponse, error) { return nil, errors.New("err") }, }, @@ -74,14 +74,14 @@ func TestEcsClient_PollSync(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got, got1, err := PollSync(tt.mock) + got, got1, err := pollSync(tt.mock) if (err != nil) != tt.wantErr { - t.Errorf("EcsClient.PollSync() error = %v, wantErr %v", err, tt.wantErr) + t.Errorf("ecsClient.pollSync() error = %v, wantErr %v", err, tt.wantErr) return } - require.Equal(t, tt.want, got, "EcsClient.PollSync() got = %v, want %v", got, tt.want) - require.Equal(t, tt.want1, got1, "EcsClient.PollSync() got1 = %v, want %v", got1, tt.want1) + require.Equal(t, tt.want, got, "ecsClient.pollSync() got = %v, want %v", got, tt.want) + require.Equal(t, tt.want1, got1, "ecsClient.pollSync() got1 = %v, want %v", got1, tt.want1) }) } } @@ -98,7 +98,7 @@ func TestEcsClient_Task(t *testing.T) { tests := []struct { name string client httpClient - want *Task + want *ecsTask wantErr bool }{ { @@ -154,16 +154,16 @@ func TestEcsClient_Task(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - c := &EcsClient{ + c := &ecsClient{ client: tt.client, taskURL: "abc", } - got, err := c.Task() + got, err := c.task() if (err != nil) != tt.wantErr { - t.Errorf("EcsClient.Task() error = %v, wantErr %v", err, tt.wantErr) + t.Errorf("ecsClient.task() error = %v, wantErr %v", err, tt.wantErr) return } - require.Equal(t, tt.want, got, "EcsClient.Task() = %v, want %v", got, tt.want) + require.Equal(t, tt.want, got, "ecsClient.task() = %v, want %v", got, tt.want) }) } } @@ -231,16 +231,16 @@ func TestEcsClient_ContainerStats(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - c := &EcsClient{ + c := &ecsClient{ client: tt.client, statsURL: "abc", } - got, err := c.ContainerStats() + got, err := c.containerStats() if (err != nil) != tt.wantErr { - t.Errorf("EcsClient.ContainerStats() error = %v, wantErr %v", err, tt.wantErr) + t.Errorf("ecsClient.containerStats() error = %v, wantErr %v", err, tt.wantErr) return } - require.Equal(t, tt.want, got, "EcsClient.ContainerStats() = %v, want %v", got, tt.want) + require.Equal(t, tt.want, got, "ecsClient.containerStats() = %v, want %v", got, tt.want) }) } } diff --git a/plugins/inputs/ecs/ecs.go b/plugins/inputs/ecs/ecs.go index 93ead677e74ae..b537c1fdc09c4 100644 --- a/plugins/inputs/ecs/ecs.go +++ b/plugins/inputs/ecs/ecs.go @@ -16,10 +16,13 @@ import ( //go:embed sample.conf var sampleConfig string -// Ecs config object +const ( + v2Endpoint = "http://169.254.170.2" +) + type Ecs struct { - EndpointURL string `toml:"endpoint_url"` - Timeout config.Duration + EndpointURL string `toml:"endpoint_url"` + Timeout config.Duration `toml:"timeout"` ContainerNameInclude []string `toml:"container_name_include"` ContainerNameExclude []string `toml:"container_name_exclude"` @@ -30,9 +33,9 @@ type Ecs struct { LabelInclude []string `toml:"ecs_label_include"` LabelExclude []string `toml:"ecs_label_exclude"` - newClient func(timeout time.Duration, endpoint string, version int) (*EcsClient, error) + newClient func(timeout time.Duration, endpoint string, version int) (*ecsClient, error) - client Client + client client filtersCreated bool labelFilter filter.Filter containerNameFilter filter.Filter @@ -40,28 +43,17 @@ type Ecs struct { metadataVersion int } -const ( - KB = 1000 - MB = 1000 * KB - GB = 1000 * MB - TB = 1000 * GB - PB = 1000 * TB - - v2Endpoint = "http://169.254.170.2" -) - func (*Ecs) SampleConfig() string { return sampleConfig } -// Gather is the entrypoint for telegraf metrics collection func (ecs *Ecs) Gather(acc telegraf.Accumulator) error { err := initSetup(ecs) if err != nil { return err } - task, stats, err := PollSync(ecs.client) + task, stats, err := pollSync(ecs.client) if err != nil { return err } @@ -145,7 +137,7 @@ func resolveEndpoint(ecs *Ecs) { ecs.metadataVersion = 2 } -func (ecs *Ecs) accTask(task *Task, tags map[string]string, acc telegraf.Accumulator) { +func (ecs *Ecs) accTask(task *ecsTask, tags map[string]string, acc telegraf.Accumulator) { taskFields := map[string]interface{}{ "desired_status": task.DesiredStatus, "known_status": task.KnownStatus, @@ -156,7 +148,7 @@ func (ecs *Ecs) accTask(task *Task, tags map[string]string, acc telegraf.Accumul acc.AddFields("ecs_task", taskFields, tags) } -func (ecs *Ecs) accContainers(task *Task, taskTags map[string]string, acc telegraf.Accumulator) { +func (ecs *Ecs) accContainers(task *ecsTask, taskTags map[string]string, acc telegraf.Accumulator) { for i := range task.Containers { c := &task.Containers[i] if !ecs.containerNameFilter.Match(c.Name) { @@ -245,7 +237,7 @@ func init() { return &Ecs{ EndpointURL: "", Timeout: config.Duration(5 * time.Second), - newClient: NewClient, + newClient: newClient, filtersCreated: false, } }) diff --git a/plugins/inputs/ecs/ecs_test.go b/plugins/inputs/ecs/ecs_test.go index ec17bc4000cfb..4266181476eb8 100644 --- a/plugins/inputs/ecs/ecs_test.go +++ b/plugins/inputs/ecs/ecs_test.go @@ -697,14 +697,14 @@ var metaStarted = mustParseNano("2018-11-19T15:31:27.975996351Z") var metaPullStart = mustParseNano("2018-11-19T15:31:27.197327103Z") var metaPullStop = mustParseNano("2018-11-19T15:31:27.609089471Z") -var validMeta = Task{ +var validMeta = ecsTask{ Cluster: "test", TaskARN: "arn:aws:ecs:aws-region-1:012345678901:task/a1234abc-a0a0-0a01-ab01-0abc012a0a0a", Family: "nginx", Revision: "2", DesiredStatus: "RUNNING", KnownStatus: "RUNNING", - Containers: []Container{ + Containers: []ecsContainer{ { ID: pauseStatsKey, Name: "~internal~ecs~pause", @@ -727,7 +727,7 @@ var validMeta = Task{ CreatedAt: metaPauseCreated, StartedAt: metaPauseStarted, Type: "CNI_PAUSE", - Networks: []Network{ + Networks: []network{ { NetworkMode: "awsvpc", IPv4Addresses: []string{ @@ -758,7 +758,7 @@ var validMeta = Task{ CreatedAt: metaCreated, StartedAt: metaStarted, Type: "NORMAL", - Networks: []Network{ + Networks: []network{ { NetworkMode: "awsvpc", IPv4Addresses: []string{ diff --git a/plugins/inputs/ecs/stats.go b/plugins/inputs/ecs/stats.go index 042d6a2627959..7de0e42bcfc85 100644 --- a/plugins/inputs/ecs/stats.go +++ b/plugins/inputs/ecs/stats.go @@ -11,7 +11,7 @@ import ( "github.com/influxdata/telegraf/plugins/common/docker" ) -func parseContainerStats(c *Container, acc telegraf.Accumulator, tags map[string]string) { +func parseContainerStats(c *ecsContainer, acc telegraf.Accumulator, tags map[string]string) { id := c.ID stats := &c.Stats tm := stats.Read @@ -27,7 +27,7 @@ func parseContainerStats(c *Container, acc telegraf.Accumulator, tags map[string blkstats(id, stats, acc, tags, tm) } -func metastats(id string, c *Container, acc telegraf.Accumulator, tags map[string]string, tm time.Time) { +func metastats(id string, c *ecsContainer, acc telegraf.Accumulator, tags map[string]string, tm time.Time) { metafields := map[string]interface{}{ "container_id": id, "docker_name": c.DockerName, diff --git a/plugins/inputs/ecs/types.go b/plugins/inputs/ecs/types.go index 2e171d4d6dadd..86a57f5f0ab0f 100644 --- a/plugins/inputs/ecs/types.go +++ b/plugins/inputs/ecs/types.go @@ -9,22 +9,22 @@ import ( "github.com/docker/docker/api/types/container" ) -// Task is the ECS task representation -type Task struct { +// ecsTask is the ECS task representation +type ecsTask struct { Cluster string TaskARN string Family string Revision string DesiredStatus string KnownStatus string - Containers []Container + Containers []ecsContainer Limits map[string]float64 PullStartedAt time.Time PullStoppedAt time.Time } -// Container is the ECS metadata container representation -type Container struct { +// ecsContainer is the ECS metadata container representation +type ecsContainer struct { ID string `json:"DockerId"` Name string DockerName string @@ -38,17 +38,17 @@ type Container struct { StartedAt time.Time Stats container.StatsResponse Type string - Networks []Network + Networks []network } -// Network is a docker network configuration -type Network struct { +// network is a docker network configuration +type network struct { NetworkMode string IPv4Addresses []string } -func unmarshalTask(r io.Reader) (*Task, error) { - task := &Task{} +func unmarshalTask(r io.Reader) (*ecsTask, error) { + task := &ecsTask{} err := json.NewDecoder(r).Decode(task) return task, err } @@ -62,8 +62,8 @@ func unmarshalStats(r io.Reader) (map[string]*container.StatsResponse, error) { return statsMap, nil } -// interleaves Stats in to the Container objects in the Task -func mergeTaskStats(task *Task, stats map[string]*container.StatsResponse) { +// interleaves Stats in to the Container objects in the ecsTask +func mergeTaskStats(task *ecsTask, stats map[string]*container.StatsResponse) { for i := range task.Containers { c := &task.Containers[i] if strings.Trim(c.ID, " ") == "" { diff --git a/plugins/inputs/elasticsearch/elasticsearch.go b/plugins/inputs/elasticsearch/elasticsearch.go index 199f2827190b3..7256a701a78bb 100644 --- a/plugins/inputs/elasticsearch/elasticsearch.go +++ b/plugins/inputs/elasticsearch/elasticsearch.go @@ -29,9 +29,38 @@ var sampleConfig string // mask for masking username/password from error messages var mask = regexp.MustCompile(`https?:\/\/\S+:\S+@`) -// Nodestats are always generated, so simply define a constant for these endpoints -const statsPath = "/_nodes/stats" -const statsPathLocal = "/_nodes/_local/stats" +const ( + // Node stats are always generated, so simply define a constant for these endpoints + statsPath = "/_nodes/stats" + statsPathLocal = "/_nodes/_local/stats" +) + +type Elasticsearch struct { + Local bool `toml:"local"` + Servers []string `toml:"servers"` + HTTPHeaders map[string]string `toml:"headers"` + HTTPTimeout config.Duration `toml:"http_timeout" deprecated:"1.29.0;1.35.0;use 'timeout' instead"` + ClusterHealth bool `toml:"cluster_health"` + ClusterHealthLevel string `toml:"cluster_health_level"` + ClusterStats bool `toml:"cluster_stats"` + ClusterStatsOnlyFromMaster bool `toml:"cluster_stats_only_from_master"` + EnrichStats bool `toml:"enrich_stats"` + IndicesInclude []string `toml:"indices_include"` + IndicesLevel string `toml:"indices_level"` + NodeStats []string `toml:"node_stats"` + Username string `toml:"username"` + Password string `toml:"password"` + NumMostRecentIndices int `toml:"num_most_recent_indices"` + + Log telegraf.Logger `toml:"-"` + + client *http.Client + common_http.HTTPClientConfig + + serverInfo map[string]serverInfo + serverInfoMutex sync.Mutex + indexMatchers map[string]filter.Filter +} type nodeStat struct { Host string `json:"host"` @@ -109,89 +138,15 @@ type indexStat struct { Total interface{} `json:"total"` Shards map[string][]interface{} `json:"shards"` } - -// Elasticsearch is a plugin to read stats from one or many Elasticsearch -// servers. -type Elasticsearch struct { - Local bool `toml:"local"` - Servers []string `toml:"servers"` - HTTPHeaders map[string]string `toml:"headers"` - HTTPTimeout config.Duration `toml:"http_timeout" deprecated:"1.29.0;1.35.0;use 'timeout' instead"` - ClusterHealth bool `toml:"cluster_health"` - ClusterHealthLevel string `toml:"cluster_health_level"` - ClusterStats bool `toml:"cluster_stats"` - ClusterStatsOnlyFromMaster bool `toml:"cluster_stats_only_from_master"` - EnrichStats bool `toml:"enrich_stats"` - IndicesInclude []string `toml:"indices_include"` - IndicesLevel string `toml:"indices_level"` - NodeStats []string `toml:"node_stats"` - Username string `toml:"username"` - Password string `toml:"password"` - NumMostRecentIndices int `toml:"num_most_recent_indices"` - - Log telegraf.Logger `toml:"-"` - - client *http.Client - common_http.HTTPClientConfig - - serverInfo map[string]serverInfo - serverInfoMutex sync.Mutex - indexMatchers map[string]filter.Filter -} type serverInfo struct { nodeID string masterID string } -func (i serverInfo) isMaster() bool { - return i.nodeID == i.masterID -} - -// NewElasticsearch return a new instance of Elasticsearch -func NewElasticsearch() *Elasticsearch { - return &Elasticsearch{ - ClusterStatsOnlyFromMaster: true, - ClusterHealthLevel: "indices", - HTTPClientConfig: common_http.HTTPClientConfig{ - ResponseHeaderTimeout: config.Duration(5 * time.Second), - Timeout: config.Duration(5 * time.Second), - }, - } -} - -// perform status mapping -func mapHealthStatusToCode(s string) int { - switch strings.ToLower(s) { - case "green": - return 1 - case "yellow": - return 2 - case "red": - return 3 - } - return 0 -} - -// perform shard status mapping -func mapShardStatusToCode(s string) int { - switch strings.ToUpper(s) { - case "UNASSIGNED": - return 1 - case "INITIALIZING": - return 2 - case "STARTED": - return 3 - case "RELOCATING": - return 4 - } - return 0 -} - func (*Elasticsearch) SampleConfig() string { return sampleConfig } -// Init the plugin. func (e *Elasticsearch) Init() error { // Compile the configured indexes to match for sorting. indexMatchers, err := e.compileIndexMatchers() @@ -208,8 +163,6 @@ func (e *Elasticsearch) Start(_ telegraf.Accumulator) error { return nil } -// Gather reads the stats from Elasticsearch and writes it to the -// Accumulator. func (e *Elasticsearch) Gather(acc telegraf.Accumulator) error { if e.client == nil { client, err := e.createHTTPClient() @@ -784,8 +737,51 @@ func (e *Elasticsearch) compileIndexMatchers() (map[string]filter.Filter, error) return indexMatchers, nil } +func (i serverInfo) isMaster() bool { + return i.nodeID == i.masterID +} + +// perform status mapping +func mapHealthStatusToCode(s string) int { + switch strings.ToLower(s) { + case "green": + return 1 + case "yellow": + return 2 + case "red": + return 3 + } + return 0 +} + +// perform shard status mapping +func mapShardStatusToCode(s string) int { + switch strings.ToUpper(s) { + case "UNASSIGNED": + return 1 + case "INITIALIZING": + return 2 + case "STARTED": + return 3 + case "RELOCATING": + return 4 + } + return 0 +} + +func newElasticsearch() *Elasticsearch { + return &Elasticsearch{ + ClusterStatsOnlyFromMaster: true, + ClusterHealthLevel: "indices", + HTTPClientConfig: common_http.HTTPClientConfig{ + ResponseHeaderTimeout: config.Duration(5 * time.Second), + Timeout: config.Duration(5 * time.Second), + }, + } +} + func init() { inputs.Add("elasticsearch", func() telegraf.Input { - return NewElasticsearch() + return newElasticsearch() }) } diff --git a/plugins/inputs/elasticsearch/elasticsearch_test.go b/plugins/inputs/elasticsearch/elasticsearch_test.go index 2ee1160492fdb..5db59ba7c5fd8 100644 --- a/plugins/inputs/elasticsearch/elasticsearch_test.go +++ b/plugins/inputs/elasticsearch/elasticsearch_test.go @@ -48,8 +48,6 @@ func (t *transportMock) RoundTrip(r *http.Request) (*http.Response, error) { return res, nil } -func (t *transportMock) CancelRequest(_ *http.Request) {} - func checkNodeStatsResult(t *testing.T, acc *testutil.Accumulator) { tags := defaultTags() acc.AssertContainsTaggedFields(t, "elasticsearch_indices", nodestatsIndicesExpected, tags) @@ -369,7 +367,7 @@ func TestGatherClusterIndiceShardsStats(t *testing.T) { } func newElasticsearchWithClient() *Elasticsearch { - es := NewElasticsearch() + es := newElasticsearch() es.client = &http.Client{} return es } diff --git a/plugins/inputs/elasticsearch_query/elasticsearch_query.go b/plugins/inputs/elasticsearch_query/elasticsearch_query.go index 99c7323ebdb6c..0e06b4b049489 100644 --- a/plugins/inputs/elasticsearch_query/elasticsearch_query.go +++ b/plugins/inputs/elasticsearch_query/elasticsearch_query.go @@ -23,7 +23,6 @@ import ( //go:embed sample.conf var sampleConfig string -// ElasticsearchQuery struct type ElasticsearchQuery struct { URLs []string `toml:"urls"` Username string `toml:"username"` @@ -40,7 +39,6 @@ type ElasticsearchQuery struct { esClient *elastic5.Client } -// esAggregation struct type esAggregation struct { Index string `toml:"index"` MeasurementName string `toml:"measurement_name"` @@ -61,7 +59,6 @@ func (*ElasticsearchQuery) SampleConfig() string { return sampleConfig } -// Init the plugin. func (e *ElasticsearchQuery) Init() error { if e.URLs == nil { return errors.New("elasticsearch urls is not defined") @@ -69,7 +66,7 @@ func (e *ElasticsearchQuery) Init() error { err := e.connectToES() if err != nil { - e.Log.Errorf("E! error connecting to elasticsearch: %s", err) + e.Log.Errorf("error connecting to elasticsearch: %s", err) return nil } @@ -92,6 +89,40 @@ func (e *ElasticsearchQuery) Init() error { return nil } +func (e *ElasticsearchQuery) Start(_ telegraf.Accumulator) error { + return nil +} + +// Gather writes the results of the queries from Elasticsearch to the Accumulator. +func (e *ElasticsearchQuery) Gather(acc telegraf.Accumulator) error { + var wg sync.WaitGroup + + err := e.connectToES() + if err != nil { + return err + } + + for i, agg := range e.Aggregations { + wg.Add(1) + go func(agg esAggregation, i int) { + defer wg.Done() + err := e.esAggregationQuery(acc, agg, i) + if err != nil { + acc.AddError(fmt.Errorf("elasticsearch query aggregation %s: %w", agg.MeasurementName, err)) + } + }(agg, i) + } + + wg.Wait() + return nil +} + +func (e *ElasticsearchQuery) Stop() { + if e.httpclient != nil { + e.httpclient.CloseIdleConnections() + } +} + func (e *ElasticsearchQuery) initAggregation(ctx context.Context, agg esAggregation, i int) (err error) { // retrieve field mapping and build queries only once agg.mapMetricFields, err = e.getMetricFields(ctx, agg) @@ -173,40 +204,6 @@ func (e *ElasticsearchQuery) connectToES() error { return nil } -func (e *ElasticsearchQuery) Start(_ telegraf.Accumulator) error { - return nil -} - -// Gather writes the results of the queries from Elasticsearch to the Accumulator. -func (e *ElasticsearchQuery) Gather(acc telegraf.Accumulator) error { - var wg sync.WaitGroup - - err := e.connectToES() - if err != nil { - return err - } - - for i, agg := range e.Aggregations { - wg.Add(1) - go func(agg esAggregation, i int) { - defer wg.Done() - err := e.esAggregationQuery(acc, agg, i) - if err != nil { - acc.AddError(fmt.Errorf("elasticsearch query aggregation %s: %w", agg.MeasurementName, err)) - } - }(agg, i) - } - - wg.Wait() - return nil -} - -func (e *ElasticsearchQuery) Stop() { - if e.httpclient != nil { - e.httpclient.CloseIdleConnections() - } -} - func (e *ElasticsearchQuery) createHTTPClient() (*http.Client, error) { ctx := context.Background() return e.HTTPClientConfig.CreateClient(ctx, e.Log) diff --git a/plugins/inputs/ethtool/ethtool.go b/plugins/inputs/ethtool/ethtool.go index 70decafef6e95..d59039ace0c4a 100644 --- a/plugins/inputs/ethtool/ethtool.go +++ b/plugins/inputs/ethtool/ethtool.go @@ -5,17 +5,17 @@ import ( _ "embed" ) -const pluginName = "ethtool" - //go:embed sample.conf var sampleConfig string -type Command interface { - Init() error - DriverName(intf NamespacedInterface) (string, error) - Interfaces(includeNamespaces bool) ([]NamespacedInterface, error) - Stats(intf NamespacedInterface) (map[string]uint64, error) - Get(intf NamespacedInterface) (map[string]uint64, error) +const pluginName = "ethtool" + +type command interface { + init() error + driverName(intf namespacedInterface) (string, error) + interfaces(includeNamespaces bool) ([]namespacedInterface, error) + stats(intf namespacedInterface) (map[string]uint64, error) + get(intf namespacedInterface) (map[string]uint64, error) } func (*Ethtool) SampleConfig() string { diff --git a/plugins/inputs/ethtool/ethtool_linux.go b/plugins/inputs/ethtool/ethtool_linux.go index d506a5a235309..0710a3dc1b56a 100644 --- a/plugins/inputs/ethtool/ethtool_linux.go +++ b/plugins/inputs/ethtool/ethtool_linux.go @@ -19,6 +19,8 @@ import ( "github.com/influxdata/telegraf/plugins/inputs" ) +var downInterfacesBehaviors = []string{"expose", "skip"} + const ( tagInterface = "interface" tagNamespace = "namespace" @@ -26,8 +28,6 @@ const ( fieldInterfaceUp = "interface_up" ) -var downInterfacesBehaviors = []string{"expose", "skip"} - type Ethtool struct { // This is the list of interface names to include InterfaceInclude []string `toml:"interface_include"` @@ -54,12 +54,12 @@ type Ethtool struct { includeNamespaces bool // the ethtool command - command Command + command command } -type CommandEthtool struct { - Log telegraf.Logger - namespaceGoroutines map[string]*NamespaceGoroutine +type commandEthtool struct { + log telegraf.Logger + namespaceGoroutines map[string]*namespaceGoroutine } func (e *Ethtool) Init() error { @@ -90,16 +90,16 @@ func (e *Ethtool) Init() error { return err } - if command, ok := e.command.(*CommandEthtool); ok { - command.Log = e.Log + if command, ok := e.command.(*commandEthtool); ok { + command.log = e.Log } - return e.command.Init() + return e.command.init() } func (e *Ethtool) Gather(acc telegraf.Accumulator) error { // Get the list of interfaces - interfaces, err := e.command.Interfaces(e.includeNamespaces) + interfaces, err := e.command.interfaces(e.includeNamespaces) if err != nil { acc.AddError(err) return nil @@ -113,7 +113,7 @@ func (e *Ethtool) Gather(acc telegraf.Accumulator) error { if e.interfaceEligibleForGather(iface) { wg.Add(1) - go func(i NamespacedInterface) { + go func(i namespacedInterface) { e.gatherEthtoolStats(i, acc) wg.Done() }(iface) @@ -125,14 +125,14 @@ func (e *Ethtool) Gather(acc telegraf.Accumulator) error { return nil } -func (e *Ethtool) interfaceEligibleForGather(iface NamespacedInterface) bool { +func (e *Ethtool) interfaceEligibleForGather(iface namespacedInterface) bool { // Don't gather if it is a loop back, or it isn't matched by the filter if isLoopback(iface) || !e.interfaceFilter.Match(iface.Name) { return false } // Don't gather if it's not in a namespace matched by the filter - if !e.namespaceFilter.Match(iface.Namespace.Name()) { + if !e.namespaceFilter.Match(iface.namespace.name()) { return false } @@ -145,12 +145,12 @@ func (e *Ethtool) interfaceEligibleForGather(iface NamespacedInterface) bool { } // Gather the stats for the interface. -func (e *Ethtool) gatherEthtoolStats(iface NamespacedInterface, acc telegraf.Accumulator) { +func (e *Ethtool) gatherEthtoolStats(iface namespacedInterface, acc telegraf.Accumulator) { tags := make(map[string]string) tags[tagInterface] = iface.Name - tags[tagNamespace] = iface.Namespace.Name() + tags[tagNamespace] = iface.namespace.name() - driverName, err := e.command.DriverName(iface) + driverName, err := e.command.driverName(iface) if err != nil { acc.AddError(fmt.Errorf("%q driver: %w", iface.Name, err)) return @@ -159,7 +159,7 @@ func (e *Ethtool) gatherEthtoolStats(iface NamespacedInterface, acc telegraf.Acc tags[tagDriverName] = driverName fields := make(map[string]interface{}) - stats, err := e.command.Stats(iface) + stats, err := e.command.stats(iface) if err != nil { acc.AddError(fmt.Errorf("%q stats: %w", iface.Name, err)) return @@ -170,7 +170,7 @@ func (e *Ethtool) gatherEthtoolStats(iface NamespacedInterface, acc telegraf.Acc fields[e.normalizeKey(k)] = v } - cmdget, err := e.command.Get(iface) + cmdget, err := e.command.get(iface) // error text is directly from running ethtool and syscalls if err != nil && err.Error() != "operation not supported" { acc.AddError(fmt.Errorf("%q get: %w", iface.Name, err)) @@ -228,57 +228,57 @@ func inStringSlice(slice []string, value string) bool { return false } -func isLoopback(iface NamespacedInterface) bool { +func isLoopback(iface namespacedInterface) bool { return (iface.Flags & net.FlagLoopback) != 0 } -func interfaceUp(iface NamespacedInterface) bool { +func interfaceUp(iface namespacedInterface) bool { return (iface.Flags & net.FlagUp) != 0 } -func NewCommandEthtool() *CommandEthtool { - return &CommandEthtool{} +func newCommandEthtool() *commandEthtool { + return &commandEthtool{} } -func (c *CommandEthtool) Init() error { +func (c *commandEthtool) init() error { // Create the goroutine for the initial namespace initialNamespace, err := netns.Get() if err != nil { return err } - namespaceGoroutine := &NamespaceGoroutine{ - name: "", - handle: initialNamespace, - Log: c.Log, + nspaceGoroutine := &namespaceGoroutine{ + namespaceName: "", + handle: initialNamespace, + log: c.log, } - if err := namespaceGoroutine.Start(); err != nil { - c.Log.Errorf(`Failed to start goroutine for the initial namespace: %s`, err) + if err := nspaceGoroutine.start(); err != nil { + c.log.Errorf(`Failed to start goroutine for the initial namespace: %s`, err) return err } - c.namespaceGoroutines = map[string]*NamespaceGoroutine{ - "": namespaceGoroutine, + c.namespaceGoroutines = map[string]*namespaceGoroutine{ + "": nspaceGoroutine, } return nil } -func (c *CommandEthtool) DriverName(intf NamespacedInterface) (driver string, err error) { - return intf.Namespace.DriverName(intf) +func (c *commandEthtool) driverName(intf namespacedInterface) (driver string, err error) { + return intf.namespace.driverName(intf) } -func (c *CommandEthtool) Stats(intf NamespacedInterface) (stats map[string]uint64, err error) { - return intf.Namespace.Stats(intf) +func (c *commandEthtool) stats(intf namespacedInterface) (stats map[string]uint64, err error) { + return intf.namespace.stats(intf) } -func (c *CommandEthtool) Get(intf NamespacedInterface) (stats map[string]uint64, err error) { - return intf.Namespace.Get(intf) +func (c *commandEthtool) get(intf namespacedInterface) (stats map[string]uint64, err error) { + return intf.namespace.get(intf) } -func (c *CommandEthtool) Interfaces(includeNamespaces bool) ([]NamespacedInterface, error) { +func (c *commandEthtool) interfaces(includeNamespaces bool) ([]namespacedInterface, error) { const namespaceDirectory = "/var/run/netns" initialNamespace, err := netns.Get() if err != nil { - c.Log.Errorf("Could not get initial namespace: %s", err) + c.log.Errorf("Could not get initial namespace: %s", err) return nil, err } defer initialNamespace.Close() @@ -294,7 +294,7 @@ func (c *CommandEthtool) Interfaces(includeNamespaces bool) ([]NamespacedInterfa if includeNamespaces { namespaces, err := os.ReadDir(namespaceDirectory) if err != nil { - c.Log.Warnf("Could not find namespace directory: %s", err) + c.log.Warnf("Could not find namespace directory: %s", err) } // We'll always have at least the initial namespace, so add one to ensure @@ -306,7 +306,7 @@ func (c *CommandEthtool) Interfaces(includeNamespaces bool) ([]NamespacedInterfa handle, err := netns.GetFromPath(filepath.Join(namespaceDirectory, name)) if err != nil { - c.Log.Warnf("Could not get handle for namespace %q: %s", name, err.Error()) + c.log.Warnf("Could not get handle for namespace %q: %s", name, err.Error()) continue } handles[name] = handle @@ -323,24 +323,24 @@ func (c *CommandEthtool) Interfaces(includeNamespaces bool) ([]NamespacedInterfa namespaceNames = append(namespaceNames, "") } - allInterfaces := make([]NamespacedInterface, 0) + allInterfaces := make([]namespacedInterface, 0) for _, namespace := range namespaceNames { if _, ok := c.namespaceGoroutines[namespace]; !ok { - c.namespaceGoroutines[namespace] = &NamespaceGoroutine{ - name: namespace, - handle: handles[namespace], - Log: c.Log, + c.namespaceGoroutines[namespace] = &namespaceGoroutine{ + namespaceName: namespace, + handle: handles[namespace], + log: c.log, } - if err := c.namespaceGoroutines[namespace].Start(); err != nil { - c.Log.Errorf("Failed to start goroutine for namespace %q: %s", namespace, err.Error()) + if err := c.namespaceGoroutines[namespace].start(); err != nil { + c.log.Errorf("Failed to start goroutine for namespace %q: %s", namespace, err.Error()) delete(c.namespaceGoroutines, namespace) continue } } - interfaces, err := c.namespaceGoroutines[namespace].Interfaces() + interfaces, err := c.namespaceGoroutines[namespace].interfaces() if err != nil { - c.Log.Warnf("Could not get interfaces from namespace %q: %s", namespace, err.Error()) + c.log.Warnf("Could not get interfaces from namespace %q: %s", namespace, err.Error()) continue } allInterfaces = append(allInterfaces, interfaces...) @@ -356,7 +356,7 @@ func init() { InterfaceExclude: []string{}, NamespaceInclude: []string{}, NamespaceExclude: []string{}, - command: NewCommandEthtool(), + command: newCommandEthtool(), } }) } diff --git a/plugins/inputs/ethtool/ethtool_test.go b/plugins/inputs/ethtool/ethtool_test.go index 04bcf6ddbd838..9f79d89653cca 100644 --- a/plugins/inputs/ethtool/ethtool_test.go +++ b/plugins/inputs/ethtool/ethtool_test.go @@ -13,77 +13,77 @@ import ( ) var ( - command *Ethtool - interfaceMap map[string]*InterfaceMock + eth *Ethtool + interfaceMap map[string]*interfaceMock ) -type InterfaceMock struct { - Name string - DriverName string - NamespaceName string - Stat map[string]uint64 - LoopBack bool - InterfaceUp bool - CmdGet map[string]uint64 +type interfaceMock struct { + name string + driverName string + namespaceName string + stat map[string]uint64 + loopBack bool + interfaceUp bool + cmdGet map[string]uint64 } -type NamespaceMock struct { - name string +type namespaceMock struct { + namespaceName string } -func (n *NamespaceMock) Name() string { - return n.name +func (n *namespaceMock) name() string { + return n.namespaceName } -func (n *NamespaceMock) Interfaces() ([]NamespacedInterface, error) { +func (n *namespaceMock) interfaces() ([]namespacedInterface, error) { return nil, errors.New("it is a test bug to invoke this function") } -func (n *NamespaceMock) DriverName(_ NamespacedInterface) (string, error) { +func (n *namespaceMock) driverName(_ namespacedInterface) (string, error) { return "", errors.New("it is a test bug to invoke this function") } -func (n *NamespaceMock) Stats(_ NamespacedInterface) (map[string]uint64, error) { +func (n *namespaceMock) stats(_ namespacedInterface) (map[string]uint64, error) { return nil, errors.New("it is a test bug to invoke this function") } -func (n *NamespaceMock) Get(_ NamespacedInterface) (map[string]uint64, error) { +func (n *namespaceMock) get(_ namespacedInterface) (map[string]uint64, error) { return nil, errors.New("it is a test bug to invoke this function") } -type CommandEthtoolMock struct { - InterfaceMap map[string]*InterfaceMock +type commandEthtoolMock struct { + interfaceMap map[string]*interfaceMock } -func (c *CommandEthtoolMock) Init() error { +func (c *commandEthtoolMock) init() error { // Not required for test mock return nil } -func (c *CommandEthtoolMock) DriverName(intf NamespacedInterface) (string, error) { - i := c.InterfaceMap[intf.Name] +func (c *commandEthtoolMock) driverName(intf namespacedInterface) (string, error) { + i := c.interfaceMap[intf.Name] if i != nil { - return i.DriverName, nil + return i.driverName, nil } return "", errors.New("interface not found") } -func (c *CommandEthtoolMock) Interfaces(includeNamespaces bool) ([]NamespacedInterface, error) { - namespaces := map[string]*NamespaceMock{"": {name: ""}} +func (c *commandEthtoolMock) interfaces(includeNamespaces bool) ([]namespacedInterface, error) { + namespaces := map[string]*namespaceMock{"": {namespaceName: ""}} - interfaces := make([]NamespacedInterface, 0) - for k, v := range c.InterfaceMap { - if v.NamespaceName != "" && !includeNamespaces { + interfaces := make([]namespacedInterface, 0) + for k, v := range c.interfaceMap { + if v.namespaceName != "" && !includeNamespaces { continue } var flag net.Flags // When interface is up - if v.InterfaceUp { + if v.interfaceUp { flag |= net.FlagUp } // For loopback interface - if v.LoopBack { + if v.loopBack { flag |= net.FlagLoopback } @@ -97,41 +97,41 @@ func (c *CommandEthtoolMock) Interfaces(includeNamespaces bool) ([]NamespacedInt } // Ensure there is a namespace if necessary - if _, ok := namespaces[v.NamespaceName]; !ok { - namespaces[v.NamespaceName] = &NamespaceMock{ - name: v.NamespaceName, + if _, ok := namespaces[v.namespaceName]; !ok { + namespaces[v.namespaceName] = &namespaceMock{ + namespaceName: v.namespaceName, } } interfaces = append( interfaces, - NamespacedInterface{ + namespacedInterface{ Interface: iface, - Namespace: namespaces[v.NamespaceName], + namespace: namespaces[v.namespaceName], }, ) } return interfaces, nil } -func (c *CommandEthtoolMock) Stats(intf NamespacedInterface) (map[string]uint64, error) { - i := c.InterfaceMap[intf.Name] +func (c *commandEthtoolMock) stats(intf namespacedInterface) (map[string]uint64, error) { + i := c.interfaceMap[intf.Name] if i != nil { - return i.Stat, nil + return i.stat, nil } return nil, errors.New("interface not found") } -func (c *CommandEthtoolMock) Get(intf NamespacedInterface) (map[string]uint64, error) { - i := c.InterfaceMap[intf.Name] +func (c *commandEthtoolMock) get(intf namespacedInterface) (map[string]uint64, error) { + i := c.interfaceMap[intf.Name] if i != nil { - return i.CmdGet, nil + return i.cmdGet, nil } return nil, errors.New("interface not found") } func setup() { - interfaceMap = make(map[string]*InterfaceMock) + interfaceMap = make(map[string]*interfaceMock) eth1Stat := map[string]uint64{ "interface_up": 1, @@ -238,8 +238,8 @@ func setup() { "link": 1, "speed": 1000, } - eth1 := &InterfaceMock{"eth1", "driver1", "", eth1Stat, false, true, eth1Get} - interfaceMap[eth1.Name] = eth1 + eth1 := &interfaceMock{"eth1", "driver1", "", eth1Stat, false, true, eth1Get} + interfaceMap[eth1.name] = eth1 eth2Stat := map[string]uint64{ "interface_up": 0, @@ -346,8 +346,8 @@ func setup() { "link": 0, "speed": 9223372036854775807, } - eth2 := &InterfaceMock{"eth2", "driver1", "", eth2Stat, false, false, eth2Get} - interfaceMap[eth2.Name] = eth2 + eth2 := &interfaceMock{"eth2", "driver1", "", eth2Stat, false, false, eth2Get} + interfaceMap[eth2.name] = eth2 eth3Stat := map[string]uint64{ "interface_up": 1, @@ -454,8 +454,8 @@ func setup() { "link": 1, "speed": 1000, } - eth3 := &InterfaceMock{"eth3", "driver1", "namespace1", eth3Stat, false, true, eth3Get} - interfaceMap[eth3.Name] = eth3 + eth3 := &interfaceMock{"eth3", "driver1", "namespace1", eth3Stat, false, true, eth3Get} + interfaceMap[eth3.name] = eth3 eth4Stat := map[string]uint64{ "interface_up": 1, @@ -562,8 +562,8 @@ func setup() { "link": 1, "speed": 100, } - eth4 := &InterfaceMock{"eth4", "driver1", "namespace2", eth4Stat, false, true, eth4Get} - interfaceMap[eth4.Name] = eth4 + eth4 := &interfaceMock{"eth4", "driver1", "namespace2", eth4Stat, false, true, eth4Get} + interfaceMap[eth4.name] = eth4 // dummy loopback including dummy stat to ensure that the ignore feature is working lo0Stat := map[string]uint64{ @@ -575,11 +575,11 @@ func setup() { "link": 1, "speed": 1000, } - lo0 := &InterfaceMock{"lo0", "", "", lo0Stat, true, true, lo0Get} - interfaceMap[lo0.Name] = lo0 + lo0 := &interfaceMock{"lo0", "", "", lo0Stat, true, true, lo0Get} + interfaceMap[lo0.name] = lo0 - c := &CommandEthtoolMock{interfaceMap} - command = &Ethtool{ + c := &commandEthtoolMock{interfaceMap} + eth = &Ethtool{ InterfaceInclude: []string{}, InterfaceExclude: []string{}, DownInterfaces: "expose", @@ -607,16 +607,16 @@ func toStringMapUint(in map[string]interface{}) map[string]uint64 { func TestGather(t *testing.T) { setup() - err := command.Init() + err := eth.Init() require.NoError(t, err) var acc testutil.Accumulator - err = command.Gather(&acc) + err = eth.Gather(&acc) require.NoError(t, err) require.Len(t, acc.Metrics, 2) - expectedFieldsEth1 := toStringMapInterface(interfaceMap["eth1"].Stat) - for k, v := range interfaceMap["eth1"].CmdGet { + expectedFieldsEth1 := toStringMapInterface(interfaceMap["eth1"].stat) + for k, v := range interfaceMap["eth1"].cmdGet { expectedFieldsEth1[k] = v } expectedFieldsEth1["interface_up_counter"] = expectedFieldsEth1["interface_up"] @@ -629,8 +629,8 @@ func TestGather(t *testing.T) { } acc.AssertContainsTaggedFields(t, pluginName, expectedFieldsEth1, expectedTagsEth1) - expectedFieldsEth2 := toStringMapInterface(interfaceMap["eth2"].Stat) - for k, v := range interfaceMap["eth2"].CmdGet { + expectedFieldsEth2 := toStringMapInterface(interfaceMap["eth2"].stat) + for k, v := range interfaceMap["eth2"].cmdGet { expectedFieldsEth2[k] = v } expectedFieldsEth2["interface_up_counter"] = expectedFieldsEth2["interface_up"] @@ -646,19 +646,19 @@ func TestGather(t *testing.T) { func TestGatherIncludeInterfaces(t *testing.T) { setup() - command.InterfaceInclude = append(command.InterfaceInclude, "eth1") + eth.InterfaceInclude = append(eth.InterfaceInclude, "eth1") - err := command.Init() + err := eth.Init() require.NoError(t, err) var acc testutil.Accumulator - err = command.Gather(&acc) + err = eth.Gather(&acc) require.NoError(t, err) require.Len(t, acc.Metrics, 1) // Should contain eth1 - expectedFieldsEth1 := toStringMapInterface(interfaceMap["eth1"].Stat) - for k, v := range interfaceMap["eth1"].CmdGet { + expectedFieldsEth1 := toStringMapInterface(interfaceMap["eth1"].stat) + for k, v := range interfaceMap["eth1"].cmdGet { expectedFieldsEth1[k] = v } expectedFieldsEth1["interface_up_counter"] = expectedFieldsEth1["interface_up"] @@ -671,8 +671,8 @@ func TestGatherIncludeInterfaces(t *testing.T) { acc.AssertContainsTaggedFields(t, pluginName, expectedFieldsEth1, expectedTagsEth1) // Should not contain eth2 - expectedFieldsEth2 := toStringMapInterface(interfaceMap["eth2"].Stat) - for k, v := range interfaceMap["eth2"].CmdGet { + expectedFieldsEth2 := toStringMapInterface(interfaceMap["eth2"].stat) + for k, v := range interfaceMap["eth2"].cmdGet { expectedFieldsEth2[k] = v } expectedFieldsEth2["interface_up_counter"] = expectedFieldsEth2["interface_up"] @@ -688,19 +688,19 @@ func TestGatherIncludeInterfaces(t *testing.T) { func TestGatherIgnoreInterfaces(t *testing.T) { setup() - command.InterfaceExclude = append(command.InterfaceExclude, "eth1") + eth.InterfaceExclude = append(eth.InterfaceExclude, "eth1") - err := command.Init() + err := eth.Init() require.NoError(t, err) var acc testutil.Accumulator - err = command.Gather(&acc) + err = eth.Gather(&acc) require.NoError(t, err) require.Len(t, acc.Metrics, 1) // Should not contain eth1 - expectedFieldsEth1 := toStringMapInterface(interfaceMap["eth1"].Stat) - for k, v := range interfaceMap["eth1"].CmdGet { + expectedFieldsEth1 := toStringMapInterface(interfaceMap["eth1"].stat) + for k, v := range interfaceMap["eth1"].cmdGet { expectedFieldsEth1[k] = v } expectedFieldsEth1["interface_up_counter"] = expectedFieldsEth1["interface_up"] @@ -713,8 +713,8 @@ func TestGatherIgnoreInterfaces(t *testing.T) { acc.AssertDoesNotContainsTaggedFields(t, pluginName, expectedFieldsEth1, expectedTagsEth1) // Should contain eth2 - expectedFieldsEth2 := toStringMapInterface(interfaceMap["eth2"].Stat) - for k, v := range interfaceMap["eth2"].CmdGet { + expectedFieldsEth2 := toStringMapInterface(interfaceMap["eth2"].stat) + for k, v := range interfaceMap["eth2"].cmdGet { expectedFieldsEth2[k] = v } expectedFieldsEth2["interface_up_counter"] = expectedFieldsEth2["interface_up"] @@ -730,18 +730,18 @@ func TestGatherIgnoreInterfaces(t *testing.T) { func TestSkipMetricsForInterfaceDown(t *testing.T) { setup() - command.DownInterfaces = "skip" + eth.DownInterfaces = "skip" - err := command.Init() + err := eth.Init() require.NoError(t, err) var acc testutil.Accumulator - err = command.Gather(&acc) + err = eth.Gather(&acc) require.NoError(t, err) require.Len(t, acc.Metrics, 1) - expectedFieldsEth1 := toStringMapInterface(interfaceMap["eth1"].Stat) - for k, v := range interfaceMap["eth1"].CmdGet { + expectedFieldsEth1 := toStringMapInterface(interfaceMap["eth1"].stat) + for k, v := range interfaceMap["eth1"].cmdGet { expectedFieldsEth1[k] = v } expectedFieldsEth1["interface_up_counter"] = expectedFieldsEth1["interface_up"] @@ -759,18 +759,18 @@ func TestGatherIncludeNamespaces(t *testing.T) { setup() var acc testutil.Accumulator - command.NamespaceInclude = append(command.NamespaceInclude, "namespace1") + eth.NamespaceInclude = append(eth.NamespaceInclude, "namespace1") - err := command.Init() + err := eth.Init() require.NoError(t, err) - err = command.Gather(&acc) + err = eth.Gather(&acc) require.NoError(t, err) require.Len(t, acc.Metrics, 1) // Should contain eth3 - expectedFieldsEth3 := toStringMapInterface(interfaceMap["eth3"].Stat) - for k, v := range interfaceMap["eth3"].CmdGet { + expectedFieldsEth3 := toStringMapInterface(interfaceMap["eth3"].stat) + for k, v := range interfaceMap["eth3"].cmdGet { expectedFieldsEth3[k] = v } expectedFieldsEth3["interface_up_counter"] = expectedFieldsEth3["interface_up"] @@ -783,8 +783,8 @@ func TestGatherIncludeNamespaces(t *testing.T) { acc.AssertContainsTaggedFields(t, pluginName, expectedFieldsEth3, expectedTagsEth3) // Should not contain eth2 - expectedFieldsEth2 := toStringMapInterface(interfaceMap["eth2"].Stat) - for k, v := range interfaceMap["eth2"].CmdGet { + expectedFieldsEth2 := toStringMapInterface(interfaceMap["eth2"].stat) + for k, v := range interfaceMap["eth2"].cmdGet { expectedFieldsEth2[k] = v } expectedFieldsEth2["interface_up_counter"] = expectedFieldsEth2["interface_up"] @@ -801,18 +801,18 @@ func TestGatherIgnoreNamespaces(t *testing.T) { setup() var acc testutil.Accumulator - command.NamespaceExclude = append(command.NamespaceExclude, "namespace2") + eth.NamespaceExclude = append(eth.NamespaceExclude, "namespace2") - err := command.Init() + err := eth.Init() require.NoError(t, err) - err = command.Gather(&acc) + err = eth.Gather(&acc) require.NoError(t, err) require.Len(t, acc.Metrics, 3) // Should not contain eth4 - expectedFieldsEth4 := toStringMapInterface(interfaceMap["eth4"].Stat) - for k, v := range interfaceMap["eth4"].CmdGet { + expectedFieldsEth4 := toStringMapInterface(interfaceMap["eth4"].stat) + for k, v := range interfaceMap["eth4"].cmdGet { expectedFieldsEth4[k] = v } expectedFieldsEth4["interface_up_counter"] = expectedFieldsEth4["interface_up"] @@ -825,8 +825,8 @@ func TestGatherIgnoreNamespaces(t *testing.T) { acc.AssertDoesNotContainsTaggedFields(t, pluginName, expectedFieldsEth4, expectedTagsEth4) // Should contain eth2 - expectedFieldsEth2 := toStringMapInterface(interfaceMap["eth2"].Stat) - for k, v := range interfaceMap["eth2"].CmdGet { + expectedFieldsEth2 := toStringMapInterface(interfaceMap["eth2"].stat) + for k, v := range interfaceMap["eth2"].cmdGet { expectedFieldsEth2[k] = v } expectedFieldsEth2["interface_up_counter"] = expectedFieldsEth2["interface_up"] @@ -839,8 +839,8 @@ func TestGatherIgnoreNamespaces(t *testing.T) { acc.AssertContainsTaggedFields(t, pluginName, expectedFieldsEth2, expectedTagsEth2) // Should contain eth3 - expectedFieldsEth3 := toStringMapInterface(interfaceMap["eth3"].Stat) - for k, v := range interfaceMap["eth3"].CmdGet { + expectedFieldsEth3 := toStringMapInterface(interfaceMap["eth3"].stat) + for k, v := range interfaceMap["eth3"].cmdGet { expectedFieldsEth3[k] = v } expectedFieldsEth3["interface_up_counter"] = expectedFieldsEth3["interface_up"] @@ -853,14 +853,14 @@ func TestGatherIgnoreNamespaces(t *testing.T) { acc.AssertContainsTaggedFields(t, pluginName, expectedFieldsEth3, expectedTagsEth3) } -type TestCase struct { +type testCase struct { normalization []string stats map[string]interface{} expectedFields map[string]interface{} } func TestNormalizedKeys(t *testing.T) { - cases := []TestCase{ + cases := []testCase{ { normalization: []string{"underscore"}, stats: map[string]interface{}{ @@ -960,29 +960,29 @@ func TestNormalizedKeys(t *testing.T) { } for _, c := range cases { - eth0 := &InterfaceMock{"eth0", "e1000e", "", toStringMapUint(c.stats), false, true, map[string]uint64{}} + eth0 := &interfaceMock{"eth0", "e1000e", "", toStringMapUint(c.stats), false, true, map[string]uint64{}} expectedTags := map[string]string{ - "interface": eth0.Name, - "driver": eth0.DriverName, + "interface": eth0.name, + "driver": eth0.driverName, "namespace": "", } - interfaceMap = make(map[string]*InterfaceMock) - interfaceMap[eth0.Name] = eth0 + interfaceMap = make(map[string]*interfaceMock) + interfaceMap[eth0.name] = eth0 - cmd := &CommandEthtoolMock{interfaceMap} - command = &Ethtool{ + cmd := &commandEthtoolMock{interfaceMap} + eth = &Ethtool{ InterfaceInclude: []string{}, InterfaceExclude: []string{}, NormalizeKeys: c.normalization, command: cmd, } - err := command.Init() + err := eth.Init() require.NoError(t, err) var acc testutil.Accumulator - err = command.Gather(&acc) + err = eth.Gather(&acc) require.NoError(t, err) require.Len(t, acc.Metrics, 1) diff --git a/plugins/inputs/ethtool/namespace.go b/plugins/inputs/ethtool/namespace.go index c270ff00ce97e..bb2f8f736a627 100644 --- a/plugins/inputs/ethtool/namespace.go +++ b/plugins/inputs/ethtool/namespace.go @@ -2,15 +2,15 @@ package ethtool import "net" -type Namespace interface { - Name() string - Interfaces() ([]NamespacedInterface, error) - DriverName(intf NamespacedInterface) (string, error) - Stats(intf NamespacedInterface) (map[string]uint64, error) - Get(intf NamespacedInterface) (map[string]uint64, error) +type namespace interface { + name() string + interfaces() ([]namespacedInterface, error) + driverName(intf namespacedInterface) (string, error) + stats(intf namespacedInterface) (map[string]uint64, error) + get(intf namespacedInterface) (map[string]uint64, error) } -type NamespacedInterface struct { +type namespacedInterface struct { net.Interface - Namespace Namespace + namespace namespace } diff --git a/plugins/inputs/ethtool/namespace_linux.go b/plugins/inputs/ethtool/namespace_linux.go index a83ba9b62939f..88d109650400a 100644 --- a/plugins/inputs/ethtool/namespace_linux.go +++ b/plugins/inputs/ethtool/namespace_linux.go @@ -11,66 +11,66 @@ import ( "github.com/influxdata/telegraf" ) -type NamespacedAction struct { - result chan<- NamespacedResult - f func(*NamespaceGoroutine) (interface{}, error) +type namespacedAction struct { + result chan<- namespacedResult + f func(*namespaceGoroutine) (interface{}, error) } -type NamespacedResult struct { - Result interface{} - Error error +type namespacedResult struct { + result interface{} + err error } -type NamespaceGoroutine struct { - name string +type namespaceGoroutine struct { + namespaceName string handle netns.NsHandle ethtoolClient *ethtool.Ethtool - c chan NamespacedAction - Log telegraf.Logger + c chan namespacedAction + log telegraf.Logger } -func (n *NamespaceGoroutine) Name() string { - return n.name +func (n *namespaceGoroutine) name() string { + return n.namespaceName } -func (n *NamespaceGoroutine) Interfaces() ([]NamespacedInterface, error) { - interfaces, err := n.Do(func(n *NamespaceGoroutine) (interface{}, error) { +func (n *namespaceGoroutine) interfaces() ([]namespacedInterface, error) { + interfaces, err := n.do(func(n *namespaceGoroutine) (interface{}, error) { interfaces, err := net.Interfaces() if err != nil { return nil, err } - namespacedInterfaces := make([]NamespacedInterface, 0, len(interfaces)) + namespacedInterfaces := make([]namespacedInterface, 0, len(interfaces)) for _, iface := range interfaces { namespacedInterfaces = append( namespacedInterfaces, - NamespacedInterface{ + namespacedInterface{ Interface: iface, - Namespace: n, + namespace: n, }, ) } return namespacedInterfaces, nil }) - return interfaces.([]NamespacedInterface), err + return interfaces.([]namespacedInterface), err } -func (n *NamespaceGoroutine) DriverName(intf NamespacedInterface) (string, error) { - driver, err := n.Do(func(n *NamespaceGoroutine) (interface{}, error) { +func (n *namespaceGoroutine) driverName(intf namespacedInterface) (string, error) { + driver, err := n.do(func(n *namespaceGoroutine) (interface{}, error) { return n.ethtoolClient.DriverName(intf.Name) }) return driver.(string), err } -func (n *NamespaceGoroutine) Stats(intf NamespacedInterface) (map[string]uint64, error) { - driver, err := n.Do(func(n *NamespaceGoroutine) (interface{}, error) { +func (n *namespaceGoroutine) stats(intf namespacedInterface) (map[string]uint64, error) { + driver, err := n.do(func(n *namespaceGoroutine) (interface{}, error) { return n.ethtoolClient.Stats(intf.Name) }) return driver.(map[string]uint64), err } -func (n *NamespaceGoroutine) Get(intf NamespacedInterface) (map[string]uint64, error) { - result, err := n.Do(func(n *NamespaceGoroutine) (interface{}, error) { +func (n *namespaceGoroutine) get(intf namespacedInterface) (map[string]uint64, error) { + result, err := n.do(func(n *namespaceGoroutine) (interface{}, error) { ecmd := ethtool.EthtoolCmd{} speed32, err := n.ethtoolClient.CmdGet(&ecmd, intf.Name) if err != nil { @@ -102,10 +102,10 @@ func (n *NamespaceGoroutine) Get(intf NamespacedInterface) (map[string]uint64, e return nil, err } -// Start locks a goroutine to an OS thread and ties it to the namespace, then +// start locks a goroutine to an OS thread and ties it to the namespace, then // loops for actions to run in the namespace. -func (n *NamespaceGoroutine) Start() error { - n.c = make(chan NamespacedAction) +func (n *namespaceGoroutine) start() error { + n.c = make(chan namespacedAction) started := make(chan error) go func() { // We're going to hold this thread locked permanently. We're going to @@ -121,13 +121,13 @@ func (n *NamespaceGoroutine) Start() error { // current one. initialNamespace, err := netns.Get() if err != nil { - n.Log.Errorf("Could not get initial namespace: %s", err) + n.log.Errorf("Could not get initial namespace: %s", err) started <- err return } if !initialNamespace.Equal(n.handle) { if err := netns.Set(n.handle); err != nil { - n.Log.Errorf("Could not switch to namespace %q: %s", n.name, err.Error()) + n.log.Errorf("Could not switch to namespace %q: %s", n.namespaceName, err.Error()) started <- err return } @@ -136,7 +136,7 @@ func (n *NamespaceGoroutine) Start() error { // Every namespace needs its own connection to ethtool e, err := ethtool.NewEthtool() if err != nil { - n.Log.Errorf("Could not create ethtool client for namespace %q: %s", n.name, err.Error()) + n.log.Errorf("Could not create ethtool client for namespace %q: %s", n.namespaceName, err.Error()) started <- err return } @@ -144,9 +144,9 @@ func (n *NamespaceGoroutine) Start() error { started <- nil for command := range n.c { result, err := command.f(n) - command.result <- NamespacedResult{ - Result: result, - Error: err, + command.result <- namespacedResult{ + result: result, + err: err, } close(command.result) } @@ -154,13 +154,13 @@ func (n *NamespaceGoroutine) Start() error { return <-started } -// Do runs a function inside the OS thread tied to the namespace. -func (n *NamespaceGoroutine) Do(f func(*NamespaceGoroutine) (interface{}, error)) (interface{}, error) { - result := make(chan NamespacedResult) - n.c <- NamespacedAction{ +// do runs a function inside the OS thread tied to the namespace. +func (n *namespaceGoroutine) do(f func(*namespaceGoroutine) (interface{}, error)) (interface{}, error) { + result := make(chan namespacedResult) + n.c <- namespacedAction{ result: result, f: f, } r := <-result - return r.Result, r.Error + return r.result, r.err } diff --git a/plugins/inputs/eventhub_consumer/eventhub_consumer.go b/plugins/inputs/eventhub_consumer/eventhub_consumer.go index 48d28c3b9aa16..fff52b7e54a0a 100644 --- a/plugins/inputs/eventhub_consumer/eventhub_consumer.go +++ b/plugins/inputs/eventhub_consumer/eventhub_consumer.go @@ -26,10 +26,6 @@ const ( defaultMaxUndeliveredMessages = 1000 ) -type empty struct{} -type semaphore chan empty - -// EventHub is the top level struct for this plugin type EventHub struct { // Configuration ConnectionString string `toml:"connection_string"` @@ -70,21 +66,15 @@ type EventHub struct { in chan []telegraf.Metric } +type ( + empty struct{} + semaphore chan empty +) + func (*EventHub) SampleConfig() string { return sampleConfig } -// SetParser sets the parser -func (e *EventHub) SetParser(parser telegraf.Parser) { - e.parser = parser -} - -// Gather function is unused -func (*EventHub) Gather(telegraf.Accumulator) error { - return nil -} - -// Init the EventHub ServiceInput func (e *EventHub) Init() (err error) { if e.MaxUndeliveredMessages == 0 { e.MaxUndeliveredMessages = defaultMaxUndeliveredMessages @@ -118,7 +108,10 @@ func (e *EventHub) Init() (err error) { return err } -// Start the EventHub ServiceInput +func (e *EventHub) SetParser(parser telegraf.Parser) { + e.parser = parser +} + func (e *EventHub) Start(acc telegraf.Accumulator) error { e.in = make(chan []telegraf.Metric) @@ -155,6 +148,19 @@ func (e *EventHub) Start(acc telegraf.Accumulator) error { return nil } +func (*EventHub) Gather(telegraf.Accumulator) error { + return nil +} + +func (e *EventHub) Stop() { + err := e.hub.Close(context.Background()) + if err != nil { + e.Log.Errorf("Error closing Event Hub connection: %v", err) + } + e.cancel() + e.wg.Wait() +} + func (e *EventHub) configureReceiver() []eventhub.ReceiveOption { receiveOpts := []eventhub.ReceiveOption{} @@ -333,16 +339,6 @@ func (e *EventHub) createMetrics(event *eventhub.Event) ([]telegraf.Metric, erro return metrics, nil } -// Stop the EventHub ServiceInput -func (e *EventHub) Stop() { - err := e.hub.Close(context.Background()) - if err != nil { - e.Log.Errorf("Error closing Event Hub connection: %v", err) - } - e.cancel() - e.wg.Wait() -} - func init() { inputs.Add("eventhub_consumer", func() telegraf.Input { return &EventHub{} diff --git a/plugins/inputs/exec/exec.go b/plugins/inputs/exec/exec.go index 0e0f8f02bff33..efe8c29687a29 100644 --- a/plugins/inputs/exec/exec.go +++ b/plugins/inputs/exec/exec.go @@ -26,9 +26,7 @@ var sampleConfig string var once sync.Once -const MaxStderrBytes int = 512 - -type exitcodeHandlerFunc func([]telegraf.Metric, error, []byte) []telegraf.Metric +const maxStderrBytes int = 512 type Exec struct { Commands []string `toml:"commands"` @@ -40,100 +38,29 @@ type Exec struct { parser telegraf.Parser - runner Runner + runner runner - // Allow post processing of command exit codes - exitcodeHandler exitcodeHandlerFunc + // Allow post-processing of command exit codes + exitCodeHandler exitCodeHandlerFunc parseDespiteError bool } -func NewExec() *Exec { - return &Exec{ - runner: CommandRunner{}, - Timeout: config.Duration(time.Second * 5), - } -} +type exitCodeHandlerFunc func([]telegraf.Metric, error, []byte) []telegraf.Metric -type Runner interface { - Run(string, []string, time.Duration) ([]byte, []byte, error) +type runner interface { + run(string, []string, time.Duration) ([]byte, []byte, error) } -type CommandRunner struct { +type commandRunner struct { debug bool } -func (c CommandRunner) truncate(buf bytes.Buffer) bytes.Buffer { - // Limit the number of bytes. - didTruncate := false - if buf.Len() > MaxStderrBytes { - buf.Truncate(MaxStderrBytes) - didTruncate = true - } - if i := bytes.IndexByte(buf.Bytes(), '\n'); i > 0 { - // Only show truncation if the newline wasn't the last character. - if i < buf.Len()-1 { - didTruncate = true - } - buf.Truncate(i) - } - if didTruncate { - buf.WriteString("...") - } - return buf -} - -// removeWindowsCarriageReturns removes all carriage returns from the input if the -// OS is Windows. It does not return any errors. -func removeWindowsCarriageReturns(b bytes.Buffer) bytes.Buffer { - if runtime.GOOS == "windows" { - var buf bytes.Buffer - for { - byt, err := b.ReadBytes(0x0D) - byt = bytes.TrimRight(byt, "\x0d") - if len(byt) > 0 { - buf.Write(byt) - } - if errors.Is(err, io.EOF) { - return buf - } - } - } - return b -} - func (*Exec) SampleConfig() string { return sampleConfig } -func (e *Exec) ProcessCommand(command string, acc telegraf.Accumulator, wg *sync.WaitGroup) { - defer wg.Done() - - out, errBuf, runErr := e.runner.Run(command, e.Environment, time.Duration(e.Timeout)) - if !e.IgnoreError && !e.parseDespiteError && runErr != nil { - err := fmt.Errorf("exec: %w for command %q: %s", runErr, command, string(errBuf)) - acc.AddError(err) - return - } - - metrics, err := e.parser.Parse(out) - if err != nil { - acc.AddError(err) - return - } - - if len(metrics) == 0 { - once.Do(func() { - e.Log.Debug(internal.NoMetricsCreatedMsg) - }) - } - - if e.exitcodeHandler != nil { - metrics = e.exitcodeHandler(metrics, runErr, errBuf) - } - - for _, m := range metrics { - acc.AddMetric(m) - } +func (e *Exec) Init() error { + return nil } func (e *Exec) SetParser(parser telegraf.Parser) { @@ -141,7 +68,7 @@ func (e *Exec) SetParser(parser telegraf.Parser) { unwrapped, ok := parser.(*models.RunningParser) if ok { if _, ok := unwrapped.Parser.(*nagios.Parser); ok { - e.exitcodeHandler = nagiosHandler + e.exitCodeHandler = nagiosHandler e.parseDespiteError = true } } @@ -188,22 +115,95 @@ func (e *Exec) Gather(acc telegraf.Accumulator) error { wg.Add(len(commands)) for _, command := range commands { - go e.ProcessCommand(command, acc, &wg) + go e.processCommand(command, acc, &wg) } wg.Wait() return nil } -func (e *Exec) Init() error { - return nil +func (c commandRunner) truncate(buf bytes.Buffer) bytes.Buffer { + // Limit the number of bytes. + didTruncate := false + if buf.Len() > maxStderrBytes { + buf.Truncate(maxStderrBytes) + didTruncate = true + } + if i := bytes.IndexByte(buf.Bytes(), '\n'); i > 0 { + // Only show truncation if the newline wasn't the last character. + if i < buf.Len()-1 { + didTruncate = true + } + buf.Truncate(i) + } + if didTruncate { + buf.WriteString("...") + } + return buf +} + +// removeWindowsCarriageReturns removes all carriage returns from the input if the +// OS is Windows. It does not return any errors. +func removeWindowsCarriageReturns(b bytes.Buffer) bytes.Buffer { + if runtime.GOOS == "windows" { + var buf bytes.Buffer + for { + byt, err := b.ReadBytes(0x0D) + byt = bytes.TrimRight(byt, "\x0d") + if len(byt) > 0 { + buf.Write(byt) + } + if errors.Is(err, io.EOF) { + return buf + } + } + } + return b +} + +func (e *Exec) processCommand(command string, acc telegraf.Accumulator, wg *sync.WaitGroup) { + defer wg.Done() + + out, errBuf, runErr := e.runner.run(command, e.Environment, time.Duration(e.Timeout)) + if !e.IgnoreError && !e.parseDespiteError && runErr != nil { + err := fmt.Errorf("exec: %w for command %q: %s", runErr, command, string(errBuf)) + acc.AddError(err) + return + } + + metrics, err := e.parser.Parse(out) + if err != nil { + acc.AddError(err) + return + } + + if len(metrics) == 0 { + once.Do(func() { + e.Log.Debug(internal.NoMetricsCreatedMsg) + }) + } + + if e.exitCodeHandler != nil { + metrics = e.exitCodeHandler(metrics, runErr, errBuf) + } + + for _, m := range metrics { + acc.AddMetric(m) + } } func nagiosHandler(metrics []telegraf.Metric, err error, msg []byte) []telegraf.Metric { return nagios.AddState(err, msg, metrics) } +func newExec() *Exec { + return &Exec{ + runner: commandRunner{}, + Timeout: config.Duration(time.Second * 5), + } +} + func init() { inputs.Add("exec", func() telegraf.Input { - return NewExec() + return newExec() }) } diff --git a/plugins/inputs/exec/exec_test.go b/plugins/inputs/exec/exec_test.go index 9ef1f705c88cd..def86cb1c9897 100644 --- a/plugins/inputs/exec/exec_test.go +++ b/plugins/inputs/exec/exec_test.go @@ -44,12 +44,12 @@ const malformedJSON = ` "status": "green", ` -type CarriageReturnTest struct { +type carriageReturnTest struct { input []byte output []byte } -var crTests = []CarriageReturnTest{ +var crTests = []carriageReturnTest{ {[]byte{0x4c, 0x69, 0x6e, 0x65, 0x20, 0x31, 0x0d, 0x0a, 0x4c, 0x69, 0x6e, 0x65, 0x20, 0x32, 0x0d, 0x0a, 0x4c, 0x69, 0x6e, 0x65, 0x20, 0x33}, @@ -73,7 +73,7 @@ type runnerMock struct { err error } -func newRunnerMock(out, errout []byte, err error) Runner { +func newRunnerMock(out, errout []byte, err error) runner { return &runnerMock{ out: out, errout: errout, @@ -81,7 +81,7 @@ func newRunnerMock(out, errout []byte, err error) Runner { } } -func (r runnerMock) Run(_ string, _ []string, _ time.Duration) ([]byte, []byte, error) { +func (r runnerMock) run(_ string, _ []string, _ time.Duration) ([]byte, []byte, error) { return r.out, r.errout, r.err } @@ -178,7 +178,7 @@ func TestExecCommandWithGlob(t *testing.T) { } require.NoError(t, parser.Init()) - e := NewExec() + e := newExec() e.Commands = []string{"/bin/ech* metric_value"} e.SetParser(&parser) @@ -198,7 +198,7 @@ func TestExecCommandWithoutGlob(t *testing.T) { } require.NoError(t, parser.Init()) - e := NewExec() + e := newExec() e.Commands = []string{"/bin/echo metric_value"} e.SetParser(&parser) @@ -217,7 +217,7 @@ func TestExecCommandWithoutGlobAndPath(t *testing.T) { DataType: "string", } require.NoError(t, parser.Init()) - e := NewExec() + e := newExec() e.Commands = []string{"echo metric_value"} e.SetParser(&parser) @@ -236,7 +236,7 @@ func TestExecCommandWithEnv(t *testing.T) { DataType: "string", } require.NoError(t, parser.Init()) - e := NewExec() + e := newExec() e.Commands = []string{"/bin/sh -c 'echo ${METRIC_NAME}'"} e.Environment = []string{"METRIC_NAME=metric_value"} e.SetParser(&parser) @@ -283,17 +283,17 @@ func TestTruncate(t *testing.T) { }, }, { - name: "should truncate to the MaxStderrBytes", + name: "should truncate to the maxStderrBytes", bufF: func() *bytes.Buffer { var b bytes.Buffer - for i := 0; i < 2*MaxStderrBytes; i++ { + for i := 0; i < 2*maxStderrBytes; i++ { b.WriteByte('b') } return &b }, expF: func() *bytes.Buffer { var b bytes.Buffer - for i := 0; i < MaxStderrBytes; i++ { + for i := 0; i < maxStderrBytes; i++ { b.WriteByte('b') } b.WriteString("...") @@ -302,7 +302,7 @@ func TestTruncate(t *testing.T) { }, } - c := CommandRunner{} + c := commandRunner{} for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { res := c.truncate(*tt.bufF()) @@ -340,7 +340,7 @@ func TestCSVBehavior(t *testing.T) { require.NoError(t, parser.Init()) // Setup the plugin - plugin := NewExec() + plugin := newExec() plugin.Commands = []string{"echo \"a,b\n1,2\n3,4\""} plugin.Log = testutil.Logger{} plugin.SetParser(parser) @@ -408,7 +408,7 @@ func TestCSVBehavior(t *testing.T) { func TestCases(t *testing.T) { // Register the plugin inputs.Add("exec", func() telegraf.Input { - return NewExec() + return newExec() }) // Setup the plugin diff --git a/plugins/inputs/exec/run_notwinodws.go b/plugins/inputs/exec/run_notwinodws.go index adad0925bcb9a..0fdaf2e73eb37 100644 --- a/plugins/inputs/exec/run_notwinodws.go +++ b/plugins/inputs/exec/run_notwinodws.go @@ -15,7 +15,7 @@ import ( "github.com/influxdata/telegraf/internal" ) -func (c CommandRunner) Run( +func (c commandRunner) run( command string, environments []string, timeout time.Duration, diff --git a/plugins/inputs/exec/run_windows.go b/plugins/inputs/exec/run_windows.go index e88d268b46633..fad0160b3119a 100644 --- a/plugins/inputs/exec/run_windows.go +++ b/plugins/inputs/exec/run_windows.go @@ -15,7 +15,7 @@ import ( "github.com/influxdata/telegraf/internal" ) -func (c CommandRunner) Run( +func (c commandRunner) run( command string, environments []string, timeout time.Duration, diff --git a/plugins/inputs/execd/execd.go b/plugins/inputs/execd/execd.go index 774c063a91af2..91eef20839c66 100644 --- a/plugins/inputs/execd/execd.go +++ b/plugins/inputs/execd/execd.go @@ -45,6 +45,13 @@ func (*Execd) SampleConfig() string { return sampleConfig } +func (e *Execd) Init() error { + if len(e.Command) == 0 { + return errors.New("no command specified") + } + return nil +} + func (e *Execd) SetParser(parser telegraf.Parser) { e.parser = parser e.outputReader = e.cmdReadOut @@ -168,13 +175,6 @@ func (e *Execd) cmdReadErr(out io.Reader) { } } -func (e *Execd) Init() error { - if len(e.Command) == 0 { - return errors.New("no command specified") - } - return nil -} - func init() { inputs.Add("execd", func() telegraf.Input { return &Execd{ diff --git a/plugins/inputs/execd/shim/goshim.go b/plugins/inputs/execd/shim/goshim.go index 3ec28edca3635..7a06a342f2ae2 100644 --- a/plugins/inputs/execd/shim/goshim.go +++ b/plugins/inputs/execd/shim/goshim.go @@ -23,13 +23,13 @@ import ( "github.com/influxdata/telegraf/plugins/serializers/influx" ) -type empty struct{} - var ( envVarEscaper = strings.NewReplacer( `"`, `\"`, `\`, `\\`, ) + oldpkg = "github.com/influxdata/telegraf/plugins/inputs/execd/shim" + newpkg = "github.com/influxdata/telegraf/plugins/common/shim" ) const ( @@ -50,10 +50,7 @@ type Shim struct { stderr io.Writer } -var ( - oldpkg = "github.com/influxdata/telegraf/plugins/inputs/execd/shim" - newpkg = "github.com/influxdata/telegraf/plugins/common/shim" -) +type empty struct{} // New creates a new shim interface func New() *Shim { @@ -166,6 +163,50 @@ loop: return nil } +// LoadConfig loads and adds the inputs to the shim +func (s *Shim) LoadConfig(filePath *string) error { + loadedInputs, err := LoadConfig(filePath) + if err != nil { + return err + } + return s.AddInputs(loadedInputs) +} + +// DefaultImportedPlugins defaults to whatever plugins happen to be loaded and +// have registered themselves with the registry. This makes loading plugins +// without having to define a config dead easy. +func DefaultImportedPlugins() (i []telegraf.Input, e error) { + for _, inputCreatorFunc := range inputs.Inputs { + i = append(i, inputCreatorFunc()) + } + return i, nil +} + +// LoadConfig loads the config and returns inputs that later need to be loaded. +func LoadConfig(filePath *string) ([]telegraf.Input, error) { + if filePath == nil || *filePath == "" { + return DefaultImportedPlugins() + } + + b, err := os.ReadFile(*filePath) + if err != nil { + return nil, err + } + + s := expandEnvVars(b) + + conf := struct { + Inputs map[string][]toml.Primitive + }{} + + md, err := toml.Decode(s, &conf) + if err != nil { + return nil, err + } + + return loadConfigIntoInputs(md, conf.Inputs) +} + func hasQuit(ctx context.Context) bool { select { case <-ctx.Done(): @@ -252,50 +293,6 @@ func (s *Shim) startGathering(ctx context.Context, input telegraf.Input, acc tel } } -// LoadConfig loads and adds the inputs to the shim -func (s *Shim) LoadConfig(filePath *string) error { - loadedInputs, err := LoadConfig(filePath) - if err != nil { - return err - } - return s.AddInputs(loadedInputs) -} - -// DefaultImportedPlugins defaults to whatever plugins happen to be loaded and -// have registered themselves with the registry. This makes loading plugins -// without having to define a config dead easy. -func DefaultImportedPlugins() (i []telegraf.Input, e error) { - for _, inputCreatorFunc := range inputs.Inputs { - i = append(i, inputCreatorFunc()) - } - return i, nil -} - -// LoadConfig loads the config and returns inputs that later need to be loaded. -func LoadConfig(filePath *string) ([]telegraf.Input, error) { - if filePath == nil || *filePath == "" { - return DefaultImportedPlugins() - } - - b, err := os.ReadFile(*filePath) - if err != nil { - return nil, err - } - - s := expandEnvVars(b) - - conf := struct { - Inputs map[string][]toml.Primitive - }{} - - md, err := toml.Decode(s, &conf) - if err != nil { - return nil, err - } - - return loadConfigIntoInputs(md, conf.Inputs) -} - func expandEnvVars(contents []byte) string { return os.Expand(string(contents), getEnv) } diff --git a/plugins/inputs/execd/shim/input.go b/plugins/inputs/execd/shim/input.go index 6dff9cd7f1002..cf100256fe0b3 100644 --- a/plugins/inputs/execd/shim/input.go +++ b/plugins/inputs/execd/shim/input.go @@ -7,14 +7,17 @@ type inputShim struct { Input telegraf.Input } +// LogName satisfies the MetricMaker interface func (i inputShim) LogName() string { return "" } +// MakeMetric satisfies the MetricMaker interface func (i inputShim) MakeMetric(m telegraf.Metric) telegraf.Metric { return m // don't need to do anything to it. } +// Log satisfies the MetricMaker interface func (i inputShim) Log() telegraf.Logger { return nil }