Skip to content

Commit

Permalink
progress on discovery alignment. Note might be erroneous to remove re…
Browse files Browse the repository at this point in the history
…ceiver.type from env vars
  • Loading branch information
hughesjj committed Feb 26, 2024
1 parent 09e2bae commit 1ad36d7
Show file tree
Hide file tree
Showing 12 changed files with 84 additions and 86 deletions.
2 changes: 1 addition & 1 deletion internal/common/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ const (
DiscoReceiversKey = "receivers/splunk.discovery"
)

var NoType = component.NewID("")
var NoType = component.MustNewID("SENTINEL_FOR_DISCOVERY_RECEIVER___")

type StatusType string

Expand Down
2 changes: 1 addition & 1 deletion internal/common/discovery/discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,5 @@ func TestIsValidStatusType(t *testing.T) {
}

func TestNoTypeIsEmpty(t *testing.T) {
require.Equal(t, "", NoType.String())
require.Equal(t, "SENTINEL_FOR_DISCOVERY_RECEIVER___", NoType.String())
}
8 changes: 1 addition & 7 deletions internal/confmapprovider/discovery/properties/property.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,16 +174,11 @@ func NewPropertyFromEnvVar(envVar, val string) (*Property, bool, error) {
return prop, true, err
}

// TODO hughesjj okay need to modify these.
// wordify takes an arbitrary string (utf8) and will hex encode any rune not in \w, escaping with `_x<hex-encoded-rune>_`.
func wordify(text string) string {
var wordified []rune
for _, c := range text {
// encoded all non-word characters to hex
// hughesjj TODO we should probably encode underscore as well given it's our escape character
// hughesjj TODO If not, could be bug if we ever pass "raw" _x<numbers>
// hughesjj TODO come to think of it, can we just *not* wordify the receiver name, given how restrictive the charset is now?
// hughesjj TODO at minimum I'd expect it to be mostly no-op
if c != '_' && c < '0' || (c > '9') && (c < 'A') || (c > 'Z') && (c < 'a') || (c > 'z') {
b := make([]byte, 4)
binary.BigEndian.PutUint32(b, uint32(c))
Expand All @@ -209,12 +204,11 @@ func wordify(text string) string {
return string(wordified)
}

// TODO hughesjj make this public an duse it at time of construction, unless we can encode config in name instea
// unwordify takes any string, expanding `_x<.>_` content as hex-decoded utf8 strings
func unwordify(text string) (string, error) {
var err error
unwordified := envVarHexRE.ReplaceAllStringFunc(text, func(s string) string {
s = s[2 : len(s)-1] // hughesjj perhaps check for literals here in range [0-9a-fA-F] and bail if outsider
s = s[2 : len(s)-1]
decoded, e := hex.DecodeString(s)
if e != nil {
err = multierr.Combine(err, fmt.Errorf("%q: %w", s, e))
Expand Down
27 changes: 9 additions & 18 deletions internal/confmapprovider/discovery/properties/property_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,24 +160,6 @@ func TestValidProperties(t *testing.T) {
Input: "splunk.discovery.receivers.receiver_type////.config.one::config",
},
},
{key: "splunk.discovery.extensions.extension--0-1-with-config-in-type-_x64__x86_🙈🙉🙊4:000x0;;0;;0;;-___-----type/e/x/t/e%ns<i>o<=n=>nam/e-with-config.config.o::n::e.config", val: "val",
expected: &Property{
ComponentType: "extensions",
Component: ComponentID{Type: "extension--0-1-with-config-in-type-_x64__x86_🙈🙉🙊4:000x0;;0;;0;;-___-----type", Name: "e/x/t/e%ns<i>o<=n=>nam/e-with-config"},
Type: "config",
Key: "o::n::e.config",
Val: "val",
stringMap: map[string]any{
"extensions": map[string]any{
"extension--0-1-with-config-in-type-_x64__x86_🙈🙉🙊4:000x0;;0;;0;;-___-----type/e/x/t/e%ns<i>o<=n=>nam/e-with-config": map[string]any{
"config": map[string]any{
"o": map[string]any{"n": map[string]any{"e.config": "val"}}},
},
},
},
Input: "splunk.discovery.extensions.extension--0-1-with-config-in-type-_x64__x86_🙈🙉🙊4:000x0;;0;;0;;-___-----type/e/x/t/e%ns<i>o<=n=>nam/e-with-config.config.o::n::e.config",
},
},
{key: "splunk.discovery.receivers.receiver_type////.enabled", val: "false",
expected: &Property{
stringMap: map[string]any{
Expand Down Expand Up @@ -247,4 +229,13 @@ func TestInvalidProperties(t *testing.T) {
require.Nil(t, p)
})
}
for _, tt := range []struct {
property, expectedError string
}{
{property: "splunk.discovery.extensions.extension--0-1-with-config-in-type-_x64__x86_🙈🙉🙊4:000x0;;0;;0;;-___-----type/e/x/t/e%ns<i>o<=n=>nam/e-with-config.config.o::n::e.config", expectedError: `invalid character(s) in type "extension--0-1-with-config-in-type-_x64__x86_🙈🙉🙊4:000x0;;0;;0;;-___-----type"`},
} {
t.Run(tt.property, func(t *testing.T) {
require.PanicsWithError(t, tt.expectedError, func() { NewProperty(tt.property, "val") })
})
}
}
20 changes: 13 additions & 7 deletions internal/receiver/discoveryreceiver/endpoint_tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"

"github.com/signalfx/splunk-otel-collector/internal/common/discovery"
)

func TestEndpointToPLogsHappyPath(t *testing.T) {
Expand Down Expand Up @@ -163,7 +165,7 @@ func TestEndpointToPLogsHappyPath(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
t1 := time.Now()
plogs, failed, err := endpointToPLogs(
component.NewIDWithName("observer.type", "observer.name"),
component.NewIDWithName("observer_type", "observer.name"),
"event.type", []observer.Endpoint{test.endpoint}, t0,
)
t2 := time.Now()
Expand Down Expand Up @@ -285,7 +287,7 @@ func TestEndpointToPLogsInvalidEndpoints(t *testing.T) {
test := tt
t.Run(test.name, func(t *testing.T) {
plogs, failed, err := endpointToPLogs(
component.NewIDWithName("observer.type", "observer.name"),
component.NewIDWithName("observer_type", "observer.name"),
"event.type", []observer.Endpoint{test.endpoint}, t0,
)
if test.expectedError != "" {
Expand All @@ -304,20 +306,24 @@ func TestEndpointToPLogsInvalidEndpoints(t *testing.T) {
}

func FuzzEndpointToPlogs(f *testing.F) {
f.Add("observer.type", "observer.name", "event.type",
f.Add("observer_type", "observer.name", "event.type",
"port.endpoint.id", "port.target", "port.name", "pod.name", "uid",
"label.one", "label.value.one", "label.two", "label.value.two",
"annotation.one", "annotation.value.one", "annotation.two", "annotation.value.two",
"namespace", "transport", uint16(1))
f.Add("", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", uint16(0))
f.Add(discovery.NoType.Type().String(), "", discovery.NoType.Type().String(), "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", uint16(0))
f.Fuzz(func(t *testing.T, observerType, observerName, eventType,
endpointID, target, portName, podName, uid,
labelOne, labelValueOne, labelTwo, labelValueTwo,
annotationOne, annotationValueOne, annotationTwo, annotationValueTwo,
namespace, transport string, port uint16) {
require.NotPanics(t, func() {
observerTypeSanitized, err := component.NewType(observerType)
if err != nil {
observerTypeSanitized = discovery.NoType.Type()
}
plogs, failed, err := endpointToPLogs(
component.MustNewIDWithName(observerType, observerName), eventType, []observer.Endpoint{
component.MustNewIDWithName(observerTypeSanitized.String(), observerName), eventType, []observer.Endpoint{
{
ID: observer.EndpointID(endpointID),
Target: target,
Expand Down Expand Up @@ -348,7 +354,7 @@ func FuzzEndpointToPlogs(f *testing.F) {
rAttrs := resourceLogs.Resource().Attributes()
rAttrs.PutStr("discovery.event.type", eventType)
rAttrs.PutStr("discovery.observer.name", observerName)
rAttrs.PutStr("discovery.observer.type", observerType)
rAttrs.PutStr("discovery.observer.type", observerTypeSanitized.String())
expectedLR := resourceLogs.ScopeLogs().At(0).LogRecords().At(0)
expectedLR.Body().SetStr(fmt.Sprintf("%s port endpoint %s", eventType, endpointID))
attrs := expectedLR.Attributes()
Expand Down Expand Up @@ -486,7 +492,7 @@ func expectedPLogs() plog.Logs {
rAttrs := plogs.ResourceLogs().AppendEmpty().Resource().Attributes()
rAttrs.PutStr("discovery.event.type", "event.type")
rAttrs.PutStr("discovery.observer.name", "observer.name")
rAttrs.PutStr("discovery.observer.type", "observer.type")
rAttrs.PutStr("discovery.observer.type", "observer_type")
sLogs := plogs.ResourceLogs().At(0).ScopeLogs().AppendEmpty()
lr := sLogs.LogRecords().AppendEmpty()
lr.SetTimestamp(pcommon.NewTimestampFromTime(t0))
Expand Down
22 changes: 11 additions & 11 deletions internal/receiver/discoveryreceiver/evaluator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,16 @@ import (
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"

"github.com/signalfx/splunk-otel-collector/internal/common/discovery"
)

func setup() (*evaluator, component.ID, observer.EndpointID) {
func setup(t *testing.T) (*evaluator, component.ID, observer.EndpointID) {
// If debugging tests, replace the Nop Logger with a test instance to see
// all statements. Not in regular use to avoid spamming output.
// logger := zaptest.NewLogger(t)
logger := zap.NewNop()
logger := zaptest.NewLogger(t)
//logger := zap.NewNop()
alreadyLogged := &sync.Map{}
eval := &evaluator{
logger: logger,
Expand All @@ -46,14 +46,14 @@ func setup() (*evaluator, component.ID, observer.EndpointID) {
},
}

receiverID := component.NewIDWithName("type", "name")
receiverID := component.MustNewIDWithName("type", "name")
endpointID := observer.EndpointID("endpoint")
return eval, receiverID, endpointID
}

func TestEvaluateMatch(t *testing.T) {
eval, receiverID, endpointID := setup()
anotherReceiverID := component.NewIDWithName("type", "another.name")
eval, receiverID, endpointID := setup(t)
anotherReceiverID := component.MustNewIDWithName("type", "another.name")

for _, tc := range []struct {
typ string
Expand Down Expand Up @@ -90,7 +90,7 @@ func TestEvaluateMatch(t *testing.T) {
}

func TestEvaluateInvalidMatch(t *testing.T) {
eval, receiverID, endpointID := setup()
eval, receiverID, endpointID := setup(t)

for _, tc := range []struct {
typ string
Expand All @@ -112,11 +112,11 @@ func TestEvaluateInvalidMatch(t *testing.T) {
func TestCorrelateResourceAttrs(t *testing.T) {
for _, embed := range []bool{false, true} {
t.Run(fmt.Sprintf("embed-%v", embed), func(t *testing.T) {
eval, _, endpointID := setup()
eval, _, endpointID := setup(t)
eval.config.EmbedReceiverConfig = embed

endpoint := observer.Endpoint{ID: endpointID}
observerID := component.NewIDWithName("type", "name")
observerID := component.MustNewIDWithName("type", "name")
eval.correlations.UpdateEndpoint(endpoint, addedState, observerID)

corr := eval.correlations.GetOrCreate(discovery.NoType, endpointID)
Expand Down Expand Up @@ -161,7 +161,7 @@ func TestCorrelateResourceAttrs(t *testing.T) {
func TestCorrelateResourceAttrsWithExistingConfig(t *testing.T) {
for _, embed := range []bool{false, true} {
t.Run(fmt.Sprintf("embed-%v", embed), func(t *testing.T) {
eval, _, endpointID := setup()
eval, _, endpointID := setup(t)
eval.config.EmbedReceiverConfig = embed

endpoint := observer.Endpoint{ID: endpointID}
Expand Down
10 changes: 5 additions & 5 deletions internal/receiver/discoveryreceiver/metric_evaluator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,10 @@ func TestMetricEvaluation(t *testing.T) {
for _, firstOnly := range []bool{true, false} {
match.FirstOnly = firstOnly
t.Run(fmt.Sprintf("FirstOnly:%v", firstOnly), func(t *testing.T) {
observerID := component.NewIDWithName("an.observer", "observer.name")
observerID := component.MustNewIDWithName("an_observer", "observer.name")
cfg := &Config{
Receivers: map[component.ID]ReceiverEntry{
component.NewIDWithName("a.receiver", "receiver.name"): {
component.MustNewIDWithName("a_receiver", "receiver.name"): {
Rule: "a.rule",
Status: &Status{Metrics: map[discovery.StatusType][]Match{status: {match}}},
},
Expand All @@ -96,7 +96,7 @@ func TestMetricEvaluation(t *testing.T) {
rm := md.ResourceMetrics().AppendEmpty()

rAttrs := rm.Resource().Attributes()
rAttrs.PutStr("discovery.receiver.type", "a.receiver")
rAttrs.PutStr("discovery.receiver.type", "a_receiver")
rAttrs.PutStr("discovery.receiver.name", "receiver.name")
rAttrs.PutStr("discovery.endpoint.id", "endpoint.id")

Expand All @@ -121,10 +121,10 @@ func TestMetricEvaluation(t *testing.T) {
require.Equal(t, map[string]any{
"discovery.endpoint.id": "endpoint.id",
"discovery.event.type": "metric.match",
"discovery.observer.id": "an.observer/observer.name",
"discovery.observer.id": "an_observer/observer.name",
"discovery.receiver.name": "receiver.name",
"discovery.receiver.rule": "a.rule",
"discovery.receiver.type": "a.receiver",
"discovery.receiver.type": "a_receiver",
}, rAttrs.AsRaw())

sLogs := rl.ScopeLogs()
Expand Down
20 changes: 10 additions & 10 deletions internal/receiver/discoveryreceiver/statement_evaluator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,10 @@ func TestStatementEvaluation(t *testing.T) {
for _, firstOnly := range []bool{true, false} {
match.FirstOnly = firstOnly
t.Run(fmt.Sprintf("FirstOnly:%v", firstOnly), func(t *testing.T) {
observerID := component.NewIDWithName("an.observer", "observer.name")
observerID := component.MustNewIDWithName("an_observer", "observer.name")
cfg := &Config{
Receivers: map[component.ID]ReceiverEntry{
component.NewIDWithName("a.receiver", "receiver.name"): {
component.MustNewIDWithName("a_receiver", "receiver.name"): {
Rule: "a.rule",
Status: &Status{Statements: map[discovery.StatusType][]Match{status: {match}}},
},
Expand All @@ -83,11 +83,11 @@ func TestStatementEvaluation(t *testing.T) {
addedState, observerID,
)

se, err := newStatementEvaluator(logger, component.NewID("some.type"), cfg, plogs, cStore)
se, err := newStatementEvaluator(logger, component.MustNewID("some_type"), cfg, plogs, cStore)
require.NoError(t, err)

evaluatedLogger := se.evaluatedLogger.With(
zap.String("name", `a.receiver/receiver.name/receiver_creator/rc.name/{endpoint=""}/endpoint.id`),
zap.String("name", `a_receiver/receiver.name/receiver_creator/rc.name/{endpoint=""}/endpoint.id`),
)

numExpected := 1
Expand Down Expand Up @@ -153,10 +153,10 @@ func TestStatementEvaluation(t *testing.T) {
require.Equal(t, map[string]any{
"discovery.endpoint.id": "endpoint.id",
"discovery.event.type": "statement.match",
"discovery.observer.id": "an.observer/observer.name",
"discovery.observer.id": "an_observer/observer.name",
"discovery.receiver.name": "receiver.name",
"discovery.receiver.rule": "a.rule",
"discovery.receiver.type": "a.receiver",
"discovery.receiver.type": "a_receiver",
}, rAttrs.AsRaw())

sLogs := rl.ScopeLogs()
Expand Down Expand Up @@ -185,7 +185,7 @@ func TestStatementEvaluation(t *testing.T) {

require.Equal(t, map[string]any{
"discovery.status": string(status),
"name": `a.receiver/receiver.name/receiver_creator/rc.name/{endpoint=""}/endpoint.id`,
"name": `a_receiver/receiver.name/receiver_creator/rc.name/{endpoint=""}/endpoint.id`,
"attr.one": "attr.one.value",
"attr.two": "attr.two.value",
"field.one": "field.one.value",
Expand Down Expand Up @@ -216,10 +216,10 @@ func TestStatementEvaluation(t *testing.T) {
}

func TestLogRecordDefaultAndArbitrarySeverityText(t *testing.T) {
observerID := component.NewIDWithName("an.observer", "observer.name")
observerID := component.MustNewIDWithName("an_observer", "observer.name")
cfg := &Config{
Receivers: map[component.ID]ReceiverEntry{
component.NewIDWithName("a.receiver", "receiver.name"): {
component.MustNewIDWithName("a_receiver", "receiver.name"): {
Rule: "a.rule",
Status: &Status{Statements: map[discovery.StatusType][]Match{discovery.Successful: {Match{Strict: "match.me"}}}},
},
Expand Down Expand Up @@ -247,7 +247,7 @@ func TestLogRecordDefaultAndArbitrarySeverityText(t *testing.T) {
Time: time.Now(),
LoggerName: "logger.name",
Fields: map[string]any{
"name": `a.receiver/receiver.name/receiver_creator/rc.name/{endpoint=""}/endpoint.id`,
"name": `a_receiver/receiver.name/receiver_creator/rc.name/{endpoint=""}/endpoint.id`,
},
}

Expand Down
4 changes: 4 additions & 0 deletions internal/receiver/discoveryreceiver/statussources/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,5 +38,9 @@ func MetricsToReceiverIDs(md pmetric.Metrics) (component.ID, observer.EndpointID
endpointID = r.AsString()
}
}
_, err := component.NewType(receiverType)
if err != nil {
return discovery.NoType, observer.EndpointID("")
}
return component.MustNewIDWithName(receiverType, receiverName), observer.EndpointID(endpointID)
}
18 changes: 11 additions & 7 deletions internal/receiver/discoveryreceiver/statussources/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/pmetric"

"github.com/signalfx/splunk-otel-collector/internal/common/discovery"
)

func TestMetricsToReceiverIDs(t *testing.T) {
Expand All @@ -31,10 +33,10 @@ func TestMetricsToReceiverIDs(t *testing.T) {
name string
}{
{name: "happy path", rType: sPtr("a_type"), rName: sPtr("a.name"), eID: sPtr("an.endpoint")},
{name: "empty values", rType: sPtr(""), rName: sPtr(""), eID: sPtr("")},
{name: "missing values", rType: nil, rName: nil, eID: nil},
{name: "empty receiver type", rType: sPtr(""), rName: sPtr("a.name"), eID: sPtr("an.endpoint")},
{name: "missing receiver type", rType: nil, rName: sPtr("a.name"), eID: sPtr("an.endpoint")},
//{name: "empty values", rType: sPtr(""), rName: sPtr(""), eID: sPtr("")}, // TODO fill in sentinel here
//{name: "missing values", rType: nil, rName: nil, eID: nil},
{name: "empty receiver type", rType: sPtr(discovery.NoType.Type().String()), rName: sPtr("a.name"), eID: sPtr("an.endpoint")},
//{name: "missing receiver type", rType: nil, rName: sPtr("a.name"), eID: sPtr("an.endpoint")},
{name: "empty receiver name", rType: sPtr("a_type"), rName: sPtr(""), eID: sPtr("an.endpoint")},
{name: "missing receiver name", rType: sPtr("a_type"), rName: nil, eID: sPtr("an.endpoint")},
{name: "empty endpointID", rType: sPtr("a_type"), rName: sPtr("a.name"), eID: sPtr("")},
Expand All @@ -61,7 +63,9 @@ func TestMetricsToReceiverIDs(t *testing.T) {
if tc.rType != nil {
expectedRType = *tc.rType
}
require.Equal(t, component.MustNewType(expectedRType), receiverID.Type())
expectedRtype, err := component.NewType(expectedRType)
require.NoError(t, err)
require.Equal(t, expectedRtype, receiverID.Type())

var expectedRName string
if tc.rName != nil {
Expand All @@ -81,7 +85,7 @@ func TestMetricsToReceiverIDs(t *testing.T) {
func TestMetricsToReceiverIDsMissingRMetrics(t *testing.T) {
md := pmetric.NewMetrics()
receiverID, endpointID := MetricsToReceiverIDs(md)
require.Equal(t, component.Type(""), receiverID.Type())
require.Equal(t, discovery.NoType.Type(), receiverID.Type())
require.Equal(t, "", receiverID.Name())
require.Equal(t, observer.EndpointID(""), endpointID)
}
Expand All @@ -92,7 +96,7 @@ func TestMetricsToReceiverIDsMissingRAttributes(t *testing.T) {
rm.Resource().Attributes().Clear()

receiverID, endpointID := MetricsToReceiverIDs(md)
require.Equal(t, component.Type(""), receiverID.Type())
require.Equal(t, discovery.NoType.Type(), receiverID.Type())
require.Equal(t, "", receiverID.Name())
require.Equal(t, observer.EndpointID(""), endpointID)
}
Expand Down
Loading

0 comments on commit 1ad36d7

Please sign in to comment.