-
Notifications
You must be signed in to change notification settings - Fork 2.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support adding process tags in OTEL via env variable #2220
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -21,6 +21,7 @@ import ( | |
"github.com/open-telemetry/opentelemetry-collector/config" | ||
"github.com/open-telemetry/opentelemetry-collector/config/configmodels" | ||
"github.com/open-telemetry/opentelemetry-collector/extension/healthcheckextension" | ||
"github.com/open-telemetry/opentelemetry-collector/processor/resourceprocessor" | ||
"github.com/open-telemetry/opentelemetry-collector/receiver" | ||
"github.com/open-telemetry/opentelemetry-collector/receiver/jaegerreceiver" | ||
"github.com/open-telemetry/opentelemetry-collector/receiver/zipkinreceiver" | ||
|
@@ -36,7 +37,6 @@ const ( | |
httpThriftBinaryEndpoint = "localhost:14268" | ||
udpThriftCompactEndpoint = "localhost:6831" | ||
udpThriftBinaryEndpoint = "localhost:6832" | ||
httpSamplingEndpoint = "localhost:5778" | ||
) | ||
|
||
// CollectorConfig creates default collector configuration. | ||
|
@@ -56,17 +56,20 @@ func CollectorConfig(storageType string, zipkinHostPort string, factories config | |
recTypes = append(recTypes, string(v.Type())) | ||
} | ||
hc := factories.Extensions["health_check"].CreateDefaultConfig() | ||
resProcessor := factories.Processors["resource"].CreateDefaultConfig() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Isn't the same approach going to be used for collector, as with the agent default config, to only use the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes, done |
||
return &configmodels.Config{ | ||
Receivers: receivers, | ||
Processors: configmodels.Processors{"resource": resProcessor}, | ||
Exporters: exporters, | ||
Extensions: configmodels.Extensions{"health_check": hc}, | ||
Service: configmodels.Service{ | ||
Extensions: []string{"health_check"}, | ||
Pipelines: configmodels.Pipelines{ | ||
"traces": { | ||
InputType: configmodels.TracesDataType, | ||
Receivers: recTypes, | ||
Exporters: expTypes, | ||
InputType: configmodels.TracesDataType, | ||
Receivers: recTypes, | ||
Processors: []string{"resource"}, | ||
Exporters: expTypes, | ||
}, | ||
}, | ||
}, | ||
|
@@ -125,17 +128,24 @@ func createExporters(storageTypes string, factories config.Factories) (configmod | |
func AgentConfig(factories config.Factories) *configmodels.Config { | ||
jaegerExporter := factories.Exporters["jaeger"] | ||
hc := factories.Extensions["health_check"].CreateDefaultConfig().(*healthcheckextension.Config) | ||
processors := configmodels.Processors{} | ||
resProcessor := factories.Processors["resource"].CreateDefaultConfig().(*resourceprocessor.Config) | ||
if len(resProcessor.Labels) > 0 { | ||
processors[resProcessor.Name()] = resProcessor | ||
} | ||
return &configmodels.Config{ | ||
Receivers: createAgentReceivers(factories), | ||
Processors: processors, | ||
Exporters: configmodels.Exporters{"jaeger": jaegerExporter.CreateDefaultConfig()}, | ||
Extensions: configmodels.Extensions{"health_check": hc}, | ||
Service: configmodels.Service{ | ||
Extensions: []string{"health_check"}, | ||
Pipelines: map[string]*configmodels.Pipeline{ | ||
"traces": { | ||
InputType: configmodels.TracesDataType, | ||
Receivers: []string{"jaeger"}, | ||
Exporters: []string{"jaeger"}, | ||
InputType: configmodels.TracesDataType, | ||
Receivers: []string{"jaeger"}, | ||
Processors: processorNames(processors), | ||
Exporters: []string{"jaeger"}, | ||
}, | ||
}, | ||
}, | ||
|
@@ -161,3 +171,11 @@ func createAgentReceivers(factories config.Factories) configmodels.Receivers { | |
} | ||
return recvs | ||
} | ||
|
||
func processorNames(processors configmodels.Processors) []string { | ||
var names []string | ||
for _, v := range processors { | ||
names = append(names, v.Name()) | ||
} | ||
return names | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -21,6 +21,7 @@ import ( | |
"github.com/open-telemetry/opentelemetry-collector/config" | ||
"github.com/open-telemetry/opentelemetry-collector/config/configmodels" | ||
"github.com/open-telemetry/opentelemetry-collector/exporter/jaegerexporter" | ||
"github.com/open-telemetry/opentelemetry-collector/processor/resourceprocessor" | ||
"github.com/open-telemetry/opentelemetry-collector/receiver/jaegerreceiver" | ||
"github.com/spf13/viper" | ||
"github.com/stretchr/testify/assert" | ||
|
@@ -51,9 +52,10 @@ func TestDefaultCollectorConfig(t *testing.T) { | |
exporterTypes: []string{elasticsearch.TypeStr}, | ||
pipeline: configmodels.Pipelines{ | ||
"traces": { | ||
InputType: configmodels.TracesDataType, | ||
Receivers: []string{"jaeger"}, | ||
Exporters: []string{elasticsearch.TypeStr}, | ||
InputType: configmodels.TracesDataType, | ||
Receivers: []string{"jaeger"}, | ||
Processors: []string{"resource"}, | ||
Exporters: []string{elasticsearch.TypeStr}, | ||
}, | ||
}, | ||
}, | ||
|
@@ -63,9 +65,10 @@ func TestDefaultCollectorConfig(t *testing.T) { | |
exporterTypes: []string{cassandra.TypeStr}, | ||
pipeline: configmodels.Pipelines{ | ||
"traces": { | ||
InputType: configmodels.TracesDataType, | ||
Receivers: []string{"jaeger"}, | ||
Exporters: []string{cassandra.TypeStr}, | ||
InputType: configmodels.TracesDataType, | ||
Receivers: []string{"jaeger"}, | ||
Processors: []string{"resource"}, | ||
Exporters: []string{cassandra.TypeStr}, | ||
}, | ||
}, | ||
}, | ||
|
@@ -75,9 +78,10 @@ func TestDefaultCollectorConfig(t *testing.T) { | |
exporterTypes: []string{kafka.TypeStr}, | ||
pipeline: configmodels.Pipelines{ | ||
"traces": { | ||
InputType: configmodels.TracesDataType, | ||
Receivers: []string{"jaeger"}, | ||
Exporters: []string{kafka.TypeStr}, | ||
InputType: configmodels.TracesDataType, | ||
Receivers: []string{"jaeger"}, | ||
Processors: []string{"resource"}, | ||
Exporters: []string{kafka.TypeStr}, | ||
}, | ||
}, | ||
}, | ||
|
@@ -87,9 +91,10 @@ func TestDefaultCollectorConfig(t *testing.T) { | |
exporterTypes: []string{cassandra.TypeStr, elasticsearch.TypeStr}, | ||
pipeline: configmodels.Pipelines{ | ||
"traces": { | ||
InputType: configmodels.TracesDataType, | ||
Receivers: []string{"jaeger"}, | ||
Exporters: []string{cassandra.TypeStr, elasticsearch.TypeStr}, | ||
InputType: configmodels.TracesDataType, | ||
Receivers: []string{"jaeger"}, | ||
Processors: []string{"resource"}, | ||
Exporters: []string{cassandra.TypeStr, elasticsearch.TypeStr}, | ||
}, | ||
}, | ||
}, | ||
|
@@ -99,9 +104,10 @@ func TestDefaultCollectorConfig(t *testing.T) { | |
exporterTypes: []string{cassandra.TypeStr}, | ||
pipeline: configmodels.Pipelines{ | ||
"traces": { | ||
InputType: configmodels.TracesDataType, | ||
Receivers: []string{"jaeger", "zipkin"}, | ||
Exporters: []string{cassandra.TypeStr}, | ||
InputType: configmodels.TracesDataType, | ||
Receivers: []string{"jaeger", "zipkin"}, | ||
Processors: []string{"resource"}, | ||
Exporters: []string{cassandra.TypeStr}, | ||
}, | ||
}, | ||
}, | ||
|
@@ -128,6 +134,7 @@ func TestDefaultCollectorConfig(t *testing.T) { | |
assert.Equal(t, len(test.pipeline["traces"].Receivers), len(cfg.Receivers)) | ||
assert.Equal(t, "jaeger", cfg.Receivers["jaeger"].Name()) | ||
assert.Equal(t, len(test.exporterTypes), len(cfg.Exporters)) | ||
assert.IsType(t, &resourceprocessor.Config{}, cfg.Processors["resource"]) | ||
|
||
types := []string{} | ||
for _, v := range cfg.Exporters { | ||
|
@@ -141,22 +148,45 @@ func TestDefaultCollectorConfig(t *testing.T) { | |
} | ||
|
||
func TestDefaultAgentConfig(t *testing.T) { | ||
v, _ := jConfig.Viperize(grpc.AddFlags) | ||
factories := Components(v) | ||
cfg := AgentConfig(factories) | ||
assert.Equal(t, configmodels.Service{ | ||
Extensions: []string{"health_check"}, | ||
Pipelines: configmodels.Pipelines{ | ||
"traces": &configmodels.Pipeline{ | ||
InputType: configmodels.TracesDataType, | ||
Receivers: []string{"jaeger"}, | ||
Exporters: []string{"jaeger"}, | ||
tests := []struct { | ||
config map[string]interface{} | ||
service configmodels.Service | ||
}{ | ||
{ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sorry should have mentioned in preview review - there needs to be a test without the flag being specified to confirm There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. that was my intention when I refactored the test to use arrays, but I apparently forgot to add the second use case. |
||
config: map[string]interface{}{"resource.labels": "foo=bar"}, | ||
service: configmodels.Service{ | ||
Extensions: []string{"health_check"}, | ||
Pipelines: configmodels.Pipelines{ | ||
"traces": &configmodels.Pipeline{ | ||
InputType: configmodels.TracesDataType, | ||
Receivers: []string{"jaeger"}, | ||
Processors: []string{"resource"}, | ||
Exporters: []string{"jaeger"}, | ||
}, | ||
}, | ||
}, | ||
}, | ||
}, cfg.Service) | ||
assert.Equal(t, 0, len(cfg.Processors)) | ||
assert.Equal(t, 1, len(cfg.Receivers)) | ||
assert.IsType(t, &jaegerreceiver.Config{}, cfg.Receivers["jaeger"]) | ||
assert.Equal(t, 1, len(cfg.Exporters)) | ||
assert.IsType(t, &jaegerexporter.Config{}, cfg.Exporters["jaeger"]) | ||
} | ||
for _, test := range tests { | ||
v, _ := jConfig.Viperize(grpc.AddFlags) | ||
for key, val := range test.config { | ||
v.Set(key, val) | ||
} | ||
factories := Components(v) | ||
cfg := AgentConfig(factories) | ||
|
||
assert.Equal(t, test.service, cfg.Service) | ||
assert.Equal(t, 1, len(cfg.Receivers)) | ||
assert.IsType(t, &jaegerreceiver.Config{}, cfg.Receivers["jaeger"]) | ||
assert.Equal(t, 1, len(cfg.Exporters)) | ||
assert.IsType(t, &jaegerexporter.Config{}, cfg.Exporters["jaeger"]) | ||
processorMap := map[string]bool{} | ||
for _, p := range test.service.Pipelines["traces"].Processors { | ||
processorMap[p] = true | ||
} | ||
if processorMap["resource"] { | ||
assert.Equal(t, 1, len(cfg.Processors)) | ||
assert.IsType(t, &resourceprocessor.Config{}, cfg.Processors["resource"]) | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be named
JaegerTagsDeprecated
as not specific to agent?Assuming not specific to agent - the changes seem limited to the agent config at the moment - shouldn't they also be applied to collector?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Collector uses only
--collector.tags