Skip to content

Commit

Permalink
[receiver/skywalking] Add skywalking jvm metric receiver (open-teleme…
Browse files Browse the repository at this point in the history
…try#26315)

**Description:** <Describe what has changed.>
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue.
Ex. Adding a feature - Explain what this achieves.-->

Adding a JVM receiver in Skywalking and translate most of the metrics
into the OpenTelemetry format according to the [OT JVM semantic
conventions.](https://github.com/open-telemetry/semantic-conventions/blob/main/docs/jvm/jvm-metrics.md)
However, for the GC metrics, as the metric type is like gauge while the
corresponding metric in the semantic conventions `jvm.gc.duration`is
histogram type, I'm not translate it strictly according to the semantic
conventions.
**Link to tracking Issue:** <Issue number if applicable>
open-telemetry#20315 
**Testing:** <Describe what testing was performed and which tests were
added.>

test it locally. I export the metrics to Prometheus and display them in
Grafana.
<img width="1767" alt="image"
src="https://github.com/open-telemetry/opentelemetry-collector-contrib/assets/30763743/97541490-d969-4a28-b5d8-87a04dd25b1d">


**Documentation:** <Describe the documentation added.>
  • Loading branch information
aheling11 authored and jmsnll committed Nov 12, 2023
1 parent d788c99 commit d8d544d
Show file tree
Hide file tree
Showing 9 changed files with 332 additions and 16 deletions.
27 changes: 27 additions & 0 deletions .chloggen/add_skywalking_metric.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: skywaklingreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: implement receiver for JVM metrics in Skywalking and adapted it to the OpenTelemetry protocol.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [20315]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
9 changes: 8 additions & 1 deletion receiver/skywalkingreceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,20 @@
| Status | |
| ------------- |-----------|
| Stability | [beta]: traces |
| | [development]: metrics |
| Distributions | [contrib], [sumo] |
| Issues | [![Open issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aopen%20label%3Areceiver%2Fskywalking%20&label=open&color=orange&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aopen+is%3Aissue+label%3Areceiver%2Fskywalking) [![Closed issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aclosed%20label%3Areceiver%2Fskywalking%20&label=closed&color=blue&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aclosed+is%3Aissue+label%3Areceiver%2Fskywalking) |
| [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | [@JaredTan95](https://www.github.com/JaredTan95) |

[beta]: https://github.com/open-telemetry/opentelemetry-collector#beta
[development]: https://github.com/open-telemetry/opentelemetry-collector#development
[contrib]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib
[sumo]: https://github.com/SumoLogic/sumologic-otel-collector
<!-- end autogenerated section -->

Receives trace data in [Skywalking](https://skywalking.apache.org/) format.
Receives trace data and metric data in [Skywalking](https://skywalking.apache.org/) format.

Note: The current metrics receiver only supports receiving JVM data.

## Getting Started

Expand All @@ -40,5 +44,8 @@ service:
pipelines:
traces:
receivers: [skywalking]
metrics:
receivers: [skywalking]

```
31 changes: 30 additions & 1 deletion receiver/skywalkingreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ func NewFactory() receiver.Factory {
return receiver.NewFactory(
metadata.Type,
createDefaultConfig,
receiver.WithTraces(createTracesReceiver, metadata.TracesStability))
receiver.WithTraces(createTracesReceiver, metadata.TracesStability),
receiver.WithMetrics(createMetricsReceiver, metadata.MetricsStability))
}

// CreateDefaultConfig creates the default configuration for Skywalking receiver.
Expand Down Expand Up @@ -85,6 +86,34 @@ func createTracesReceiver(
return r, nil
}

// createMetricsReceiver creates a metrics receiver based on provided config.
func createMetricsReceiver(
_ context.Context,
set receiver.CreateSettings,
cfg component.Config,
nextConsumer consumer.Metrics,
) (receiver.Metrics, error) {

// Convert settings in the source c to configuration struct
// that Skywalking receiver understands.
rCfg := cfg.(*Config)

c, err := createConfiguration(rCfg)
if err != nil {
return nil, err
}

r := receivers.GetOrAdd(cfg, func() component.Component {
return newSkywalkingReceiver(c, set)
})

if err = r.Unwrap().(*swReceiver).registerMetricsConsumer(nextConsumer); err != nil {
return nil, err
}

return r, nil
}

// create the config that Skywalking receiver will use.
func createConfiguration(rCfg *Config) (*configuration, error) {
var err error
Expand Down
23 changes: 13 additions & 10 deletions receiver/skywalkingreceiver/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,14 @@ func TestCreateReceiver(t *testing.T) {
traceSink := new(consumertest.TracesSink)
set := receivertest.NewNopCreateSettings()
tReceiver, err := factory.CreateTracesReceiver(context.Background(), set, cfg, traceSink)
assert.NoError(t, err, "receiver creation failed")
assert.NotNil(t, tReceiver, "receiver creation failed")
assert.NoError(t, err, "trace receiver creation failed")
assert.NotNil(t, tReceiver, "trace receiver creation failed")

metricSink := new(consumertest.MetricsSink)
mReceiver, err := factory.CreateMetricsReceiver(context.Background(), set, cfg, metricSink)
assert.NoError(t, err, "metric receiver creation failed")
assert.NotNil(t, mReceiver, "metric receiver creation failed")

mReceiver, err := factory.CreateMetricsReceiver(context.Background(), set, cfg, nil)
assert.Equal(t, err, component.ErrDataTypeIsNotSupported)
assert.Nil(t, mReceiver)
}

func TestCreateReceiverGeneralConfig(t *testing.T) {
Expand All @@ -71,12 +73,13 @@ func TestCreateReceiverGeneralConfig(t *testing.T) {
set := receivertest.NewNopCreateSettings()
traceSink := new(consumertest.TracesSink)
tReceiver, err := factory.CreateTracesReceiver(context.Background(), set, cfg, traceSink)
assert.NoError(t, err, "receiver creation failed")
assert.NotNil(t, tReceiver, "receiver creation failed")
assert.NoError(t, err, "trace receiver creation failed")
assert.NotNil(t, tReceiver, "trace receiver creation failed")

mReceiver, err := factory.CreateMetricsReceiver(context.Background(), set, cfg, nil)
assert.Equal(t, err, component.ErrDataTypeIsNotSupported)
assert.Nil(t, mReceiver)
metricSink := new(consumertest.MetricsSink)
mReceiver, err := factory.CreateMetricsReceiver(context.Background(), set, cfg, metricSink)
assert.NoError(t, err, "metric receiver creation failed")
assert.NotNil(t, mReceiver, "metric receiver creation failed")
}

func TestCreateDefaultGRPCEndpoint(t *testing.T) {
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package metrics // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/skywalkingreceiver/internal/metrics"

import (
"context"

"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/obsreport"
"go.opentelemetry.io/collector/receiver"
common "skywalking.apache.org/repo/goapi/collect/common/v3"
agent "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
)

const (
collectorHTTPTransport = "http"
grpcTransport = "grpc"
failing = "failing"
)

type Receiver struct {
nextConsumer consumer.Metrics
grpcObsrecv *obsreport.Receiver
httpObsrecv *obsreport.Receiver
agent.UnimplementedJVMMetricReportServiceServer
}

// NewReceiver creates a new Receiver reference.
func NewReceiver(nextConsumer consumer.Metrics, set receiver.CreateSettings) (*Receiver, error) {
grpcObsrecv, err := obsreport.NewReceiver(obsreport.ReceiverSettings{
ReceiverID: set.ID,
Transport: grpcTransport,
ReceiverCreateSettings: set,
})
if err != nil {
return nil, err
}
httpObsrecv, err := obsreport.NewReceiver(obsreport.ReceiverSettings{
ReceiverID: set.ID,
Transport: collectorHTTPTransport,
ReceiverCreateSettings: set,
})
if err != nil {
return nil, err
}
return &Receiver{
nextConsumer: nextConsumer,
grpcObsrecv: grpcObsrecv,
httpObsrecv: httpObsrecv,
}, nil
}

// Collect implements the service Collect traces func.
func (r *Receiver) Collect(ctx context.Context, jvmMetricCollection *agent.JVMMetricCollection) (*common.Commands, error) {
err := consumeMetrics(ctx, jvmMetricCollection, r.nextConsumer)
if err != nil {
return &common.Commands{}, err
}
return &common.Commands{}, nil
}

func consumeMetrics(ctx context.Context, collection *agent.JVMMetricCollection, nextConsumer consumer.Metrics) error {
if collection == nil {
return nil
}
pmd := SwMetricsToMetrics(collection)
return nextConsumer.ConsumeMetrics(ctx, pmd)

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package metrics // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/skywalkingreceiver/internal/metrics"

import (
"time"

"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
semconv "go.opentelemetry.io/collector/semconv/v1.18.0"
common "skywalking.apache.org/repo/goapi/collect/common/v3"
agent "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
)

const (
jvmScopeName = "runtime_metrics"
gcCountMetricName = "sw.jvm.gc.count"
gcDurationMetricName = "sw.jvm.gc.duration"
MemoryPoolInitName = "jvm.memory.init"
MemoryPoolMaxName = "jvm.memory.max"
MemoryPoolUsedName = "jvm.memory.used"
MemoryPoolCommittedName = "jvm.memory.committed"
ThreadCountName = "jvm.thread.count"
CPUUtilizationName = "jvm.cpu.recent_utilization"
)

func SwMetricsToMetrics(collection *agent.JVMMetricCollection) pmetric.Metrics {
md := pmetric.NewMetrics()
for _, jvmMetric := range collection.GetMetrics() {
resourceMetric := md.ResourceMetrics().AppendEmpty()
scopeMetric := resourceMetric.ScopeMetrics().AppendEmpty()
scopeMetric.Scope().SetName(jvmScopeName)
jvmMetricToResourceMetrics(jvmMetric, scopeMetric)
jvmMetricToResource(collection.Service, collection.ServiceInstance, resourceMetric.Resource())
}
return md
}

func jvmMetricToResource(serviceName string, serviceInstance string, resource pcommon.Resource) {
attrs := resource.Attributes()
attrs.EnsureCapacity(2)
attrs.PutStr(semconv.AttributeServiceName, serviceName)
attrs.PutStr(semconv.AttributeServiceInstanceID, serviceInstance)
}

func jvmMetricToResourceMetrics(jvmMetric *agent.JVMMetric, sm pmetric.ScopeMetrics) {
// gc metric to otlp metric
gcMetricToMetrics(jvmMetric.Time, jvmMetric.Gc, sm)
// memory pool metric to otlp metric
memoryPoolMetricToMetrics(jvmMetric.Time, jvmMetric.MemoryPool, sm)
// thread metric to otlp metric
threadMetricToMetrics(jvmMetric.Time, jvmMetric.Thread, sm)
// cpu metric to otpl metric
cpuMetricToMetrics(jvmMetric.Time, jvmMetric.Cpu, sm)
}

// gcMetricToMetrics translate gc metrics
func gcMetricToMetrics(timestamp int64, gcList []*agent.GC, dest pmetric.ScopeMetrics) {
// gc count and gc duration is not
metricCount := dest.Metrics().AppendEmpty()
metricCount.SetName(gcCountMetricName)
metricCountDps := metricCount.SetEmptyGauge().DataPoints()

metricDuration := dest.Metrics().AppendEmpty()
metricDuration.SetName(gcDurationMetricName)
metricDuration.SetUnit("ms")
metricDurationDps := metricDuration.SetEmptyGauge().DataPoints()
for _, gc := range gcList {
attrs := buildGCAttrs(gc)
fillNumberDataPointIntValue(timestamp, gc.Count, metricCountDps.AppendEmpty(), attrs)
fillNumberDataPointIntValue(timestamp, gc.Time, metricDurationDps.AppendEmpty(), attrs)
}
}

func buildGCAttrs(gc *agent.GC) pcommon.Map {
attrs := pcommon.NewMap()
attrs.PutStr("jvm.gc.name", gc.GetPhase().String())
return attrs
}

// memoryPoolMetricToMetrics translate memoryPool metrics
func memoryPoolMetricToMetrics(timestamp int64, memoryPools []*agent.MemoryPool, sm pmetric.ScopeMetrics) {
PoolNameArr := []string{MemoryPoolInitName, MemoryPoolUsedName, MemoryPoolMaxName, MemoryPoolCommittedName}
dpsMp := make(map[string]pmetric.NumberDataPointSlice)
for _, name := range PoolNameArr {
metric := sm.Metrics().AppendEmpty()
metric.SetName(name)
metric.SetUnit("By")
dpsMp[name] = metric.SetEmptyGauge().DataPoints()
}
for _, memoryPool := range memoryPools {
attrs := buildMemoryPoolAttrs(memoryPool)
fillNumberDataPointIntValue(timestamp, memoryPool.Init, dpsMp[MemoryPoolInitName].AppendEmpty(), attrs)
fillNumberDataPointIntValue(timestamp, memoryPool.Max, dpsMp[MemoryPoolMaxName].AppendEmpty(), attrs)
fillNumberDataPointIntValue(timestamp, memoryPool.Used, dpsMp[MemoryPoolUsedName].AppendEmpty(), attrs)
fillNumberDataPointIntValue(timestamp, memoryPool.Committed, dpsMp[MemoryPoolCommittedName].AppendEmpty(), attrs)
}

}

func buildMemoryPoolAttrs(pool *agent.MemoryPool) pcommon.Map {
attrs := pcommon.NewMap()
attrs.PutStr("jvm.memory.pool.name", pool.GetType().String())

var memoryType string
switch pool.GetType() {
case agent.PoolType_CODE_CACHE_USAGE, agent.PoolType_METASPACE_USAGE, agent.PoolType_PERMGEN_USAGE:
memoryType = "non_heap"
case agent.PoolType_NEWGEN_USAGE, agent.PoolType_OLDGEN_USAGE, agent.PoolType_SURVIVOR_USAGE:
memoryType = "heap"
default:

}
attrs.PutStr("jvm.memory.type", memoryType)
return attrs
}

func threadMetricToMetrics(timestamp int64, thread *agent.Thread, dest pmetric.ScopeMetrics) {
metric := dest.Metrics().AppendEmpty()
metric.SetName(ThreadCountName)
metric.SetUnit("{thread}")
metricDps := metric.SetEmptyGauge().DataPoints()
fillNumberDataPointIntValue(timestamp, thread.LiveCount, metricDps.AppendEmpty(), buildThreadTypeAttrs("live"))
fillNumberDataPointIntValue(timestamp, thread.DaemonCount, metricDps.AppendEmpty(), buildThreadTypeAttrs("daemon"))
fillNumberDataPointIntValue(timestamp, thread.PeakCount, metricDps.AppendEmpty(), buildThreadTypeAttrs("peak"))
fillNumberDataPointIntValue(timestamp, thread.RunnableStateThreadCount, metricDps.AppendEmpty(), buildThreadTypeAttrs("runnable"))
fillNumberDataPointIntValue(timestamp, thread.BlockedStateThreadCount, metricDps.AppendEmpty(), buildThreadTypeAttrs("blocked"))
fillNumberDataPointIntValue(timestamp, thread.WaitingStateThreadCount, metricDps.AppendEmpty(), buildThreadTypeAttrs("waiting"))
fillNumberDataPointIntValue(timestamp, thread.TimedWaitingStateThreadCount, metricDps.AppendEmpty(), buildThreadTypeAttrs("time_waiting"))
}

func buildThreadTypeAttrs(typeValue string) pcommon.Map {
attrs := pcommon.NewMap()
attrs.PutStr("thread.state", typeValue)
var isDaemon bool
if typeValue == "daemon" {
isDaemon = true
}
attrs.PutBool("thread.daemon", isDaemon)
return attrs
}

func cpuMetricToMetrics(timestamp int64, cpu *common.CPU, dest pmetric.ScopeMetrics) {
metric := dest.Metrics().AppendEmpty()
metric.SetName(CPUUtilizationName)
metric.SetUnit("1")
metricDps := metric.SetEmptyGauge().DataPoints()
fillNumberDataPointDoubleValue(timestamp, cpu.UsagePercent, metricDps.AppendEmpty(), pcommon.NewMap())
}

func fillNumberDataPointIntValue(timestamp int64, value int64, point pmetric.NumberDataPoint, attrs pcommon.Map) {
attrs.CopyTo(point.Attributes())
point.SetTimestamp(pcommon.NewTimestampFromTime(time.UnixMilli(timestamp)))
point.SetIntValue(value)
}

func fillNumberDataPointDoubleValue(timestamp int64, value float64, point pmetric.NumberDataPoint, attrs pcommon.Map) {
attrs.CopyTo(point.Attributes())
point.SetTimestamp(pcommon.NewTimestampFromTime(time.UnixMilli(timestamp)))
point.SetDoubleValue(value)
}
1 change: 1 addition & 0 deletions receiver/skywalkingreceiver/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ type: skywalking
status:
class: receiver
stability:
development: [metrics]
beta: [traces]
distributions: [contrib, sumo]
codeowners:
Expand Down
Loading

0 comments on commit d8d544d

Please sign in to comment.