Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

internal/civisibility/integrations/gotesting: fixes for orchestrion autoinstrumentation #2844

Merged
merged 10 commits into from
Sep 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions ddtrace/tracer/civisibility_payload.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/tinylib/msgp/msgp"
"gopkg.in/DataDog/dd-trace-go.v1/internal/globalconfig"
"gopkg.in/DataDog/dd-trace-go.v1/internal/log"
"gopkg.in/DataDog/dd-trace-go.v1/internal/version"
)

Expand Down Expand Up @@ -46,6 +47,7 @@ func (p *ciVisibilityPayload) push(event *ciVisibilityEvent) error {
//
// A pointer to a newly initialized civisibilitypayload instance.
func newCiVisibilityPayload() *ciVisibilityPayload {
log.Debug("ciVisibilityPayload: creating payload instance")
return &ciVisibilityPayload{newPayload()}
}

Expand All @@ -61,6 +63,7 @@ func newCiVisibilityPayload() *ciVisibilityPayload {
// A pointer to a bytes.Buffer containing the encoded CI Visibility payload.
// An error if reading from the buffer or encoding the payload fails.
func (p *ciVisibilityPayload) getBuffer(config *config) (*bytes.Buffer, error) {
log.Debug("ciVisibilityPayload: .getBuffer (count: %v)", p.itemCount())

/*
The Payload format in the CI Visibility protocol is like this:
Expand Down
3 changes: 3 additions & 0 deletions ddtrace/tracer/civisibility_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ func newCiVisibilityTransport(config *config) *ciVisibilityTransport {
testCycleURL = fmt.Sprintf("%s/%s/%s", config.agentURL.String(), EvpProxyPath, TestCyclePath)
}

log.Debug("ciVisibilityTransport: creating transport instance [agentless: %v, testcycleurl: %v]", agentlessEnabled, testCycleURL)

return &ciVisibilityTransport{
config: config,
testCycleURLPath: testCycleURL,
Expand Down Expand Up @@ -157,6 +159,7 @@ func (t *ciVisibilityTransport) send(p *payload) (body io.ReadCloser, err error)
req.Header.Set("Content-Encoding", "gzip")
}

log.Debug("ciVisibilityTransport: sending transport request: %v bytes", buffer.Len())
response, err := t.config.httpClient.Do(req)
if err != nil {
return nil, err
Expand Down
4 changes: 4 additions & 0 deletions ddtrace/tracer/civisibility_tslv.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ func getCiVisibilityEvent(span *span) *ciVisibilityEvent {
// A pointer to the created ciVisibilityEvent.
func createTestEventFromSpan(span *span) *ciVisibilityEvent {
tSpan := createTslvSpan(span)
tSpan.ParentID = 0
tSpan.SessionID = getAndRemoveMetaToUInt64(span, constants.TestSessionIDTag)
tSpan.ModuleID = getAndRemoveMetaToUInt64(span, constants.TestModuleIDTag)
tSpan.SuiteID = getAndRemoveMetaToUInt64(span, constants.TestSuiteIDTag)
Expand All @@ -298,6 +299,7 @@ func createTestEventFromSpan(span *span) *ciVisibilityEvent {
// A pointer to the created ciVisibilityEvent.
func createTestSuiteEventFromSpan(span *span) *ciVisibilityEvent {
tSpan := createTslvSpan(span)
tSpan.ParentID = 0
tSpan.SessionID = getAndRemoveMetaToUInt64(span, constants.TestSessionIDTag)
tSpan.ModuleID = getAndRemoveMetaToUInt64(span, constants.TestModuleIDTag)
tSpan.SuiteID = getAndRemoveMetaToUInt64(span, constants.TestSuiteIDTag)
Expand All @@ -320,6 +322,7 @@ func createTestSuiteEventFromSpan(span *span) *ciVisibilityEvent {
// A pointer to the created ciVisibilityEvent.
func createTestModuleEventFromSpan(span *span) *ciVisibilityEvent {
tSpan := createTslvSpan(span)
tSpan.ParentID = 0
tSpan.SessionID = getAndRemoveMetaToUInt64(span, constants.TestSessionIDTag)
tSpan.ModuleID = getAndRemoveMetaToUInt64(span, constants.TestModuleIDTag)
return &ciVisibilityEvent{
Expand All @@ -341,6 +344,7 @@ func createTestModuleEventFromSpan(span *span) *ciVisibilityEvent {
// A pointer to the created ciVisibilityEvent.
func createTestSessionEventFromSpan(span *span) *ciVisibilityEvent {
tSpan := createTslvSpan(span)
tSpan.ParentID = 0
tSpan.SessionID = getAndRemoveMetaToUInt64(span, constants.TestSessionIDTag)
return &ciVisibilityEvent{
span: span,
Expand Down
11 changes: 6 additions & 5 deletions ddtrace/tracer/civisibility_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type ciVisibilityTraceWriter struct {
//
// A pointer to an initialized ciVisibilityTraceWriter.
func newCiVisibilityTraceWriter(c *config) *ciVisibilityTraceWriter {
log.Debug("ciVisibilityTraceWriter: creating trace writer instance")
return &ciVisibilityTraceWriter{
config: c,
payload: newCiVisibilityPayload(),
Expand All @@ -62,7 +63,7 @@ func (w *ciVisibilityTraceWriter) add(trace []*span) {
for _, s := range trace {
cvEvent := getCiVisibilityEvent(s)
if err := w.payload.push(cvEvent); err != nil {
log.Error("Error encoding msgpack: %v", err)
log.Error("ciVisibilityTraceWriter: Error encoding msgpack: %v", err)
}
if w.payload.size() > agentlessPayloadSizeLimit {
w.flush()
Expand Down Expand Up @@ -104,16 +105,16 @@ func (w *ciVisibilityTraceWriter) flush() {
var err error
for attempt := 0; attempt <= w.config.sendRetries; attempt++ {
size, count = p.size(), p.itemCount()
log.Debug("Sending payload: size: %d events: %d\n", size, count)
log.Debug("ciVisibilityTraceWriter: sending payload: size: %d events: %d\n", size, count)
_, err = w.config.transport.send(p.payload)
if err == nil {
log.Debug("sent events after %d attempts", attempt+1)
log.Debug("ciVisibilityTraceWriter: sent events after %d attempts", attempt+1)
return
}
log.Error("failure sending events (attempt %d), will retry: %v", attempt+1, err)
log.Error("ciVisibilityTraceWriter: failure sending events (attempt %d), will retry: %v", attempt+1, err)
p.reset()
time.Sleep(time.Millisecond)
}
log.Error("lost %d events: %v", count, err)
log.Error("ciVisibilityTraceWriter: lost %d events: %v", count, err)
}(oldp)
}
185 changes: 127 additions & 58 deletions internal/civisibility/integrations/gotesting/instrumentation.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@ import (
"reflect"
"runtime"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
"unsafe"

"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext"
"gopkg.in/DataDog/dd-trace-go.v1/internal/civisibility/integrations"
Expand All @@ -23,6 +25,66 @@ import (
// The following functions are being used by the gotesting package for manual instrumentation and the orchestrion
// automatic instrumentation

type (
instrumentationMetadata struct {
IsInternal bool
}

ddTestItem struct {
test integrations.DdTest
error atomic.Int32
skipped atomic.Int32
}
)

var (
// instrumentationMap holds a map of *runtime.Func for tracking instrumented functions
instrumentationMap = map[*runtime.Func]*instrumentationMetadata{}

// instrumentationMapMutex is a read-write mutex for synchronizing access to instrumentationMap.
instrumentationMapMutex sync.RWMutex

// ciVisibilityTests holds a map of *testing.T or *testing.B to civisibility.DdTest for tracking tests.
ciVisibilityTests = map[unsafe.Pointer]*ddTestItem{}

// ciVisibilityTestsMutex is a read-write mutex for synchronizing access to ciVisibilityTests.
ciVisibilityTestsMutex sync.RWMutex
)

// getInstrumentationMetadata gets the stored instrumentation metadata for a given *runtime.Func.
func getInstrumentationMetadata(fn *runtime.Func) *instrumentationMetadata {
instrumentationMapMutex.RLock()
defer instrumentationMapMutex.RUnlock()
if v, ok := instrumentationMap[fn]; ok {
return v
}
return nil
}

// setInstrumentationMetadata stores an instrumentation metadata for a given *runtime.Func.
func setInstrumentationMetadata(fn *runtime.Func, metadata *instrumentationMetadata) {
instrumentationMapMutex.RLock()
defer instrumentationMapMutex.RUnlock()
instrumentationMap[fn] = metadata
}

// getCiVisibilityTest retrieves the CI visibility test associated with a given *testing.T, *testing.B, *testing.common
func getCiVisibilityTest(tb testing.TB) *ddTestItem {
ciVisibilityTestsMutex.RLock()
defer ciVisibilityTestsMutex.RUnlock()
if v, ok := ciVisibilityTests[reflect.ValueOf(tb).UnsafePointer()]; ok {
return v
}
return nil
}

// setCiVisibilityTest associates a CI visibility test with a given *testing.T, *testing.B, *testing.common
func setCiVisibilityTest(tb testing.TB, ciTest integrations.DdTest) {
ciVisibilityTestsMutex.Lock()
defer ciVisibilityTestsMutex.Unlock()
ciVisibilityTests[reflect.ValueOf(tb).UnsafePointer()] = &ddTestItem{test: ciTest}
}

// instrumentTestingM helper function to instrument internalTests and internalBenchmarks in a `*testing.M` instance.
func instrumentTestingM(m *testing.M) func(exitCode int) {
// Initialize CI Visibility
Expand Down Expand Up @@ -61,13 +123,30 @@ func instrumentTestingTFunc(f func(*testing.T)) func(*testing.T) {
moduleName, suiteName := utils.GetModuleAndSuiteName(fReflect.Pointer())
originalFunc := runtime.FuncForPC(fReflect.Pointer())

// Increment the test count in the module.
atomic.AddInt32(modulesCounters[moduleName], 1)
// Avoid instrumenting twice
metadata := getInstrumentationMetadata(originalFunc)
if metadata != nil && metadata.IsInternal {
// If is an internal test, we don't instrument because f is already the instrumented func by executeInternalTest
return f
}

instrumentedFn := func(t *testing.T) {
// Initialize module counters if not already present.
if _, ok := modulesCounters[moduleName]; !ok {
var v int32
modulesCounters[moduleName] = &v
}
// Increment the test count in the module.
atomic.AddInt32(modulesCounters[moduleName], 1)

// Increment the test count in the suite.
atomic.AddInt32(suitesCounters[suiteName], 1)
// Initialize suite counters if not already present.
if _, ok := suitesCounters[suiteName]; !ok {
var v int32
suitesCounters[suiteName] = &v
}
// Increment the test count in the suite.
atomic.AddInt32(suitesCounters[suiteName], 1)

return func(t *testing.T) {
// Create or retrieve the module, suite, and test for CI visibility.
module := session.GetOrCreateModuleWithFramework(moduleName, testFramework, runtime.Version())
suite := module.GetOrCreateSuite(suiteName)
Expand Down Expand Up @@ -101,51 +180,47 @@ func instrumentTestingTFunc(f func(*testing.T)) func(*testing.T) {
// Execute the original test function.
f(t)
}
setInstrumentationMetadata(runtime.FuncForPC(reflect.Indirect(reflect.ValueOf(instrumentedFn)).Pointer()), &instrumentationMetadata{IsInternal: true})
return instrumentedFn
}

// instrumentTestingTSetErrorInfo helper function to set an error in the `testing.T` CI Visibility span
func instrumentTestingTSetErrorInfo(t *testing.T, errType string, errMessage string, skip int) {
ciTest := getCiVisibilityTest(t)
if ciTest != nil {
ciTest.SetErrorInfo(errType, errMessage, utils.GetStacktrace(2+skip))
// instrumentSetErrorInfo helper function to set an error in the `*testing.T, *testing.B, *testing.common` CI Visibility span
func instrumentSetErrorInfo(tb testing.TB, errType string, errMessage string, skip int) {
ciTestItem := getCiVisibilityTest(tb)
if ciTestItem != nil && ciTestItem.error.CompareAndSwap(0, 1) && ciTestItem.test != nil {
ciTestItem.test.SetErrorInfo(errType, errMessage, utils.GetStacktrace(2+skip))
}
}

// instrumentTestingTCloseAndSkip helper function to close and skip with a reason a `testing.T` CI Visibility span
func instrumentTestingTCloseAndSkip(t *testing.T, skipReason string) {
ciTest := getCiVisibilityTest(t)
if ciTest != nil {
ciTest.CloseWithFinishTimeAndSkipReason(integrations.ResultStatusSkip, time.Now(), skipReason)
// instrumentCloseAndSkip helper function to close and skip with a reason a `*testing.T, *testing.B, *testing.common` CI Visibility span
func instrumentCloseAndSkip(tb testing.TB, skipReason string) {
ciTestItem := getCiVisibilityTest(tb)
if ciTestItem != nil && ciTestItem.skipped.CompareAndSwap(0, 1) && ciTestItem.test != nil {
ciTestItem.test.CloseWithFinishTimeAndSkipReason(integrations.ResultStatusSkip, time.Now(), skipReason)
}
}

// instrumentTestingTSkipNow helper function to close and skip a `testing.T` CI Visibility span
func instrumentTestingTSkipNow(t *testing.T) {
ciTest := getCiVisibilityTest(t)
if ciTest != nil {
ciTest.Close(integrations.ResultStatusSkip)
// instrumentSkipNow helper function to close and skip a `*testing.T, *testing.B, *testing.common` CI Visibility span
func instrumentSkipNow(tb testing.TB) {
ciTestItem := getCiVisibilityTest(tb)
if ciTestItem != nil && ciTestItem.skipped.CompareAndSwap(0, 1) && ciTestItem.test != nil {
ciTestItem.test.Close(integrations.ResultStatusSkip)
}
}

// instrumentTestingBFunc helper function to instrument a benchmark function func(*testing.B)
func instrumentTestingBFunc(pb *testing.B, name string, f func(*testing.B)) (string, func(*testing.B)) {
// Avoid instrumenting twice
if hasCiVisibilityBenchmarkFunc(&f) {
return name, f
}

// Reflect the function to obtain its pointer.
fReflect := reflect.Indirect(reflect.ValueOf(f))
moduleName, suiteName := utils.GetModuleAndSuiteName(fReflect.Pointer())
originalFunc := runtime.FuncForPC(fReflect.Pointer())

// Increment the test count in the module.
atomic.AddInt32(modulesCounters[moduleName], 1)

// Increment the test count in the suite.
atomic.AddInt32(suitesCounters[suiteName], 1)
// Avoid instrumenting twice
if hasCiVisibilityBenchmarkFunc(originalFunc) {
return name, f
}

return subBenchmarkAutoName, func(b *testing.B) {
instrumentedFunc := func(b *testing.B) {
// The sub-benchmark implementation relies on creating a dummy sub benchmark (called [DD:TestVisibility]) with
// a Run over the original sub benchmark function to get the child results without interfering measurements
// By doing this the name of the sub-benchmark are changed
Expand All @@ -155,6 +230,22 @@ func instrumentTestingBFunc(pb *testing.B, name string, f func(*testing.B)) (str
// benchmark/[DD:TestVisibility]/child
// We use regex and decrement the depth level of the benchmark to restore the original name

// Initialize module counters if not already present.
if _, ok := modulesCounters[moduleName]; !ok {
var v int32
modulesCounters[moduleName] = &v
}
// Increment the test count in the module.
atomic.AddInt32(modulesCounters[moduleName], 1)

// Initialize suite counters if not already present.
if _, ok := suitesCounters[suiteName]; !ok {
var v int32
suitesCounters[suiteName] = &v
}
// Increment the test count in the suite.
atomic.AddInt32(suitesCounters[suiteName], 1)

// Decrement level.
bpf := getBenchmarkPrivateFields(b)
bpf.AddLevel(-1)
Expand Down Expand Up @@ -190,7 +281,7 @@ func instrumentTestingBFunc(pb *testing.B, name string, f func(*testing.B)) (str
// Replace this function with the original one (executed only once - the first iteration[b.run1]).
*iPfOfB.benchFunc = f
// Set b to the CI visibility test.
setCiVisibilityBenchmark(b, test)
setCiVisibilityTest(b, test)

// Enable the timer again.
b.ResetTimer()
Expand All @@ -200,8 +291,7 @@ func instrumentTestingBFunc(pb *testing.B, name string, f func(*testing.B)) (str
f(b)
}

setCiVisibilityBenchmarkFunc(&instrumentedFunc)
defer deleteCiVisibilityBenchmarkFunc(&instrumentedFunc)
setCiVisibilityBenchmarkFunc(runtime.FuncForPC(reflect.Indirect(reflect.ValueOf(instrumentedFunc)).Pointer()))
b.Run(name, instrumentedFunc)

endTime := time.Now()
Expand Down Expand Up @@ -258,28 +348,7 @@ func instrumentTestingBFunc(pb *testing.B, name string, f func(*testing.B)) (str

checkModuleAndSuite(module, suite)
}
}

// instrumentTestingBSetErrorInfo helper function to set an error in the `testing.B` CI Visibility span
func instrumentTestingBSetErrorInfo(b *testing.B, errType string, errMessage string, skip int) {
ciTest := getCiVisibilityBenchmark(b)
if ciTest != nil {
ciTest.SetErrorInfo(errType, errMessage, utils.GetStacktrace(2+skip))
}
}

// instrumentTestingBCloseAndSkip helper function to close and skip with a reason a `testing.B` CI Visibility span
func instrumentTestingBCloseAndSkip(b *testing.B, skipReason string) {
ciTest := getCiVisibilityBenchmark(b)
if ciTest != nil {
ciTest.CloseWithFinishTimeAndSkipReason(integrations.ResultStatusSkip, time.Now(), skipReason)
}
}

// instrumentTestingBSkipNow helper function to close and skip a `testing.B` CI Visibility span
func instrumentTestingBSkipNow(b *testing.B) {
ciTest := getCiVisibilityBenchmark(b)
if ciTest != nil {
ciTest.Close(integrations.ResultStatusSkip)
}
setCiVisibilityBenchmarkFunc(originalFunc)
setCiVisibilityBenchmarkFunc(runtime.FuncForPC(reflect.Indirect(reflect.ValueOf(instrumentedFunc)).Pointer()))
return subBenchmarkAutoName, instrumentedFunc
}
Loading
Loading