Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

azure-event hub: improve error handling and stop input if the event has not been processed correctly #16215

Merged
merged 20 commits into from
Mar 24, 2020
Merged
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Add ingress nginx controller fileset {pull}16197[16197]
- move create-[module,fileset,fields] to mage and enable in x-pack/filebeat {pull}15836[15836]
- Add ECS tls and categorization fields to apache module. {issue}16032[16032] {pull}16121[16121]
- Work on e2e ACK's for the azure-eventhub input {issue}15671[15671] {pull}16215[16215]
- Add MQTT input. {issue}15602[15602] {pull}16204[16204]
- Add ECS categorization fields to activemq module. {issue}16151[16151] {pull}16201[16201]
- Add a TLS test and more debug output to httpjson input {pull}16315[16315]
Expand Down
2 changes: 1 addition & 1 deletion NOTICE.txt
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,7 @@ License type (autodetected): MIT

--------------------------------------------------------------------
Dependency: github.com/Azure/azure-event-hubs-go/v3
Version: v3.1.0
Version: v3.1.2
License type (autodetected): MIT
./vendor/github.com/Azure/azure-event-hubs-go/v3/LICENSE:
--------------------------------------------------------------------
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ require (
code.cloudfoundry.org/go-diodes v0.0.0-20190809170250-f77fb823c7ee // indirect
code.cloudfoundry.org/go-loggregator v7.4.0+incompatible
code.cloudfoundry.org/rfc5424 v0.0.0-20180905210152-236a6d29298a // indirect
github.com/Azure/azure-event-hubs-go/v3 v3.1.0
github.com/Azure/azure-event-hubs-go/v3 v3.1.2
github.com/Azure/azure-sdk-for-go v37.1.0+incompatible
github.com/Azure/azure-storage-blob-go v0.8.0
github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78 // indirect
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ code.cloudfoundry.org/rfc5424 v0.0.0-20180905210152-236a6d29298a/go.mod h1:tkZo8
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
github.com/Azure/azure-amqp-common-go/v3 v3.0.0 h1:j9tjcwhypb/jek3raNrwlCIl7iKQYOug7CLpSyBBodc=
github.com/Azure/azure-amqp-common-go/v3 v3.0.0/go.mod h1:SY08giD/XbhTz07tJdpw1SoxQXHPN30+DI3Z04SYqyg=
github.com/Azure/azure-event-hubs-go/v3 v3.1.0 h1:j+/WXzke3PTRu5gAgSpWgWJVfpwIyaedIqqgdgkjAe0=
github.com/Azure/azure-event-hubs-go/v3 v3.1.0/go.mod h1:hR40byNJjKkS74+3RhloPQ8sJ8zFQeJ920Uk3oYY0+k=
github.com/Azure/azure-event-hubs-go/v3 v3.1.2 h1:S/NjCZ1Z2R4rHJd2Hbbad6rIhxJ4lZZebKTsKHweX4A=
github.com/Azure/azure-event-hubs-go/v3 v3.1.2/go.mod h1:hR40byNJjKkS74+3RhloPQ8sJ8zFQeJ920Uk3oYY0+k=
github.com/Azure/azure-pipeline-go v0.1.8/go.mod h1:XA1kFWRVhSK+KNFiOhfv83Fv8L9achrP7OxIzeTn1Yg=
github.com/Azure/azure-pipeline-go v0.1.9/go.mod h1:XA1kFWRVhSK+KNFiOhfv83Fv8L9achrP7OxIzeTn1Yg=
github.com/Azure/azure-pipeline-go v0.2.1 h1:OLBdZJ3yvOn2MezlWvbrBMTEUQC72zAftRZOMdj5HYo=
Expand Down Expand Up @@ -107,8 +107,8 @@ github.com/andrewkroh/goja v0.0.0-20190128172624-dd2ac4456e20 h1:7rj9qZ63knnVo2Z
github.com/andrewkroh/goja v0.0.0-20190128172624-dd2ac4456e20/go.mod h1:cI59GRkC2FRaFYtgbYEqMlgnnfvAwXzjojyZKXwklNg=
github.com/andrewkroh/sys v0.0.0-20151128191922-287798fe3e43 h1:WFwa9pqou0Nb4DdfBOyaBTH0GqLE74Qwdf61E7ITHwQ=
github.com/andrewkroh/sys v0.0.0-20151128191922-287798fe3e43/go.mod h1:tJPYQG4mnMeUtQvQKNkbsFrnmZOg59Qnf8CcctFv5v4=
github.com/antihax/optional v0.0.0-20180407024304-ca021399b1a6/go.mod h1:V8iCPQYkqmusNa815XgQio277wI47sdRh1dUOLdyC6Q=
github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239/go.mod h1:2FmKhYUyUczH0OGQWaF5ceTx0UBShxjsH6f8oGKYe2c=
github.com/antihax/optional v0.0.0-20180407024304-ca021399b1a6/go.mod h1:V8iCPQYkqmusNa815XgQio277wI47sdRh1dUOLdyC6Q=
github.com/antlr/antlr4 v0.0.0-20200225173536-225249fdaef5 h1:nkZ9axP+MvUFCu8JRN/MCY+DmTfs6lY7hE0QnJbxSdI=
github.com/antlr/antlr4 v0.0.0-20200225173536-225249fdaef5/go.mod h1:T7PbCXFs94rrTttyxjbyT5+/1V8T2TYDejxUfHJjw1Y=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio=
Expand Down Expand Up @@ -730,8 +730,8 @@ golang.org/x/net v0.0.0-20190522155817-f3200d17e092/go.mod h1:HSz+uSET+XFnRR8LxR
golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks=
golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20191002035440-2ec189313ef0/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190724013045-ca1201d0de80/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20191002035440-2ec189313ef0/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20191021144547-ec77196f6094/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20191112182307-2180aed22343/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20191209160850-c0dbc17a3553/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
Expand Down
8 changes: 7 additions & 1 deletion vendor/github.com/Azure/azure-event-hubs-go/v3/changelog.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

50 changes: 28 additions & 22 deletions vendor/github.com/Azure/azure-event-hubs-go/v3/eph/eph.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions vendor/github.com/Azure/azure-event-hubs-go/v3/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion vendor/github.com/Azure/azure-event-hubs-go/v3/version.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion vendor/modules.txt
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ github.com/Azure/azure-amqp-common-go/v3/internal/tracing
github.com/Azure/azure-amqp-common-go/v3/rpc
github.com/Azure/azure-amqp-common-go/v3/sas
github.com/Azure/azure-amqp-common-go/v3/uuid
# github.com/Azure/azure-event-hubs-go/v3 v3.1.0
# github.com/Azure/azure-event-hubs-go/v3 v3.1.2
github.com/Azure/azure-event-hubs-go/v3
github.com/Azure/azure-event-hubs-go/v3/atom
github.com/Azure/azure-event-hubs-go/v3/eph
Expand Down
10 changes: 9 additions & 1 deletion x-pack/filebeat/input/azureeventhub/eph.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package azureeventhub

import (
"context"
"errors"
"fmt"

eventhub "github.com/Azure/azure-event-hubs-go/v3"
Expand Down Expand Up @@ -48,8 +49,15 @@ func (a *azureInput) runWithEPH() error {
// register a message handler -- many can be registered
handlerID, err := a.processor.RegisterHandler(a.workerCtx,
func(c context.Context, e *eventhub.Event) error {
var onEventErr error
// partitionID is not yet mapped in the azure-eventhub sdk
return a.processEvents(e, "")
ok := a.processEvents(e, "")
if !ok {
onEventErr = errors.New("OnEvent function returned false. Stopping input worker")
a.log.Debug(onEventErr.Error())
a.Stop()
}
return onEventErr
})
if err != nil {
return err
Expand Down
34 changes: 17 additions & 17 deletions x-pack/filebeat/input/azureeventhub/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type azureInput struct {
workerWg sync.WaitGroup // waits on worker goroutine.
processor *eph.EventProcessorHost // eph will be assigned if users have enabled the option
hub *eventhub.Hub // hub will be assigned
ackChannel chan int
}

const (
Expand All @@ -66,14 +67,6 @@ func NewInput(
if err := cfg.Unpack(&config); err != nil {
return nil, errors.Wrapf(err, "reading %s input config", inputName)
}
out, err := connector.ConnectWith(cfg, beat.ClientConfig{
Processing: beat.ProcessingConfig{
DynamicFields: inputContext.DynamicFields,
},
})
if err != nil {
return nil, err
}

inputCtx, cancelInputCtx := context.WithCancel(context.Background())
go func() {
Expand All @@ -88,17 +81,24 @@ func NewInput(
// to be recreated with each restart.
workerCtx, workerCancel := context.WithCancel(inputCtx)

input := &azureInput{
in := &azureInput{
config: config,
log: logp.NewLogger(fmt.Sprintf("%s input", inputName)).With("connection string", config.ConnectionString),
outlet: out,
context: inputContext,
workerCtx: workerCtx,
workerCancel: workerCancel,
}

input.log.Infof("Initialized %s input.", inputName)
return input, nil
out, err := connector.ConnectWith(cfg, beat.ClientConfig{
Processing: beat.ProcessingConfig{
DynamicFields: inputContext.DynamicFields,
},
})
if err != nil {
return nil, err
}
in.outlet = out
in.log.Infof("Initialized %s input.", inputName)
return in, nil
}

// Run starts the input worker then returns. Only the first invocation
Expand Down Expand Up @@ -176,7 +176,7 @@ func (a *azureInput) Wait() {
a.Stop()
}

func (a *azureInput) processEvents(event *eventhub.Event, partitionID string) error {
func (a *azureInput) processEvents(event *eventhub.Event, partitionID string) bool {
timestamp := time.Now()
azure := common.MapStr{
// partitionID is only mapped in the non-eph option which is not available yet, this field will be temporary unavailable
Expand All @@ -195,12 +195,13 @@ func (a *azureInput) processEvents(event *eventhub.Event, partitionID string) er
"message": msg,
"azure": azure,
},
Private: event.Data,
})
if !ok {
return errors.New("event has not been sent")
return ok
}
}
return nil
return true
}

// parseMultipleMessages will try to split the message into multiple ones based on the group field provided by the configuration
Expand All @@ -209,7 +210,6 @@ func (a *azureInput) parseMultipleMessages(bMessage []byte) []string {
err := json.Unmarshal(bMessage, &obj)
if err != nil {
a.log.Errorw(fmt.Sprintf("deserializing multiple messages using the group object `records`"), "error", err)
return []string{string(bMessage)}
}
var messages []string
if len(obj[expandEventListFromField]) > 0 {
Expand Down
6 changes: 3 additions & 3 deletions x-pack/filebeat/input/azureeventhub/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,9 @@ func TestProcessEvents(t *testing.T) {
Data: []byte(msg),
SystemProperties: &properties,
}
err = input.processEvents(&ev, "0")
if err != nil {
t.Fatal(err)
ok := input.processEvents(&ev, "0")
if !ok {
t.Fatal("OnEvent function returned false")
}
assert.Equal(t, len(o.Events), 1)
message, err := o.Events[0].Fields.GetValue("message")
Expand Down