Skip to content

Commit

Permalink
Add support for logs in cloudfoundryreceiver (#33044)
Browse files Browse the repository at this point in the history
**Description:** 

Adding support receive logs  from Cloudfoundry.

**Link to tracking Issue:** #32671

**Testing:** Basic testing inline with the current tests

**Documentation:**  

* Add  new section for logs and their attributes.
* Update behaviour of ShardID property (`rlp_gateway.shard_id`).

cc @CemDK  @m1rp

---------

Co-authored-by: Cem Deniz Kabakci <[email protected]>
Co-authored-by: Sam Clulow <[email protected]>
Co-authored-by: Cem Deniz Kabakci <[email protected]>
Co-authored-by: Tomás Mota <[email protected]>
Co-authored-by: Tomas Mota <[email protected]>
Co-authored-by: Alex Boten <[email protected]>
  • Loading branch information
7 people authored Jul 3, 2024
1 parent 50d01eb commit 5356c9d
Show file tree
Hide file tree
Showing 17 changed files with 432 additions and 110 deletions.
27 changes: 27 additions & 0 deletions .chloggen/cloudfoundryreceiver_add_logs_support.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: cloudfoundryreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add support to receive CloudFoundry Logs

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [32671]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
65 changes: 43 additions & 22 deletions receiver/cloudfoundryreceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@
<!-- status autogenerated section -->
| Status | |
| ------------- |-----------|
| Stability | [beta]: metrics |
| Stability | [development]: logs |
| | [beta]: metrics |
| Distributions | [contrib] |
| Issues | [![Open issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aopen%20label%3Areceiver%2Fcloudfoundry%20&label=open&color=orange&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aopen+is%3Aissue+label%3Areceiver%2Fcloudfoundry) [![Closed issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aclosed%20label%3Areceiver%2Fcloudfoundry%20&label=closed&color=blue&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aclosed+is%3Aissue+label%3Areceiver%2Fcloudfoundry) |
| [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | [@crobert-1](https://www.github.com/crobert-1) \| Seeking more code owners! |
| Emeritus | [@agoallikmaa](https://www.github.com/agoallikmaa), [@pellared](https://www.github.com/pellared) |

[development]: https://github.com/open-telemetry/opentelemetry-collector#development
[beta]: https://github.com/open-telemetry/opentelemetry-collector#beta
[contrib]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib
<!-- end autogenerated section -->
Expand Down Expand Up @@ -42,7 +44,7 @@ The receiver takes the following configuration options:
| --- | --- | --- |
| `rlp_gateway.endpoint` | required | URL of the RLP gateway, typically `https://log-stream.<cf-system-domain>` |
| `rlp_gateway.tls.insecure_skip_verify` | `false` | whether to skip TLS verify for the RLP gateway endpoint |
| `rlp_gateway.shard_id` | `opentelemetry` | metrics are load balanced among receivers that use the same shard ID, therefore this must only be set if there are multiple receivers which must both receive all the metrics instead of them being balanced between them |
| `rlp_gateway.shard_id` | `opentelemetry` | metrics or logs are load balanced among receivers that use the same shard ID, therefore this must only be set if there are multiple receivers which must both receive all the metrics instead of them being balanced between them. This string will be a prefix used to build a different ShardID for each envelope type; for logs the final ShardID will have the `_logs` suffix, for metrics will be `_metrics` |
| `uaa.endpoint` | required | URL of the UAA provider, typically `https://uaa.<cf-system-domain>` |
| `uaa.tls.insecure_skip_verify` | `false` | whether to skip TLS verify for the UAA endpoint |
| `uaa.username` | required | name of the UAA user (required grant types/authorities described above) |
Expand Down Expand Up @@ -73,18 +75,11 @@ receivers:
The full list of settings exposed for this receiver are documented [here](./config.go)
with detailed sample configurations [here](./testdata/config.yaml).
## Metrics
Reported metrics are grouped under an instrumentation library named `otelcol/cloudfoundry`. Metric names are as
specified by [Cloud Foundry metrics documentation](https://docs.cloudfoundry.org/running/all_metrics.html), but the
origin name is prepended to the metric name with `.` separator. All metrics either of type `Gauge` or `Sum`.

### Attributes
## Telemetry common Attributes
All the metrics have the following attributes:
The receiver maps the envelope attribute tags to the following OpenTelemetry attributes:
* `origin` - origin name as documented by Cloud Foundry
* `source` - for applications, the GUID of the application, otherwise equal to `origin`

For Cloud Foundry/Tanzu Application Service deployed in BOSH, the following attributes are also present, using their
canonical BOSH meanings:
Expand All @@ -94,21 +89,47 @@ canonical BOSH meanings:
* `ip` - BOSH instance IP
* `job` - BOSH job name

For metrics originating with `rep` origin name (specific to applications), the following metrics are present:
On TAS/PCF versions 2.8.0+ and cf-deployment versions v11.1.0+, the following additional attributes are present for application metrics: `app_id`, `app_name`, `space_id`, `space_name`, `organization_id`, `organization_name` which provide the GUID and name of application, space and organization respectively.

* `instance_id` - numerical index of the application instance. However, also present for `bbs` origin, where it matches
the value of `index`
* `process_id` - process ID (GUID). For a process of type "web" which is the main process of an application, this is
equal to `source_id` and `app_id`
This might not be a comprehensive list of attributes, as the receiver passes on whatever attributes the gateway
provides, which may include some that are specific to TAS and possibly new ones in future Cloud Foundry versions as
well.

## Metrics

Reported metrics are grouped under an instrumentation library named `otelcol/cloudfoundry`. Metric names are as
specified by [Cloud Foundry metrics documentation](https://docs.cloudfoundry.org/running/all_metrics.html), but the
origin name is prepended to the metric name with `.` separator. All metrics either of type `Gauge` or `Sum`.

### Attributes

The receiver maps the envelope attribute to the following OpenTelemetry attributes:

* `source_id` - for applications, the GUID of the application, otherwise equal to `origin`

For metrics originating with `rep` origin name (specific to applications), the following attributes are present:

* `instance_id` - numerical index of the application instance. However, also present for `bbs` origin, where it matches the value of `index`
* `process_id` - process ID (GUID). For a process of type "web" which is the main process of an application, this is equal to `source_id` and `app_id`
* `process_instance_id` - unique ID of a process instance, should be treated as an opaque string
* `process_type` - process type. Each application has exactly one process of type `web`, but many have any number of
other processes

On TAS/PCF versions 2.8.0+ and cf-deployment versions v11.1.0+, the following additional attributes are present for
application metrics: `app_id`, `app_name`, `space_id`, `space_name`, `organization_id`, `organization_name` which
provide the GUID and name of application, space and organization respectively.

This might not be a comprehensive list of attributes, as the receiver passes on whatever attributes the gateway
provides, which may include some that are specific to TAS and possibly new ones in future Cloud Foundry versions as
well.
## Logs

The receiver maps loggregator envelopes of these types to the following OpenTelemetry log severity text and severity number:
* type `OUT` becomes `info` and severity number `9`
* type `ERR` becomes `error` and severity number `17`
* If any other log types are received, they're discarded and result in an error log message in the collector.

### Attributes

The receiver maps the envelope attribute tags to the following OpenTelemetry attributes:

* `source_id` - for applications, the GUID of the application, otherwise the GUID of the log generator
* `source_type` - The source of the log, any subset of `{API|APP|CELL|HEALTH|LGR|RTR|SSH|STG}`, for `APP` type extra labels are separated by a dash, example: `APP/PROC/WEB`
* `instance_id` - numerical index of the origin. If origin is `rep` (`source_type` is `APP`) this is the application index. However, for other cases this is the instance index.
* `process_id` - process ID (GUID)
* `process_instance_id` - unique ID of a process instance, should be treated as an opaque string
* `process_type` - process type. Each application has exactly one process of type `web`, but many have any number of other processes
5 changes: 5 additions & 0 deletions receiver/cloudfoundryreceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"errors"
"fmt"
"net/url"
"strings"

"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/config/configopaque"
Expand Down Expand Up @@ -47,6 +48,10 @@ func (c *Config) Validate() error {
return err
}

if strings.TrimSpace(c.RLPGateway.ShardID) == "" {
return errors.New("shardID cannot be empty")
}

err = validateURLOption("uaa.endpoint", c.UAA.Endpoint)
if err != nil {
return err
Expand Down
29 changes: 29 additions & 0 deletions receiver/cloudfoundryreceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,31 @@ func TestLoadConfig(t *testing.T) {
id: component.NewIDWithName(metadata.Type, "invalid"),
errorMessage: "failed to parse rlp_gateway.endpoint as url: parse \"https://[invalid\": missing ']' in host",
},
{
id: component.NewIDWithName(metadata.Type, "shardidnotdefined"),
expected: &Config{
RLPGateway: RLPGatewayConfig{
ClientConfig: confighttp.ClientConfig{
Endpoint: "https://log-stream.sys.example.internal",
TLSSetting: configtls.ClientConfig{
InsecureSkipVerify: true,
},
Timeout: time.Second * 20,
},
ShardID: "opentelemetry",
},
UAA: UAAConfig{
LimitedClientConfig: LimitedClientConfig{
Endpoint: "https://uaa.sys.example.internal",
TLSSetting: LimitedTLSClientSetting{
InsecureSkipVerify: true,
},
},
Username: "admin",
Password: "test",
},
},
},
}
for _, tt := range tests {
t.Run(tt.id.String(), func(t *testing.T) {
Expand Down Expand Up @@ -96,6 +121,10 @@ func TestInvalidConfigValidation(t *testing.T) {
configuration.UAA.Password = ""
require.Error(t, configuration.Validate())

configuration = loadSuccessfulConfig(t)
configuration.RLPGateway.ShardID = ""
require.Error(t, configuration.Validate())

configuration = loadSuccessfulConfig(t)
configuration.UAA.Endpoint = "https://[invalid"
require.Error(t, configuration.Validate())
Expand Down
26 changes: 23 additions & 3 deletions receiver/cloudfoundryreceiver/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@
package cloudfoundryreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/cloudfoundryreceiver"

import (
"fmt"
"time"

"code.cloudfoundry.org/go-loggregator/rpc/loggregator_v2"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
)

Expand All @@ -19,7 +21,6 @@ func convertEnvelopeToMetrics(envelope *loggregator_v2.Envelope, metricSlice pme
namePrefix := envelope.Tags["origin"] + "."

switch message := envelope.Message.(type) {
case *loggregator_v2.Envelope_Log:
case *loggregator_v2.Envelope_Counter:
metric := metricSlice.AppendEmpty()
metric.SetName(namePrefix + message.Counter.GetName())
Expand All @@ -41,15 +42,34 @@ func convertEnvelopeToMetrics(envelope *loggregator_v2.Envelope, metricSlice pme
}
}

func convertEnvelopeToLogs(envelope *loggregator_v2.Envelope, logSlice plog.LogRecordSlice, startTime time.Time) error {
log := logSlice.AppendEmpty()
log.SetTimestamp(pcommon.Timestamp(envelope.GetTimestamp()))
log.SetObservedTimestamp(pcommon.NewTimestampFromTime(startTime))
logLine := string(envelope.GetLog().GetPayload())
log.Body().SetStr(logLine)
//exhaustive:enforce
switch envelope.GetLog().GetType() {
case loggregator_v2.Log_OUT:
log.SetSeverityText(plog.SeverityNumberInfo.String())
log.SetSeverityNumber(plog.SeverityNumberInfo)
case loggregator_v2.Log_ERR:
log.SetSeverityText(plog.SeverityNumberError.String())
log.SetSeverityNumber(plog.SeverityNumberError)
default:
return fmt.Errorf("unsupported envelope log type: %s", envelope.GetLog().GetType())
}
copyEnvelopeAttributes(log.Attributes(), envelope)
return nil
}

func copyEnvelopeAttributes(attributes pcommon.Map, envelope *loggregator_v2.Envelope) {
for key, value := range envelope.Tags {
attributes.PutStr(attributeNamePrefix+key, value)
}

if envelope.SourceId != "" {
attributes.PutStr(attributeNamePrefix+"source_id", envelope.SourceId)
}

if envelope.InstanceId != "" {
attributes.PutStr(attributeNamePrefix+"instance_id", envelope.InstanceId)
}
Expand Down
97 changes: 92 additions & 5 deletions receiver/cloudfoundryreceiver/converter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
)

Expand Down Expand Up @@ -53,14 +54,14 @@ func TestConvertCountEnvelope(t *testing.T) {
assert.Equal(t, pcommon.NewTimestampFromTime(before), dataPoint.StartTimestamp())
assert.Equal(t, 10.0, dataPoint.DoubleValue())

assertAttributes(t, dataPoint.Attributes(), map[string]string{
assertAttributes(t, map[string]string{
"org.cloudfoundry.source_id": "uaa",
"org.cloudfoundry.origin": "gorouter",
"org.cloudfoundry.deployment": "cf",
"org.cloudfoundry.job": "router",
"org.cloudfoundry.index": "bc276108-8282-48a5-bae7-c009c4392246",
"org.cloudfoundry.ip": "10.244.0.34",
})
}, dataPoint.Attributes())
}

func TestConvertGaugeEnvelope(t *testing.T) {
Expand Down Expand Up @@ -129,7 +130,7 @@ func TestConvertGaugeEnvelope(t *testing.T) {
assert.Equal(t, pcommon.NewTimestampFromTime(now), dataPoint.Timestamp())
assert.Equal(t, pcommon.NewTimestampFromTime(before), dataPoint.StartTimestamp())
assert.Equal(t, 17046641.0, dataPoint.DoubleValue())
assertAttributes(t, dataPoint.Attributes(), expectedAttributes)
assertAttributes(t, expectedAttributes, dataPoint.Attributes())

metric = metricSlice.At(1 - memoryMetricPosition)
assert.Equal(t, "rep.disk", metric.Name())
Expand All @@ -139,10 +140,96 @@ func TestConvertGaugeEnvelope(t *testing.T) {
assert.Equal(t, pcommon.NewTimestampFromTime(now), dataPoint.Timestamp())
assert.Equal(t, pcommon.NewTimestampFromTime(before), dataPoint.StartTimestamp())
assert.Equal(t, 10231808.0, dataPoint.DoubleValue())
assertAttributes(t, dataPoint.Attributes(), expectedAttributes)
assertAttributes(t, expectedAttributes, dataPoint.Attributes())
}

func assertAttributes(t *testing.T, attributes pcommon.Map, expected map[string]string) {
func TestConvertLogsEnvelope(t *testing.T) {
now := time.Now()
before := time.Now().Add(-time.Second)
t.Parallel()
tests := []struct {
id string
envelope loggregator_v2.Envelope
expected map[string]any
}{
{
id: "normal-without-sourcetype-tag",
envelope: loggregator_v2.Envelope{
Timestamp: before.UnixNano(),
SourceId: "744e75bb-69d1-4cf4-b037-76875368097b",
Tags: map[string]string{},
Message: &loggregator_v2.Envelope_Log{
Log: &loggregator_v2.Log{
Payload: []byte(`test-app. Says Hello. on index: 0`),
Type: loggregator_v2.Log_OUT,
},
},
},
expected: map[string]any{
"Timestamp": before,
"Attributes": map[string]string{
"org.cloudfoundry.source_id": "744e75bb-69d1-4cf4-b037-76875368097b",
},
"Body": `test-app. Says Hello. on index: 0`,
"SeverityNumber": plog.SeverityNumberInfo,
"SeverityText": plog.SeverityNumberInfo.String(),
},
},
{
id: "json-log-with-sourcetype-error",
envelope: loggregator_v2.Envelope{
Timestamp: before.UnixNano(),
SourceId: "df75aec8-b937-4dc8-9b4d-c336e36e3895",
Tags: map[string]string{
"source_type": "APP/PROC/WEB",
"origin": "rep",
"deployment": "cf",
"job": "diego-cell",
"index": "bc276108-8282-48a5-bae7-c009c4392246",
"ip": "10.80.0.2",
},
Message: &loggregator_v2.Envelope_Log{
Log: &loggregator_v2.Log{
Payload: []byte(`{"timestamp":"2024-05-29T16:16:28.063062903Z","level":"info","source":"guardian","message":"guardian.api.garden-server.get-properties.got-properties","data":{"handle":"e885e8be-c6a7-43b1-5066-a821","session":"2.1.209666"}}`),
Type: loggregator_v2.Log_ERR,
},
},
},
expected: map[string]any{
"Timestamp": before,
"Attributes": map[string]string{
"org.cloudfoundry.source_id": "df75aec8-b937-4dc8-9b4d-c336e36e3895",
"org.cloudfoundry.source_type": "APP/PROC/WEB",
"org.cloudfoundry.origin": "rep",
"org.cloudfoundry.deployment": "cf",
"org.cloudfoundry.job": "diego-cell",
"org.cloudfoundry.index": "bc276108-8282-48a5-bae7-c009c4392246",
"org.cloudfoundry.ip": "10.80.0.2",
},
"Body": `{"timestamp":"2024-05-29T16:16:28.063062903Z","level":"info","source":"guardian","message":"guardian.api.garden-server.get-properties.got-properties","data":{"handle":"e885e8be-c6a7-43b1-5066-a821","session":"2.1.209666"}}`,
"SeverityNumber": plog.SeverityNumberError,
"SeverityText": plog.SeverityNumberError.String(),
},
},
}
for i := range tests {
tt := tests[i]
t.Run(tt.id, func(t *testing.T) {
logSlice := plog.NewLogRecordSlice()
e := convertEnvelopeToLogs(&tt.envelope, logSlice, now)
require.Equal(t, nil, e)
require.Equal(t, 1, logSlice.Len())
log := logSlice.At(0)
assert.Equal(t, tt.expected["Body"], log.Body().AsString())
assert.Equal(t, tt.expected["SeverityText"], log.SeverityText())
assert.Equal(t, pcommon.NewTimestampFromTime(tt.expected["Timestamp"].(time.Time)), log.Timestamp())
assert.Equal(t, pcommon.NewTimestampFromTime(now), log.ObservedTimestamp())
assertAttributes(t, tt.expected["Attributes"].(map[string]string), log.Attributes())
})
}
}

func assertAttributes(t *testing.T, expected map[string]string, attributes pcommon.Map) {
assert.Equal(t, len(expected), attributes.Len())

for key, expectedValue := range expected {
Expand Down
2 changes: 1 addition & 1 deletion receiver/cloudfoundryreceiver/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
//go:generate mdatagen metadata.yaml

// Package cloudfoundryreceiver implements a receiver that can be used by the
// Opentelemetry collector to receive Cloud Foundry metrics via its Reverse
// OpenTelemetry collector to receive Cloud Foundry metrics and logs via its Reverse
// Log Proxy (RLP) Gateway component. The protocol is handled by the
// go-loggregator library, which uses HTTP to connect to the gateway and receive
// JSON-protobuf encoded v2 Envelope messages as documented by loggregator-api.
Expand Down
Loading

0 comments on commit 5356c9d

Please sign in to comment.