diff --git a/CHANGELOG-developer.next.asciidoc b/CHANGELOG-developer.next.asciidoc index aeba9c202cf..42423bd9116 100644 --- a/CHANGELOG-developer.next.asciidoc +++ b/CHANGELOG-developer.next.asciidoc @@ -46,3 +46,4 @@ The list below covers the major changes between 7.0.0-rc2 and master only. - Add ClientFactory to TCP input source to add SplitFunc/NetworkFuncs per client. {pull}8543[8543] - Introduce beat.OutputChooses publisher mode. {pull}12996[12996] - Ensure that beat.Processor, beat.ProcessorList, and processors.ProcessorList are compatible and can be composed more easily. {pull}12996[12996] +- Add support to close beat.Client via beat.CloseRef (a subset of context.Context). {pull}13031[13031] diff --git a/libbeat/beat/pipeline.go b/libbeat/beat/pipeline.go index d49a10f085b..5e1fe15e48c 100644 --- a/libbeat/beat/pipeline.go +++ b/libbeat/beat/pipeline.go @@ -48,6 +48,8 @@ type ClientConfig struct { Processing ProcessingConfig + CloseRef CloseRef + // WaitClose sets the maximum duration to wait on ACK, if client still has events // active non-acknowledged events in the publisher pipeline. // WaitClose is only effective if one of ACKCount, ACKEvents and ACKLastEvents @@ -78,6 +80,13 @@ type ClientConfig struct { ACKLastEvent func(interface{}) } +// CloseRef allows users to close the client asynchronously. +// A CloseReg implements a subset of function required for context.Context. +type CloseRef interface { + Done() <-chan struct{} + Err() error +} + // ProcessingConfig provides additional event processing settings a client can // pass to the publisher pipeline on Connect. type ProcessingConfig struct { diff --git a/libbeat/publisher/pipeline/acker.go b/libbeat/publisher/pipeline/acker.go index 70af536ded9..3436f7a2528 100644 --- a/libbeat/publisher/pipeline/acker.go +++ b/libbeat/publisher/pipeline/acker.go @@ -30,6 +30,7 @@ import ( // All pipeline and client ACK handling support is provided by acker instances. type acker interface { close() + wait() addEvent(event beat.Event, published bool) bool ackEvents(int) } @@ -40,6 +41,7 @@ type emptyACK struct{} var nilACKer acker = (*emptyACK)(nil) func (*emptyACK) close() {} +func (*emptyACK) wait() {} func (*emptyACK) addEvent(_ beat.Event, _ bool) bool { return true } func (*emptyACK) ackEvents(_ int) {} @@ -68,6 +70,7 @@ func newCountACK(pipeline *Pipeline, fn func(total, acked int)) *countACK { } func (a *countACK) close() {} +func (a *countACK) wait() {} func (a *countACK) addEvent(_ beat.Event, _ bool) bool { return true } func (a *countACK) ackEvents(n int) { if a.pipeline.ackActive.Load() { @@ -220,6 +223,8 @@ func (a *gapCountACK) close() { close(a.done) } +func (a *gapCountACK) wait() {} + func (a *gapCountACK) addEvent(_ beat.Event, published bool) bool { // if gapList is empty and event is being dropped, forward drop event to ack // loop worker: @@ -313,9 +318,8 @@ func newBoundGapCountACK( return a } -func (a *boundGapCountACK) close() { - a.acker.close() -} +func (a *boundGapCountACK) close() { a.acker.close() } +func (a *boundGapCountACK) wait() { a.acker.wait() } func (a *boundGapCountACK) addEvent(event beat.Event, published bool) bool { a.sema.inc() @@ -361,9 +365,9 @@ func makeCountACK(pipeline *Pipeline, canDrop bool, sema *sema, fn func(int, int return newCountACK(pipeline, fn) } -func (a *eventDataACK) close() { - a.acker.close() -} +func (a *eventDataACK) close() { a.acker.close() } + +func (a *eventDataACK) wait() { a.acker.wait() } func (a *eventDataACK) addEvent(event beat.Event, published bool) bool { a.mutex.Lock() @@ -400,37 +404,57 @@ func (a *eventDataACK) onACK(total, acked int) { type waitACK struct { acker acker - signal chan struct{} - waitClose time.Duration + signalAll chan struct{} // ack loop notifies `close` that all events have been acked + signalDone chan struct{} // shutdown handler telling `wait` that shutdown has been completed + waitClose time.Duration active atomic.Bool // number of active events events atomic.Uint64 + + afterClose func() } -func newWaitACK(acker acker, timeout time.Duration) *waitACK { +func newWaitACK(acker acker, timeout time.Duration, afterClose func()) *waitACK { return &waitACK{ - acker: acker, - signal: make(chan struct{}, 1), - waitClose: timeout, - active: atomic.MakeBool(true), + acker: acker, + signalAll: make(chan struct{}, 1), + signalDone: make(chan struct{}), + waitClose: timeout, + active: atomic.MakeBool(true), + afterClose: afterClose, } } func (a *waitACK) close() { - // TODO: wait for events - a.active.Store(false) - if a.events.Load() > 0 { + + if a.events.Load() == 0 { + a.finishClose() + return + } + + // start routine to propagate shutdown signals or timeouts to anyone + // being blocked in wait. + go func() { + defer a.finishClose() + select { - case <-a.signal: + case <-a.signalAll: case <-time.After(a.waitClose): } - } + }() +} - // close the underlying acker upon exit +func (a *waitACK) finishClose() { a.acker.close() + a.afterClose() + close(a.signalDone) +} + +func (a *waitACK) wait() { + <-a.signalDone } func (a *waitACK) addEvent(event beat.Event, published bool) bool { @@ -454,6 +478,22 @@ func (a *waitACK) releaseEvents(n int) { // send done signal, if close is waiting if !a.active.Load() { - a.signal <- struct{}{} + a.signalAll <- struct{}{} } } + +// closeACKer simply wraps any other acker. It calls a custom function after +// the underlying acker has been closed. +type closeACKer struct { + acker + afterClose func() +} + +func newCloseACKer(a acker, fn func()) acker { + return &closeACKer{acker: a, afterClose: fn} +} + +func (a closeACKer) close() { + a.acker.close() + a.afterClose() +} diff --git a/libbeat/publisher/pipeline/client.go b/libbeat/publisher/pipeline/client.go index 44c49849f96..192bf4f8619 100644 --- a/libbeat/publisher/pipeline/client.go +++ b/libbeat/publisher/pipeline/client.go @@ -43,7 +43,11 @@ type client struct { canDrop bool reportEvents bool - isOpen atomic.Bool + // Open state, signaling, and sync primitives for coordinating client Close. + isOpen atomic.Bool // set to false during shutdown, such that no new events will be accepted anymore. + closeOnce sync.Once // closeOnce ensure that the client shutdown sequence is only executed once + closeRef beat.CloseRef // extern closeRef for sending a signal that the client should be closed. + done chan struct{} // the done channel will be closed if the closeReg gets closed, or Close is run. eventer beat.ClientEventer } @@ -135,23 +139,40 @@ func (c *client) publish(e beat.Event) { } func (c *client) Close() error { - // first stop ack handling. ACK handler might block (with timeout), waiting + log := c.logger() + + // first stop ack handling. ACK handler might block on wait (with timeout), waiting // for pending events to be ACKed. + c.doClose() + log.Debug("client: wait for acker to finish") + c.acker.wait() + log.Debug("client: acker shut down") + return nil +} - log := c.logger() +func (c *client) doClose() { + c.closeOnce.Do(func() { + close(c.done) - if !c.isOpen.Swap(false) { - return nil // closed or already closing - } + log := c.logger() + + c.isOpen.Store(false) + c.onClosing() - c.onClosing() + log.Debug("client: closing acker") + c.acker.close() // this must trigger a direct/indirect call to 'unlink' + }) +} - log.Debug("client: closing acker") - c.acker.close() +// unlink is the final step of closing a client. It must be executed only after +// it is guaranteed that the underlying acker has been closed and will not +// accept any new publish or ACK events. +// This method is normally registered with the ACKer and triggered by it. +func (c *client) unlink() { + log := c.logger() log.Debug("client: done closing acker") - // finally disconnect client from broker - n := c.producer.Cancel() + n := c.producer.Cancel() // close connection to queue log.Debugf("client: cancelled %v events", n) if c.reportEvents { @@ -162,7 +183,6 @@ func (c *client) Close() error { } c.onClosed() - return nil } func (c *client) logger() *logp.Logger { diff --git a/libbeat/publisher/pipeline/client_ack.go b/libbeat/publisher/pipeline/client_ack.go index 123a1c6ffaa..5ace3de7fce 100644 --- a/libbeat/publisher/pipeline/client_ack.go +++ b/libbeat/publisher/pipeline/client_ack.go @@ -33,6 +33,7 @@ func (p *Pipeline) makeACKer( canDrop bool, cfg *beat.ClientConfig, waitClose time.Duration, + afterClose func(), ) acker { var ( bld = p.ackBuilder @@ -50,15 +51,16 @@ func (p *Pipeline) makeACKer( acker = bld.createEventACKer(canDrop, sema, cb) default: if waitClose <= 0 { - return bld.createPipelineACKer(canDrop, sema) + acker = bld.createPipelineACKer(canDrop, sema) + } else { + acker = bld.createCountACKer(canDrop, sema, func(_ int) {}) } - acker = bld.createCountACKer(canDrop, sema, func(_ int) {}) } if waitClose <= 0 { - return acker + return newCloseACKer(acker, afterClose) } - return newWaitACK(acker, waitClose) + return newWaitACK(acker, waitClose, afterClose) } func lastEventACK(fn func(interface{})) func([]interface{}) { diff --git a/libbeat/publisher/pipeline/client_test.go b/libbeat/publisher/pipeline/client_test.go new file mode 100644 index 00000000000..c7b83835c9c --- /dev/null +++ b/libbeat/publisher/pipeline/client_test.go @@ -0,0 +1,115 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package pipeline + +import ( + "context" + "sync" + "testing" + + "github.com/elastic/beats/libbeat/beat" + "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/libbeat/outputs" + "github.com/elastic/beats/libbeat/publisher/queue" + "github.com/elastic/beats/libbeat/tests/resources" +) + +func TestClient(t *testing.T) { + makePipeline := func(settings Settings, qu queue.Queue) *Pipeline { + p, err := New(beat.Info{}, + Monitors{}, + func(_ queue.Eventer) (queue.Queue, error) { + return qu, nil + }, + outputs.Group{}, + settings, + ) + if err != nil { + panic(err) + } + + return p + } + + t.Run("client close", func(t *testing.T) { + // Note: no asserts. If closing fails we have a deadlock, because Publish + // would block forever + + cases := map[string]struct { + context bool + close func(client beat.Client, cancel func()) + }{ + "close unblocks client without context": { + context: false, + close: func(client beat.Client, _ func()) { + client.Close() + }, + }, + "close unblocks client with context": { + context: true, + close: func(client beat.Client, _ func()) { + client.Close() + }, + }, + "context cancel unblocks client": { + context: true, + close: func(client beat.Client, cancel func()) { + cancel() + }, + }, + } + + if testing.Verbose() { + logp.TestingSetup() + } + + for name, test := range cases { + t.Run(name, func(t *testing.T) { + routinesChecker := resources.NewGoroutinesChecker() + defer routinesChecker.Check(t) + + pipeline := makePipeline(Settings{}, makeBlockingQueue()) + defer pipeline.Close() + + var ctx context.Context + var cancel func() + if test.context { + ctx, cancel = context.WithCancel(context.Background()) + } + + client, err := pipeline.ConnectWith(beat.ClientConfig{ + CloseRef: ctx, + }) + if err != nil { + t.Fatal(err) + } + defer client.Close() + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + client.Publish(beat.Event{}) + }() + + test.close(client, cancel) + wg.Wait() + }) + } + }) +} diff --git a/libbeat/publisher/pipeline/consumer.go b/libbeat/publisher/pipeline/consumer.go index cb1aec86b67..cba643c94ee 100644 --- a/libbeat/publisher/pipeline/consumer.go +++ b/libbeat/publisher/pipeline/consumer.go @@ -170,10 +170,12 @@ func (c *eventConsumer) loop(consumer queue.Consumer) { consumer = nil continue } + if queueBatch != nil { + batch = newBatch(c.ctx, queueBatch, c.out.timeToLive) + } - batch = newBatch(c.ctx, queueBatch, c.out.timeToLive) paused = c.paused() - if paused { + if paused || batch == nil { out = nil } } diff --git a/libbeat/publisher/pipeline/pipeline.go b/libbeat/publisher/pipeline/pipeline.go index 93d7e6c59f4..688b7315c74 100644 --- a/libbeat/publisher/pipeline/pipeline.go +++ b/libbeat/publisher/pipeline/pipeline.go @@ -22,6 +22,7 @@ package pipeline import ( "errors" + "reflect" "sync" "time" @@ -76,6 +77,10 @@ type Pipeline struct { ackBuilder ackBuilder eventSema *sema + // closeRef signal propagation support + guardStartSigPropagation sync.Once + sigNewClient chan *client + processors processing.Supporter } @@ -273,6 +278,10 @@ func (p *Pipeline) Close() error { } p.observer.cleanup() + if p.sigNewClient != nil { + close(p.sigNewClient) + } + return nil } @@ -325,7 +334,20 @@ func (p *Pipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) { if err != nil { return nil, err } - acker := p.makeACKer(processors != nil, &cfg, waitClose) + + client := &client{ + pipeline: p, + closeRef: cfg.CloseRef, + done: make(chan struct{}), + isOpen: atomic.MakeBool(true), + eventer: cfg.Events, + processors: processors, + eventFlags: eventFlags, + canDrop: canDrop, + reportEvents: reportEvents, + } + + acker := p.makeACKer(processors != nil, &cfg, waitClose, client.unlink) producerCfg := queue.ProducerConfig{ // Cancel events from queue if acker is configured // and no pipeline-wide ACK handler is registered. @@ -346,26 +368,99 @@ func (p *Pipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) { if acker != nil { producerCfg.ACK = acker.ackEvents } else { - acker = nilACKer + acker = newCloseACKer(nilACKer, client.unlink) } - producer := p.queue.Producer(producerCfg) - client := &client{ - pipeline: p, - isOpen: atomic.MakeBool(true), - eventer: cfg.Events, - processors: processors, - producer: producer, - acker: acker, - eventFlags: eventFlags, - canDrop: canDrop, - reportEvents: reportEvents, - } + client.acker = acker + client.producer = p.queue.Producer(producerCfg) p.observer.clientConnected() + + if client.closeRef != nil { + p.registerSignalPropagation(client) + } + return client, nil } +func (p *Pipeline) registerSignalPropagation(c *client) { + p.guardStartSigPropagation.Do(func() { + p.sigNewClient = make(chan *client, 1) + go p.runSignalPropagation() + }) + p.sigNewClient <- c +} + +func (p *Pipeline) runSignalPropagation() { + var channels []reflect.SelectCase + var clients []*client + + channels = append(channels, reflect.SelectCase{ + Dir: reflect.SelectRecv, + Chan: reflect.ValueOf(p.sigNewClient), + }) + + for { + chosen, recv, recvOK := reflect.Select(channels) + if chosen == 0 { + if !recvOK { + // sigNewClient was closed + return + } + + // new client -> register client for signal propagation. + client := recv.Interface().(*client) + channels = append(channels, + reflect.SelectCase{ + Dir: reflect.SelectRecv, + Chan: reflect.ValueOf(client.closeRef.Done()), + }, + reflect.SelectCase{ + Dir: reflect.SelectRecv, + Chan: reflect.ValueOf(client.done), + }, + ) + clients = append(clients, client) + continue + } + + // find client we received a signal for. If client.done was closed, then + // we have to remove the client only. But if closeRef did trigger the signal, then + // we have to propagate the async close to the client. + // In either case, the client will be removed + + i := (chosen - 1) / 2 + isSig := (chosen & 1) == 1 + if isSig { + client := clients[i] + client.doClose() + } + + // remove: + last := len(clients) - 1 + ch1 := i*2 + 1 + ch2 := ch1 + 1 + lastCh1 := last*2 + 1 + lastCh2 := lastCh1 + 1 + + clients[i], clients[last] = clients[last], nil + channels[ch1], channels[lastCh1] = channels[lastCh1], reflect.SelectCase{} + channels[ch2], channels[lastCh2] = channels[lastCh2], reflect.SelectCase{} + + clients = clients[:last] + channels = channels[:lastCh1] + if cap(clients) > 10 && len(clients) <= cap(clients)/2 { + clientsTmp := make([]*client, len(clients)) + copy(clientsTmp, clients) + clients = clientsTmp + + channelsTmp := make([]reflect.SelectCase, len(channels)) + copy(channelsTmp, channels) + channels = channelsTmp + } + } +} + func (p *Pipeline) createEventProcessing(cfg beat.ProcessingConfig, noPublish bool) (beat.Processor, error) { if p.processors == nil { return nil, nil diff --git a/libbeat/publisher/pipeline/pipeline_test.go b/libbeat/publisher/pipeline/pipeline_test.go new file mode 100644 index 00000000000..c5ff791bf68 --- /dev/null +++ b/libbeat/publisher/pipeline/pipeline_test.go @@ -0,0 +1,221 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package pipeline + +import ( + "sync" + + "github.com/elastic/beats/libbeat/common/atomic" + "github.com/elastic/beats/libbeat/publisher" + "github.com/elastic/beats/libbeat/publisher/queue" +) + +type testQueue struct { + close func() error + bufferConfig func() queue.BufferConfig + producer func(queue.ProducerConfig) queue.Producer + consumer func() queue.Consumer +} + +type testProducer struct { + publish func(try bool, event publisher.Event) bool + cancel func() int +} + +type testConsumer struct { + get func(sz int) (queue.Batch, error) + close func() error +} + +func (q *testQueue) Close() error { + if q.close != nil { + return q.close() + } + return nil +} + +func (q *testQueue) BufferConfig() queue.BufferConfig { + if q.bufferConfig != nil { + return q.bufferConfig() + } + return queue.BufferConfig{} +} + +func (q *testQueue) Producer(cfg queue.ProducerConfig) queue.Producer { + if q.producer != nil { + return q.producer(cfg) + } + return nil +} + +func (q *testQueue) Consumer() queue.Consumer { + if q.consumer != nil { + return q.consumer() + } + return nil +} + +func (p *testProducer) Publish(event publisher.Event) bool { + if p.publish != nil { + return p.publish(false, event) + } + return false +} + +func (p *testProducer) TryPublish(event publisher.Event) bool { + if p.publish != nil { + return p.publish(true, event) + } + return false +} + +func (p *testProducer) Cancel() int { + if p.cancel != nil { + return p.cancel() + } + return 0 +} + +func (p *testConsumer) Get(sz int) (queue.Batch, error) { + if p.get != nil { + return p.get(sz) + } + return nil, nil +} + +func (p *testConsumer) Close() error { + if p.close() != nil { + return p.close() + } + return nil +} + +func makeBlockingQueue() queue.Queue { + return makeTestQueue(emptyConsumer, blockingProducer) +} + +func makeTestQueue( + makeConsumer func() queue.Consumer, + makeProducer func(queue.ProducerConfig) queue.Producer, +) queue.Queue { + var mux sync.Mutex + var wg sync.WaitGroup + consumers := map[*testConsumer]struct{}{} + producers := map[queue.Producer]struct{}{} + + return &testQueue{ + close: func() error { + mux.Lock() + for consumer := range consumers { + consumer.Close() + } + for producer := range producers { + producer.Cancel() + } + mux.Unlock() + + wg.Wait() + return nil + }, + + consumer: func() queue.Consumer { + var consumer *testConsumer + c := makeConsumer() + consumer = &testConsumer{ + get: func(sz int) (queue.Batch, error) { return c.Get(sz) }, + close: func() error { + err := c.Close() + + mux.Lock() + defer mux.Unlock() + delete(consumers, consumer) + wg.Done() + + return err + }, + } + + mux.Lock() + defer mux.Unlock() + consumers[consumer] = struct{}{} + wg.Add(1) + return consumer + }, + + producer: func(cfg queue.ProducerConfig) queue.Producer { + var producer *testProducer + p := makeProducer(cfg) + producer = &testProducer{ + publish: func(try bool, event publisher.Event) bool { + if try { + return p.TryPublish(event) + } + return p.Publish(event) + }, + cancel: func() int { + i := p.Cancel() + + mux.Lock() + defer mux.Unlock() + delete(producers, producer) + wg.Done() + + return i + }, + } + + mux.Lock() + defer mux.Unlock() + producers[producer] = struct{}{} + wg.Add(1) + return producer + }, + } +} + +func emptyConsumer() queue.Consumer { + done := make(chan struct{}) + return &testConsumer{ + get: func(sz int) (queue.Batch, error) { + <-done + return nil, nil + }, + close: func() error { + close(done) + return nil + }, + } +} + +func blockingProducer(_ queue.ProducerConfig) queue.Producer { + sig := make(chan struct{}) + waiting := atomic.MakeInt(0) + + return &testProducer{ + publish: func(_ bool, _ publisher.Event) bool { + waiting.Inc() + <-sig + return false + }, + + cancel: func() int { + close(sig) + return waiting.Load() + }, + } +}