From d8d544d4bd5097642bfb9c4f44a08f56a0aa3e50 Mon Sep 17 00:00:00 2001 From: JeffreyHe <30763743+aheling11@users.noreply.github.com> Date: Wed, 20 Sep 2023 15:33:28 +0800 Subject: [PATCH] [receiver/skywalking] Add skywalking jvm metric receiver (#26315) **Description:** Add a JVM metrics receiver to the Skywalking receiver and translate most of the metrics into the OpenTelemetry format according to the [OT JVM semantic conventions.](https://github.com/open-telemetry/semantic-conventions/blob/main/docs/jvm/jvm-metrics.md) However, for the GC metrics, as the Skywalking metric type is gauge-like while the corresponding metric in the semantic conventions, `jvm.gc.duration`, is a histogram, I did not translate them strictly according to the semantic conventions. **Link to tracking Issue:** #20315 **Testing:** Tested locally. I exported the metrics to Prometheus and displayed them in Grafana. image **Documentation:** --- .chloggen/add_skywalking_metric.yaml | 27 +++ receiver/skywalkingreceiver/README.md | 9 +- receiver/skywalkingreceiver/factory.go | 31 +++- receiver/skywalkingreceiver/factory_test.go | 23 +-- .../internal/metadata/generated_status.go | 5 +- .../internal/metrics/metric_report_service.go | 69 ++++++++ .../metrics/skywalkingproto_to_metrics.go | 162 ++++++++++++++++++ receiver/skywalkingreceiver/metadata.yaml | 1 + .../skywalkingreceiver/skywalking_receiver.go | 21 ++- 9 files changed, 332 insertions(+), 16 deletions(-) create mode 100755 .chloggen/add_skywalking_metric.yaml create mode 100644 receiver/skywalkingreceiver/internal/metrics/metric_report_service.go create mode 100644 receiver/skywalkingreceiver/internal/metrics/skywalkingproto_to_metrics.go diff --git a/.chloggen/add_skywalking_metric.yaml b/.chloggen/add_skywalking_metric.yaml new file mode 100755 index 000000000000..9775dd6e10c5 --- /dev/null +++ b/.chloggen/add_skywalking_metric.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. 
+ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: skywalkingreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: implement a receiver for JVM metrics in Skywalking and adapt them to the OpenTelemetry protocol. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [20315] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. 
+# Default: '[user]' +change_logs: [] diff --git a/receiver/skywalkingreceiver/README.md b/receiver/skywalkingreceiver/README.md index a96192aea66b..284a725979da 100644 --- a/receiver/skywalkingreceiver/README.md +++ b/receiver/skywalkingreceiver/README.md @@ -4,16 +4,20 @@ | Status | | | ------------- |-----------| | Stability | [beta]: traces | +| | [development]: metrics | | Distributions | [contrib], [sumo] | | Issues | [![Open issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aopen%20label%3Areceiver%2Fskywalking%20&label=open&color=orange&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aopen+is%3Aissue+label%3Areceiver%2Fskywalking) [![Closed issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aclosed%20label%3Areceiver%2Fskywalking%20&label=closed&color=blue&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aclosed+is%3Aissue+label%3Areceiver%2Fskywalking) | | [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | [@JaredTan95](https://www.github.com/JaredTan95) | [beta]: https://github.com/open-telemetry/opentelemetry-collector#beta +[development]: https://github.com/open-telemetry/opentelemetry-collector#development [contrib]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib [sumo]: https://github.com/SumoLogic/sumologic-otel-collector -Receives trace data in [Skywalking](https://skywalking.apache.org/) format. +Receives trace data and metric data in [Skywalking](https://skywalking.apache.org/) format. + +Note: The current metrics receiver only supports receiving JVM data. 
## Getting Started @@ -40,5 +44,8 @@ service: pipelines: traces: receivers: [skywalking] + metrics: + receivers: [skywalking] + ``` diff --git a/receiver/skywalkingreceiver/factory.go b/receiver/skywalkingreceiver/factory.go index 53c9e36021ed..e7b885228cad 100644 --- a/receiver/skywalkingreceiver/factory.go +++ b/receiver/skywalkingreceiver/factory.go @@ -37,7 +37,8 @@ func NewFactory() receiver.Factory { return receiver.NewFactory( metadata.Type, createDefaultConfig, - receiver.WithTraces(createTracesReceiver, metadata.TracesStability)) + receiver.WithTraces(createTracesReceiver, metadata.TracesStability), + receiver.WithMetrics(createMetricsReceiver, metadata.MetricsStability)) } // CreateDefaultConfig creates the default configuration for Skywalking receiver. @@ -85,6 +86,34 @@ func createTracesReceiver( return r, nil } +// createMetricsReceiver creates a metrics receiver based on provided config. +func createMetricsReceiver( + _ context.Context, + set receiver.CreateSettings, + cfg component.Config, + nextConsumer consumer.Metrics, +) (receiver.Metrics, error) { + + // Convert settings in the source c to configuration struct + // that Skywalking receiver understands. + rCfg := cfg.(*Config) + + c, err := createConfiguration(rCfg) + if err != nil { + return nil, err + } + + r := receivers.GetOrAdd(cfg, func() component.Component { + return newSkywalkingReceiver(c, set) + }) + + if err = r.Unwrap().(*swReceiver).registerMetricsConsumer(nextConsumer); err != nil { + return nil, err + } + + return r, nil +} + // create the config that Skywalking receiver will use. 
func createConfiguration(rCfg *Config) (*configuration, error) { var err error diff --git a/receiver/skywalkingreceiver/factory_test.go b/receiver/skywalkingreceiver/factory_test.go index b2cfe466fd74..a33643b7b1e5 100644 --- a/receiver/skywalkingreceiver/factory_test.go +++ b/receiver/skywalkingreceiver/factory_test.go @@ -50,12 +50,14 @@ func TestCreateReceiver(t *testing.T) { traceSink := new(consumertest.TracesSink) set := receivertest.NewNopCreateSettings() tReceiver, err := factory.CreateTracesReceiver(context.Background(), set, cfg, traceSink) - assert.NoError(t, err, "receiver creation failed") - assert.NotNil(t, tReceiver, "receiver creation failed") + assert.NoError(t, err, "trace receiver creation failed") + assert.NotNil(t, tReceiver, "trace receiver creation failed") + + metricSink := new(consumertest.MetricsSink) + mReceiver, err := factory.CreateMetricsReceiver(context.Background(), set, cfg, metricSink) + assert.NoError(t, err, "metric receiver creation failed") + assert.NotNil(t, mReceiver, "metric receiver creation failed") - mReceiver, err := factory.CreateMetricsReceiver(context.Background(), set, cfg, nil) - assert.Equal(t, err, component.ErrDataTypeIsNotSupported) - assert.Nil(t, mReceiver) } func TestCreateReceiverGeneralConfig(t *testing.T) { @@ -71,12 +73,13 @@ func TestCreateReceiverGeneralConfig(t *testing.T) { set := receivertest.NewNopCreateSettings() traceSink := new(consumertest.TracesSink) tReceiver, err := factory.CreateTracesReceiver(context.Background(), set, cfg, traceSink) - assert.NoError(t, err, "receiver creation failed") - assert.NotNil(t, tReceiver, "receiver creation failed") + assert.NoError(t, err, "trace receiver creation failed") + assert.NotNil(t, tReceiver, "trace receiver creation failed") - mReceiver, err := factory.CreateMetricsReceiver(context.Background(), set, cfg, nil) - assert.Equal(t, err, component.ErrDataTypeIsNotSupported) - assert.Nil(t, mReceiver) + metricSink := new(consumertest.MetricsSink) + 
mReceiver, err := factory.CreateMetricsReceiver(context.Background(), set, cfg, metricSink) + assert.NoError(t, err, "metric receiver creation failed") + assert.NotNil(t, mReceiver, "metric receiver creation failed") } func TestCreateDefaultGRPCEndpoint(t *testing.T) { diff --git a/receiver/skywalkingreceiver/internal/metadata/generated_status.go b/receiver/skywalkingreceiver/internal/metadata/generated_status.go index 41ab170a888d..5c7e14dd2798 100644 --- a/receiver/skywalkingreceiver/internal/metadata/generated_status.go +++ b/receiver/skywalkingreceiver/internal/metadata/generated_status.go @@ -7,6 +7,7 @@ import ( ) const ( - Type = "skywalking" - TracesStability = component.StabilityLevelBeta + Type = "skywalking" + TracesStability = component.StabilityLevelBeta + MetricsStability = component.StabilityLevelDevelopment ) diff --git a/receiver/skywalkingreceiver/internal/metrics/metric_report_service.go b/receiver/skywalkingreceiver/internal/metrics/metric_report_service.go new file mode 100644 index 000000000000..7241b728e65d --- /dev/null +++ b/receiver/skywalkingreceiver/internal/metrics/metric_report_service.go @@ -0,0 +1,69 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 +package metrics // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/skywalkingreceiver/internal/metrics" + +import ( + "context" + + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/obsreport" + "go.opentelemetry.io/collector/receiver" + common "skywalking.apache.org/repo/goapi/collect/common/v3" + agent "skywalking.apache.org/repo/goapi/collect/language/agent/v3" +) + +const ( + collectorHTTPTransport = "http" + grpcTransport = "grpc" + failing = "failing" +) + +type Receiver struct { + nextConsumer consumer.Metrics + grpcObsrecv *obsreport.Receiver + httpObsrecv *obsreport.Receiver + agent.UnimplementedJVMMetricReportServiceServer +} + +// NewReceiver creates a new Receiver reference. 
+func NewReceiver(nextConsumer consumer.Metrics, set receiver.CreateSettings) (*Receiver, error) { + grpcObsrecv, err := obsreport.NewReceiver(obsreport.ReceiverSettings{ + ReceiverID: set.ID, + Transport: grpcTransport, + ReceiverCreateSettings: set, + }) + if err != nil { + return nil, err + } + httpObsrecv, err := obsreport.NewReceiver(obsreport.ReceiverSettings{ + ReceiverID: set.ID, + Transport: collectorHTTPTransport, + ReceiverCreateSettings: set, + }) + if err != nil { + return nil, err + } + return &Receiver{ + nextConsumer: nextConsumer, + grpcObsrecv: grpcObsrecv, + httpObsrecv: httpObsrecv, + }, nil +} + +// Collect handles a reported JVM metric collection by forwarding it to the next metrics consumer. +func (r *Receiver) Collect(ctx context.Context, jvmMetricCollection *agent.JVMMetricCollection) (*common.Commands, error) { + err := consumeMetrics(ctx, jvmMetricCollection, r.nextConsumer) + if err != nil { + return &common.Commands{}, err + } + return &common.Commands{}, nil +} + +func consumeMetrics(ctx context.Context, collection *agent.JVMMetricCollection, nextConsumer consumer.Metrics) error { + if collection == nil { + return nil + } + pmd := SwMetricsToMetrics(collection) + return nextConsumer.ConsumeMetrics(ctx, pmd) + +} diff --git a/receiver/skywalkingreceiver/internal/metrics/skywalkingproto_to_metrics.go b/receiver/skywalkingreceiver/internal/metrics/skywalkingproto_to_metrics.go new file mode 100644 index 000000000000..442e9b0b44e9 --- /dev/null +++ b/receiver/skywalkingreceiver/internal/metrics/skywalkingproto_to_metrics.go @@ -0,0 +1,162 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package metrics // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/skywalkingreceiver/internal/metrics" + +import ( + "time" + + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + semconv "go.opentelemetry.io/collector/semconv/v1.18.0" + common 
"skywalking.apache.org/repo/goapi/collect/common/v3" + agent "skywalking.apache.org/repo/goapi/collect/language/agent/v3" +) + +const ( + jvmScopeName = "runtime_metrics" + gcCountMetricName = "sw.jvm.gc.count" + gcDurationMetricName = "sw.jvm.gc.duration" + MemoryPoolInitName = "jvm.memory.init" + MemoryPoolMaxName = "jvm.memory.max" + MemoryPoolUsedName = "jvm.memory.used" + MemoryPoolCommittedName = "jvm.memory.committed" + ThreadCountName = "jvm.thread.count" + CPUUtilizationName = "jvm.cpu.recent_utilization" +) + +func SwMetricsToMetrics(collection *agent.JVMMetricCollection) pmetric.Metrics { + md := pmetric.NewMetrics() + for _, jvmMetric := range collection.GetMetrics() { + resourceMetric := md.ResourceMetrics().AppendEmpty() + scopeMetric := resourceMetric.ScopeMetrics().AppendEmpty() + scopeMetric.Scope().SetName(jvmScopeName) + jvmMetricToResourceMetrics(jvmMetric, scopeMetric) + jvmMetricToResource(collection.Service, collection.ServiceInstance, resourceMetric.Resource()) + } + return md +} + +func jvmMetricToResource(serviceName string, serviceInstance string, resource pcommon.Resource) { + attrs := resource.Attributes() + attrs.EnsureCapacity(2) + attrs.PutStr(semconv.AttributeServiceName, serviceName) + attrs.PutStr(semconv.AttributeServiceInstanceID, serviceInstance) +} + +func jvmMetricToResourceMetrics(jvmMetric *agent.JVMMetric, sm pmetric.ScopeMetrics) { + // gc metric to otlp metric + gcMetricToMetrics(jvmMetric.Time, jvmMetric.Gc, sm) + // memory pool metric to otlp metric + memoryPoolMetricToMetrics(jvmMetric.Time, jvmMetric.MemoryPool, sm) + // thread metric to otlp metric + threadMetricToMetrics(jvmMetric.Time, jvmMetric.Thread, sm) + // cpu metric to otlp metric + cpuMetricToMetrics(jvmMetric.Time, jvmMetric.Cpu, sm) +} + +// gcMetricToMetrics translates gc metrics +func gcMetricToMetrics(timestamp int64, gcList []*agent.GC, dest pmetric.ScopeMetrics) { + // gc count and gc duration are emitted as gauges under sw.jvm.gc.* names rather than 
as the semantic-convention histogram + metricCount := dest.Metrics().AppendEmpty() + 
metricCount.SetName(gcCountMetricName) + metricCountDps := metricCount.SetEmptyGauge().DataPoints() + + metricDuration := dest.Metrics().AppendEmpty() + metricDuration.SetName(gcDurationMetricName) + metricDuration.SetUnit("ms") + metricDurationDps := metricDuration.SetEmptyGauge().DataPoints() + for _, gc := range gcList { + attrs := buildGCAttrs(gc) + fillNumberDataPointIntValue(timestamp, gc.Count, metricCountDps.AppendEmpty(), attrs) + fillNumberDataPointIntValue(timestamp, gc.Time, metricDurationDps.AppendEmpty(), attrs) + } +} + +func buildGCAttrs(gc *agent.GC) pcommon.Map { + attrs := pcommon.NewMap() + attrs.PutStr("jvm.gc.name", gc.GetPhase().String()) + return attrs +} + +// memoryPoolMetricToMetrics translate memoryPool metrics +func memoryPoolMetricToMetrics(timestamp int64, memoryPools []*agent.MemoryPool, sm pmetric.ScopeMetrics) { + PoolNameArr := []string{MemoryPoolInitName, MemoryPoolUsedName, MemoryPoolMaxName, MemoryPoolCommittedName} + dpsMp := make(map[string]pmetric.NumberDataPointSlice) + for _, name := range PoolNameArr { + metric := sm.Metrics().AppendEmpty() + metric.SetName(name) + metric.SetUnit("By") + dpsMp[name] = metric.SetEmptyGauge().DataPoints() + } + for _, memoryPool := range memoryPools { + attrs := buildMemoryPoolAttrs(memoryPool) + fillNumberDataPointIntValue(timestamp, memoryPool.Init, dpsMp[MemoryPoolInitName].AppendEmpty(), attrs) + fillNumberDataPointIntValue(timestamp, memoryPool.Max, dpsMp[MemoryPoolMaxName].AppendEmpty(), attrs) + fillNumberDataPointIntValue(timestamp, memoryPool.Used, dpsMp[MemoryPoolUsedName].AppendEmpty(), attrs) + fillNumberDataPointIntValue(timestamp, memoryPool.Committed, dpsMp[MemoryPoolCommittedName].AppendEmpty(), attrs) + } + +} + +func buildMemoryPoolAttrs(pool *agent.MemoryPool) pcommon.Map { + attrs := pcommon.NewMap() + attrs.PutStr("jvm.memory.pool.name", pool.GetType().String()) + + var memoryType string + switch pool.GetType() { + case agent.PoolType_CODE_CACHE_USAGE, 
agent.PoolType_METASPACE_USAGE, agent.PoolType_PERMGEN_USAGE: + memoryType = "non_heap" + case agent.PoolType_NEWGEN_USAGE, agent.PoolType_OLDGEN_USAGE, agent.PoolType_SURVIVOR_USAGE: + memoryType = "heap" + default: + + } + attrs.PutStr("jvm.memory.type", memoryType) + return attrs +} + +func threadMetricToMetrics(timestamp int64, thread *agent.Thread, dest pmetric.ScopeMetrics) { + metric := dest.Metrics().AppendEmpty() + metric.SetName(ThreadCountName) + metric.SetUnit("{thread}") + metricDps := metric.SetEmptyGauge().DataPoints() + fillNumberDataPointIntValue(timestamp, thread.LiveCount, metricDps.AppendEmpty(), buildThreadTypeAttrs("live")) + fillNumberDataPointIntValue(timestamp, thread.DaemonCount, metricDps.AppendEmpty(), buildThreadTypeAttrs("daemon")) + fillNumberDataPointIntValue(timestamp, thread.PeakCount, metricDps.AppendEmpty(), buildThreadTypeAttrs("peak")) + fillNumberDataPointIntValue(timestamp, thread.RunnableStateThreadCount, metricDps.AppendEmpty(), buildThreadTypeAttrs("runnable")) + fillNumberDataPointIntValue(timestamp, thread.BlockedStateThreadCount, metricDps.AppendEmpty(), buildThreadTypeAttrs("blocked")) + fillNumberDataPointIntValue(timestamp, thread.WaitingStateThreadCount, metricDps.AppendEmpty(), buildThreadTypeAttrs("waiting")) + fillNumberDataPointIntValue(timestamp, thread.TimedWaitingStateThreadCount, metricDps.AppendEmpty(), buildThreadTypeAttrs("time_waiting")) +} + +func buildThreadTypeAttrs(typeValue string) pcommon.Map { + attrs := pcommon.NewMap() + attrs.PutStr("thread.state", typeValue) + var isDaemon bool + if typeValue == "daemon" { + isDaemon = true + } + attrs.PutBool("thread.daemon", isDaemon) + return attrs +} + +func cpuMetricToMetrics(timestamp int64, cpu *common.CPU, dest pmetric.ScopeMetrics) { + metric := dest.Metrics().AppendEmpty() + metric.SetName(CPUUtilizationName) + metric.SetUnit("1") + metricDps := metric.SetEmptyGauge().DataPoints() + fillNumberDataPointDoubleValue(timestamp, cpu.UsagePercent, 
metricDps.AppendEmpty(), pcommon.NewMap()) +} + +func fillNumberDataPointIntValue(timestamp int64, value int64, point pmetric.NumberDataPoint, attrs pcommon.Map) { + attrs.CopyTo(point.Attributes()) + point.SetTimestamp(pcommon.NewTimestampFromTime(time.UnixMilli(timestamp))) + point.SetIntValue(value) +} + +func fillNumberDataPointDoubleValue(timestamp int64, value float64, point pmetric.NumberDataPoint, attrs pcommon.Map) { + attrs.CopyTo(point.Attributes()) + point.SetTimestamp(pcommon.NewTimestampFromTime(time.UnixMilli(timestamp))) + point.SetDoubleValue(value) +} diff --git a/receiver/skywalkingreceiver/metadata.yaml b/receiver/skywalkingreceiver/metadata.yaml index 76b1806a3750..43edd9e827a6 100644 --- a/receiver/skywalkingreceiver/metadata.yaml +++ b/receiver/skywalkingreceiver/metadata.yaml @@ -3,6 +3,7 @@ type: skywalking status: class: receiver stability: + development: [metrics] beta: [traces] distributions: [contrib, sumo] codeowners: diff --git a/receiver/skywalkingreceiver/skywalking_receiver.go b/receiver/skywalkingreceiver/skywalking_receiver.go index d46241c8b2eb..91e324609163 100644 --- a/receiver/skywalkingreceiver/skywalking_receiver.go +++ b/receiver/skywalkingreceiver/skywalking_receiver.go @@ -25,6 +25,7 @@ import ( profile "skywalking.apache.org/repo/goapi/collect/language/profile/v3" management "skywalking.apache.org/repo/goapi/collect/management/v3" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/skywalkingreceiver/internal/metrics" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/skywalkingreceiver/internal/trace" ) @@ -51,6 +52,8 @@ type swReceiver struct { traceReceiver *trace.Receiver + metricsReceiver *metrics.Receiver + dummyReportService *dummyReportService } @@ -78,6 +81,19 @@ func (sr *swReceiver) registerTraceConsumer(tc consumer.Traces) error { return nil } +// registerMetricsConsumer registers a metrics receiver that passes received metrics to the given consumer +func (sr *swReceiver) 
registerMetricsConsumer(mc 
consumer.Metrics) error { + if mc == nil { + return component.ErrNilNextConsumer + } + var err error + sr.metricsReceiver, err = metrics.NewReceiver(mc, sr.settings) + if err != nil { + return err + } + return nil +} + func (sr *swReceiver) collectorGRPCAddr() string { var port int if sr.config != nil { @@ -156,6 +172,9 @@ func (sr *swReceiver) startCollector(host component.Host) error { if sr.traceReceiver != nil { v3.RegisterTraceSegmentReportServiceServer(sr.grpc, sr.traceReceiver) } + if sr.metricsReceiver != nil { + v3.RegisterJVMMetricReportServiceServer(sr.grpc, sr.metricsReceiver) + } sr.dummyReportService = &dummyReportService{} management.RegisterManagementServiceServer(sr.grpc, sr.dummyReportService) cds.RegisterConfigurationDiscoveryServiceServer(sr.grpc, sr.dummyReportService) @@ -164,8 +183,6 @@ func (sr *swReceiver) startCollector(host component.Host) error { v3.RegisterMeterReportServiceServer(sr.grpc, &meterService{}) v3.RegisterCLRMetricReportServiceServer(sr.grpc, &clrService{}) v3.RegisterBrowserPerfServiceServer(sr.grpc, sr.dummyReportService) - //TODO: add jvm metrics service - v3.RegisterJVMMetricReportServiceServer(sr.grpc, sr.dummyReportService) sr.goroutines.Add(1) go func() {