Skip to content

Commit

Permalink
Refactor state_* metricsets to share response from endpoint (elastic#…
Browse files Browse the repository at this point in the history
  • Loading branch information
ChrsMark authored May 18, 2021
1 parent a183d65 commit 96481f1
Show file tree
Hide file tree
Showing 16 changed files with 327 additions and 25 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -989,6 +989,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Apache: convert status.total_kbytes to status.total_bytes in fleet mode. {pull}23022[23022]
- Release MSSQL as GA {pull}23146[23146]
- Add support for SASL/SCRAM authentication to the Kafka module. {pull}24810[24810]
- Refactor state_* metricsets to share response from endpoint. {pull}25640[25640]
- Add server id to zookeeper events. {pull}25550[25550]
- Add additional network metrics to docker/network {pull}25354[25354]

Expand Down
16 changes: 11 additions & 5 deletions metricbeat/helper/prometheus/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ type Prometheus interface {

GetProcessedMetrics(mapping *MetricsMapping) ([]common.MapStr, error)

ProcessMetrics(families []*dto.MetricFamily, mapping *MetricsMapping) ([]common.MapStr, error)

ReportProcessedMetrics(mapping *MetricsMapping, r mb.ReporterV2) error
}

Expand Down Expand Up @@ -139,11 +141,7 @@ type MetricsMapping struct {
ExtraFields map[string]string
}

func (p *prometheus) GetProcessedMetrics(mapping *MetricsMapping) ([]common.MapStr, error) {
families, err := p.GetFamilies()
if err != nil {
return nil, err
}
func (p *prometheus) ProcessMetrics(families []*dto.MetricFamily, mapping *MetricsMapping) ([]common.MapStr, error) {

eventsMap := map[string]common.MapStr{}
infoMetrics := []*infoMetricData{}
Expand Down Expand Up @@ -260,6 +258,14 @@ func (p *prometheus) GetProcessedMetrics(mapping *MetricsMapping) ([]common.MapS
return events, nil
}

func (p *prometheus) GetProcessedMetrics(mapping *MetricsMapping) ([]common.MapStr, error) {
families, err := p.GetFamilies()
if err != nil {
return nil, err
}
return p.ProcessMetrics(families, mapping)
}

// infoMetricData keeps data about an infoMetric
type infoMetricData struct {
Labels common.MapStr
Expand Down
113 changes: 113 additions & 0 deletions metricbeat/module/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package kubernetes

import (
"sync"
"time"

"github.com/mitchellh/hashstructure"
"github.com/pkg/errors"
dto "github.com/prometheus/client_model/go"

p "github.com/elastic/beats/v7/metricbeat/helper/prometheus"
"github.com/elastic/beats/v7/metricbeat/mb"
)

func init() {
// Register the ModuleFactory function for the "kubernetes" module.
if err := mb.Registry.AddModule("kubernetes", ModuleBuilder()); err != nil {
panic(err)
}
}

type Module interface {
mb.Module
GetSharedFamilies(prometheus p.Prometheus) ([]*dto.MetricFamily, error)
}

type familiesCache struct {
sharedFamilies []*dto.MetricFamily
lastFetchErr error
lastFetchTimestamp time.Time
}

type kubeStateMetricsCache struct {
cacheMap map[uint64]*familiesCache
lock sync.Mutex
}

func (c *kubeStateMetricsCache) getCacheMapEntry(hash uint64) *familiesCache {
c.lock.Lock()
defer c.lock.Unlock()
if _, ok := c.cacheMap[hash]; !ok {
c.cacheMap[hash] = &familiesCache{}
}
return c.cacheMap[hash]
}

type module struct {
mb.BaseModule

kubeStateMetricsCache *kubeStateMetricsCache
familiesCache *familiesCache
}

func ModuleBuilder() func(base mb.BaseModule) (mb.Module, error) {
kubeStateMetricsCache := &kubeStateMetricsCache{
cacheMap: make(map[uint64]*familiesCache),
}
return func(base mb.BaseModule) (mb.Module, error) {
hash, err := generateCacheHash(base.Config().Hosts)
if err != nil {
return nil, errors.Wrap(err, "error generating cache hash for kubeStateMetricsCache")
}
// NOTE: These entries will be never removed, this can be a leak if
// metricbeat is used to monitor clusters dynamically created.
// (https://github.com/elastic/beats/pull/25640#discussion_r633395213)
familiesCache := kubeStateMetricsCache.getCacheMapEntry(hash)
m := module{
BaseModule: base,
kubeStateMetricsCache: kubeStateMetricsCache,
familiesCache: familiesCache,
}
return &m, nil
}
}

func (m *module) GetSharedFamilies(prometheus p.Prometheus) ([]*dto.MetricFamily, error) {
m.kubeStateMetricsCache.lock.Lock()
defer m.kubeStateMetricsCache.lock.Unlock()

now := time.Now()

if m.familiesCache.lastFetchTimestamp.IsZero() || now.Sub(m.familiesCache.lastFetchTimestamp) > m.Config().Period {
m.familiesCache.sharedFamilies, m.familiesCache.lastFetchErr = prometheus.GetFamilies()
m.familiesCache.lastFetchTimestamp = now
}

return m.familiesCache.sharedFamilies, m.familiesCache.lastFetchErr
}

func generateCacheHash(host []string) (uint64, error) {
id, err := hashstructure.Hash(host, nil)
if err != nil {
return 0, err
}
return id, nil
}
14 changes: 13 additions & 1 deletion metricbeat/module/kubernetes/state_container/state_container.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package state_container

import (
"fmt"
"strings"

"github.com/pkg/errors"
Expand All @@ -26,6 +27,7 @@ import (
p "github.com/elastic/beats/v7/metricbeat/helper/prometheus"
"github.com/elastic/beats/v7/metricbeat/mb"
"github.com/elastic/beats/v7/metricbeat/mb/parse"
k8smod "github.com/elastic/beats/v7/metricbeat/module/kubernetes"
"github.com/elastic/beats/v7/metricbeat/module/kubernetes/util"
)

Expand Down Expand Up @@ -89,6 +91,7 @@ type MetricSet struct {
mb.BaseMetricSet
prometheus p.Prometheus
enricher util.Enricher
mod k8smod.Module
}

// New create a new instance of the MetricSet
Expand All @@ -99,10 +102,15 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
if err != nil {
return nil, err
}
mod, ok := base.Module().(k8smod.Module)
if !ok {
return nil, fmt.Errorf("must be child of kubernetes module")
}
return &MetricSet{
BaseMetricSet: base,
prometheus: prometheus,
enricher: util.NewContainerMetadataEnricher(base, false),
mod: mod,
}, nil
}

Expand All @@ -112,7 +120,11 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
func (m *MetricSet) Fetch(reporter mb.ReporterV2) error {
m.enricher.Start()

events, err := m.prometheus.GetProcessedMetrics(mapping)
families, err := m.mod.GetSharedFamilies(m.prometheus)
if err != nil {
return errors.Wrap(err, "error getting families")
}
events, err := m.prometheus.ProcessMetrics(families, mapping)
if err != nil {
return errors.Wrap(err, "error getting event")
}
Expand Down
16 changes: 15 additions & 1 deletion metricbeat/module/kubernetes/state_cronjob/state_cronjob.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,14 @@
package state_cronjob

import (
"fmt"

"github.com/pkg/errors"

"github.com/elastic/beats/v7/libbeat/common"
p "github.com/elastic/beats/v7/metricbeat/helper/prometheus"
"github.com/elastic/beats/v7/metricbeat/mb"
k8smod "github.com/elastic/beats/v7/metricbeat/module/kubernetes"
)

func init() {
Expand All @@ -40,6 +43,7 @@ type CronJobMetricSet struct {
mb.BaseMetricSet
prometheus p.Prometheus
mapping *p.MetricsMapping
mod k8smod.Module
}

// NewCronJobMetricSet returns a prometheus based metricset for CronJobs
Expand All @@ -49,9 +53,15 @@ func NewCronJobMetricSet(base mb.BaseMetricSet) (mb.MetricSet, error) {
return nil, err
}

mod, ok := base.Module().(k8smod.Module)
if !ok {
return nil, fmt.Errorf("must be child of kubernetes module")
}

return &CronJobMetricSet{
BaseMetricSet: base,
prometheus: prometheus,
mod: mod,
mapping: &p.MetricsMapping{
Metrics: map[string]p.MetricMap{
"kube_cronjob_info": p.InfoMetric(),
Expand All @@ -77,7 +87,11 @@ func NewCronJobMetricSet(base mb.BaseMetricSet) (mb.MetricSet, error) {
//
// Copied from other kube state metrics.
func (m *CronJobMetricSet) Fetch(reporter mb.ReporterV2) error {
events, err := m.prometheus.GetProcessedMetrics(m.mapping)
families, err := m.mod.GetSharedFamilies(m.prometheus)
if err != nil {
return errors.Wrap(err, "error getting family metrics")
}
events, err := m.prometheus.ProcessMetrics(families, m.mapping)
if err != nil {
return errors.Wrap(err, "error getting metrics")
}
Expand Down
17 changes: 16 additions & 1 deletion metricbeat/module/kubernetes/state_daemonset/state_daemonset.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,14 @@
package state_daemonset

import (
"fmt"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/kubernetes"
p "github.com/elastic/beats/v7/metricbeat/helper/prometheus"
"github.com/elastic/beats/v7/metricbeat/mb"
"github.com/elastic/beats/v7/metricbeat/mb/parse"
k8smod "github.com/elastic/beats/v7/metricbeat/module/kubernetes"
"github.com/elastic/beats/v7/metricbeat/module/kubernetes/util"
)

Expand Down Expand Up @@ -69,6 +72,7 @@ type MetricSet struct {
mb.BaseMetricSet
prometheus p.Prometheus
enricher util.Enricher
mod k8smod.Module
}

// New create a new instance of the MetricSet
Expand All @@ -79,10 +83,15 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
if err != nil {
return nil, err
}
mod, ok := base.Module().(k8smod.Module)
if !ok {
return nil, fmt.Errorf("must be child of kubernetes module")
}
return &MetricSet{
BaseMetricSet: base,
prometheus: prometheus,
enricher: util.NewResourceMetadataEnricher(base, &kubernetes.ReplicaSet{}, false),
mod: mod,
}, nil
}

Expand All @@ -92,7 +101,13 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
func (m *MetricSet) Fetch(reporter mb.ReporterV2) {
m.enricher.Start()

events, err := m.prometheus.GetProcessedMetrics(mapping)
families, err := m.mod.GetSharedFamilies(m.prometheus)
if err != nil {
m.Logger().Error(err)
reporter.Error(err)
return
}
events, err := m.prometheus.ProcessMetrics(families, mapping)
if err != nil {
m.Logger().Error(err)
reporter.Error(err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,14 @@
package state_deployment

import (
"fmt"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/kubernetes"
p "github.com/elastic/beats/v7/metricbeat/helper/prometheus"
"github.com/elastic/beats/v7/metricbeat/mb"
"github.com/elastic/beats/v7/metricbeat/mb/parse"
k8smod "github.com/elastic/beats/v7/metricbeat/module/kubernetes"
"github.com/elastic/beats/v7/metricbeat/module/kubernetes/util"
)

Expand Down Expand Up @@ -70,6 +73,7 @@ type MetricSet struct {
mb.BaseMetricSet
prometheus p.Prometheus
enricher util.Enricher
mod k8smod.Module
}

// New create a new instance of the MetricSet
Expand All @@ -80,10 +84,15 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
if err != nil {
return nil, err
}
mod, ok := base.Module().(k8smod.Module)
if !ok {
return nil, fmt.Errorf("must be child of kubernetes module")
}
return &MetricSet{
BaseMetricSet: base,
prometheus: prometheus,
enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Deployment{}, false),
mod: mod,
}, nil
}

Expand All @@ -93,7 +102,13 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
func (m *MetricSet) Fetch(reporter mb.ReporterV2) {
m.enricher.Start()

events, err := m.prometheus.GetProcessedMetrics(mapping)
families, err := m.mod.GetSharedFamilies(m.prometheus)
if err != nil {
m.Logger().Error(err)
reporter.Error(err)
return
}
events, err := m.prometheus.ProcessMetrics(families, mapping)
if err != nil {
m.Logger().Error(err)
reporter.Error(err)
Expand Down
Loading

0 comments on commit 96481f1

Please sign in to comment.