diff --git a/NOTICE.txt b/NOTICE.txt index feb37019a44..af6d0a8ca44 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -10311,11 +10311,11 @@ Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-l -------------------------------------------------------------------------------- Dependency : github.com/elastic/elastic-agent-shipper-client -Version: v0.4.0 +Version: v0.5.0 Licence type (autodetected): Elastic -------------------------------------------------------------------------------- -Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-shipper-client@v0.4.0/LICENSE.txt: +Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-shipper-client@v0.5.0/LICENSE.txt: Elastic License 2.0 @@ -21964,11 +21964,11 @@ THE SOFTWARE. -------------------------------------------------------------------------------- Dependency : go.uber.org/multierr -Version: v1.8.0 +Version: v1.9.0 Licence type (autodetected): MIT -------------------------------------------------------------------------------- -Contents of probable licence file $GOMODCACHE/go.uber.org/multierr@v1.8.0/LICENSE.txt: +Contents of probable licence file $GOMODCACHE/go.uber.org/multierr@v1.9.0/LICENSE.txt: Copyright (c) 2017-2021 Uber Technologies, Inc. @@ -21993,11 +21993,11 @@ THE SOFTWARE. -------------------------------------------------------------------------------- Dependency : go.uber.org/zap -Version: v1.23.0 +Version: v1.24.0 Licence type (autodetected): MIT -------------------------------------------------------------------------------- -Contents of probable licence file $GOMODCACHE/go.uber.org/zap@v1.23.0/LICENSE.txt: +Contents of probable licence file $GOMODCACHE/go.uber.org/zap@v1.24.0/LICENSE.txt: Copyright (c) 2016-2017 Uber Technologies, Inc. diff --git a/go.mod b/go.mod index dc690d7aa9b..8d9f3ea9d45 100644 --- a/go.mod +++ b/go.mod @@ -152,8 +152,8 @@ require ( go.elastic.co/go-licence-detector v0.5.0 go.etcd.io/bbolt v1.3.6 go.uber.org/atomic v1.10.0 - go.uber.org/multierr v1.8.0 - go.uber.org/zap v1.23.0 + go.uber.org/multierr v1.9.0 + go.uber.org/zap v1.24.0 golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 @@ -194,7 +194,7 @@ require ( github.com/elastic/bayeux v1.0.5 github.com/elastic/elastic-agent-autodiscover v0.5.0 github.com/elastic/elastic-agent-libs v0.3.3 - github.com/elastic/elastic-agent-shipper-client v0.4.0 + github.com/elastic/elastic-agent-shipper-client v0.5.0 github.com/elastic/elastic-agent-system-metrics v0.4.6-0.20230308003052-ba171438211e github.com/elastic/go-elasticsearch/v8 v8.2.0 github.com/elastic/mito v0.0.0-20230302005114-1dda06e81678 diff --git a/go.sum b/go.sum index fb4d713d276..7751f91b6d3 100644 --- a/go.sum +++ b/go.sum @@ -616,8 +616,8 @@ github.com/elastic/elastic-agent-client/v7 v7.0.3-0.20230315204017-166fd1fd746f/ github.com/elastic/elastic-agent-libs v0.2.11/go.mod h1:chO3rtcLyGlKi9S0iGVZhYCzDfdDsAQYBc+ui588AFE= github.com/elastic/elastic-agent-libs v0.3.3 h1:iE8XhqQ0zRBLba+eu6ScZED0DYcVP/r2JvjcVoOkxic= github.com/elastic/elastic-agent-libs v0.3.3/go.mod h1:nRkcK96PSJfK232cJRx17n9+/MVAIOzs5ghZdzXJAMo= -github.com/elastic/elastic-agent-shipper-client v0.4.0 h1:nsTJF9oo4RHLl+zxFUZqNHaE86C6Ba5aImfegcEf6Sk= -github.com/elastic/elastic-agent-shipper-client v0.4.0/go.mod h1:OyI2W+Mv3JxlkEF3OeT7K0dbuxvwew8ke2Cf4HpLa9Q= +github.com/elastic/elastic-agent-shipper-client v0.5.0 h1:rkdq7K8+ESNMXtMPzlwiiENTZz2Y6m4lN8SIMFrHuJA= +github.com/elastic/elastic-agent-shipper-client v0.5.0/go.mod h1:rWarFM7qYxJKsi9WcV6ONcFjH/NA3niDNpTxO+8/GVI= github.com/elastic/elastic-agent-system-metrics v0.4.6-0.20230308003052-ba171438211e h1:OIfumgZhI6lI7Qy1KD1VzuqvX9DWSBpXJsvj97s7MRM= github.com/elastic/elastic-agent-system-metrics v0.4.6-0.20230308003052-ba171438211e/go.mod h1:v/t/qgYueW3ZOm7SZhYY3ng9GWDddDLu7pmG4Ra3PBs= github.com/elastic/elastic-transport-go/v8 v8.1.0 h1:NeqEz1ty4RQz+TVbUrpSU7pZ48XkzGWQj02k5koahIE= @@ -1877,8 +1877,8 @@ go.uber.org/multierr v1.4.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+ go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU= go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= go.uber.org/multierr v1.7.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak= -go.uber.org/multierr v1.8.0 h1:dg6GjLku4EH+249NNmoIciG9N/jURbDG+pFlTkhzIC8= -go.uber.org/multierr v1.8.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak= +go.uber.org/multierr v1.9.0 h1:7fIwc/ZtS0q++VgcfqFDxSBZVv/Xo49/SYnDFupUwlI= +go.uber.org/multierr v1.9.0/go.mod h1:X2jQV1h+kxSjClGpnseKVIxpmcjrj7MNnI0bnlfKTVQ= go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA= go.uber.org/zap v1.9.1/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= @@ -1887,8 +1887,8 @@ go.uber.org/zap v1.14.0/go.mod h1:zwrFLgMcdUuIBviXEYEH1YKNaOBnKXsx2IPda5bBwHM= go.uber.org/zap v1.14.1/go.mod h1:Mb2vm2krFEG5DV0W9qcHBYFtp/Wku1cvYaqPsS/WYfc= go.uber.org/zap v1.17.0/go.mod h1:MXVU+bhUf/A7Xi2HNOnopQOrmycQ5Ih87HtOu4q5SSo= go.uber.org/zap v1.21.0/go.mod h1:wjWOCqI0f2ZZrJF/UufIOkiC8ii6tm1iqIsLo76RfJw= -go.uber.org/zap v1.23.0 h1:OjGQ5KQDEUawVHxNwQgPpiypGHOxo2mNZsOqTak4fFY= -go.uber.org/zap v1.23.0/go.mod h1:D+nX8jyLsMHMYrln8A0rJjFt/T/9/bGgIhAqxv5URuY= +go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60= +go.uber.org/zap v1.24.0/go.mod h1:2kMP+WWQ8aoFoedH3T2sq6iJ2yDWpHbP0f6MQbS9Gkg= golang.org/x/crypto v0.0.0-20171113213409-9f005a07e0d3/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20180505025534-4ec37c66abab/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= diff --git a/libbeat/outputs/shipper/shipper.go b/libbeat/outputs/shipper/shipper.go index 6163b6fb8f9..319e8badee2 100644 --- a/libbeat/outputs/shipper/shipper.go +++ b/libbeat/outputs/shipper/shipper.go @@ -31,7 +31,6 @@ import ( conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" - "github.com/elastic/elastic-agent-libs/mapstr" "github.com/elastic/elastic-agent-libs/transport/tlscommon" "google.golang.org/grpc" @@ -72,6 +71,9 @@ func init() { outputs.RegisterType("shipper", makeShipper) } +// shipperProcessor serves as a wrapper for testing Publish() calls with alternate marshalling callbacks +var shipperProcessor = toShipperEvent + func makeShipper( _ outputs.IndexManager, beat beat.Info, @@ -191,7 +193,7 @@ func (s *shipper) Publish(ctx context.Context, batch publisher.Batch) error { droppedCount := 0 for i, e := range events { - converted, err := toShipperEvent(e) + converted, err := shipperProcessor(e) if err != nil { // conversion errors are not recoverable, so we have to drop the event completely s.log.Errorf("%d/%d: %q, dropped", i+1, len(events), err) @@ -234,7 +236,7 @@ func (s *shipper) Publish(ctx context.Context, batch publisher.Batch) error { } toSend = toSend[publishReply.AcceptedCount:] lastAcceptedIndex = publishReply.AcceptedIndex - s.log.Debugf("%d events have been accepted during a publish request", len(toSend)) + s.log.Debugf("%d events have been accepted during a publish request", publishReply.AcceptedCount) } s.log.Debugf("total of %d events have been accepted from batch, %d dropped", convertedCount, droppedCount) @@ -322,44 +324,13 @@ func (s *shipper) ackLoop(ctx context.Context, ackClient sc.Producer_PersistedIn } } -func convertMapStr(m mapstr.M) (*messages.Value, error) { - if m == nil { - return helpers.NewNullValue(), nil - } - - fields := make(map[string]*messages.Value, len(m)) - - for key, value := range m { - var ( - protoValue *messages.Value - err error - ) - switch v := value.(type) { - case mapstr.M: - protoValue, err = convertMapStr(v) - default: - protoValue, err = helpers.NewValue(v) - } - if err != nil { - return nil, err - } - fields[key] = protoValue - } - - s := &messages.Struct{ - Data: fields, - } - - return helpers.NewStructValue(s), nil -} - func toShipperEvent(e publisher.Event) (*messages.Event, error) { - meta, err := convertMapStr(e.Content.Meta) + meta, err := helpers.NewValue(e.Content.Meta) if err != nil { return nil, fmt.Errorf("failed to convert event metadata to protobuf: %w", err) } - fields, err := convertMapStr(e.Content.Fields) + fields, err := helpers.NewValue(e.Content.Fields) if err != nil { return nil, fmt.Errorf("failed to convert event fields to protobuf: %w", err) } diff --git a/libbeat/outputs/shipper/shipper_test.go b/libbeat/outputs/shipper/shipper_test.go index af028f886a7..02934dce436 100644 --- a/libbeat/outputs/shipper/shipper_test.go +++ b/libbeat/outputs/shipper/shipper_test.go @@ -46,7 +46,6 @@ import ( ) func TestToShipperEvent(t *testing.T) { - wrong := struct{}{} ts := time.Now().Truncate(time.Second) cases := []struct { @@ -126,30 +125,6 @@ func TestToShipperEvent(t *testing.T) { }), }, }, - { - name: "returns error if failed to convert metadata", - value: publisher.Event{ - Content: beat.Event{ - Timestamp: ts, - Meta: mapstr.M{ - "metafield": wrong, - }, - }, - }, - expErr: "failed to convert event metadata", - }, - { - name: "returns error if failed to convert fields", - value: publisher.Event{ - Content: beat.Event{ - Timestamp: ts, - Fields: mapstr.M{ - "field": wrong, - }, - }, - }, - expErr: "failed to convert event fields", - }, } for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { @@ -165,71 +140,8 @@ func TestToShipperEvent(t *testing.T) { } } -func TestConvertMapStr(t *testing.T) { - cases := []struct { - name string - value mapstr.M - exp *messages.Value - expErr string - }{ - { - name: "nil returns nil", - exp: helpers.NewNullValue(), - }, - { - name: "empty map returns empty struct", - value: mapstr.M{}, - exp: protoStructValue(t, nil), - }, - { - name: "returns error when type is not supported", - value: mapstr.M{ - "key": struct{}{}, - }, - expErr: "invalid type: struct {}", - }, - { - name: "values are preserved", - value: mapstr.M{ - "key1": "string", - "key2": 42, - "key3": 42.2, - "key4": mapstr.M{ - "subkey1": "string", - "subkey2": mapstr.M{ - "subsubkey1": "string", - }, - }, - }, - exp: protoStructValue(t, map[string]interface{}{ - "key1": "string", - "key2": 42, - "key3": 42.2, - "key4": map[string]interface{}{ - "subkey1": "string", - "subkey2": map[string]interface{}{ - "subsubkey1": "string", - }, - }, - }), - }, - } - - for _, tc := range cases { - t.Run(tc.name, func(t *testing.T) { - converted, err := convertMapStr(tc.value) - if tc.expErr != "" { - require.Error(t, err) - require.Contains(t, err.Error(), tc.expErr) - require.Nil(t, converted) - return - } - requireEqualProto(t, tc.exp, converted) - }) - } -} - func TestPublish(t *testing.T) { + //logp.DevelopmentSetup() events := []beat.Event{ { Timestamp: time.Now(), @@ -238,8 +150,8 @@ func TestPublish(t *testing.T) { }, { Timestamp: time.Now(), - Meta: mapstr.M{"event": "second", "dropped": true, "invalid": struct{}{}}, // this event is always dropped - Fields: mapstr.M{"c": "d"}, + Meta: nil, // see failMarshal() + Fields: mapstr.M{"a": "b"}, }, { Timestamp: time.Now(), @@ -249,23 +161,28 @@ func TestPublish(t *testing.T) { } cases := []struct { - name string - events []beat.Event - expSignals []outest.BatchSignal - serverError error - expError string - qSize int - acceptedCount uint32 + name string + events []beat.Event + expSignals []outest.BatchSignal + serverError error + expError string + // note: this sets the queue size used by the mock output + // if the mock shipper receives more than this count of events, the test will fail + qSize int + observerExpected *TestObserver + marshalMethod func(e publisher.Event) (*messages.Event, error) }{ { - name: "sends a batch excluding dropped", - events: events[:1], + name: "sends a batch", + events: events, + marshalMethod: toShipperEvent, expSignals: []outest.BatchSignal{ { Tag: outest.BatchACK, }, }, - qSize: 2, + qSize: 3, + observerExpected: &TestObserver{batch: 3, acked: 3}, }, { name: "retries not accepted events", @@ -275,8 +192,9 @@ func TestPublish(t *testing.T) { Tag: outest.BatchACK, }, }, - qSize: 2, - acceptedCount: 1, // we'll enforce 2 `PublishEvents` requests + marshalMethod: failMarshal, // emulate a dropped event + qSize: 3, + observerExpected: &TestObserver{batch: 3, dropped: 1, acked: 2}, }, { name: "cancels the batch if server error", @@ -286,9 +204,11 @@ func TestPublish(t *testing.T) { Tag: outest.BatchCancelled, }, }, - qSize: 3, - serverError: errors.New("some error"), - expError: "failed to publish the batch to the shipper, none of the 2 events were accepted", + marshalMethod: toShipperEvent, + qSize: 3, + observerExpected: &TestObserver{cancelled: 3, batch: 3}, + serverError: errors.New("some error"), + expError: "failed to publish the batch to the shipper, none of the 3 events were accepted", }, } @@ -297,6 +217,9 @@ func TestPublish(t *testing.T) { for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { + if tc.marshalMethod != nil { + shipperProcessor = tc.marshalMethod + } ctx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() @@ -307,8 +230,9 @@ func TestPublish(t *testing.T) { "server": addr, }) require.NoError(t, err) + observer := &TestObserver{} - client := createShipperClient(t, cfg) + client := createShipperClient(t, cfg, observer) batch := outest.NewBatch(tc.events...) @@ -327,8 +251,13 @@ func TestPublish(t *testing.T) { return reflect.DeepEqual(tc.expSignals, batch.Signals) }, 100*time.Millisecond, 10*time.Millisecond) require.Equal(t, tc.expSignals, batch.Signals) + if tc.observerExpected != nil { + require.Equal(t, tc.observerExpected, observer) + } }) } + // reset marshaler + shipperProcessor = toShipperEvent t.Run("cancels the batch when a different server responds", func(t *testing.T) { ctx, cancel := context.WithTimeout(ctx, 10*time.Second) @@ -346,8 +275,8 @@ func TestPublish(t *testing.T) { }, }) require.NoError(t, err) - - client := createShipperClient(t, cfg) + observer := &TestObserver{} + client := createShipperClient(t, cfg, observer) // Should accept the batch and put it to the pending list batch := outest.NewBatch(events...) @@ -379,7 +308,7 @@ func TestPublish(t *testing.T) { ctx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() - addr, producer, stop := runServer(t, 6, nil, "localhost:0") + addr, producer, stop := runServer(t, 9, nil, "localhost:0") defer stop() cfg, err := config.NewConfigFrom(map[string]interface{}{ @@ -391,8 +320,9 @@ func TestPublish(t *testing.T) { }, }) require.NoError(t, err) - - client := createShipperClient(t, cfg) + observer := &TestObserver{} + expectedObserver := &TestObserver{batch: 9, acked: 9} + client := createShipperClient(t, cfg, observer) // Should accept the batch and put it to the pending list batch1 := outest.NewBatch(events...) @@ -413,7 +343,7 @@ func TestPublish(t *testing.T) { }, } - producer.Persist(6) // 2 events per batch, 3 batches + producer.Persist(9) // 2 events per batch, 3 batches assert.Eventually(t, func() bool { // there is a background routine that checks acknowledgments, @@ -425,6 +355,7 @@ func TestPublish(t *testing.T) { require.Equal(t, expSignals, batch1.Signals, "batch1") require.Equal(t, expSignals, batch2.Signals, "batch2") require.Equal(t, expSignals, batch3.Signals, "batch3") + require.Equal(t, expectedObserver, observer) }) } @@ -504,11 +435,11 @@ func runServer(t *testing.T, qSize int, err error, listenAddr string) (actualAdd return actualAddr, producer, stop } -func createShipperClient(t *testing.T, cfg *config.C) outputs.NetworkClient { +func createShipperClient(t *testing.T, cfg *config.C, observer outputs.Observer) outputs.NetworkClient { group, err := makeShipper( nil, beat.Info{Beat: "libbeat", IndexPrefix: "testbeat"}, - outputs.NewNilObserver(), + observer, cfg, ) require.NoError(t, err) @@ -527,10 +458,6 @@ func protoStruct(t *testing.T, values map[string]interface{}) *messages.Struct { require.NoError(t, err) return s } -func protoStructValue(t *testing.T, values map[string]interface{}) *messages.Value { - s := protoStruct(t, values) - return helpers.NewStructValue(s) -} func requireEqualProto(t *testing.T, expected, actual proto.Message) { require.True( @@ -539,3 +466,42 @@ func requireEqualProto(t *testing.T, expected, actual proto.Message) { fmt.Sprintf("These two protobuf messages are not equal:\nexpected: %v\nactual: %v", expected, actual), ) } + +// emulates the toShipperEvent, but looks for a nil meta field, and throws an error +func failMarshal(e publisher.Event) (*messages.Event, error) { + if e.Content.Meta == nil { + return nil, fmt.Errorf("nil meta field") + } + return toShipperEvent(e) +} + +// mock test observer for tracking events + +type TestObserver struct { + acked int + dropped int + cancelled int + batch int + duplicate int + failed int + + writeError error + readError error + + writeBytes int + readBytes int + + errTooMany int +} + +func (to *TestObserver) NewBatch(batch int) { to.batch += batch } +func (to *TestObserver) Acked(acked int) { to.acked += acked } +func (to *TestObserver) Duplicate(duplicate int) { to.duplicate += duplicate } +func (to *TestObserver) Failed(failed int) { to.failed += failed } +func (to *TestObserver) Dropped(dropped int) { to.dropped += dropped } +func (to *TestObserver) Cancelled(cancelled int) { to.cancelled += cancelled } +func (to *TestObserver) WriteError(we error) { to.writeError = we } +func (to *TestObserver) WriteBytes(wb int) { to.writeBytes += wb } +func (to *TestObserver) ReadError(re error) { to.readError = re } +func (to *TestObserver) ReadBytes(rb int) { to.readBytes += rb } +func (to *TestObserver) ErrTooMany(err int) { to.errTooMany = +err }