diff --git a/libbeat/publisher/queue/memqueue/broker.go b/libbeat/publisher/queue/memqueue/broker.go index b91c0deea9f..7aecfe09bba 100644 --- a/libbeat/publisher/queue/memqueue/broker.go +++ b/libbeat/publisher/queue/memqueue/broker.go @@ -42,9 +42,6 @@ type broker struct { logger logger bufSize int - // buf brokerBuffer - // minEvents int - // idleTimeout time.Duration // api channels events chan pushRequest diff --git a/libbeat/publisher/queue/memqueue/consume.go b/libbeat/publisher/queue/memqueue/consume.go index 30042275117..a995fbdc0ca 100644 --- a/libbeat/publisher/queue/memqueue/consume.go +++ b/libbeat/publisher/queue/memqueue/consume.go @@ -58,8 +58,6 @@ func newConsumer(b *broker) *consumer { } func (c *consumer) Get(sz int) (queue.Batch, error) { - // log := c.broker.logger - if c.closed.Load() { return nil, io.EOF } diff --git a/libbeat/publisher/queue/memqueue/produce.go b/libbeat/publisher/queue/memqueue/produce.go index 2838e1ce574..f6898d10012 100644 --- a/libbeat/publisher/queue/memqueue/produce.go +++ b/libbeat/publisher/queue/memqueue/produce.go @@ -19,7 +19,6 @@ package memqueue import ( "github.com/elastic/beats/v7/libbeat/beat" - "github.com/elastic/beats/v7/libbeat/common/atomic" "github.com/elastic/beats/v7/libbeat/publisher" "github.com/elastic/beats/v7/libbeat/publisher/queue" ) @@ -30,16 +29,15 @@ type forgetfulProducer struct { } type ackProducer struct { - broker *broker - cancel bool - seq uint32 - state produceState - openState openState + broker *broker + dropOnCancel bool + seq uint32 + state produceState + openState openState } type openState struct { log logger - isOpen atomic.Bool done chan struct{} events chan pushRequest } @@ -56,13 +54,12 @@ type ackHandler func(count int) func newProducer(b *broker, cb ackHandler, dropCB func(beat.Event), dropOnCancel bool) queue.Producer { openState := openState{ log: b.logger, - isOpen: atomic.MakeBool(true), done: make(chan struct{}), events: b.events, } if cb != nil { - p := &ackProducer{broker: b, seq: 1, cancel: dropOnCancel, openState: openState} + p := &ackProducer{broker: b, seq: 1, dropOnCancel: dropOnCancel, openState: openState} p.state.cb = cb p.state.dropCB = dropCB return p @@ -114,7 +111,7 @@ func (p *ackProducer) makeRequest(event publisher.Event) pushRequest { func (p *ackProducer) Cancel() int { p.openState.Close() - if p.cancel { + if p.dropOnCancel { ch := make(chan producerCancelResponse) p.broker.pubCancel <- producerCancelRequest{ state: &p.state, @@ -129,7 +126,6 @@ func (p *ackProducer) Cancel() int { } func (st *openState) Close() { - st.isOpen.Store(false) close(st.done) } diff --git a/libbeat/publisher/queue/queue.go b/libbeat/publisher/queue/queue.go index a126e9ef381..a99c81e3f9b 100644 --- a/libbeat/publisher/queue/queue.go +++ b/libbeat/publisher/queue/queue.go @@ -80,23 +80,37 @@ type ProducerConfig struct { DropOnCancel bool } -// Producer interface to be used by the pipelines client to forward events to be -// published to the queue. -// When a producer calls `Cancel`, it's up to the queue to send or remove -// events not yet ACKed. -// Note: A queue is still allowed to send the ACK signal after Cancel. The -// pipeline client must filter out ACKs after cancel. +// Producer is an interface to be used by the pipelines client to forward +// events to a queue. type Producer interface { + // Publish adds an event to the queue, blocking if necessary, and returns + // true on success. Publish(event publisher.Event) bool + + // TryPublish adds an event to the queue if doing so will not block the + // caller, otherwise it immediately returns. The reasons a publish attempt + // might block are defined by the specific queue implementation and its + // configuration. Returns true if the event was successfully added, false + // otherwise. TryPublish(event publisher.Event) bool + + // Cancel closes this Producer endpoint. If the producer is configured to + // drop its events on Cancel, the number of dropped events is returned. + // Note: A queue may still send ACK signals even after Cancel is called on + // the originating Producer. The pipeline client must accept and + // discard these ACKs. Cancel() int } -// Consumer interface to be used by the pipeline output workers. -// The `Get` method retrieves a batch of events up to size `sz`. If sz <= 0, -// the batch size is up to the queue. +// Consumer is an interface to be used by the pipeline output workers, +// used to read events from the head of the queue. type Consumer interface { - Get(sz int) (Batch, error) + // Get retrieves a batch of up to eventCount events. If eventCount <= 0, + // there is no bound on the number of returned events. + Get(eventCount int) (Batch, error) + + // Close closes this Consumer. Returns an error if the Consumer is + // already closed. Close() error } diff --git a/libbeat/publisher/queue/spool/produce.go b/libbeat/publisher/queue/spool/produce.go index 81e12a18d02..6a74d93b1c6 100644 --- a/libbeat/publisher/queue/spool/produce.go +++ b/libbeat/publisher/queue/spool/produce.go @@ -21,7 +21,6 @@ import ( "sync" "github.com/elastic/beats/v7/libbeat/beat" - "github.com/elastic/beats/v7/libbeat/common/atomic" "github.com/elastic/beats/v7/libbeat/publisher" "github.com/elastic/beats/v7/libbeat/publisher/queue" ) @@ -35,17 +34,16 @@ type forgetfulProducer struct { // ackProducer forwards events to the inBroker. The ackBroker provides // functionality for ACK/Drop callbacks. type ackProducer struct { - cancel bool - seq uint32 - state produceState - openState openState - pubCancel chan producerCancelRequest + dropOnCancel bool + seq uint32 + state produceState + openState openState + pubCancel chan producerCancelRequest } // openState tracks the producer->inBroker connection state. type openState struct { ctx *spoolCtx - isOpen atomic.Bool done chan struct{} events chan pushRequest } @@ -83,7 +81,6 @@ func newProducer( ) queue.Producer { openState := openState{ ctx: ctx, - isOpen: atomic.MakeBool(true), done: make(chan struct{}), events: events, } @@ -93,10 +90,10 @@ func newProducer( } p := &ackProducer{ - seq: 1, - cancel: dropOnCancel, - openState: openState, - pubCancel: pubCancel, + seq: 1, + dropOnCancel: dropOnCancel, + openState: openState, + pubCancel: pubCancel, } p.state.ackCB = ackCB p.state.dropCB = dropCB @@ -131,7 +128,7 @@ func (p *ackProducer) TryPublish(event publisher.Event) bool { func (p *ackProducer) Cancel() int { p.openState.Close() - if p.cancel { + if p.dropOnCancel { ch := make(chan producerCancelResponse) p.pubCancel <- producerCancelRequest{ state: &p.state, @@ -157,7 +154,6 @@ func (p *ackProducer) makeRequest(event publisher.Event) pushRequest { } func (st *openState) Close() { - st.isOpen.Store(false) close(st.done) }