Supportability: print own metrics in logs
Resolves open-telemetry#2098

The Collector's own metrics are an important source of information for troubleshooting.
Typically these metrics are scraped by a Prometheus receiver and sent to a backend where
they can be examined. However, this only works if the Collector's metrics pipeline works
and the backend is available. When that is not the case, which often happens when the
Collector is misconfigured and cannot send metrics or the backend is unavailable,
these metrics cannot be seen anywhere.

To improve supportability of the Collector in such situations, we want to output its
own metrics to its log. In difficult situations the local Collector log is the most
important source of troubleshooting information, so we periodically log the metric
values in a human-readable form.

When the --log-level=debug command-line option is passed, the metrics are logged as often
as they are exported by stats (currently every 10 seconds). If --log-level is not set to
debug, the metrics are logged once every 5 minutes.
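
For example, a hypothetical invocation that enables the more frequent logging (the binary
and configuration file names are placeholders, not part of this change):

```
otelcol --config=config.yaml --log-level=debug
```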

Sample output:

```
2020-11-10T09:59:34.884-0500	INFO	service/telemetry_log.go:234
  Internal Metrics:
  Metric                                            | Value
  --------------------------------------------------|--------------------------------
  exporter/send_failed_log_records                  |
  exporter/sent_log_records                         |
  fluent_closed_connections                         |
  fluent_events_parsed                              |
  fluent_opened_connections                         |
  fluent_parse_failures                             |
  fluent_records_generated                          |
  grpc.io/client/completed_rpcs                     |
  grpc.io/client/received_bytes_per_rpc             |
  grpc.io/client/received_messages_per_rpc          |
  grpc.io/client/roundtrip_latency                  |
  grpc.io/client/sent_bytes_per_rpc                 |
  grpc.io/client/sent_messages_per_rpc              |
  grpc.io/server/completed_rpcs                     |
  grpc.io/server/received_bytes_per_rpc             |
  grpc.io/server/received_messages_per_rpc          |
  grpc.io/server/sent_bytes_per_rpc                 |
  grpc.io/server/sent_messages_per_rpc              |
  grpc.io/server/server_latency                     |
  kafka_receiver_current_offset                     |
  kafka_receiver_messages                           |
  kafka_receiver_offset_lag                         |
  kafka_receiver_partition_close                    |
  kafka_receiver_partition_start                    |
  process/cpu_seconds                               |            0
  process/memory/rss                                |   44,625,920
  process/runtime/heap_alloc_bytes                  |   13,168,120 By
  process/runtime/total_alloc_bytes                 |   28,170,760 By
  process/runtime/total_sys_memory_bytes            |   76,366,848 By
  process/uptime                                    |    55.006789 s
  processor/accepted_log_records                    |
  processor/accepted_metric_points                  |
  processor/accepted_spans                          |
  processor/batch/batch_send_size_bytes             |
  processor/batch/batch_size_trigger_send           |
  processor/batches_received                        |
  processor/dropped_log_records                     |
  processor/dropped_metric_points                   |
  processor/dropped_spans                           |
  processor/queued_retry/fail_send                  |
  processor/queued_retry/queue_latency              |
  processor/queued_retry/queue_length               |
  processor/queued_retry/send_latency               |
  processor/queued_retry/success_send               |
  processor/refused_log_records                     |
  processor/refused_metric_points                   |
  processor/refused_spans                           |
  processor/spans_dropped                           |
  processor/spans_received                          |
  processor/trace_batches_dropped                   |
  receiver/accepted_log_records                     |
  receiver/refused_log_records                      |
  scraper/errored_metric_points                     |
  scraper/scraped_metric_points                     |
  --------------------------------------------------|--------------------------------

  Component/Dimensions                              | Metric                                  | Value
  --------------------------------------------------|-----------------------------------------|--------------------------------
  exporter=otlphttp                                 | exporter/send_failed_metric_points      |           57
  exporter=otlphttp                                 | exporter/send_failed_spans              |        1,085
  exporter=otlphttp                                 | exporter/sent_metric_points             |            0
  exporter=otlphttp                                 | exporter/sent_spans                     |            0
  processor=batch                                   | processor/batch/batch_send_size         | 2/33.439024/80 (min/mean/max)	Occurrences=41
  processor=batch                                   | processor/batch/timeout_trigger_send    |           41
  receiver=jaeger, transport=collector_http         | receiver/accepted_spans                 |        1,306
  receiver=jaeger, transport=collector_http         | receiver/refused_spans                  |            0
  receiver=prometheus, transport=http               | receiver/accepted_metric_points         |           77
  receiver=prometheus, transport=http               | receiver/refused_metric_points          |            0
  --------------------------------------------------|-----------------------------------------|--------------------------------
```
tigrannajaryan committed Nov 12, 2020
1 parent 32efe81 commit 1559a49
Showing 3 changed files with 345 additions and 1 deletion.
5 changes: 4 additions & 1 deletion service/telemetry.go
@@ -100,9 +100,12 @@ func (tel *appTelemetry) init(asyncErrorChannel chan<- error, ballastSizeBytes u
if err != nil {
return err
}

view.RegisterExporter(pe)

// Also export our own metrics to our logs.
me := newMetricsToLogExporter(logger)
view.RegisterExporter(me)

logger.Info(
"Serving Prometheus metrics",
zap.String("address", metricsAddr),
241 changes: 241 additions & 0 deletions service/telemetry_log.go
@@ -0,0 +1,241 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package service

import (
"bytes"
"fmt"
"math"
"sort"
"strconv"
"strings"
"sync"
"time"

"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"golang.org/x/text/language"
"golang.org/x/text/message"
)

// metricsToLogExporter periodically dumps the values of the Collector's own metrics to logs.
type metricsToLogExporter struct {
// logger to output to.
logger *zap.Logger

// pendingRecords accumulated since the last output period.
pendingRecords metricLogRecords
pendingRecordsMutex sync.Mutex

// outputTicker timer to wait until all records are ready for this output period.
outputTicker *time.Ticker

// Last time the records were written to logger.
lastLogged time.Time
}

const (
// Minimum time interval between printing metric data to logs when debug logs
// are disabled.
nonDebugLogMinInterval = 5 * time.Minute

// Time to wait for ExportView() calls to arrive for record batching purposes.
outputBounceTimeInterval = 100 * time.Millisecond
)

// metricLogRecord contains formatted metric data that is ready to be printed as a log
// record.
type metricLogRecord struct {
name string
tags string
value string
}

type metricLogRecords []metricLogRecord

func (mls metricLogRecords) Len() int {
return len(mls)
}

func (mls metricLogRecords) Less(i, j int) bool {
c := strings.Compare(mls[i].tags, mls[j].tags)
if c == 0 {
return mls[i].name < mls[j].name
}
return c < 0
}

func (mls metricLogRecords) Swap(i, j int) {
mls[i], mls[j] = mls[j], mls[i]
}

func newMetricsToLogExporter(logger *zap.Logger) *metricsToLogExporter {
me := &metricsToLogExporter{
logger: logger,
outputTicker: time.NewTicker(outputBounceTimeInterval),
}
me.outputTicker.Stop()
go me.doPeriodicOutput()
return me
}

func viewDataToRecord(viewData *view.Data) metricLogRecord {
mlr := metricLogRecord{
name: viewData.View.Name,
}

var buffer bytes.Buffer

printer := message.NewPrinter(language.English)

for _, row := range viewData.Rows {
buffer.Reset()
if len(row.Tags) > 0 {
// Prepare list of tags in the form of key1=value1, key2=value2, ...
for i, t := range row.Tags {
if i > 0 {
buffer.WriteString(", ")
}
buffer.WriteString(fmt.Sprintf("%v=%v", t.Key.Name(), t.Value))
}
mlr.tags = buffer.String()
}

// Format value of the metric as a string.
buffer.Reset()
switch v := row.Data.(type) {
case *view.SumData:
buffer.WriteString(floatToStr(printer, v.Value, true) + unitToStr(viewData.View.Measure))
case *view.LastValueData:
buffer.WriteString(floatToStr(printer, v.Value, true) + unitToStr(viewData.View.Measure))
case *view.DistributionData:
buffer.WriteString(floatToStr(printer, v.Min, false) + "/")
buffer.WriteString(floatToStr(printer, v.Mean, false) + "/")
buffer.WriteString(floatToStr(printer, v.Max, false) + unitToStr(viewData.View.Measure) + " (min/mean/max)")
buffer.WriteString("\tOccurrences=" + strconv.FormatInt(v.Count, 10))
default:
buffer.WriteString("Unknown metric data type")
}
mlr.value = buffer.String()
}

return mlr
}

func (m *metricsToLogExporter) ExportView(viewData *view.Data) {
if viewData == nil || viewData.View == nil {
return
}

// Append a row to pending outputs.
m.pendingRecordsMutex.Lock()
m.pendingRecords = append(m.pendingRecords, viewDataToRecord(viewData))
m.pendingRecordsMutex.Unlock()

// Start a timer to wait for more ExportView() calls to accumulate data in
// pendingRecords, then output everything accumulated at once. Normally ExportView()
// calls arrive in batches that we want to output together.
m.outputTicker.Reset(outputBounceTimeInterval)
}

func floatToStr(printer *message.Printer, v float64, padRight bool) string {
var pattern string
if v == math.Ceil(v) {
// It is an integer number, don't print fractional digits.
if padRight {
pattern = "%12.0f"
} else {
pattern = "%.0f"
}
} else {
if padRight {
pattern = "%12f"
} else {
pattern = "%f"
}
}

return printer.Sprintf(pattern, v)
}

func unitToStr(measure stats.Measure) string {
// "1" mean unity, i.e. unitless metric, don't print it.
if measure.Unit() != "1" && measure.Unit() != "" {
return " " + measure.Unit()
}
return ""
}

func recordsToStr(records metricLogRecords) string {
const patternWithoutTags = " %-50s| %s"

var buffer strings.Builder
buffer.WriteString("\n Internal Metrics:\n")
buffer.WriteString(fmt.Sprintf(patternWithoutTags, "Metric", "Value"))
separator := "\n --------------------------------------------------|--------------------------------"
buffer.WriteString(separator)

// Sort accumulated metrics so that we print related metrics close to each other.
sort.Sort(records)

// First print records that have no tags (sorting places them at the beginning of
// the records slice).
i := 0
for ; i < len(records); i++ {
mo := records[i]
if mo.tags != "" {
break
}
buffer.WriteString("\n")
buffer.WriteString(fmt.Sprintf(patternWithoutTags, mo.name, mo.value))
}
buffer.WriteString(separator)

// Now print records with tags, if there are any.
if i < len(records) {
const patternWithTags = " %-50s| %-40s| %s"
buffer.WriteString("\n\n")
buffer.WriteString(fmt.Sprintf(patternWithTags, "Component/Dimensions", "Metric", "Value"))
separator = "\n --------------------------------------------------|-----------------------------------------|--------------------------------"
buffer.WriteString(separator)
for ; i < len(records); i++ {
mo := records[i]
buffer.WriteString("\n")
buffer.WriteString(fmt.Sprintf(patternWithTags, mo.tags, mo.name, mo.value))
}
buffer.WriteString(separator)
}
return buffer.String()
}

func (m *metricsToLogExporter) doPeriodicOutput() {
for range m.outputTicker.C {
m.pendingRecordsMutex.Lock()

now := time.Now()
if m.logger.Core().Enabled(zapcore.DebugLevel) || now.Sub(m.lastLogged) >= nonDebugLogMinInterval {
// Output if the minimum interval has passed since last output, or if
// debug level logging is enabled.
m.logger.Info(recordsToStr(m.pendingRecords))
m.lastLogged = now
}

m.pendingRecords = []metricLogRecord{}
m.pendingRecordsMutex.Unlock()

m.outputTicker.Stop()
}
}
100 changes: 100 additions & 0 deletions service/telemetry_log_test.go
@@ -0,0 +1,100 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package service

import (
"testing"

"github.com/stretchr/testify/assert"
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
)

func createTagKey(name string) tag.Key {
key, err := tag.NewKey(name)
if err != nil {
panic("can't create a tag")
}
return key
}

func createViewData() *view.Data {
vd := &view.Data{
View: &view.View{
Name: "metric_name",
Measure: stats.Int64("metric_name", "", "By"),
Description: "",
},
Rows: []*view.Row{
{
Data: &view.SumData{Value: 123},
},
},
}
return vd
}

func createViewData2Tags() *view.Data {
vd := &view.Data{
View: &view.View{
Name: "process_time",
Measure: stats.Float64("process_time", "", ""),
Description: "",
},
Rows: []*view.Row{
{
Tags: []tag.Tag{
{Key: createTagKey("exporter"), Value: "otlp"},
{Key: createTagKey("component"), Value: "exporter"},
},
Data: &view.LastValueData{Value: 123.45},
},
},
}
return vd
}

func Test_viewDataToRecord(t *testing.T) {
lr := viewDataToRecord(createViewData())

assert.EqualValues(t, "", lr.tags)
assert.EqualValues(t, "metric_name", lr.name)
assert.EqualValues(t, " 123 By", lr.value)

lr = viewDataToRecord(createViewData2Tags())

assert.EqualValues(t, "exporter=otlp, component=exporter", lr.tags)
assert.EqualValues(t, "process_time", lr.name)
assert.EqualValues(t, " 123.450000", lr.value)
}

func Test_recordsToStr(t *testing.T) {
lr1 := viewDataToRecord(createViewData())
lr2 := viewDataToRecord(createViewData2Tags())

str := recordsToStr([]metricLogRecord{lr1, lr2})
assert.EqualValues(t, `
Internal Metrics:
Metric | Value
--------------------------------------------------|--------------------------------
metric_name | 123 By
--------------------------------------------------|--------------------------------
Component/Dimensions | Metric | Value
--------------------------------------------------|-----------------------------------------|--------------------------------
exporter=otlp, component=exporter | process_time | 123.450000
--------------------------------------------------|-----------------------------------------|--------------------------------`,
str)
}
