diff --git a/pkg/sampling/common.go b/pkg/sampling/common.go new file mode 100644 index 000000000000..68d17785811e --- /dev/null +++ b/pkg/sampling/common.go @@ -0,0 +1,124 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package sampling + +import ( + "errors" + "io" + "strings" + + "go.uber.org/multierr" +) + +type KV struct { + Key string + Value string +} + +var ( + ErrTraceStateSize = errors.New("invalid tracestate size") +) + +// keyValueScanner defines distinct scanner behaviors for lists of +// key-values. +type keyValueScanner struct { + // maxItems is 32 or -1 + maxItems int + // trim is set if OWS (optional whitespace) should be removed + trim bool + // separator is , or ; + separator byte + // equality is = or : + equality byte +} + +// commonTraceState is embedded in both W3C and OTel trace states. +type commonTraceState struct { + kvs []KV +} + +func (cts commonTraceState) HasExtraValues() bool { + return len(cts.kvs) != 0 +} + +func (cts commonTraceState) ExtraValues() []KV { + return cts.kvs +} + +// trimOws removes optional whitespace on both ends of a string. +func trimOws(input string) string { + // Hard-codes the value of owsCharset + for len(input) > 0 && (input[0] == ' ' || input[0] == '\t') { + input = input[1:] + } + for len(input) > 0 && (input[len(input)-1] == ' ' || input[len(input)-1] == '\t') { + input = input[:len(input)-1] + } + return input +} + +// scanKeyValues is common code to scan either W3C or OTel tracestate +// entries, as parameterized in the keyValueScanner struct. +func (s keyValueScanner) scanKeyValues(input string, f func(key, value string) error) error { + var rval error + items := 0 + for input != "" { + items++ + if s.maxItems > 0 && items >= s.maxItems { + // W3C specifies max 32 entries, tested here + // instead of via the regexp. 
+ return ErrTraceStateSize + } + + sep := strings.IndexByte(input, s.separator) + + var member string + if sep < 0 { + member = input + input = "" + } else { + member = input[:sep] + input = input[sep+1:] + } + + if s.trim { + // Trim only required for W3C; OTel does not + // specify whitespace for its value encoding. + member = trimOws(member) + } + + if member == "" { + // W3C allows empty list members. + continue + } + + eq := strings.IndexByte(member, s.equality) + if eq < 0 { + // A regexp should have rejected this input. + continue + } + if err := f(member[:eq], member[eq+1:]); err != nil { + rval = multierr.Append(rval, err) + } + } + return rval +} + +// serializer assists with checking and combining errors from +// (io.StringWriter).WriteString(). +type serializer struct { + writer io.StringWriter + err error +} + +// write handles errors from io.StringWriter. +func (ser *serializer) write(str string) { + _, err := ser.writer.WriteString(str) + ser.check(err) +} + +// check handles errors (e.g., from another serializer). 
+func (ser *serializer) check(err error) { + ser.err = multierr.Append(ser.err, err) +} diff --git a/pkg/sampling/encoding_test.go b/pkg/sampling/encoding_test.go new file mode 100644 index 000000000000..84a6dfcb80e5 --- /dev/null +++ b/pkg/sampling/encoding_test.go @@ -0,0 +1,376 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package sampling + +import ( + "encoding/binary" + "errors" + "fmt" + "math/rand" + "strconv" + "testing" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" +) + +func must[T any](t T, err error) T { + if err != nil { + panic(err) + } + return t +} + +func mustNot[T any](t T, err error) error { + if err == nil { + return fmt.Errorf("expected an error, got nil") + } + return err +} + +func probabilityToTValue(prob float64) (string, error) { + th, err := ProbabilityToThreshold(prob) + return string(th.TValue()), err +} + +func tValueToProbability(tv string) (float64, error) { + th, err := TValueToThreshold(tv) + return th.Probability(), err +} + +func TestValidProbabilityToTValue(t *testing.T) { + require.Equal(t, "0", must(probabilityToTValue(1.0))) + require.Equal(t, "8", must(probabilityToTValue(0.5))) + require.Equal(t, "ffffffffffffff", must(probabilityToTValue(0x1p-56))) + require.Equal(t, "aaaaaaaaaaaaac", must(probabilityToTValue(1/3.))) + require.Equal(t, "55555555555558", must(probabilityToTValue(2/3.))) + require.Equal(t, "54", must(probabilityToTValue(1-0x54p-8))) // 0x54p-8 is approximately 1/3 + require.Equal(t, "01", must(probabilityToTValue(1-0x1p-8))) +} + +func TestThresholdGreater(t *testing.T) { + require.True(t, ThresholdGreater( + must(TValueToThreshold("5")), + must(TValueToThreshold("4")), + )) + + require.True(t, ThresholdGreater( + must(TValueToThreshold("4")), + must(TValueToThreshold("04")), + )) + + require.False(t, ThresholdGreater( + must(TValueToThreshold("234")), + must(TValueToThreshold("4")), + )) + + require.True(t, 
ThresholdGreater( + must(TValueToThreshold("4")), + must(TValueToThreshold("234")), + )) +} + +func TestInvalidprobabilityToTValue(t *testing.T) { + // Too small + require.Error(t, mustNot(probabilityToTValue(0x1p-57))) + require.Error(t, mustNot(probabilityToTValue(0x1p-57))) + + // Too big + require.Error(t, mustNot(probabilityToTValue(1.1))) + require.Error(t, mustNot(probabilityToTValue(1.1))) +} + +func TestTValueToProbability(t *testing.T) { + require.Equal(t, 0.5, must(tValueToProbability("8"))) + require.Equal(t, 1-0x444p-12, must(tValueToProbability("444"))) + require.Equal(t, 1.0, must(tValueToProbability("0"))) + + // 0x55555554p-32 is very close to 1/3 + require.InEpsilon(t, 1-1/3., must(tValueToProbability("55555554")), 1e-9) +} + +func TestProbabilityToThreshold(t *testing.T) { + require.Equal(t, + must(TValueToThreshold("8")), + must(ProbabilityToThreshold(0.5))) + require.Equal(t, + must(TValueToThreshold("ffffffffffffff")), + must(ProbabilityToThreshold(0x1p-56))) + require.Equal(t, + must(TValueToThreshold("ffffffffffff00")), + must(ProbabilityToThreshold(0x100p-56))) + require.Equal(t, + must(TValueToThreshold("00000000000010")), + must(ProbabilityToThreshold(1.0-0x1p-52))) + require.Equal(t, + AlwaysSampleThreshold, + must(ProbabilityToThreshold(1.0))) + + zt, err := ProbabilityToThreshold(0) + require.Equal(t, zt, AlwaysSampleThreshold) + require.Error(t, err) + require.Equal(t, err, ErrProbabilityRange) +} + +func TestShouldSample(t *testing.T) { + // Test four boundary conditions for 50% sampling, + thresh := must(ProbabilityToThreshold(0.5)) + // Smallest TraceID that should NOT sample. + require.False(t, thresh.ShouldSample(TraceIDToRandomness(pcommon.TraceID{ + // 9 meaningless bytes + 0xee, 0xee, 0xee, 0xee, 0xee, 0xee, 0xee, 0xee, 0xee, + 0, // randomness starts here + 0, 0, 0, 0, 0, 0, + }))) + // Largest TraceID that should NOT sample. 
+ require.False(t, thresh.ShouldSample(TraceIDToRandomness(pcommon.TraceID{ + // 9 meaningless bytes + 0xee, 0xee, 0xee, 0xee, 0xee, 0xee, 0xee, 0xee, 0xee, + 0x7f, // randomness starts here + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + }))) + // Smallest TraceID that should sample. + require.True(t, thresh.ShouldSample(TraceIDToRandomness(pcommon.TraceID{ + // 9 meaningless bytes + 0xee, 0xee, 0xee, 0xee, 0xee, 0xee, 0xee, 0xee, 0xee, + 0x80, // randomness starts here + 0, 0, 0, 0, 0, 0, + }))) + // Largest TraceID that should sample. + require.True(t, thresh.ShouldSample(TraceIDToRandomness(pcommon.TraceID{ + // 9 meaningless bytes + 0xee, 0xee, 0xee, 0xee, 0xee, 0xee, 0xee, 0xee, 0xee, + 0xff, // randomness starts here + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + }))) +} + +func TestRValueSyntax(t *testing.T) { + type testCase struct { + in string + expectErr error + } + for _, test := range []testCase{ + // correct cases + {"12341234123412", nil}, + + // wrong size + {"123412341234120", RValueSizeError("123412341234120")}, + {"1234123412341", RValueSizeError("1234123412341")}, + {"", RValueSizeError("")}, + + // bad syntax + {"abcdefgabcdefg", strconv.ErrSyntax}, + } { + t.Run(testName(test.in), func(t *testing.T) { + rnd, err := RValueToRandomness(test.in) + + if test.expectErr != nil { + require.True(t, errors.Is(err, test.expectErr), + "%q: not expecting %v wanted %v", test.in, err, test.expectErr, + ) + require.Equal(t, must(RValueToRandomness("00000000000000")), rnd) + } else { + require.NoError(t, err, "%q", test.in) + + val, err := strconv.ParseUint(test.in, 16, 64) + require.NoError(t, err) + + require.Equal(t, TraceIDToRandomness( + pcommon.TraceID{ + byte(rand.Intn(256)), // 0 + byte(rand.Intn(256)), // 1 + byte(rand.Intn(256)), // 2 + byte(rand.Intn(256)), // 3 + byte(rand.Intn(256)), // 4 + byte(rand.Intn(256)), // 5 + byte(rand.Intn(256)), // 6 + byte(rand.Intn(256)), // 7 + byte(rand.Intn(256)), // 8 + byte(val >> 48 & 0xff), // 9 + byte(val >> 40 & 0xff), 
// 10 + byte(val >> 32 & 0xff), // 11 + byte(val >> 24 & 0xff), // 12 + byte(val >> 16 & 0xff), // 13 + byte(val >> 8 & 0xff), // 14 + byte(val >> 0 & 0xff), // 15 + }, + ), rnd) + } + }) + } +} + +func TestTValueSyntax(t *testing.T) { + type testCase struct { + in string + expectErr error + } + for _, test := range []testCase{ + // correct cases + {"1", nil}, + + // syntax error + {"", ErrTValueEmpty}, + {"g", strconv.ErrSyntax}, + } { + t.Run(testName(test.in), func(t *testing.T) { + _, err := TValueToThreshold(test.in) + + if test.expectErr != nil { + require.True(t, errors.Is(err, test.expectErr), + "%q: not expecting %v wanted %v", test.in, err, test.expectErr, + ) + } else { + require.NoError(t, err, "%q", test.in) + } + }) + } +} + +func TestProbabilityToThresholdWithPrecision(t *testing.T) { + type kase struct { + prob float64 + exact string + rounded []string + } + + for _, test := range []kase{ + // Note: remember 8 is half of 16: hex rounds up at 8+, down at 7-. + { + 1 - 0x456789ap-28, + "456789a", + []string{ + "45678a", + "45679", + "4568", + "456", + "45", + "4", + }, + }, + // Add 3 leading zeros + { + 1 - 0x456789ap-40, + "000456789a", + []string{ + "00045678a", + "00045679", + "0004568", + "000456", + "00045", + "0004", + }, + }, + // Rounding up + { + 1 - 0x789abcdefp-40, + "0789abcdef", + []string{ + "0789abcdef", + "0789abcdf", + "0789abce", + "0789abd", + "0789ac", + "0789b", + "078a", + "079", + "08", + }, + }, + // Rounding down + { + 1 - 0x12345678p-32, + "12345678", + []string{ + "1234568", + "123456", + "12345", + "1234", + "123", + "12", + "1", + }, + }, + // Zeros + { + 1 - 0x80801p-28, + "0080801", + []string{ + "00808", + "008", + }, + }, + } { + t.Run(test.exact, func(t *testing.T) { + th, err := ProbabilityToThreshold(test.prob) + require.NoError(t, err) + require.Equal(t, th.TValue(), test.exact) + + for _, round := range test.rounded { + t.Run(round, func(t *testing.T) { + // Requested precision is independent of leading zeros, + 
// so strip them to calculate test precision. + strip := round + for strip[0] == '0' { + strip = strip[1:] + } + rth, err := ProbabilityToThresholdWithPrecision(test.prob, uint8(len(strip))) + require.NoError(t, err) + require.Equal(t, round, rth.TValue()) + }) + } + }) + } +} + +// There were two benchmarks used to choose the implementation for the +// Threshold type in this package. The results indicate that it is +// faster to compare a 56-bit number than to compare as 7 element +// []byte. + +type benchTIDs [1024]pcommon.TraceID + +func (tids *benchTIDs) init() { + for i := range tids { + binary.BigEndian.PutUint64(tids[i][:8], rand.Uint64()) + binary.BigEndian.PutUint64(tids[i][8:], rand.Uint64()) + } +} + +// The current implementation, using unsigned: +// +// BenchmarkThresholdCompareAsUint64-10 1000000000 0.4515 ns/op 0 B/op 0 allocs/op +// +// vs the tested and rejected, using bytes: +// +// BenchmarkThresholdCompareAsBytes-10 528679580 2.288 ns/op 0 B/op 0 allocs/op +func BenchmarkThresholdCompareAsUint64(b *testing.B) { + var tids benchTIDs + var comps [1024]Threshold + tids.init() + for i := range comps { + var err error + comps[i], err = ProbabilityToThreshold(rand.Float64()) + if err != nil { + b.Fatal(err) + } + } + + b.ReportAllocs() + b.ResetTimer() + yes := 0 + no := 0 + for i := 0; i < b.N; i++ { + idx := i % len(tids) + tid := tids[idx] + comp := comps[idx] + + if comp.ShouldSample(TraceIDToRandomness(tid)) { + yes++ + } else { + no++ + } + } +} diff --git a/pkg/sampling/go.mod b/pkg/sampling/go.mod new file mode 100644 index 000000000000..feac2ad0590a --- /dev/null +++ b/pkg/sampling/go.mod @@ -0,0 +1,23 @@ +module github.com/open-telemetry/opentelemetry-collector-contrib/pkg/sampling + +go 1.20 + +require ( + github.com/stretchr/testify v1.8.2 + go.opentelemetry.io/collector/pdata v1.0.0-rcv0011 + go.uber.org/multierr v1.11.0 +) + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/gogo/protobuf v1.3.2 // indirect + 
github.com/golang/protobuf v1.5.2 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + golang.org/x/net v0.9.0 // indirect + golang.org/x/sys v0.7.0 // indirect + golang.org/x/text v0.9.0 // indirect + google.golang.org/genproto v0.0.0-20230110181048-76db0878b65f // indirect + google.golang.org/grpc v1.54.0 // indirect + google.golang.org/protobuf v1.30.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/pkg/sampling/go.sum b/pkg/sampling/go.sum new file mode 100644 index 000000000000..5a83b0329a63 --- /dev/null +++ b/pkg/sampling/go.sum @@ -0,0 +1,71 @@ +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= +github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw= +github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= +github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= +github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= 
+github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8= +github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +go.opentelemetry.io/collector/pdata v1.0.0-rcv0011 h1:7lT0vseP89mHtUpvgmWYRvQZ0eY+SHbVsnXY20xkoMg= +go.opentelemetry.io/collector/pdata v1.0.0-rcv0011/go.mod h1:9vrXSQBeMRrdfGt9oMgYweqERJ8adaiQjN6LSbqRMMA= +go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= +go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.9.0 h1:aWJ/m6xSmxWBx+V0XRHTlrYrPG56jKsLdTFmsSsCzOM= 
+golang.org/x/net v0.9.0/go.mod h1:d48xBJpPfHeWQsugry2m+kC02ZBRGRgulfHnEXEuWns= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.7.0 h1:3jlCCIQZPdOYu1h8BkNvLz8Kgwtae2cagcG/VamtZRU= +golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE= +golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= +golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors 
v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/genproto v0.0.0-20230110181048-76db0878b65f h1:BWUVssLB0HVOSY78gIdvk1dTVYtT1y8SBWtPYuTJ/6w= +google.golang.org/genproto v0.0.0-20230110181048-76db0878b65f/go.mod h1:RGgjbofJ8xD9Sq1VVhDM1Vok1vRONV+rg+CjzG4SZKM= +google.golang.org/grpc v1.54.0 h1:EhTqbhiYeixwWQtAEZAxmV9MGqcjEU2mFx52xCzNyag= +google.golang.org/grpc v1.54.0/go.mod h1:PUSEXI6iWghWaB6lXM4knEgpJNu2qUcKfDtNci3EC2g= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.30.0 h1:kPPoIgf3TsEvrm0PFe15JQ+570QVxYzEvvHqChK+cng= +google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/pkg/sampling/oteltracestate.go b/pkg/sampling/oteltracestate.go new file mode 100644 index 000000000000..8fccd096f56c --- /dev/null +++ b/pkg/sampling/oteltracestate.go @@ -0,0 +1,198 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package sampling + +import ( + "fmt" + "io" + "regexp" + "strconv" +) + +type OTelTraceState struct { + commonTraceState + + // sampling r and t-values + rnd Randomness // r value parsed, as unsigned + rvalue string // 14 ASCII hex digits + threshold Threshold // t value parsed, as a threshold + tvalue string // 1-14 ASCII hex digits +} + +const ( + // RName is the OTel tracestate field for 
R-value + RName = "rv" + // TName is the OTel tracestate field for T-value + TName = "th" + + // hardMaxOTelLength is the maximum encoded size of an OTel + // tracestate value. + hardMaxOTelLength = 256 + + // chr = ucalpha / lcalpha / DIGIT / "." / "_" / "-" + // ucalpha = %x41-5A ; A-Z + // lcalpha = %x61-7A ; a-z + // key = lcalpha *(lcalpha / DIGIT ) + // value = *(chr) + // list-member = key ":" value + // list = list-member *( ";" list-member ) + otelKeyRegexp = lcAlphaRegexp + lcDigitRegexp + `*` + otelValueRegexp = `[a-zA-Z0-9._\-]*` + otelMemberRegexp = `(?:` + otelKeyRegexp + `:` + otelValueRegexp + `)` + otelSemicolonMemberRegexp = `(?:` + `;` + otelMemberRegexp + `)` + otelTracestateRegexp = `^` + otelMemberRegexp + otelSemicolonMemberRegexp + `*$` +) + +var ( + otelTracestateRe = regexp.MustCompile(otelTracestateRegexp) + + otelSyntax = keyValueScanner{ + maxItems: -1, + trim: false, + separator: ';', + equality: ':', + } + + // ErrInconsistentSampling is returned when a sampler update + // is illogical. It is safe to ignore. Samplers should avoid + // this condition using a ThresholdLessThan() test. + ErrInconsistentSampling = fmt.Errorf("cannot raise existing sampling probability") + ErrInconsistentZero = fmt.Errorf("cannot zero sampling probability") +) + +func NewOTelTraceState(input string) (OTelTraceState, error) { + // Note: the default value has threshold == 0 and tvalue == "". + // It is important to recognize this as always-sample, meaning + // to check HasTValue() before using TValueThreshold(), since + // TValueThreshold() == NeverSampleThreshold when !HasTValue(). 
+ otts := OTelTraceState{} + + if len(input) > hardMaxOTelLength { + return otts, ErrTraceStateSize + } + + if !otelTracestateRe.MatchString(input) { + return otts, strconv.ErrSyntax + } + + err := otelSyntax.scanKeyValues(input, func(key, value string) error { + var err error + switch key { + case RName: + if otts.rnd, err = RValueToRandomness(value); err == nil { + otts.rvalue = value + } else { + // The zero-value for randomness implies always-sample; + // the threshold test is R < T, but T is not meaningful + // at zero, and this value implies zero adjusted count. + otts.rvalue = "" + otts.rnd = Randomness{} + } + case TName: + if otts.threshold, err = TValueToThreshold(value); err == nil { + otts.tvalue = value + } else { + otts.tvalue = "" + otts.threshold = AlwaysSampleThreshold + } + default: + otts.kvs = append(otts.kvs, KV{ + Key: key, + Value: value, + }) + } + return err + }) + + return otts, err +} + +func (otts *OTelTraceState) HasRValue() bool { + return otts.rvalue != "" +} + +func (otts *OTelTraceState) RValue() string { + return otts.rvalue +} + +func (otts *OTelTraceState) RValueRandomness() Randomness { + return otts.rnd +} + +func (otts *OTelTraceState) HasTValue() bool { + return otts.tvalue != "" +} + +func (otts *OTelTraceState) TValue() string { + return otts.tvalue +} + +func (otts *OTelTraceState) TValueThreshold() Threshold { + return otts.threshold +} + +func (otts *OTelTraceState) UpdateTValueWithSampling(sampledThreshold Threshold, encodedTValue string) error { + if otts.HasTValue() && ThresholdGreater(otts.threshold, sampledThreshold) { + return ErrInconsistentSampling + } + otts.threshold = sampledThreshold + otts.tvalue = encodedTValue + return nil +} + +func (otts *OTelTraceState) AdjustedCount() float64 { + if !otts.HasTValue() { + return 0 + } + return 1.0 / otts.threshold.Probability() +} + +func (otts *OTelTraceState) ClearTValue() { + otts.tvalue = "" + otts.threshold = Threshold{} +} + +func (otts *OTelTraceState) 
SetRValue(randomness Randomness) { + otts.rnd = randomness + otts.rvalue = randomness.RValue() +} + +func (otts *OTelTraceState) ClearRValue() { + otts.rvalue = "" + otts.rnd = Randomness{} +} + +func (otts *OTelTraceState) HasAnyValue() bool { + return otts.HasRValue() || otts.HasTValue() || otts.HasExtraValues() +} + +func (otts *OTelTraceState) Serialize(w io.StringWriter) error { + ser := serializer{writer: w} + cnt := 0 + sep := func() { + if cnt != 0 { + ser.write(";") + } + cnt++ + } + if otts.HasRValue() { + sep() + ser.write(RName) + ser.write(":") + ser.write(otts.RValue()) + } + if otts.HasTValue() { + sep() + ser.write(TName) + ser.write(":") + ser.write(otts.TValue()) + } + for _, kv := range otts.ExtraValues() { + sep() + ser.write(kv.Key) + ser.write(":") + ser.write(kv.Value) + } + return ser.err +} diff --git a/pkg/sampling/oteltracestate_test.go b/pkg/sampling/oteltracestate_test.go new file mode 100644 index 000000000000..97df83f92bc3 --- /dev/null +++ b/pkg/sampling/oteltracestate_test.go @@ -0,0 +1,336 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package sampling + +import ( + "errors" + "strconv" + "strings" + "testing" + + "github.com/stretchr/testify/require" +) + +func testName(in string) string { + if len(in) > 32 { + return in[:32] + "..." 
+ } + return in +} + +func TestEmptyOTelTraceState(t *testing.T) { + // Empty value is invalid + _, err := NewOTelTraceState("") + require.Error(t, err) +} + +func TestOTelTraceStateTValueSerialize(t *testing.T) { + const orig = "rv:10000000000000;th:3;a:b;c:d" + otts, err := NewOTelTraceState(orig) + require.NoError(t, err) + require.True(t, otts.HasTValue()) + require.Equal(t, "3", otts.TValue()) + require.Equal(t, 1-0x3p-4, otts.TValueThreshold().Probability()) + + require.True(t, otts.HasRValue()) + require.Equal(t, "10000000000000", otts.RValue()) + require.Equal(t, "10000000000000", otts.RValueRandomness().RValue()) + + require.True(t, otts.HasAnyValue()) + var w strings.Builder + otts.Serialize(&w) + require.Equal(t, orig, w.String()) +} + +func TestOTelTraceStateZero(t *testing.T) { + const orig = "th:0" + otts, err := NewOTelTraceState(orig) + require.NoError(t, err) + require.True(t, otts.HasAnyValue()) + require.True(t, otts.HasTValue()) + require.Equal(t, "0", otts.TValue()) + require.Equal(t, 1.0, otts.TValueThreshold().Probability()) + + var w strings.Builder + otts.Serialize(&w) + require.Equal(t, orig, w.String()) +} + +func TestOTelTraceStateRValuePValue(t *testing.T) { + // Ensures the caller can handle RValueSizeError and search + // for p-value in extra-values. + const orig = "rv:3;p:2" + otts, err := NewOTelTraceState(orig) + require.Error(t, err) + require.True(t, errors.Is(err, RValueSizeError("3"))) + require.False(t, otts.HasRValue()) + + // The error is oblivious to the old r-value, but that's ok. 
+ require.Contains(t, err.Error(), "14 hex digits") + + require.Equal(t, []KV{{"p", "2"}}, otts.ExtraValues()) + + var w strings.Builder + otts.Serialize(&w) + require.Equal(t, "p:2", w.String()) +} + +func TestOTelTraceStateTValueUpdate(t *testing.T) { + const orig = "rv:abcdefabcdefab" + otts, err := NewOTelTraceState(orig) + require.NoError(t, err) + require.False(t, otts.HasTValue()) + require.True(t, otts.HasRValue()) + + th, _ := TValueToThreshold("3") + require.NoError(t, otts.UpdateTValueWithSampling(th, "3")) + + require.Equal(t, "3", otts.TValue()) + require.Equal(t, 1-0x3p-4, otts.TValueThreshold().Probability()) + + const updated = "rv:abcdefabcdefab;th:3" + var w strings.Builder + otts.Serialize(&w) + require.Equal(t, updated, w.String()) +} + +func TestOTelTraceStateRTUpdate(t *testing.T) { + otts, err := NewOTelTraceState("a:b") + require.NoError(t, err) + require.False(t, otts.HasTValue()) + require.False(t, otts.HasRValue()) + require.True(t, otts.HasAnyValue()) + + th, _ := TValueToThreshold("3") + require.NoError(t, otts.UpdateTValueWithSampling(th, "3")) + otts.SetRValue(must(RValueToRandomness("00000000000003"))) + + const updated = "rv:00000000000003;th:3;a:b" + var w strings.Builder + otts.Serialize(&w) + require.Equal(t, updated, w.String()) +} + +func TestOTelTraceStateRTClear(t *testing.T) { + otts, err := NewOTelTraceState("a:b;rv:12341234123412;th:1234") + require.NoError(t, err) + + otts.ClearTValue() + otts.ClearRValue() + + const updated = "a:b" + var w strings.Builder + otts.Serialize(&w) + require.Equal(t, updated, w.String()) +} + +func TestParseOTelTraceState(t *testing.T) { + type testCase struct { + in string + rval string + tval string + extra []string + expectErr error + } + const ns = "" + for _, test := range []testCase{ + // t-value correct cases + {"th:2", ns, "2", nil, nil}, + {"th:1", ns, "1", nil, nil}, + {"th:1", ns, "1", nil, nil}, + {"th:10", ns, "10", nil, nil}, + {"th:33", ns, "33", nil, nil}, + {"th:ab", ns, "ab", 
nil, nil}, + {"th:61", ns, "61", nil, nil}, + + // syntax errors + {"", ns, ns, nil, strconv.ErrSyntax}, + {"th:1;", ns, ns, nil, strconv.ErrSyntax}, + {"th:1=p:2", ns, ns, nil, strconv.ErrSyntax}, + {"th:1;p:2=s:3", ns, ns, nil, strconv.ErrSyntax}, + {":1;p:2=s:3", ns, ns, nil, strconv.ErrSyntax}, + {":;p:2=s:3", ns, ns, nil, strconv.ErrSyntax}, + {":;:", ns, ns, nil, strconv.ErrSyntax}, + {":", ns, ns, nil, strconv.ErrSyntax}, + {"th:;p=1", ns, ns, nil, strconv.ErrSyntax}, + {"th:$", ns, ns, nil, strconv.ErrSyntax}, // not-hexadecimal + {"th:0x1p+3", ns, ns, nil, strconv.ErrSyntax}, // + is invalid + {"th:14.5", ns, ns, nil, strconv.ErrSyntax}, // integer syntax + {"th:-1", ns, ns, nil, strconv.ErrSyntax}, // non-negative + + // too many digits + {"th:ffffffffffffffff", ns, ns, nil, ErrTValueSize}, + {"th:100000000000000", ns, ns, nil, ErrTValueSize}, + + // one field + {"e100:1", ns, ns, []string{"e100:1"}, nil}, + + // two fields + {"e1:1;e2:2", ns, ns, []string{"e1:1", "e2:2"}, nil}, + {"e1:1;e2:2", ns, ns, []string{"e1:1", "e2:2"}, nil}, + + // one extra key, two ways + {"th:2;extra:stuff", ns, "2", []string{"extra:stuff"}, nil}, + {"extra:stuff;th:2", ns, "2", []string{"extra:stuff"}, nil}, + + // two extra fields + {"e100:100;th:1;e101:101", ns, "1", []string{"e100:100", "e101:101"}, nil}, + {"th:1;e100:100;e101:101", ns, "1", []string{"e100:100", "e101:101"}, nil}, + {"e100:100;e101:101;th:1", ns, "1", []string{"e100:100", "e101:101"}, nil}, + + // parse error prevents capturing unrecognized keys + {"1:1;u:V", ns, ns, nil, strconv.ErrSyntax}, + {"X:1;u:V", ns, ns, nil, strconv.ErrSyntax}, + {"x:1;u:V", ns, ns, []string{"x:1", "u:V"}, nil}, + + // r-value + {"rv:22222222222222;extra:stuff", "22222222222222", ns, []string{"extra:stuff"}, nil}, + {"extra:stuff;rv:22222222222222", "22222222222222", ns, []string{"extra:stuff"}, nil}, + {"rv:ffffffffffffff", "ffffffffffffff", ns, nil, nil}, + {"rv:88888888888888", "88888888888888", ns, nil, nil}, + 
{"rv:00000000000000", "00000000000000", ns, nil, nil}, + + // r-value range error (15 bytes of hex or more) + {"rv:100000000000000", ns, ns, nil, RValueSizeError("100000000000000")}, + {"rv:fffffffffffffffff", ns, ns, nil, RValueSizeError("fffffffffffffffff")}, + + // no trailing ; + {"x:1;", ns, ns, nil, strconv.ErrSyntax}, + + // empty key + {"x:", ns, ns, []string{"x:"}, nil}, + + // charset test + {"x:0X1FFF;y:.-_-.;z:", ns, ns, []string{"x:0X1FFF", "y:.-_-.", "z:"}, nil}, + {"x1y2z3:1-2-3;y1:y_1;xy:-;th:50", ns, "50", []string{"x1y2z3:1-2-3", "y1:y_1", "xy:-"}, nil}, + + // size exceeded + {"x:" + strings.Repeat("_", 255), ns, ns, nil, ErrTraceStateSize}, + {"x:" + strings.Repeat("_", 254), ns, ns, []string{"x:" + strings.Repeat("_", 254)}, nil}, + } { + t.Run(testName(test.in), func(t *testing.T) { + otts, err := NewOTelTraceState(test.in) + + if test.expectErr != nil { + require.True(t, errors.Is(err, test.expectErr), "%q: not expecting %v wanted %v", test.in, err, test.expectErr) + } else { + require.NoError(t, err) + } + if test.rval != ns { + require.True(t, otts.HasRValue()) + require.Equal(t, test.rval, otts.RValue()) + } else { + require.False(t, otts.HasRValue(), "should have no r-value: %s", otts.RValue()) + } + if test.tval != ns { + require.True(t, otts.HasTValue()) + require.Equal(t, test.tval, otts.TValue()) + } else { + require.False(t, otts.HasTValue(), "should have no t-value: %s", otts.TValue()) + } + var expect []KV + for _, ex := range test.extra { + k, v, _ := strings.Cut(ex, ":") + expect = append(expect, KV{ + Key: k, + Value: v, + }) + } + require.Equal(t, expect, otts.ExtraValues()) + + if test.expectErr != nil { + return + } + // on success Serialize() should not modify + // test by re-parsing + var w strings.Builder + otts.Serialize(&w) + cpy, err := NewOTelTraceState(w.String()) + require.NoError(t, err) + require.Equal(t, otts, cpy) + }) + } +} + +func TestUpdateTValueWithSampling(t *testing.T) { + type testCase struct { + // The 
input otel tracestate; no error conditions tested + in string + + // The incoming adjusted count; defined whether + // t-value is present or not. + adjCountIn float64 + + // the update probability; threshold and tvalue are + // derived from this + prob float64 + + // when update error is expected + updateErr error + + // output t-value + out string + + // output adjusted count + adjCountOut float64 + } + for _, test := range []testCase{ + // 8/16 in, sampled at (0x10-0xe)/0x10 = 2/16 => adjCount 8 + {"th:8", 2, 0x2p-4, nil, "th:e", 8}, + + // 8/16 in, sampled at 14/16 => no update, adjCount 2 + {"th:8", 2, 0xep-4, nil, "th:8", 2}, + + // 1/16 in, 50% update (error) + {"th:f", 16, 0x8p-4, ErrInconsistentSampling, "th:f", 16}, + + // 1/1 sampling in, 1/16 update + {"th:0", 1, 0x1p-4, nil, "th:f", 16}, + + // no t-value in, 1/16 update + {"", 0, 0x1p-4, nil, "th:f", 16}, + + // none in, 100% update + {"", 0, 1, nil, "th:0", 1}, + + // 1/2 in, 100% update (error) + {"th:8", 2, 1, ErrInconsistentSampling, "th:8", 2}, + + // 1/1 in, 0x1p-56 update + {"th:0", 1, 0x1p-56, nil, "th:ffffffffffffff", 0x1p56}, + + // 1/1 in, 0x1p-56 update + {"th:0", 1, 0x1p-56, nil, "th:ffffffffffffff", 0x1p56}, + + // 2/3 in, 1/3 update. Note that 0x555 + 0xaab = 0x1000. 
+ {"th:555", 1 / (1 - 0x555p-12), 0x555p-12, nil, "th:aab", 1 / (1 - 0xaabp-12)}, + } { + t.Run(test.in+"/"+test.out, func(t *testing.T) { + otts := OTelTraceState{} + if test.in != "" { + var err error + otts, err = NewOTelTraceState(test.in) + require.NoError(t, err) + } + + require.Equal(t, test.adjCountIn, otts.AdjustedCount()) + + newTh, err := ProbabilityToThreshold(test.prob) + require.NoError(t, err) + + upErr := otts.UpdateTValueWithSampling(newTh, newTh.TValue()) + + if test.updateErr != nil { + require.Equal(t, test.updateErr, upErr) + } + + var outData strings.Builder + err = otts.Serialize(&outData) + require.NoError(t, err) + require.Equal(t, test.out, outData.String()) + + require.Equal(t, test.adjCountOut, otts.AdjustedCount()) + }) + } +} diff --git a/pkg/sampling/probability.go b/pkg/sampling/probability.go new file mode 100644 index 000000000000..0d723b28737a --- /dev/null +++ b/pkg/sampling/probability.go @@ -0,0 +1,80 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package sampling + +import ( + "errors" + "math" +) + +// ErrProbabilityRange is returned when a value should be in the range [1/MaxAdjustedCount, 1]. +var ErrProbabilityRange = errors.New("sampling probability out of range (0x1p-56 <= valid <= 1)") + +// ErrPrecisionUnderflow is returned when a precision is too great for the range. +var ErrPrecisionUnderflow = errors.New("sampling precision underflow") + +// MinSamplingProbability is the smallest representable probability +// and is the inverse of MaxAdjustedCount. +const MinSamplingProbability = 1.0 / MaxAdjustedCount + +// probabilityInRange tests MinSamplingProb <= prob <= 1. 
+func probabilityInRange(prob float64) bool { + return prob >= MinSamplingProbability && prob <= 1 +} + +func ProbabilityToThreshold(prob float64) (Threshold, error) { + // Probability cases + if !probabilityInRange(prob) { + return AlwaysSampleThreshold, ErrProbabilityRange + } + + scaled := uint64(math.Round(prob * MaxAdjustedCount)) + + return Threshold{ + unsigned: MaxAdjustedCount - scaled, + }, nil +} + +func ProbabilityToThresholdWithPrecision(prob float64, prec uint8) (Threshold, error) { + // Assume full precision at 0. Also short-circuit prob == 1, + // which must not reach the loop below: its rejection + // probability is exactly 0, so the loop would never terminate. + if prec == 0 || prob == 1 { + return ProbabilityToThreshold(prob) + } + if !probabilityInRange(prob) { + return AlwaysSampleThreshold, ErrProbabilityRange + } + + // Adjust precision considering the significance of leading + // zeros. If we can multiply the rejection probability by 16 + // and still be less than 1, then there is a leading zero of + // obligatory precision. + for reject := 1 - prob; reject*16 < 1; { + reject *= 16 + prec++ + } + + // Check if leading zeros plus precision is above the maximum. + // This is called underflow because the requested precision + // leaves no significant figures. + if prec > NumHexDigits { + return AlwaysSampleThreshold, ErrPrecisionUnderflow + } + + scaled := uint64(math.Round(prob * MaxAdjustedCount)) + rscaled := MaxAdjustedCount - scaled + shift := 4 * (14 - prec) + half := uint64(1) << (shift - 1) + + rscaled = (rscaled + half) >> shift + rscaled = rscaled << shift + + return Threshold{ + unsigned: rscaled, + }, nil +} + +// Probability is the sampling ratio in the range [MinSamplingProbability, 1]. 
+func (t Threshold) Probability() float64 { + return float64(MaxAdjustedCount-t.unsigned) / MaxAdjustedCount +} diff --git a/pkg/sampling/randomness.go b/pkg/sampling/randomness.go new file mode 100644 index 000000000000..4a1c69e26c2a --- /dev/null +++ b/pkg/sampling/randomness.go @@ -0,0 +1,95 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package sampling + +import ( + "encoding/binary" + "fmt" + "strconv" + + "go.opentelemetry.io/collector/pdata/pcommon" +) + +// RValueSizeError indicates the size was not 14 bytes. This may allow +// parsing the legacy r-value. +type RValueSizeError string + +// numRandomnessValues equals MaxAdjustedCount--this variable has been +// introduced to improve readability. Recall that MaxAdjustedCount is +// 2**56 which is one greater than the maximum RValue +// ("ffffffffffffff", i.e., "100000000000000"). +const numRandomnessValues = MaxAdjustedCount + +// Error indicates that 14 bytes are needed. +func (r RValueSizeError) Error() string { + return fmt.Sprintf("r-value must have 14 hex digits: %q", string(r)) +} + +// LeastHalfTraceIDThresholdMask is the mask to use on the +// least-significant half of the TraceID, i.e., bytes 8-15. +// Because this is a 56 bit mask, the result after masking is +// the unsigned value of bytes 9 through 15. +// +// This helps extract 56 bits of randomness from the second half of +// the TraceID, as specified in https://www.w3.org/TR/trace-context-2/#randomness-of-trace-id +const LeastHalfTraceIDThresholdMask = MaxAdjustedCount - 1 + +// Randomness may be derived from R-value or TraceID. +// +// Randomness contains 56 bits of randomness, derived in one of two ways, see: +// https://www.w3.org/TR/trace-context-2/#randomness-of-trace-id +type Randomness struct { + // unsigned is in the range [0, MaxAdjustedCount-1] + unsigned uint64 +} + +// TraceIDToRandomness returns randomness from a TraceID (assumes +// the traceparent random flag was set). 
+func TraceIDToRandomness(id pcommon.TraceID) Randomness { + // To get the 56 bits we want, take the second half of the trace ID, + leastHalf := binary.BigEndian.Uint64(id[8:]) + return Randomness{ + // Then apply the mask to get the least-significant 56 bits / 7 bytes. + // Equivalently stated: zero the most-significant 8 bits. + unsigned: leastHalf & LeastHalfTraceIDThresholdMask, + } +} + +// RValueToRandomness parses NumHexDigits hex bytes into a Randomness. +func RValueToRandomness(s string) (Randomness, error) { + if len(s) != NumHexDigits { + return Randomness{}, RValueSizeError(s) + } + + unsigned, err := strconv.ParseUint(s, hexBase, 64) + if err != nil { + return Randomness{}, err + } + + return Randomness{ + unsigned: unsigned, + }, nil +} + +// ToRValue formats the r-value encoding. +func (rnd Randomness) RValue() string { + // The important part here is to format a full 14-byte hex + // string, including leading zeros. We could accomplish the + // same with custom code or with fmt.Sprintf directives, but + // here we let strconv.FormatUint fill in leading zeros, as + // follows: + // + // Format (numRandomnessValues+Randomness) as a hex string + // Strip the leading hex digit, which is a "1" by design + // + // For example, a randomness that requires two leading zeros + // (all in hexadecimal): + // + // randomness is 7 bytes: aabbccddeeff + // numRandomnessValues is 2^56: 100000000000000 + // randomness+numRandomnessValues: 100aabbccddeeff + // strip the leading "1": 00aabbccddeeff + return strconv.FormatUint(numRandomnessValues+rnd.unsigned, hexBase)[1:] + +} diff --git a/pkg/sampling/threshold.go b/pkg/sampling/threshold.go new file mode 100644 index 000000000000..81ea0b6d4abb --- /dev/null +++ b/pkg/sampling/threshold.go @@ -0,0 +1,105 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package sampling + +import ( + "errors" + "strconv" + "strings" +) + +const ( + // MaxAdjustedCount is 2^56 i.e. 
0x100000000000000 i.e., 1<<56. + MaxAdjustedCount = 1 << 56 + + // NumHexDigits is the number of hex digits equalling 56 bits. + NumHexDigits = 56 / hexBits + + hexBits = 4 + hexBase = 16 + + NeverSampleTValue = "0" +) + +// Threshold used to compare with the least-significant 7 bytes of the TraceID. +type Threshold struct { + // unsigned is in the range [0, MaxAdjustedCount] + // - 0 represents always sampling (0 Random values are less-than) + // - 1 represents sampling 1-in-(MaxAdjustedCount-1) + // - MaxAdjustedCount represents always sampling 1-in- + unsigned uint64 +} + +var ( + // ErrTValueSize is returned for t-values longer than NumHexDigits hex digits. + ErrTValueSize = errors.New("t-value exceeds 14 hex digits") + + // ErrEmptyTValue indicates no t-value was found, i.e., no threshold available. + ErrTValueEmpty = errors.New("t-value is empty") + + // AlwaysSampleThreshold represents 100% sampling. + AlwaysSampleThreshold = Threshold{unsigned: 0} +) + +// TValueToThreshold returns a Threshold. Because TValue strings +// have trailing zeros omitted, this function performs the reverse. +func TValueToThreshold(s string) (Threshold, error) { + if len(s) > NumHexDigits { + return AlwaysSampleThreshold, ErrTValueSize + } + if len(s) == 0 { + return AlwaysSampleThreshold, ErrTValueEmpty + } + + // Having checked length above, there are no range errors + // possible. Parse the hex string to an unsigned valued. + unsigned, err := strconv.ParseUint(s, hexBase, 64) + if err != nil { + return AlwaysSampleThreshold, err // e.g. parse error + } + + // The unsigned value requires shifting to account for the + // trailing zeros that were omitted by the encoding (see + // TValue for the reverse). Compute the number to shift by: + extendByHexZeros := NumHexDigits - len(s) + return Threshold{ + unsigned: unsigned << (hexBits * extendByHexZeros), + }, nil +} + +// TValue encodes a threshold, which is a variable-length hex string +// up to 14 characters. 
The empty string is returned for 100% +// sampling. +func (th Threshold) TValue() string { + // Always-sample is a special case because TrimRight() below + // will trim it to the empty string, which represents no t-value. + if th == AlwaysSampleThreshold { + return "0" + } + // For thresholds other than the extremes, format a full-width + // (14 digit) unsigned value with leading zeros, then, remove + // the trailing zeros. Use the logic for (Randomness).RValue(). + digits := Randomness(th).RValue() + + // Remove trailing zeros. + return strings.TrimRight(digits, "0") +} + +// ShouldSample returns true when the span passes this sampler's +// consistent sampling decision. +func (t Threshold) ShouldSample(rnd Randomness) bool { + return rnd.unsigned >= t.unsigned +} + +// ThresholdGreater allows direct comparison of Threshold values. +// Greater thresholds equate with smaller sampling probabilities. +func ThresholdGreater(a, b Threshold) bool { + return a.unsigned > b.unsigned +} + +// ThresholdLessThan allows direct comparison of Threshold values. +// Smaller thresholds equate with greater sampling probabilities. +func ThresholdLessThan(a, b Threshold) bool { + return a.unsigned < b.unsigned +} diff --git a/pkg/sampling/w3ctracestate.go b/pkg/sampling/w3ctracestate.go new file mode 100644 index 000000000000..855ddb291339 --- /dev/null +++ b/pkg/sampling/w3ctracestate.go @@ -0,0 +1,156 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package sampling + +import ( + "io" + "regexp" + "strconv" + "strings" +) + +type W3CTraceState struct { + commonTraceState + otts OTelTraceState +} + +const ( + hardMaxNumPairs = 32 + hardMaxW3CLength = 1024 + hardMaxKeyLength = 256 + hardMaxTenantLength = 241 + hardMaxSystemLength = 14 + + otelVendorCode = "ot" + + // keyRegexp is not an exact test, it permits all the + // characters and then we check various conditions. 
+ + // key = simple-key / multi-tenant-key + // simple-key = lcalpha 0*255( lcalpha / DIGIT / "_" / "-"/ "*" / "/" ) + // multi-tenant-key = tenant-id "@" system-id + // tenant-id = ( lcalpha / DIGIT ) 0*240( lcalpha / DIGIT / "_" / "-"/ "*" / "/" ) + // system-id = lcalpha 0*13( lcalpha / DIGIT / "_" / "-"/ "*" / "/" ) + // lcalpha = %x61-7A ; a-z + + lcAlphaRegexp = `[a-z]` + lcDigitPunctRegexp = `[a-z0-9\-\*/_]` + lcDigitRegexp = `[a-z0-9]` + multiTenantSep = `@` + tenantIDRegexp = lcDigitRegexp + lcDigitPunctRegexp + `*` // could be {0,hardMaxTenantLength-1} + systemIDRegexp = lcAlphaRegexp + lcDigitPunctRegexp + `*` // could be {0,hardMaxSystemLength-1} + multiTenantKeyRegexp = tenantIDRegexp + multiTenantSep + systemIDRegexp + simpleKeyRegexp = lcAlphaRegexp + lcDigitPunctRegexp + `*` // could be {0,hardMaxKeyLength-1} + keyRegexp = `(?:(?:` + simpleKeyRegexp + `)|(?:` + multiTenantKeyRegexp + `))` + + // value = 0*255(chr) nblk-chr + // nblk-chr = %x21-2B / %x2D-3C / %x3E-7E + // chr = %x20 / nblk-chr + // + // Note the use of double-quoted strings in two places below. + // This is for \x expansion in these two cases. Also note + // \x2d is a hyphen character, so a quoted \ (i.e., \\\x2d) + // appears below. + valueNonblankCharRegexp = "[\x21-\x2b\\\x2d-\x3c\x3e-\x7e]" + valueCharRegexp = "[\x20-\x2b\\\x2d-\x3c\x3e-\x7e]" + valueRegexp = valueCharRegexp + `{0,255}` + valueNonblankCharRegexp + + // tracestate = list-member 0*31( OWS "," OWS list-member ) + // list-member = (key "=" value) / OWS + + owsCharSet = ` \t` + owsRegexp = `(?:[` + owsCharSet + `]*)` + w3cMemberRegexp = `(?:` + keyRegexp + `=` + valueRegexp + `)?` + + // This regexp is large enough that regexp impl refuses to + // make 31 copies of it (i.e., `{0,31}`) so we use `*` below. 
+ w3cOwsMemberOwsRegexp = `(?:` + owsRegexp + w3cMemberRegexp + owsRegexp + `)` + w3cCommaOwsMemberOwsRegexp = `(?:` + `,` + w3cOwsMemberOwsRegexp + `)` + + // The limit to 31 of owsCommaMemberRegexp is applied in code. + w3cTracestateRegexp = `^` + w3cOwsMemberOwsRegexp + w3cCommaOwsMemberOwsRegexp + `*$` +) + +var ( + w3cTracestateRe = regexp.MustCompile(w3cTracestateRegexp) + + w3cSyntax = keyValueScanner{ + maxItems: hardMaxNumPairs, + trim: true, + separator: ',', + equality: '=', + } +) + +func NewW3CTraceState(input string) (w3c W3CTraceState, _ error) { + if len(input) > hardMaxW3CLength { + return w3c, ErrTraceStateSize + } + + if !w3cTracestateRe.MatchString(input) { + return w3c, strconv.ErrSyntax + } + + err := w3cSyntax.scanKeyValues(input, func(key, value string) error { + if len(key) > hardMaxKeyLength { + return ErrTraceStateSize + } + if tenant, system, found := strings.Cut(key, multiTenantSep); found { + if len(tenant) > hardMaxTenantLength { + return ErrTraceStateSize + } + if len(system) > hardMaxSystemLength { + return ErrTraceStateSize + } + } + switch key { + case otelVendorCode: + var err error + w3c.otts, err = NewOTelTraceState(value) + return err + default: + w3c.kvs = append(w3c.kvs, KV{ + Key: key, + Value: value, + }) + return nil + } + }) + return w3c, err +} + +func (w3c *W3CTraceState) HasAnyValue() bool { + return w3c.HasOTelValue() || w3c.HasExtraValues() +} + +func (w3c *W3CTraceState) OTelValue() *OTelTraceState { + return &w3c.otts +} + +func (w3c *W3CTraceState) HasOTelValue() bool { + return w3c.otts.HasAnyValue() +} + +func (w3c *W3CTraceState) Serialize(w io.StringWriter) error { + ser := serializer{writer: w} + cnt := 0 + sep := func() { + if cnt != 0 { + ser.write(",") + } + cnt++ + } + if w3c.otts.HasAnyValue() { + sep() + ser.write("ot=") + ser.check(w3c.otts.Serialize(w)) + } + for _, kv := range w3c.ExtraValues() { + sep() + ser.write(kv.Key) + ser.write("=") + ser.write(kv.Value) + } + return ser.err +} diff --git 
a/pkg/sampling/w3ctracestate_test.go b/pkg/sampling/w3ctracestate_test.go new file mode 100644 index 000000000000..ae06ddfff3fa --- /dev/null +++ b/pkg/sampling/w3ctracestate_test.go @@ -0,0 +1,107 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package sampling + +import ( + "errors" + "strconv" + "strings" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestParseW3CTraceState(t *testing.T) { + type testCase struct { + in string + rval string + tval string + extra map[string]string + expectErr error + } + const ns = "" + for _, test := range []testCase{ + // correct cases + {"ot=th:1", ns, "1", nil, nil}, + {" ot=th:1 ", ns, "1", nil, nil}, + {"ot=th:1", ns, "1", nil, nil}, + {" ot=th:1 ", ns, "1", nil, nil}, + {" ot=th:1,other=value ", ns, "1", map[string]string{ + "other": "value", + }, nil}, + {"ot=th:1 , other=value", ns, "1", map[string]string{ + "other": "value", + }, nil}, + {",,,", ns, ns, nil, nil}, + {" , ot=th:1, , other=value ", ns, "1", map[string]string{ + "other": "value", + }, nil}, + {"ot=th:100;rv:abcdabcdabcdff", "abcdabcdabcdff", "100", nil, nil}, + {" ot=th:100;rv:abcdabcdabcdff", "abcdabcdabcdff", "100", nil, nil}, + {"ot=th:100;rv:abcdabcdabcdff ", "abcdabcdabcdff", "100", nil, nil}, + {"ot=rv:11111111111111", "11111111111111", ns, nil, nil}, + {"ot=rv:ffffffffffffff,unknown=value,other=something", "ffffffffffffff", ns, map[string]string{ + "other": "something", + "unknown": "value", + }, nil}, + + // syntax errors + {"-1=2", ns, ns, nil, strconv.ErrSyntax}, // invalid key char + {"=", ns, ns, nil, strconv.ErrSyntax}, // invalid empty key + + // size errors + {strings.Repeat("x", hardMaxKeyLength+1) + "=v", ns, ns, nil, ErrTraceStateSize}, // too long simple key + {strings.Repeat("x", hardMaxTenantLength+1) + "@y=v", ns, ns, nil, ErrTraceStateSize}, // too long multitenant-id + {"y@" + strings.Repeat("x", hardMaxSystemLength+1) + "=v", ns, ns, nil, ErrTraceStateSize}, // too long 
system-id + {"x=" + strings.Repeat("y", hardMaxW3CLength-1), ns, ns, nil, ErrTraceStateSize}, + {strings.Repeat("x=y,", hardMaxNumPairs) + "x=y", ns, ns, nil, ErrTraceStateSize}, + } { + t.Run(testName(test.in), func(t *testing.T) { + w3c, err := NewW3CTraceState(test.in) + + if test.expectErr != nil { + require.True(t, errors.Is(err, test.expectErr), + "%q: not expecting %v wanted %v", test.in, err, test.expectErr, + ) + } else { + require.NoError(t, err, "%q", test.in) + } + if test.rval != ns { + require.True(t, w3c.HasOTelValue()) + require.True(t, w3c.HasAnyValue()) + require.True(t, w3c.OTelValue().HasRValue()) + require.Equal(t, test.rval, w3c.OTelValue().RValue()) + } else { + require.False(t, w3c.OTelValue().HasRValue(), "should have no r-value") + } + if test.tval != ns { + require.True(t, w3c.HasOTelValue()) + require.True(t, w3c.HasAnyValue()) + require.True(t, w3c.OTelValue().HasTValue()) + require.Equal(t, test.tval, w3c.OTelValue().TValue()) + } else { + require.False(t, w3c.OTelValue().HasTValue(), "should have no t-value") + } + if test.extra != nil { + require.True(t, w3c.HasAnyValue()) + actual := map[string]string{} + for _, kv := range w3c.ExtraValues() { + actual[kv.Key] = kv.Value + } + require.Equal(t, test.extra, actual) + } + + if test.expectErr != nil { + return + } + // on success Serialize() should not modify + // test by re-parsing + var w strings.Builder + w3c.Serialize(&w) + cpy, err := NewW3CTraceState(w.String()) + require.NoError(t, err, "with %v", w.String()) + require.Equal(t, w3c, cpy, "with %v", w.String()) + }) + } +} diff --git a/processor/probabilisticsamplerprocessor/README.md b/processor/probabilisticsamplerprocessor/README.md index 8efe43dd3cb5..d9f37611ef98 100644 --- a/processor/probabilisticsamplerprocessor/README.md +++ b/processor/probabilisticsamplerprocessor/README.md @@ -54,6 +54,8 @@ The following configuration options can be modified: - `attribute_source` (default = traceID, optional): defines where to look for 
the attribute in from_attribute. The allowed values are `traceID` or `record`. - `from_attribute` (default = null, optional): The optional name of a log record attribute used for sampling purposes, such as a unique log record ID. The value of the attribute is only used if the trace ID is absent or if `attribute_source` is set to `record`. - `sampling_priority` (default = null, optional): The optional name of a log record attribute used to set a different sampling priority from the `sampling_percentage` setting. 0 means to never sample the log record, and >= 100 means to always sample the log record. +- `mode` (default = "", optional): The optional sampling mode. One of "hash_seed", "equalizing", and "proportional". By default, when not explicitly set, if "hash_seed" is non-zero, the "hash_seed" mode will be configured, otherwise the "proportional" mode is selected. +- `sampling_precision` (default = 3, optional): The number of digits of precision used to express the desired exactness. ## Hashing @@ -68,6 +70,7 @@ Sample 15% of the logs: ```yaml processors: probabilistic_sampler: + mode: hash_seed sampling_percentage: 15 ``` @@ -76,6 +79,7 @@ Sample logs according to their logID attribute: ```yaml processors: probabilistic_sampler: + mode: hash_seed sampling_percentage: 15 attribute_source: record # possible values: one of record or traceID from_attribute: logID # value is required if the source is not traceID @@ -86,10 +90,61 @@ Sample logs according to the attribute `priority`: ```yaml processors: probabilistic_sampler: + mode: hash_seed sampling_percentage: 15 sampling_priority: priority ``` +## Consistent Probability Sampling + +This processor includes an implementation of the tail sampling logic +described in [OTEP +235](https://github.com/open-telemetry/oteps/pull/235), which encodes +probability sampling information in the OpenTelemetry sub-field of the +W3C TraceState. There are two modes supported. 
+ +- `proportional`: In this mode, the `sampling_percentage` + configuration is applied such that the number of spans exiting the + sampling is proportional to the number of spans entering the + sampling, regardless of how much sampling was already applied. All + sampling percentages are valid in this mode. +- `equalizing`: In this mode, the `sampling_percentage` configuration + is applied such that spans exit the sampler reduced to a minimum + probability. When spans arrive with probability equal to the + configured sampling percentage, the spans pass through unmodified. + When spans arrive with probability smaller than the configured + sampling percentage, errors are reported. When spans arrive with + larger probability than the configured sampling percentage, they + will be reduced in number as spans exit with the configured sampling + percentage. + +For example, to configure the proportional sampler, simply omit the +`hash_seed` field: -Refer to [config.yaml](./testdata/config.yaml) for detailed -examples on using the processor. +``` +processors: + probabilistic_sampler: + # no hash_seed is set, uses proportional consistent by default + sampling_percentage: 10 +``` + +For example, to configure an equalizing sampler, set the mode explicitly: + +``` +processors: + probabilistic_sampler: + mode: equalizing + sampling_percentage: 10 +``` + +The optional `sampling_precision` field determines how many +hexadecimal digits are used to express the sampling rejection +threshold. By default, 3 hex digits are used. For example, 60% +sampling is approximated as "666" with precision 3, because the +rejection threshold of 40% is approximated by `0x666` out of `0x1000`, +indicating a sampling probability of precisely 60.009765625%. + +## Detailed examples + +Refer to [config.yaml](./testdata/config.yaml) for detailed examples +on using the processor. 
diff --git a/processor/probabilisticsamplerprocessor/config.go b/processor/probabilisticsamplerprocessor/config.go index 631a84a99c83..69017996845a 100644 --- a/processor/probabilisticsamplerprocessor/config.go +++ b/processor/probabilisticsamplerprocessor/config.go @@ -5,7 +5,9 @@ package probabilisticsamplerprocessor // import "github.com/open-telemetry/opent import ( "fmt" + "math" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/sampling" "go.opentelemetry.io/collector/component" ) @@ -26,15 +28,48 @@ var validAttributeSource = map[AttributeSource]bool{ // Config has the configuration guiding the sampler processor. type Config struct { - // SamplingPercentage is the percentage rate at which traces or logs are going to be sampled. Defaults to zero, i.e.: no sample. - // Values greater or equal 100 are treated as "sample all traces/logs". + // SamplingPercentage is the percentage rate at which traces or logs are going to be sampled. Defaults + // to zero, i.e.: no sample. Values greater or equal 100 are treated as "sample all traces/logs". This + // is treated as having four significant figures when conveying the sampling probability. SamplingPercentage float32 `mapstructure:"sampling_percentage"` - // HashSeed allows one to configure the hashing seed. This is important in scenarios where multiple layers of collectors - // have different sampling rates: if they use the same seed all passing one layer may pass the other even if they have - // different sampling rates, configuring different seeds avoids that. + // HashSeed allows one to configure the hashing seed. This is important in scenarios where multiple + // layers of collectors have different sampling rates: if they use the same seed all passing one layer + // may pass the other even if they have different sampling rates, configuring different seeds avoids + // that. HashSeed uint32 `mapstructure:"hash_seed"` + // SamplerMode selects the sampling behavior. 
Supported values: + // + // - "hash_seed": the legacy behavior of this + // processor. Using an FNV hash combined with the HashSeed + // value, this sampler performs a non-consistent + // probabilistic downsampling. The number of spans output + // is expected to equal SamplingPercentage (as a ratio) + // times the number of spans input. Statistically, a + // span-to-metrics pipeline based on this mechanism may have + // anomalous behavior. + // + // - "equalizing": Using an OTel-specified consistent sampling + // mechanism, this sampler selectively reduces the effective + // sampling probability of arriving spans. This can be + // useful to select a small fraction of complete traces from + // a stream with mixed sampling rates. The rate of spans + // passing through depends on how much sampling has already + // been applied. If an arriving span was head sampled at + // the same probability it passes through. If the span + // arrives with lower probability, a warning is logged + // because it means this sampler is configured with too + // large a sampling probability to ensure complete traces. + // + // - "proportional": Using an OTel-specified consistent sampling + // mechanism, this sampler reduces the effective sampling + // probability of each span by `SamplingPercentage`. + SamplerMode SamplerMode `mapstructure:"mode"` + + /////// + // Logs only fields below. + + // AttributeSource (logs only) defines where to look for the attribute in from_attribute. The allowed values are // `traceID` or `record`. Default is `traceID`. AttributeSource `mapstructure:"attribute_source"` @@ -46,17 +81,40 @@ type Config struct { // SamplingPriority (logs only) allows to use a log record attribute designed by the `sampling_priority` key // to be used as the sampling priority of the log record. SamplingPriority string `mapstructure:"sampling_priority"` + + // How many hex digits of th: value to use, max, from 1 up to 
+ SamplingPrecision uint8 `mapstructure:"sampling_precision"` } var _ component.Config = (*Config)(nil) // Validate checks if the processor configuration is valid func (cfg *Config) Validate() error { - if cfg.SamplingPercentage < 0 { - return fmt.Errorf("negative sampling rate: %.2f", cfg.SamplingPercentage) + ratio := float64(cfg.SamplingPercentage) / 100.0 + + switch { + case ratio < 0: + return fmt.Errorf("negative sampling rate: %.2f%%", cfg.SamplingPercentage) + case ratio == 0: + // Special case + case ratio < (1 / sampling.MaxAdjustedCount): + return fmt.Errorf("sampling rate is too small: %.2f%%", cfg.SamplingPercentage) + case ratio > 1: + return fmt.Errorf("sampling rate is too small: %.2f%%", cfg.SamplingPercentage) + case math.IsInf(ratio, 0) || math.IsNaN(ratio): + return fmt.Errorf("sampling rate is invalid: %.2f%%", cfg.SamplingPercentage) } + if cfg.AttributeSource != "" && !validAttributeSource[cfg.AttributeSource] { return fmt.Errorf("invalid attribute source: %v. Expected: %v or %v", cfg.AttributeSource, traceIDAttributeSource, recordAttributeSource) } + + if cfg.SamplingPrecision == 0 { + return fmt.Errorf("invalid sampling precision: 0") + } else if cfg.SamplingPrecision > sampling.NumHexDigits { + return fmt.Errorf("sampling precision is too great, should be <= 14: %d", cfg.SamplingPrecision) + } + return nil } diff --git a/processor/probabilisticsamplerprocessor/config_test.go b/processor/probabilisticsamplerprocessor/config_test.go index 90711d343552..6ba11c6cd0d5 100644 --- a/processor/probabilisticsamplerprocessor/config_test.go +++ b/processor/probabilisticsamplerprocessor/config_test.go @@ -26,7 +26,8 @@ func TestLoadConfig(t *testing.T) { id: component.NewIDWithName(metadata.Type, ""), expected: &Config{ SamplingPercentage: 15.3, - HashSeed: 22, + SamplingPrecision: 4, + SamplerMode: "proportional", AttributeSource: "traceID", }, }, @@ -34,7 +35,9 @@ func TestLoadConfig(t *testing.T) { id: component.NewIDWithName(metadata.Type, 
"logs"), expected: &Config{ SamplingPercentage: 15.3, + SamplingPrecision: 3, HashSeed: 22, + SamplerMode: "hash_seed", AttributeSource: "record", FromAttribute: "foo", SamplingPriority: "bar", diff --git a/processor/probabilisticsamplerprocessor/factory.go b/processor/probabilisticsamplerprocessor/factory.go index 8302e7471840..25d5bd1d6f8c 100644 --- a/processor/probabilisticsamplerprocessor/factory.go +++ b/processor/probabilisticsamplerprocessor/factory.go @@ -20,6 +20,8 @@ import ( var onceMetrics sync.Once +const defaultPrecision = 3 + // NewFactory returns a new factory for the Probabilistic sampler processor. func NewFactory() processor.Factory { onceMetrics.Do(func() { @@ -36,7 +38,9 @@ func NewFactory() processor.Factory { func createDefaultConfig() component.Config { return &Config{ - AttributeSource: defaultAttributeSource, + AttributeSource: defaultAttributeSource, + SamplerMode: modeUnset, + SamplingPrecision: defaultPrecision, } } diff --git a/processor/probabilisticsamplerprocessor/factory_test.go b/processor/probabilisticsamplerprocessor/factory_test.go index cd4a6246c649..8818f49eb72d 100644 --- a/processor/probabilisticsamplerprocessor/factory_test.go +++ b/processor/probabilisticsamplerprocessor/factory_test.go @@ -15,22 +15,22 @@ import ( func TestCreateDefaultConfig(t *testing.T) { cfg := createDefaultConfig() - assert.NotNil(t, cfg, "failed to create default config") assert.NoError(t, componenttest.CheckConfigStruct(cfg)) + assert.NotNil(t, cfg, "failed to create default config") } func TestCreateProcessor(t *testing.T) { cfg := createDefaultConfig() set := processortest.NewNopCreateSettings() tp, err := createTracesProcessor(context.Background(), set, cfg, consumertest.NewNop()) - assert.NotNil(t, tp) assert.NoError(t, err, "cannot create trace processor") + assert.NotNil(t, tp) } func TestCreateProcessorLogs(t *testing.T) { cfg := createDefaultConfig() set := processortest.NewNopCreateSettings() tp, err := 
createLogsProcessor(context.Background(), set, cfg, consumertest.NewNop()) - assert.NotNil(t, tp) assert.NoError(t, err, "cannot create logs processor") + assert.NotNil(t, tp) } diff --git a/processor/probabilisticsamplerprocessor/go.mod b/processor/probabilisticsamplerprocessor/go.mod index 02ffc88f330b..b814f591197f 100644 --- a/processor/probabilisticsamplerprocessor/go.mod +++ b/processor/probabilisticsamplerprocessor/go.mod @@ -4,6 +4,7 @@ go 1.20 require ( github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.90.0 + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/sampling v0.90.0 github.com/stretchr/testify v1.8.4 go.opencensus.io v0.24.0 go.opentelemetry.io/collector/component v0.90.0 @@ -109,4 +110,6 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest => ../../pkg/pdatatest +replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/sampling => ../../pkg/sampling + replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden => ../../pkg/golden diff --git a/processor/probabilisticsamplerprocessor/logsprocessor.go b/processor/probabilisticsamplerprocessor/logsprocessor.go index d5f2ef3a75f8..0ad329b4576c 100644 --- a/processor/probabilisticsamplerprocessor/logsprocessor.go +++ b/processor/probabilisticsamplerprocessor/logsprocessor.go @@ -18,12 +18,12 @@ import ( ) type logSamplerProcessor struct { - scaledSamplingRate uint32 - hashSeed uint32 - traceIDEnabled bool - samplingSource string - samplingPriority string - logger *zap.Logger + hashScaledSamplingRate uint32 + hashSeed uint32 + traceIDEnabled bool + samplingSource string + samplingPriority string + logger *zap.Logger } // newLogsProcessor returns a processor.LogsProcessor that will perform head sampling according to the given @@ -31,12 +31,12 @@ type logSamplerProcessor struct { func newLogsProcessor(ctx context.Context, 
set processor.CreateSettings, nextConsumer consumer.Logs, cfg *Config) (processor.Logs, error) { lsp := &logSamplerProcessor{ - scaledSamplingRate: uint32(cfg.SamplingPercentage * percentageScaleFactor), - hashSeed: cfg.HashSeed, - traceIDEnabled: cfg.AttributeSource == traceIDAttributeSource, - samplingPriority: cfg.SamplingPriority, - samplingSource: cfg.FromAttribute, - logger: set.Logger, + hashScaledSamplingRate: uint32(cfg.SamplingPercentage * percentageScaleFactor), + hashSeed: cfg.HashSeed, + traceIDEnabled: cfg.AttributeSource == traceIDAttributeSource, + samplingPriority: cfg.SamplingPriority, + samplingSource: cfg.FromAttribute, + logger: set.Logger, } return processorhelper.NewLogsProcessor( @@ -67,7 +67,7 @@ func (lsp *logSamplerProcessor) processLogs(ctx context.Context, ld plog.Logs) ( lidBytes = getBytesFromValue(value) } } - priority := lsp.scaledSamplingRate + priority := lsp.hashScaledSamplingRate if lsp.samplingPriority != "" { if localPriority, ok := l.Attributes().Get(lsp.samplingPriority); ok { switch localPriority.Type() { diff --git a/processor/probabilisticsamplerprocessor/sampler_mode.go b/processor/probabilisticsamplerprocessor/sampler_mode.go new file mode 100644 index 000000000000..a6108774d795 --- /dev/null +++ b/processor/probabilisticsamplerprocessor/sampler_mode.go @@ -0,0 +1,31 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package probabilisticsamplerprocessor + +import "fmt" + +type SamplerMode string + +const ( + HashSeed SamplerMode = "hash_seed" + Equalizing SamplerMode = "equalizing" + Proportional SamplerMode = "proportional" + DefaultMode SamplerMode = Proportional + modeUnset SamplerMode = "" +) + +var AllModes = []SamplerMode{HashSeed, Equalizing, Proportional} + +func (sm *SamplerMode) UnmarshalText(in []byte) error { + switch mode := SamplerMode(in); mode { + case HashSeed, + Equalizing, + Proportional, + modeUnset: + *sm = mode + return nil + default: + return 
fmt.Errorf("unsupported sampler mode %q", mode) + } +} diff --git a/processor/probabilisticsamplerprocessor/sampler_mode_test.go b/processor/probabilisticsamplerprocessor/sampler_mode_test.go new file mode 100644 index 000000000000..13dbe59cc722 --- /dev/null +++ b/processor/probabilisticsamplerprocessor/sampler_mode_test.go @@ -0,0 +1,47 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package probabilisticsamplerprocessor + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestUnmarshalText(t *testing.T) { + tests := []struct { + samplerMode string + shouldError bool + }{ + { + samplerMode: "hash_seed", + }, + { + samplerMode: "equalizing", + }, + { + samplerMode: "proportional", + }, + { + samplerMode: "", + }, + { + samplerMode: "dunno", + shouldError: true, + }, + } + for _, tt := range tests { + t.Run(tt.samplerMode, func(t *testing.T) { + temp := modeUnset + err := temp.UnmarshalText([]byte(tt.samplerMode)) + if tt.shouldError { + assert.Error(t, err) + return + } + require.NoError(t, err) + assert.Equal(t, temp, SamplerMode(tt.samplerMode)) + }) + } +} diff --git a/processor/probabilisticsamplerprocessor/testdata/config.yaml b/processor/probabilisticsamplerprocessor/testdata/config.yaml index a834def5d98c..2c9510e42c8f 100644 --- a/processor/probabilisticsamplerprocessor/testdata/config.yaml +++ b/processor/probabilisticsamplerprocessor/testdata/config.yaml @@ -7,21 +7,17 @@ processors: # The "sampling.priority" semantics have priority over trace id hashing and # can be used to control if given spans are sampled, ie.: forwarded, or not. probabilistic_sampler: + # mode may be "proportional", "equalizing", or "hash_seed" + mode: proportional # the percentage rate at which traces are going to be sampled. Defaults to # zero, i.e.: no sample. Values greater or equal 100 are treated as # "sample all traces". 
sampling_percentage: 15.3 - # hash_seed allows one to configure the hashing seed. This is important in - # scenarios where multiple layers of collectors are used to achieve the - # desired sampling rate, eg.: 10% on first layer and 10% on the - # second, resulting in an overall sampling rate of 1% (10% x 10%). - # If all layers use the same seed, all data passing one layer will also pass - # the next one, independent of the configured sampling rate. Having different - # seeds at different layers ensures that sampling rate in each layer work as - # intended. - hash_seed: 22 + sampling_precision: 4 probabilistic_sampler/logs: + # mode may be "proportional", "equalizing", or "hash_seed" + mode: hash_seed # the percentage rate at which logs are going to be sampled. Defaults to # zero, i.e.: no sample. Values greater or equal 100 are treated as # "sample all logs". diff --git a/processor/probabilisticsamplerprocessor/tracesprocessor.go b/processor/probabilisticsamplerprocessor/tracesprocessor.go index 96224e6f5704..7124f2b2c990 100644 --- a/processor/probabilisticsamplerprocessor/tracesprocessor.go +++ b/processor/probabilisticsamplerprocessor/tracesprocessor.go @@ -5,7 +5,9 @@ package probabilisticsamplerprocessor // import "github.com/open-telemetry/opent import ( "context" + "fmt" "strconv" + "strings" "go.opencensus.io/stats" "go.opencensus.io/tag" @@ -15,6 +17,8 @@ import ( "go.opentelemetry.io/collector/processor" "go.opentelemetry.io/collector/processor/processorhelper" "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/sampling" ) // samplingPriority has the semantic result of parsing the "sampling.priority" @@ -35,26 +39,132 @@ const ( // by the collector. doNotSampleSpan - // The constants help translate user friendly percentages to numbers direct used in sampling. + // Hashing method: The constants below help translate user friendly percentages + // to numbers direct used in sampling. 
numHashBuckets = 0x4000 // Using a power of 2 to avoid division. bitMaskHashBuckets = numHashBuckets - 1 percentageScaleFactor = numHashBuckets / 100.0 ) -type traceSamplerProcessor struct { - scaledSamplingRate uint32 - hashSeed uint32 - logger *zap.Logger +var ErrInconsistentArrivingTValue = fmt.Errorf("inconsistent arriving t-value: span should not have been sampled") + +type traceSampler interface { + // decide reports the result based on a probabilistic decision. + decide(s ptrace.Span) (bool, *sampling.W3CTraceState, error) + + // updateTracestate modifies the OTelTraceState assuming it will be + // sampled, probabilistically or otherwise. The "should" parameter + // is the result from decide(), for the span's TraceID, which + // will not be recalculated. + updateTracestate(tid pcommon.TraceID, should bool, wts *sampling.W3CTraceState) error +} + +type traceProcessor struct { + sampler traceSampler + logger *zap.Logger +} + +type traceHasher struct { + // Hash-based calculation + hashScaledSamplerate uint32 + hashSeed uint32 + probability float64 +} + +// traceEqualizer adjusts thresholds absolutely. Cannot be used with zero. +type traceEqualizer struct { + // TraceID-randomness-based calculation + traceIDThreshold sampling.Threshold + + // tValueEncoding includes the leading "t:" + tValueEncoding string +} + +// traceEqualizer adjusts thresholds relatively. Cannot be used with zero. +type traceProportionalizer struct { + ratio float64 + prec uint8 +} + +// zeroProbability is a bypass for all cases with Percent==0. +type zeroProbability struct { +} + +func randomnessFromSpan(s ptrace.Span) (sampling.Randomness, *sampling.W3CTraceState, error) { + state := s.TraceState() + raw := state.AsRaw() + + // Parse the arriving TraceState. + wts, err := sampling.NewW3CTraceState(raw) + var randomness sampling.Randomness + if err == nil && wts.OTelValue().HasRValue() { + // When the tracestate is OK and has r-value, use it. 
+ randomness = wts.OTelValue().RValueRandomness() + } else { + // See https://github.com/open-telemetry/opentelemetry-proto/pull/503 + // which merged but unreleased at the time of writing. + // + // Note: When we have an additional flag indicating this + // randomness is present we should inspect the flag + // and return that no randomness is available, here. + randomness = sampling.TraceIDToRandomness(s.TraceID()) + } + return randomness, &wts, err } // newTracesProcessor returns a processor.TracesProcessor that will perform head sampling according to the given // configuration. func newTracesProcessor(ctx context.Context, set processor.CreateSettings, cfg *Config, nextConsumer consumer.Traces) (processor.Traces, error) { - tsp := &traceSamplerProcessor{ - // Adjust sampling percentage on private so recalculations are avoided. - scaledSamplingRate: uint32(cfg.SamplingPercentage * percentageScaleFactor), - hashSeed: cfg.HashSeed, - logger: set.Logger, + // README allows percents >100 to equal 100%, but t-value + // encoding does not. Correct it here. + pct := float64(cfg.SamplingPercentage) + if pct > 100 { + pct = 100 + } + + tp := &traceProcessor{ + logger: set.Logger, + } + + // error ignored below b/c already checked once + if cfg.SamplerMode == modeUnset { + if cfg.HashSeed != 0 { + cfg.SamplerMode = HashSeed + } else { + cfg.SamplerMode = DefaultMode + } + } + + if pct == 0 { + tp.sampler = &zeroProbability{} + } else { + ratio := pct / 100 + switch cfg.SamplerMode { + case HashSeed: + ts := &traceHasher{} + + // Adjust sampling percentage on private so recalculations are avoided. 
+ ts.hashScaledSamplerate = uint32(pct * percentageScaleFactor) + ts.hashSeed = cfg.HashSeed + ts.probability = ratio + + tp.sampler = ts + case Equalizing: + threshold, err := sampling.ProbabilityToThresholdWithPrecision(ratio, cfg.SamplingPrecision) + if err != nil { + return nil, err + } + + tp.sampler = &traceEqualizer{ + tValueEncoding: threshold.TValue(), + traceIDThreshold: threshold, + } + case Proportional: + tp.sampler = &traceProportionalizer{ + ratio: ratio, + prec: cfg.SamplingPrecision, + } + } } return processorhelper.NewTracesProcessor( @@ -62,16 +172,109 @@ func newTracesProcessor(ctx context.Context, set processor.CreateSettings, cfg * set, cfg, nextConsumer, - tsp.processTraces, + tp.processTraces, processorhelper.WithCapabilities(consumer.Capabilities{MutatesData: true})) } -func (tsp *traceSamplerProcessor) processTraces(ctx context.Context, td ptrace.Traces) (ptrace.Traces, error) { +func (ts *traceHasher) decide(s ptrace.Span) (bool, *sampling.W3CTraceState, error) { + // If one assumes random trace ids hashing may seems avoidable, however, traces can be coming from sources + // with various different criteria to generate trace id and perhaps were already sampled without hashing. + // Hashing here prevents bias due to such systems. + tid := s.TraceID() + decision := computeHash(tid[:], ts.hashSeed)&bitMaskHashBuckets < ts.hashScaledSamplerate + return decision, nil, nil +} + +func (ts *traceHasher) updateTracestate(_ pcommon.TraceID, _ bool, _ *sampling.W3CTraceState) error { + // No changes; any t-value will pass through. + return nil +} + +func (ts *traceEqualizer) decide(s ptrace.Span) (bool, *sampling.W3CTraceState, error) { + rnd, wts, err := randomnessFromSpan(s) + if err != nil { + return false, nil, err + } + otts := wts.OTelValue() + // Consistency check: if the TraceID is out of range, the + // TValue is a lie. If inconsistent, clear it. 
+ if otts.HasTValue() { + if !otts.TValueThreshold().ShouldSample(rnd) { + err = ErrInconsistentArrivingTValue + otts.ClearTValue() + } + } else if !otts.HasTValue() { + // Note: We could in this case attach another + // tracestate to signify that the incoming sampling + // threshold was at one point unknown. + } + + return ts.traceIDThreshold.ShouldSample(rnd), wts, err +} + +func (ts *traceEqualizer) updateTracestate(tid pcommon.TraceID, should bool, wts *sampling.W3CTraceState) error { + // When this sampler decided not to sample, the t-value becomes zero. + // Incoming TValue consistency is not checked when this happens. + if !should { + wts.OTelValue().ClearTValue() + return nil + } + // Spans that appear consistently sampled but arrive w/ zero + // adjusted count remain zero. + return wts.OTelValue().UpdateTValueWithSampling(ts.traceIDThreshold, ts.tValueEncoding) +} + +func (ts *traceProportionalizer) decide(s ptrace.Span) (bool, *sampling.W3CTraceState, error) { + rnd, wts, err := randomnessFromSpan(s) + if err != nil { + return false, nil, err + } + otts := wts.OTelValue() + // Consistency check: if the TraceID is out of range, the + // TValue is a lie. If inconsistent, clear it. + if otts.HasTValue() && !otts.TValueThreshold().ShouldSample(rnd) { + err = ErrInconsistentArrivingTValue + otts.ClearTValue() + } + + incoming := 1.0 + if otts.HasTValue() { + incoming = otts.TValueThreshold().Probability() + } else { + // Note: We could in this case attach another + // tracestate to signify that the incoming sampling + // threshold was at one point unknown. 
+ } + + threshold, _ := sampling.ProbabilityToThresholdWithPrecision(incoming*ts.ratio, ts.prec) + should := threshold.ShouldSample(rnd) + if should { + _ = otts.UpdateTValueWithSampling(threshold, threshold.TValue()) + } + return should, wts, err +} + +func (ts *traceProportionalizer) updateTracestate(tid pcommon.TraceID, should bool, wts *sampling.W3CTraceState) error { + if !should { + wts.OTelValue().ClearTValue() + } + return nil +} + +func (*zeroProbability) decide(s ptrace.Span) (bool, *sampling.W3CTraceState, error) { + return false, nil, nil +} + +func (*zeroProbability) updateTracestate(_ pcommon.TraceID, _ bool, _ *sampling.W3CTraceState) error { + return nil +} + +func (tp *traceProcessor) processTraces(ctx context.Context, td ptrace.Traces) (ptrace.Traces, error) { td.ResourceSpans().RemoveIf(func(rs ptrace.ResourceSpans) bool { rs.ScopeSpans().RemoveIf(func(ils ptrace.ScopeSpans) bool { ils.Spans().RemoveIf(func(s ptrace.Span) bool { - sp := parseSpanSamplingPriority(s) - if sp == doNotSampleSpan { + priority := parseSpanSamplingPriority(s) + if priority == doNotSampleSpan { // The OpenTelemetry mentions this as a "hint" we take a stronger // approach and do not sample the span since some may use it to // remove specific spans from traces. @@ -83,24 +286,41 @@ func (tsp *traceSamplerProcessor) processTraces(ctx context.Context, td ptrace.T return true } - _ = stats.RecordWithTags( - ctx, - []tag.Mutator{tag.Upsert(tagPolicyKey, "sampling_priority"), tag.Upsert(tagSampledKey, "true")}, - statCountTracesSampled.M(int64(1)), - ) - - // If one assumes random trace ids hashing may seems avoidable, however, traces can be coming from sources - // with various different criteria to generate trace id and perhaps were already sampled without hashing. - // Hashing here prevents bias due to such systems. 
- tidBytes := s.TraceID() - sampled := sp == mustSampleSpan || - computeHash(tidBytes[:], tsp.hashSeed)&bitMaskHashBuckets < tsp.scaledSamplingRate - - _ = stats.RecordWithTags( - ctx, - []tag.Mutator{tag.Upsert(tagPolicyKey, "trace_id_hash"), tag.Upsert(tagSampledKey, strconv.FormatBool(sampled))}, - statCountTracesSampled.M(int64(1)), - ) + probSample, wts, err := tp.sampler.decide(s) + if err != nil { + tp.logger.Error("trace-state", zap.Error(err)) + } + + forceSample := priority == mustSampleSpan + sampled := forceSample || probSample + + if forceSample { + _ = stats.RecordWithTags( + ctx, + []tag.Mutator{tag.Upsert(tagPolicyKey, "sampling_priority"), tag.Upsert(tagSampledKey, "true")}, + statCountTracesSampled.M(int64(1)), + ) + } else { + _ = stats.RecordWithTags( + ctx, + []tag.Mutator{tag.Upsert(tagPolicyKey, "trace_id_hash"), tag.Upsert(tagSampledKey, strconv.FormatBool(sampled))}, + statCountTracesSampled.M(int64(1)), + ) + } + + if sampled && wts != nil { + err := tp.sampler.updateTracestate(s.TraceID(), probSample, wts) + if err != nil { + tp.logger.Debug("tracestate update", zap.Error(err)) + } + + var w strings.Builder + if err := wts.Serialize(&w); err != nil { + tp.logger.Debug("tracestate serialize", zap.Error(err)) + } + s.TraceState().FromRaw(w.String()) + } + return !sampled }) // Filter out empty ScopeMetrics diff --git a/processor/probabilisticsamplerprocessor/tracesprocessor_test.go b/processor/probabilisticsamplerprocessor/tracesprocessor_test.go index 0688b47a59e0..17fe756d76f3 100644 --- a/processor/probabilisticsamplerprocessor/tracesprocessor_test.go +++ b/processor/probabilisticsamplerprocessor/tracesprocessor_test.go @@ -5,6 +5,7 @@ package probabilisticsamplerprocessor import ( "context" + "fmt" "math" "math/rand" "testing" @@ -19,6 +20,7 @@ import ( conventions "go.opentelemetry.io/collector/semconv/v1.6.1" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/idutils" + 
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/sampling" ) func TestNewTracesProcessor(t *testing.T) { @@ -88,7 +90,7 @@ func Test_tracesamplerprocessor_SamplingPercentageRange(t *testing.T) { cfg: &Config{ SamplingPercentage: 5, }, - numBatches: 1e5, + numBatches: 1e6, numTracesPerBatch: 2, acceptableDelta: 0.01, }, @@ -289,24 +291,31 @@ func Test_tracesamplerprocessor_SpanSamplingPriority(t *testing.T) { sampled: true, }, } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - sink := new(consumertest.TracesSink) - tsp, err := newTracesProcessor(context.Background(), processortest.NewNopCreateSettings(), tt.cfg, sink) - require.NoError(t, err) + for _, mode := range AllModes { + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + sink := new(consumertest.TracesSink) + cfg := &Config{} + if tt.cfg != nil { + *cfg = *tt.cfg + } + cfg.SamplerMode = mode + tsp, err := newTracesProcessor(context.Background(), processortest.NewNopCreateSettings(), cfg, sink) + require.NoError(t, err) - err = tsp.ConsumeTraces(context.Background(), tt.td) - require.NoError(t, err) + err = tsp.ConsumeTraces(context.Background(), tt.td) + require.NoError(t, err) - sampledData := sink.AllTraces() - if tt.sampled { - require.Equal(t, 1, len(sampledData)) - assert.Equal(t, 1, sink.SpanCount()) - } else { - require.Equal(t, 0, len(sampledData)) - assert.Equal(t, 0, sink.SpanCount()) - } - }) + sampledData := sink.AllTraces() + if tt.sampled { + require.Equal(t, 1, len(sampledData)) + assert.Equal(t, 1, sink.SpanCount()) + } else { + require.Equal(t, 0, len(sampledData)) + assert.Equal(t, 0, sink.SpanCount()) + } + }) + } } } @@ -391,6 +400,252 @@ func Test_parseSpanSamplingPriority(t *testing.T) { } } +// Test_tracesamplerprocessor_TraceState checks if handling of the context +// tracestate is correct. 
+func Test_tracesamplerprocessor_TraceState(t *testing.T) { + sid := idutils.UInt64ToSpanID(0xfefefefe) + singleSpanWithAttrib := func(ts, key string, attribValue pcommon.Value) ptrace.Traces { + traces := ptrace.NewTraces() + span := traces.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans().AppendEmpty() + span.TraceState().FromRaw(ts) + // This hard-coded TraceID will sample at 50% and not at 49%. + // The equivalent randomness is 0x80000000000000. + span.SetTraceID(pcommon.TraceID{ + // Don't care (9 bytes) + 0xfe, 0xfe, 0xfe, 0xfe, 0xfe, 0xfe, 0xfe, 0xfe, 0xfe, + // Trace randomness (7 bytes) + 0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + }) + if key != "" { + attribValue.CopyTo(span.Attributes().PutEmpty(key)) + } + span.SetSpanID(sid) + return traces + } + tests := []struct { + name string + cfg *Config + ts string + key string + value pcommon.Value + sf func(SamplerMode) (sampled bool, adjCount float64, tracestate string) + }{ + { + name: "100 percent", + cfg: &Config{ + SamplingPercentage: 100, + }, + ts: "", + sf: func(SamplerMode) (bool, float64, string) { + return true, 1, "ot=th:0" + }, + }, + { + name: "50 percent sampled", + cfg: &Config{ + SamplingPercentage: 50, + }, + ts: "", + sf: func(SamplerMode) (bool, float64, string) { return true, 2, "ot=th:8" }, + }, + { + name: "1 percent sampled", + cfg: &Config{ + SamplingPercentage: 1, + }, + // 99/100 = .fd70a3d70a3d70a3d + ts: "ot=rv:FD70A3D70A3D71", // note upper case passes through, is not generated + sf: func(SamplerMode) (bool, float64, string) { + return true, 1 / 0.01, "ot=rv:FD70A3D70A3D71;th:fd70a3d70a3d71" + }, + }, + { + name: "1 percent sampled with rvalue and precision", + cfg: &Config{ + SamplingPercentage: 1, + SamplingPrecision: 3, + }, + ts: "ot=rv:FD70A3D70A3D71", + sf: func(SamplerMode) (bool, float64, string) { + return true, 1 / 0.01, "ot=rv:FD70A3D70A3D71;th:fd7" + }, + }, + { + name: "1 percent not sampled with rvalue", + cfg: &Config{ + SamplingPercentage: 1, + }, + 
// 99/100 = .FD70A3D70A3D70A3D + ts: "ot=rv:FD70A3D70A3D70", + }, + { + name: "49 percent not sampled", + cfg: &Config{ + SamplingPercentage: 49, + }, + }, + { + name: "1 percent sampled with rvalue", + cfg: &Config{ + SamplingPercentage: 1, + }, + // 99/100 = .FD70A3D70A3D70A3D + ts: "ot=rv:fd70B000000000", + sf: func(SamplerMode) (bool, float64, string) { + return true, 1 / 0.01, "ot=rv:fd70B000000000;th:fd70a3d70a3d71" + }, + }, + { + name: "sampled by priority", + cfg: &Config{ + SamplingPercentage: 1, + }, + ts: "", + key: "sampling.priority", + value: pcommon.NewValueInt(2), + sf: func(SamplerMode) (bool, float64, string) { return true, 0, "" }, + }, + { + name: "not sampled by priority", + cfg: &Config{ + SamplingPercentage: 99, + }, + ts: "", + key: "sampling.priority", + value: pcommon.NewValueInt(0), + }, + { + name: "incoming 50 percent", + cfg: &Config{ + SamplingPercentage: 50, + }, + ts: "ot=rv:90000000000000;th:80000000000000", // note extra zeros! + sf: func(mode SamplerMode) (bool, float64, string) { + if mode == Equalizing { + return true, 2, "ot=rv:90000000000000;th:8" + } + return false, 0, "" + }, + }, + { + name: "incoming 50 percent with no rvalue", + cfg: &Config{ + SamplingPercentage: 50, + }, + ts: "ot=th:8", + sf: func(mode SamplerMode) (bool, float64, string) { + if mode == Equalizing { + return true, 2, "ot=th:8" + } + return false, 0, "" + }, + }, + { + name: "equalizing vs proportional", + cfg: &Config{ + SamplingPercentage: 50, + }, + ts: "ot=rv:c0000000000000;th:8", + sf: func(mode SamplerMode) (bool, float64, string) { + if mode == Equalizing { + return true, 2, "ot=rv:c0000000000000;th:8" + } + return true, 4, "ot=rv:c0000000000000;th:c" + }, + }, + { + name: "inconsistent threshold arriving", + cfg: &Config{ + SamplingPercentage: 100, + }, + ts: "ot=rv:40000000000000;th:8", + sf: func(SamplerMode) (bool, float64, string) { + return true, 1, "ot=rv:40000000000000;th:0" + }, + }, + { + name: "inconsistent threshold not samp,led", + 
cfg: &Config{ + SamplingPercentage: 1, + }, + ts: "ot=rv:40000000000000;th:8", + sf: func(SamplerMode) (bool, float64, string) { + return false, 0, "" + }, + }, + { + name: "40 percent precision 3", + cfg: &Config{ + SamplingPercentage: 40, + SamplingPrecision: 3, + }, + ts: "ot=rv:a0000000000000", + sf: func(SamplerMode) (bool, float64, string) { + return true, 1 / 0.4, "ot=rv:a0000000000000;th:99a" + }, + }, + { + name: "60 percent inconsistent resampled", + cfg: &Config{ + SamplingPercentage: 60, + SamplingPrecision: 4, + }, + // This th:8 is inconsistent with rv, is erased. But, the + // rv qualifies for the 60% sampling (th:666666 repeating) + ts: "ot=rv:70000000000000;th:8", + sf: func(SamplerMode) (bool, float64, string) { + return true, 1 / 0.6, "ot=rv:70000000000000;th:6666" + }, + }, + } + for _, tt := range tests { + for _, mode := range []SamplerMode{Equalizing, Proportional} { + t.Run(fmt.Sprint(mode, "_", tt.name), func(t *testing.T) { + sink := new(consumertest.TracesSink) + cfg := &Config{} + if tt.cfg != nil { + *cfg = *tt.cfg + } + cfg.SamplerMode = mode + tsp, err := newTracesProcessor(context.Background(), processortest.NewNopCreateSettings(), cfg, sink) + require.NoError(t, err) + td := singleSpanWithAttrib(tt.ts, tt.key, tt.value) + + err = tsp.ConsumeTraces(context.Background(), td) + require.NoError(t, err) + + sampledData := sink.AllTraces() + + var expectSampled bool + var expectCount float64 + var expectTS string + if tt.sf != nil { + expectSampled, expectCount, expectTS = tt.sf(mode) + } + if expectSampled { + require.Equal(t, 1, len(sampledData)) + assert.Equal(t, 1, sink.SpanCount()) + got := sink.AllTraces()[0].ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0) + gotTs, err := sampling.NewW3CTraceState(got.TraceState().AsRaw()) + require.NoError(t, err) + if expectCount == 0 { + assert.Equal(t, 0.0, gotTs.OTelValue().AdjustedCount()) + } else if cfg.SamplingPrecision == 0 { + assert.InEpsilon(t, expectCount, 
gotTs.OTelValue().AdjustedCount(), 1e-9) + } else { + assert.InEpsilon(t, expectCount, gotTs.OTelValue().AdjustedCount(), 1e-3) + } + require.Equal(t, expectTS, got.TraceState().AsRaw()) + } else { + require.Equal(t, 0, len(sampledData)) + assert.Equal(t, 0, sink.SpanCount()) + require.Equal(t, "", expectTS) + } + }) + } + } +} + func getSpanWithAttributes(key string, value pcommon.Value) ptrace.Span { span := ptrace.NewSpan() initSpanWithAttribute(key, value, span) diff --git a/versions.yaml b/versions.yaml index 0811b596ba6e..4990bb68c1a2 100644 --- a/versions.yaml +++ b/versions.yaml @@ -56,6 +56,7 @@ module-sets: - github.com/open-telemetry/opentelemetry-collector-contrib/exporter/mezmoexporter - github.com/open-telemetry/opentelemetry-collector-contrib/exporter/opencensusexporter - github.com/open-telemetry/opentelemetry-collector-contrib/exporter/opensearchexporter + - github.com/open-telemetry/opentelemetry-collector-contrib/exporter/parquetexporter - github.com/open-telemetry/opentelemetry-collector-contrib/exporter/prometheusexporter - github.com/open-telemetry/opentelemetry-collector-contrib/exporter/prometheusremotewriteexporter - github.com/open-telemetry/opentelemetry-collector-contrib/exporter/pulsarexporter @@ -126,6 +127,7 @@ module-sets: - github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest - github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil - github.com/open-telemetry/opentelemetry-collector-contrib/pkg/resourcetotelemetry + - github.com/open-telemetry/opentelemetry-collector-contrib/pkg/sampling - github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza - github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl - github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/azure @@ -217,6 +219,7 @@ module-sets: - github.com/open-telemetry/opentelemetry-collector-contrib/receiver/nsxtreceiver - 
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/opencensusreceiver - github.com/open-telemetry/opentelemetry-collector-contrib/receiver/oracledbreceiver + - github.com/open-telemetry/opentelemetry-collector-contrib/receiver/otelarrowreceiver - github.com/open-telemetry/opentelemetry-collector-contrib/receiver/otlpjsonfilereceiver - github.com/open-telemetry/opentelemetry-collector-contrib/receiver/podmanreceiver - github.com/open-telemetry/opentelemetry-collector-contrib/receiver/postgresqlreceiver