-
Notifications
You must be signed in to change notification settings - Fork 4.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Refactor: Move queue management logic into outputController, clarify language around pipeline notifications #35078
Conversation
Pinging @elastic/elastic-agent (Team:Elastic-Agent) |
This pull request does not have a backport label.
To fixup this pull request, you need to add the backport labels for the needed
|
The failing test is flaky/unrelated and was supposed to be disabled, I have a PR out to correct it #35083 |
/test |
2 similar comments
/test |
/test |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
…language around pipeline notifications (#35078)
This is an internal Beats refactor and should cause no change in behavior. It is a preparation for #34396, but the functional changes are split out into a followup PR to simplify review / testing. The core change needed to support the proxy queue and the shipper is to defer queue creation until after the output configuration is received, which under Agent typically happens only after the pipeline is already created, whereas initialization logic has previously assumed that the queue could be created and used independently before knowing the output configuration.
To prepare for this change, this PR moves the queue management logic out of
Pipeline
and intooutputController
. It also renames several related fields and structures to clarify their purpose in the pipeline. The changes here are:queue
field fromPipeline
tooutputController
. Instead of creating a queue factory inpipeline.LoadWithSettings
and instantiating it to create the queue inpipeline.New
, these functions now create aqueueConfig
structure that is passed tonewOutputController
, and the output controller itself creates the queue directly. (Currently this is still done on initialization, to avoid behavior changes in this PR.) When new clients connect to the pipeline, it now fetches the queue producer fromoutputController
instead of creating one directly (this is the hook that will allow clients to block until the queue is ready in the followup).queue.ACKListener
interface and replace it with a simple callback. This interface had only one function, and making it an interface created awkward reference dependencies between the queue and thePipeline
object. This change made the API slightly simpler and allowed acknowledgment bookkeeping to be handled via a simple callback field in thequeueConfig
structure.Eventer
->ClientListener
. This interface is used to track client handling of events as well as client shutdown stages.ACKer
->EventListener
. This interface is used to track events entering and leaving the pipeline.(ACKer).Close()
is now(EventListener).ClientClosed()
, sinceClose
gave the confusing impression of performing a close operation, when in fact it was used to notify listeners that the pipeline client was closing.Checklist
I have made corresponding changes to the documentationI have made corresponding change to the default configuration filesI have added tests that prove my fix is effective or that my feature worksCHANGELOG.next.asciidoc
orCHANGELOG-developer.next.asciidoc
.How to test this PR locally
Since this involved no external behavior change, it is already exercised by the existing tests (a few needed to be updated to handle the new data structures). However, the most important things to test are that beats continue to start up successfully both standalone and under agent. These tests can use simple file or stdout-based output, and any events reaching the output are a successful test -- since the queue is created only once, and data cannot flow without it, any problems with this PR should manifest immediately on startup.