Skip to content

Commit

Permalink
Add otlpjson as a serializer implementation
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu committed May 21, 2021
1 parent fe57963 commit 5581599
Show file tree
Hide file tree
Showing 4 changed files with 156 additions and 35 deletions.
39 changes: 24 additions & 15 deletions exporter/fileexporter/file_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,47 +20,56 @@ import (
"os"
"sync"

"github.com/gogo/protobuf/jsonpb"
"github.com/gogo/protobuf/proto"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/internal"
"go.opentelemetry.io/collector/internal/model/serializer"
)

// Marshaler configuration used for marhsaling Protobuf to JSON. Use default config.
var marshaler = &jsonpb.Marshaler{}

// fileExporter is the implementation of file exporter that writes telemetry data to a file
// in Protobuf-JSON format.
type fileExporter struct {
path string
file io.WriteCloser
mutex sync.Mutex
tracesMarshaler serializer.TracesMarshaler
metricsMarshaler serializer.MetricsMarshaler
logsMarshaler serializer.LogsMarshaler
path string
file io.WriteCloser
mutex sync.Mutex
}

func (e *fileExporter) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: false}
}

func (e *fileExporter) ConsumeTraces(_ context.Context, td pdata.Traces) error {
return exportMessageAsLine(e, internal.TracesToOtlp(td.InternalRep()))
if buf, err := e.tracesMarshaler.MarshalTraces(td); err != nil {
return err
} else {
return exportMessageAsLine(e, buf)
}
}

func (e *fileExporter) ConsumeMetrics(_ context.Context, md pdata.Metrics) error {
return exportMessageAsLine(e, internal.MetricsToOtlp(md.InternalRep()))
if buf, err := e.metricsMarshaler.MarshalMetrics(md); err != nil {
return err
} else {
return exportMessageAsLine(e, buf)
}
}

func (e *fileExporter) ConsumeLogs(_ context.Context, ld pdata.Logs) error {
return exportMessageAsLine(e, internal.LogsToOtlp(ld.InternalRep()))
if buf, err := e.logsMarshaler.MarshalLogs(ld); err != nil {
return err
} else {
return exportMessageAsLine(e, buf)
}
}

