diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 525dee11d47..d9f94a3e633 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -61,6 +61,9 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Lower logging level to debug when attempting to configure beats with unknown fields from autodiscovered events/environments {pull}[37816][37816] - Set timeout of 1 minute for FQDN requests {pull}37756[37756] - Fix the paths in the .cmd script added to the path by the Windows MSI to point to the new C:\Program Files installation location. https://github.com/elastic/elastic-stack-installers/pull/238 +- Change cache processor documentation from `write_period` to `write_interval`. {pull}38561[38561] +- Fix cache processor expiries heap cleanup on partial file writes. {pull}38561[38561] +- Fix cache processor expiries infinite growth when large a large TTL is used and recurring keys are cached. {pull}38561[38561] *Auditbeat* diff --git a/libbeat/processors/cache/docs/cache.asciidoc b/libbeat/processors/cache/docs/cache.asciidoc index 6c5ccf2d197..bdd9629dea2 100644 --- a/libbeat/processors/cache/docs/cache.asciidoc +++ b/libbeat/processors/cache/docs/cache.asciidoc @@ -54,7 +54,7 @@ One of `backend.memory.id` or `backend.file.id` must be provided. `backend.capacity`:: The number of elements that can be stored in the cache. `put` operations that would cause the capacity to be exceeded will result in evictions of the oldest elements. Values at or below zero indicate no limit. The capacity should not be lower than the number of elements that are expected to be referenced when processing the input as evicted elements are lost. The default is `0`, no limit. `backend.memory.id`:: The ID of a memory-based cache. Use the same ID across instance to reference the same cache. `backend.file.id`:: The ID of a file-based cache. Use the same ID across instance to reference the same cache. -`backend.file.write_period`:: The interval between periodic cache writes to the backing file. Valid time units are h, m, s, ms, us/µs and ns. Periodic writes are only made if `backend.file.write_period` is greater than zero. The contents are always written out to the backing file when the processor is closed. Default is zero, no periodic writes. +`backend.file.write_interval`:: The interval between periodic cache writes to the backing file. Valid time units are h, m, s, ms, us/µs and ns. Periodic writes are only made if `backend.file.write_interval` is greater than zero. The contents are always written out to the backing file when the processor is closed. Default is zero, no periodic writes. One of `put`, `get` or `delete` must be provided. diff --git a/libbeat/processors/cache/file_store.go b/libbeat/processors/cache/file_store.go index d3820600acf..884a80a2d1d 100644 --- a/libbeat/processors/cache/file_store.go +++ b/libbeat/processors/cache/file_store.go @@ -287,8 +287,8 @@ func (c *fileStore) writeState(final bool) { enc := json.NewEncoder(f) enc.SetEscapeHTML(false) now := time.Now() - for c.expiries.Len() != 0 { - e := c.expiries.pop() + for i := 0; i < c.expiries.Len(); i++ { + e := c.expiries[i] if e.Expires.Before(now) { // Don't write expired elements. continue diff --git a/libbeat/processors/cache/file_store_test.go b/libbeat/processors/cache/file_store_test.go index 22d43083e04..163899eca0b 100644 --- a/libbeat/processors/cache/file_store_test.go +++ b/libbeat/processors/cache/file_store_test.go @@ -352,6 +352,109 @@ var fileStoreTests = []struct { {Key: "three", Value: 3.0}, }, }, + { + name: "periodic_write", + cfg: config{ + Store: &storeConfig{ + File: &fileConfig{ID: "test"}, + Capacity: 1000, + Effort: 10, + }, + Get: &getConfig{}, + }, + want: &fileStore{path: "testdata/periodic_write", memStore: memStore{ + id: "test", + cache: map[string]*CacheEntry{}, + refs: 1, + // TTL, capacity and effort are set only by put. + ttl: -1, + cap: -1, + effort: -1, + }}, + steps: []fileStoreTestSteps{ + 0: { + doTo: func(s *fileStore) error { + putCfg := config{ + Store: &storeConfig{ + File: &fileConfig{ID: "test"}, + Capacity: 1000, + Effort: 10, + }, + Put: &putConfig{ + TTL: ptrTo(time.Second), + }, + } + s.add(putCfg) + return nil + }, + want: &fileStore{path: "testdata/periodic_write", memStore: memStore{ + id: "test", + cache: map[string]*CacheEntry{}, + refs: 2, + dirty: false, + ttl: time.Second, + cap: 1000, + effort: 10, + }}, + }, + 1: { + doTo: func(s *fileStore) error { + s.Put("one", 1) + s.Put("two", 2) + s.Put("three", 3) + return nil + }, + want: &fileStore{path: "testdata/periodic_write", memStore: memStore{ + id: "test", + cache: map[string]*CacheEntry{ + "one": {Key: "one", Value: int(1), index: 0}, + "two": {Key: "two", Value: int(2), index: 1}, + "three": {Key: "three", Value: int(3), index: 2}, + }, + expiries: expiryHeap{ + {Key: "one", Value: int(1), index: 0}, + {Key: "two", Value: int(2), index: 1}, + {Key: "three", Value: int(3), index: 2}, + }, + refs: 2, + dirty: true, + ttl: time.Second, + cap: 1000, + effort: 10, + }}, + }, + 2: { + doTo: func(s *fileStore) error { + s.writeState(false) + return nil + }, + want: &fileStore{path: "testdata/periodic_write", memStore: memStore{ + id: "test", + cache: map[string]*CacheEntry{ + "one": {Key: "one", Value: int(1), index: 0}, + "two": {Key: "two", Value: int(2), index: 1}, + "three": {Key: "three", Value: int(3), index: 2}, + }, + expiries: expiryHeap{ + {Key: "one", Value: int(1), index: 0}, + {Key: "two", Value: int(2), index: 1}, + {Key: "three", Value: int(3), index: 2}, + }, + refs: 2, + dirty: false, + ttl: time.Second, + cap: 1000, + effort: 10, + }}, + }, + }, + wantPersisted: []*CacheEntry{ + // Numeric values are float due to JSON round-trip. + {Key: "one", Value: 1.0}, + {Key: "two", Value: 2.0}, + {Key: "three", Value: 3.0}, + }, + }, } func TestFileStore(t *testing.T) { diff --git a/libbeat/processors/cache/mem_store.go b/libbeat/processors/cache/mem_store.go index a2fec841d27..09ea2c81f53 100644 --- a/libbeat/processors/cache/mem_store.go +++ b/libbeat/processors/cache/mem_store.go @@ -172,13 +172,21 @@ func (c *memStore) Put(key string, val any) error { defer c.mu.Unlock() now := time.Now() c.evictExpired(now) - e := &CacheEntry{ - Key: key, - Value: val, - Expires: now.Add(c.ttl), + // If the key is being overwritten we remove its previous expiry entry + // this will prevent expiries heap to grow with large TTLs and recurring keys. + if prev, found := c.cache[key]; found { + prev.Value = val + prev.Expires = now.Add(c.ttl) + heap.Fix(&c.expiries, prev.index) + } else { + e := &CacheEntry{ + Key: key, + Value: val, + Expires: now.Add(c.ttl), + } + c.cache[key] = e + heap.Push(&c.expiries, e) } - c.cache[key] = e - heap.Push(&c.expiries, e) c.dirty = true return nil } diff --git a/libbeat/processors/cache/mem_store_test.go b/libbeat/processors/cache/mem_store_test.go index 4a6cf500e05..d6ca53694ce 100644 --- a/libbeat/processors/cache/mem_store_test.go +++ b/libbeat/processors/cache/mem_store_test.go @@ -335,6 +335,103 @@ var memStoreTests = []struct { }, }, }, + { + name: "re-hit", + cfg: config{ + Store: &storeConfig{ + Memory: &memConfig{"test"}, + Capacity: 1000, + Effort: 10, + }, + Get: &getConfig{}, + }, + want: &memStore{ + id: "test", + cache: map[string]*CacheEntry{}, + refs: 1, + // TTL, capacity and effort are set only by put. + ttl: -1, + cap: -1, + effort: -1, + }, + steps: []memStoreTestSteps{ + 0: { + doTo: func(s *memStore) error { + putCfg := config{ + Store: &storeConfig{ + Memory: &memConfig{"test"}, + Capacity: 1000, + Effort: 10, + }, + Put: &putConfig{ + TTL: ptrTo(10 * time.Minute), + }, + } + s.add(putCfg) + return nil + }, + want: &memStore{ + id: "test", + cache: map[string]*CacheEntry{}, + refs: 2, + dirty: false, + ttl: 10 * time.Minute, + cap: 1000, + effort: 10, + }, + }, + 1: { + doTo: func(s *memStore) error { + s.Put("one", 1) + s.Put("two", 2) + s.Put("three", 3) + return nil + }, + want: &memStore{ + id: "test", + cache: map[string]*CacheEntry{ + "one": {Key: "one", Value: int(1), index: 0}, + "two": {Key: "two", Value: int(2), index: 1}, + "three": {Key: "three", Value: int(3), index: 2}, + }, + expiries: expiryHeap{ + {Key: "one", Value: int(1), index: 0}, + {Key: "two", Value: int(2), index: 1}, + {Key: "three", Value: int(3), index: 2}, + }, + refs: 2, + dirty: true, + ttl: 10 * time.Minute, + cap: 1000, + effort: 10, + }, + }, + 2: { + doTo: func(s *memStore) error { + s.Put("one", 1) + return nil + }, + want: &memStore{ + id: "test", + cache: map[string]*CacheEntry{ + "one": {Key: "one", Value: int(1), index: 1}, + "two": {Key: "two", Value: int(2), index: 0}, + "three": {Key: "three", Value: int(3), index: 2}, + }, + expiries: expiryHeap{ + {Key: "two", Value: int(2), index: 0}, + {Key: "one", Value: int(1), index: 1}, + {Key: "three", Value: int(3), index: 2}, + }, + refs: 2, + dirty: true, + ttl: 10 * time.Minute, + cap: 1000, + effort: 10, + }, + }, + }, + }, } func TestMemStore(t *testing.T) {