Skip to content

Commit

Permalink
Moving counter support to filebeat
Browse files Browse the repository at this point in the history
The way the counter support is setup is very specific for filebeat
and how Filebeat hooks up a global ACKer, and the Registrar for storing
file.State. Moving it to filebeat to not encourage someone to use a it.
  • Loading branch information
urso committed Apr 29, 2020
1 parent ed21c4d commit f09d457
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 96 deletions.
91 changes: 91 additions & 0 deletions filebeat/beater/channels.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ import (

"github.com/elastic/beats/v7/filebeat/input/file"
"github.com/elastic/beats/v7/filebeat/registrar"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/monitoring"
"github.com/elastic/beats/v7/libbeat/publisher/pipetool"
)

type registrarLogger struct {
Expand All @@ -41,6 +43,23 @@ type eventCounter struct {
wg sync.WaitGroup
}

// countingClient adds and substracts from a counter when events have been
// published, dropped or ACKed. The countingClient can be used to keep track of
// inflight events for a beat.Client instance. The counter is updated after the
// client has been disconnected from the publisher pipeline via 'Closed'.
type countingClient struct {
counter *eventCounter
client beat.Client
}

type countingEventer struct {
wgEvents *eventCounter
}

type combinedEventer struct {
a, b beat.ClientEventer
}

func newRegistrarLogger(reg *registrar.Registrar) *registrarLogger {
return &registrarLogger{
done: make(chan struct{}),
Expand Down Expand Up @@ -87,3 +106,75 @@ func (c *eventCounter) Done() {
func (c *eventCounter) Wait() {
c.wg.Wait()
}

// withPipelineEventCounter adds a counter to the pipeline that keeps track of
// all events published, dropped and ACKed by any active client.
// The type accepted by counter is compatible with sync.WaitGroup.
func withPipelineEventCounter(pipeline beat.PipelineConnector, counter *eventCounter) beat.PipelineConnector {
counterListener := &countingEventer{counter}

pipeline = pipetool.WithClientConfigEdit(pipeline, func(config beat.ClientConfig) (beat.ClientConfig, error) {
if evts := config.Events; evts != nil {
config.Events = &combinedEventer{evts, counterListener}
} else {
config.Events = counterListener
}
return config, nil
})

pipeline = pipetool.WithClientWrapper(pipeline, func(client beat.Client) beat.Client {
return &countingClient{
counter: counter,
client: client,
}
})
return pipeline
}

func (c *countingClient) Publish(event beat.Event) {
c.counter.Add(1)
c.client.Publish(event)
}

func (c *countingClient) PublishAll(events []beat.Event) {
c.counter.Add(len(events))
c.client.PublishAll(events)
}

func (c *countingClient) Close() error {
return c.client.Close()
}

func (*countingEventer) Closing() {}
func (*countingEventer) Closed() {}
func (*countingEventer) Published() {}

func (c *countingEventer) FilteredOut(_ beat.Event) {}
func (c *countingEventer) DroppedOnPublish(_ beat.Event) {
c.wgEvents.Done()
}

func (c *combinedEventer) Closing() {
c.a.Closing()
c.b.Closing()
}

func (c *combinedEventer) Closed() {
c.a.Closed()
c.b.Closed()
}

func (c *combinedEventer) Published() {
c.a.Published()
c.b.Published()
}

func (c *combinedEventer) FilteredOut(event beat.Event) {
c.a.FilteredOut(event)
c.b.FilteredOut(event)
}

func (c *combinedEventer) DroppedOnPublish(event beat.Event) {
c.a.DroppedOnPublish(event)
c.b.DroppedOnPublish(event)
}
4 changes: 2 additions & 2 deletions filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,8 +237,8 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
return err
}

fb.pipeline = pipetool.WithPipelineEventCounter(b.Publisher, wgEvents)
fb.pipeline = pipetool.WithDefaultGuarantees(fb.pipeline, beat.GuaranteedSend)
fb.pipeline = withPipelineEventCounter(fb.pipeline, wgEvents)
fb.pipeline = pipetool.WithDefaultGuarantees(b.Publisher, beat.GuaranteedSend)

outDone := make(chan struct{}) // outDone closes down all active pipeline connections
pipelineConnector := channel.NewOutletFactory(outDone).Create
Expand Down
94 changes: 0 additions & 94 deletions libbeat/publisher/pipetool/pipetool.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,76 +64,6 @@ func (p *wrapClientPipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, er
return client, err
}

// countingClient adds and substracts from a counter when events have been
// published, dropped or ACKed. The countingClient can be used to keep track of
// inflight events for a beat.Client instance. The counter is updated after the
// client has been disconnected from the publisher pipeline via 'Closed'.
type countingClient struct {
counter eventCounter
client beat.Client
}

type eventCounter interface {
Add(n int)
Done()
}

type countingEventer struct {
wgEvents eventCounter
}

type combinedEventer struct {
a, b beat.ClientEventer
}

func (c *countingClient) Publish(event beat.Event) {
c.counter.Add(1)
c.client.Publish(event)
}

func (c *countingClient) PublishAll(events []beat.Event) {
c.counter.Add(len(events))
c.client.PublishAll(events)
}

func (c *countingClient) Close() error {
return c.client.Close()
}

func (*countingEventer) Closing() {}
func (*countingEventer) Closed() {}
func (*countingEventer) Published() {}

func (c *countingEventer) FilteredOut(_ beat.Event) {}
func (c *countingEventer) DroppedOnPublish(_ beat.Event) {
c.wgEvents.Done()
}

func (c *combinedEventer) Closing() {
c.a.Closing()
c.b.Closing()
}

func (c *combinedEventer) Closed() {
c.a.Closed()
c.b.Closed()
}

func (c *combinedEventer) Published() {
c.a.Published()
c.b.Published()
}

func (c *combinedEventer) FilteredOut(event beat.Event) {
c.a.FilteredOut(event)
c.b.FilteredOut(event)
}

func (c *combinedEventer) DroppedOnPublish(event beat.Event) {
c.a.DroppedOnPublish(event)
c.b.DroppedOnPublish(event)
}

// WithClientConfigEdit creates a pipeline connector, that allows the
// beat.ClientConfig to be modified before connecting to the underlying
// pipeline.
Expand All @@ -159,27 +89,3 @@ func WithDefaultGuarantees(pipeline beat.PipelineConnector, mode beat.PublishMod
func WithClientWrapper(pipeline beat.PipelineConnector, wrap ClientWrapper) beat.PipelineConnector {
return &wrapClientPipeline{parent: pipeline, wrapper: wrap}
}

// WithPipelineEventCounter adds a counter to the pipeline that keeps track of
// all events published, dropped and ACKed by any active client.
// The type accepted by counter is compatible with sync.WaitGroup.
func WithPipelineEventCounter(pipeline beat.PipelineConnector, counter eventCounter) beat.PipelineConnector {
counterListener := &countingEventer{counter}

pipeline = WithClientConfigEdit(pipeline, func(config beat.ClientConfig) (beat.ClientConfig, error) {
if evts := config.Events; evts != nil {
config.Events = &combinedEventer{evts, counterListener}
} else {
config.Events = counterListener
}
return config, nil
})

pipeline = WithClientWrapper(pipeline, func(client beat.Client) beat.Client {
return &countingClient{
counter: counter,
client: client,
}
})
return pipeline
}

0 comments on commit f09d457

Please sign in to comment.