Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add support for queue settings under outputs #36788

Merged
merged 13 commits into from
Oct 19, 2023

Conversation

leehinman
Copy link
Contributor

@leehinman leehinman commented Oct 6, 2023

Proposed commit message

  • add support for queue settings under output. Validation ensure only top level or output level is specified.

Checklist

  • My code follows the style guidelines of this project
  • I have commented my code, particularly in hard-to-understand areas
  • I have made corresponding changes to the documentation
  • I have made corresponding change to the default configuration files
  • I have added tests that prove my fix is effective or that my feature works
  • I have added an entry in CHANGELOG.next.asciidoc or CHANGELOG-developer.next.asciidoc.

Author's Checklist

  • [ ]

How to test this PR locally

Stand alone Filebeat

Start Filebeat with following config file:

output.elasticsearch:
  queue.mem:
    events: 1024
    flush.min_events: 2
    flush.timeout: 15s

Should show queue.max_events in the metrics to be 1024.

Agent

  1. Build metricbeat and filebeat from this branch.
  2. Build or download an 8.11 snapshot agent.
  3. Copy the built metricbeat and filebeat into the data/elastic-agent-$hash/components directory of the agent.
  4. Install the agent. Enrolled it with Fleet (standalone agent should work too)
  5. Configure the queue as shown above
  6. Check logs that queue.max_events in the metrics is 1024

Related issues

@botelastic botelastic bot added the needs_team Indicates that the issue/PR needs a Team:* label label Oct 6, 2023
@mergify
Copy link
Contributor

mergify bot commented Oct 6, 2023

