[libbeat] Cache processor docs and memory fixes. #38561

Merged 22 commits from fix/cache-docs into main on Apr 5, 2024.

Commits (changes from all commits):
6f0019a Fix write_period option name to match documentation (marc-gr, Mar 22, 2024)
9a91a1e Change naming to write_interval (marc-gr, Mar 25, 2024)
f85f559 Merge branch 'main' into fix/cache-docs (marc-gr, Mar 25, 2024)
09315aa Fix expiries heap cleanup on partial file writes (marc-gr, Mar 25, 2024)
b329901 Fix expiries infinite growth when large TTLs and recurring keys are c… (marc-gr, Mar 25, 2024)
d881e0e Fix linting errors (marc-gr, Mar 25, 2024)
149aa9b Merge branch 'main' into fix/cache-docs (marc-gr, Mar 25, 2024)
4d64a50 Add changelog entries (marc-gr, Mar 25, 2024)
dd4c956 Merge branch 'main' into fix/cache-docs (marc-gr, Mar 25, 2024)
00e7bde Revert "Fix linting errors" (marc-gr, Mar 26, 2024)
c639c1e Use heap.Fix instead of heap.Remove+heap.Push (marc-gr, Mar 26, 2024)
441d9cc Add tests for the new functionality (marc-gr, Mar 26, 2024)
f0576fe Merge branch 'main' into fix/cache-docs (marc-gr, Mar 26, 2024)
cd07008 Merge branch 'main' into fix/cache-docs (marc-gr, Mar 26, 2024)
2bd167c Merge branch 'main' into fix/cache-docs (marc-gr, Mar 27, 2024)
efbf245 Merge branch 'main' into fix/cache-docs (marc-gr, Mar 27, 2024)
2e18707 Merge branch 'main' into fix/cache-docs (marc-gr, Mar 27, 2024)
12346cc Merge branch 'main' into fix/cache-docs (marc-gr, Mar 27, 2024)
de7f23b Merge branch 'main' into fix/cache-docs (marc-gr, Apr 2, 2024)
312fe92 Merge branch 'main' into fix/cache-docs (marc-gr, Apr 3, 2024)
3a0423a Merge branch 'main' into fix/cache-docs (marc-gr, Apr 5, 2024)
4575de0 Merge branch 'main' into fix/cache-docs (andrewkroh, Apr 5, 2024)
3 changes: 3 additions & 0 deletions CHANGELOG.next.asciidoc
@@ -61,6 +61,9 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Lower logging level to debug when attempting to configure beats with unknown fields from autodiscovered events/environments {pull}37816[37816]
- Set timeout of 1 minute for FQDN requests {pull}37756[37756]
- Fix the paths in the .cmd script added to the path by the Windows MSI to point to the new C:\Program Files installation location. https://github.com/elastic/elastic-stack-installers/pull/238
+ - Change cache processor documentation from `write_period` to `write_interval`. {pull}38561[38561]
+ - Fix cache processor expiries heap cleanup on partial file writes. {pull}38561[38561]
+ - Fix cache processor expiries infinite growth when a large TTL is used and recurring keys are cached. {pull}38561[38561]

*Auditbeat*

2 changes: 1 addition & 1 deletion libbeat/processors/cache/docs/cache.asciidoc
@@ -54,7 +54,7 @@ One of `backend.memory.id` or `backend.file.id` must be provided.
`backend.capacity`:: The number of elements that can be stored in the cache. `put` operations that would cause the capacity to be exceeded will result in evictions of the oldest elements. Values at or below zero indicate no limit. The capacity should not be lower than the number of elements that are expected to be referenced when processing the input as evicted elements are lost. The default is `0`, no limit.
`backend.memory.id`:: The ID of a memory-based cache. Use the same ID across instances to reference the same cache.
`backend.file.id`:: The ID of a file-based cache. Use the same ID across instances to reference the same cache.
- `backend.file.write_period`:: The interval between periodic cache writes to the backing file. Valid time units are h, m, s, ms, us/µs and ns. Periodic writes are only made if `backend.file.write_period` is greater than zero. The contents are always written out to the backing file when the processor is closed. Default is zero, no periodic writes.
+ `backend.file.write_interval`:: The interval between periodic cache writes to the backing file. Valid time units are h, m, s, ms, us/µs and ns. Periodic writes are only made if `backend.file.write_interval` is greater than zero. The contents are always written out to the backing file when the processor is closed. Default is zero, no periodic writes.

One of `put`, `get` or `delete` must be provided.

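As an illustration of the documented semantics, a minimal Go sketch of a write_interval-driven flush loop follows. This is not the processor's actual implementation; the function and parameter names here are hypothetical, though writeState(final bool) mirrors the signature visible in the diff below.

package cache

import "time"

// runPeriodicWrites illustrates the documented write_interval semantics:
// a zero (the default) or negative interval disables periodic writes, but
// the state is still written once when the processor is closed.
func runPeriodicWrites(interval time.Duration, writeState func(final bool), done <-chan struct{}) {
	if interval <= 0 {
		<-done
		writeState(true) // final write on close
		return
	}
	tick := time.NewTicker(interval)
	defer tick.Stop()
	for {
		select {
		case <-tick.C:
			writeState(false) // periodic, non-final write
		case <-done:
			writeState(true) // final write on close
			return
		}
	}
}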
4 changes: 2 additions & 2 deletions libbeat/processors/cache/file_store.go
@@ -191,8 +191,8 @@
var e CacheEntry
err = dec.Decode(&e)
if err != nil {
if err != io.EOF {

[Check failure, GitHub Actions lint (windows, linux), line 194 in libbeat/processors/cache/file_store.go: comparing with != will fail on wrapped errors. Use errors.Is to check for a specific error (errorlint)]
switch err := err.(type) {

[Check failure, GitHub Actions lint (windows, linux), line 195 in libbeat/processors/cache/file_store.go: type switch on error will fail on wrapped errors. Use errors.As to check for specific errors (errorlint)]
case *json.SyntaxError:
c.log.Errorw("failed to read state element", "error", err, "path", c.path, "offset", err.Offset)
default:
@@ -287,8 +287,8 @@
enc := json.NewEncoder(f)
enc.SetEscapeHTML(false)
now := time.Now()
-	for c.expiries.Len() != 0 {
-		e := c.expiries.pop()
+	for i := 0; i < c.expiries.Len(); i++ {
+		e := c.expiries[i]
efd6 (Contributor) commented on lines +290 to +291:

Suggested change:
-	for i := 0; i < c.expiries.Len(); i++ {
-		e := c.expiries[i]
+	for _, e := range c.expiries {

The previous logic was obviously written when we were only planning to write out on exit, so the iterator was destructive. It would be nice if the write-out also dropped expired elements, but that cannot happen non-destructively within this iterator. It could be done by a mark during this iteration, followed by a sweep in the postamble.

Not for now, but this would look like:

diff --git a/libbeat/processors/cache/file_store.go b/libbeat/processors/cache/file_store.go
index d3820600ac..062a64a916 100644
--- a/libbeat/processors/cache/file_store.go
+++ b/libbeat/processors/cache/file_store.go
@@ -287,10 +287,12 @@ func (c *fileStore) writeState(final bool) {
        enc := json.NewEncoder(f)
        enc.SetEscapeHTML(false)
        now := time.Now()
-       for c.expiries.Len() != 0 {
-               e := c.expiries.pop()
+       var expired int
+       for _, e := range c.expiries {
                if e.Expires.Before(now) {
-                       // Don't write expired elements.
+                       // Don't write expired elements; mark and continue.
+                       e.Expires = time.Time{}
+                       expired++
                        continue
                }
                err = enc.Encode(e)
@@ -299,6 +301,17 @@ func (c *fileStore) writeState(final bool) {
                        return
                }
        }
+       if expired != 0 {
+               // Fix up heap and sweep cache.
+               heap.Init(&c.expiries)
+               for n := 0; (c.effort <= 0 || n < c.effort) && len(c.cache) != 0; n++ {
+                       if c.expiries[0].Expires.After(now) {
+                               break
+                       }
+                       e := c.expiries.pop()
+                       delete(c.cache, e.Key)
+               }
+       }
        // Only mark as not dirty if we succeeded in the write.
        c.dirty = false
        c.log.Infow("write state to file succeeded", "id", c.id, "path", c.path)

marc-gr (Contributor Author) replied:

Not sure this is super worth it; doesn't this dropping operation also happen at every Put?

marc-gr (Contributor Author) added:

PS: not really related, but I noticed the file write can lead to some issues when the cache is really large, since it stops the world until done because it needs to lock the cache. Not sure how to avoid it, but just pointing it out, as it could cause issues if write_interval is on the low end and the cache size is on the larger side.

efd6 (Contributor) replied (Mar 26, 2024):

> Not sure this is super worth it; doesn't this dropping operation also happen at every Put?

It does, but only "just enough". This takes a small amount of time relative to the file write to minimize the memory usage.

> PS: not really related, but I noticed the file write can lead to some issues when the cache is really large, since it stops the world until done because it needs to lock the cache. Not sure how to avoid it, but just pointing it out, as it could cause issues if write_interval is on the low end and the cache size is on the larger side.

If this does become an issue, the approach would be to:

try lock write
return if unsuccessful

go:
    defer unlock write
    lock cache
    obtain worklist of entries more recent than the expiry time
    for each worklist item:
        write element if still more recent than noted time of heap front
    update note of time of current heap front
    unlock cache
    for each remaining worklist item: # or maybe a small set of worklist
        lock cache
        write element if still more recent than expiry time
        unlock cache

This requires adding a new field to the memStore type that keeps track of the front of the heap for this purpose, and a lock to fileStore.

The approach here will potentially write out deleted items, but should never write out invalid items.

However, it's significantly more complex and unless there is actually an issue, I don't think this is worth it; tuning should be a first option.
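For illustration, a minimal Go sketch of that try-lock gate, assuming the separate write lock described above; snapshotEntries and writeEntries are hypothetical stand-ins for the real work, and sync.Mutex.TryLock requires Go 1.18+:

package cache

import "sync"

type CacheEntry struct {
	Key   string
	Value any
}

type fileStore struct {
	writeMu sync.Mutex // guards the backing-file write
	cacheMu sync.Mutex // guards the in-memory cache
}

// tryWriteState skips the interval if a write is already in flight, and
// snapshots the cache under cacheMu so the file write itself does not
// stop the world.
func (c *fileStore) tryWriteState() {
	if !c.writeMu.TryLock() {
		return // unsuccessful: a write is already running
	}
	go func() {
		defer c.writeMu.Unlock()
		c.cacheMu.Lock()
		worklist := c.snapshotEntries()
		c.cacheMu.Unlock()
		c.writeEntries(worklist) // runs without cacheMu held
	}()
}

// Hypothetical stubs standing in for the real snapshot and encode steps.
func (c *fileStore) snapshotEntries() []CacheEntry { return nil }
func (c *fileStore) writeEntries([]CacheEntry)     {}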

marc-gr (Contributor Author) replied:

> It does, but only "just enough". This takes a small amount of time relative to the file write to minimize the memory usage.

The only thing that seems a bit odd to me, unless I got something wrong, is that this memory benefit only applies when file write intervals are used; with just the memory backend, or a file backend without write intervals, this would be skipped and only the "just enough" effort would be done. That is why I am not sure it is worth it, unless we want to do it somewhere else to benefit all configurations.

efd6 (Contributor) replied:

Yeah, I'm fine with things how they are; the code looks good to me. The only reason I suggested it is that the most common (and likely recommended) use would be file-backed, and the cost of this work is small in comparison to the cost of the file write, so it was a reasonable place to insert work that runs routinely.

if e.Expires.Before(now) {
// Don't write expired elements.
continue
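For reference, the errorlint-compliant form requested by the failing lint jobs above would look roughly like the sketch below. Note that encoding/json returns io.EOF unwrapped, so the original direct comparison also works in practice, which may be why the "Fix linting errors" commit was reverted (00e7bde); the sketch is an illustration, not the PR's code.

package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"strings"
)

func main() {
	// A valid value followed by a stray '}' to trigger a *json.SyntaxError.
	dec := json.NewDecoder(strings.NewReader(`{"key":"one"} }`))
	for {
		var v map[string]any
		err := dec.Decode(&v)
		if err != nil {
			// errors.Is matches io.EOF even if it arrives wrapped.
			if !errors.Is(err, io.EOF) {
				// errors.As replaces the type switch flagged by errorlint.
				var syntaxErr *json.SyntaxError
				if errors.As(err, &syntaxErr) {
					fmt.Println("syntax error at offset", syntaxErr.Offset)
				} else {
					fmt.Println("decode error:", err)
				}
			}
			break
		}
		fmt.Println(v)
	}
}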
103 changes: 103 additions & 0 deletions libbeat/processors/cache/file_store_test.go
@@ -352,6 +352,109 @@
{Key: "three", Value: 3.0},
},
},
{
name: "periodic_write",
cfg: config{
Store: &storeConfig{
File: &fileConfig{ID: "test"},
Capacity: 1000,
Effort: 10,
},
Get: &getConfig{},
},
want: &fileStore{path: "testdata/periodic_write", memStore: memStore{
id: "test",
cache: map[string]*CacheEntry{},
refs: 1,
// TTL, capacity and effort are set only by put.
ttl: -1,
cap: -1,
effort: -1,
}},
steps: []fileStoreTestSteps{
0: {
doTo: func(s *fileStore) error {
putCfg := config{
Store: &storeConfig{
File: &fileConfig{ID: "test"},
Capacity: 1000,
Effort: 10,
},
Put: &putConfig{
TTL: ptrTo(time.Second),
},
}
s.add(putCfg)
return nil
},
want: &fileStore{path: "testdata/periodic_write", memStore: memStore{
id: "test",
cache: map[string]*CacheEntry{},
refs: 2,
dirty: false,
ttl: time.Second,
cap: 1000,
effort: 10,
}},
},
1: {
doTo: func(s *fileStore) error {
s.Put("one", 1)
s.Put("two", 2)
s.Put("three", 3)
return nil
},
want: &fileStore{path: "testdata/periodic_write", memStore: memStore{
id: "test",
cache: map[string]*CacheEntry{
"one": {Key: "one", Value: int(1), index: 0},
"two": {Key: "two", Value: int(2), index: 1},
"three": {Key: "three", Value: int(3), index: 2},
},
expiries: expiryHeap{
{Key: "one", Value: int(1), index: 0},
{Key: "two", Value: int(2), index: 1},
{Key: "three", Value: int(3), index: 2},
},
refs: 2,
dirty: true,
ttl: time.Second,
cap: 1000,
effort: 10,
}},
},
2: {
doTo: func(s *fileStore) error {
s.writeState(false)
return nil
},
want: &fileStore{path: "testdata/periodic_write", memStore: memStore{
id: "test",
cache: map[string]*CacheEntry{
"one": {Key: "one", Value: int(1), index: 0},
"two": {Key: "two", Value: int(2), index: 1},
"three": {Key: "three", Value: int(3), index: 2},
},
expiries: expiryHeap{
{Key: "one", Value: int(1), index: 0},
{Key: "two", Value: int(2), index: 1},
{Key: "three", Value: int(3), index: 2},
},
refs: 2,
dirty: false,
ttl: time.Second,
cap: 1000,
effort: 10,
}},
},
},
wantPersisted: []*CacheEntry{
// Numeric values are float due to JSON round-trip.
{Key: "one", Value: 1.0},
{Key: "two", Value: 2.0},
{Key: "three", Value: 3.0},
},
},
}

func TestFileStore(t *testing.T) {
@@ -410,7 +513,7 @@
var e CacheEntry
err = dec.Decode(&e)
if err != nil {
if err != io.EOF {

[Check failure, GitHub Actions lint (windows, linux), line 516 in libbeat/processors/cache/file_store_test.go: comparing with != will fail on wrapped errors. Use errors.Is to check for a specific error (errorlint)]
t.Fatalf("unexpected error reading persisted cache data: %v", err)
}
break
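The "float due to JSON round-trip" comments in the expected data above reflect standard encoding/json behavior rather than anything cache-specific; a quick illustration:

package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	var v map[string]any
	// With an untyped target, encoding/json decodes every JSON number as
	// float64, so int(1) persists and reads back as 1.0.
	if err := json.Unmarshal([]byte(`{"one": 1}`), &v); err != nil {
		panic(err)
	}
	fmt.Printf("%T\n", v["one"]) // float64
}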
20 changes: 14 additions & 6 deletions libbeat/processors/cache/mem_store.go
@@ -172,13 +172,21 @@ func (c *memStore) Put(key string, val any) error {
defer c.mu.Unlock()
now := time.Now()
c.evictExpired(now)
-	e := &CacheEntry{
-		Key:     key,
-		Value:   val,
-		Expires: now.Add(c.ttl),
+	// If the key is being overwritten, fix up its previous expiry entry;
+	// this prevents the expiries heap from growing with large TTLs and recurring keys.
+	if prev, found := c.cache[key]; found {
+		prev.Value = val
+		prev.Expires = now.Add(c.ttl)
+		heap.Fix(&c.expiries, prev.index)
+	} else {
+		e := &CacheEntry{
+			Key:     key,
+			Value:   val,
+			Expires: now.Add(c.ttl),
+		}
+		c.cache[key] = e
+		heap.Push(&c.expiries, e)
	}
-	c.cache[key] = e
-	heap.Push(&c.expiries, e)
c.dirty = true
return nil
}
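The hunk above is the growth fix from commit c639c1e: overwriting an existing key now updates its heap entry in place with heap.Fix instead of pushing a duplicate. A self-contained sketch of the same pattern, using simplified stand-in types rather than the processor's real ones:

package main

import (
	"container/heap"
	"fmt"
	"time"
)

type entry struct {
	key     string
	expires time.Time
	index   int
}

// expiryHeap is a minimal container/heap implementation ordered by expiry.
type expiryHeap []*entry

func (h expiryHeap) Len() int           { return len(h) }
func (h expiryHeap) Less(i, j int) bool { return h[i].expires.Before(h[j].expires) }
func (h expiryHeap) Swap(i, j int) {
	h[i], h[j] = h[j], h[i]
	h[i].index = i
	h[j].index = j
}
func (h *expiryHeap) Push(x any) {
	e := x.(*entry)
	e.index = len(*h)
	*h = append(*h, e)
}
func (h *expiryHeap) Pop() any {
	old := *h
	e := old[len(old)-1]
	*h = old[:len(old)-1]
	return e
}

func main() {
	cache := map[string]*entry{}
	var expiries expiryHeap
	put := func(key string, ttl time.Duration) {
		now := time.Now()
		if prev, found := cache[key]; found {
			// Reuse the existing entry and restore heap order at its index:
			// O(log n), and the heap no longer grows on recurring keys.
			prev.expires = now.Add(ttl)
			heap.Fix(&expiries, prev.index)
			return
		}
		e := &entry{key: key, expires: now.Add(ttl)}
		cache[key] = e
		heap.Push(&expiries, e)
	}
	put("one", time.Hour)
	put("one", time.Hour)       // recurring key: still one heap element
	fmt.Println(expiries.Len()) // prints 1
}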
97 changes: 97 additions & 0 deletions libbeat/processors/cache/mem_store_test.go
@@ -335,6 +335,103 @@ var memStoreTests = []struct {
},
},
},
{
name: "re-hit",
cfg: config{
Store: &storeConfig{
Memory: &memConfig{"test"},
Capacity: 1000,
Effort: 10,
},
Get: &getConfig{},
},
want: &memStore{
id: "test",
cache: map[string]*CacheEntry{},
refs: 1,
// TTL, capacity and effort are set only by put.
ttl: -1,
cap: -1,
effort: -1,
},
steps: []memStoreTestSteps{
0: {
doTo: func(s *memStore) error {
putCfg := config{
Store: &storeConfig{
Memory: &memConfig{"test"},
Capacity: 1000,
Effort: 10,
},
Put: &putConfig{
TTL: ptrTo(10 * time.Minute),
},
}
s.add(putCfg)
return nil
},
want: &memStore{
id: "test",
cache: map[string]*CacheEntry{},
refs: 2,
dirty: false,
ttl: 10 * time.Minute,
cap: 1000,
effort: 10,
},
},
1: {
doTo: func(s *memStore) error {
s.Put("one", 1)
s.Put("two", 2)
s.Put("three", 3)
return nil
},
want: &memStore{
id: "test",
cache: map[string]*CacheEntry{
"one": {Key: "one", Value: int(1), index: 0},
"two": {Key: "two", Value: int(2), index: 1},
"three": {Key: "three", Value: int(3), index: 2},
},
expiries: expiryHeap{
{Key: "one", Value: int(1), index: 0},
{Key: "two", Value: int(2), index: 1},
{Key: "three", Value: int(3), index: 2},
},
refs: 2,
dirty: true,
ttl: 10 * time.Minute,
cap: 1000,
effort: 10,
},
},
2: {
doTo: func(s *memStore) error {
s.Put("one", 1)
return nil
},
want: &memStore{
id: "test",
cache: map[string]*CacheEntry{
"one": {Key: "one", Value: int(1), index: 1},
"two": {Key: "two", Value: int(2), index: 0},
"three": {Key: "three", Value: int(3), index: 2},
},
expiries: expiryHeap{
{Key: "two", Value: int(2), index: 0},
{Key: "one", Value: int(1), index: 1},
{Key: "three", Value: int(3), index: 2},
},
refs: 2,
dirty: true,
ttl: 10 * time.Minute,
cap: 1000,
effort: 10,
},
},
},
},
}

func TestMemStore(t *testing.T) {