Skip to content

Commit

Permalink
Testbed improvements (#1161)
Browse files Browse the repository at this point in the history
* Use atomic wrappers for readability and for preventing accidentally accessing atomic variables
  non-atomically.
* Enhance readability of large numbers in testbed by formatting with thousands separator.
* Include rate per second in testbed output.
  • Loading branch information
jrcamp authored Jun 22, 2020
1 parent 4bf6afb commit a8db627
Show file tree
Hide file tree
Showing 10 changed files with 86 additions and 73 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,11 @@ require (
github.com/tcnksm/ghr v0.13.0
github.com/uber/jaeger-lib v2.2.0+incompatible
go.opencensus.io v0.22.3
go.uber.org/atomic v1.5.1
go.uber.org/zap v1.13.0
golang.org/x/net v0.0.0-20200324143707-d3edc9973b7e
golang.org/x/sys v0.0.0-20200408040146-ea54a3c99b9b
golang.org/x/text v0.3.2
golang.org/x/tools v0.0.0-20200428211428-0c9eba77bc32 // indirect
google.golang.org/api v0.10.0 // indirect
google.golang.org/genproto v0.0.0-20200408120641-fbb3ad325eb7
Expand Down
24 changes: 12 additions & 12 deletions testbed/testbed/child_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@ import (
"path"
"path/filepath"
"sync"
"sync/atomic"
"syscall"
"time"

"github.com/shirou/gopsutil/cpu"
"github.com/shirou/gopsutil/process"
"go.uber.org/atomic"
)

// ResourceSpec is a resource consumption specification.
Expand Down Expand Up @@ -95,10 +95,10 @@ type ChildProcess struct {
lastProcessTimes *cpu.TimesStat

// Current RAM RSS in MiBs
ramMiBCur uint32
ramMiBCur atomic.Uint32

// Current CPU percentage times 1000 (we use scaling since we have to use int for atomic operations).
cpuPercentX1000Cur uint32
cpuPercentX1000Cur atomic.Uint32

// Maximum CPU seen
cpuPercentMax float64
Expand Down Expand Up @@ -287,8 +287,8 @@ func (cp *ChildProcess) Stop() (stopped bool, err error) {
close(finished)

// Set resource consumption stats to 0
atomic.StoreUint32(&cp.ramMiBCur, 0)
atomic.StoreUint32(&cp.cpuPercentX1000Cur, 0)
cp.ramMiBCur.Store(0)
cp.cpuPercentX1000Cur.Store(0)

log.Printf("%s process stopped, exit code=%d", cp.name, cp.cmd.ProcessState.ExitCode())

Expand Down Expand Up @@ -369,7 +369,7 @@ func (cp *ChildProcess) fetchRAMUsage() {
}

// Store current usage.
atomic.StoreUint32(&cp.ramMiBCur, ramMiBCur)
cp.ramMiBCur.Store(ramMiBCur)
}

func (cp *ChildProcess) fetchCPUUsage() {
Expand Down Expand Up @@ -398,19 +398,19 @@ func (cp *ChildProcess) fetchCPUUsage() {
curCPUPercentageX1000 := uint32(cpuPercent * 1000)

// Store current usage.
atomic.StoreUint32(&cp.cpuPercentX1000Cur, curCPUPercentageX1000)
cp.cpuPercentX1000Cur.Store(curCPUPercentageX1000)
}

func (cp *ChildProcess) checkAllowedResourceUsage() error {
// Check if current CPU usage exceeds expected.
var errMsg string
if cp.resourceSpec.ExpectedMaxCPU != 0 && cp.cpuPercentX1000Cur/1000 > cp.resourceSpec.ExpectedMaxCPU {
if cp.resourceSpec.ExpectedMaxCPU != 0 && cp.cpuPercentX1000Cur.Load()/1000 > cp.resourceSpec.ExpectedMaxCPU {
errMsg = fmt.Sprintf("CPU consumption is %.1f%%, max expected is %d%%",
float64(cp.cpuPercentX1000Cur)/1000.0, cp.resourceSpec.ExpectedMaxCPU)
float64(cp.cpuPercentX1000Cur.Load())/1000.0, cp.resourceSpec.ExpectedMaxCPU)
}

// Check if current RAM usage exceeds expected.
if cp.resourceSpec.ExpectedMaxRAM != 0 && cp.ramMiBCur > cp.resourceSpec.ExpectedMaxRAM {
if cp.resourceSpec.ExpectedMaxRAM != 0 && cp.ramMiBCur.Load() > cp.resourceSpec.ExpectedMaxRAM {
errMsg = fmt.Sprintf("RAM consumption is %d MiB, max expected is %d MiB",
cp.ramMiBCur, cp.resourceSpec.ExpectedMaxRAM)
}
Expand All @@ -431,8 +431,8 @@ func (cp *ChildProcess) GetResourceConsumption() string {
return ""
}

curRSSMib := atomic.LoadUint32(&cp.ramMiBCur)
curCPUPercentageX1000 := atomic.LoadUint32(&cp.cpuPercentX1000Cur)
curRSSMib := cp.ramMiBCur.Load()
curCPUPercentageX1000 := cp.cpuPercentX1000Cur.Load()

return fmt.Sprintf("%s RAM (RES):%4d MiB, CPU:%4.1f%%", cp.name,
curRSSMib, float64(curCPUPercentageX1000)/1000.0)
Expand Down
70 changes: 35 additions & 35 deletions testbed/testbed/data_providers.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ import (
"log"
"math/rand"
"strconv"
"sync/atomic"
"time"

metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1"
resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1"
tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1"
"github.com/golang/protobuf/ptypes/timestamp"
"go.uber.org/atomic"

"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/internal/data"
Expand All @@ -37,53 +37,53 @@ import (
"go.opentelemetry.io/collector/translator/internaldata"
)

//DataProvider defines the interface for generators of test data used to drive various end-to-end tests.
// DataProvider defines the interface for generators of test data used to drive various end-to-end tests.
type DataProvider interface {
//SetLoadGeneratorCounters supplies pointers to LoadGenerator counters.
//The data provider implementation should increment these as it generates data.
SetLoadGeneratorCounters(batchesGenerated *uint64, dataItemsGenerated *uint64)
//GenerateTraces returns an internal Traces instance with an OTLP ResourceSpans slice populated with test data.
// SetLoadGeneratorCounters supplies pointers to LoadGenerator counters.
// The data provider implementation should increment these as it generates data.
SetLoadGeneratorCounters(batchesGenerated *atomic.Uint64, dataItemsGenerated *atomic.Uint64)
// GenerateTraces returns an internal Traces instance with an OTLP ResourceSpans slice populated with test data.
GenerateTraces() (pdata.Traces, bool)
//GenerateTracesOld returns a slice of OpenCensus Span instances populated with test data.
// GenerateTracesOld returns a slice of OpenCensus Span instances populated with test data.
GenerateTracesOld() ([]*tracepb.Span, bool)
//GenerateMetrics returns an internal MetricData instance with an OTLP ResourceMetrics slice of test data.
// GenerateMetrics returns an internal MetricData instance with an OTLP ResourceMetrics slice of test data.
GenerateMetrics() (data.MetricData, bool)
//GenerateMetricsOld returns a slice of OpenCensus Metric instances populated with test data.
// GenerateMetricsOld returns a slice of OpenCensus Metric instances populated with test data.
GenerateMetricsOld() ([]*metricspb.Metric, bool)
//GetGeneratedSpan returns the generated Span matching the provided traceId and spanId or else nil if no match found.
// GetGeneratedSpan returns the generated Span matching the provided traceId and spanId or else nil if no match found.
GetGeneratedSpan(traceID []byte, spanID []byte) *otlptrace.Span
}

//PerfTestDataProvider in an implementation of the DataProvider for use in performance tests.
//Tracing IDs are based on the incremented batch and data items counters.
// PerfTestDataProvider in an implementation of the DataProvider for use in performance tests.
// Tracing IDs are based on the incremented batch and data items counters.
type PerfTestDataProvider struct {
options LoadOptions
batchesGenerated *uint64
dataItemsGenerated *uint64
batchesGenerated *atomic.Uint64
dataItemsGenerated *atomic.Uint64
}

//NewPerfTestDataProvider creates an instance of PerfTestDataProvider which generates test data based on the sizes
//specified in the supplied LoadOptions.
// NewPerfTestDataProvider creates an instance of PerfTestDataProvider which generates test data based on the sizes
// specified in the supplied LoadOptions.
func NewPerfTestDataProvider(options LoadOptions) *PerfTestDataProvider {
return &PerfTestDataProvider{
options: options,
}
}

func (dp *PerfTestDataProvider) SetLoadGeneratorCounters(batchesGenerated *uint64, dataItemsGenerated *uint64) {
func (dp *PerfTestDataProvider) SetLoadGeneratorCounters(batchesGenerated *atomic.Uint64, dataItemsGenerated *atomic.Uint64) {
dp.batchesGenerated = batchesGenerated
dp.dataItemsGenerated = dataItemsGenerated
}

func (dp *PerfTestDataProvider) GenerateTracesOld() ([]*tracepb.Span, bool) {

var spans []*tracepb.Span
traceID := atomic.AddUint64(dp.batchesGenerated, 1)
traceID := dp.batchesGenerated.Inc()
for i := 0; i < dp.options.ItemsPerBatch; i++ {

startTime := time.Now()

spanID := atomic.AddUint64(dp.dataItemsGenerated, 1)
spanID := dp.dataItemsGenerated.Inc()

// Create a span.
span := &tracepb.Span{
Expand Down Expand Up @@ -126,13 +126,13 @@ func (dp *PerfTestDataProvider) GenerateTraces() (pdata.Traces, bool) {
spans := ilss.At(0).Spans()
spans.Resize(dp.options.ItemsPerBatch)

traceID := atomic.AddUint64(dp.batchesGenerated, 1)
traceID := dp.batchesGenerated.Inc()
for i := 0; i < dp.options.ItemsPerBatch; i++ {

startTime := time.Now()
endTime := startTime.Add(time.Duration(time.Millisecond))

spanID := atomic.AddUint64(dp.dataItemsGenerated, 1)
spanID := dp.dataItemsGenerated.Inc()

span := spans.At(i)

Expand Down Expand Up @@ -192,7 +192,7 @@ func (dp *PerfTestDataProvider) GenerateMetricsOld() ([]*metricspb.Metric, bool)
Resource: resource,
}

batchIndex := atomic.AddUint64(dp.batchesGenerated, 1)
batchIndex := dp.batchesGenerated.Inc()

// Generate data points for the metric. We generate timeseries each containing
// a single data points. This is the most typical payload composition since
Expand All @@ -201,7 +201,7 @@ func (dp *PerfTestDataProvider) GenerateMetricsOld() ([]*metricspb.Metric, bool)
timeseries := &metricspb.TimeSeries{}

startTime := time.Now()
value := atomic.AddUint64(dp.dataItemsGenerated, 1)
value := dp.dataItemsGenerated.Inc()

// Create a data point.
point := &metricspb.Point{
Expand Down Expand Up @@ -248,14 +248,14 @@ func (dp *PerfTestDataProvider) GenerateMetrics() (data.MetricData, bool) {
metricDescriptor.SetDescription("Load Generator Counter #" + strconv.Itoa(i))
metricDescriptor.SetType(pdata.MetricTypeInt64)

batchIndex := atomic.AddUint64(dp.batchesGenerated, 1)
batchIndex := dp.batchesGenerated.Inc()

// Generate data points for the metric.
metric.Int64DataPoints().Resize(dataPointsPerMetric)
for j := 0; j < dataPointsPerMetric; j++ {
dataPoint := metric.Int64DataPoints().At(j)
dataPoint.SetStartTime(pdata.TimestampUnixNano(uint64(time.Now().UnixNano())))
value := atomic.AddUint64(dp.dataItemsGenerated, 1)
value := dp.dataItemsGenerated.Inc()
dataPoint.SetValue(int64(value))
dataPoint.LabelsMap().InitFromMap(map[string]string{
"item_index": "item_" + strconv.Itoa(j),
Expand Down Expand Up @@ -283,22 +283,22 @@ func timeToTimestamp(t time.Time) *timestamp.Timestamp {
}
}

//GoldenDataProvider is an implementation of DataProvider for use in correctness tests.
//Provided data from the "Golden" dataset generated using pairwise combinatorial testing techniques.
// GoldenDataProvider is an implementation of DataProvider for use in correctness tests.
// Provided data from the "Golden" dataset generated using pairwise combinatorial testing techniques.
type GoldenDataProvider struct {
tracePairsFile string
spanPairsFile string
random io.Reader
batchesGenerated *uint64
dataItemsGenerated *uint64
batchesGenerated *atomic.Uint64
dataItemsGenerated *atomic.Uint64
resourceSpans []*otlptrace.ResourceSpans
spansIndex int
spansMap map[string]*otlptrace.Span
}

//NewGoldenDataProvider creates a new instance of GoldenDataProvider which generates test data based
//on the pairwise combinations specified in the tracePairsFile and spanPairsFile input variables.
//The supplied randomSeed is used to initialize the random number generator used in generating tracing IDs.
// NewGoldenDataProvider creates a new instance of GoldenDataProvider which generates test data based
// on the pairwise combinations specified in the tracePairsFile and spanPairsFile input variables.
// The supplied randomSeed is used to initialize the random number generator used in generating tracing IDs.
func NewGoldenDataProvider(tracePairsFile string, spanPairsFile string, randomSeed int64) *GoldenDataProvider {
return &GoldenDataProvider{
tracePairsFile: tracePairsFile,
Expand All @@ -307,7 +307,7 @@ func NewGoldenDataProvider(tracePairsFile string, spanPairsFile string, randomSe
}
}

func (dp *GoldenDataProvider) SetLoadGeneratorCounters(batchesGenerated *uint64, dataItemsGenerated *uint64) {
func (dp *GoldenDataProvider) SetLoadGeneratorCounters(batchesGenerated *atomic.Uint64, dataItemsGenerated *atomic.Uint64) {
dp.batchesGenerated = batchesGenerated
dp.dataItemsGenerated = dataItemsGenerated
}
Expand All @@ -321,7 +321,7 @@ func (dp *GoldenDataProvider) GenerateTraces() (pdata.Traces, bool) {
dp.resourceSpans = make([]*otlptrace.ResourceSpans, 0)
}
}
atomic.AddUint64(dp.batchesGenerated, 1)
dp.batchesGenerated.Inc()
if dp.spansIndex >= len(dp.resourceSpans) {
return pdata.TracesFromOtlp(make([]*otlptrace.ResourceSpans, 0)), true
}
Expand All @@ -332,7 +332,7 @@ func (dp *GoldenDataProvider) GenerateTraces() (pdata.Traces, bool) {
for _, libSpans := range resourceSpans[0].InstrumentationLibrarySpans {
spanCount += uint64(len(libSpans.Spans))
}
atomic.AddUint64(dp.dataItemsGenerated, spanCount)
dp.dataItemsGenerated.Add(spanCount)
return pdata.TracesFromOtlp(resourceSpans), false
}

Expand Down
15 changes: 9 additions & 6 deletions testbed/testbed/load_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,28 @@ import (
"fmt"
"log"
"sync"
"sync/atomic"
"time"

resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1"
"go.uber.org/atomic"
"golang.org/x/text/message"

"go.opentelemetry.io/collector/consumer/consumerdata"
)

var printer = message.NewPrinter(message.MatchLanguage("en"))

// LoadGenerator is a simple load generator.
type LoadGenerator struct {
sender DataSender

dataProvider DataProvider

// Number of batches of data items sent.
batchesSent uint64
batchesSent atomic.Uint64

// Number of data items (spans or metric data points) sent.
dataItemsSent uint64
dataItemsSent atomic.Uint64

stopOnce sync.Once
stopWait sync.WaitGroup
Expand Down Expand Up @@ -111,11 +114,11 @@ func (lg *LoadGenerator) Stop() {

// GetStats returns the stats as a printable string.
func (lg *LoadGenerator) GetStats() string {
return fmt.Sprintf("Sent:%5d items", atomic.LoadUint64(&lg.dataItemsSent))
return printer.Sprintf("Sent:%10d items", lg.DataItemsSent())
}

func (lg *LoadGenerator) DataItemsSent() uint64 {
return atomic.LoadUint64(&lg.dataItemsSent)
return lg.dataItemsSent.Load()
}

// IncDataItemsSent is used when a test bypasses the LoadGenerator and sends data
Expand All @@ -125,7 +128,7 @@ func (lg *LoadGenerator) DataItemsSent() uint64 {
// reports to use their own counter and load generator and other sending sources
// to contribute to this counter. This could be done as a future improvement.
func (lg *LoadGenerator) IncDataItemsSent() {
atomic.AddUint64(&lg.dataItemsSent, 1)
lg.dataItemsSent.Inc()
}

func (lg *LoadGenerator) generate() {
Expand Down
Loading

0 comments on commit a8db627

Please sign in to comment.