Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Return merged Resource on schema conflict #4876

Merged
merged 12 commits into from
Feb 5, 2024
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,15 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Add `WithEndpointURL` option to the `exporters/otlp/otlpmetric/otlpmetricgrpc`, `exporters/otlp/otlpmetric/otlpmetrichttp`, `exporters/otlp/otlptrace/otlptracegrpc` and `exporters/otlp/otlptrace/otlptracehttp` packages. (#4808)
- Experimental exemplar exporting is added to the metric SDK.
See [metric documentation](./sdk/metric/EXPERIMENTAL.md#exemplars) for more information about this feature and how to enable it. (#4871)
- `ErrSchemaURLConflict` is added to `go.opentelemetry.io/otel/sdk/resource`.
This error is returned when a merge of two `Resource`s with different (non-empty) schema URL is attepted. (#4876)

### Changed

- The `Merge` and `New` functions in `go.opentelemetry.io/otel/sdk/resource` now returns a partial result if there is a schema URL merge conflict.
Instead of returning `nil` when two `Resource`s with different (non-empty) schema URLs are merged the merged `Resource`, along with the new `ErrSchemaURLConflict` error, is returned.
It is up to the user to decide if they want to use the returned `Resource` or not.
It may have desired attributes overwritten or include stale semantic conventions. (#4876)

### Fixed

Expand Down
25 changes: 23 additions & 2 deletions sdk/resource/auto.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,31 @@ type Detector interface {
// must never be done outside of a new major release.
}

// Detect calls all input detectors sequentially and merges each result with the previous one.
// It returns the merged error too.
// Detect returns a new [Resource] merged from all the Resources each of the
// detectors produces. Each of the detectors are called sequentially, in the
// order they are passed, merging the produced resource into the previous.
//
// This may return a partial Resource along with an error containing
// [ErrPartialResource] if that error is returned from a detector. It may also
// return a merge-conflicting Resource along with an error containing
// [ErrSchemaURLConflict] if merging Resources from different detectors results
// in a schema URL conflict. It is up to the caller to determine if this
// returned Resource should be used or not.
//
// If one of the detectors returns an error that is not [ErrPartialResource],
// the resource produced by the detector will not be merged and the returned
// error will wrap that detector's error.
func Detect(ctx context.Context, detectors ...Detector) (*Resource, error) {
r := new(Resource)
return r, detect(ctx, r, detectors)
}

// detect runs all detectors using ctx and merges the result into res. This
// assumes res is allocated and not nil, it will panic otherwise.
//
// If the detectors or merging resources produces any errors (i.e.
// [ErrPartialResource] [ErrSchemaURLConflict]), a single error wrapping all of
// these errors will be returned. Otherwise, nil is returned.
func detect(ctx context.Context, res *Resource, detectors []Detector) error {
var (
r *Resource
Expand Down Expand Up @@ -78,6 +94,11 @@ func detect(ctx context.Context, res *Resource, detectors []Detector) error {
if len(errs) == 0 {
return nil
}
if errors.Is(errs, ErrSchemaURLConflict) {
// If there has been a merge conflict, ensure the resource has no
// schema URL.
res.schemaURL = ""
}
return errs
}

Expand Down
80 changes: 61 additions & 19 deletions sdk/resource/auto_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,47 +16,89 @@ package resource_test

import (
"context"
"errors"
"fmt"
"os"
"testing"

"github.com/stretchr/testify/assert"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/resource"
semconv "go.opentelemetry.io/otel/semconv/v1.24.0"
)

type detector struct {
SchemaURL string
Attributes []attribute.KeyValue
}

func newDetector(schemaURL string, attrs ...attribute.KeyValue) resource.Detector {
return detector{schemaURL, attrs}
}

func (d detector) Detect(context.Context) (*resource.Resource, error) {
return resource.NewWithAttributes(d.SchemaURL, d.Attributes...), nil
}

func TestDetect(t *testing.T) {
v130 := "https://opentelemetry.io/schemas/1.3.0"
v140 := "https://opentelemetry.io/schemas/1.4.0"
v150 := "https://opentelemetry.io/schemas/1.5.0"

alice := attribute.String("name", "Alice")
bob := attribute.String("name", "Bob")
carol := attribute.String("name", "Carol")

admin := attribute.Bool("admin", true)
user := attribute.Bool("admin", false)

cases := []struct {
name string
schema1, schema2 string
isErr bool
name string
detectors []resource.Detector
want *resource.Resource
wantErr error
}{
{
name: "different schema urls",
schema1: "https://opentelemetry.io/schemas/1.3.0",
schema2: "https://opentelemetry.io/schemas/1.4.0",
isErr: true,
name: "two different schema urls",
detectors: []resource.Detector{
newDetector(v130, alice, admin),
newDetector(v140, bob, user),
},
want: resource.NewSchemaless(bob, user),
wantErr: resource.ErrSchemaURLConflict,
},
{
name: "three different schema urls",
detectors: []resource.Detector{
newDetector(v130, alice, admin),
newDetector(v140, bob, user),
newDetector(v150, carol),
},
want: resource.NewSchemaless(carol, user),
wantErr: resource.ErrSchemaURLConflict,
},
{
name: "same schema url",
schema1: "https://opentelemetry.io/schemas/1.4.0",
schema2: "https://opentelemetry.io/schemas/1.4.0",
isErr: false,
name: "same schema url",
MrAlias marked this conversation as resolved.
Show resolved Hide resolved
detectors: []resource.Detector{
newDetector(v140, alice, admin),
newDetector(v140, bob, user),
},
want: resource.NewWithAttributes(v140, bob, user),
},
}

for _, c := range cases {
t.Run(fmt.Sprintf("case-%s", c.name), func(t *testing.T) {
d1 := resource.StringDetector(c.schema1, semconv.HostNameKey, os.Hostname)
d2 := resource.StringDetector(c.schema2, semconv.HostNameKey, os.Hostname)
r, err := resource.Detect(context.Background(), d1, d2)
assert.NotNil(t, r)
if c.isErr {
assert.Error(t, err)
r, err := resource.Detect(context.Background(), c.detectors...)
if c.wantErr != nil {
assert.ErrorIs(t, err, c.wantErr)
if errors.Is(c.wantErr, resource.ErrSchemaURLConflict) {
assert.Zero(t, r.SchemaURL())
}
} else {
assert.NoError(t, err)
}
assert.Equal(t, c.want.SchemaURL(), r.SchemaURL())
assert.ElementsMatch(t, c.want.Attributes(), r.Attributes())
})
}
}
77 changes: 52 additions & 25 deletions sdk/resource/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package resource // import "go.opentelemetry.io/otel/sdk/resource"
import (
"context"
"errors"
"fmt"
"sync"

"go.opentelemetry.io/otel"
Expand All @@ -40,9 +41,20 @@ var (
defaultResourceOnce sync.Once
)

var errMergeConflictSchemaURL = errors.New("cannot merge resource due to conflicting Schema URL")
// ErrSchemaURLConflict is an error returned when two Resources are merged
// together that contain different, non-empty, schema URLs.
var ErrSchemaURLConflict = errors.New("conflicting Schema URL")

// New returns a Resource combined from the user-provided detectors.
// New returns a [Resource] built using opts.
//
// This may return a partial Resource along with an error containing
// [ErrPartialResource] if options that provide a [Detector] are used and that
// error is returned from one or more of the Detectors. It may also return a
// merge-conflict Resource along with an error containing
// [ErrSchemaURLConflict] if merging Resources from the opts results in a
// schema URL conflict (see [Resource.Merge] for more information). It is up to
// the caller to determine if this returned Resource should be used or not
// based on these errors.
func New(ctx context.Context, opts ...Option) (*Resource, error) {
cfg := config{}
for _, opt := range opts {
Expand Down Expand Up @@ -146,16 +158,29 @@ func (r *Resource) Equal(eq *Resource) bool {
return r.Equivalent() == eq.Equivalent()
}

// Merge creates a new resource by combining resource a and b.
// Merge creates a new [Resource] by merging a and b.
MrAlias marked this conversation as resolved.
Show resolved Hide resolved
//
// If there are common keys between a and b, then the value from b will
// overwrite the value from a, even if b's value is empty.
//
// If there are common keys between resource a and b, then the value
// from resource b will overwrite the value from resource a, even
// if resource b's value is empty.
// The SchemaURL of the resources will be merged according to the
// [OpenTelemetry specification rules]:
//
// The SchemaURL of the resources will be merged according to the spec rules:
// https://github.com/open-telemetry/opentelemetry-specification/blob/v1.20.0/specification/resource/sdk.md#merge
// If the resources have different non-empty schemaURL an empty resource and an error
// will be returned.
// - If a's schema URL is empty then the returned Resource's schema URL will
// be set to the schema URL of b,
// - Else if b's schema URL is empty then the returned Resource's schema URL
// will be set to the schema URL of a,
// - Else if the schema URLs of a and b are the same then that will be the
// schema URL of the returned Resource,
// - Else this is a merging error. If the resources have different,
// non-empty, schema URLs an error containing [ErrSchemaURLConflict] will
// be returned with the merged Resource. The merged Resource will have an
// empty schema URL. It may be the case that some unintended attributes
// have been overwritten or old semantic conventions persisted in the
// returned Resource. It is up to the caller to determine if this returned
// Resource should be used or not.
//
// [OpenTelemetry specification rules]: https://github.com/open-telemetry/opentelemetry-specification/blob/v1.20.0/specification/resource/sdk.md#merge
func Merge(a, b *Resource) (*Resource, error) {
if a == nil && b == nil {
return Empty(), nil
Expand All @@ -167,28 +192,30 @@ func Merge(a, b *Resource) (*Resource, error) {
return a, nil
}

// Merge the schema URL.
var schemaURL string
MrAlias marked this conversation as resolved.
Show resolved Hide resolved
switch true {
case a.schemaURL == "":
schemaURL = b.schemaURL
case b.schemaURL == "":
schemaURL = a.schemaURL
case a.schemaURL == b.schemaURL:
schemaURL = a.schemaURL
default:
return Empty(), errMergeConflictSchemaURL
}

// Note: 'b' attributes will overwrite 'a' with last-value-wins in attribute.Key()
// Meaning this is equivalent to: append(a.Attributes(), b.Attributes()...)
mi := attribute.NewMergeIterator(b.Set(), a.Set())
combine := make([]attribute.KeyValue, 0, a.Len()+b.Len())
for mi.Next() {
combine = append(combine, mi.Attribute())
}
merged := NewWithAttributes(schemaURL, combine...)
return merged, nil

switch {
case a.schemaURL == "":
return NewWithAttributes(b.schemaURL, combine...), nil
case b.schemaURL == "":
return NewWithAttributes(a.schemaURL, combine...), nil
case a.schemaURL == b.schemaURL:
return NewWithAttributes(a.schemaURL, combine...), nil
}
// Return the merged resource with an appropriate error. It is up to
// the user to decide if the returned resource can be used or not.
return NewSchemaless(combine...), fmt.Errorf(
"%w: %s and %s",
ErrSchemaURLConflict,
a.schemaURL,
b.schemaURL,
)
}

// Empty returns an instance of Resource with no attributes. It is
Expand Down
32 changes: 21 additions & 11 deletions sdk/resource/resource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func TestMerge(t *testing.T) {
name: "Merge with different schemas",
a: resource.NewWithAttributes("https://opentelemetry.io/schemas/1.4.0", kv41),
b: resource.NewWithAttributes("https://opentelemetry.io/schemas/1.3.0", kv42),
want: nil,
want: []attribute.KeyValue{kv42},
MrAlias marked this conversation as resolved.
Show resolved Hide resolved
isErr: true,
},
}
Expand Down Expand Up @@ -324,7 +324,7 @@ func TestNew(t *testing.T) {

resourceValues map[string]string
schemaURL string
isErr bool
wantErr error
}{
{
name: "No Options returns empty resource",
Expand Down Expand Up @@ -406,9 +406,14 @@ func TestNew(t *testing.T) {
),
resource.WithSchemaURL("https://opentelemetry.io/schemas/1.1.0"),
},
resourceValues: map[string]string{},
schemaURL: "",
isErr: true,
resourceValues: map[string]string{
string(semconv.HostNameKey): func() (hostname string) {
hostname, _ = os.Hostname()
return hostname
}(),
},
schemaURL: "",
wantErr: resource.ErrSchemaURLConflict,
},
{
name: "With conflicting detector schema urls",
Expand All @@ -420,9 +425,14 @@ func TestNew(t *testing.T) {
),
resource.WithSchemaURL("https://opentelemetry.io/schemas/1.2.0"),
},
resourceValues: map[string]string{},
schemaURL: "",
isErr: true,
resourceValues: map[string]string{
string(semconv.HostNameKey): func() (hostname string) {
hostname, _ = os.Hostname()
return hostname
}(),
},
schemaURL: "",
wantErr: resource.ErrSchemaURLConflict,
},
}
for _, tt := range tc {
Expand All @@ -436,10 +446,10 @@ func TestNew(t *testing.T) {
ctx := context.Background()
res, err := resource.New(ctx, tt.options...)

if tt.isErr {
require.Error(t, err)
if tt.wantErr != nil {
assert.ErrorIs(t, err, tt.wantErr)
} else {
require.NoError(t, err)
assert.NoError(t, err)
}

require.EqualValues(t, tt.resourceValues, toMap(res))
Expand Down