Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Alternative implementation for closing a beat.Client #13031

Merged
merged 4 commits into from
Jul 26, 2019

Conversation

urso
Copy link

@urso urso commented Jul 23, 2019

With the help of this change one can close a beat.Client instance
indirectly, by signaling instead of calling Close(). One still should
use Close() on shutdown, so to make sure that runner using the
client for publishing blocks and keeps resources intact if
WaitCloseTimeout has been configured.

The interface CloseRef implements a subset of context.Context, which can
be used to control shutdown. For example filebeat inputs normally run in
a loop reading messages, transforming those into events, and publishing
them to libbeat via a beat.Client instance. If the input accepts
context.Context for cancellation, then the run loop follows this code
pattern:

    func newInput(...) (*input, error) {
        // configure input

        // create context which is close/cancelled in `Close` method
        ctx, cancelFn := ...

        return &input{
            ctx: ctx,
            cancel: cancelFn,
            ...
        }, nil
    }

    func (in *input) Start() {
        in.wg.Add(1)
        go func(){
	        defer in.wg.Done()
	        in.Run()
	    }()
    }

    func (in *input) Run() {
	reader := ... // init reader for collection the raw data
	defer reader.Close()
        outlet := connector.ConnectWith(beat.ClientConfig{
            // underlying beat.Client will be closed if ctx is cancelled
            CloseRef: in.ctx,

            Processing: ...

            // give pipeline and ACKer a chance to ACK some inflight events during shutdown
            WaitClose: ...
            ACKEvents: func(private []interface{}) {
	    	    for _, p := range private {
			        reader.ACK(p)
		        }
	        },
        })

        // this blocks until all events have been acked or for a duration of WaitClose
        defer outlet.Close()

        for err := ctx.Err(); err == nil; err = ctx.Err() {
            // Read returns error if ctx is cancelled
            message, err := source.Read(in.ctx, ...)
            if err != nil {
               ...
            }

            // OnEvent blocks if queue is full, but unblocks if ctx is
	        // cancelled.
            outlet.OnEvent(makeEvent(message))
        }
    }

    func (in *input) Close() {
	    // cancel context
	    // -> reader or outleter unblock
	    // -> run loop returns
        in.cancel()
	    in.wg.Wait() // wait until run loop returned
    }

active: atomic.MakeBool(true),
acker: acker,
signalAll: make(chan struct{}, 1),
signalDone: make(chan struct{}),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am gonna start using signal prefix in my code, I see how you are using it and I think it made code clearer.

libbeat/publisher/pipeline/pipeline.go Outdated Show resolved Hide resolved
}

func (p *Pipeline) runSignalPropagation() {
var channels []reflect.SelectCase
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TIL, never use that before.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I need to become more experienced with the reflect package, I don't use it that much.


clients[i], clients[last] = clients[last], nil
channels[ch1], channels[lastCh1] = channels[lastCh1], reflect.SelectCase{}
channels[ch2], channels[lastCh2] = channels[lastCh2], reflect.SelectCase{}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@urso oh seriously cool reordering here technique and optimization on the slice usage, I had to write it down to verify it but it all make sense. :)

@ph ph added the libbeat label Jul 25, 2019
With the help of this change one can close a beat.Client instance
indirectly, by signaling instead of calling Close(). One still should
use `Close()` on shutdown, so to make sure that runner using the
client for publishing blocks and keeps resources intact if
WaitCloseTimeout has been configured.

The interface CloseRef implements a subset of context.Context, which can
be used to control shutdown. For example filebeat inputs normally run in
a loop reading messages, transforming those into events, and publishing
them to libbeat via a beat.Client instance. If the input accepts
context.Context for cancellation, then the run loop follows this code
pattern:

    func newInput(...) (*input, error) {
        // configure input

        // create context which is close/cancelled in `Close` method
	ctx, cancelFn := ...

	return &input{
	    ctx: ctx,
	    cancel: cancelFn,
	    ...
	}, nil
    }

    func (in *input) Start() {
        in.wg.Add(1)
        go func(){
	    defer in.wg.Done()
	    in.Run()
	}()
    }

    func (in *input) Run() {
	reader := ... // init reader for collection the raw data
	defer reader.Close()

        outlet := connector.ConnectWith(beat.ClientConfig{
            // underlying beat.Client will be closed if ctx is cancelled
            CloseRef: in.ctx,

            Processing: ...

            // give pipeline and ACKer a chance to ACK some inflight events during shutdown
            WaitClose: ...
            ACKEvents: func(private []interface{}) {
	    	for _, p := range private {
			reader.ACK(p)
		}
	    },
        })

	// this blocks until all events have been acked or for a duration of WaitClose
	defer outlet.Close()

        for err := ctx.Err(); err == nil; err = ctx.Err() {
            // Read returns error if ctx is cancelled
            message, err := source.Read(in.ctx, ...)
            if err != nil {
               ...
            }

            // OnEvent blocks if queue is full, but unblocks if ctx is
	    // cancelled.
            outlet.OnEvent(makeEvent(message))
        }
    }

    func (in *input) Close() {
	// cancel context
	// -> reader or outleter unblock
	// -> run loop returns
        in.cancel()
	in.wg.Wait() // wait until run loop returned
    }
@urso urso force-pushed the pipeline-with-context branch from 03370d3 to 794f8f5 Compare July 25, 2019 23:07
@urso urso added the review label Jul 26, 2019
@urso urso marked this pull request as ready for review July 26, 2019 00:58
@urso urso requested a review from a team as a code owner July 26, 2019 00:58
Copy link
Contributor

@ph ph left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, +1 for the changes and also the ncie suite of test in the PR.

@ph
Copy link
Contributor

ph commented Jul 26, 2019

CI issues should be fixed in #13077

@urso urso merged commit 86db61c into elastic:master Jul 26, 2019
@urso urso deleted the pipeline-with-context branch July 26, 2019 13:45
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants