Skip to content

Commit

Permalink
Webhookevent client (#20751)
Browse files Browse the repository at this point in the history
Implemented the component. It works by standing up a server with a route to accept post requests from the webhook its configured to receive from. These requests are then converted into otel logs and shipped. Can handle gzip encoded payloads.

Link to tracking Issue: 18101
  • Loading branch information
shalper2 authored Jun 6, 2023
1 parent da2b74c commit 99bd0a9
Show file tree
Hide file tree
Showing 14 changed files with 887 additions and 18 deletions.
16 changes: 16 additions & 0 deletions .chloggen/webhookevent_client.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: 'enhancement'

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: webhookeventreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "first pass implementing the components internals."

# One or more tracking issues related to the change
issues: [18101]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
50 changes: 50 additions & 0 deletions receiver/webhookeventreceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,60 @@
package webhookeventreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/webhookeventreceiver"

import (
"errors"
"time"

"go.opentelemetry.io/collector/config/confighttp"
"go.uber.org/multierr"
)

var (
errMissingEndpointFromConfig = errors.New("missing receiver server endpoint from config")
errReadTimeoutExceedsMaxValue = errors.New("The duration specified for read_timeout exceeds the maximum allowed value of 10s")
errWriteTimeoutExceedsMaxValue = errors.New("The duration specified for write_timeout exceeds the maximum allowed value of 10s")
)

// Config defines configuration for the Generic Webhook receiver.
type Config struct {
confighttp.HTTPServerSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct
ReadTimeout string `mapstructure:"read_timeout"` // wait time for reading request headers in ms. Default is twenty seconds.
WriteTimeout string `mapstructure:"write_timeout"` // wait time for writing request response in ms. Default is twenty seconds.
Path string `mapstructure:"path"` // path for data collection. Default is <host>:<port>/services/collector
HealthPath string `mapstructure:"health_path"` // path for health check api. Default is /services/collector/health
}

func (cfg *Config) Validate() error {
var errs error

maxReadWriteTimeout, _ := time.ParseDuration("10s")

if cfg.HTTPServerSettings.Endpoint == "" {
errs = multierr.Append(errs, errMissingEndpointFromConfig)
}

// If a user defines a custom read/write timeout there is a maximum value
// of 10s imposed here.
if cfg.ReadTimeout != "" {
readTimeout, err := time.ParseDuration(cfg.ReadTimeout)
if err != nil {
errs = multierr.Append(errs, err)
}

if readTimeout > maxReadWriteTimeout {
errs = multierr.Append(errs, errReadTimeoutExceedsMaxValue)
}
}

if cfg.WriteTimeout != "" {
writeTimeout, err := time.ParseDuration(cfg.WriteTimeout)
if err != nil {
errs = multierr.Append(errs, err)
}

if writeTimeout > maxReadWriteTimeout {
errs = multierr.Append(errs, errWriteTimeoutExceedsMaxValue)
}
}

return errs
}
109 changes: 108 additions & 1 deletion receiver/webhookeventreceiver/config_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,111 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package webhookeventreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/genericwebhookreceiver"
package webhookeventreceiver

import (
"path/filepath"
"testing"

"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/confmap/confmaptest"
"go.uber.org/multierr"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/webhookeventreceiver/internal/metadata"
)

// only one validate check so far
func TestValidateConfig(t *testing.T) {
t.Parallel()

var errs error
errs = multierr.Append(errs, errMissingEndpointFromConfig)
errs = multierr.Append(errs, errReadTimeoutExceedsMaxValue)
errs = multierr.Append(errs, errWriteTimeoutExceedsMaxValue)

tests := []struct {
desc string
expect error
conf Config
}{
{
desc: "Missing valid endpoint",
expect: errMissingEndpointFromConfig,
conf: Config{
HTTPServerSettings: confighttp.HTTPServerSettings{
Endpoint: "",
},
},
},
{
desc: "ReadTimeout exceeds maximum value",
expect: errReadTimeoutExceedsMaxValue,
conf: Config{
HTTPServerSettings: confighttp.HTTPServerSettings{
Endpoint: "localhost:0",
},
ReadTimeout: "14s",
},
},
{
desc: "WriteTimeout exceeds maximum value",
expect: errWriteTimeoutExceedsMaxValue,
conf: Config{
HTTPServerSettings: confighttp.HTTPServerSettings{
Endpoint: "localhost:0",
},
WriteTimeout: "14s",
},
},
{
desc: "Multiple invalid configs",
expect: errs,
conf: Config{
HTTPServerSettings: confighttp.HTTPServerSettings{
Endpoint: "",
},
WriteTimeout: "14s",
ReadTimeout: "15s",
},
},
}

for _, test := range tests {
t.Run(test.desc, func(t *testing.T) {
err := test.conf.Validate()
require.Error(t, err)
require.Contains(t, err.Error(), test.expect.Error())
})
}
}

func TestLoadConfig(t *testing.T) {
t.Parallel()

cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config.yaml"))
require.NoError(t, err)
// LoadConf includes the TypeStr which NewFactory does not set
id := component.NewIDWithName(metadata.Type, "valid_config")
cmNoStr, err := cm.Sub(id.String())
require.NoError(t, err)

expect := &Config{
HTTPServerSettings: confighttp.HTTPServerSettings{
Endpoint: "localhost:8080",
},
ReadTimeout: "500ms",
WriteTimeout: "500ms",
Path: "some/path",
HealthPath: "health/path",
}

// create expected config
factory := NewFactory()
conf := factory.CreateDefaultConfig()
require.NoError(t, component.UnmarshalConfig(cmNoStr, conf))
require.NoError(t, component.ValidateConfig(conf))

require.Equal(t, expect, conf)
}
20 changes: 13 additions & 7 deletions receiver/webhookeventreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,26 @@ import (
"context"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/receiver"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/webhookeventreceiver/internal/metadata"
)

const (
// might add this later, for now I wish to require a valid
// endpoint to be declared by the user.
// Default endpoints to bind to.
defaultEndpoint = ":8080"
// defaultEndpoint = "localhost:8080"
scopeLogName = "otlp/" + metadata.Type
defaultReadTimeout = "500ms"
defaultWriteTimeout = "500ms"
defaultPath = "/events"
defaultHealthPath = "/health_check"
)

// NewFactory creates a factory for Generic Webhook Receiver.
func NewFactory() component.Factory {
func NewFactory() receiver.Factory {
return receiver.NewFactory(
metadata.Type,
createDefaultConfig,
Expand All @@ -31,9 +37,10 @@ func NewFactory() component.Factory {
// Default configuration for the generic webhook receiver
func createDefaultConfig() component.Config {
return &Config{
HTTPServerSettings: confighttp.HTTPServerSettings{
Endpoint: defaultEndpoint,
},
Path: defaultPath,
HealthPath: defaultHealthPath,
ReadTimeout: defaultReadTimeout,
WriteTimeout: defaultWriteTimeout,
}
}

Expand All @@ -45,6 +52,5 @@ func createLogsReceiver(
consumer consumer.Logs,
) (receiver.Logs, error) {
conf := cfg.(*Config)

return newLogsReceiver(params, *conf, consumer)
}
67 changes: 67 additions & 0 deletions receiver/webhookeventreceiver/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,70 @@
// SPDX-License-Identifier: Apache-2.0

package webhookeventreceiver

import (
"context"
"testing"

"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/receiver/receivertest"
)

func TestFactoryCreate(t *testing.T) {
factory := NewFactory()
require.EqualValues(t, "webhookevent", factory.Type())
}

func TestDefaultConfig(t *testing.T) {
cfg := createDefaultConfig()
require.NotNil(t, cfg, "Failed to create default configuration")
}

func TestCreateLogsReceiver(t *testing.T) {
tests := []struct {
desc string
run func(t *testing.T)
}{
{
desc: "Defaults with valid inputs",
run: func(t *testing.T) {
t.Parallel()

cfg := createDefaultConfig().(*Config)
cfg.Endpoint = "localhost:8080"
require.NoError(t, cfg.Validate(), "error validating default config")

_, err := createLogsReceiver(
context.Background(),
receivertest.NewNopCreateSettings(),
cfg,
consumertest.NewNop(),
)
require.NoError(t, err, "failed to create logs receiver")
},
},
{
desc: "Missing consumer",
run: func(t *testing.T) {
t.Parallel()

cfg := createDefaultConfig().(*Config)
cfg.Endpoint = "localhost:8080"
require.NoError(t, cfg.Validate(), "error validating default config")

_, err := createLogsReceiver(
context.Background(),
receivertest.NewNopCreateSettings(),
cfg,
nil,
)
require.Error(t, err, "Succeeded in creating a receiver without a consumer")
},
},
}

for _, test := range tests {
t.Run(test.desc, test.run)
}
}
17 changes: 12 additions & 5 deletions receiver/webhookeventreceiver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,45 +3,52 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/receiver/webhoo
go 1.19

require (
github.com/json-iterator/go v1.1.12
github.com/julienschmidt/httprouter v1.3.0
github.com/stretchr/testify v1.8.4
go.opentelemetry.io/collector v0.79.0
go.opentelemetry.io/collector/component v0.79.0
go.opentelemetry.io/collector/confmap v0.79.0
go.opentelemetry.io/collector/consumer v0.79.0
go.opentelemetry.io/collector/pdata v1.0.0-rcv0012
go.opentelemetry.io/collector/receiver v0.79.0
go.uber.org/multierr v1.11.0
go.uber.org/zap v1.24.0
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/felixge/httpsnoop v1.0.3 // indirect
github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/go-logr/logr v1.2.4 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.16.5 // indirect
github.com/knadh/koanf v1.5.0 // indirect
github.com/mitchellh/copystructure v1.2.0 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/mitchellh/reflectwalk v1.0.2 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rs/cors v1.9.0 // indirect
go.opentelemetry.io/collector/confmap v0.79.0 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/collector/exporter v0.79.0 // indirect
go.opentelemetry.io/collector/featuregate v1.0.0-rcv0012 // indirect
go.opentelemetry.io/collector/pdata v1.0.0-rcv0012 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.42.0 // indirect
go.opentelemetry.io/otel v1.16.0 // indirect
go.opentelemetry.io/otel/metric v1.16.0 // indirect
go.opentelemetry.io/otel/trace v1.16.0 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.24.0 // indirect
golang.org/x/net v0.10.0 // indirect
golang.org/x/sys v0.8.0 // indirect
golang.org/x/text v0.9.0 // indirect
google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 // indirect
google.golang.org/grpc v1.55.0 // indirect
google.golang.org/protobuf v1.30.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

retract (
Expand Down
Loading

0 comments on commit 99bd0a9

Please sign in to comment.