
[libbeat] Disk queue implementation #21176

Merged: 97 commits, Sep 28, 2020
Conversation


@faec faec commented Sep 18, 2020

What does this PR do?

This PR implements a new disk-based queue for libbeat.

This is a draft PR: it is ready to start receiving feedback and rolling reviews, but some implementation details are still pending and it isn't ready for checkin.

  • My code follows the style guidelines of this project
  • I have commented my code, particularly in hard-to-understand areas
  • I have made corresponding changes to the documentation
  • I have made corresponding changes to the default configuration files
  • I have added tests that prove my fix is effective or that my feature works
  • I have added an entry in CHANGELOG.next.asciidoc or CHANGELOG-developer.next.asciidoc.

How to test this PR locally

Add to the beat configuration:

```yaml
queue.disk:
  max_size: 1GB
```

The beat should operate as usual, and the event data should be stored in data/diskqueue while waiting to be ingested.

Release testing

While the preceding is good enough for smoke testing, we should include a few more scenarios for release prep:

  • Ingest from multiple beats (Filebeat and Metricbeat are essential since that's where we expect the most use, the rest would be nice to include as time allows)
  • Ingest to both standalone Elasticsearch and Elastic Cloud
  • Confirm that queued events persist through a beat restart. An easy way to test this is to run a beat without a working Elasticsearch so it fills the disk queue, then restart it with Elasticsearch running. If you want to get extra granular, you can ingest small amounts at a time and view the queue's data files in data/diskqueue/NNN.seg in a hex editor -- the event data blobs are JSON and can be recognized visually -- but that is optional, as that level of detail belongs more in the automated tests.
  • Try a wide range for max_size, both small enough that the queue consistently reaches its maximum size (which can be seen by checking the files in data/diskqueue) and large or unbounded (max_size=0).
  • Run with unbounded size targeting a nearly-full partition (using the queue.disk.path setting so the beat itself isn't on a full partition).
    • It's not recommended for many reasons to fill up the main system partition or the partition a beat is running on, but this queue should be able to operate with no problems (beyond the intrinsic throughput decrease) if the partition hosting its data is full.
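For the last two scenarios, the combined settings might look like the following sketch (the path value is purely illustrative; any writable directory on the target partition works):

```yaml
queue.disk:
  # 0 means unbounded; otherwise use an explicit size such as 1GB or 500MB.
  max_size: 0
  # Store queue data on a different partition than the one the beat runs on.
  path: /mnt/nearly-full/diskqueue
```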

Since the queue itself operates strictly locally, it is fine to test each of these scenarios in isolation -- we wouldn't gain much practically by testing on the powerset of the preceding conditions, so follow whatever sequence makes things simplest.

@kvch kvch left a comment

Awesome PR so far. I've left a few notes for you. Tomorrow I will look at it again with fresh eyes.

@kvch kvch left a comment

Let's merge it! I assume when tests are added a few bugs might come out, but that is fine.
Awesome PR \o/

@faec faec marked this pull request as ready for review September 24, 2020 14:38
@elasticmachine

Pinging @elastic/integrations (Team:Integrations)

```go
func SettingsForUserConfig(config *common.Config) (Settings, error) {
	userConfig := userConfig{}
	if err := config.Unpack(&userConfig); err != nil {
		return Settings{}, err
```
Contributor
There's a lot of inconsistency in error return styles here: sometimes we wrap an error, sometimes we don't. Unless something specifically requires a distinct error type, we might want to standardize on errors.Wrap or fmt.Errorf everywhere.

Contributor

I vote for fmt.Errorf.

Contributor Author

The heuristic I aim for is to ask whether the error being returned is understandable in the context of the caller. For example, errors in the deleter loop aren't wrapped, because they are reported (and grouped / wrapped) when they get back to the core loop, and it looks silly to have a message that looks like "couldn't delete segment file: [couldn't delete segment file: [actual error]]". So for low-level helpers I often leave it unwrapped, knowing that the caller is responsible for reporting it comprehensibly.

That said, config.go doesn't seem to follow that convention, and could use a little more verbosity in the messages, so I fixed the calls here :-)

@fearful-symmetry
Contributor

I think I understand most of this. Good work!

@faec
Contributor Author

faec commented Sep 24, 2020

Updates:

  • This is nearly ready so it's no longer a draft PR :-)
  • I added unit tests! Only a couple, to core_loop_test.go. The intention is for there to be many more like these, covering the other possible queue states, but I wanted the initial checkin to at least have a proof of concept showing how the state transitions / representation invariants can be unit tested.

@faec faec changed the title [draft] Disk queue implementation [libbeat] Disk queue implementation Sep 24, 2020
```go
dq.handleProducerWriteRequest(request)

// The request inserts 100 bytes into an empty queue, so it should succeed.
// We expect:
```
Contributor

Nit: describing the expectations in a comment does not add value, since the messages passed to the t.Error calls tell the same story.

Contributor Author

It's technically redundant, but the verbal description is shorter and easier to parse. I also like having a description of what I'm about to check, because the two listed invariants are logically equivalent to the 5 conditions in the code, but a reader seeing the test for the first time only knows what the code actually tests, and not what the author thought they were testing (which isn't always the same). This way if a test fails it's easier to recognize whether the problem is in the package or the test itself (i.e. whether the test is checking the invariants incorrectly, rather than the invariants actually failing).

(The secret long-term plan is to have verbal descriptions like this for every logically distinct state change, and to coalesce them into package documentation so it's easier to see how the pieces fit together.)

@faec faec merged commit 2b8fd7c into elastic:master Sep 28, 2020
@faec faec added the v7.10.0 label Sep 28, 2020
faec added a commit to faec/beats that referenced this pull request Sep 28, 2020
Initial implementation of the new libbeat disk queue

(cherry picked from commit 2b8fd7c)
v1v added a commit to v1v/beats that referenced this pull request Sep 29, 2020
* upstream/master:
  feat: prepare release pipelines (elastic#21238)
  Add IP validation to Security module (elastic#21325)
  Fixes for new 7.10 rsa2elk datasets (elastic#21240)
  o365input: Restart after fatal error (elastic#21258)
  Fix panic in cgroups monitoring (elastic#21355)
  Handle multiple upstreams in ingress-controller (elastic#21215)
  [CI] Fix runbld when workspace does not exist (elastic#21350)
  [Filebeat] Fix checkpoint (elastic#21344)
  [CI] Archive build reasons (elastic#21347)
  Add dashboard for pubsub metricset in googlecloud module (elastic#21326)
  [Elastic Agent] Allow embedding of certificate (elastic#21179)
  Adds a default for failure_cache.min_ttl (elastic#21085)
  [libbeat] Disk queue implementation (elastic#21176)
faec added a commit that referenced this pull request Sep 29, 2020
Initial implementation of the new libbeat disk queue

(cherry picked from commit 2b8fd7c)
@faec faec deleted the disk-queue-0 branch September 29, 2020 14:43
@faec faec added the test-plan Add this PR to be manual test plan label Oct 5, 2020
@andresrc andresrc added the test-plan-added This PR has been added to the test plan label Oct 13, 2020
Labels: enhancement, libbeat, Team:Integrations (Label for the Integrations team), test-plan (Add this PR to be manual test plan), test-plan-added (This PR has been added to the test plan), v7.10.0

6 participants