This pull request does not have a backport label.
If this is a bug or security fix, could you label this PR @leehinman? 🙏.
For such, you'll need to label your PR with:

  • The upcoming major version of the Elastic Stack
  • The upcoming minor version of the Elastic Stack (if you're not pushing a breaking change)

To fixup this pull request, you need to add the backport labels for the needed
branches, such as:

  • backport-v8./d.0 is the label to automatically backport to the 8./d branch. /d is the digit

@leehinman leehinman force-pushed the 35615_queue_under_output branch 2 times, most recently from 441c87c to 52b0111 Compare October 6, 2023 23:08
@elasticmachine
Copy link
Collaborator

elasticmachine commented Oct 6, 2023

💚 Build Succeeded

the below badges are clickable and redirect to their specific view in the CI or DOCS
Pipeline View Test View Changes Artifacts preview preview

Expand to view the summary

Build stats

  • Start Time: 2023-10-19T21:54:11.727+0000

  • Duration: 70 min 0 sec

Test stats 🧪

Test Results
Failed 0
Passed 28482
Skipped 2013
Total 30495

💚 Flaky test report

Tests succeeded.

🤖 GitHub comments

Expand to view the GitHub comments

To re-run your PR in the CI, just comment with:

  • /test : Re-trigger the build.

  • /package : Generate the packages and run the E2E tests.

  • /beats-tester : Run the installation tests with beats-tester.

  • run elasticsearch-ci/docs : Re-trigger the docs validation. (use unformatted text in the comment!)

@leehinman leehinman force-pushed the 35615_queue_under_output branch 4 times, most recently from 4b506ee to ca41e03 Compare October 9, 2023 19:43
@leehinman leehinman marked this pull request as ready for review October 9, 2023 21:43
@leehinman leehinman requested review from a team as code owners October 9, 2023 21:43
@pierrehilbert pierrehilbert added the Team:Elastic-Agent Label for the Agent team label Oct 10, 2023
@botelastic botelastic bot removed the needs_team Indicates that the issue/PR needs a Team:* label label Oct 10, 2023
@leehinman
Copy link
Contributor Author

Sorry, I had trouble re-opening #36693, so this a new PR number but is really a continuation.

tests := map[string]struct {
input []byte
memEvents int
expectValidationError bool
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this field being used?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nope, thanks.

mem:
events: 8096
`),
memEvents: 8096},
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Could totally be just me, but I found these test cases a bit hard to read. I think it might've helped to have the opening and closing braces of each test case struct on their own lines so the struct fields all lined up nicely.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See if the new fmt is better. I'm open to any suggestions, I just want to be sure the yaml looks like yaml.

// Fail helper can be used by output factories, to create a failure response when
// loading an output must return an error.
func Fail(err error) (Group, error) { return Group{}, err }

// Success create a valid output Group response for a set of client instances.
func Success(batchSize, retry int, clients ...Client) (Group, error) {
func Success(cfg config.Namespace, batchSize, retry int, clients ...Client) (Group, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's document here that cfg is expected to contain queue configuration, since its datatype here config.Namespace doesn't quite make that obvious.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same for SuccessNet below.

@@ -135,14 +135,15 @@ func NewQueue(
logger *logp.Logger,
ackCallback func(eventCount int),
settings Settings,
inputQueueSize int,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a bit odd not to have inputQueueSize be part of settings as before. I agree with removing the special override in the publisher pipeline code but I think that can be done while leaving inputQueueSize as part of settings so this constructor here has still has the same signature as before, with all queue settings being in settings. For that you'll need to set settings.InputQueueSize wherever you call this constructor from (unless, of course, you want to leave that field at it's zero-value, e.g. in test code).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only thing I don't like about that is it is hard to tell if you are supposed to set settings.InputQueueSize before you pass it into FactoryForSettings or allow FactoryForSettings to set it. Also what happens if you pass in a Settings with that field set? Do you override or keep?

I don't like changing the signature, but I'm not fond of the ambiguous nature of who sets InputQueueSize if we leave it in settings. I can be convinced to go either way, do you have a strong opinion?

@ycombinator
Copy link
Contributor

This PR seems to consist of three types of unrelated changes: some refactoring/cleaning, queue setting changes, idle connection timeout setting changes. That's making it a bit hard to review the impact of each change in isolation.

Would it be a lot of work to break this up into three smaller PRs? In general, I think that's a better approach, not just to make reviews easier but also to allow rolling back changes in isolation, if necessary.

@leehinman
Copy link
Contributor Author

This PR seems to consist of three types of unrelated changes: some refactoring/cleaning, queue setting changes, idle connection timeout setting changes. That's making it a bit hard to review the impact of each change in isolation.

Would it be a lot of work to break this up into three smaller PRs? In general, I think that's a better approach, not just to make reviews easier but also to allow rolling back changes in isolation, if necessary.

Good idea, I'll split out the idle connection timeout. Unfortunately all the other changes are necessary for the queue under outputs change. I'll add some more comments as to why.

@@ -1480,3 +1485,42 @@ func sanitizeIPs(ips []string) []string {
}
return validIPs
}

func promoteOutputQueueSettings(bc *beatConfig) error {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

promoteOutputQueueSettings handles the use case where the beat is started with a configuration file that has outputs defined (normal stand alone beat).

@leehinman
Copy link
Contributor Author

#36843 now contains the idle_connection_timeout changes.

@@ -41,20 +41,20 @@ func makeES(
) (outputs.Group, error) {
log := logp.NewLogger(logSelector)
if !cfg.HasField("bulk_max_size") {
cfg.SetInt("bulk_max_size", -1, defaultBulkSize)
_ = cfg.SetInt("bulk_max_size", -1, defaultBulkSize)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You might have a merge conflict here once #36843 is merged.

- add support for `idle_connection_timeout` for ES output
- add support for queue settings under output

Closes elastic#35615
@leehinman leehinman force-pushed the 35615_queue_under_output branch from 505fa7c to 4dc26ba Compare October 19, 2023 18:39
Copy link
Contributor

@ycombinator ycombinator left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM.

@leehinman leehinman enabled auto-merge (squash) October 19, 2023 21:54
@leehinman leehinman merged commit 0bd2d73 into elastic:main Oct 19, 2023
8 checks passed
@leehinman leehinman deleted the 35615_queue_under_output branch October 30, 2023 14:08
@bschaeffer
Copy link

bschaeffer commented Jan 25, 2024

This is a confusing change for folks building custom output plugins. Where do I put the configuration for queues now?

# global
queue.mem:
  events: 1024

# output local
output.custom:
  queue.mem:
    events: 1024

If I can leave them as a global config... how do I access the config using the output factory function:

type Factory func(
im IndexManager,
beat beat.Info,
stats Observer,
cfg *config.C) (Group, error)

@bschaeffer
Copy link

Looks like only one is allowed:

error unpacking config data: top level queue and output level queue settings defined, only one is allowed accessing config

Scholar-Li pushed a commit to Scholar-Li/beats that referenced this pull request Feb 5, 2024
* add support for queue settings under outputs
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Team:Elastic-Agent Label for the Agent team
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Allow queue configuration to be specified under the output type
5 participants