Skip to content

Commit

Permalink
[libbeat] Document / clean up parts of the publisher queue interface (e…
Browse files Browse the repository at this point in the history
  • Loading branch information
faec authored Mar 6, 2020
1 parent a034a92 commit c756ba8
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 40 deletions.
3 changes: 0 additions & 3 deletions libbeat/publisher/queue/memqueue/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,6 @@ type broker struct {
logger logger

bufSize int
// buf brokerBuffer
// minEvents int
// idleTimeout time.Duration

// api channels
events chan pushRequest
Expand Down
2 changes: 0 additions & 2 deletions libbeat/publisher/queue/memqueue/consume.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,6 @@ func newConsumer(b *broker) *consumer {
}

func (c *consumer) Get(sz int) (queue.Batch, error) {
// log := c.broker.logger

if c.closed.Load() {
return nil, io.EOF
}
Expand Down
18 changes: 7 additions & 11 deletions libbeat/publisher/queue/memqueue/produce.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package memqueue

import (
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common/atomic"
"github.com/elastic/beats/v7/libbeat/publisher"
"github.com/elastic/beats/v7/libbeat/publisher/queue"
)
Expand All @@ -30,16 +29,15 @@ type forgetfulProducer struct {
}

type ackProducer struct {
broker *broker
cancel bool
seq uint32
state produceState
openState openState
broker *broker
dropOnCancel bool
seq uint32
state produceState
openState openState
}

type openState struct {
log logger
isOpen atomic.Bool
done chan struct{}
events chan pushRequest
}
Expand All @@ -56,13 +54,12 @@ type ackHandler func(count int)
func newProducer(b *broker, cb ackHandler, dropCB func(beat.Event), dropOnCancel bool) queue.Producer {
openState := openState{
log: b.logger,
isOpen: atomic.MakeBool(true),
done: make(chan struct{}),
events: b.events,
}

if cb != nil {
p := &ackProducer{broker: b, seq: 1, cancel: dropOnCancel, openState: openState}
p := &ackProducer{broker: b, seq: 1, dropOnCancel: dropOnCancel, openState: openState}
p.state.cb = cb
p.state.dropCB = dropCB
return p
Expand Down Expand Up @@ -114,7 +111,7 @@ func (p *ackProducer) makeRequest(event publisher.Event) pushRequest {
func (p *ackProducer) Cancel() int {
p.openState.Close()

if p.cancel {
if p.dropOnCancel {
ch := make(chan producerCancelResponse)
p.broker.pubCancel <- producerCancelRequest{
state: &p.state,
Expand All @@ -129,7 +126,6 @@ func (p *ackProducer) Cancel() int {
}

func (st *openState) Close() {
st.isOpen.Store(false)
close(st.done)
}

Expand Down
34 changes: 24 additions & 10 deletions libbeat/publisher/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,23 +80,37 @@ type ProducerConfig struct {
DropOnCancel bool
}

// Producer interface to be used by the pipelines client to forward events to be
// published to the queue.
// When a producer calls `Cancel`, it's up to the queue to send or remove
// events not yet ACKed.
// Note: A queue is still allowed to send the ACK signal after Cancel. The
// pipeline client must filter out ACKs after cancel.
// Producer is an interface to be used by the pipelines client to forward
// events to a queue.
type Producer interface {
// Publish adds an event to the queue, blocking if necessary, and returns
// true on success.
Publish(event publisher.Event) bool

// TryPublish adds an event to the queue if doing so will not block the
// caller, otherwise it immediately returns. The reasons a publish attempt
// might block are defined by the specific queue implementation and its
// configuration. Returns true if the event was successfully added, false
// otherwise.
TryPublish(event publisher.Event) bool

// Cancel closes this Producer endpoint. If the producer is configured to
// drop its events on Cancel, the number of dropped events is returned.
// Note: A queue may still send ACK signals even after Cancel is called on
// the originating Producer. The pipeline client must accept and
// discard these ACKs.
Cancel() int
}

// Consumer interface to be used by the pipeline output workers.
// The `Get` method retrieves a batch of events up to size `sz`. If sz <= 0,
// the batch size is up to the queue.
// Consumer is an interface to be used by the pipeline output workers,
// used to read events from the head of the queue.
type Consumer interface {
Get(sz int) (Batch, error)
// Get retrieves a batch of up to eventCount events. If eventCount <= 0,
// there is no bound on the number of returned events.
Get(eventCount int) (Batch, error)

// Close closes this Consumer. Returns an error if the Consumer is
// already closed.
Close() error
}

Expand Down
24 changes: 10 additions & 14 deletions libbeat/publisher/queue/spool/produce.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"sync"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common/atomic"
"github.com/elastic/beats/v7/libbeat/publisher"
"github.com/elastic/beats/v7/libbeat/publisher/queue"
)
Expand All @@ -35,17 +34,16 @@ type forgetfulProducer struct {
// ackProducer forwards events to the inBroker. The ackBroker provides
// functionality for ACK/Drop callbacks.
type ackProducer struct {
cancel bool
seq uint32
state produceState
openState openState
pubCancel chan producerCancelRequest
dropOnCancel bool
seq uint32
state produceState
openState openState
pubCancel chan producerCancelRequest
}

// openState tracks the producer->inBroker connection state.
type openState struct {
ctx *spoolCtx
isOpen atomic.Bool
done chan struct{}
events chan pushRequest
}
Expand Down Expand Up @@ -83,7 +81,6 @@ func newProducer(
) queue.Producer {
openState := openState{
ctx: ctx,
isOpen: atomic.MakeBool(true),
done: make(chan struct{}),
events: events,
}
Expand All @@ -93,10 +90,10 @@ func newProducer(
}

p := &ackProducer{
seq: 1,
cancel: dropOnCancel,
openState: openState,
pubCancel: pubCancel,
seq: 1,
dropOnCancel: dropOnCancel,
openState: openState,
pubCancel: pubCancel,
}
p.state.ackCB = ackCB
p.state.dropCB = dropCB
Expand Down Expand Up @@ -131,7 +128,7 @@ func (p *ackProducer) TryPublish(event publisher.Event) bool {
func (p *ackProducer) Cancel() int {
p.openState.Close()

if p.cancel {
if p.dropOnCancel {
ch := make(chan producerCancelResponse)
p.pubCancel <- producerCancelRequest{
state: &p.state,
Expand All @@ -157,7 +154,6 @@ func (p *ackProducer) makeRequest(event publisher.Event) pushRequest {
}

func (st *openState) Close() {
st.isOpen.Store(false)
close(st.done)
}

Expand Down

0 comments on commit c756ba8

Please sign in to comment.