Skip to content

Commit

Permalink
Remove the InstrumentationLibrary to Scope translation, OTLP 0.19 no …
Browse files Browse the repository at this point in the history
…longer supports it. (#5819)

* Remove the InstrumentationLibrary to Scope translation, OTLP 0.19 no longer supports it.

Signed-off-by: Bogdan <[email protected]>

* Update CHANGELOG.md

Co-authored-by: Alex Boten <[email protected]>
  • Loading branch information
bogdandrutu and Alex Boten authored Aug 4, 2022
1 parent 1c1a668 commit d7b097c
Show file tree
Hide file tree
Showing 14 changed files with 35 additions and 586 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

### 🛑 Breaking changes 🛑

- Remove the InstrumentationLibrary to Scope translation (part of transition to OTLP 0.19). (#5819)
- This has a side effect that when sending JSON encoded telemetry using OTLP proto <= 0.15.0, telemetry will be dropped.
- Require the storage to be explicitly set for the (experimental) persistent queue (#5784)
- Remove deprecated `confighttp.HTTPClientSettings.ToClientWithHost` (#5803)
- Remove deprecated component stability helpers (#5802):
Expand Down
31 changes: 3 additions & 28 deletions pdata/internal/otlp/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,34 +15,9 @@
package otlp // import "go.opentelemetry.io/collector/pdata/internal/otlp"

import (
otlpcommon "go.opentelemetry.io/collector/pdata/internal/data/protogen/common/v1"
otlplogs "go.opentelemetry.io/collector/pdata/internal/data/protogen/logs/v1"
)

// InstrumentationLibraryLogsToScope implements the translation of resource logs data
// following the v0.15.0 upgrade:
//
// receivers SHOULD check if instrumentation_library_logs is set
// and scope_logs is not set then the value in instrumentation_library_logs
// SHOULD be used instead by converting InstrumentationLibraryLogs into ScopeLogs.
// If scope_logs is set then instrumentation_library_logs SHOULD be ignored.
//
// https://github.com/open-telemetry/opentelemetry-proto/blob/3c2915c01a9fb37abfc0415ec71247c4978386b0/opentelemetry/proto/logs/v1/logs.proto#L58
func InstrumentationLibraryLogsToScope(rls []*otlplogs.ResourceLogs) {
for _, rl := range rls {
if len(rl.ScopeLogs) == 0 {
for _, ill := range rl.InstrumentationLibraryLogs {
scopeLogs := otlplogs.ScopeLogs{
Scope: otlpcommon.InstrumentationScope{
Name: ill.InstrumentationLibrary.Name,
Version: ill.InstrumentationLibrary.Version,
},
LogRecords: ill.LogRecords,
SchemaUrl: ill.SchemaUrl,
}
rl.ScopeLogs = append(rl.ScopeLogs, &scopeLogs)
}
}
rl.InstrumentationLibraryLogs = nil
}
}
// MigrateLogs implements any translation needed due to deprecation in OTLP logs protocol.
// Any plog.Unmarshaler implementation from OTLP (proto/json) MUST call this, and the gRPC Server implementation.
func MigrateLogs(_ []*otlplogs.ResourceLogs) {}
31 changes: 3 additions & 28 deletions pdata/internal/otlp/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,34 +15,9 @@
package otlp // import "go.opentelemetry.io/collector/pdata/internal/otlp"

import (
otlpcommon "go.opentelemetry.io/collector/pdata/internal/data/protogen/common/v1"
otlpmetrics "go.opentelemetry.io/collector/pdata/internal/data/protogen/metrics/v1"
)

// InstrumentationLibraryMetricsToScope implements the translation of resource metrics data
// following the v0.15.0 upgrade:
//
// receivers SHOULD check if instrumentation_library_metrics is set
// and scope_metrics is not set then the value in instrumentation_library_metrics
// SHOULD be used instead by converting InstrumentationLibraryMetrics into ScopeMetrics.
// If scope_metrics is set then instrumentation_library_metrics SHOULD be ignored.
//
// https://github.com/open-telemetry/opentelemetry-proto/blob/3c2915c01a9fb37abfc0415ec71247c4978386b0/opentelemetry/proto/metrics/v1/metrics.proto#L58
func InstrumentationLibraryMetricsToScope(rms []*otlpmetrics.ResourceMetrics) {
for _, rm := range rms {
if len(rm.ScopeMetrics) == 0 {
for _, ilm := range rm.InstrumentationLibraryMetrics {
scopeMetrics := otlpmetrics.ScopeMetrics{
Scope: otlpcommon.InstrumentationScope{
Name: ilm.InstrumentationLibrary.Name,
Version: ilm.InstrumentationLibrary.Version,
},
Metrics: ilm.Metrics,
SchemaUrl: ilm.SchemaUrl,
}
rm.ScopeMetrics = append(rm.ScopeMetrics, &scopeMetrics)
}
}
rm.InstrumentationLibraryMetrics = nil
}
}
// MigrateMetrics implements any translation needed due to deprecation in OTLP metrics protocol.
// Any pmetric.Unmarshaler implementation from OTLP (proto/json) MUST call this, and the gRPC Server implementation.
func MigrateMetrics(_ []*otlpmetrics.ResourceMetrics) {}
31 changes: 3 additions & 28 deletions pdata/internal/otlp/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,34 +15,9 @@
package otlp // import "go.opentelemetry.io/collector/pdata/internal/otlp"

import (
otlpcommon "go.opentelemetry.io/collector/pdata/internal/data/protogen/common/v1"
otlptrace "go.opentelemetry.io/collector/pdata/internal/data/protogen/trace/v1"
)

// InstrumentationLibraryToScope implements the translation of resource span data
// following the v0.15.0 upgrade:
//
// receivers SHOULD check if instrumentation_library_spans is set
// and scope_spans is not set then the value in instrumentation_library_spans
// SHOULD be used instead by converting InstrumentationLibrarySpans into ScopeSpans.
// If scope_spans is set then instrumentation_library_spans SHOULD be ignored.
//
// https://github.com/open-telemetry/opentelemetry-proto/blob/3c2915c01a9fb37abfc0415ec71247c4978386b0/opentelemetry/proto/trace/v1/trace.proto#L58
func InstrumentationLibrarySpansToScope(rss []*otlptrace.ResourceSpans) {
for _, rs := range rss {
if len(rs.ScopeSpans) == 0 {
for _, ils := range rs.InstrumentationLibrarySpans {
scopeSpans := otlptrace.ScopeSpans{
Scope: otlpcommon.InstrumentationScope{
Name: ils.InstrumentationLibrary.Name,
Version: ils.InstrumentationLibrary.Version,
},
Spans: ils.Spans,
SchemaUrl: ils.SchemaUrl,
}
rs.ScopeSpans = append(rs.ScopeSpans, &scopeSpans)
}
}
rs.InstrumentationLibrarySpans = nil
}
}
// MigrateTraces implements any translation needed due to deprecation in OTLP traces protocol.
// Any ptrace.Unmarshaler implementation from OTLP (proto/json) MUST call this, and the gRPC Server implementation.
func MigrateTraces(_ []*otlptrace.ResourceSpans) {}
2 changes: 1 addition & 1 deletion pdata/plog/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,6 @@ func (d *jsonUnmarshaler) UnmarshalLogs(buf []byte) (Logs, error) {
if err := d.delegate.Unmarshal(bytes.NewReader(buf), &ld); err != nil {
return Logs{}, err
}
otlp.InstrumentationLibraryLogsToScope(ld.ResourceLogs)
otlp.MigrateLogs(ld.ResourceLogs)
return internal.LogsFromProto(ld), nil
}
6 changes: 3 additions & 3 deletions pdata/plog/plogotlp/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (lr Request) UnmarshalProto(data []byte) error {
if err := lr.orig.Unmarshal(data); err != nil {
return err
}
otlp.InstrumentationLibraryLogsToScope(lr.orig.ResourceLogs)
otlp.MigrateLogs(lr.orig.ResourceLogs)
return nil
}

Expand All @@ -110,7 +110,7 @@ func (lr Request) UnmarshalJSON(data []byte) error {
if err := jsonUnmarshaler.Unmarshal(bytes.NewReader(data), lr.orig); err != nil {
return err
}
otlp.InstrumentationLibraryLogsToScope(lr.orig.ResourceLogs)
otlp.MigrateLogs(lr.orig.ResourceLogs)
return nil
}

Expand Down Expand Up @@ -162,7 +162,7 @@ type rawLogsServer struct {
}

func (s rawLogsServer) Export(ctx context.Context, request *otlpcollectorlog.ExportLogsServiceRequest) (*otlpcollectorlog.ExportLogsServiceResponse, error) {
otlp.InstrumentationLibraryLogsToScope(request.ResourceLogs)
otlp.MigrateLogs(request.ResourceLogs)
rsp, err := s.srv.Export(ctx, Request{orig: request})
return rsp.orig, err
}
163 changes: 0 additions & 163 deletions pdata/plog/plogotlp/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@ import (
"google.golang.org/grpc/status"
"google.golang.org/grpc/test/bufconn"

v1 "go.opentelemetry.io/collector/pdata/internal/data/protogen/logs/v1"
"go.opentelemetry.io/collector/pdata/internal/otlp"
"go.opentelemetry.io/collector/pdata/plog"
)

Expand Down Expand Up @@ -65,67 +63,6 @@ var logsRequestJSON = []byte(`
]
}`)

var logsTransitionData = [][]byte{
[]byte(`
{
"resourceLogs": [
{
"resource": {},
"instrumentationLibraryLogs": [
{
"instrumentationLibrary": {},
"logRecords": [
{
"body": {
"stringValue": "test_log_record"
},
"traceId": "",
"spanId": ""
}
]
}
]
}
]
}`),
[]byte(`
{
"resourceLogs": [
{
"resource": {},
"instrumentationLibraryLogs": [
{
"instrumentationLibrary": {},
"logRecords": [
{
"body": {
"stringValue": "test_log_record"
},
"traceId": "",
"spanId": ""
}
]
}
],
"scopeLogs": [
{
"scope": {},
"logRecords": [
{
"body": {
"stringValue": "test_log_record"
},
"traceId": "",
"spanId": ""
}
]
}
]
}
]
}`),
}

func TestRequestToPData(t *testing.T) {
tr := NewRequest()
assert.Equal(t, tr.Logs().LogRecordCount(), 0)
Expand All @@ -143,18 +80,6 @@ func TestRequestJSON(t *testing.T) {
assert.Equal(t, strings.Join(strings.Fields(string(logsRequestJSON)), ""), string(got))
}

func TestRequestJSONTransition(t *testing.T) {
for _, data := range logsTransitionData {
lr := NewRequest()
assert.NoError(t, lr.UnmarshalJSON(data))
assert.Equal(t, "test_log_record", lr.Logs().ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Body().AsString())

got, err := lr.MarshalJSON()
assert.NoError(t, err)
assert.Equal(t, strings.Join(strings.Fields(string(logsRequestJSON)), ""), string(got))
}
}

func TestGrpc(t *testing.T) {
lis := bufconn.Listen(1024 * 1024)
s := grpc.NewServer()
Expand Down Expand Up @@ -188,83 +113,6 @@ func TestGrpc(t *testing.T) {
assert.Equal(t, NewResponse(), resp)
}

func TestGrpcTransition(t *testing.T) {
lis := bufconn.Listen(1024 * 1024)
s := grpc.NewServer()
RegisterServer(s, &fakeLogsServer{t: t})
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
assert.NoError(t, s.Serve(lis))
}()
t.Cleanup(func() {
s.Stop()
wg.Wait()
})

cc, err := grpc.Dial("bufnet",
grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) {
return lis.Dial()
}),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithBlock())
assert.NoError(t, err)
t.Cleanup(func() {
assert.NoError(t, cc.Close())
})

logClient := NewClient(cc)

req := generateLogsRequestWithInstrumentationLibrary()
otlp.InstrumentationLibraryLogsToScope(req.orig.ResourceLogs)
resp, err := logClient.Export(context.Background(), req)
assert.NoError(t, err)
assert.Equal(t, NewResponse(), resp)
}

type fakeRawServer struct {
t *testing.T
}

func (s fakeRawServer) Export(_ context.Context, req Request) (Response, error) {
assert.Equal(s.t, 1, req.Logs().LogRecordCount())
return NewResponse(), nil
}

func TestGrpcExport(t *testing.T) {
lis := bufconn.Listen(1024 * 1024)
s := grpc.NewServer()
RegisterServer(s, &fakeRawServer{t: t})
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
assert.NoError(t, s.Serve(lis))
}()
t.Cleanup(func() {
s.Stop()
wg.Wait()
})

cc, err := grpc.Dial("bufnet",
grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) {
return lis.Dial()
}),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithBlock())
assert.NoError(t, err)
t.Cleanup(func() {
assert.NoError(t, cc.Close())
})

logClient := NewClient(cc)

resp, err := logClient.Export(context.Background(), generateLogsRequestWithInstrumentationLibrary())
assert.NoError(t, err)
assert.Equal(t, NewResponse(), resp)
}

func TestGrpcError(t *testing.T) {
lis := bufconn.Listen(1024 * 1024)
s := grpc.NewServer()
Expand Down Expand Up @@ -316,14 +164,3 @@ func generateLogsRequest() Request {
ld.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty().Body().SetStringVal("test_log_record")
return NewRequestFromLogs(ld)
}

func generateLogsRequestWithInstrumentationLibrary() Request {
lr := generateLogsRequest()
lr.orig.ResourceLogs[0].InstrumentationLibraryLogs = []*v1.InstrumentationLibraryLogs{ //nolint:staticcheck // SA1019 ignore this!
{
LogRecords: lr.orig.ResourceLogs[0].ScopeLogs[0].LogRecords,
},
}
lr.orig.ResourceLogs[0].ScopeLogs = []*v1.ScopeLogs{}
return lr
}
2 changes: 1 addition & 1 deletion pdata/pmetric/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,6 @@ func (d *jsonUnmarshaler) UnmarshalMetrics(buf []byte) (Metrics, error) {
if err := d.delegate.Unmarshal(bytes.NewReader(buf), &md); err != nil {
return Metrics{}, err
}
otlp.InstrumentationLibraryMetricsToScope(md.ResourceMetrics)
otlp.MigrateMetrics(md.ResourceMetrics)
return internal.MetricsFromProto(md), nil
}
4 changes: 2 additions & 2 deletions pdata/pmetric/pmetricotlp/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func (mr Request) UnmarshalJSON(data []byte) error {
if err := jsonUnmarshaler.Unmarshal(bytes.NewReader(data), mr.orig); err != nil {
return err
}
otlp.InstrumentationLibraryMetricsToScope(mr.orig.ResourceMetrics)
otlp.MigrateMetrics(mr.orig.ResourceMetrics)
return nil
}

Expand Down Expand Up @@ -158,7 +158,7 @@ type rawMetricsServer struct {
}

func (s rawMetricsServer) Export(ctx context.Context, request *otlpcollectormetrics.ExportMetricsServiceRequest) (*otlpcollectormetrics.ExportMetricsServiceResponse, error) {
otlp.InstrumentationLibraryMetricsToScope(request.ResourceMetrics)
otlp.MigrateMetrics(request.ResourceMetrics)
rsp, err := s.srv.Export(ctx, Request{orig: request})
return rsp.orig, err
}
Loading

0 comments on commit d7b097c

Please sign in to comment.