diff --git a/service/documentation.md b/service/documentation.md new file mode 100644 index 00000000000..2cbd48a0708 --- /dev/null +++ b/service/documentation.md @@ -0,0 +1,55 @@ +[comment]: <> (Code generated by mdatagen. DO NOT EDIT.) + +# service + +## Internal Telemetry + +The following telemetry is emitted by this component. + +### process_cpu_seconds + +Total CPU user and system time in seconds + +| Unit | Metric Type | Value Type | Monotonic | +| ---- | ----------- | ---------- | --------- | +| s | Sum | Double | true | + +### process_memory_rss + +Total physical memory (resident set size) + +| Unit | Metric Type | Value Type | +| ---- | ----------- | ---------- | +| By | Gauge | Int | + +### process_runtime_heap_alloc_bytes + +Bytes of allocated heap objects (see 'go doc runtime.MemStats.HeapAlloc') + +| Unit | Metric Type | Value Type | +| ---- | ----------- | ---------- | +| By | Gauge | Int | + +### process_runtime_total_alloc_bytes + +Cumulative bytes allocated for heap objects (see 'go doc runtime.MemStats.TotalAlloc') + +| Unit | Metric Type | Value Type | Monotonic | +| ---- | ----------- | ---------- | --------- | +| By | Sum | Int | true | + +### process_runtime_total_sys_memory_bytes + +Total bytes of memory obtained from the OS (see 'go doc runtime.MemStats.Sys') + +| Unit | Metric Type | Value Type | +| ---- | ----------- | ---------- | +| By | Gauge | Int | + +### process_uptime + +Uptime of the process + +| Unit | Metric Type | Value Type | Monotonic | +| ---- | ----------- | ---------- | --------- | +| s | Sum | Double | true | diff --git a/service/generated_package_test.go b/service/generated_package_test.go new file mode 100644 index 00000000000..545b6bb0c5e --- /dev/null +++ b/service/generated_package_test.go @@ -0,0 +1,13 @@ +// Code generated by mdatagen. DO NOT EDIT. + +package service + +import ( + "testing" + + "go.uber.org/goleak" +) + +func TestMain(m *testing.M) { + goleak.VerifyTestMain(m, goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"), goleak.IgnoreTopFunction("go.opentelemetry.io/collector/service/internal/proctelemetry.InitPrometheusServer.func1")) +} diff --git a/service/internal/metadata/generated_telemetry.go b/service/internal/metadata/generated_telemetry.go new file mode 100644 index 00000000000..3d1c7b37a04 --- /dev/null +++ b/service/internal/metadata/generated_telemetry.go @@ -0,0 +1,155 @@ +// Code generated by mdatagen. DO NOT EDIT. + +package metadata + +import ( + "context" + "errors" + + "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/trace" + + "go.opentelemetry.io/collector/component" +) + +func Meter(settings component.TelemetrySettings) metric.Meter { + return settings.MeterProvider.Meter("go.opentelemetry.io/collector/service") +} + +func Tracer(settings component.TelemetrySettings) trace.Tracer { + return settings.TracerProvider.Tracer("go.opentelemetry.io/collector/service") +} + +// TelemetryBuilder provides an interface for components to report telemetry +// as defined in metadata and user config. +type TelemetryBuilder struct { + ProcessCPUSeconds metric.Float64ObservableCounter + observeProcessCPUSeconds func() float64 + ProcessMemoryRss metric.Int64ObservableGauge + observeProcessMemoryRss func() int64 + ProcessRuntimeHeapAllocBytes metric.Int64ObservableGauge + observeProcessRuntimeHeapAllocBytes func() int64 + ProcessRuntimeTotalAllocBytes metric.Int64ObservableCounter + observeProcessRuntimeTotalAllocBytes func() int64 + ProcessRuntimeTotalSysMemoryBytes metric.Int64ObservableGauge + observeProcessRuntimeTotalSysMemoryBytes func() int64 + ProcessUptime metric.Float64ObservableCounter + observeProcessUptime func() float64 +} + +// telemetryBuilderOption applies changes to default builder. +type telemetryBuilderOption func(*TelemetryBuilder) + +// WithProcessCPUSecondsCallback sets callback for observable ProcessCPUSeconds metric. +func WithProcessCPUSecondsCallback(cb func() float64) telemetryBuilderOption { + return func(builder *TelemetryBuilder) { + builder.observeProcessCPUSeconds = cb + } +} + +// WithProcessMemoryRssCallback sets callback for observable ProcessMemoryRss metric. +func WithProcessMemoryRssCallback(cb func() int64) telemetryBuilderOption { + return func(builder *TelemetryBuilder) { + builder.observeProcessMemoryRss = cb + } +} + +// WithProcessRuntimeHeapAllocBytesCallback sets callback for observable ProcessRuntimeHeapAllocBytes metric. +func WithProcessRuntimeHeapAllocBytesCallback(cb func() int64) telemetryBuilderOption { + return func(builder *TelemetryBuilder) { + builder.observeProcessRuntimeHeapAllocBytes = cb + } +} + +// WithProcessRuntimeTotalAllocBytesCallback sets callback for observable ProcessRuntimeTotalAllocBytes metric. +func WithProcessRuntimeTotalAllocBytesCallback(cb func() int64) telemetryBuilderOption { + return func(builder *TelemetryBuilder) { + builder.observeProcessRuntimeTotalAllocBytes = cb + } +} + +// WithProcessRuntimeTotalSysMemoryBytesCallback sets callback for observable ProcessRuntimeTotalSysMemoryBytes metric. +func WithProcessRuntimeTotalSysMemoryBytesCallback(cb func() int64) telemetryBuilderOption { + return func(builder *TelemetryBuilder) { + builder.observeProcessRuntimeTotalSysMemoryBytes = cb + } +} + +// WithProcessUptimeCallback sets callback for observable ProcessUptime metric. +func WithProcessUptimeCallback(cb func() float64) telemetryBuilderOption { + return func(builder *TelemetryBuilder) { + builder.observeProcessUptime = cb + } +} + +// NewTelemetryBuilder provides a struct with methods to update all internal telemetry +// for a component +func NewTelemetryBuilder(settings component.TelemetrySettings, options ...telemetryBuilderOption) (*TelemetryBuilder, error) { + builder := TelemetryBuilder{} + for _, op := range options { + op(&builder) + } + var err, errs error + meter := Meter(settings) + builder.ProcessCPUSeconds, err = meter.Float64ObservableCounter( + "process_cpu_seconds", + metric.WithDescription("Total CPU user and system time in seconds"), + metric.WithUnit("s"), + metric.WithFloat64Callback(func(_ context.Context, o metric.Float64Observer) error { + o.Observe(builder.observeProcessCPUSeconds()) + return nil + }), + ) + errs = errors.Join(errs, err) + builder.ProcessMemoryRss, err = meter.Int64ObservableGauge( + "process_memory_rss", + metric.WithDescription("Total physical memory (resident set size)"), + metric.WithUnit("By"), + metric.WithInt64Callback(func(_ context.Context, o metric.Int64Observer) error { + o.Observe(builder.observeProcessMemoryRss()) + return nil + }), + ) + errs = errors.Join(errs, err) + builder.ProcessRuntimeHeapAllocBytes, err = meter.Int64ObservableGauge( + "process_runtime_heap_alloc_bytes", + metric.WithDescription("Bytes of allocated heap objects (see 'go doc runtime.MemStats.HeapAlloc')"), + metric.WithUnit("By"), + metric.WithInt64Callback(func(_ context.Context, o metric.Int64Observer) error { + o.Observe(builder.observeProcessRuntimeHeapAllocBytes()) + return nil + }), + ) + errs = errors.Join(errs, err) + builder.ProcessRuntimeTotalAllocBytes, err = meter.Int64ObservableCounter( + "process_runtime_total_alloc_bytes", + metric.WithDescription("Cumulative bytes allocated for heap objects (see 'go doc runtime.MemStats.TotalAlloc')"), + metric.WithUnit("By"), + metric.WithInt64Callback(func(_ context.Context, o metric.Int64Observer) error { + o.Observe(builder.observeProcessRuntimeTotalAllocBytes()) + return nil + }), + ) + errs = errors.Join(errs, err) + builder.ProcessRuntimeTotalSysMemoryBytes, err = meter.Int64ObservableGauge( + "process_runtime_total_sys_memory_bytes", + metric.WithDescription("Total bytes of memory obtained from the OS (see 'go doc runtime.MemStats.Sys')"), + metric.WithUnit("By"), + metric.WithInt64Callback(func(_ context.Context, o metric.Int64Observer) error { + o.Observe(builder.observeProcessRuntimeTotalSysMemoryBytes()) + return nil + }), + ) + errs = errors.Join(errs, err) + builder.ProcessUptime, err = meter.Float64ObservableCounter( + "process_uptime", + metric.WithDescription("Uptime of the process"), + metric.WithUnit("s"), + metric.WithFloat64Callback(func(_ context.Context, o metric.Float64Observer) error { + o.Observe(builder.observeProcessUptime()) + return nil + }), + ) + errs = errors.Join(errs, err) + return &builder, errs +} diff --git a/service/internal/metadata/generated_telemetry_test.go b/service/internal/metadata/generated_telemetry_test.go new file mode 100644 index 00000000000..5e225295ba3 --- /dev/null +++ b/service/internal/metadata/generated_telemetry_test.go @@ -0,0 +1,76 @@ +// Code generated by mdatagen. DO NOT EDIT. + +package metadata + +import ( + "testing" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/metric" + embeddedmetric "go.opentelemetry.io/otel/metric/embedded" + noopmetric "go.opentelemetry.io/otel/metric/noop" + "go.opentelemetry.io/otel/trace" + embeddedtrace "go.opentelemetry.io/otel/trace/embedded" + nooptrace "go.opentelemetry.io/otel/trace/noop" + + "go.opentelemetry.io/collector/component" +) + +type mockMeter struct { + noopmetric.Meter + name string +} +type mockMeterProvider struct { + embeddedmetric.MeterProvider +} + +func (m mockMeterProvider) Meter(name string, opts ...metric.MeterOption) metric.Meter { + return mockMeter{name: name} +} + +type mockTracer struct { + nooptrace.Tracer + name string +} + +type mockTracerProvider struct { + embeddedtrace.TracerProvider +} + +func (m mockTracerProvider) Tracer(name string, opts ...trace.TracerOption) trace.Tracer { + return mockTracer{name: name} +} + +func TestProviders(t *testing.T) { + set := component.TelemetrySettings{ + MeterProvider: mockMeterProvider{}, + TracerProvider: mockTracerProvider{}, + } + + meter := Meter(set) + if m, ok := meter.(mockMeter); ok { + require.Equal(t, "go.opentelemetry.io/collector/service", m.name) + } else { + require.Fail(t, "returned Meter not mockMeter") + } + + tracer := Tracer(set) + if m, ok := tracer.(mockTracer); ok { + require.Equal(t, "go.opentelemetry.io/collector/service", m.name) + } else { + require.Fail(t, "returned Meter not mockTracer") + } +} + +func TestNewTelemetryBuilder(t *testing.T) { + set := component.TelemetrySettings{ + MeterProvider: mockMeterProvider{}, + TracerProvider: mockTracerProvider{}, + } + applied := false + _, err := NewTelemetryBuilder(set, func(b *TelemetryBuilder) { + applied = true + }) + require.NoError(t, err) + require.True(t, applied) +} diff --git a/service/internal/proctelemetry/process_telemetry.go b/service/internal/proctelemetry/process_telemetry.go index 991897f8b1b..9f8f0874e97 100644 --- a/service/internal/proctelemetry/process_telemetry.go +++ b/service/internal/proctelemetry/process_telemetry.go @@ -12,13 +12,10 @@ import ( "github.com/shirou/gopsutil/v3/common" "github.com/shirou/gopsutil/v3/process" - otelmetric "go.opentelemetry.io/otel/metric" - "go.uber.org/multierr" -) -const ( - scopeName = "go.opentelemetry.io/collector/service/process_telemetry" - processNameKey = "process_name" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/service/internal/metadata" + "go.opentelemetry.io/collector/service/internal/servicetelemetry" ) // processMetrics is a struct that contains views related to process metrics (cpu, mem, etc) @@ -28,13 +25,6 @@ type processMetrics struct { proc *process.Process context context.Context - otelProcessUptime otelmetric.Float64ObservableCounter - otelAllocMem otelmetric.Int64ObservableGauge - otelTotalAllocMem otelmetric.Int64ObservableCounter - otelSysMem otelmetric.Int64ObservableGauge - otelCPUSeconds otelmetric.Float64ObservableCounter - otelRSSMemory otelmetric.Int64ObservableGauge - // mu protects everything bellow. mu sync.Mutex lastMsRead time.Time @@ -64,7 +54,7 @@ func WithHostProc(hostProc string) RegisterOption { // RegisterProcessMetrics creates a new set of processMetrics (mem, cpu) that can be used to measure // basic information about this process. -func RegisterProcessMetrics(mp otelmetric.MeterProvider, ballastSizeBytes uint64, opts ...RegisterOption) error { +func RegisterProcessMetrics(cfg servicetelemetry.TelemetrySettings, ballastSizeBytes uint64, opts ...RegisterOption) error { set := registerOption{} for _, opt := range opts { opt.apply(&set) @@ -86,73 +76,15 @@ func RegisterProcessMetrics(mp otelmetric.MeterProvider, ballastSizeBytes uint64 return err } - return pm.record(mp.Meter(scopeName)) -} - -func (pm *processMetrics) record(meter otelmetric.Meter) error { - var errs, err error - - pm.otelProcessUptime, err = meter.Float64ObservableCounter( - "process_uptime", - otelmetric.WithDescription("Uptime of the process"), - otelmetric.WithUnit("s"), - otelmetric.WithFloat64Callback(func(_ context.Context, o otelmetric.Float64Observer) error { - o.Observe(pm.updateProcessUptime()) - return nil - })) - errs = multierr.Append(errs, err) - - pm.otelAllocMem, err = meter.Int64ObservableGauge( - "process_runtime_heap_alloc_bytes", - otelmetric.WithDescription("Bytes of allocated heap objects (see 'go doc runtime.MemStats.HeapAlloc')"), - otelmetric.WithUnit("By"), - otelmetric.WithInt64Callback(func(_ context.Context, o otelmetric.Int64Observer) error { - o.Observe(pm.updateAllocMem()) - return nil - })) - errs = multierr.Append(errs, err) - - pm.otelTotalAllocMem, err = meter.Int64ObservableCounter( - "process_runtime_total_alloc_bytes", - otelmetric.WithDescription("Cumulative bytes allocated for heap objects (see 'go doc runtime.MemStats.TotalAlloc')"), - otelmetric.WithUnit("By"), - otelmetric.WithInt64Callback(func(_ context.Context, o otelmetric.Int64Observer) error { - o.Observe(pm.updateTotalAllocMem()) - return nil - })) - errs = multierr.Append(errs, err) - - pm.otelSysMem, err = meter.Int64ObservableGauge( - "process_runtime_total_sys_memory_bytes", - otelmetric.WithDescription("Total bytes of memory obtained from the OS (see 'go doc runtime.MemStats.Sys')"), - otelmetric.WithUnit("By"), - otelmetric.WithInt64Callback(func(_ context.Context, o otelmetric.Int64Observer) error { - o.Observe(pm.updateSysMem()) - return nil - })) - errs = multierr.Append(errs, err) - - pm.otelCPUSeconds, err = meter.Float64ObservableCounter( - "process_cpu_seconds", - otelmetric.WithDescription("Total CPU user and system time in seconds"), - otelmetric.WithUnit("s"), - otelmetric.WithFloat64Callback(func(_ context.Context, o otelmetric.Float64Observer) error { - o.Observe(pm.updateCPUSeconds()) - return nil - })) - errs = multierr.Append(errs, err) - - pm.otelRSSMemory, err = meter.Int64ObservableGauge( - "process_memory_rss", - otelmetric.WithDescription("Total physical memory (resident set size)"), - otelmetric.WithUnit("By"), - otelmetric.WithInt64Callback(func(_ context.Context, o otelmetric.Int64Observer) error { - o.Observe(pm.updateRSSMemory()) - return nil - })) - errs = multierr.Append(errs, err) - - return errs + _, err = metadata.NewTelemetryBuilder(cfg.ToComponentTelemetrySettings(&component.InstanceID{}), + metadata.WithProcessUptimeCallback(pm.updateProcessUptime), + metadata.WithProcessRuntimeHeapAllocBytesCallback(pm.updateAllocMem), + metadata.WithProcessRuntimeTotalAllocBytesCallback(pm.updateTotalAllocMem), + metadata.WithProcessRuntimeTotalSysMemoryBytesCallback(pm.updateSysMem), + metadata.WithProcessCPUSecondsCallback(pm.updateCPUSeconds), + metadata.WithProcessMemoryRssCallback(pm.updateRSSMemory), + ) + return err } func (pm *processMetrics) updateProcessUptime() float64 { diff --git a/service/internal/proctelemetry/process_telemetry_linux_test.go b/service/internal/proctelemetry/process_telemetry_linux_test.go index 1a15f28fc7e..73605c0ae8e 100644 --- a/service/internal/proctelemetry/process_telemetry_linux_test.go +++ b/service/internal/proctelemetry/process_telemetry_linux_test.go @@ -21,7 +21,7 @@ func TestProcessTelemetryWithHostProc(t *testing.T) { // Make the sure the environment variable value is not used. t.Setenv("HOST_PROC", "foo/bar") - require.NoError(t, RegisterProcessMetrics(tel.MeterProvider, 0, WithHostProc("/proc"))) + require.NoError(t, RegisterProcessMetrics(tel.TelemetrySettings, 0, WithHostProc("/proc"))) // Check that the metrics are actually filled. time.Sleep(200 * time.Millisecond) diff --git a/service/internal/proctelemetry/process_telemetry_test.go b/service/internal/proctelemetry/process_telemetry_test.go index 40d0f54ef27..f1da8d6a34c 100644 --- a/service/internal/proctelemetry/process_telemetry_test.go +++ b/service/internal/proctelemetry/process_telemetry_test.go @@ -20,13 +20,12 @@ import ( sdkmetric "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/resource" - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config/configtelemetry" + "go.opentelemetry.io/collector/service/internal/servicetelemetry" ) type testTelemetry struct { - component.TelemetrySettings + servicetelemetry.TelemetrySettings promHandler http.Handler meterProvider *sdkmetric.MeterProvider } @@ -42,7 +41,7 @@ var expectedMetrics = []string{ func setupTelemetry(t *testing.T) testTelemetry { settings := testTelemetry{ - TelemetrySettings: componenttest.NewNopTelemetrySettings(), + TelemetrySettings: servicetelemetry.NewNopTelemetrySettings(), } settings.TelemetrySettings.MetricsLevel = configtelemetry.LevelNormal @@ -79,7 +78,7 @@ func fetchPrometheusMetrics(handler http.Handler) (map[string]*io_prometheus_cli func TestProcessTelemetry(t *testing.T) { tel := setupTelemetry(t) - require.NoError(t, RegisterProcessMetrics(tel.MeterProvider, 0)) + require.NoError(t, RegisterProcessMetrics(tel.TelemetrySettings, 0)) mp, err := fetchPrometheusMetrics(tel.promHandler) require.NoError(t, err) diff --git a/service/metadata.yaml b/service/metadata.yaml new file mode 100644 index 00000000000..d5417740d57 --- /dev/null +++ b/service/metadata.yaml @@ -0,0 +1,68 @@ +type: service + +status: + class: pkg + stability: + development: [traces, metrics, logs] + distributions: [core, contrib] + +tests: + goleak: + ignore: + top: + # See https://github.com/census-instrumentation/opencensus-go/issues/1191 for more information. + - "go.opencensus.io/stats/view.(*worker).start" + - "go.opentelemetry.io/collector/service/internal/proctelemetry.InitPrometheusServer.func1" + +telemetry: + metrics: + process_uptime: + enabled: true + description: Uptime of the process + unit: s + sum: + async: true + value_type: double + monotonic: true + + process_runtime_heap_alloc_bytes: + enabled: true + description: Bytes of allocated heap objects (see 'go doc runtime.MemStats.HeapAlloc') + unit: By + gauge: + async: true + value_type: int + + process_runtime_total_alloc_bytes: + enabled: true + description: Cumulative bytes allocated for heap objects (see 'go doc runtime.MemStats.TotalAlloc') + unit: By + sum: + async: true + value_type: int + monotonic: true + + process_runtime_total_sys_memory_bytes: + enabled: true + description: Total bytes of memory obtained from the OS (see 'go doc runtime.MemStats.Sys') + unit: By + gauge: + async: true + value_type: int + + process_cpu_seconds: + enabled: true + description: Total CPU user and system time in seconds + unit: s + sum: + async: true + value_type: double + monotonic: true + + process_memory_rss: + enabled: true + description: Total physical memory (resident set size) + unit: By + gauge: + async: true + value_type: int diff --git a/service/service.go b/service/service.go index 53ea3eebe65..eb320cfec2c 100644 --- a/service/service.go +++ b/service/service.go @@ -1,6 +1,8 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 +//go:generate mdatagen metadata.yaml + package service // import "go.opentelemetry.io/collector/service" import ( @@ -289,7 +291,7 @@ func (srv *Service) initExtensionsAndPipeline(ctx context.Context, set Settings, if cfg.Telemetry.Metrics.Level != configtelemetry.LevelNone && cfg.Telemetry.Metrics.Address != "" { // The process telemetry initialization requires the ballast size, which is available after the extensions are initialized. - if err = proctelemetry.RegisterProcessMetrics(srv.telemetrySettings.MeterProvider, getBallastSize(srv.host)); err != nil { + if err = proctelemetry.RegisterProcessMetrics(srv.telemetrySettings, getBallastSize(srv.host)); err != nil { return fmt.Errorf("failed to register process metrics: %w", err) } }