diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 7a0a798bda8..ec36dd14574 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -72,6 +72,7 @@ https://github.com/elastic/beats/compare/1.0.0...master[Check the HEAD diff] *Filebeat* - Add multiline support for combining multiple related lines into one event. {issue}461[461] - Add close_older configuration option to complete ignore_older https://github.com/elastic/filebeat/issues/181[181] +- Add experimental option to enable filebeat publisher pipeline to operate asynchonrously {pull}782[782] *Winlogbeat* - Added support for reading event logs using the Windows Event Log API {pull}576[576] diff --git a/filebeat/beat/filebeat.go b/filebeat/beat/filebeat.go index 8f008b7f08e..8e33ec4ee15 100644 --- a/filebeat/beat/filebeat.go +++ b/filebeat/beat/filebeat.go @@ -5,9 +5,7 @@ import ( "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/cfgfile" - "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" - "github.com/elastic/beats/libbeat/publisher" cfg "github.com/elastic/beats/filebeat/config" . "github.com/elastic/beats/filebeat/crawler" @@ -93,7 +91,9 @@ func (fb *Filebeat) Run(b *beat.Beat) error { } // Publishes event to output - go Publish(b, fb) + pub := newPublisher(fb.FbConfig.Filebeat.PublishAsync, + fb.publisherChan, fb.registrar.Channel, b.Events) + pub.Start() // Blocks progressing select { @@ -123,23 +123,3 @@ func (fb *Filebeat) Stop() { // Stop Filebeat close(fb.done) } - -func Publish(beat *beat.Beat, fb *Filebeat) { - logp.Info("Start sending events to output") - - // Receives events from spool during flush - for events := range fb.publisherChan { - - pubEvents := make([]common.MapStr, 0, len(events)) - for _, event := range events { - pubEvents = append(pubEvents, event.ToMapStr()) - } - - beat.Events.PublishEvents(pubEvents, publisher.Sync, publisher.Guaranteed) - - logp.Info("Events sent: %d", len(events)) - - // Tell the registrar that we've successfully sent these events - fb.registrar.Channel <- events - } -} diff --git a/filebeat/beat/publish.go b/filebeat/beat/publish.go new file mode 100644 index 00000000000..895fa46e767 --- /dev/null +++ b/filebeat/beat/publish.go @@ -0,0 +1,223 @@ +package beat + +import ( + "sync" + "sync/atomic" + "time" + + "github.com/elastic/beats/filebeat/input" + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/libbeat/publisher" +) + +type logPublisher interface { + Start() + Stop() +} + +type syncLogPublisher struct { + client publisher.Client + in, out chan []*input.FileEvent + + done chan struct{} + wg sync.WaitGroup +} + +type asyncLogPublisher struct { + client publisher.Client + in, out chan []*input.FileEvent + + // list of in-flight batches + active batchList + failing bool + + done chan struct{} + wg sync.WaitGroup +} + +// eventsBatch is used to store sorted list of actively published log lines. +// Implements `outputs.Signalerr` interface for marking batch as finished +type eventsBatch struct { + next *eventsBatch + flag int32 + events []*input.FileEvent +} + +type batchList struct { + head, tail *eventsBatch +} + +const ( + defaultGCTimeout = 1 * time.Second +) + +const ( + batchSuccess int32 = 1 + batchFailed int32 = 2 +) + +func newPublisher( + async bool, + in, out chan []*input.FileEvent, + client publisher.Client, +) logPublisher { + if async { + return newAsyncLogPublisher(in, out, client) + } + return newSyncLogPublisher(in, out, client) +} + +func newSyncLogPublisher( + in, out chan []*input.FileEvent, + client publisher.Client, +) *syncLogPublisher { + return &syncLogPublisher{ + in: in, + out: out, + client: client, + done: make(chan struct{}), + } +} + +func (p *syncLogPublisher) Start() { + p.wg.Add(1) + go func() { + defer p.wg.Done() + + logp.Info("Start sending events to output") + + for { + var events []*input.FileEvent + select { + case <-p.done: + return + case events = <-p.in: + } + + pubEvents := make([]common.MapStr, 0, len(events)) + for _, event := range events { + pubEvents = append(pubEvents, event.ToMapStr()) + } + + p.client.PublishEvents(pubEvents, publisher.Sync, publisher.Guaranteed) + logp.Info("Events sent: %d", len(events)) + + // Tell the registrar that we've successfully sent these events + select { + case <-p.done: + return + case p.out <- events: + } + } + }() +} + +func (p *syncLogPublisher) Stop() { + close(p.done) + p.wg.Wait() +} + +func newAsyncLogPublisher( + in, out chan []*input.FileEvent, + client publisher.Client, +) *asyncLogPublisher { + return &asyncLogPublisher{ + in: in, + out: out, + client: client, + done: make(chan struct{}), + } +} + +func (p *asyncLogPublisher) Start() { + p.wg.Add(1) + go func() { + defer p.wg.Done() + + logp.Info("Start sending events to output") + + // short gc timer, in case no logs are received from spooler the queued + // bulkEvents can still be cleaned up and forwarded to the registrar + ticker := time.NewTicker(defaultGCTimeout) + + for { + select { + case <-p.done: + return + case events := <-p.in: + pubEvents := make([]common.MapStr, len(events)) + for i, event := range events { + pubEvents[i] = event.ToMapStr() + } + + batch := &eventsBatch{ + flag: 0, + events: events, + } + p.client.PublishEvents(pubEvents, + publisher.Signal(batch), publisher.Guaranteed) + + p.active.append(batch) + case <-ticker.C: + } + + p.collect() + } + }() +} + +func (p *asyncLogPublisher) Stop() { + close(p.done) + p.wg.Wait() +} + +// collect collects finished bulk-Events in order and forward processed batches +// to registrar. Reports to registrar are guaranteed to be in same order +// as bulk-Events have been received by the spooler +func (p *asyncLogPublisher) collect() bool { + for batch := p.active.head; batch != nil; batch = batch.next { + state := atomic.LoadInt32(&batch.flag) + if state == 0 && !p.failing { + break + } + + // remove batch from active list + p.active.head = batch.next + if batch.next == nil { + p.active.tail = nil + } + + // Batches get marked as failed, if publisher pipeline is shutting down + // In this case we do not want to send any more batches to the registrar + if state == batchFailed { + p.failing = true + } + + if p.failing { + // if in failing state keep cleaning up queue + continue + } + + // Tell the registrar that we've successfully sent these events + select { + case <-p.done: + return false + case p.out <- batch.events: + } + } + return true +} + +func (b *eventsBatch) Completed() { atomic.StoreInt32(&b.flag, batchSuccess) } +func (b *eventsBatch) Failed() { atomic.StoreInt32(&b.flag, batchFailed) } + +func (l *batchList) append(b *eventsBatch) { + if l.head == nil { + l.head = b + } else { + l.tail.next = b + } + b.next = nil + l.tail = b +} diff --git a/filebeat/beat/publish_test.go b/filebeat/beat/publish_test.go new file mode 100644 index 00000000000..29eb865afa4 --- /dev/null +++ b/filebeat/beat/publish_test.go @@ -0,0 +1,79 @@ +package beat + +import ( + "fmt" + "testing" + "time" + + "github.com/elastic/beats/filebeat/input" + "github.com/elastic/beats/libbeat/outputs" + "github.com/elastic/beats/libbeat/publisher" + "github.com/stretchr/testify/assert" +) + +func makeEvents(name string, n int) []*input.FileEvent { + var events []*input.FileEvent + for i := 0; i < n; i++ { + event := &input.FileEvent{ + ReadTime: time.Now(), + InputType: "log", + DocumentType: "log", + Bytes: 100, + Offset: int64(i), + Source: &name, + } + events = append(events, event) + } + return events +} + +func TestPublisherModes(t *testing.T) { + tests := []struct { + title string + async bool + order []int + }{ + {"sync", false, []int{1, 2, 3, 4, 5, 6}}, + {"async ordered signal", true, []int{1, 2, 3, 4, 5, 6}}, + {"async out of order signal", true, []int{5, 2, 3, 1, 4, 6}}, + } + + for i, test := range tests { + t.Logf("run publisher test (%v): %v", i, test.title) + + pubChan := make(chan []*input.FileEvent, len(test.order)+1) + regChan := make(chan []*input.FileEvent, len(test.order)+1) + client := publisher.ExtChanClient{make(chan publisher.PublishMessage)} + + pub := newPublisher(test.async, pubChan, regChan, client) + pub.Start() + + var events [][]*input.FileEvent + for i := range test.order { + tmp := makeEvents(fmt.Sprintf("msg: %v", i), 1) + pubChan <- tmp + events = append(events, tmp) + } + + var msgs []publisher.PublishMessage + for _ = range test.order { + m := <-client.Channel + msgs = append(msgs, m) + } + + for _, i := range test.order { + outputs.SignalCompleted(msgs[i-1].Context.Signal) + } + + var regEvents [][]*input.FileEvent + for _ = range test.order { + regEvents = append(regEvents, <-regChan) + } + pub.Stop() + + // validate order + for i := range events { + assert.Equal(t, events[i], regEvents[i]) + } + } +} diff --git a/filebeat/config/config.go b/filebeat/config/config.go index 23122560aa5..1a28bcbd105 100644 --- a/filebeat/config/config.go +++ b/filebeat/config/config.go @@ -37,6 +37,7 @@ type Config struct { type FilebeatConfig struct { Prospectors []ProspectorConfig SpoolSize uint64 `yaml:"spool_size"` + PublishAsync bool `yaml:"publish_async"` IdleTimeout string `yaml:"idle_timeout"` IdleTimeoutDuration time.Duration RegistryFile string `yaml:"registry_file"` diff --git a/filebeat/docs/configuration.asciidoc b/filebeat/docs/configuration.asciidoc index 6d995130e92..00622e27f42 100644 --- a/filebeat/docs/configuration.asciidoc +++ b/filebeat/docs/configuration.asciidoc @@ -307,6 +307,13 @@ filebeat: ------------------------------------------------------------------------------------- +===== publish_async + +If enabled, the publisher pipeline in filebeat operates in async mode preparing +a new batch of lines while waiting for ACK. This option can improve load-balancing +throughput at the cost of increased memory usage. The default value is false. + + ===== idle_timeout A duration string that specifies how often the spooler is flushed. After the diff --git a/filebeat/etc/beat.yml b/filebeat/etc/beat.yml index 914386df123..393714c1b73 100644 --- a/filebeat/etc/beat.yml +++ b/filebeat/etc/beat.yml @@ -151,6 +151,9 @@ filebeat: # Event count spool threshold - forces network flush if exceeded #spool_size: 2048 + # Enable async publisher pipeline in filebeat (Experimental!) + #publish_async: false + # Defines how often the spooler is flushed. After idle_timeout the spooler is # Flush even though spool_size is not reached. #idle_timeout: 5s diff --git a/libbeat/publisher/async.go b/libbeat/publisher/async.go index 0651a99ba1a..fe3f9633d61 100644 --- a/libbeat/publisher/async.go +++ b/libbeat/publisher/async.go @@ -39,12 +39,12 @@ func (p *asyncPublisher) client() eventPublisher { return p } -func (p *asyncPublisher) PublishEvent(ctx context, event common.MapStr) bool { +func (p *asyncPublisher) PublishEvent(ctx Context, event common.MapStr) bool { p.send(message{context: ctx, event: event}) return true } -func (p *asyncPublisher) PublishEvents(ctx context, events []common.MapStr) bool { +func (p *asyncPublisher) PublishEvents(ctx Context, events []common.MapStr) bool { p.send(message{context: ctx, events: events}) return true } @@ -52,16 +52,16 @@ func (p *asyncPublisher) PublishEvents(ctx context, events []common.MapStr) bool func (p *asyncPublisher) send(m message) { if p.pub.disabled { debug("publisher disabled") - outputs.SignalCompleted(m.context.signal) + outputs.SignalCompleted(m.context.Signal) return } // m.signal is not set yet. But a async client type supporting signals might // be implemented in the future. - // If m.signal is nil, NewSplitSignaler will return nil -> signaler will + // If m.Signal is nil, NewSplitSignaler will return nil -> signaler will // only set if client did send one - if m.context.signal != nil && len(p.outputs) > 1 { - m.context.signal = outputs.NewSplitSignaler(m.context.signal, len(p.outputs)) + if m.context.Signal != nil && len(p.outputs) > 1 { + m.context.Signal = outputs.NewSplitSignaler(m.context.Signal, len(p.outputs)) } for _, o := range p.outputs { o.send(m) diff --git a/libbeat/publisher/bulk.go b/libbeat/publisher/bulk.go index 5604c1ff1df..01887c14541 100644 --- a/libbeat/publisher/bulk.go +++ b/libbeat/publisher/bulk.go @@ -58,9 +58,9 @@ func (b *bulkWorker) run() { case <-b.ws.done: return case m := <-b.queue: - b.onEvent(m.context.signal, m.event) + b.onEvent(m.context.Signal, m.event) case m := <-b.bulkQueue: - b.onEvents(m.context.signal, m.events) + b.onEvents(m.context.Signal, m.events) case <-b.flushTicker.C: if len(b.events) > 0 { b.publish() @@ -113,8 +113,8 @@ func (b *bulkWorker) onEvents(signal outputs.Signaler, events []common.MapStr) { func (b *bulkWorker) publish() { // TODO: remember/merge and forward context options to output worker b.output.send(message{ - context: context{ - signal: outputs.NewCompositeSignaler(b.pending...), + context: Context{ + Signal: outputs.NewCompositeSignaler(b.pending...), }, event: nil, events: b.events, diff --git a/libbeat/publisher/client.go b/libbeat/publisher/client.go index 7e2d211bf61..9d51aff58a8 100644 --- a/libbeat/publisher/client.go +++ b/libbeat/publisher/client.go @@ -31,6 +31,15 @@ type ChanClient struct { Channel chan common.MapStr } +type ExtChanClient struct { + Channel chan PublishMessage +} + +type PublishMessage struct { + Context Context + Events []common.MapStr +} + type client struct { publisher *PublisherType @@ -39,28 +48,28 @@ type client struct { } // ClientOption allows API users to set additional options when publishing events. -type ClientOption func(option context) context +type ClientOption func(option Context) Context // Guaranteed option will retry publishing the event, until send attempt have // been ACKed by output plugin. -func Guaranteed(o context) context { - o.guaranteed = true +func Guaranteed(o Context) Context { + o.Guaranteed = true return o } // Sync option will block the event publisher until an event has been ACKed by // the output plugin or failed. -func Sync(o context) context { - o.sync = true +func Sync(o Context) Context { + o.Sync = true return o } func Signal(signaler outputs.Signaler) ClientOption { - return func(ctx context) context { - if ctx.signal == nil { - ctx.signal = signaler + return func(ctx Context) Context { + if ctx.Signal == nil { + ctx.Signal = signaler } else { - ctx.signal = outputs.NewCompositeSignaler(ctx.signal, signaler) + ctx.Signal = outputs.NewCompositeSignaler(ctx.Signal, signaler) } return ctx } @@ -106,13 +115,9 @@ func (c *client) annotateEvent(event common.MapStr) { } } -func (c *client) getClient(opts []ClientOption) (context, eventPublisher) { - var ctx context - for _, opt := range opts { - ctx = opt(ctx) - } - - if ctx.sync { +func (c *client) getClient(opts []ClientOption) (Context, eventPublisher) { + ctx := makeContext(opts) + if ctx.Sync { return ctx, c.publisher.syncPublisher.client() } return ctx, c.publisher.asyncPublisher.client() @@ -133,3 +138,25 @@ func (c ChanClient) PublishEvents(events []common.MapStr, opts ...ClientOption) } return true } + +// PublishEvent will publish the event on the channel. Options will be ignored. +// Always returns true. +func (c ExtChanClient) PublishEvent(event common.MapStr, opts ...ClientOption) bool { + c.Channel <- PublishMessage{makeContext(opts), []common.MapStr{event}} + return true +} + +// PublishEvents publishes all event on the configured channel. Options will be ignored. +// Always returns true. +func (c ExtChanClient) PublishEvents(events []common.MapStr, opts ...ClientOption) bool { + c.Channel <- PublishMessage{makeContext(opts), events} + return true +} + +func makeContext(opts []ClientOption) Context { + var ctx Context + for _, opt := range opts { + ctx = opt(ctx) + } + return ctx +} diff --git a/libbeat/publisher/common_test.go b/libbeat/publisher/common_test.go index 28d05b84ca4..02b293f3e6a 100644 --- a/libbeat/publisher/common_test.go +++ b/libbeat/publisher/common_test.go @@ -36,9 +36,9 @@ func (mh *testMessageHandler) send(m message) { func (mh *testMessageHandler) acknowledgeMessage(m message) { if mh.response == CompletedResponse { - outputs.SignalCompleted(m.context.signal) + outputs.SignalCompleted(m.context.Signal) } else { - outputs.SignalFailed(m.context.signal, nil) + outputs.SignalFailed(m.context.Signal, nil) } } @@ -158,22 +158,22 @@ func newTestPublisher(bulkSize int, response OutputResponse) *testPublisher { } func (t *testPublisher) asyncPublishEvent(event common.MapStr) bool { - ctx := context{} + ctx := Context{} return t.pub.asyncPublisher.client().PublishEvent(ctx, event) } func (t *testPublisher) asyncPublishEvents(events []common.MapStr) bool { - ctx := context{} + ctx := Context{} return t.pub.asyncPublisher.client().PublishEvents(ctx, events) } func (t *testPublisher) syncPublishEvent(event common.MapStr) bool { - ctx := context{publishOptions: publishOptions{guaranteed: true}} + ctx := Context{publishOptions: publishOptions{Guaranteed: true}} return t.pub.syncPublisher.client().PublishEvent(ctx, event) } func (t *testPublisher) syncPublishEvents(events []common.MapStr) bool { - ctx := context{publishOptions: publishOptions{guaranteed: true}} + ctx := Context{publishOptions: publishOptions{Guaranteed: true}} return t.pub.syncPublisher.client().PublishEvents(ctx, events) } @@ -190,9 +190,9 @@ func newTestPublisherNoBulk(response OutputResponse) *testPublisher { } func testMessage(s *testSignaler, event common.MapStr) message { - return message{context: context{signal: s}, event: event} + return message{context: Context{Signal: s}, event: event} } func testBulkMessage(s *testSignaler, events []common.MapStr) message { - return message{context: context{signal: s}, events: events} + return message{context: Context{Signal: s}, events: events} } diff --git a/libbeat/publisher/output.go b/libbeat/publisher/output.go index a255504eaf8..b948f20a2c1 100644 --- a/libbeat/publisher/output.go +++ b/libbeat/publisher/output.go @@ -50,15 +50,15 @@ func (o *outputWorker) onMessage(m message) { } } -func (o *outputWorker) onEvent(ctx *context, event common.MapStr) { +func (o *outputWorker) onEvent(ctx *Context, event common.MapStr) { debug("output worker: publish single event") - o.out.PublishEvent(ctx.signal, outputs.Options{ctx.guaranteed}, event) + o.out.PublishEvent(ctx.Signal, outputs.Options{ctx.Guaranteed}, event) } -func (o *outputWorker) onBulk(ctx *context, events []common.MapStr) { +func (o *outputWorker) onBulk(ctx *Context, events []common.MapStr) { if len(events) == 0 { debug("output worker: no events to publish") - outputs.SignalCompleted(ctx.signal) + outputs.SignalCompleted(ctx.Signal) return } @@ -69,7 +69,7 @@ func (o *outputWorker) onBulk(ctx *context, events []common.MapStr) { // start splitting bulk request splits := (len(events) + (o.maxBulkSize - 1)) / o.maxBulkSize - ctx.signal = outputs.NewSplitSignaler(ctx.signal, splits) + ctx.Signal = outputs.NewSplitSignaler(ctx.Signal, splits) for len(events) > 0 { sz := o.maxBulkSize if sz > len(events) { @@ -81,12 +81,12 @@ func (o *outputWorker) onBulk(ctx *context, events []common.MapStr) { } func (o *outputWorker) sendBulk( - ctx *context, + ctx *Context, events []common.MapStr, ) { debug("output worker: publish %v events", len(events)) - err := o.out.BulkPublish(ctx.signal, outputs.Options{ctx.guaranteed}, events) + err := o.out.BulkPublish(ctx.Signal, outputs.Options{ctx.Guaranteed}, events) if err != nil { logp.Info("Error bulk publishing events: %s", err) } diff --git a/libbeat/publisher/output_test.go b/libbeat/publisher/output_test.go index ef376a98be5..0957f66a984 100644 --- a/libbeat/publisher/output_test.go +++ b/libbeat/publisher/output_test.go @@ -42,7 +42,7 @@ func TestOutputWorker(t *testing.T) { } for _, m := range testCases { - sig := m.context.signal.(*testSignaler) + sig := m.context.Signal.(*testSignaler) ow.onMessage(m) assert.True(t, sig.wait()) diff --git a/libbeat/publisher/publish.go b/libbeat/publisher/publish.go index 5d3b034dd5f..6f129b901cc 100644 --- a/libbeat/publisher/publish.go +++ b/libbeat/publisher/publish.go @@ -27,18 +27,18 @@ var debug = logp.MakeDebug("publish") // EventPublisher provides the interface for beats to publish events. type eventPublisher interface { - PublishEvent(ctx context, event common.MapStr) bool - PublishEvents(ctx context, events []common.MapStr) bool + PublishEvent(ctx Context, event common.MapStr) bool + PublishEvents(ctx Context, events []common.MapStr) bool } -type context struct { +type Context struct { publishOptions - signal outputs.Signaler + Signal outputs.Signaler } type publishOptions struct { - guaranteed bool - sync bool + Guaranteed bool + Sync bool } type TransactionalEventPublisher interface { diff --git a/libbeat/publisher/sync.go b/libbeat/publisher/sync.go index 0523e19fadf..1f0c9093187 100644 --- a/libbeat/publisher/sync.go +++ b/libbeat/publisher/sync.go @@ -19,12 +19,12 @@ func (p *syncPublisher) client() eventPublisher { return p } -func (p *syncPublisher) PublishEvent(ctx context, event common.MapStr) bool { +func (p *syncPublisher) PublishEvent(ctx Context, event common.MapStr) bool { msg := message{context: ctx, event: event} return p.send(msg) } -func (p *syncPublisher) PublishEvents(ctx context, events []common.MapStr) bool { +func (p *syncPublisher) PublishEvents(ctx Context, events []common.MapStr) bool { msg := message{context: ctx, events: events} return p.send(msg) } @@ -32,16 +32,16 @@ func (p *syncPublisher) PublishEvents(ctx context, events []common.MapStr) bool func (p *syncPublisher) send(m message) bool { if p.pub.disabled { debug("publisher disabled") - outputs.SignalCompleted(m.context.signal) + outputs.SignalCompleted(m.context.Signal) return true } - signal := m.context.signal + signal := m.context.Signal sync := outputs.NewSyncSignal() if len(p.pub.Output) > 1 { - m.context.signal = outputs.NewSplitSignaler(sync, len(p.pub.Output)) + m.context.Signal = outputs.NewSplitSignaler(sync, len(p.pub.Output)) } else { - m.context.signal = sync + m.context.Signal = sync } for _, o := range p.pub.Output { diff --git a/libbeat/publisher/worker.go b/libbeat/publisher/worker.go index 51f4d520ab9..1e1832334a6 100644 --- a/libbeat/publisher/worker.go +++ b/libbeat/publisher/worker.go @@ -25,7 +25,7 @@ type messageWorker struct { } type message struct { - context context + context Context event common.MapStr events []common.MapStr } @@ -105,6 +105,6 @@ func (ws *workerSignal) Init() { func stopQueue(qu chan message) { close(qu) for msg := range qu { // clear queue and send fail signal - outputs.SignalFailed(msg.context.signal, nil) + outputs.SignalFailed(msg.context.Signal, nil) } } diff --git a/libbeat/publisher/worker_test.go b/libbeat/publisher/worker_test.go index 9013d90d8fa..030532e01e1 100644 --- a/libbeat/publisher/worker_test.go +++ b/libbeat/publisher/worker_test.go @@ -17,12 +17,12 @@ func TestMessageWorkerSend(t *testing.T) { // Send an event. s1 := newTestSignaler() - m1 := message{context: context{signal: s1}} + m1 := message{context: Context{Signal: s1}} mw.send(m1) // Send another event. s2 := newTestSignaler() - m2 := message{context: context{signal: s2}} + m2 := message{context: Context{Signal: s2}} mw.send(m2) // Verify that the messageWorker pushed to two messages to the @@ -47,10 +47,10 @@ func TestMessageWorkerSend(t *testing.T) { // Test that stopQueue invokes the Failed callback on all events in the queue. func TestMessageWorkerStopQueue(t *testing.T) { s1 := newTestSignaler() - m1 := message{context: context{signal: s1}} + m1 := message{context: Context{Signal: s1}} s2 := newTestSignaler() - m2 := message{context: context{signal: s2}} + m2 := message{context: Context{Signal: s2}} qu := make(chan message, 2) qu <- m1