diff --git a/CHANGELOG.md b/CHANGELOG.md index bd80267a1f..0ab86e9eba 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,8 @@ Main (unreleased) - Add `otelcol.exporter.splunkhec` allowing to export otel data to Splunk HEC (@adlotsof) +- Add `otelcol.receiver.solace` component to receive traces from a Solace broker. (@wildum) + ### Bugfixes - Fixed an issue in the `prometheus.exporter.postgres` component that would leak goroutines when the target was not reachable (@dehaansa) diff --git a/docs/sources/reference/compatibility/_index.md b/docs/sources/reference/compatibility/_index.md index 87251c0046..b4fc33504d 100644 --- a/docs/sources/reference/compatibility/_index.md +++ b/docs/sources/reference/compatibility/_index.md @@ -361,6 +361,7 @@ The following components, grouped by namespace, _consume_ OpenTelemetry `otelcol - [otelcol.receiver.opencensus](../components/otelcol/otelcol.receiver.opencensus) - [otelcol.receiver.otlp](../components/otelcol/otelcol.receiver.otlp) - [otelcol.receiver.prometheus](../components/otelcol/otelcol.receiver.prometheus) +- [otelcol.receiver.solace](../components/otelcol/otelcol.receiver.solace) - [otelcol.receiver.vcenter](../components/otelcol/otelcol.receiver.vcenter) - [otelcol.receiver.zipkin](../components/otelcol/otelcol.receiver.zipkin) {{< /collapse >}} diff --git a/docs/sources/reference/components/otelcol/otelcol.receiver.solace.md b/docs/sources/reference/components/otelcol/otelcol.receiver.solace.md new file mode 100644 index 0000000000..2df0cd9133 --- /dev/null +++ b/docs/sources/reference/components/otelcol/otelcol.receiver.solace.md @@ -0,0 +1,209 @@ +--- +canonical: https://grafana.com/docs/alloy/latest/reference/components/otelcol/otelcol.receiver.solace/ +description: Learn about otelcol.receiver.solace +title: otelcol.receiver.solace +--- + +# otelcol.receiver.solace + +`otelcol.receiver.solace` accepts traces from a [Solace PubSub+ Event Broker](https://solace.com/products/event-broker/) and +forwards it to other `otelcol.*` components. + +{{< admonition type="note" >}} +`otelcol.receiver.solace` is a wrapper over the upstream OpenTelemetry Collector `solace` receiver from the `otelcol-contrib` distribution. +Bug reports or feature requests will be redirected to the upstream repository, if necessary. +{{< /admonition >}} + +You can specify multiple `otelcol.receiver.solace` components by giving them different labels. + +## Usage + +```alloy +otelcol.receiver.solace "LABEL" { + queue = "QUEUE" + auth { + // sasl_plain or sasl_xauth2 or sasl_external block + } + output { + traces = [...] + } +} +``` + +## Arguments + +The following arguments are supported: + +| Name | Type | Description | Default | Required | +| -------------------- | -------- | ------------------------------------------------------------------------- | ---------------- | -------- | +| `queue` | `string` | Name of the Solace telemetry queue to get span trace messages from. | | yes | +| `broker` | `string` | Name of the Solace broker using AMQP over TLS. | `localhost:5671` | no | +| `max_unacknowledged` | `int` | Maximum number of unacknowledged messages the Solace broker can transmit. | 10 | no | + +`queue` must have the format `queue://#telemetry-myTelemetryProfile`. + +## Blocks + +The following blocks are supported inside the definition of +`otelcol.receiver.solace`: + +| Hierarchy | Block | Description | Required | +| ------------------------------ | ------------------ | -------------------------------------------------------------------------------------------------------------------------------- | -------- | +| authentication | [authentication][] | Configures authentication for connecting to the Solace broker. | yes | +| authentication > sasl_plain | [sasl_plain][] | Authenticates against the Solace broker with SASL PLAIN. | no | +| authentication > sasl_xauth2 | [sasl_xauth2][] | Authenticates against the Solace broker with SASL XOauth2. | no | +| authentication > sasl_external | [sasl_external][] | Authenticates against the Solace broker with SASL External. | no | +| flow | [flow][] | Configures the behaviour to use when temporary errors are encountered from the next component. | no | +| flow > delayed_retry | [delayed_retry][] | Sets the flow control strategy to `delayed retry` which will wait before trying to push the message to the next component again. | no | +| tls | [tls][] | Configures TLS for connecting to the Solace broker. | no | +| debug_metrics | [debug_metrics][] | Configures the metrics which this component generates to monitor its state. | no | +| output | [output][] | Configures where to send received telemetry data. | yes | + +One SASL authentication block is required in the `authentication` block. + +`sasl_external` must be used together with the `tls` block. + +The `>` symbol indicates deeper levels of nesting. For example, +`authentication > tls` refers to a `tls` block defined inside an +`authentication` block. + +[authentication]: #authentication-block +[sasl_plain]: #sasl_plain-block +[sasl_xauth2]: #sasl_xauth2-block +[sasl_external]: #sasl_external-block +[tls]: #tls-block +[flow]: #flow-block +[delayed_retry]: #delayed_retry-block +[debug_metrics]: #debug_metrics-block +[output]: #output-block + +### authentication block + +The `authentication` block configures how to authenticate for connecting to the Solace broker. +It doesn't support any arguments and is configured fully through inner blocks. + +### sasl_plain block + +The `sasl_plain` block configures how to authenticate to the Solace broker with SASL PLAIN. + +The following arguments are supported: + +| Name | Type | Description | Default | Required | +| ---------- | -------- | -------------------- | ------- | -------- | +| `username` | `string` | The username to use. | | yes | +| `password` | `string` | The password to use. | | yes | + +### sasl_xauth2 block + +The `sasl_xauth2` block configures how to authenticate to the Solace broker with SASL XOauth2. + +The following arguments are supported: + +| Name | Type | Description | Default | Required | +| ---------- | -------- | ------------------------- | ------- | -------- | +| `username` | `string` | The username to use. | | yes | +| `bearer` | `string` | The bearer in plain text. | | yes | + +### sasl_external block + +The `sasl_xauth2` block configures how to authenticate to the Solace broker with SASL External. +It doesn't support any arguments or blocks. It must be used with the [tls][] block. + +### flow block + +The `flow` block configures the behaviour to use when temporary errors are encountered from the next component. +It doesn't support any arguments and is configured fully through inner blocks. + +### delayed_retry block + +The `delayed_retry` block sets the flow control strategy to `delayed retry` which will wait before trying to push the message to the next component again. + +The following arguments are supported: + +| Name | Type | Description | Default | Required | +| ------- | -------- | --------------------------------- | -------- | -------- | +| `delay` | `string` | The time to wait before retrying. | `"10ms"` | no | + +### tls block + +{{< docs/shared lookup="reference/components/otelcol-tls-client-block.md" source="alloy" version="" >}} + +### debug_metrics block + +{{< docs/shared lookup="reference/components/otelcol-debug-metrics-block.md" source="alloy" version="" >}} + +### output block + +{{< docs/shared lookup="reference/components/output-block.md" source="alloy" version="" >}} + +{{< admonition type="warning" >}} +Having multiple consumers may result in duplicated traces in case of errors because of the retry strategy. +It is recommended to only set one consumer for this component. +{{< /admonition >}} + +## Exported fields + +`otelcol.receiver.solace` does not export any fields. + +## Component health + +`otelcol.receiver.solace` is only reported as unhealthy if given an invalid +configuration. + +## Debug information + +`otelcol.receiver.solace` does not expose any component-specific debug +information. + +## Example + +This example forwards read telemetry data through a batch processor before +finally sending it to an OTLP-capable endpoint: + +```alloy +otelcol.receiver.solace "default" { + queue = "queue://#telemetry-testprofile" + broker = "localhost:5672" + auth { + sasl_plain { + username = "alloy" + password = "password" + } + } + tls { + insecure = true + insecure_skip_verify = true + } + output { + traces = [otelcol.processor.batch.default.input] + } +} + +otelcol.processor.batch "default" { + output { + traces = [otelcol.exporter.otlp.default.input] + } +} + +otelcol.exporter.otlp "default" { + client { + endpoint = sys.env("OTLP_ENDPOINT") + } +} +``` + + + +## Compatible components + +`otelcol.receiver.solace` can accept arguments from the following components: + +- Components that export [OpenTelemetry `otelcol.Consumer`](../../../compatibility/#opentelemetry-otelcolconsumer-exporters) + + +{{< admonition type="note" >}} +Connecting some components may not be sensible or components may require further configuration to make the connection work correctly. +Refer to the linked documentation for more details. +{{< /admonition >}} + + diff --git a/go.mod b/go.mod index 1f1e516f04..41fa2a8f76 100644 --- a/go.mod +++ b/go.mod @@ -138,6 +138,7 @@ require ( github.com/open-telemetry/opentelemetry-collector-contrib/receiver/jaegerreceiver v0.112.0 github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkareceiver v0.112.0 github.com/open-telemetry/opentelemetry-collector-contrib/receiver/opencensusreceiver v0.112.0 + github.com/open-telemetry/opentelemetry-collector-contrib/receiver/solacereceiver v0.112.0 github.com/open-telemetry/opentelemetry-collector-contrib/receiver/vcenterreceiver v0.112.0 github.com/open-telemetry/opentelemetry-collector-contrib/receiver/zipkinreceiver v0.112.0 github.com/ory/dockertest/v3 v3.8.1 @@ -821,6 +822,7 @@ require ( ) require ( + github.com/Azure/go-amqp v1.2.0 // indirect github.com/DataDog/datadog-agent/comp/core/log/def v0.57.1 // indirect github.com/antchfx/xmlquery v1.4.2 // indirect github.com/antchfx/xpath v1.3.2 // indirect diff --git a/go.sum b/go.sum index 5aa02756c4..6a139e700d 100644 --- a/go.sum +++ b/go.sum @@ -117,6 +117,8 @@ github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.2.0 h1:gggzg0SUMs6SQbEw+ github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.2.0/go.mod h1:+6KLcKIVgxoBDMqMO/Nvy7bZ9a0nbU3I1DtFQK3YvB4= github.com/Azure/azure-storage-queue-go v0.0.0-20181215014128-6ed74e755687/go.mod h1:K6am8mT+5iFXgingS9LUc7TmbsW6XBw3nxaRyaMyWc8= github.com/Azure/go-amqp v0.12.6/go.mod h1:qApuH6OFTSKZFmCOxccvAv5rLizBQf4v8pRmG138DPo= +github.com/Azure/go-amqp v1.2.0 h1:NNyfN3/cRszWzMvjmm64yaPZDHX/2DJkowv8Ub9y01I= +github.com/Azure/go-amqp v1.2.0/go.mod h1:vZAogwdrkbyK3Mla8m/CxSc/aKdnTZ4IbPxl51Y5WZE= github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78/go.mod h1:LmzpDX56iTiv29bbRTIsUNlaFfuhWRQBWjQdVyAevI8= github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 h1:L/gRVlceqvL25UVaW/CKtUDjefjrs0SPonmDGUVOYP0= github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E= @@ -2047,6 +2049,8 @@ github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkareceiver github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkareceiver v0.112.0/go.mod h1:v61OP6Zxu8Kx4WLuswY06uaUPiOWDt5ykW9vqNQJNXc= github.com/open-telemetry/opentelemetry-collector-contrib/receiver/opencensusreceiver v0.112.0 h1:X/eUpWLWBZg2fDT+jWZiIept45akvKbZXhktg1x86gE= github.com/open-telemetry/opentelemetry-collector-contrib/receiver/opencensusreceiver v0.112.0/go.mod h1:q2lFBHfnG+ar2DJJlIU6RviOFXDeFur9vJ083NvOMQs= +github.com/open-telemetry/opentelemetry-collector-contrib/receiver/solacereceiver v0.112.0 h1:cHk8vS/D1pjeZ0o4LJJAENP847HHWjTXFe4y1RJYlfo= +github.com/open-telemetry/opentelemetry-collector-contrib/receiver/solacereceiver v0.112.0/go.mod h1:2CK7Hh6UGLnBSGW7Y0nopvEhoo25D6t/395jFEephEs= github.com/open-telemetry/opentelemetry-collector-contrib/receiver/vcenterreceiver v0.112.0 h1:Vv1FDwd7pykzj8Wmuc7yj7bcN0qUv1mGBb/dcTMPfNE= github.com/open-telemetry/opentelemetry-collector-contrib/receiver/vcenterreceiver v0.112.0/go.mod h1:lklLK8ELD2Wk5z7ywjaf6XEbbViDtf7uK8jAExjRlls= github.com/open-telemetry/opentelemetry-collector-contrib/receiver/zipkinreceiver v0.112.0 h1:XhKHjEpQJQMaUuWVhWS1FEuaY4LJDwBgsGXE166j9SY= diff --git a/internal/component/all/all.go b/internal/component/all/all.go index 5f2cba1ea2..f9da77aacb 100644 --- a/internal/component/all/all.go +++ b/internal/component/all/all.go @@ -101,6 +101,7 @@ import ( _ "github.com/grafana/alloy/internal/component/otelcol/receiver/opencensus" // Import otelcol.receiver.opencensus _ "github.com/grafana/alloy/internal/component/otelcol/receiver/otlp" // Import otelcol.receiver.otlp _ "github.com/grafana/alloy/internal/component/otelcol/receiver/prometheus" // Import otelcol.receiver.prometheus + _ "github.com/grafana/alloy/internal/component/otelcol/receiver/solace" // Import otelcol.receiver.solace _ "github.com/grafana/alloy/internal/component/otelcol/receiver/vcenter" // Import otelcol.receiver.vcenter _ "github.com/grafana/alloy/internal/component/otelcol/receiver/zipkin" // Import otelcol.receiver.zipkin _ "github.com/grafana/alloy/internal/component/prometheus/exporter/apache" // Import prometheus.exporter.apache diff --git a/internal/component/all/all_test.go b/internal/component/all/all_test.go index fe28c7739a..a0cc0ef448 100644 --- a/internal/component/all/all_test.go +++ b/internal/component/all/all_test.go @@ -99,6 +99,10 @@ func sharePointer(a, b reflect.Value) (string, bool) { return "", false case reflect.Pointer: + // same edge case as above: skip if this is a struct ptr and that the structs are empty + if !a.IsNil() && a.Elem().Kind() == reflect.Struct && a.Elem().NumField() == 0 { + return "", false + } if pointersMatch(a, b) { return "", true } else { diff --git a/internal/component/otelcol/receiver/solace/config_solace.go b/internal/component/otelcol/receiver/solace/config_solace.go new file mode 100644 index 0000000000..b5ed417456 --- /dev/null +++ b/internal/component/otelcol/receiver/solace/config_solace.go @@ -0,0 +1,98 @@ +package solace + +import ( + "time" + + "github.com/grafana/alloy/syntax/alloytypes" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/solacereceiver" + "go.opentelemetry.io/collector/config/configopaque" +) + +// Authentication defines authentication strategies. +type Authentication struct { + PlainText *SaslPlainTextConfig `alloy:"sasl_plain,block,optional"` + XAuth2 *SaslXAuth2Config `alloy:"sasl_xauth2,block,optional"` + External *SaslExternalConfig `alloy:"sasl_external,block,optional"` +} + +// Convert converts args into the upstream type. +func (args Authentication) Convert() solacereceiver.Authentication { + auth := solacereceiver.Authentication{} + + if args.PlainText != nil { + auth.PlainText = args.PlainText.Convert() + } + if args.XAuth2 != nil { + auth.XAuth2 = args.XAuth2.Convert() + } + if args.External != nil { + auth.External = args.External.Convert() + } + + return auth +} + +// SaslPlainTextConfig defines SASL PLAIN authentication. +type SaslPlainTextConfig struct { + Username string `alloy:"username,attr"` + Password alloytypes.Secret `alloy:"password,attr"` +} + +func (args SaslPlainTextConfig) Convert() *solacereceiver.SaslPlainTextConfig { + return &solacereceiver.SaslPlainTextConfig{ + Username: args.Username, + Password: configopaque.String(args.Password), + } +} + +// SaslXAuth2Config defines the configuration for the SASL XAUTH2 authentication. +type SaslXAuth2Config struct { + Username string `alloy:"username,attr"` + Bearer string `alloy:"bearer,attr"` +} + +func (args SaslXAuth2Config) Convert() *solacereceiver.SaslXAuth2Config { + return &solacereceiver.SaslXAuth2Config{ + Username: args.Username, + Bearer: args.Bearer, + } +} + +// SaslExternalConfig defines the configuration for the SASL External used in conjunction with TLS client authentication. +type SaslExternalConfig struct{} + +func (args SaslExternalConfig) Convert() *solacereceiver.SaslExternalConfig { + return &solacereceiver.SaslExternalConfig{} +} + +// FlowControl defines the configuration for what to do in backpressure scenarios, e.g. memorylimiter errors +type FlowControl struct { + DelayedRetry *FlowControlDelayedRetry `alloy:"delayed_retry,block"` +} + +func (args FlowControl) Convert() solacereceiver.FlowControl { + flowControl := solacereceiver.FlowControl{} + if args.DelayedRetry != nil { + flowControl.DelayedRetry = args.DelayedRetry.Convert() + } + return flowControl +} + +func (args *FlowControl) SetToDefault() { + *args = FlowControl{ + DelayedRetry: &FlowControlDelayedRetry{ + Delay: 10 * time.Millisecond, + }, + } +} + +// FlowControlDelayedRetry represents the strategy of waiting for a defined amount of time (in time.Duration) and attempt redelivery +type FlowControlDelayedRetry struct { + Delay time.Duration `alloy:"delay,attr,optional"` +} + +func (args FlowControlDelayedRetry) Convert() *solacereceiver.FlowControlDelayedRetry { + return &solacereceiver.FlowControlDelayedRetry{ + Delay: args.Delay, + } +} diff --git a/internal/component/otelcol/receiver/solace/solace.go b/internal/component/otelcol/receiver/solace/solace.go new file mode 100644 index 0000000000..5895d1c9bd --- /dev/null +++ b/internal/component/otelcol/receiver/solace/solace.go @@ -0,0 +1,159 @@ +// Package solace provides an otelcol.receiver.solace component. +package solace + +/* +How to test solace manually: +- Use the docker compose template here: https://github.com/SolaceLabs/solace-single-docker-compose/blob/master/template/PubSubStandard_singleNode.yml +- Log in to http://localhost:8080/ to configure solace and select default +- Go to Telemetry, enable it, create a profile "testprofile" and enable both trace and receiver +- Telemetry > Receiver connect ACLs > Client Connect Default Action: set it to "Allow" via the edit button +- Telemetry > Trace Filters: create a trace filter "testfilter" and enable it +- Click on the test filter, go to Subscriptions and create a new Subscription ">" +- In Queues, create a new queue "testqueue" +- Click on "testqueue" and create a subscription "solace/tracing" +- Access Control > Client Authentication: set the Type to "Internal database" +- Access Control > Client Username: create two clients: + - "alloy", with all toggle enabled, the password set to "alloy" and the client profile and acl profile set to #telemetry-testprofile + - "bob", with all toggle and the password set to "bob". Keep the client profile and acl profile set to default +- Connect Alloy with the following config: + otelcol.receiver.solace "solace" { + queue = "queue://#telemetry-testprofile" + broker = "localhost:5672" + auth { + sasl_plain { + username = "alloy" + password = "alloy" + } + } + tls { + insecure = true + insecure_skip_verify = true + } + output { + traces = [otelcol.exporter.debug.solace.input] + } + } + + otelcol.exporter.debug "solace" { + verbosity = "detailed" + } + + logging { + level = "debug" + } +- In "Try Me!", connect as "bob" (username and password) +- You should be able to see the span in the terminal +*/ + +import ( + "fmt" + "strings" + + "github.com/grafana/alloy/internal/component" + "github.com/grafana/alloy/internal/component/otelcol" + otelcolCfg "github.com/grafana/alloy/internal/component/otelcol/config" + "github.com/grafana/alloy/internal/component/otelcol/receiver" + "github.com/grafana/alloy/internal/featuregate" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/solacereceiver" + otelcomponent "go.opentelemetry.io/collector/component" + otelextension "go.opentelemetry.io/collector/extension" + "go.opentelemetry.io/collector/pipeline" +) + +func init() { + component.Register(component.Registration{ + Name: "otelcol.receiver.solace", + Stability: featuregate.StabilityGenerallyAvailable, + Args: Arguments{}, + + Build: func(opts component.Options, args component.Arguments) (component.Component, error) { + fact := solacereceiver.NewFactory() + return receiver.New(opts, fact, args.(Arguments)) + }, + }) +} + +// Arguments configures the otelcol.receiver.solace component. +type Arguments struct { + // The upstream component uses a list for the broker but they only use the first element in the list so I decided to use + // a simple string in Alloy to avoid confusing the users. + Broker string `alloy:"broker,attr,optional"` + Queue string `alloy:"queue,attr"` + MaxUnacked int32 `alloy:"max_unacknowledged,attr,optional"` + + TLS otelcol.TLSClientArguments `alloy:"tls,block,optional"` + Flow FlowControl `alloy:"flow_control,block,optional"` + DebugMetrics otelcolCfg.DebugMetricsArguments `alloy:"debug_metrics,block,optional"` + Auth Authentication `alloy:"auth,block"` + + // Output configures where to send received data. Required. + Output *otelcol.ConsumerArguments `alloy:"output,block"` +} + +var _ receiver.Arguments = Arguments{} + +// SetToDefault implements syntax.Defaulter. +func (args *Arguments) SetToDefault() { + *args = Arguments{ + Broker: "localhost:5671", + MaxUnacked: 1000, + } + args.Flow.SetToDefault() + args.DebugMetrics.SetToDefault() +} + +// Validate implements syntax.Validator. +func (args *Arguments) Validate() error { + authMethod := 0 + if args.Auth.PlainText != nil { + authMethod++ + } + if args.Auth.External != nil { + authMethod++ + } + if args.Auth.XAuth2 != nil { + authMethod++ + } + if authMethod != 1 { + return fmt.Errorf("the auth block must contain exactly one of sasl_plain block, sasl_xauth2 block or sasl_external block") + } + if len(strings.TrimSpace(args.Queue)) == 0 { + return fmt.Errorf("queue must not be empty, queue definition has format queue://") + } + if args.Flow.DelayedRetry != nil && args.Flow.DelayedRetry.Delay <= 0 { + return fmt.Errorf("the delay attribute in the delayed_retry block must be > 0, got %d", args.Flow.DelayedRetry.Delay) + } + return nil +} + +// Convert implements receiver.Arguments. +func (args Arguments) Convert() (otelcomponent.Config, error) { + return &solacereceiver.Config{ + Broker: []string{args.Broker}, + Queue: args.Queue, + MaxUnacked: args.MaxUnacked, + TLS: *args.TLS.Convert(), + Auth: args.Auth.Convert(), + Flow: args.Flow.Convert(), + }, nil +} + +// Extensions implements receiver.Arguments. +func (args Arguments) Extensions() map[otelcomponent.ID]otelextension.Extension { + return nil +} + +// Exporters implements receiver.Arguments. +func (args Arguments) Exporters() map[pipeline.Signal]map[otelcomponent.ID]otelcomponent.Component { + return nil +} + +// NextConsumers implements receiver.Arguments. +func (args Arguments) NextConsumers() *otelcol.ConsumerArguments { + return args.Output +} + +// DebugMetricsConfig implements receiver.Arguments. +func (args Arguments) DebugMetricsConfig() otelcolCfg.DebugMetricsArguments { + return args.DebugMetrics +} diff --git a/internal/component/otelcol/receiver/solace/solace_test.go b/internal/component/otelcol/receiver/solace/solace_test.go new file mode 100644 index 0000000000..fc1fb6f708 --- /dev/null +++ b/internal/component/otelcol/receiver/solace/solace_test.go @@ -0,0 +1,226 @@ +package solace_test + +import ( + "testing" + "time" + + "github.com/grafana/alloy/internal/component/otelcol/receiver/solace" + "github.com/grafana/alloy/syntax" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/solacereceiver" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/config/configtls" +) + +func TestArguments_UnmarshalAlloy(t *testing.T) { + tests := []struct { + testName string + cfg string + expected solacereceiver.Config + }{ + { + testName: "Defaults", + cfg: ` + queue = "queue://#telemetry_testprofile" + auth { + sasl_plain { + username = "alloy" + password = "password" + } + } + output {} + `, + expected: solacereceiver.Config{ + Broker: []string{"localhost:5671"}, + Queue: "queue://#telemetry_testprofile", + MaxUnacked: 1000, + Flow: solacereceiver.FlowControl{ + DelayedRetry: &solacereceiver.FlowControlDelayedRetry{ + Delay: 10 * time.Millisecond, + }, + }, + Auth: solacereceiver.Authentication{ + PlainText: &solacereceiver.SaslPlainTextConfig{ + Username: "alloy", + Password: "password", + }, + }, + }, + }, + { + testName: "Explicit Values - External / TLS", + cfg: ` + broker = "localhost:5672" + max_unacknowledged = 500 + queue = "queue://#telemetry_testprofile" + auth { + sasl_external {} + } + tls { + cert_file = "testdata/test-cert.crt" + key_file = "testdata/test-key.key" + } + flow_control { + delayed_retry { + delay = "50ms" + } + } + output {} + `, + expected: solacereceiver.Config{ + Broker: []string{"localhost:5672"}, + Queue: "queue://#telemetry_testprofile", + MaxUnacked: 500, + Flow: solacereceiver.FlowControl{ + DelayedRetry: &solacereceiver.FlowControlDelayedRetry{ + Delay: 50 * time.Millisecond, + }, + }, + Auth: solacereceiver.Authentication{ + External: &solacereceiver.SaslExternalConfig{}, + }, + TLS: configtls.ClientConfig{ + Config: configtls.Config{ + CertFile: "testdata/test-cert.crt", + KeyFile: "testdata/test-key.key", + }, + }, + }, + }, + { + testName: "Explicit Values - XAuth2 / TLS", + cfg: ` + broker = "localhost:5672" + max_unacknowledged = 500 + queue = "queue://#telemetry_testprofile" + auth { + sasl_xauth2 { + username = "alloy" + bearer = "bearer" + } + } + tls { + cert_file = "testdata/test-cert.crt" + key_file = "testdata/test-key.key" + } + flow_control { + delayed_retry { + delay = "50ms" + } + } + output {} + `, + expected: solacereceiver.Config{ + Broker: []string{"localhost:5672"}, + Queue: "queue://#telemetry_testprofile", + MaxUnacked: 500, + Flow: solacereceiver.FlowControl{ + DelayedRetry: &solacereceiver.FlowControlDelayedRetry{ + Delay: 50 * time.Millisecond, + }, + }, + Auth: solacereceiver.Authentication{ + XAuth2: &solacereceiver.SaslXAuth2Config{ + Username: "alloy", + Bearer: "bearer", + }, + }, + TLS: configtls.ClientConfig{ + Config: configtls.Config{ + CertFile: "testdata/test-cert.crt", + KeyFile: "testdata/test-key.key", + }, + }, + }, + }, + } + + for _, tc := range tests { + t.Run(tc.testName, func(t *testing.T) { + var args solace.Arguments + err := syntax.Unmarshal([]byte(tc.cfg), &args) + require.NoError(t, err) + + actualPtr, err := args.Convert() + require.NoError(t, err) + + actual := actualPtr.(*solacereceiver.Config) + + require.Equal(t, tc.expected, *actual) + }) + } +} + +func TestArguments_Validate(t *testing.T) { + tests := []struct { + testName string + cfg string + expectedError string + }{ + { + testName: "Missing Auth", + cfg: ` + queue = "queue://#telemetry_testprofile" + auth {} + output {} + `, + expectedError: "the auth block must contain exactly one of sasl_plain block, sasl_xauth2 block or sasl_external block", + }, + { + testName: "Multiple Auth", + cfg: ` + queue = "queue://#telemetry_testprofile" + auth { + sasl_plain { + username = "alloy" + password = "password" + } + sasl_xauth2 { + username = "alloy" + bearer = "bearer" + } + } + output {} + `, + expectedError: "the auth block must contain exactly one of sasl_plain block, sasl_xauth2 block or sasl_external block", + }, + { + testName: "Empty Queue", + cfg: ` + queue = "" + auth { + sasl_plain { + username = "alloy" + password = "password" + } + } + output {} + `, + expectedError: "queue must not be empty, queue definition has format queue://", + }, + { + testName: "Wrong value for delay in delayed_retry block", + cfg: ` + queue = "queue://#telemetry_testprofile" + auth { + sasl_plain { + username = "alloy" + password = "password" + } + } + flow_control { + delayed_retry { + delay = "0ms" + } + } + output {} + `, + expectedError: "the delay attribute in the delayed_retry block must be > 0, got 0", + }, + } + for _, tc := range tests { + t.Run(tc.testName, func(t *testing.T) { + var args solace.Arguments + require.ErrorContains(t, syntax.Unmarshal([]byte(tc.cfg), &args), tc.expectedError) + }) + } +} diff --git a/internal/converter/internal/otelcolconvert/converter_solacereceiver.go b/internal/converter/internal/otelcolconvert/converter_solacereceiver.go new file mode 100644 index 0000000000..435d95b4dd --- /dev/null +++ b/internal/converter/internal/otelcolconvert/converter_solacereceiver.go @@ -0,0 +1,120 @@ +package otelcolconvert + +import ( + "fmt" + + "github.com/grafana/alloy/internal/component/otelcol" + "github.com/grafana/alloy/internal/component/otelcol/receiver/solace" + "github.com/grafana/alloy/internal/converter/diag" + "github.com/grafana/alloy/internal/converter/internal/common" + "github.com/grafana/alloy/syntax/alloytypes" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/solacereceiver" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componentstatus" + "go.opentelemetry.io/collector/pipeline" +) + +func init() { + converters = append(converters, solaceReceiverConverter{}) +} + +type solaceReceiverConverter struct{} + +func (solaceReceiverConverter) Factory() component.Factory { return solacereceiver.NewFactory() } + +func (solaceReceiverConverter) InputComponentName() string { return "" } + +func (solaceReceiverConverter) ConvertAndAppend(state *State, id componentstatus.InstanceID, cfg component.Config) diag.Diagnostics { + var diags diag.Diagnostics + + label := state.AlloyComponentLabel() + + args := toSolaceReceiver(state, id, cfg.(*solacereceiver.Config)) + block := common.NewBlockWithOverride([]string{"otelcol", "receiver", "solace"}, label, args) + + diags.Add( + diag.SeverityLevelInfo, + fmt.Sprintf("Converted %s into %s", StringifyInstanceID(id), StringifyBlock(block)), + ) + + state.Body().AppendBlock(block) + return diags +} + +func toSolaceReceiver(state *State, id componentstatus.InstanceID, cfg *solacereceiver.Config) *solace.Arguments { + nextTraces := state.Next(id, pipeline.SignalTraces) + + var broker string + if len(cfg.Broker) == 0 { + broker = "" + } else { + broker = cfg.Broker[0] + } + + return &solace.Arguments{ + Broker: broker, + Queue: cfg.Queue, + MaxUnacked: cfg.MaxUnacked, + + Auth: toSolaceAuthentication(cfg.Auth), + TLS: toTLSClientArguments(cfg.TLS), + Flow: toSolaceFlow(cfg.Flow), + DebugMetrics: common.DefaultValue[solace.Arguments]().DebugMetrics, + Output: &otelcol.ConsumerArguments{ + Traces: ToTokenizedConsumers(nextTraces), + }, + } +} + +func toSolaceAuthentication(cfg solacereceiver.Authentication) solace.Authentication { + return solace.Authentication{ + PlainText: toSaslPlaintext(cfg.PlainText), + XAuth2: toSaslXAuth2(cfg.XAuth2), + External: toSaslExternal(cfg.External), + } +} + +func toSaslPlaintext(cfg *solacereceiver.SaslPlainTextConfig) *solace.SaslPlainTextConfig { + if cfg == nil { + return nil + } + + return &solace.SaslPlainTextConfig{ + Username: cfg.Username, + Password: alloytypes.Secret(cfg.Password), + } +} + +func toSaslXAuth2(cfg *solacereceiver.SaslXAuth2Config) *solace.SaslXAuth2Config { + if cfg == nil { + return nil + } + + return &solace.SaslXAuth2Config{ + Username: cfg.Username, + Bearer: cfg.Bearer, + } +} + +func toSaslExternal(cfg *solacereceiver.SaslExternalConfig) *solace.SaslExternalConfig { + if cfg == nil { + return nil + } + + return &solace.SaslExternalConfig{} +} + +func toSolaceFlow(cfg solacereceiver.FlowControl) solace.FlowControl { + return solace.FlowControl{ + DelayedRetry: toFlowControlDelayedRetry(cfg.DelayedRetry), + } +} + +func toFlowControlDelayedRetry(cfg *solacereceiver.FlowControlDelayedRetry) *solace.FlowControlDelayedRetry { + if cfg == nil { + return nil + } + return &solace.FlowControlDelayedRetry{ + Delay: cfg.Delay, + } +} diff --git a/internal/converter/internal/otelcolconvert/testdata/solace.alloy b/internal/converter/internal/otelcolconvert/testdata/solace.alloy new file mode 100644 index 0000000000..7e088ea149 --- /dev/null +++ b/internal/converter/internal/otelcolconvert/testdata/solace.alloy @@ -0,0 +1,31 @@ +otelcol.receiver.solace "default" { + broker = "localhost:5672" + queue = "queue://#telemetry-profile123" + + tls { + insecure = true + } + + flow_control { + delayed_retry { + delay = "20ms" + } + } + + auth { + sasl_plain { + username = "otel" + password = "otel01$" + } + } + + output { + traces = [otelcol.exporter.otlp.default.input] + } +} + +otelcol.exporter.otlp "default" { + client { + endpoint = "database:4317" + } +} diff --git a/internal/converter/internal/otelcolconvert/testdata/solace.yaml b/internal/converter/internal/otelcolconvert/testdata/solace.yaml new file mode 100644 index 0000000000..dfaa5dc0ab --- /dev/null +++ b/internal/converter/internal/otelcolconvert/testdata/solace.yaml @@ -0,0 +1,24 @@ +receivers: + solace: + broker: [localhost:5672] + auth: + sasl_plain: + username: otel + password: otel01$ + flow_control: + delayed_retry: + delay: 20ms + tls: + insecure: true + queue: queue://#telemetry-profile123 + +exporters: + otlp: + endpoint: database:4317 + +service: + pipelines: + traces: + receivers: [solace] + processors: [] + exporters: [otlp]