Skip to content

Commit

Permalink
add logger to observer handler and slight refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
rmfitzpatrick committed Nov 29, 2023
1 parent 3ecd5cf commit e38305d
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 70 deletions.
10 changes: 5 additions & 5 deletions receiver/receivercreator/internal/properties.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ const (
var doubleUnderscoreRE = regexp.MustCompile("__")

// Receiver creator endpoint properties are the method of configuring individual receivers from a given observer endpoint environment.
// (io.opentelemetry.collector.receiver-creator|receiver-creator.collector.opentelemetry.io).<receiver-type(/name)>: <full mapping value>
// (io.opentelemetry.collector.receiver-creator|receiver-creator.collector.opentelemetry.io).<receiver-type(/name)>.config.<field>(<::subfield>)*: value
// (io.opentelemetry.collector.receiver-creator|receiver-creator.collector.opentelemetry.io).<receiver-type(/name)>.rule: value
// (io.opentelemetry.collector.receiver-creator|receiver-creator.collector.opentelemetry.io).<receiver-type(/name)>.resoureceiver-creatore_attributes.<resoureceiver-creatore_attribute>: value
// (io.opentelemetry.collector.receiver-creator|receiver-creator.collector.opentelemetry.io).<receiver-type(/name)>.resoureceiver-creatore_attributes: "attribute: value"
// (io.opentelemetry.collector.receiver-creator.|receiver-creator.collector.opentelemetry.io/)<receiver-type(/name)>: <full mapping value>
// (io.opentelemetry.collector.receiver-creator.|receiver-creator.collector.opentelemetry.io/)<receiver-type(/name)>.config.<field>(<::subfield>)*: value
// (io.opentelemetry.collector.receiver-creator.|receiver-creator.collector.opentelemetry.io/)<receiver-type(/name)>.rule: value
// (io.opentelemetry.collector.receiver-creator.|receiver-creator.collector.opentelemetry.io/)<receiver-type(/name)>.resource_attributes.<resource_attribute>: value
// (io.opentelemetry.collector.receiver-creator.|receiver-creator.collector.opentelemetry.io/)<receiver-type(/name)>.resource_attributes: "attribute: value"
// Parsing properties requires lookaheads (backtracking) so we use participle instead of re2.

var lex = lexer.MustSimple([]lexer.SimpleRule{
Expand Down
125 changes: 61 additions & 64 deletions receiver/receivercreator/observerhandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type observerHandler struct {
nextTracesConsumer consumer.Traces
// runner starts and stops receiver instances.
runner runner
logger *zap.Logger
}

// shutdown all receivers started at runtime.
Expand Down Expand Up @@ -85,38 +86,29 @@ func (obs *observerHandler) OnAdd(added []observer.Endpoint) {
var env observer.EndpointEnv
var err error
if env, err = e.Env(); err != nil {
obs.params.TelemetrySettings.Logger.Error("unable to convert endpoint to environment map", zap.String("endpoint", string(e.ID)), zap.Error(err))
obs.logger.Error("unable to convert endpoint to environment map", zap.String("endpoint", string(e.ID)), zap.Error(err))
continue
}

obs.params.TelemetrySettings.Logger.Debug("handling added endpoint", zap.Any("env", env))
obs.logger.Debug("handling added endpoint", zap.Any("env", env))

var receiverTemplates []receiverTemplate
if obs.config.AcceptEndpointProperties {
receiverTemplates = obs.updatedReceiverTemplatesFromEndpointProperties(details)
} else {
for _, template := range obs.config.receiverTemplates {
receiverTemplates = append(receiverTemplates, template)
}
}

for _, template := range receiverTemplates {
for _, template := range obs.updatedReceiverTemplatesFromEndpointProperties(details) {
if matches, ee := template.rule.eval(env); ee != nil {
obs.params.TelemetrySettings.Logger.Error("failed matching rule", zap.String("rule", template.Rule), zap.Error(ee))
obs.logger.Error("failed matching rule", zap.String("rule", template.Rule), zap.Error(ee))
continue
} else if !matches {
obs.params.TelemetrySettings.Logger.Debug("rule doesn't match endpoint", zap.String("endpoint", string(e.ID)), zap.String("receiver", template.id.String()), zap.String("rule", template.Rule))
obs.logger.Debug("rule doesn't match endpoint", zap.String("endpoint", string(e.ID)), zap.String("receiver", template.id.String()), zap.String("rule", template.Rule))
continue
}

obs.params.TelemetrySettings.Logger.Info("starting receiver",
obs.logger.Info("starting receiver",
zap.String("name", template.id.String()),
zap.String("endpoint", e.Target),
zap.String("endpoint_id", string(e.ID)))

resolvedConfig, err := expandConfig(template.config, env)
if err != nil {
obs.params.TelemetrySettings.Logger.Error("unable to resolve template config", zap.String("receiver", template.id.String()), zap.Error(err))
obs.logger.Error("unable to resolve template config", zap.String("receiver", template.id.String()), zap.Error(err))
continue
}

Expand All @@ -132,15 +124,15 @@ func (obs *observerHandler) OnAdd(added []observer.Endpoint) {
// ones from using expr in their Target values.
discoveredConfig, err := expandConfig(discoveredCfg, env)
if err != nil {
obs.params.TelemetrySettings.Logger.Error("unable to resolve discovered config", zap.String("receiver", template.id.String()), zap.Error(err))
obs.logger.Error("unable to resolve discovered config", zap.String("receiver", template.id.String()), zap.Error(err))
continue
}

resAttrs := map[string]string{}
for k, v := range template.ResourceAttributes {
strVal, ok := v.(string)
if !ok {
obs.params.TelemetrySettings.Logger.Info(fmt.Sprintf("ignoring unsupported `resource_attributes` %q value %v", k, v))
obs.logger.Info(fmt.Sprintf("ignoring unsupported `resource_attributes` %q value %v", k, v))
continue
}
resAttrs[k] = strVal
Expand All @@ -158,7 +150,7 @@ func (obs *observerHandler) OnAdd(added []observer.Endpoint) {
obs.nextMetricsConsumer,
obs.nextTracesConsumer,
); err != nil {
obs.params.TelemetrySettings.Logger.Error("failed creating resource enhancer", zap.String("receiver", template.id.String()), zap.Error(err))
obs.logger.Error("failed creating resource enhancer", zap.String("receiver", template.id.String()), zap.Error(err))
continue
}

Expand All @@ -172,7 +164,7 @@ func (obs *observerHandler) OnAdd(added []observer.Endpoint) {
discoveredConfig,
consumer,
); err != nil {
obs.params.TelemetrySettings.Logger.Error("failed to start receiver", zap.String("receiver", template.id.String()), zap.Error(err))
obs.logger.Error("failed to start receiver", zap.String("receiver", template.id.String()), zap.Error(err))
continue
}

Expand All @@ -188,7 +180,7 @@ func (obs *observerHandler) OnRemove(removed []observer.Endpoint) {

for _, e := range removed {
// debug log the endpoint to improve usability
if ce := obs.params.TelemetrySettings.Logger.Check(zap.DebugLevel, "handling removed endpoint"); ce != nil {
if ce := obs.logger.Check(zap.DebugLevel, "handling removed endpoint"); ce != nil {
env, err := e.Env()
fields := []zap.Field{zap.String("endpoint_id", string(e.ID))}
if err == nil {
Expand All @@ -198,10 +190,10 @@ func (obs *observerHandler) OnRemove(removed []observer.Endpoint) {
}

for _, rcvr := range obs.receiversByEndpointID.Get(e.ID) {
obs.params.TelemetrySettings.Logger.Info("stopping receiver", zap.Reflect("receiver", rcvr), zap.String("endpoint_id", string(e.ID)))
obs.logger.Info("stopping receiver", zap.Reflect("receiver", rcvr), zap.String("endpoint_id", string(e.ID)))

if err := obs.runner.shutdown(rcvr); err != nil {
obs.params.TelemetrySettings.Logger.Error("failed to stop receiver", zap.Reflect("receiver", rcvr), zap.Error(err))
obs.logger.Error("failed to stop receiver", zap.Reflect("receiver", rcvr), zap.Error(err))
continue
}
}
Expand All @@ -223,11 +215,12 @@ type templateToUpdate struct {
}

func (obs *observerHandler) updatedReceiverTemplatesFromEndpointProperties(details observer.EndpointDetails) []receiverTemplate {
logger := obs.params.TelemetrySettings.Logger
// final templates to return for evaluation
var receiverTemplates []receiverTemplate

properties := internal.PropertyConfFromEndpointEnv(details, logger)
var properties *confmap.Conf
if obs.config.AcceptEndpointProperties {
properties = internal.PropertyConfFromEndpointEnv(details, obs.logger)
}
if properties == nil {
for _, template := range obs.config.receiverTemplates {
receiverTemplates = append(receiverTemplates, template)
Expand All @@ -239,42 +232,12 @@ func (obs *observerHandler) updatedReceiverTemplatesFromEndpointProperties(detai
propertiesSM := properties.ToStringMap()
for receiver, template := range obs.config.receiverTemplates {
if _, ok := propertiesSM[receiver]; !ok {
logger.Debug("receiver template unchanged by endpoint properties", zap.String("receiver", receiver))
obs.logger.Debug("receiver template unchanged by endpoint properties", zap.String("receiver", receiver))
receiverTemplates = append(receiverTemplates, template)
}
}

var templatesToUpdate []templateToUpdate

for receiver := range propertiesSM {
rProps, err := properties.Sub(receiver)
if err != nil {
logger.Info("failed extracting expected receiver properties", zap.String("receiver", receiver), zap.Error(err))
if template, ok := obs.config.receiverTemplates[receiver]; ok {
// fallback by not modifying existing template
receiverTemplates = append(receiverTemplates, template)
}
continue
}

var template receiverTemplate
var ok bool
if template, ok = obs.config.receiverTemplates[receiver]; ok {
// copy because we will be modifying from properties for only this endpoint
template = template.copy()
logger.Debug("existing receiver template updated by endpoint properties", zap.String("receiver", receiver))
} else {
if template, err = newReceiverTemplate(receiver, map[string]any{tmpPropertyCreatedTemplate: struct{}{}}); err != nil {
logger.Info("failed creating new receiver template for property values", zap.String("receiver", receiver), zap.Error(err))
continue
}
logger.Debug("receiver template created by endpoint properties", zap.String("receiver", receiver))
}

templatesToUpdate = append(templatesToUpdate, templateToUpdate{
propsConf: rProps, template: &template,
})
}
templatesToUpdate := obs.getTemplatesToUpdateWithProperties(receiverTemplates, properties)

for _, toUpdate := range templatesToUpdate {
rProps := toUpdate.propsConf
Expand All @@ -287,19 +250,19 @@ func (obs *observerHandler) updatedReceiverTemplatesFromEndpointProperties(detai
}

if propsConf, err := rProps.Sub(configKey); err != nil {
logger.Info("failed creating conf property values", zap.String("receiver", template.id.String()), zap.Error(err))
obs.logger.Info("failed creating conf property values", zap.String("receiver", template.id.String()), zap.Error(err))
} else {
templateConf := confmap.NewFromStringMap(template.config)
if err = templateConf.Merge(propsConf); err != nil {
logger.Info("failed merging conf property values", zap.String("receiver", template.id.String()), zap.Error(err))
obs.logger.Info("failed merging conf property values", zap.String("receiver", template.id.String()), zap.Error(err))
} else {
template.config = templateConf.ToStringMap()
}
}

if propRule, hasRule := rPropsSM[internal.RuleType]; hasRule {
if receiverRule, ok := propRule.(string); !ok || receiverRule == "" {
logger.Debug(
obs.logger.Debug(
"invalid property rule",
zap.String("receiver", template.id.String()),
zap.Any("rule", propRule),
Expand All @@ -311,7 +274,7 @@ func (obs *observerHandler) updatedReceiverTemplatesFromEndpointProperties(detai
}
} else {
if exprRule, e := newRule(receiverRule); e != nil {
logger.Info("failed determining valid rule for property-created template", zap.String("receiver", template.id.String()), zap.Error(e))
obs.logger.Info("failed determining valid rule for property-created template", zap.String("receiver", template.id.String()), zap.Error(e))
if propertyCreated || template.Rule == "" {
// there's nothing we can do without a valid rule for property-created templates
continue
Expand All @@ -322,13 +285,13 @@ func (obs *observerHandler) updatedReceiverTemplatesFromEndpointProperties(detai
}
}
} else if propertyCreated {
logger.Info("missing rule for property-created template", zap.String("receiver", template.id.String()))
obs.logger.Info("missing rule for property-created template", zap.String("receiver", template.id.String()))
continue
}

if _, hasResourceAttrs := rPropsSM["resource_attributes"]; hasResourceAttrs {
if propAttrConf, e := rProps.Sub("resource_attributes"); e != nil {
logger.Info("failed retrieving property-provided resource attributes", zap.String("receiver", template.id.String()), zap.Error(e))
obs.logger.Info("failed retrieving property-provided resource attributes", zap.String("receiver", template.id.String()), zap.Error(e))
} else {
for k, v := range propAttrConf.ToStringMap() {
template.ResourceAttributes[k] = v
Expand All @@ -339,3 +302,37 @@ func (obs *observerHandler) updatedReceiverTemplatesFromEndpointProperties(detai
}
return receiverTemplates
}

func (obs *observerHandler) getTemplatesToUpdateWithProperties(receiverTemplates []receiverTemplate, properties *confmap.Conf) []templateToUpdate {
var templatesToUpdate []templateToUpdate
for receiver := range properties.ToStringMap() {
rProps, err := properties.Sub(receiver)
if err != nil {
obs.logger.Info("failed extracting expected receiver properties", zap.String("receiver", receiver), zap.Error(err))
if template, ok := obs.config.receiverTemplates[receiver]; ok {
// fallback by not modifying existing template
receiverTemplates = append(receiverTemplates, template)
}
continue
}

var template receiverTemplate
var ok bool
if template, ok = obs.config.receiverTemplates[receiver]; ok {
// copy because we will be modifying from properties for only this endpoint
template = template.copy()
obs.logger.Debug("existing receiver template updated by endpoint properties", zap.String("receiver", receiver))
} else {
if template, err = newReceiverTemplate(receiver, map[string]any{tmpPropertyCreatedTemplate: struct{}{}}); err != nil {
obs.logger.Info("failed creating new receiver template for property values", zap.String("receiver", receiver), zap.Error(err))
continue
}
obs.logger.Debug("receiver template created by endpoint properties", zap.String("receiver", receiver))
}

templatesToUpdate = append(templatesToUpdate, templateToUpdate{
propsConf: rProps, template: &template,
})
}
return templatesToUpdate
}
3 changes: 2 additions & 1 deletion receiver/receivercreator/observerhandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,6 +508,7 @@ func newObserverHandler(
nextLogsConsumer: nextLogs,
nextMetricsConsumer: nextMetrics,
nextTracesConsumer: nextTraces,
logger: set.TelemetrySettings.Logger,
}, mr
}

Expand Down Expand Up @@ -789,7 +790,7 @@ func TestReceiverInvalidEndpointEnvContents(t *testing.T) {
core, obs := zapObserver.New(zap.DebugLevel)
logger := zap.New(core)

handler.params.TelemetrySettings.Logger = logger
handler.logger = logger
handler.OnAdd([]observer.Endpoint{tt.endpoint})

rcvr := mr.startedComponent
Expand Down
1 change: 1 addition & 0 deletions receiver/receivercreator/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ func (rc *receiverCreator) Start(_ context.Context, host component.Host) error {
nextMetricsConsumer: rc.nextMetricsConsumer,
nextTracesConsumer: rc.nextTracesConsumer,
runner: newReceiverRunner(rc.params, &loggingHost{host, rc.params.Logger}),
logger: rc.params.TelemetrySettings.Logger,
}

observers := map[component.ID]observer.Observable{}
Expand Down

0 comments on commit e38305d

Please sign in to comment.