Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: intial draft sending medatada in prometheus remote write #23585

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 34 additions & 3 deletions exporter/prometheusremotewriteexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,34 @@ func (prwe *prwExporter) Shutdown(context.Context) error {
return err
}

func (prwe *prwExporter) createMetadata(md pmetric.Metrics) []prompb.MetricMetadata {

var metadata []prompb.MetricMetadata

resourceMetricsSlice := md.ResourceMetrics()
for i := 0; i < resourceMetricsSlice.Len(); i++ {
resourceMetrics := resourceMetricsSlice.At(i)
scopeMetricsSlice := resourceMetrics.ScopeMetrics()

for j := 0; j < scopeMetricsSlice.Len(); j++ {
scopeMetrics := scopeMetricsSlice.At(j)
for k := 0; k < scopeMetrics.Metrics().Len(); k++ {

entry := prompb.MetricMetadata{
Type: prompb.MetricMetadata_MetricType(scopeMetrics.Metrics().At(k).Type()),
MetricFamilyName: scopeMetrics.Metrics().At(k).Name(),
Help: scopeMetrics.Metrics().At(k).Description(),
Unit: scopeMetrics.Metrics().At(k).Unit(),
}
metadata = append(metadata, entry)
}
}
}

return metadata

}

// PushMetrics converts metrics to Prometheus remote write TimeSeries and send to remote endpoint. It maintain a map of
// TimeSeries, validates and handles each individual metric, adding the converted TimeSeries to the map, and finally
// exports the map.
Expand All @@ -126,12 +154,15 @@ func (prwe *prwExporter) PushMetrics(ctx context.Context, md pmetric.Metrics) er
case <-prwe.closeChan:
return errors.New("shutdown has been called")
default:

m := prometheusremotewrite.CreateMetadata(md)

tsMap, err := prometheusremotewrite.FromMetrics(md, prwe.exporterSettings)
if err != nil {
err = consumererror.NewPermanent(err)
}
// Call export even if a conversion error, since there may be points that were successfully converted.
return multierr.Combine(err, prwe.handleExport(ctx, tsMap))
return multierr.Combine(err, prwe.handleExport(ctx, tsMap, m))
}
}

Expand All @@ -147,14 +178,14 @@ func validateAndSanitizeExternalLabels(cfg *Config) (map[string]string, error) {
return sanitizedLabels, nil
}

func (prwe *prwExporter) handleExport(ctx context.Context, tsMap map[string]*prompb.TimeSeries) error {
func (prwe *prwExporter) handleExport(ctx context.Context, tsMap map[string]*prompb.TimeSeries, m []prompb.MetricMetadata) error {
// There are no metrics to export, so return.
if len(tsMap) == 0 {
return nil
}

// Calls the helper function to convert and batch the TsMap to the desired format
requests, err := batchTimeSeries(tsMap, maxBatchByteSize)
requests, err := batchTimeSeries(tsMap, m, maxBatchByteSize)
if err != nil {
return err
}
Expand Down
16 changes: 11 additions & 5 deletions exporter/prometheusremotewriteexporter/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
)

// batchTimeSeries splits series into multiple batch write requests.
func batchTimeSeries(tsMap map[string]*prompb.TimeSeries, maxBatchByteSize int) ([]*prompb.WriteRequest, error) {
func batchTimeSeries(tsMap map[string]*prompb.TimeSeries, m []prompb.MetricMetadata, maxBatchByteSize int) ([]*prompb.WriteRequest, error) {
if len(tsMap) == 0 {
return nil, errors.New("invalid tsMap: cannot be empty map")
}
Expand All @@ -20,11 +20,16 @@ func batchTimeSeries(tsMap map[string]*prompb.TimeSeries, maxBatchByteSize int)
var tsArray []prompb.TimeSeries
sizeOfCurrentBatch := 0

sizeOfM := 0
for _, mi := range m {
sizeOfM += mi.Size()
}

for _, v := range tsMap {
sizeOfSeries := v.Size()

if sizeOfCurrentBatch+sizeOfSeries >= maxBatchByteSize {
wrapped := convertTimeseriesToRequest(tsArray)
if sizeOfCurrentBatch+sizeOfM+sizeOfSeries >= maxBatchByteSize {
wrapped := convertTimeseriesToRequest(tsArray, m)
requests = append(requests, wrapped)

tsArray = nil
Expand All @@ -36,14 +41,14 @@ func batchTimeSeries(tsMap map[string]*prompb.TimeSeries, maxBatchByteSize int)
}

if len(tsArray) != 0 {
wrapped := convertTimeseriesToRequest(tsArray)
wrapped := convertTimeseriesToRequest(tsArray, m)
requests = append(requests, wrapped)
}

return requests, nil
}

func convertTimeseriesToRequest(tsArray []prompb.TimeSeries) *prompb.WriteRequest {
func convertTimeseriesToRequest(tsArray []prompb.TimeSeries, m []prompb.MetricMetadata) *prompb.WriteRequest {
// the remote_write endpoint only requires the timeseries.
// otlp defines it's own way to handle metric metadata
return &prompb.WriteRequest{
Expand All @@ -52,6 +57,7 @@ func convertTimeseriesToRequest(tsArray []prompb.TimeSeries) *prompb.WriteReques
// * https://github.com/open-telemetry/wg-prometheus/issues/10
// * https://github.com/open-telemetry/opentelemetry-collector/issues/2315
Timeseries: orderBySampleTimestamp(tsArray),
Metadata: m,
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package prometheusremotewrite // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheusremotewrite"

import (
"github.com/prometheus/prometheus/prompb"
"go.opentelemetry.io/collector/pdata/pmetric"
)

// CreateMetadata converts pmetric.Metrics to prometheus remote write format.
func CreateMetadata(md pmetric.Metrics) []prompb.MetricMetadata {
var metadata []prompb.MetricMetadata

resourceMetricsSlice := md.ResourceMetrics()
for i := 0; i < resourceMetricsSlice.Len(); i++ {
resourceMetrics := resourceMetricsSlice.At(i)
scopeMetricsSlice := resourceMetrics.ScopeMetrics()

for j := 0; j < scopeMetricsSlice.Len(); j++ {
scopeMetrics := scopeMetricsSlice.At(j)
for k := 0; k < scopeMetrics.Metrics().Len(); k++ {

entry := prompb.MetricMetadata{
Type: prompb.MetricMetadata_MetricType(scopeMetrics.Metrics().At(k).Type()),
MetricFamilyName: scopeMetrics.Metrics().At(k).Name(),
Help: scopeMetrics.Metrics().At(k).Description(),
Unit: scopeMetrics.Metrics().At(k).Unit(),
}
metadata = append(metadata, entry)
}
}
}

return metadata
}