diff --git a/.chloggen/webhookevent_client.yaml b/.chloggen/webhookevent_client.yaml new file mode 100755 index 000000000000..b760332ffca4 --- /dev/null +++ b/.chloggen/webhookevent_client.yaml @@ -0,0 +1,16 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: 'enhancement' + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: webhookeventreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: "first pass implementing the components internals." + +# One or more tracking issues related to the change +issues: [18101] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: diff --git a/receiver/webhookeventreceiver/config.go b/receiver/webhookeventreceiver/config.go index b7d1753dd057..e272abfb481e 100644 --- a/receiver/webhookeventreceiver/config.go +++ b/receiver/webhookeventreceiver/config.go @@ -4,10 +4,60 @@ package webhookeventreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/webhookeventreceiver" import ( + "errors" + "time" + "go.opentelemetry.io/collector/config/confighttp" + "go.uber.org/multierr" +) + +var ( + errMissingEndpointFromConfig = errors.New("missing receiver server endpoint from config") + errReadTimeoutExceedsMaxValue = errors.New("The duration specified for read_timeout exceeds the maximum allowed value of 10s") + errWriteTimeoutExceedsMaxValue = errors.New("The duration specified for write_timeout exceeds the maximum allowed value of 10s") ) // Config defines configuration for the Generic Webhook receiver. type Config struct { confighttp.HTTPServerSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct + ReadTimeout string `mapstructure:"read_timeout"` // wait time for reading request headers in ms. Default is twenty seconds. + WriteTimeout string `mapstructure:"write_timeout"` // wait time for writing request response in ms. Default is twenty seconds. + Path string `mapstructure:"path"` // path for data collection. Default is :/services/collector + HealthPath string `mapstructure:"health_path"` // path for health check api. Default is /services/collector/health +} + +func (cfg *Config) Validate() error { + var errs error + + maxReadWriteTimeout, _ := time.ParseDuration("10s") + + if cfg.HTTPServerSettings.Endpoint == "" { + errs = multierr.Append(errs, errMissingEndpointFromConfig) + } + + // If a user defines a custom read/write timeout there is a maximum value + // of 10s imposed here. + if cfg.ReadTimeout != "" { + readTimeout, err := time.ParseDuration(cfg.ReadTimeout) + if err != nil { + errs = multierr.Append(errs, err) + } + + if readTimeout > maxReadWriteTimeout { + errs = multierr.Append(errs, errReadTimeoutExceedsMaxValue) + } + } + + if cfg.WriteTimeout != "" { + writeTimeout, err := time.ParseDuration(cfg.WriteTimeout) + if err != nil { + errs = multierr.Append(errs, err) + } + + if writeTimeout > maxReadWriteTimeout { + errs = multierr.Append(errs, errWriteTimeoutExceedsMaxValue) + } + } + + return errs } diff --git a/receiver/webhookeventreceiver/config_test.go b/receiver/webhookeventreceiver/config_test.go index efbf302f63a5..3f1107b98267 100644 --- a/receiver/webhookeventreceiver/config_test.go +++ b/receiver/webhookeventreceiver/config_test.go @@ -1,4 +1,111 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package webhookeventreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/genericwebhookreceiver" +package webhookeventreceiver + +import ( + "path/filepath" + "testing" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/confighttp" + "go.opentelemetry.io/collector/confmap/confmaptest" + "go.uber.org/multierr" + + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/webhookeventreceiver/internal/metadata" +) + +// only one validate check so far +func TestValidateConfig(t *testing.T) { + t.Parallel() + + var errs error + errs = multierr.Append(errs, errMissingEndpointFromConfig) + errs = multierr.Append(errs, errReadTimeoutExceedsMaxValue) + errs = multierr.Append(errs, errWriteTimeoutExceedsMaxValue) + + tests := []struct { + desc string + expect error + conf Config + }{ + { + desc: "Missing valid endpoint", + expect: errMissingEndpointFromConfig, + conf: Config{ + HTTPServerSettings: confighttp.HTTPServerSettings{ + Endpoint: "", + }, + }, + }, + { + desc: "ReadTimeout exceeds maximum value", + expect: errReadTimeoutExceedsMaxValue, + conf: Config{ + HTTPServerSettings: confighttp.HTTPServerSettings{ + Endpoint: "localhost:0", + }, + ReadTimeout: "14s", + }, + }, + { + desc: "WriteTimeout exceeds maximum value", + expect: errWriteTimeoutExceedsMaxValue, + conf: Config{ + HTTPServerSettings: confighttp.HTTPServerSettings{ + Endpoint: "localhost:0", + }, + WriteTimeout: "14s", + }, + }, + { + desc: "Multiple invalid configs", + expect: errs, + conf: Config{ + HTTPServerSettings: confighttp.HTTPServerSettings{ + Endpoint: "", + }, + WriteTimeout: "14s", + ReadTimeout: "15s", + }, + }, + } + + for _, test := range tests { + t.Run(test.desc, func(t *testing.T) { + err := test.conf.Validate() + require.Error(t, err) + require.Contains(t, err.Error(), test.expect.Error()) + }) + } +} + +func TestLoadConfig(t *testing.T) { + t.Parallel() + + cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config.yaml")) + require.NoError(t, err) + // LoadConf includes the TypeStr which NewFactory does not set + id := component.NewIDWithName(metadata.Type, "valid_config") + cmNoStr, err := cm.Sub(id.String()) + require.NoError(t, err) + + expect := &Config{ + HTTPServerSettings: confighttp.HTTPServerSettings{ + Endpoint: "localhost:8080", + }, + ReadTimeout: "500ms", + WriteTimeout: "500ms", + Path: "some/path", + HealthPath: "health/path", + } + + // create expected config + factory := NewFactory() + conf := factory.CreateDefaultConfig() + require.NoError(t, component.UnmarshalConfig(cmNoStr, conf)) + require.NoError(t, component.ValidateConfig(conf)) + + require.Equal(t, expect, conf) +} diff --git a/receiver/webhookeventreceiver/factory.go b/receiver/webhookeventreceiver/factory.go index a79c68141c7f..2f50bd3bedee 100644 --- a/receiver/webhookeventreceiver/factory.go +++ b/receiver/webhookeventreceiver/factory.go @@ -7,7 +7,6 @@ import ( "context" "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/receiver" @@ -15,12 +14,19 @@ import ( ) const ( + // might add this later, for now I wish to require a valid + // endpoint to be declared by the user. // Default endpoints to bind to. - defaultEndpoint = ":8080" + // defaultEndpoint = "localhost:8080" + scopeLogName = "otlp/" + metadata.Type + defaultReadTimeout = "500ms" + defaultWriteTimeout = "500ms" + defaultPath = "/events" + defaultHealthPath = "/health_check" ) // NewFactory creates a factory for Generic Webhook Receiver. -func NewFactory() component.Factory { +func NewFactory() receiver.Factory { return receiver.NewFactory( metadata.Type, createDefaultConfig, @@ -31,9 +37,10 @@ func NewFactory() component.Factory { // Default configuration for the generic webhook receiver func createDefaultConfig() component.Config { return &Config{ - HTTPServerSettings: confighttp.HTTPServerSettings{ - Endpoint: defaultEndpoint, - }, + Path: defaultPath, + HealthPath: defaultHealthPath, + ReadTimeout: defaultReadTimeout, + WriteTimeout: defaultWriteTimeout, } } @@ -45,6 +52,5 @@ func createLogsReceiver( consumer consumer.Logs, ) (receiver.Logs, error) { conf := cfg.(*Config) - return newLogsReceiver(params, *conf, consumer) } diff --git a/receiver/webhookeventreceiver/factory_test.go b/receiver/webhookeventreceiver/factory_test.go index bb35c313f9c6..526556e1e6e7 100644 --- a/receiver/webhookeventreceiver/factory_test.go +++ b/receiver/webhookeventreceiver/factory_test.go @@ -2,3 +2,70 @@ // SPDX-License-Identifier: Apache-2.0 package webhookeventreceiver + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/receiver/receivertest" +) + +func TestFactoryCreate(t *testing.T) { + factory := NewFactory() + require.EqualValues(t, "webhookevent", factory.Type()) +} + +func TestDefaultConfig(t *testing.T) { + cfg := createDefaultConfig() + require.NotNil(t, cfg, "Failed to create default configuration") +} + +func TestCreateLogsReceiver(t *testing.T) { + tests := []struct { + desc string + run func(t *testing.T) + }{ + { + desc: "Defaults with valid inputs", + run: func(t *testing.T) { + t.Parallel() + + cfg := createDefaultConfig().(*Config) + cfg.Endpoint = "localhost:8080" + require.NoError(t, cfg.Validate(), "error validating default config") + + _, err := createLogsReceiver( + context.Background(), + receivertest.NewNopCreateSettings(), + cfg, + consumertest.NewNop(), + ) + require.NoError(t, err, "failed to create logs receiver") + }, + }, + { + desc: "Missing consumer", + run: func(t *testing.T) { + t.Parallel() + + cfg := createDefaultConfig().(*Config) + cfg.Endpoint = "localhost:8080" + require.NoError(t, cfg.Validate(), "error validating default config") + + _, err := createLogsReceiver( + context.Background(), + receivertest.NewNopCreateSettings(), + cfg, + nil, + ) + require.Error(t, err, "Succeeded in creating a receiver without a consumer") + }, + }, + } + + for _, test := range tests { + t.Run(test.desc, test.run) + } +} diff --git a/receiver/webhookeventreceiver/go.mod b/receiver/webhookeventreceiver/go.mod index 9628f7e9468e..a0b381f43af5 100644 --- a/receiver/webhookeventreceiver/go.mod +++ b/receiver/webhookeventreceiver/go.mod @@ -3,13 +3,21 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/receiver/webhoo go 1.19 require ( + github.com/json-iterator/go v1.1.12 + github.com/julienschmidt/httprouter v1.3.0 + github.com/stretchr/testify v1.8.4 go.opentelemetry.io/collector v0.79.0 go.opentelemetry.io/collector/component v0.79.0 + go.opentelemetry.io/collector/confmap v0.79.0 go.opentelemetry.io/collector/consumer v0.79.0 + go.opentelemetry.io/collector/pdata v1.0.0-rcv0012 go.opentelemetry.io/collector/receiver v0.79.0 + go.uber.org/multierr v1.11.0 + go.uber.org/zap v1.24.0 ) require ( + github.com/davecgh/go-spew v1.1.1 // indirect github.com/felixge/httpsnoop v1.0.3 // indirect github.com/fsnotify/fsnotify v1.6.0 // indirect github.com/go-logr/logr v1.2.4 // indirect @@ -17,7 +25,6 @@ require ( github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/protobuf v1.5.3 // indirect github.com/golang/snappy v0.0.4 // indirect - github.com/json-iterator/go v1.1.12 // indirect github.com/klauspost/compress v1.16.5 // indirect github.com/knadh/koanf v1.5.0 // indirect github.com/mitchellh/copystructure v1.2.0 // indirect @@ -25,23 +32,23 @@ require ( github.com/mitchellh/reflectwalk v1.0.2 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect github.com/rs/cors v1.9.0 // indirect - go.opentelemetry.io/collector/confmap v0.79.0 // indirect + go.opencensus.io v0.24.0 // indirect + go.opentelemetry.io/collector/exporter v0.79.0 // indirect go.opentelemetry.io/collector/featuregate v1.0.0-rcv0012 // indirect - go.opentelemetry.io/collector/pdata v1.0.0-rcv0012 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.42.0 // indirect go.opentelemetry.io/otel v1.16.0 // indirect go.opentelemetry.io/otel/metric v1.16.0 // indirect go.opentelemetry.io/otel/trace v1.16.0 // indirect go.uber.org/atomic v1.10.0 // indirect - go.uber.org/multierr v1.11.0 // indirect - go.uber.org/zap v1.24.0 // indirect golang.org/x/net v0.10.0 // indirect golang.org/x/sys v0.8.0 // indirect golang.org/x/text v0.9.0 // indirect google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 // indirect google.golang.org/grpc v1.55.0 // indirect google.golang.org/protobuf v1.30.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect ) retract ( diff --git a/receiver/webhookeventreceiver/go.sum b/receiver/webhookeventreceiver/go.sum index 6479849af9fd..938a03a76c21 100644 --- a/receiver/webhookeventreceiver/go.sum +++ b/receiver/webhookeventreceiver/go.sum @@ -1,5 +1,6 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= +contrib.go.opencensus.io/exporter/prometheus v0.4.2 h1:sqfsYl5GIY/L570iT+l93ehxaWJs2/OwXtiWwew3oAg= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= @@ -24,10 +25,12 @@ github.com/aws/smithy-go v1.8.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAm github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= +github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= @@ -54,10 +57,12 @@ github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeME github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= github.com/go-kit/log v0.1.0/go.mod h1:zbhenjAZHb184qTLMA9ZjW7ThYL0H2mk7Q6pNt4vbaY= +github.com/go-kit/log v0.2.1 h1:MRVx0/zhvdseW+Gza6N9rVzU/IVzaeE1SFI4raAhmBU= github.com/go-ldap/ldap v3.0.2+incompatible/go.mod h1:qfd9rJvER9Q0/D/Sqn1DfHRoBp40uXYvFoEVrNEPqRc= github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A= +github.com/go-logfmt/logfmt v0.5.1 h1:otpy5pqBCBZ1ng9RQ0dPu4PN7ba75Y/aA+UpowDyNVA= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.2.4 h1:g01GSCwiDw2xSZfjJ2/T9M+S6pFdcNtFYsp+Y43HYDQ= github.com/go-logr/logr v1.2.4/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= @@ -70,6 +75,8 @@ github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7a github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= +github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= @@ -96,6 +103,7 @@ github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= @@ -152,6 +160,7 @@ github.com/json-iterator/go v1.1.11/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/ github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= +github.com/julienschmidt/httprouter v1.3.0 h1:U0609e9tgbseu3rBINet9P48AI/D3oJs4dN7jwJOQ1U= github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= @@ -164,8 +173,10 @@ github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxv github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU= github.com/mattn/go-colorable v0.1.4/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE= github.com/mattn/go-colorable v0.1.6/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= @@ -175,6 +186,7 @@ github.com/mattn/go-isatty v0.0.10/go.mod h1:qgIWMr58cqv1PHHyhnkY9lrL7etaEgOFcME github.com/mattn/go-isatty v0.0.11/go.mod h1:PhnuNfih5lzO57/f3n+odYbM4JtupLOxQOAqxQCu2WE= github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= +github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo= github.com/miekg/dns v1.1.26/go.mod h1:bPDLeHnStXmXAq1m/Ch/hvfNHr14JKNPMBo3VZKjuso= github.com/miekg/dns v1.1.41/go.mod h1:p6aan82bvRIyn+zDIv9xYNUpwa73JcSh9BKwknJysuI= github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc= @@ -221,19 +233,25 @@ github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXP github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo= github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M= github.com/prometheus/client_golang v1.11.1/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0= +github.com/prometheus/client_golang v1.15.1 h1:8tXpTmJbyH5lydzFPoxSIJ0J46jdh3tylbvM1xCv0LI= github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/prometheus/client_model v0.2.0/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= +github.com/prometheus/client_model v0.4.0 h1:5lQXD3cAg1OXBf4Wq03gTrXHeaV0TQvGfUooCfx1yqY= github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4= github.com/prometheus/common v0.10.0/go.mod h1:Tlit/dnDKsSWFlCLTWaA1cyBgKHSMdTB80sz/V91rCo= github.com/prometheus/common v0.26.0/go.mod h1:M7rCNAaPfAosfx8veZJCuw84e35h3Cfd9VFqTh1DIvc= +github.com/prometheus/common v0.44.0 h1:+5BrQJwiBB9xsMygAB3TNvpQKOwlkc25LbISbrdOOfY= github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU= github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= +github.com/prometheus/procfs v0.9.0 h1:wzCHvIvM5SxWqYvwgVL7yJY8Lz3PKn49KQtpgMYJfhI= +github.com/prometheus/statsd_exporter v0.22.7 h1:7Pji/i2GuhK6Lu7DHrtTkFmNBCudCPT1pX2CziuyQR0= github.com/rhnvrm/simples3 v0.6.1/go.mod h1:Y+3vYm2V7Y4VijFoJHHTrja6OgPrJ2cBti8dPGkC3sA= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= +github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= github.com/rs/cors v1.9.0 h1:l9HGsTsHJcvW14Nk7J9KFz8bzeAWXn3CG6bgt7LsrAE= github.com/rs/cors v1.9.0/go.mod h1:XyqrcTp5zjWr1wsJ8PIRZssZ8b/WMcMf71DJnit4EMU= github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts= @@ -247,18 +265,26 @@ github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= go.etcd.io/etcd/api/v3 v3.5.4/go.mod h1:5GB2vv4A4AOn3yk7MftYGHkUfGtDHnEraIjym4dYz5A= go.etcd.io/etcd/client/pkg/v3 v3.5.4/go.mod h1:IJHfcCEKxYu1Os13ZdwCwIUTUVGYTSAM3YSwc9/Ac1g= go.etcd.io/etcd/client/v3 v3.5.4/go.mod h1:ZaRkVgBZC+L+dLCjTcF1hRXpgZXQPOvnA/Ak/gq3kiY= +go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0= +go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo= go.opentelemetry.io/collector v0.79.0 h1:Lra7U0ilMor1g5WVkO3YZ0kZYsvzAtGN+Uq+CmC96JY= go.opentelemetry.io/collector v0.79.0/go.mod h1:O2Vfwykphq9VqdATZiAypjnJMS3WFBXwFSe/0ujo38Q= go.opentelemetry.io/collector/component v0.79.0 h1:ZKLJ4qa0AngmyGp1RQBJgl6OIP6mxdfrVpbz09h/W34= @@ -267,6 +293,8 @@ go.opentelemetry.io/collector/confmap v0.79.0 h1:a4XVde3lLP81BiSbt8AzVD6pvQBX8Yk go.opentelemetry.io/collector/confmap v0.79.0/go.mod h1:cKr2c7lVtEJCuMOncUPlcROJBbTFaHiPjYp1Y8RbL+Q= go.opentelemetry.io/collector/consumer v0.79.0 h1:V/4PCvbTw2Bt+lYb/ogac0g/nCCb3oKnmz+jM3t5Dyk= go.opentelemetry.io/collector/consumer v0.79.0/go.mod h1:VfqIyUI5K20zXx3mfVN+skmA+V3sV5fNorJ5TaIOj/U= +go.opentelemetry.io/collector/exporter v0.79.0 h1:PxhKgWf1AkZvN1PjiJT5xiO+pKZA9Y4fyuMs5aNFuEA= +go.opentelemetry.io/collector/exporter v0.79.0/go.mod h1:qlXiqnOUeHelpAwk03f8nB5+91UIqlA7udSBsj9bJ3M= go.opentelemetry.io/collector/featuregate v1.0.0-rcv0012 h1:pSO81lfikGEgRXHepmOGy2o6WWCly427UJCgMJC5c8g= go.opentelemetry.io/collector/featuregate v1.0.0-rcv0012/go.mod h1:/kVAsGUCyJXIDSgHftCN63QiwAEVHRLX2Kh/S+dqgHY= go.opentelemetry.io/collector/pdata v1.0.0-rcv0012 h1:R+cfEUMyLn9Q1QknyQ4QU77pbfc1aJKYEXFHtnwSbCg= @@ -277,8 +305,11 @@ go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.42.0 h1:pginetY go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.42.0/go.mod h1:XiYsayHc36K3EByOO6nbAXnAWbrUxdjUROCEeeROOH8= go.opentelemetry.io/otel v1.16.0 h1:Z7GVAX/UkAXPKsy94IU+i6thsQS4nb7LviLpnaNeW8s= go.opentelemetry.io/otel v1.16.0/go.mod h1:vl0h9NUa1D5s1nv3A5vZOYWn8av4K8Ml6JDeHrT/bx4= +go.opentelemetry.io/otel/exporters/prometheus v0.39.0 h1:whAaiHxOatgtKd+w0dOi//1KUxj3KoPINZdtDaDj3IA= go.opentelemetry.io/otel/metric v1.16.0 h1:RbrpwVG1Hfv85LgnZ7+txXioPDoh6EdbZHo26Q3hqOo= go.opentelemetry.io/otel/metric v1.16.0/go.mod h1:QE47cpOmkwipPiefDwo2wDzwJrlfxxNYodqc4xnGCo4= +go.opentelemetry.io/otel/sdk v1.16.0 h1:Z1Ok1YsijYL0CSJpHt4cS3wDDh7p572grzNrBMiMWgE= +go.opentelemetry.io/otel/sdk/metric v0.39.0 h1:Kun8i1eYf48kHH83RucG93ffz0zGV1sh46FAScOTuDI= go.opentelemetry.io/otel/trace v1.16.0 h1:8JRpaObFoW0pxuVPapkgH8UhHQj+bJW8jJsCZEu5MQs= go.opentelemetry.io/otel/trace v1.16.0/go.mod h1:Yt9vYq1SdNz3xdjZZK7wcXv1qv2pwLkqr2QVwea0ef0= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= @@ -319,6 +350,7 @@ golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= golang.org/x/net v0.0.0-20210410081132-afb366fc7cd1/go.mod h1:9tjilg8BloeKEkVJvy7fQ90B1CfIiPueXVOjqfkSzI8= @@ -411,6 +443,7 @@ google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyac google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= google.golang.org/grpc v1.33.1/go.mod h1:fr5YgcSWrqhRRxogOsw7RzIpsmvOZ6IcH4kBYTpR3n0= +google.golang.org/grpc v1.33.2/go.mod h1:JMHMWHQWaTccqQQlmk3MJZS+GWXOdAesneDmEnv2fbc= google.golang.org/grpc v1.38.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM= google.golang.org/grpc v1.55.0 h1:3Oj82/tFSCeUrRTg/5E/7d/W5A1tj6Ky1ABAuZuv5ag= google.golang.org/grpc v1.55.0/go.mod h1:iYEXKGkEBhg1PjZQvoYEVPTDkHo1/bjTnfwTeGONTY8= @@ -431,6 +464,7 @@ gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLks gopkg.in/asn1-ber.v1 v1.0.0-20181015200546-f715ec2f112d/go.mod h1:cuepJuh7vyXfUyUwEgHQXw849cJrilpS5NeIjOWESAw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/square/go-jose.v2 v2.3.1/go.mod h1:M9dMgbHiYLoDGQrXy7OpJDJWiKiU//h+vD76mk0e1AI= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= @@ -439,6 +473,7 @@ gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/receiver/webhookeventreceiver/internal/metadata/generated_status.go b/receiver/webhookeventreceiver/internal/metadata/generated_status.go index bb7712e97b3d..27a3adfaf2d9 100644 --- a/receiver/webhookeventreceiver/internal/metadata/generated_status.go +++ b/receiver/webhookeventreceiver/internal/metadata/generated_status.go @@ -7,6 +7,6 @@ import ( ) const ( - Type = "generic_webhook" + Type = "webhookevent" LogsStability = component.StabilityLevelDevelopment ) diff --git a/receiver/webhookeventreceiver/metadata.yaml b/receiver/webhookeventreceiver/metadata.yaml index 1adca297e89c..1611b0d9dca7 100644 --- a/receiver/webhookeventreceiver/metadata.yaml +++ b/receiver/webhookeventreceiver/metadata.yaml @@ -1,8 +1,8 @@ -type: generic_webhook +type: webhookevent status: class: receiver stability: development: [logs] - distributions: + distributions: diff --git a/receiver/webhookeventreceiver/receiver.go b/receiver/webhookeventreceiver/receiver.go index 2055ad6e5623..2cb5140dab1a 100644 --- a/receiver/webhookeventreceiver/receiver.go +++ b/receiver/webhookeventreceiver/receiver.go @@ -4,12 +4,227 @@ package webhookeventreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/webhookeventreceiver" import ( + "bufio" + "compress/gzip" + "context" "errors" + "io" + "net/http" + "sync" + "time" + jsoniter "github.com/json-iterator/go" + "github.com/julienschmidt/httprouter" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/obsreport" "go.opentelemetry.io/collector/receiver" + "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/webhookeventreceiver/internal/metadata" +) + +var ( + errNilLogsConsumer = errors.New("missing a logs consumer") + errMissingEndpoint = errors.New("missing a receiver endpoint") + errInvalidRequestMethod = errors.New("invalid method. Valid method is POST") + errInvalidEncodingType = errors.New("invalid encoding type") + errEmptyResponseBody = errors.New("request body content length is zero") ) -func newLogsReceiver(_ receiver.CreateSettings, _ Config, _ consumer.Logs) (receiver.Logs, error) { - return nil, errors.New("unimplemented") +const healthyResponse = `{"text": "Webhookevent receiver is healthy"}` + +type eventReceiver struct { + settings receiver.CreateSettings + cfg *Config + logConsumer consumer.Logs + server *http.Server + shutdownWG sync.WaitGroup + obsrecv *obsreport.Receiver + gzipPool *sync.Pool +} + +func newLogsReceiver(params receiver.CreateSettings, cfg Config, consumer consumer.Logs) (receiver.Logs, error) { + if consumer == nil { + return nil, errNilLogsConsumer + } + + if cfg.Endpoint == "" { + return nil, errMissingEndpoint + } + + transport := "http" + if cfg.TLSSetting != nil { + transport = "https" + } + + obsrecv, err := obsreport.NewReceiver(obsreport.ReceiverSettings{ + ReceiverID: params.ID, + Transport: transport, + ReceiverCreateSettings: params, + }) + + if err != nil { + return nil, err + } + + // create eventReceiver instance + er := &eventReceiver{ + settings: params, + cfg: &cfg, + logConsumer: consumer, + obsrecv: obsrecv, + gzipPool: &sync.Pool{New: func() interface{} { return new(gzip.Reader) }}, + } + + return er, nil +} + +// Start function manages receiver startup tasks. part of the receiver.Logs interface. +func (er *eventReceiver) Start(_ context.Context, host component.Host) error { + // noop if not nil. if start has not been called before these values should be nil. + if er.server != nil && er.server.Handler != nil { + return nil + } + + // create listener from config + ln, err := er.cfg.HTTPServerSettings.ToListener() + if err != nil { + return err + } + + // set up router. + router := httprouter.New() + + router.POST(er.cfg.Path, er.handleReq) + router.GET(er.cfg.HealthPath, er.handleHealthCheck) + + // webhook server standup and configuration + er.server, err = er.cfg.HTTPServerSettings.ToServer(host, er.settings.TelemetrySettings, router) + if err != nil { + return err + } + + readTimeout, err := time.ParseDuration(er.cfg.ReadTimeout) + if err != nil { + return err + } + + writeTimeout, err := time.ParseDuration(er.cfg.WriteTimeout) + if err != nil { + return err + } + + // set timeouts + er.server.ReadHeaderTimeout = readTimeout + er.server.WriteTimeout = writeTimeout + + // shutdown + er.shutdownWG.Add(1) + go func() { + defer er.shutdownWG.Done() + if errHTTP := er.server.Serve(ln); !errors.Is(errHTTP, http.ErrServerClosed) && errHTTP != nil { + host.ReportFatalError(errHTTP) + } + }() + + return nil +} + +// Shutdown function manages receiver shutdown tasks. part of the receiver.Logs interface. +func (er *eventReceiver) Shutdown(_ context.Context) error { + err := er.server.Close() + er.shutdownWG.Wait() + return err +} + +// handleReq handles incoming request from webhook. On success returns a 200 response code to the webhook +func (er *eventReceiver) handleReq(w http.ResponseWriter, r *http.Request, _ httprouter.Params) { + ctx := r.Context() + ctx = er.obsrecv.StartLogsOp(ctx) + + if r.Method != http.MethodPost { + er.failBadReq(ctx, w, http.StatusBadRequest, errInvalidRequestMethod) + return + } + + encoding := r.Header.Get("Content-Encoding") + // only support gzip if encoding header is set. + if encoding != "" && encoding != "gzip" { + er.failBadReq(ctx, w, http.StatusUnsupportedMediaType, errInvalidEncodingType) + return + } + + if r.ContentLength == 0 { + er.obsrecv.EndLogsOp(ctx, metadata.Type, 0, nil) + er.failBadReq(ctx, w, http.StatusBadRequest, errEmptyResponseBody) + } + + bodyReader := r.Body + // gzip encoded case + if encoding == "gzip" || encoding == "x-gzip" { + reader := er.gzipPool.Get().(*gzip.Reader) + err := reader.Reset(bodyReader) + + if err != nil { + er.failBadReq(ctx, w, http.StatusBadRequest, err) + _, _ = io.ReadAll(r.Body) + _ = r.Body.Close() + return + } + bodyReader = reader + defer er.gzipPool.Put(reader) + } + + // finish reading the body into a log + sc := bufio.NewScanner(bodyReader) + ld, numLogs := reqToLog(sc, r.URL.Query(), er.cfg, er.settings) + consumerErr := er.logConsumer.ConsumeLogs(ctx, ld) + + _ = bodyReader.Close() + + if consumerErr != nil { + er.failBadReq(ctx, w, http.StatusInternalServerError, consumerErr) + er.obsrecv.EndLogsOp(ctx, metadata.Type, numLogs, nil) + } else { + w.WriteHeader(http.StatusOK) + er.obsrecv.EndLogsOp(ctx, metadata.Type, numLogs, nil) + } +} + +// Simple healthcheck endpoint. +func (er *eventReceiver) handleHealthCheck(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) { + w.Header().Add("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + + _, _ = w.Write([]byte(healthyResponse)) +} + +// write response on a failed/bad request. Generates a small json body based on the thrown by +// the handle func and the appropriate http status code. many webhooks will either log these responses or +// notify webhook users should a none 2xx code be detected. +func (er *eventReceiver) failBadReq(_ context.Context, + w http.ResponseWriter, + httpStatusCode int, + err error) { + jsonResp, err := jsoniter.Marshal(err.Error()) + if err != nil { + er.settings.Logger.Warn("failed to marshall error to json") + } + + // write response to webhook + w.WriteHeader(httpStatusCode) + if len(jsonResp) > 0 { + w.Header().Add("Content-Type", "application/json") + _, err = w.Write(jsonResp) + if err != nil { + er.settings.Logger.Warn("failed to write json response", zap.Error(err)) + } + } + + // log bad webhook request if debug is enabled + if er.settings.Logger.Core().Enabled(zap.DebugLevel) { + msg := string(jsonResp) + er.settings.Logger.Debug(msg, zap.Int("http_status_code", httpStatusCode), zap.Error(err)) + } } diff --git a/receiver/webhookeventreceiver/receiver_test.go b/receiver/webhookeventreceiver/receiver_test.go index bb35c313f9c6..d20a07d0e1c1 100644 --- a/receiver/webhookeventreceiver/receiver_test.go +++ b/receiver/webhookeventreceiver/receiver_test.go @@ -2,3 +2,228 @@ // SPDX-License-Identifier: Apache-2.0 package webhookeventreceiver + +import ( + "bytes" + "compress/gzip" + "context" + "encoding/json" + "io" + "net/http" + "net/http/httptest" + "strings" + "testing" + + "github.com/julienschmidt/httprouter" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/config/confighttp" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/receiver/receivertest" +) + +func TestCreateNewLogReceiver(t *testing.T) { + defaultConfig := createDefaultConfig().(*Config) + + tests := []struct { + desc string + cfg Config + consumer consumer.Logs + err error + }{ + { + desc: "Default config fails (no endpoint)", + cfg: *defaultConfig, + consumer: consumertest.NewNop(), + err: errMissingEndpoint, + }, + { + desc: "User defined config success", + cfg: Config{ + HTTPServerSettings: confighttp.HTTPServerSettings{ + Endpoint: "localhost:8080", + }, + ReadTimeout: "543", + WriteTimeout: "210", + Path: "/event", + HealthPath: "/health", + }, + consumer: consumertest.NewNop(), + }, + { + desc: "Missing consumer fails", + cfg: *defaultConfig, + err: errNilLogsConsumer, + }, + } + + for _, test := range tests { + t.Run(test.desc, func(t *testing.T) { + rec, err := newLogsReceiver(receivertest.NewNopCreateSettings(), test.cfg, test.consumer) + if test.err == nil { + require.NotNil(t, rec) + } else { + require.ErrorIs(t, err, test.err) + require.Nil(t, rec) + } + }) + } +} + +// these requests should all succeed +func TestHandleReq(t *testing.T) { + cfg := createDefaultConfig().(*Config) + cfg.Endpoint = "localhost:0" + + tests := []struct { + desc string + cfg Config + req *http.Request + }{ + { + desc: "Good request", + cfg: *cfg, + req: httptest.NewRequest("POST", "http://localhost/events", strings.NewReader("test")), + }, + { + desc: "Good request with gzip", + cfg: *cfg, + req: func() *http.Request { + // create gzip encoded message + msgStruct := struct { + Field1 string + Field2 int + Field3 string + }{ + Field1: "hello", + Field2: 42, + Field3: "world", + } + msgJSON, err := json.Marshal(msgStruct) + require.NoError(t, err, "failed to marshall message into valid json") + + var msg bytes.Buffer + gzipWriter := gzip.NewWriter(&msg) + _, err = gzipWriter.Write(msgJSON) + require.NoError(t, err, "Gzip writer failed") + + req := httptest.NewRequest("POST", "http://localhost/events", &msg) + return req + }(), + }, + { + desc: "Multiple logs", + cfg: *cfg, + req: httptest.NewRequest("POST", "http://localhost/events", strings.NewReader("log1\nlog2")), + }, + } + + for _, test := range tests { + t.Run(test.desc, func(t *testing.T) { + consumer := consumertest.NewNop() + receiver, err := newLogsReceiver(receivertest.NewNopCreateSettings(), test.cfg, consumer) + require.NoError(t, err, "Failed to create receiver") + + r := receiver.(*eventReceiver) + require.NoError(t, r.Start(context.Background(), componenttest.NewNopHost()), "Failed to start receiver") + defer func() { + require.NoError(t, r.Shutdown(context.Background()), "Failed to shutdown receiver") + }() + + w := httptest.NewRecorder() + r.handleReq(w, test.req, httprouter.ParamsFromContext(context.Background())) + + response := w.Result() + _, err = io.ReadAll(response.Body) + require.NoError(t, err, "Failed to read message body") + + require.Equal(t, http.StatusOK, response.StatusCode) + }) + } +} + +// failure in its many forms +func TestFailedReq(t *testing.T) { + cfg := createDefaultConfig().(*Config) + cfg.Endpoint = "localhost:0" + + tests := []struct { + desc string + cfg Config + req *http.Request + status int + }{ + { + desc: "Invalid method", + cfg: *cfg, + req: httptest.NewRequest("GET", "http://localhost/events", nil), + status: http.StatusBadRequest, + }, + { + desc: "Empty body", + cfg: *cfg, + req: httptest.NewRequest("POST", "http://localhost/events", strings.NewReader("")), + status: http.StatusBadRequest, + }, + { + desc: "Invalid encoding", + cfg: *cfg, + req: func() *http.Request { + req := httptest.NewRequest("POST", "http://localhost/events", strings.NewReader("test")) + req.Header.Set("Content-Encoding", "glizzy") + return req + }(), + status: http.StatusUnsupportedMediaType, + }, + { + desc: "Valid content encoding header invalid data", + cfg: *cfg, + req: func() *http.Request { + req := httptest.NewRequest("POST", "http://localhost/events", strings.NewReader("notzipped")) + req.Header.Set("Content-Encoding", "gzip") + return req + }(), + status: http.StatusBadRequest, + }, + } + for _, test := range tests { + t.Run(test.desc, func(t *testing.T) { + consumer := consumertest.NewNop() + receiver, err := newLogsReceiver(receivertest.NewNopCreateSettings(), test.cfg, consumer) + require.NoError(t, err, "Failed to create receiver") + + r := receiver.(*eventReceiver) + require.NoError(t, r.Start(context.Background(), componenttest.NewNopHost()), "Failed to start receiver") + defer func() { + require.NoError(t, r.Shutdown(context.Background()), "Failed to shutdown receiver") + }() + + w := httptest.NewRecorder() + r.handleReq(w, test.req, httprouter.ParamsFromContext(context.Background())) + + response := w.Result() + require.Equal(t, test.status, response.StatusCode) + }) + } +} + +func TestHealthCheck(t *testing.T) { + defaultConfig := createDefaultConfig().(*Config) + defaultConfig.Endpoint = "localhost:0" + consumer := consumertest.NewNop() + receiver, err := newLogsReceiver(receivertest.NewNopCreateSettings(), *defaultConfig, consumer) + require.NoError(t, err, "failed to create receiver") + + r := receiver.(*eventReceiver) + require.NoError(t, r.Start(context.Background(), componenttest.NewNopHost()), "failed to start receiver") + defer func() { + require.NoError(t, r.Shutdown(context.Background()), "failed to shutdown revceiver") + }() + + w := httptest.NewRecorder() + r.handleHealthCheck(w, httptest.NewRequest("GET", "http://localhost/health", nil), httprouter.ParamsFromContext(context.Background())) + + response := w.Result() + require.Equal(t, http.StatusOK, response.StatusCode) +} diff --git a/receiver/webhookeventreceiver/req_to_log.go b/receiver/webhookeventreceiver/req_to_log.go new file mode 100644 index 000000000000..222621f9c263 --- /dev/null +++ b/receiver/webhookeventreceiver/req_to_log.go @@ -0,0 +1,47 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package webhookeventreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/webhookeventreceiver" + +import ( + "bufio" + "net/url" + + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/receiver" + + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/webhookeventreceiver/internal/metadata" +) + +func reqToLog(sc *bufio.Scanner, + query url.Values, + _ *Config, + settings receiver.CreateSettings) (plog.Logs, int) { + log := plog.NewLogs() + resourceLog := log.ResourceLogs().AppendEmpty() + appendMetadata(resourceLog, query) + scopeLog := resourceLog.ScopeLogs().AppendEmpty() + + scopeLog.Scope().SetName(scopeLogName) + scopeLog.Scope().SetVersion(settings.BuildInfo.Version) + scopeLog.Scope().Attributes().PutStr("source", settings.ID.String()) + scopeLog.Scope().Attributes().PutStr("receiver", metadata.Type) + + for sc.Scan() { + logRecord := scopeLog.LogRecords().AppendEmpty() + line := sc.Text() + logRecord.Body().SetStr(line) + } + + return log, scopeLog.LogRecords().Len() +} + +// append query parameters and webhook source as resource attributes +func appendMetadata(resourceLog plog.ResourceLogs, query url.Values) { + for k := range query { + if query.Get(k) != "" { + resourceLog.Resource().Attributes().PutStr(k, query.Get(k)) + } + } + +} diff --git a/receiver/webhookeventreceiver/req_to_log_test.go b/receiver/webhookeventreceiver/req_to_log_test.go new file mode 100644 index 000000000000..dd1b9f05c9ce --- /dev/null +++ b/receiver/webhookeventreceiver/req_to_log_test.go @@ -0,0 +1,87 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package webhookeventreceiver + +import ( + "bufio" + "bytes" + "io" + "log" + "net/url" + "testing" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/receiver" + "go.opentelemetry.io/collector/receiver/receivertest" +) + +func TestReqToLog(t *testing.T) { + defaultConfig := createDefaultConfig().(*Config) + + tests := []struct { + desc string + sc *bufio.Scanner + query url.Values + tt func(t *testing.T, reqLog plog.Logs, reqLen int, settings receiver.CreateSettings) + }{ + { + desc: "Valid query valid event", + sc: func() *bufio.Scanner { + reader := io.NopCloser(bytes.NewReader([]byte("this is a: log"))) + return bufio.NewScanner(reader) + }(), + query: func() url.Values { + v, err := url.ParseQuery(`qparam1=hello&qparam2=world`) + if err != nil { + log.Fatal("failed to parse query") + } + return v + }(), + tt: func(t *testing.T, reqLog plog.Logs, reqLen int, settings receiver.CreateSettings) { + require.Equal(t, 1, reqLen) + + attributes := reqLog.ResourceLogs().At(0).Resource().Attributes() + require.Equal(t, 2, attributes.Len()) + + scopeLogsScope := reqLog.ResourceLogs().At(0).ScopeLogs().At(0).Scope() + require.Equal(t, 2, scopeLogsScope.Attributes().Len()) + + if v, ok := attributes.Get("qparam1"); ok { + require.Equal(t, "hello", v.AsString()) + } else { + require.Fail(t, "faild to set attribute from query parameter 1") + } + if v, ok := attributes.Get("qparam2"); ok { + require.Equal(t, "world", v.AsString()) + } else { + require.Fail(t, "faild to set attribute query parameter 2") + } + }, + }, + { + desc: "Query is empty", + sc: func() *bufio.Scanner { + reader := io.NopCloser(bytes.NewReader([]byte("this is a: log"))) + return bufio.NewScanner(reader) + }(), + tt: func(t *testing.T, reqLog plog.Logs, reqLen int, settings receiver.CreateSettings) { + require.Equal(t, 1, reqLen) + + attributes := reqLog.ResourceLogs().At(0).Resource().Attributes() + require.Equal(t, 0, attributes.Len()) + + scopeLogsScope := reqLog.ResourceLogs().At(0).ScopeLogs().At(0).Scope() + require.Equal(t, 2, scopeLogsScope.Attributes().Len()) + }, + }, + } + + for _, test := range tests { + t.Run(test.desc, func(t *testing.T) { + reqLog, reqLen := reqToLog(test.sc, test.query, defaultConfig, receivertest.NewNopCreateSettings()) + test.tt(t, reqLog, reqLen, receivertest.NewNopCreateSettings()) + }) + } +} diff --git a/receiver/webhookeventreceiver/testdata/config.yaml b/receiver/webhookeventreceiver/testdata/config.yaml new file mode 100644 index 000000000000..64ea7f3be47d --- /dev/null +++ b/receiver/webhookeventreceiver/testdata/config.yaml @@ -0,0 +1,7 @@ +# each webhook will require its own webhook event receiver +webhookevent/valid_config: + endpoint: localhost:8080 + read_timeout: "500ms" + write_timeout: "500ms" + path: "some/path" + health_path: "health/path"