diff --git a/plugins/processors/ifname/README.md b/plugins/processors/ifname/README.md index d06f9e9fd0a28..d68899db40a53 100644 --- a/plugins/processors/ifname/README.md +++ b/plugins/processors/ifname/README.md @@ -59,6 +59,11 @@ Telegraf minimum version: Telegraf 1.15.0 ## stay in order set this to true. keeping the metrics ordered may ## be slightly slower # ordered = false + + ## cache_ttl is the amount of time interface names are cached for a + ## given agent. After this period elapses if names are needed they + ## will be retrieved again. + # cache_ttl = "8h" ``` ### Example processing: diff --git a/plugins/processors/ifname/cache.go b/plugins/processors/ifname/cache.go index c3232f5310b75..20c821aef2f43 100644 --- a/plugins/processors/ifname/cache.go +++ b/plugins/processors/ifname/cache.go @@ -6,9 +6,7 @@ import ( "container/list" ) -// Type aliases let the implementation be more generic -type keyType = string -type valType = nameMap +type LRUValType = TTLValType type hashType map[keyType]*list.Element @@ -21,7 +19,7 @@ type LRUCache struct { // Pair is the value of a list node. type Pair struct { key keyType - value valType + value LRUValType } // initializes a new LRUCache. @@ -34,7 +32,7 @@ func NewLRUCache(capacity uint) LRUCache { } // Get a list node from the hash map. -func (c *LRUCache) Get(key keyType) (valType, bool) { +func (c *LRUCache) Get(key keyType) (LRUValType, bool) { // check if list node exists if node, ok := c.m[key]; ok { val := node.Value.(*list.Element).Value.(Pair).value @@ -42,11 +40,11 @@ func (c *LRUCache) Get(key keyType) (valType, bool) { c.l.MoveToFront(node) return val, true } - return valType{}, false + return LRUValType{}, false } // Put key and value in the LRUCache -func (c *LRUCache) Put(key keyType, value valType) { +func (c *LRUCache) Put(key keyType, value LRUValType) { // check if list node exists if node, ok := c.m[key]; ok { // move the node to front @@ -76,3 +74,10 @@ func (c *LRUCache) Put(key keyType, value valType) { c.m[key] = ptr } } + +func (c *LRUCache) Delete(key keyType) { + if node, ok := c.m[key]; ok { + c.l.Remove(node) + delete(c.m, key) + } +} diff --git a/plugins/processors/ifname/cache_test.go b/plugins/processors/ifname/cache_test.go index 986c2d494f741..7d11ee29a8a55 100644 --- a/plugins/processors/ifname/cache_test.go +++ b/plugins/processors/ifname/cache_test.go @@ -9,10 +9,10 @@ import ( func TestCache(t *testing.T) { c := NewLRUCache(2) - c.Put("ones", nameMap{1: "one"}) - twoMap := nameMap{2: "two"} + c.Put("ones", LRUValType{val: nameMap{1: "one"}}) + twoMap := LRUValType{val: nameMap{2: "two"}} c.Put("twos", twoMap) - c.Put("threes", nameMap{3: "three"}) + c.Put("threes", LRUValType{val: nameMap{3: "three"}}) _, ok := c.Get("ones") require.False(t, ok) diff --git a/plugins/processors/ifname/ifname.go b/plugins/processors/ifname/ifname.go index be57fb14c6ece..ad633a0ae2900 100644 --- a/plugins/processors/ifname/ifname.go +++ b/plugins/processors/ifname/ifname.go @@ -7,6 +7,7 @@ import ( "time" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/internal/snmp" si "github.com/influxdata/telegraf/plugins/inputs/snmp" @@ -66,11 +67,18 @@ var sampleConfig = ` ## stay in order set this to true. keeping the metrics ordered may ## be slightly slower # ordered = false + + ## cache_ttl is the amount of time interface names are cached for a + ## given agent. After this period elapses if names are needed they + ## will be retrieved again. + # cache_ttl = "8h" ` type nameMap map[uint64]string -type mapFunc func(agent string) (nameMap, error) +type keyType = string +type valType = nameMap +type mapFunc func(agent string) (nameMap, error) type makeTableFunc func(string) (*si.Table, error) type sigMap map[string](chan struct{}) @@ -82,9 +90,10 @@ type IfName struct { snmp.ClientConfig - CacheSize uint `toml:"max_cache_entries"` - MaxParallelLookups int `toml:"max_parallel_lookups"` - Ordered bool `toml:"ordered"` + CacheSize uint `toml:"max_cache_entries"` + MaxParallelLookups int `toml:"max_parallel_lookups"` + Ordered bool `toml:"ordered"` + CacheTTL config.Duration `toml:"cache_ttl"` Log telegraf.Logger `toml:"-"` @@ -92,7 +101,7 @@ type IfName struct { ifXTable *si.Table `toml:"-"` rwLock sync.RWMutex `toml:"-"` - cache *LRUCache `toml:"-"` + cache *TTLCache `toml:"-"` parallel parallel.Parallel `toml:"-"` acc telegraf.Accumulator `toml:"-"` @@ -106,6 +115,8 @@ type IfName struct { sigsLock sync.Mutex `toml:"-"` } +const minRetry time.Duration = 5 * time.Minute + func (d *IfName) SampleConfig() string { return sampleConfig } @@ -118,7 +129,7 @@ func (d *IfName) Init() error { d.getMapRemote = d.getMapRemoteNoMock d.makeTable = makeTableNoMock - c := NewLRUCache(d.CacheSize) + c := NewTTLCache(time.Duration(d.CacheTTL), d.CacheSize) d.cache = &c d.sigs = make(sigMap) @@ -144,17 +155,42 @@ func (d *IfName) addTag(metric telegraf.Metric) error { return fmt.Errorf("couldn't parse source tag as uint") } - m, err := d.getMap(agent) - if err != nil { - return fmt.Errorf("couldn't retrieve the table of interface names: %w", err) - } + firstTime := true + for { + m, age, err := d.getMap(agent) + if err != nil { + return fmt.Errorf("couldn't retrieve the table of interface names: %w", err) + } - name, found := m[num] - if !found { - return fmt.Errorf("interface number %d isn't in the table of interface names", num) + name, found := m[num] + if found { + // success + metric.AddTag(d.DestTag, name) + return nil + } + + // We have the agent's interface map but it doesn't contain + // the interface we're interested in. If the entry is old + // enough, retrieve it from the agent once more. + if age < minRetry { + return fmt.Errorf("interface number %d isn't in the table of interface names", num) + } + + if firstTime { + d.invalidate(agent) + firstTime = false + continue + } + + // not found, cache hit, retrying + return fmt.Errorf("missing interface but couldn't retrieve table") } - metric.AddTag(d.DestTag, name) - return nil +} + +func (d *IfName) invalidate(agent string) { + d.rwLock.RLock() + d.cache.Delete(agent) + d.rwLock.RUnlock() } func (d *IfName) Start(acc telegraf.Accumulator) error { @@ -203,15 +239,15 @@ func (d *IfName) Stop() error { // getMap gets the interface names map either from cache or from the SNMP // agent -func (d *IfName) getMap(agent string) (nameMap, error) { +func (d *IfName) getMap(agent string) (entry nameMap, age time.Duration, err error) { var sig chan struct{} // Check cache d.rwLock.RLock() - m, ok := d.cache.Get(agent) + m, ok, age := d.cache.Get(agent) d.rwLock.RUnlock() if ok { - return m, nil + return m, age, nil } // Is this the first request for this agent? @@ -229,12 +265,12 @@ func (d *IfName) getMap(agent string) (nameMap, error) { <-sig // Check cache again d.rwLock.RLock() - m, ok := d.cache.Get(agent) + m, ok, age := d.cache.Get(agent) d.rwLock.RUnlock() if ok { - return m, nil + return m, age, nil } else { - return nil, fmt.Errorf("getting remote table from cache") + return nil, 0, fmt.Errorf("getting remote table from cache") } } @@ -242,7 +278,7 @@ func (d *IfName) getMap(agent string) (nameMap, error) { // agent. // Make the SNMP request - m, err := d.getMapRemote(agent) + m, err = d.getMapRemote(agent) if err != nil { //failure. signal without saving to cache d.sigsLock.Lock() @@ -250,7 +286,7 @@ func (d *IfName) getMap(agent string) (nameMap, error) { delete(d.sigs, agent) d.sigsLock.Unlock() - return nil, fmt.Errorf("getting remote table: %w", err) + return nil, 0, fmt.Errorf("getting remote table: %w", err) } // Cache it @@ -264,7 +300,7 @@ func (d *IfName) getMap(agent string) (nameMap, error) { delete(d.sigs, agent) d.sigsLock.Unlock() - return m, nil + return m, 0, nil } func (d *IfName) getMapRemoteNoMock(agent string) (nameMap, error) { @@ -310,6 +346,7 @@ func init() { Version: 2, Community: "public", }, + CacheTTL: config.Duration(8 * time.Hour), } }) } diff --git a/plugins/processors/ifname/ifname_test.go b/plugins/processors/ifname/ifname_test.go index 99354e7b1cfce..8eff68006f99b 100644 --- a/plugins/processors/ifname/ifname_test.go +++ b/plugins/processors/ifname/ifname_test.go @@ -6,6 +6,7 @@ import ( "testing" "time" + "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/internal/snmp" si "github.com/influxdata/telegraf/plugins/inputs/snmp" @@ -100,6 +101,7 @@ func TestGetMap(t *testing.T) { Version: 2, Timeout: internal.Duration{Duration: 5 * time.Second}, // doesn't work with 0 timeout }, + CacheTTL: config.Duration(10 * time.Second), } // This test mocks the snmp transaction so don't run net-snmp @@ -137,7 +139,7 @@ func TestGetMap(t *testing.T) { wgReq.Add(1) go func() { defer wgReq.Done() - m, err := d.getMap("agent") + m, _, err := d.getMap("agent") require.NoError(t, err) require.Equal(t, expected, m) }() diff --git a/plugins/processors/ifname/ttl_cache.go b/plugins/processors/ifname/ttl_cache.go new file mode 100644 index 0000000000000..8f9c4ae653499 --- /dev/null +++ b/plugins/processors/ifname/ttl_cache.go @@ -0,0 +1,52 @@ +package ifname + +import ( + "time" +) + +type TTLValType struct { + time time.Time // when entry was added + val valType +} + +type timeFunc func() time.Time + +type TTLCache struct { + validDuration time.Duration + lru LRUCache + now timeFunc +} + +func NewTTLCache(valid time.Duration, capacity uint) TTLCache { + return TTLCache{ + lru: NewLRUCache(capacity), + validDuration: valid, + now: time.Now, + } +} + +func (c *TTLCache) Get(key keyType) (valType, bool, time.Duration) { + v, ok := c.lru.Get(key) + if !ok { + return valType{}, false, 0 + } + age := c.now().Sub(v.time) + if age < c.validDuration { + return v.val, ok, age + } else { + c.lru.Delete(key) + return valType{}, false, 0 + } +} + +func (c *TTLCache) Put(key keyType, value valType) { + v := TTLValType{ + val: value, + time: c.now(), + } + c.lru.Put(key, v) +} + +func (c *TTLCache) Delete(key keyType) { + c.lru.Delete(key) +} diff --git a/plugins/processors/ifname/ttl_cache_test.go b/plugins/processors/ifname/ttl_cache_test.go new file mode 100644 index 0000000000000..8ae57d6df9265 --- /dev/null +++ b/plugins/processors/ifname/ttl_cache_test.go @@ -0,0 +1,43 @@ +package ifname + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestTTLCacheExpire(t *testing.T) { + c := NewTTLCache(1*time.Second, 100) + + c.now = func() time.Time { + return time.Unix(0, 0) + } + + c.Put("ones", nameMap{1: "one"}) + require.Len(t, c.lru.m, 1) + + c.now = func() time.Time { + return time.Unix(1, 0) + } + + _, ok, _ := c.Get("ones") + require.False(t, ok) + require.Len(t, c.lru.m, 0) + require.Equal(t, c.lru.l.Len(), 0) +} + +func TestTTLCache(t *testing.T) { + c := NewTTLCache(1*time.Second, 100) + + c.now = func() time.Time { + return time.Unix(0, 0) + } + + expected := nameMap{1: "one"} + c.Put("ones", expected) + + actual, ok, _ := c.Get("ones") + require.True(t, ok) + require.Equal(t, expected, actual) +}