From ee7e9e24bc65e0c6e49786fac3dd08ac3b2717dc Mon Sep 17 00:00:00 2001 From: urso Date: Sun, 7 Feb 2016 03:43:05 +0100 Subject: [PATCH] Add async load balancer output mode - add load balancer working in async mode - add NewAsyncConnection mode generating a single-mode, fail-over mode and load-balanced mode output handler based on async-mode implementation. - rename WithError to WaitOnError + clarify comment --- libbeat/outputs/mode/backoff.go | 2 +- libbeat/outputs/mode/balance.go | 6 +- libbeat/outputs/mode/balance_async.go | 309 +++++++++++++++++ libbeat/outputs/mode/balance_async_test.go | 376 +++++++++++++++++++++ libbeat/outputs/mode/failover_client.go | 51 +++ libbeat/outputs/mode/mode.go | 51 ++- libbeat/outputs/mode/mode_test.go | 79 ++++- 7 files changed, 864 insertions(+), 10 deletions(-) create mode 100644 libbeat/outputs/mode/balance_async.go create mode 100644 libbeat/outputs/mode/balance_async_test.go diff --git a/libbeat/outputs/mode/backoff.go b/libbeat/outputs/mode/backoff.go index c0bbcfb4a39..8f221bb1790 100644 --- a/libbeat/outputs/mode/backoff.go +++ b/libbeat/outputs/mode/backoff.go @@ -41,7 +41,7 @@ func (b *backoff) Wait() bool { } } -func (b *backoff) WithError(err error) bool { +func (b *backoff) WaitOnError(err error) bool { if err == nil { b.Reset() return true diff --git a/libbeat/outputs/mode/balance.go b/libbeat/outputs/mode/balance.go index 0214961ece9..98477672c8c 100644 --- a/libbeat/outputs/mode/balance.go +++ b/libbeat/outputs/mode/balance.go @@ -176,7 +176,7 @@ func (m *LoadBalancerMode) connect(client ProtocolClient, backoff *backoff) bool for { debug("try to (re-)connect client") err := client.Connect(m.timeout) - if !backoff.WithError(err) { + if !backoff.WaitOnError(err) { return true } @@ -212,7 +212,7 @@ func (m *LoadBalancerMode) onMessage( done := false if msg.event != nil { err := client.PublishEvent(msg.event) - done = !backoff.WithError(err) + done = !backoff.WaitOnError(err) if err != nil { if msg.attemptsLeft > 0 { msg.attemptsLeft-- @@ -228,7 +228,7 @@ func (m *LoadBalancerMode) onMessage( var err error events, err = client.PublishEvents(events) - done = !backoff.WithError(err) + done = !backoff.WaitOnError(err) if done && err != nil { outputs.SignalFailed(msg.signaler, err) return done, err diff --git a/libbeat/outputs/mode/balance_async.go b/libbeat/outputs/mode/balance_async.go new file mode 100644 index 00000000000..83aecd43707 --- /dev/null +++ b/libbeat/outputs/mode/balance_async.go @@ -0,0 +1,309 @@ +package mode + +import ( + "sync" + "time" + + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/libbeat/outputs" +) + +// AsyncLoadBalancerMode balances the sending of events between multiple connections. +// +// The balancing algorithm is mostly pull-based, with multiple workers trying to pull +// some amount of work from a shared queue. Workers will try to get a new work item +// only if they have a working/active connection. Workers without active connection +// do not participate until a connection has been re-established. +// Due to the pull based nature the algorithm will load-balance events by random +// with workers having less latencies/turn-around times potentially getting more +// work items then other workers with higher latencies. Thusly the algorithm +// dynamically adapts to resource availability of server events are forwarded to. +// +// Workers not participating in the load-balancing will continuously try to reconnect +// to their configured endpoints. Once a new connection has been established, +// these workers will participate in in load-balancing again. +// +// If a connection becomes unavailable, the events are rescheduled for another +// connection to pick up. Rescheduling events is limited to a maximum number of +// send attempts. If events have not been send after maximum number of allowed +// attemps has been passed, they will be dropped. +// +// Like network connections, distributing events to workers is subject to +// timeout. If no worker is available to pickup a message for sending, the message +// will be dropped internally after max_retries. If mode or message requires +// guaranteed send, message is retried infinitely. +type AsyncLoadBalancerMode struct { + timeout time.Duration // Send/retry timeout. Every timeout is a failed send attempt + waitRetry time.Duration // Duration to wait during re-connection attempts. + maxWaitRetry time.Duration // Maximum send/retry timeout in backoff case. + + // maximum number of configured send attempts. If set to 0, publisher will + // block until event has been successfully published. + maxAttempts int + + // waitGroup + signaling channel for handling shutdown + wg sync.WaitGroup + done chan struct{} + + // channels for forwarding work items to workers. + // The work channel is used by publisher to insert new events + // into the load balancer. The work channel is synchronous blocking until timeout + // for one worker available. + // The retries channel is used to forward failed send attempts to other workers. + // The retries channel is buffered to mitigate possible deadlocks when all + // workers become unresponsive. + work chan eventsMessage + retries chan eventsMessage +} + +// NewAsyncLoadBalancerMode create a new load balancer connection mode. +func NewAsyncLoadBalancerMode( + clients []AsyncProtocolClient, + maxAttempts int, + waitRetry, timeout, maxWaitRetry time.Duration, +) (*AsyncLoadBalancerMode, error) { + + // maxAttempts signals infinite retry. Convert to -1, so attempts left and + // and infinite retry can be more easily distinguished by load balancer + if maxAttempts == 0 { + maxAttempts = -1 + } + + m := &AsyncLoadBalancerMode{ + timeout: timeout, + maxWaitRetry: maxWaitRetry, + waitRetry: waitRetry, + maxAttempts: maxAttempts, + + work: make(chan eventsMessage), + retries: make(chan eventsMessage, len(clients)*2), + done: make(chan struct{}), + } + m.start(clients) + + return m, nil +} + +// Close stops all workers and closes all open connections. In flight events +// are signaled as failed. +func (m *AsyncLoadBalancerMode) Close() error { + close(m.done) + m.wg.Wait() + return nil +} + +// PublishEvents forwards events to some load balancing worker. +func (m *AsyncLoadBalancerMode) PublishEvents( + signaler outputs.Signaler, + opts outputs.Options, + events []common.MapStr, +) error { + return m.publishEventsMessage(opts, + eventsMessage{signaler: signaler, events: events}) +} + +// PublishEvent forwards the event to some load balancing worker. +func (m *AsyncLoadBalancerMode) PublishEvent( + signaler outputs.Signaler, + opts outputs.Options, + event common.MapStr, +) error { + return m.publishEventsMessage(opts, + eventsMessage{signaler: signaler, event: event}) +} + +func (m *AsyncLoadBalancerMode) publishEventsMessage( + opts outputs.Options, + msg eventsMessage, +) error { + maxAttempts := m.maxAttempts + if opts.Guaranteed { + maxAttempts = -1 + } + msg.attemptsLeft = maxAttempts + + if ok := m.forwardEvent(m.work, msg); !ok { + dropping(msg) + } + return nil +} + +func (m *AsyncLoadBalancerMode) start(clients []AsyncProtocolClient) { + var waitStart sync.WaitGroup + worker := func(client AsyncProtocolClient) { + defer func() { + if client.IsConnected() { + _ = client.Close() + } + m.wg.Done() + }() + + waitStart.Done() + + backoff := newBackoff(m.done, m.waitRetry, m.maxWaitRetry) + for { + // reconnect loop + for !client.IsConnected() { + if err := client.Connect(m.timeout); err == nil { + break + } + + if !backoff.Wait() { // done channel closed + return + } + } + + // receive and process messages + var msg eventsMessage + select { + case <-m.done: + return + case msg = <-m.retries: // receive message from other failed worker + debug("events from retries queue") + case msg = <-m.work: // receive message from publisher + debug("events from worker worker queue") + } + + err := m.onMessage(client, msg) + if !backoff.WaitOnError(err) { // done channel closed + return + } + } + } + + for _, client := range clients { + m.wg.Add(1) + waitStart.Add(1) + go worker(client) + } + waitStart.Wait() +} + +func (m *AsyncLoadBalancerMode) onMessage( + client AsyncProtocolClient, + msg eventsMessage, +) error { + var err error + if msg.event != nil { + err = client.AsyncPublishEvent(handlePublishEventResult(m, msg), msg.event) + } else { + err = client.AsyncPublishEvents(handlePublishEventsResult(m, msg), msg.events) + } + + if err != nil { + if msg.attemptsLeft > 0 { + msg.attemptsLeft-- + } + + // asynchronously retry to insert message (if attempts left), so worker can not + // deadlock on retries channel if client puts multiple failed outstanding + // events into the pipeline + m.onFail(true, msg, err) + } + + return err +} + +func handlePublishEventResult(m *AsyncLoadBalancerMode, msg eventsMessage) func(error) { + return func(err error) { + if err != nil { + if msg.attemptsLeft > 0 { + msg.attemptsLeft-- + } + m.onFail(false, msg, err) + } else { + outputs.SignalCompleted(msg.signaler) + } + } +} + +func handlePublishEventsResult( + m *AsyncLoadBalancerMode, + msg eventsMessage, +) func([]common.MapStr, error) { + total := len(msg.events) + return func(events []common.MapStr, err error) { + if err != nil { + if msg.attemptsLeft > 0 { + msg.attemptsLeft-- + } + + // reset attempt count if subset of messages has been processed + if len(events) < total && msg.attemptsLeft >= 0 { + msg.attemptsLeft = m.maxAttempts + } + + if err != ErrTempBulkFailure { + // retry non-published subset of events in batch + msg.events = events + m.onFail(false, msg, err) + return + } + + if m.maxAttempts > 0 && msg.attemptsLeft == 0 { + // no more attempts left => drop + dropping(msg) + return + } + + // retry non-published subset of events in batch + msg.events = events + m.onFail(false, msg, err) + return + } + + // re-insert non-published events into pipeline + if len(events) != 0 { + msg.events = events + if ok := m.forwardEvent(m.retries, msg); !ok { + dropping(msg) + } + return + } + + // all events published -> signal success + outputs.SignalCompleted(msg.signaler) + } +} + +func (m *AsyncLoadBalancerMode) onFail(async bool, msg eventsMessage, err error) { + fn := func() { + logp.Info("Error publishing events (retrying): %s", err) + + if ok := m.forwardEvent(m.retries, msg); !ok { + dropping(msg) + } + } + + if async { + go fn() + } else { + fn() + } +} + +func (m *AsyncLoadBalancerMode) forwardEvent( + ch chan eventsMessage, + msg eventsMessage, +) bool { + if msg.attemptsLeft < 0 { + select { + case ch <- msg: + return true + case <-m.done: // shutdown + return false + } + } else { + for ; msg.attemptsLeft > 0; msg.attemptsLeft-- { + select { + case ch <- msg: + return true + case <-m.done: // shutdown + return false + case <-time.After(m.timeout): + } + } + } + return false +} diff --git a/libbeat/outputs/mode/balance_async_test.go b/libbeat/outputs/mode/balance_async_test.go new file mode 100644 index 00000000000..515b23c24da --- /dev/null +++ b/libbeat/outputs/mode/balance_async_test.go @@ -0,0 +1,376 @@ +package mode + +import ( + "errors" + "testing" + "time" + + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/logp" +) + +func TestAsyncLBStartStop(t *testing.T) { + mode, _ := NewAsyncConnectionMode( + []AsyncProtocolClient{}, + false, + 1, + 100*time.Millisecond, + 100*time.Millisecond, + 1*time.Second, + ) + testMode(t, mode, testNoOpts, nil, nil, nil) +} + +func testAsyncLBFailSendWithoutActiveConnection(t *testing.T, events []eventInfo) { + if testing.Verbose() { + logp.LogInit(logp.LOG_DEBUG, "", false, true, []string{"*"}) + } + errFail := errors.New("fail connect") + mode, _ := NewAsyncConnectionMode( + []AsyncProtocolClient{ + &mockClient{ + connected: false, + close: closeOK, + connect: alwaysFailConnect(errFail), + }, + &mockClient{ + connected: false, + close: closeOK, + connect: alwaysFailConnect(errFail), + }, + }, + false, + 2, + 100*time.Millisecond, + 100*time.Millisecond, + 1*time.Second, + ) + testMode(t, mode, testNoOpts, events, signals(false), nil) +} + +func TestAsyncLBFailSendWithoutActiveConnections(t *testing.T) { + testAsyncLBFailSendWithoutActiveConnection(t, singleEvent(testEvent)) +} + +func TestAsyncLBFailSendMultWithoutActiveConnections(t *testing.T) { + testAsyncLBFailSendWithoutActiveConnection(t, multiEvent(2, testEvent)) +} + +func testAsyncLBOKSend(t *testing.T, events []eventInfo) { + var collected [][]common.MapStr + mode, _ := NewAsyncConnectionMode( + []AsyncProtocolClient{ + &mockClient{ + connected: false, + close: closeOK, + connect: connectOK, + asyncPublish: asyncCollectPublish(&collected), + }, + }, + false, + 2, + 100*time.Millisecond, + 100*time.Millisecond, + 1*time.Second, + ) + testMode(t, mode, testNoOpts, events, signals(true), &collected) +} + +func TestAsyncLBOKSend(t *testing.T) { + testAsyncLBOKSend(t, singleEvent(testEvent)) +} + +func TestAsyncLBOKSendMult(t *testing.T) { + testAsyncLBOKSend(t, multiEvent(10, testEvent)) +} + +func testAsyncLBFlakyConnectionOkSend(t *testing.T, events []eventInfo) { + var collected [][]common.MapStr + mode, _ := NewAsyncConnectionMode( + []AsyncProtocolClient{ + &mockClient{ + connected: true, + close: closeOK, + connect: connectOK, + asyncPublish: asyncFailStart(1, asyncCollectPublish(&collected)), + }, + &mockClient{ + connected: true, + close: closeOK, + connect: connectOK, + asyncPublish: asyncFailStart(1, asyncCollectPublish(&collected)), + }, + }, + false, + 3, + 100*time.Millisecond, + 100*time.Millisecond, + 1*time.Second, + ) + testMode(t, mode, testNoOpts, events, signals(true), &collected) +} + +func TestAsyncLBFlakyConnectionOkSend(t *testing.T) { + testAsyncLBFlakyConnectionOkSend(t, singleEvent(testEvent)) +} + +func TestAsyncLBFlakyConnectionOkSendMult(t *testing.T) { + testAsyncLBFlakyConnectionOkSend(t, multiEvent(10, testEvent)) +} + +func testAsyncLBFlakyFail(t *testing.T, events []eventInfo) { + var collected [][]common.MapStr + + err := errors.New("flaky") + mode, _ := NewAsyncConnectionMode( + []AsyncProtocolClient{ + &mockClient{ + connected: true, + close: closeOK, + connect: connectOK, + asyncPublish: asyncFailWith(3, err, asyncCollectPublish(&collected)), + }, + &mockClient{ + connected: true, + close: closeOK, + connect: connectOK, + asyncPublish: asyncFailWith(3, err, asyncCollectPublish(&collected)), + }, + }, + false, + 3, + 100*time.Millisecond, + 100*time.Millisecond, + 1*time.Second, + ) + testMode(t, mode, testNoOpts, events, signals(false), &collected) +} + +func TestAsyncLBFlakyFail(t *testing.T) { + testAsyncLBFlakyFail(t, singleEvent(testEvent)) +} + +func TestAsyncLBMultiFlakyFail(t *testing.T) { + testAsyncLBFlakyFail(t, multiEvent(10, testEvent)) +} + +func testAsyncLBTemporayFailure(t *testing.T, events []eventInfo) { + var collected [][]common.MapStr + mode, _ := NewAsyncConnectionMode( + []AsyncProtocolClient{ + &mockClient{ + connected: true, + close: closeOK, + connect: connectOK, + asyncPublish: asyncFailWith(1, ErrTempBulkFailure, + asyncCollectPublish(&collected)), + }, + }, + false, + 3, + 100*time.Millisecond, + 100*time.Millisecond, + 1*time.Second, + ) + testMode(t, mode, testNoOpts, events, signals(true), &collected) +} + +func TestAsyncLBTemporayFailure(t *testing.T) { + testAsyncLBTemporayFailure(t, singleEvent(testEvent)) +} + +func TestAsyncLBTemporayFailureMutlEvents(t *testing.T) { + testAsyncLBTemporayFailure(t, multiEvent(10, testEvent)) +} + +func testAsyncLBTempFlakyFail(t *testing.T, events []eventInfo) { + var collected [][]common.MapStr + mode, _ := NewAsyncConnectionMode( + []AsyncProtocolClient{ + &mockClient{ + connected: true, + close: closeOK, + connect: connectOK, + asyncPublish: asyncFailWith(3, ErrTempBulkFailure, + asyncCollectPublish(&collected)), + }, + &mockClient{ + connected: true, + close: closeOK, + connect: connectOK, + asyncPublish: asyncFailWith(3, ErrTempBulkFailure, + asyncCollectPublish(&collected)), + }, + }, + false, + 3, + 100*time.Millisecond, + 100*time.Millisecond, + 1*time.Second, + ) + testMode(t, mode, testNoOpts, events, signals(false), &collected) +} + +func TestAsyncLBTempFlakyFail(t *testing.T) { + testAsyncLBTempFlakyFail(t, singleEvent(testEvent)) +} + +func TestAsyncLBMultiTempFlakyFail(t *testing.T) { + testAsyncLBTempFlakyFail(t, multiEvent(10, testEvent)) +} + +func testAsyncLBFlakyInfAttempts(t *testing.T, events []eventInfo) { + if testing.Verbose() { + logp.LogInit(logp.LOG_DEBUG, "", false, true, []string{"*"}) + } + + var collected [][]common.MapStr + err := errors.New("flaky") + mode, _ := NewAsyncConnectionMode( + []AsyncProtocolClient{ + &mockClient{ + connected: true, + close: closeOK, + connect: connectOK, + asyncPublish: asyncFailWith(50, err, asyncCollectPublish(&collected)), + }, + &mockClient{ + connected: true, + close: closeOK, + connect: connectOK, + asyncPublish: asyncFailWith(50, err, asyncCollectPublish(&collected)), + }, + }, + false, + 0, + 1*time.Nanosecond, + 1*time.Millisecond, + 4*time.Millisecond, + ) + testMode(t, mode, testNoOpts, events, signals(true), &collected) +} + +func TestAsyncLBFlakyInfAttempts(t *testing.T) { + testAsyncLBFlakyInfAttempts(t, singleEvent(testEvent)) +} + +func TestAsyncLBMultiFlakyInfAttempts(t *testing.T) { + testAsyncLBFlakyInfAttempts(t, multiEvent(10, testEvent)) +} + +func testAsyncLBFlakyInfAttempts2(t *testing.T, events []eventInfo) { + if testing.Verbose() { + logp.LogInit(logp.LOG_DEBUG, "", false, true, []string{"*"}) + } + + var collected [][]common.MapStr + err := errors.New("flaky") + mode, _ := NewAsyncConnectionMode( + []AsyncProtocolClient{ + &mockClient{ + connected: true, + close: closeOK, + connect: connectOK, + asyncPublish: asyncFailStartWith(50, err, asyncCollectPublish(&collected)), + }, + &mockClient{ + connected: true, + close: closeOK, + connect: connectOK, + asyncPublish: asyncFailStartWith(50, err, asyncCollectPublish(&collected)), + }, + }, + false, + 0, + 1*time.Nanosecond, + 1*time.Millisecond, + 4*time.Millisecond, + ) + testMode(t, mode, testNoOpts, events, signals(true), &collected) +} + +func TestAsyncLBFlakyInfAttempts2(t *testing.T) { + testAsyncLBFlakyInfAttempts2(t, singleEvent(testEvent)) +} + +func TestAsyncLBMultiFlakyInfAttempts2(t *testing.T) { + testAsyncLBFlakyInfAttempts2(t, multiEvent(10, testEvent)) +} + +func testAsyncLBFlakyGuaranteed(t *testing.T, events []eventInfo) { + if testing.Verbose() { + logp.LogInit(logp.LOG_DEBUG, "", false, true, []string{"*"}) + } + + var collected [][]common.MapStr + err := errors.New("flaky") + mode, _ := NewAsyncConnectionMode( + []AsyncProtocolClient{ + &mockClient{ + connected: true, + close: closeOK, + connect: connectOK, + asyncPublish: asyncFailWith(50, err, asyncCollectPublish(&collected)), + }, + &mockClient{ + connected: true, + close: closeOK, + connect: connectOK, + asyncPublish: asyncFailWith(50, err, asyncCollectPublish(&collected)), + }, + }, + false, + 3, + 1*time.Nanosecond, + 1*time.Millisecond, + 4*time.Millisecond, + ) + testMode(t, mode, testGuaranteed, events, signals(true), &collected) +} + +func TestAsyncLBFlakyGuaranteed(t *testing.T) { + testAsyncLBFlakyGuaranteed(t, singleEvent(testEvent)) +} + +func TestAsyncLBMultiFlakyGuaranteed(t *testing.T) { + testAsyncLBFlakyGuaranteed(t, multiEvent(10, testEvent)) +} + +func testAsyncLBFlakyGuaranteed2(t *testing.T, events []eventInfo) { + if testing.Verbose() { + logp.LogInit(logp.LOG_DEBUG, "", false, true, []string{"*"}) + } + + var collected [][]common.MapStr + err := errors.New("flaky") + mode, _ := NewAsyncConnectionMode( + []AsyncProtocolClient{ + &mockClient{ + connected: true, + close: closeOK, + connect: connectOK, + asyncPublish: asyncFailStartWith(50, err, asyncCollectPublish(&collected)), + }, + &mockClient{ + connected: true, + close: closeOK, + connect: connectOK, + asyncPublish: asyncFailStartWith(50, err, asyncCollectPublish(&collected)), + }, + }, + false, + 3, + 1*time.Nanosecond, + 1*time.Millisecond, + 4*time.Millisecond, + ) + testMode(t, mode, testGuaranteed, events, signals(true), &collected) +} + +func TestAsyncLBFlakyGuaranteed2(t *testing.T) { + testAsyncLBFlakyGuaranteed2(t, singleEvent(testEvent)) +} + +func TestAsyncLBMultiFlakyGuaranteed2(t *testing.T) { + testAsyncLBFlakyGuaranteed2(t, multiEvent(10, testEvent)) +} diff --git a/libbeat/outputs/mode/failover_client.go b/libbeat/outputs/mode/failover_client.go index 19427013362..88571889a18 100644 --- a/libbeat/outputs/mode/failover_client.go +++ b/libbeat/outputs/mode/failover_client.go @@ -13,6 +13,11 @@ type failOverClient struct { active int } +type asyncFailOverClient struct { + conns []AsyncProtocolClient + active int +} + type clientList interface { Active() int Len() int @@ -65,6 +70,52 @@ func (f *failOverClient) PublishEvent(event common.MapStr) error { return f.conns[f.active].PublishEvent(event) } +func NewAsyncFailoverClient(clients []AsyncProtocolClient) []AsyncProtocolClient { + if len(clients) <= 1 { + return clients + } + return []AsyncProtocolClient{ + &asyncFailOverClient{conns: clients, active: -1}, + } +} + +func (f *asyncFailOverClient) Active() int { return f.active } +func (f *asyncFailOverClient) Len() int { return len(f.conns) } +func (f *asyncFailOverClient) Get(i int) Connectable { return f.conns[i] } +func (f *asyncFailOverClient) Activate(i int) { f.active = i } + +func (f *asyncFailOverClient) Connect(to time.Duration) error { + return connect(f, to) +} + +func (f *asyncFailOverClient) IsConnected() bool { + return f.active >= 0 && f.conns[f.active].IsConnected() +} + +func (f *asyncFailOverClient) Close() error { + return closeActive(f) +} + +func (f *asyncFailOverClient) AsyncPublishEvents( + cb func([]common.MapStr, error), + events []common.MapStr, +) error { + if f.active < 0 { + return errNoActiveConnection + } + return f.conns[f.active].AsyncPublishEvents(cb, events) +} + +func (f *asyncFailOverClient) AsyncPublishEvent( + cb func(error), + event common.MapStr, +) error { + if f.active < 0 { + return errNoActiveConnection + } + return f.conns[f.active].AsyncPublishEvent(cb, event) +} + func connect(lst clientList, to time.Duration) error { active := lst.Active() l := lst.Len() diff --git a/libbeat/outputs/mode/mode.go b/libbeat/outputs/mode/mode.go index e8e779525a6..749bb93715e 100644 --- a/libbeat/outputs/mode/mode.go +++ b/libbeat/outputs/mode/mode.go @@ -71,6 +71,16 @@ type ProtocolClient interface { PublishEvent(event common.MapStr) error } +// AsyncProtocolClient interface is a output plugin specfic client implementation +// for asynchronous encoding and publishing events. +type AsyncProtocolClient interface { + Connectable + + AsyncPublishEvents(cb func([]common.MapStr, error), events []common.MapStr) error + + AsyncPublishEvent(cb func(error), event common.MapStr) error +} + var ( // ErrTempBulkFailure indicates PublishEvents fail temporary to retry. ErrTempBulkFailure = errors.New("temporary bulk send failure") @@ -98,13 +108,26 @@ func NewConnectionMode( waitRetry, timeout, maxWaitRetry) } +func NewAsyncConnectionMode( + clients []AsyncProtocolClient, + failover bool, + maxAttempts int, + waitRetry, timeout, maxWaitRetry time.Duration, +) (ConnectionMode, error) { + if failover { + clients = NewAsyncFailoverClient(clients) + } + return NewAsyncLoadBalancerMode(clients, maxAttempts, + waitRetry, timeout, maxWaitRetry) +} + // MakeClients will create a list from of ProtocolClient instances from // outputer configuration host list and client factory function. func MakeClients( config outputs.MothershipConfig, newClient func(string) (ProtocolClient, error), ) ([]ProtocolClient, error) { - hosts := readHostList(config) + hosts := ReadHostList(config) if len(hosts) == 0 { return nil, ErrNoHostsConfigured } @@ -124,7 +147,31 @@ func MakeClients( return clients, nil } -func readHostList(config outputs.MothershipConfig) []string { +func MakeAsyncClients( + config outputs.MothershipConfig, + newClient func(string) (AsyncProtocolClient, error), +) ([]AsyncProtocolClient, error) { + hosts := ReadHostList(config) + if len(hosts) == 0 { + return nil, ErrNoHostsConfigured + } + + clients := make([]AsyncProtocolClient, 0, len(hosts)) + for _, host := range hosts { + client, err := newClient(host) + if err != nil { + // on error destroy all client instance created + for _, client := range clients { + _ = client.Close() // ignore error + } + return nil, err + } + clients = append(clients, client) + } + return clients, nil +} + +func ReadHostList(config outputs.MothershipConfig) []string { var lst []string // TODO: remove config.Host diff --git a/libbeat/outputs/mode/mode_test.go b/libbeat/outputs/mode/mode_test.go index d29621b0fc8..91488572664 100644 --- a/libbeat/outputs/mode/mode_test.go +++ b/libbeat/outputs/mode/mode_test.go @@ -13,10 +13,11 @@ import ( ) type mockClient struct { - publish func([]common.MapStr) ([]common.MapStr, error) - close func() error - connected bool - connect func(time.Duration) error + publish func([]common.MapStr) ([]common.MapStr, error) + asyncPublish func(func([]common.MapStr, error), []common.MapStr) error + close func() error + connected bool + connect func(time.Duration) error } func enableLogging(selectors []string) { @@ -46,6 +47,16 @@ func (c *mockClient) PublishEvent(event common.MapStr) error { return err } +func (c *mockClient) AsyncPublishEvents(cb func([]common.MapStr, error), events []common.MapStr) error { + return c.asyncPublish(cb, events) +} + +func (c *mockClient) AsyncPublishEvent(cb func(error), event common.MapStr) error { + return c.AsyncPublishEvents( + func(evts []common.MapStr, err error) { cb(err) }, + []common.MapStr{event}) +} + func connectOK(timeout time.Duration) error { return nil } @@ -81,6 +92,20 @@ func collectPublish( } } +func asyncCollectPublish( + collected *[][]common.MapStr, +) func(func([]common.MapStr, error), []common.MapStr) error { + mutex := sync.Mutex{} + return func(cb func([]common.MapStr, error), events []common.MapStr) error { + mutex.Lock() + defer mutex.Unlock() + + *collected = append(*collected, events) + cb(nil, nil) + return nil + } +} + type errNetTimeout struct{} func (e errNetTimeout) Error() string { return "errNetTimeout" } @@ -125,6 +150,52 @@ func publishFailStart( return publishFailWith(n, errNetTimeout{}, pub) } +func asyncFailStart( + n int, + pub func(func([]common.MapStr, error), []common.MapStr) error, +) func(func([]common.MapStr, error), []common.MapStr) error { + return asyncFailStartWith(n, errNetTimeout{}, pub) +} + +func asyncFailStartWith( + n int, + err error, + pub func(func([]common.MapStr, error), []common.MapStr) error, +) func(func([]common.MapStr, error), []common.MapStr) error { + count := 0 + return func(cb func([]common.MapStr, error), events []common.MapStr) error { + if count < n { + count++ + debug("fail with(%v): %v", count, err) + return err + } + + count = 0 + debug("forward events") + return pub(cb, events) + } +} + +func asyncFailWith( + n int, + err error, + pub func(func([]common.MapStr, error), []common.MapStr) error, +) func(func([]common.MapStr, error), []common.MapStr) error { + count := 0 + return func(cb func([]common.MapStr, error), events []common.MapStr) error { + if count < n { + count++ + go func() { + cb(events, err) + }() + return nil + } + + count = 0 + return pub(cb, events) + } +} + func closeOK() error { return nil }