Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[libbeat] Document / clean up parts of the publisher queue interface #16858

Merged
merged 5 commits into from
Mar 6, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions libbeat/publisher/queue/memqueue/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,6 @@ type broker struct {
logger logger

bufSize int
// buf brokerBuffer
// minEvents int
// idleTimeout time.Duration

// api channels
events chan pushRequest
Expand Down
2 changes: 0 additions & 2 deletions libbeat/publisher/queue/memqueue/consume.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,6 @@ func newConsumer(b *broker) *consumer {
}

func (c *consumer) Get(sz int) (queue.Batch, error) {
// log := c.broker.logger

if c.closed.Load() {
return nil, io.EOF
}
Expand Down
18 changes: 7 additions & 11 deletions libbeat/publisher/queue/memqueue/produce.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package memqueue

import (
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common/atomic"
"github.com/elastic/beats/v7/libbeat/publisher"
"github.com/elastic/beats/v7/libbeat/publisher/queue"
)
Expand All @@ -30,16 +29,15 @@ type forgetfulProducer struct {
}

type ackProducer struct {
broker *broker
cancel bool
seq uint32
state produceState
openState openState
broker *broker
dropOnCancel bool
seq uint32
state produceState
openState openState
}

type openState struct {
log logger
isOpen atomic.Bool
done chan struct{}
events chan pushRequest
}
Expand All @@ -56,13 +54,12 @@ type ackHandler func(count int)
func newProducer(b *broker, cb ackHandler, dropCB func(beat.Event), dropOnCancel bool) queue.Producer {
openState := openState{
log: b.logger,
isOpen: atomic.MakeBool(true),
done: make(chan struct{}),
events: b.events,
}

if cb != nil {
p := &ackProducer{broker: b, seq: 1, cancel: dropOnCancel, openState: openState}
p := &ackProducer{broker: b, seq: 1, dropOnCancel: dropOnCancel, openState: openState}
p.state.cb = cb
p.state.dropCB = dropCB
return p
Expand Down Expand Up @@ -114,7 +111,7 @@ func (p *ackProducer) makeRequest(event publisher.Event) pushRequest {
func (p *ackProducer) Cancel() int {
p.openState.Close()

if p.cancel {
if p.dropOnCancel {
ch := make(chan producerCancelResponse)
p.broker.pubCancel <- producerCancelRequest{
state: &p.state,
Expand All @@ -129,7 +126,6 @@ func (p *ackProducer) Cancel() int {
}

func (st *openState) Close() {
st.isOpen.Store(false)
close(st.done)
}

Expand Down
34 changes: 24 additions & 10 deletions libbeat/publisher/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,23 +80,37 @@ type ProducerConfig struct {
DropOnCancel bool
}

// Producer interface to be used by the pipelines client to forward events to be
// published to the queue.
// When a producer calls `Cancel`, it's up to the queue to send or remove
// events not yet ACKed.
// Note: A queue is still allowed to send the ACK signal after Cancel. The
// pipeline client must filter out ACKs after cancel.
// Producer is an interface to be used by the pipelines client to forward
// events to a queue.
type Producer interface {
// Publish adds an event to the queue, blocking if necessary, and returns
// true on success.
Publish(event publisher.Event) bool

// TryPublish adds an event to the queue if doing so will not block the
// caller, otherwise it immediately returns. The reasons a publish attempt
// might block are defined by the specific queue implementation and its
// configuration. Returns true if the event was successfully added, false
// otherwise.
TryPublish(event publisher.Event) bool

// Cancel closes this Producer endpoint. If the producer is configured to
// drop its events on Cancel, the number of dropped events is returned.
// Note: A queue may still send ACK signals even after Cancel is called on
// the originating Producer. The pipeline client must accept and
// discard these ACKs.
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I recall correctly the client based cancellers are closed as well. communication is async, which means that 'during' cancel we still get ACKs, but once close has finished no ACKs might be received anymore. The pipeline also has a 'global' ACK queue, which will still receive the ACKs.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you clarify? I'm not sure what "client based cancellers" are in this context. This comment was based on the code in {memqueue, spool}/produce.go:Close() which only closes the done channel of the producer's openState. Unless you're talking about the producerCancel{Request, Response} handling when dropOnCancel is set?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. Yeah, you are right here. The behavior about ACKs is in the beat.Client interface, not the Producer interface.

Cancel() int
}

// Consumer interface to be used by the pipeline output workers.
// The `Get` method retrieves a batch of events up to size `sz`. If sz <= 0,
// the batch size is up to the queue.
// Consumer is an interface to be used by the pipeline output workers,
// used to read events from the head of the queue.
type Consumer interface {
Get(sz int) (Batch, error)
// Get retrieves a batch of up to eventCount events. If eventCount <= 0,
// there is no bound on the number of returned events.
Get(eventCount int) (Batch, error)

// Close closes this Consumer. Returns an error if the Consumer is
// already closed.
Close() error
}

Expand Down
24 changes: 10 additions & 14 deletions libbeat/publisher/queue/spool/produce.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"sync"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common/atomic"
"github.com/elastic/beats/v7/libbeat/publisher"
"github.com/elastic/beats/v7/libbeat/publisher/queue"
)
Expand All @@ -35,17 +34,16 @@ type forgetfulProducer struct {
// ackProducer forwards events to the inBroker. The ackBroker provides
// functionality for ACK/Drop callbacks.
type ackProducer struct {
cancel bool
seq uint32
state produceState
openState openState
pubCancel chan producerCancelRequest
dropOnCancel bool
seq uint32
state produceState
openState openState
pubCancel chan producerCancelRequest
}

// openState tracks the producer->inBroker connection state.
type openState struct {
ctx *spoolCtx
isOpen atomic.Bool
done chan struct{}
events chan pushRequest
}
Expand Down Expand Up @@ -83,7 +81,6 @@ func newProducer(
) queue.Producer {
openState := openState{
ctx: ctx,
isOpen: atomic.MakeBool(true),
done: make(chan struct{}),
events: events,
}
Expand All @@ -93,10 +90,10 @@ func newProducer(
}

p := &ackProducer{
seq: 1,
cancel: dropOnCancel,
openState: openState,
pubCancel: pubCancel,
seq: 1,
dropOnCancel: dropOnCancel,
openState: openState,
pubCancel: pubCancel,
}
p.state.ackCB = ackCB
p.state.dropCB = dropCB
Expand Down Expand Up @@ -131,7 +128,7 @@ func (p *ackProducer) TryPublish(event publisher.Event) bool {
func (p *ackProducer) Cancel() int {
p.openState.Close()

if p.cancel {
if p.dropOnCancel {
ch := make(chan producerCancelResponse)
p.pubCancel <- producerCancelRequest{
state: &p.state,
Expand All @@ -157,7 +154,6 @@ func (p *ackProducer) makeRequest(event publisher.Event) pushRequest {
}

func (st *openState) Close() {
st.isOpen.Store(false)
close(st.done)
}

Expand Down