Skip to content

Commit

Permalink
remove some exports from the smartagent receiver (open-telemetry#2077)
Browse files Browse the repository at this point in the history
* remove some exports from the smartagent receiver

* rename go receiver output -> out
atoulme authored Oct 5, 2022

Verified

This commit was signed with the committer’s verified signature.
DavideD Davide D'Alto
1 parent 3f7dedb commit 3f6e6e9
Showing 11 changed files with 173 additions and 173 deletions.
4 changes: 2 additions & 2 deletions internal/receiver/smartagentreceiver/config.go
Original file line number Diff line number Diff line change
@@ -187,14 +187,14 @@ func setHostAndPortViaEndpoint(endpoint string, monitorConfig any) error {
}

if host != "" {
_, err := SetStructFieldIfZeroValue(monitorConfig, "Host", host)
_, err := setStructFieldIfZeroValue(monitorConfig, "Host", host)
if err != nil {
return fmt.Errorf("unable to set monitor Host field using Endpoint-derived value of %s: %w", host, err)
}
}

if port != 0 {
_, err := SetStructFieldIfZeroValue(monitorConfig, "Port", port)
_, err := setStructFieldIfZeroValue(monitorConfig, "Port", port)
if err != nil {
return fmt.Errorf("unable to set monitor Port field using Endpoint-derived value of %d: %w", port, err)
}
20 changes: 10 additions & 10 deletions internal/receiver/smartagentreceiver/converter/event.go
Original file line number Diff line number Diff line change
@@ -24,12 +24,12 @@ import (
)

const (
// SFxEventCategoryKey key for splunk event category,
SFxEventCategoryKey = "com.splunk.signalfx.event_category"
// SFxEventPropertiesKey key for splunk event properties.
SFxEventPropertiesKey = "com.splunk.signalfx.event_properties"
// SFxEventType key for splunk event type
SFxEventType = "com.splunk.signalfx.event_type"
// sfxEventCategoryKey key for splunk event category,
sfxEventCategoryKey = "com.splunk.signalfx.event_category"
// sfxEventPropertiesKey key for splunk event properties.
sfxEventPropertiesKey = "com.splunk.signalfx.event_properties"
// sfxEventType key for splunk event type
sfxEventType = "com.splunk.signalfx.event_type"
)

// eventToLog converts a SFx event to a plog.Logs entry suitable for consumption by LogConsumer.
@@ -58,17 +58,17 @@ func sfxEventToPDataLogs(event *event.Event, logger *zap.Logger) plog.Logs {

if event.Category == 0 {
// This attribute must be present or SFx exporter will not know it's an event
attrs.PutEmpty(SFxEventCategoryKey)
attrs.PutEmpty(sfxEventCategoryKey)
} else {
attrs.PutInt(SFxEventCategoryKey, int64(event.Category))
attrs.PutInt(sfxEventCategoryKey, int64(event.Category))
}

if event.EventType != "" {
attrs.PutString(SFxEventType, event.EventType)
attrs.PutString(sfxEventType, event.EventType)
}

if len(event.Properties) > 0 {
propMap := attrs.PutEmptyMap(SFxEventPropertiesKey)
propMap := attrs.PutEmptyMap(sfxEventPropertiesKey)
propMap.EnsureCapacity(len(event.Properties))

for property, value := range event.Properties {
6 changes: 3 additions & 3 deletions internal/receiver/smartagentreceiver/factory.go
Original file line number Diff line number Diff line change
@@ -31,10 +31,10 @@ var (
// Smart Agent receivers can be for metrics or logs (events).
// We keep store of them to ensure the same instance is used for a given config.
receiverStoreLock = sync.Mutex{}
receiverStore = map[*Config]*Receiver{}
receiverStore = map[*Config]*receiver{}
)

func getOrCreateReceiver(cfg config.Receiver, params component.ReceiverCreateSettings) (*Receiver, error) {
func getOrCreateReceiver(cfg config.Receiver, params component.ReceiverCreateSettings) (*receiver, error) {
receiverStoreLock.Lock()
defer receiverStoreLock.Unlock()
receiverConfig := cfg.(*Config)
@@ -46,7 +46,7 @@ func getOrCreateReceiver(cfg config.Receiver, params component.ReceiverCreateSet

receiver, ok := receiverStore[receiverConfig]
if !ok {
receiver = NewReceiver(params, *receiverConfig)
receiver = newReceiver(params, *receiverConfig)
receiverStore[receiverConfig] = receiver
}

12 changes: 6 additions & 6 deletions internal/receiver/smartagentreceiver/factory_test.go
Original file line number Diff line number Diff line change
@@ -139,9 +139,9 @@ func TestCreateMetricsThenLogsAndThenTracesReceiver(t *testing.T) {
assert.Same(t, logsReceiver, tracesReceiver)
assert.Same(t, metricsReceiver, receiverStore[cfg.(*Config)])

assert.Same(t, nextMetricsConsumer, metricsReceiver.(*Receiver).nextMetricsConsumer)
assert.Same(t, nextLogsConsumer, metricsReceiver.(*Receiver).nextLogsConsumer)
assert.Same(t, nextTracesConsumer, metricsReceiver.(*Receiver).nextTracesConsumer)
assert.Same(t, nextMetricsConsumer, metricsReceiver.(*receiver).nextMetricsConsumer)
assert.Same(t, nextLogsConsumer, metricsReceiver.(*receiver).nextLogsConsumer)
assert.Same(t, nextTracesConsumer, metricsReceiver.(*receiver).nextTracesConsumer)
}

func TestCreateTracesThenLogsAndThenMetricsReceiver(t *testing.T) {
@@ -169,7 +169,7 @@ func TestCreateTracesThenLogsAndThenMetricsReceiver(t *testing.T) {
assert.Same(t, logsReceiver, tracesReceiver)
assert.Same(t, metricsReceiver, receiverStore[cfg.(*Config)])

assert.Same(t, nextMetricsConsumer, metricsReceiver.(*Receiver).nextMetricsConsumer)
assert.Same(t, nextLogsConsumer, metricsReceiver.(*Receiver).nextLogsConsumer)
assert.Same(t, nextTracesConsumer, metricsReceiver.(*Receiver).nextTracesConsumer)
assert.Same(t, nextMetricsConsumer, metricsReceiver.(*receiver).nextMetricsConsumer)
assert.Same(t, nextLogsConsumer, metricsReceiver.(*receiver).nextLogsConsumer)
assert.Same(t, nextTracesConsumer, metricsReceiver.(*receiver).nextTracesConsumer)
}
140 changes: 70 additions & 70 deletions internal/receiver/smartagentreceiver/output.go
Original file line number Diff line number Diff line change
@@ -35,11 +35,11 @@ import (

const internalTransport = "internal"

// Output is an implementation of a Smart Agent FilteringOutput that receives datapoints, events, and dimension updates
// output is an implementation of a Smart Agent FilteringOutput that receives datapoints, events, and dimension updates
// from a configured monitor. It will forward all datapoints to the nextMetricsConsumer, all dimension updates to the
// nextDimensionClients as determined by the associated items in Config.MetadataClients, and all events to the
// nextLogsConsumer.
type Output struct {
type output struct {
nextMetricsConsumer consumer.Metrics
nextLogsConsumer consumer.Logs
nextTracesConsumer consumer.Traces
@@ -54,15 +54,15 @@ type Output struct {
nextDimensionClients []metadata.MetadataExporter
}

var _ types.Output = (*Output)(nil)
var _ types.FilteringOutput = (*Output)(nil)
var _ types.Output = (*output)(nil)
var _ types.FilteringOutput = (*output)(nil)

func NewOutput(
func newOutput(
config Config, filtering *monitorFiltering, nextMetricsConsumer consumer.Metrics,
nextLogsConsumer consumer.Logs, nextTracesConsumer consumer.Traces, host component.Host,
params component.ReceiverCreateSettings,
) *Output {
return &Output{
) *output {
return &output{
receiverID: config.ID(),
nextMetricsConsumer: nextMetricsConsumer,
nextLogsConsumer: nextLogsConsumer,
@@ -162,77 +162,77 @@ func getLoneSFxExporter(host component.Host, exporterType collectorConfig.DataTy

}

func (output *Output) AddDatapointExclusionFilter(filter dpfilters.DatapointFilter) {
output.logger.Debug("AddDatapointExclusionFilter has been called", zap.Any("filter", filter))
output.monitorFiltering.AddDatapointExclusionFilter(filter)
func (out *output) AddDatapointExclusionFilter(filter dpfilters.DatapointFilter) {
out.logger.Debug("AddDatapointExclusionFilter has been called", zap.Any("filter", filter))
out.monitorFiltering.AddDatapointExclusionFilter(filter)
}

func (output *Output) EnabledMetrics() []string {
output.logger.Debug("EnabledMetrics has been called.")
return output.monitorFiltering.EnabledMetrics()
func (out *output) EnabledMetrics() []string {
out.logger.Debug("EnabledMetrics has been called.")
return out.monitorFiltering.EnabledMetrics()
}

func (output *Output) HasEnabledMetricInGroup(group string) bool {
output.logger.Debug("HasEnabledMetricInGroup has been called", zap.String("group", group))
return output.monitorFiltering.HasEnabledMetricInGroup(group)
func (out *output) HasEnabledMetricInGroup(group string) bool {
out.logger.Debug("HasEnabledMetricInGroup has been called", zap.String("group", group))
return out.monitorFiltering.HasEnabledMetricInGroup(group)
}

func (output *Output) HasAnyExtraMetrics() bool {
output.logger.Debug("HasAnyExtraMetrics has been called.")
return output.monitorFiltering.HasAnyExtraMetrics()
func (out *output) HasAnyExtraMetrics() bool {
out.logger.Debug("HasAnyExtraMetrics has been called.")
return out.monitorFiltering.HasAnyExtraMetrics()
}

// Copy clones the Output to provide to child monitors with their own extraDimensions.
func (output *Output) Copy() types.Output {
output.logger.Debug("Copying Output", zap.Any("output", output))
cp := *output
cp.extraDimensions = utils.CloneStringMap(output.extraDimensions)
cp.extraSpanTags = utils.CloneStringMap(output.extraSpanTags)
cp.defaultSpanTags = utils.CloneStringMap(output.defaultSpanTags)
// Copy clones the output to provide to child monitors with their own extraDimensions.
func (out *output) Copy() types.Output {
out.logger.Debug("Copying out", zap.Any("out", out))
cp := *out
cp.extraDimensions = utils.CloneStringMap(out.extraDimensions)
cp.extraSpanTags = utils.CloneStringMap(out.extraSpanTags)
cp.defaultSpanTags = utils.CloneStringMap(out.defaultSpanTags)
return &cp
}

func (output *Output) SendDatapoints(datapoints ...*datapoint.Datapoint) {
if output.nextMetricsConsumer == nil {
func (out *output) SendDatapoints(datapoints ...*datapoint.Datapoint) {
if out.nextMetricsConsumer == nil {
return
}

ctx := output.reporter.StartMetricsOp(context.Background())
ctx := out.reporter.StartMetricsOp(context.Background())

datapoints = output.filterDatapoints(datapoints)
datapoints = out.filterDatapoints(datapoints)
for _, dp := range datapoints {
// Output's extraDimensions take priority over datapoint's
dp.Dimensions = utils.MergeStringMaps(dp.Dimensions, output.extraDimensions)
// out's extraDimensions take priority over datapoint's
dp.Dimensions = utils.MergeStringMaps(dp.Dimensions, out.extraDimensions)
}

metrics, err := output.translator.ToMetrics(datapoints)
metrics, err := out.translator.ToMetrics(datapoints)
if err != nil {
output.logger.Error("error converting SFx datapoints to ptrace.Traces", zap.Error(err))
out.logger.Error("error converting SFx datapoints to ptrace.Traces", zap.Error(err))
}

numPoints := metrics.DataPointCount()
err = output.nextMetricsConsumer.ConsumeMetrics(context.Background(), metrics)
output.reporter.EndMetricsOp(ctx, typeStr, numPoints, err)
err = out.nextMetricsConsumer.ConsumeMetrics(context.Background(), metrics)
out.reporter.EndMetricsOp(ctx, typeStr, numPoints, err)
}

func (output *Output) SendEvent(event *event.Event) {
if output.nextLogsConsumer == nil {
func (out *output) SendEvent(event *event.Event) {
if out.nextLogsConsumer == nil {
return
}

logs, err := output.translator.ToLogs(event)
logs, err := out.translator.ToLogs(event)
if err != nil {
output.logger.Error("error converting SFx events to ptrace.Traces", zap.Error(err))
out.logger.Error("error converting SFx events to ptrace.Traces", zap.Error(err))
}

err = output.nextLogsConsumer.ConsumeLogs(context.Background(), logs)
err = out.nextLogsConsumer.ConsumeLogs(context.Background(), logs)
if err != nil {
output.logger.Debug("SendEvent has failed", zap.Error(err))
out.logger.Debug("SendEvent has failed", zap.Error(err))
}
}

func (output *Output) SendSpans(spans ...*trace.Span) {
if output.nextTracesConsumer == nil {
func (out *output) SendSpans(spans ...*trace.Span) {
if out.nextTracesConsumer == nil {
return
}

@@ -241,71 +241,71 @@ func (output *Output) SendSpans(spans ...*trace.Span) {
span.Tags = map[string]string{}
}

for name, value := range output.defaultSpanTags {
for name, value := range out.defaultSpanTags {
// If the tags are already set, don't override
if _, ok := span.Tags[name]; !ok {
span.Tags[name] = value
}
}

span.Tags = utils.MergeStringMaps(span.Tags, output.extraSpanTags)
span.Tags = utils.MergeStringMaps(span.Tags, out.extraSpanTags)
}

traces, err := output.translator.ToTraces(spans)
traces, err := out.translator.ToTraces(spans)
if err != nil {
output.logger.Error("error converting SFx spans to ptrace.Traces", zap.Error(err))
out.logger.Error("error converting SFx spans to ptrace.Traces", zap.Error(err))
}

err = output.nextTracesConsumer.ConsumeTraces(context.Background(), traces)
err = out.nextTracesConsumer.ConsumeTraces(context.Background(), traces)
if err != nil {
output.logger.Debug("SendSpans has failed", zap.Error(err))
out.logger.Debug("SendSpans has failed", zap.Error(err))
}
}

func (output *Output) SendDimensionUpdate(dimension *types.Dimension) {
if len(output.nextDimensionClients) == 0 {
func (out *output) SendDimensionUpdate(dimension *types.Dimension) {
if len(out.nextDimensionClients) == 0 {
return
}

metadataUpdate := dimensionToMetadataUpdate(*dimension)
for _, consumer := range output.nextDimensionClients {
for _, consumer := range out.nextDimensionClients {
err := consumer.ConsumeMetadata([]*metadata.MetadataUpdate{&metadataUpdate})
if err != nil {
output.logger.Debug("SendDimensionUpdate has failed", zap.Error(err))
out.logger.Debug("SendDimensionUpdate has failed", zap.Error(err))
}
}
}

func (output *Output) AddExtraDimension(key, value string) {
output.logger.Debug("Adding extra dimension", zap.String("key", key), zap.String("value", value))
output.extraDimensions[key] = value
func (out *output) AddExtraDimension(key, value string) {
out.logger.Debug("Adding extra dimension", zap.String("key", key), zap.String("value", value))
out.extraDimensions[key] = value
}

func (output *Output) RemoveExtraDimension(key string) {
output.logger.Debug("Removing extra dimension", zap.String("key", key))
delete(output.extraDimensions, key)
func (out *output) RemoveExtraDimension(key string) {
out.logger.Debug("Removing extra dimension", zap.String("key", key))
delete(out.extraDimensions, key)
}

func (output *Output) AddExtraSpanTag(key, value string) {
output.extraSpanTags[key] = value
func (out *output) AddExtraSpanTag(key, value string) {
out.extraSpanTags[key] = value
}

func (output *Output) RemoveExtraSpanTag(key string) {
delete(output.extraSpanTags, key)
func (out *output) RemoveExtraSpanTag(key string) {
delete(out.extraSpanTags, key)
}

func (output *Output) AddDefaultSpanTag(key, value string) {
output.defaultSpanTags[key] = value
func (out *output) AddDefaultSpanTag(key, value string) {
out.defaultSpanTags[key] = value
}

func (output *Output) RemoveDefaultSpanTag(key string) {
delete(output.defaultSpanTags, key)
func (out *output) RemoveDefaultSpanTag(key string) {
delete(out.defaultSpanTags, key)
}

func (output *Output) filterDatapoints(datapoints []*datapoint.Datapoint) []*datapoint.Datapoint {
func (out *output) filterDatapoints(datapoints []*datapoint.Datapoint) []*datapoint.Datapoint {
filteredDatapoints := make([]*datapoint.Datapoint, 0, len(datapoints))
for _, dp := range datapoints {
if output.monitorFiltering.filterSet == nil || !output.monitorFiltering.filterSet.Matches(dp) {
if out.monitorFiltering.filterSet == nil || !out.monitorFiltering.filterSet.Matches(dp) {
filteredDatapoints = append(filteredDatapoints, dp)
}
}
Loading

0 comments on commit 3f6e6e9

Please sign in to comment.