func exportMessageAsLine(e *fileExporter, message proto.Message) error {
func exportMessageAsLine(e *fileExporter, message []byte) error {
// Ensure only one write operation happens at a time.
e.mutex.Lock()
defer e.mutex.Unlock()
if err := marshaler.Marshal(e.file, message); err != nil {
if _, err := e.file.Write(message); err != nil {
return err
}
if _, err := io.WriteString(e.file, "\n"); err != nil {
Expand Down
37 changes: 17 additions & 20 deletions exporter/fileexporter/file_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,25 +14,22 @@
package fileexporter

import (
"bytes"
"context"
"io/ioutil"
"os"
"testing"

"github.com/gogo/protobuf/jsonpb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/internal"
collectorlogs "go.opentelemetry.io/collector/internal/data/protogen/collector/logs/v1"
collectormetrics "go.opentelemetry.io/collector/internal/data/protogen/collector/metrics/v1"
collectortrace "go.opentelemetry.io/collector/internal/data/protogen/collector/trace/v1"
"go.opentelemetry.io/collector/internal/otlpjson"
"go.opentelemetry.io/collector/internal/testdata"
"go.opentelemetry.io/collector/testutil"
)

var unmarshaler = otlpjson.NewTracesUnmarshaler()

func TestFileTracesExporter(t *testing.T) {
fe := &fileExporter{path: tempFileName(t)}
require.NotNil(t, fe)
Expand All @@ -42,12 +39,12 @@ func TestFileTracesExporter(t *testing.T) {
assert.NoError(t, fe.ConsumeTraces(context.Background(), td))
assert.NoError(t, fe.Shutdown(context.Background()))

var unmarshaler = &jsonpb.Unmarshaler{}
got := &collectortrace.ExportTraceServiceRequest{}
buf, err := ioutil.ReadFile(fe.path)
assert.NoError(t, err)
assert.NoError(t, unmarshaler.Unmarshal(bytes.NewReader(buf), got))
assert.EqualValues(t, internal.TracesToOtlp(td.InternalRep()), got)
var got interface{}
got, err = unmarshaler.DeserializeTraces(buf)
assert.NoError(t, err)
assert.EqualValues(t, td, got)
}

func TestFileTracesExporterError(t *testing.T) {
Expand All @@ -72,12 +69,12 @@ func TestFileMetricsExporter(t *testing.T) {
assert.NoError(t, fe.ConsumeMetrics(context.Background(), md))
assert.NoError(t, fe.Shutdown(context.Background()))

var unmarshaler = &jsonpb.Unmarshaler{}
got := &collectormetrics.ExportMetricsServiceRequest{}
buf, err := ioutil.ReadFile(fe.path)
assert.NoError(t, err)
assert.NoError(t, unmarshaler.Unmarshal(bytes.NewReader(buf), got))
assert.EqualValues(t, internal.MetricsToOtlp(md.InternalRep()), got)
var got interface{}
got, err = unmarshaler.DeserializeMetrics(buf)
assert.NoError(t, err)
assert.EqualValues(t, md, got)
}

func TestFileMetricsExporterError(t *testing.T) {
Expand All @@ -97,17 +94,17 @@ func TestFileLogsExporter(t *testing.T) {
fe := &fileExporter{path: tempFileName(t)}
require.NotNil(t, fe)

otlp := testdata.GenerateLogsTwoLogRecordsSameResource()
ld := testdata.GenerateLogsTwoLogRecordsSameResource()
assert.NoError(t, fe.Start(context.Background(), componenttest.NewNopHost()))
assert.NoError(t, fe.ConsumeLogs(context.Background(), otlp))
assert.NoError(t, fe.ConsumeLogs(context.Background(), ld))
assert.NoError(t, fe.Shutdown(context.Background()))

var unmarshaler = &jsonpb.Unmarshaler{}
got := &collectorlogs.ExportLogsServiceRequest{}
buf, err := ioutil.ReadFile(fe.path)
assert.NoError(t, err)
assert.NoError(t, unmarshaler.Unmarshal(bytes.NewReader(buf), got))
assert.EqualValues(t, internal.LogsToOtlp(otlp.InternalRep()), got)
var got interface{}
got, err = unmarshaler.DeserializeLogs(buf)
assert.NoError(t, err)
assert.EqualValues(t, ld, got)
}

func TestFileLogsExporterErrors(t *testing.T) {
Expand Down
58 changes: 58 additions & 0 deletions internal/otlpjson/marshaler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package otlpjson

import (
"bytes"

"github.com/gogo/protobuf/jsonpb"

otlpcollectorlogs "go.opentelemetry.io/collector/internal/data/protogen/collector/logs/v1"
otlpcollectormetrics "go.opentelemetry.io/collector/internal/data/protogen/collector/metrics/v1"
otlpcollectortrace "go.opentelemetry.io/collector/internal/data/protogen/collector/trace/v1"
"go.opentelemetry.io/collector/internal/model/serializer"
)

type marshaler struct {
jsonpb.Marshaler
}

func NewTracesMarshaler() serializer.TracesMarshaler {
return &marshaler{jsonpb.Marshaler{}}
}

func (en *marshaler) MarshalLogs(model interface{}) ([]byte, error) {
buf := bytes.Buffer{}
if err := en.Marshal(&buf, model.(*otlpcollectorlogs.ExportLogsServiceRequest)); err != nil {
return nil, err
}
return buf.Bytes(), nil
}

func (en *marshaler) MarshalMetrics(model interface{}) ([]byte, error) {
buf := bytes.Buffer{}
if err := en.Marshal(&buf, model.(*otlpcollectormetrics.ExportMetricsServiceRequest)); err != nil {
return nil, err
}
return buf.Bytes(), nil
}

func (en *marshaler) MarshalTraces(model interface{}) ([]byte, error) {
buf := bytes.Buffer{}
if err := en.Marshal(&buf, model.(*otlpcollectortrace.ExportTraceServiceRequest)); err != nil {
return nil, err
}
return buf.Bytes(), nil
}
57 changes: 57 additions & 0 deletions internal/otlpjson/unmarshaler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package otlpjson

import (
"bytes"

"github.com/gogo/protobuf/jsonpb"

"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/internal"
"go.opentelemetry.io/collector/internal/model/serializer"
)

type Unmarshaler struct {
unmarshaler jsonpb.Unmarshaler
}

func NewTracesUnmarshaler() serializer.TracesUnmarshaler {
return &Unmarshaler{unmarshaler: jsonpb.Unmarshaler{}}
}

func (d *Unmarshaler) UnmarshalLogs(buf []byte) (interface{}, error) {
ld := pdata.NewLogs()
if err := d.unmarshaler.Unmarshal(bytes.NewReader(buf), internal.LogsToOtlp(ld.InternalRep())); err != nil {
return nil, err
}
return ld, nil
}

func (d *Unmarshaler) UnmarshalMetrics(buf []byte) (interface{}, error) {
md := pdata.NewMetrics()
if err := d.unmarshaler.Unmarshal(bytes.NewReader(buf), internal.MetricsToOtlp(md.InternalRep())); err != nil {
return nil, err
}
return md, nil
}

func (d *Unmarshaler) UnmarshalTraces(buf []byte) (interface{}, error) {
td := pdata.NewTraces()
if err := d.unmarshaler.Unmarshal(bytes.NewReader(buf), internal.TracesToOtlp(td.InternalRep())); err != nil {
return nil, err
}
return td, nil
}

0 comments on commit 5581599

Please sign in to comment.