diff --git a/cmd/occollector/app/builder/builder.go b/cmd/occollector/app/builder/builder.go
index 204673ba3a7..741ef5a7dd8 100644
--- a/cmd/occollector/app/builder/builder.go
+++ b/cmd/occollector/app/builder/builder.go
@@ -39,6 +39,7 @@ const (
 	zipkinScribeReceiverFlg     = "receive-zipkin-scribe"
 	loggingExporterFlg          = "logging-exporter"
 	useTailSamplingAlwaysSample = "tail-sampling-always-sample"
+	memBallastFlag              = "mem-ballast-size-mib"
 )
 
 // Flags adds flags related to basic building of the collector application to the given flagset.
@@ -55,6 +56,9 @@ func Flags(flags *flag.FlagSet) {
 	flags.Bool(loggingExporterFlg, false, "Flag to add a logging exporter (combine with log level DEBUG to log incoming spans)")
 	flags.Bool(useTailSamplingAlwaysSample, false, "Flag to use a tail-based sampling processor with an always sample policy, "+
 		"unless tail sampling setting is present on configuration file.")
+	flags.Uint(memBallastFlag, 0,
+		fmt.Sprintf("Flag to specify size of memory (MiB) ballast to set. Ballast is not used when this is not specified. "+
+			"default settings: 0"))
 }
 
 // GetConfigFile gets the config file from the config file flag.
@@ -72,6 +76,11 @@ func DebugTailSamplingEnabled(v *viper.Viper) bool {
 	return v.GetBool(useTailSamplingAlwaysSample)
 }
 
+// MemBallastSize returns the size of memory ballast to use in MBs
+func MemBallastSize(v *viper.Viper) int {
+	return v.GetInt(memBallastFlag)
+}
+
 // JaegerReceiverCfg holds configuration for Jaeger receivers.
 type JaegerReceiverCfg struct {
 	// ThriftTChannelPort is the port that the relay receives on for jaeger thrift tchannel requests
diff --git a/cmd/occollector/app/collector/collector.go b/cmd/occollector/app/collector/collector.go
index ce03bae31c8..556a174bde1 100644
--- a/cmd/occollector/app/collector/collector.go
+++ b/cmd/occollector/app/collector/collector.go
@@ -123,9 +123,9 @@ func (app *Application) setupZPages() {
 	}
 }
 
-func (app *Application) setupTelemetry() {
+func (app *Application) setupTelemetry(ballastSizeBytes uint64) {
 	app.logger.Info("Setting up own telemetry...")
-	err := AppTelemetry.init(app.asyncErrorChannel, app.v, app.logger)
+	err := AppTelemetry.init(app.asyncErrorChannel, ballastSizeBytes, app.v, app.logger)
 	if err != nil {
 		app.logger.Error("Failed to initialize telemetry", zap.Error(err))
 		os.Exit(1)
@@ -173,23 +173,24 @@ func (app *Application) shutdownClosableComponents() {
 func (app *Application) execute() {
 	app.logger.Info("Starting...", zap.Int("NumCPU", runtime.NumCPU()))
 
+	// Set memory ballast
+	ballast, ballastSizeBytes := app.createMemoryBallast()
+
 	app.asyncErrorChannel = make(chan error)
 
 	// Setup everything.
-	app.setupPProf()
 	app.setupHealthCheck()
 	app.processor, app.closeFns = startProcessor(app.v, app.logger)
 	app.setupZPages()
 	app.receivers = createReceivers(app.v, app.logger, app.processor, app.asyncErrorChannel)
-	app.setupTelemetry()
+	app.setupTelemetry(ballastSizeBytes)
 
 	// Everything is ready, now run until an event requiring shutdown happens.
-
 	app.runAndWaitForShutdownEvent()
 
 	// Begin shutdown sequence.
-
+	runtime.KeepAlive(ballast)
 	app.healthCheck.Set(healthcheck.Unavailable)
 	app.logger.Info("Starting shutdown...")
@@ -284,22 +285,23 @@ func (app *Application) shutdownPipelines() {
 func (app *Application) executeUnified() {
 	app.logger.Info("Starting...", zap.Int("NumCPU", runtime.NumCPU()))
 
+	// Set memory ballast
+	ballast, ballastSizeBytes := app.createMemoryBallast()
+
 	app.asyncErrorChannel = make(chan error)
 
 	// Setup everything.
-	app.setupPProf()
 	app.setupHealthCheck()
 	app.setupZPages()
-	app.setupTelemetry()
+	app.setupTelemetry(ballastSizeBytes)
 	app.setupPipelines()
 
 	// Everything is ready, now run until an event requiring shutdown happens.
-
 	app.runAndWaitForShutdownEvent()
 
 	// Begin shutdown sequence.
-
+	runtime.KeepAlive(ballast)
 	app.healthCheck.Set(healthcheck.Unavailable)
 	app.logger.Info("Starting shutdown...")
@@ -333,3 +335,14 @@ func (app *Application) StartUnified() error {
 
 	return rootCmd.Execute()
 }
+
+func (app *Application) createMemoryBallast() ([]byte, uint64) {
+	ballastSizeMiB := builder.MemBallastSize(app.v)
+	if ballastSizeMiB > 0 {
+		ballastSizeBytes := uint64(ballastSizeMiB) * 1024 * 1024
+		ballast := make([]byte, ballastSizeBytes)
+		app.logger.Info("Using memory ballast", zap.Int("MiBs", ballastSizeMiB))
+		return ballast, ballastSizeBytes
+	}
+	return nil, 0
+}
diff --git a/cmd/occollector/app/collector/collector_test.go b/cmd/occollector/app/collector/collector_test.go
index 83c0aaff9d5..e7974e33166 100644
--- a/cmd/occollector/app/collector/collector_test.go
+++ b/cmd/occollector/app/collector/collector_test.go
@@ -18,8 +18,12 @@ package collector
 import (
 	"net"
 	"net/http"
+	"os"
+	"runtime"
 	"testing"
 
+	stats "github.com/guillermo/go.procstat"
+
 	"github.com/open-telemetry/opentelemetry-service/internal/testutils"
 	"github.com/open-telemetry/opentelemetry-service/internal/zpagesserver"
@@ -99,6 +103,72 @@ func TestApplication_StartUnified(t *testing.T) {
 
 	<-appDone
 }
+
+func testMemBallast(t *testing.T, app *Application, ballastSizeMiB int) {
+	maxRssBytes := mibToBytes(50)
+	minVirtualBytes := mibToBytes(ballastSizeMiB)
+
+	portArg := []string{
+		healthCheckHTTPPort, // Keep it as first since its address is used later.
+		zpagesserver.ZPagesHTTPPort,
+		"metrics-port",
+		"receivers.opencensus.port",
+	}
+
+	addresses := getMultipleAvailableLocalAddresses(t, uint(len(portArg)))
+	for i, addr := range addresses {
+		_, port, err := net.SplitHostPort(addr)
+		if err != nil {
+			t.Fatalf("failed to split host and port from %q: %v", addr, err)
+		}
+		app.v.Set(portArg[i], port)
+	}
+
+	// Without exporters the collector will start and just shutdown, no error is expected.
+	app.v.Set("logging-exporter", true)
+	app.v.Set("mem-ballast-size-mib", ballastSizeMiB)
+
+	appDone := make(chan struct{})
+	go func() {
+		defer close(appDone)
+		if err := app.Start(); err != nil {
+			t.Fatalf("app.Start() got %v, want nil", err)
+		}
+	}()
+
+	<-app.readyChan
+	if !isAppAvailable(t, "http://"+addresses[0]) {
+		t.Fatalf("app didn't reach ready state")
+	}
+	stats := stats.Stat{Pid: os.Getpid()}
+	err := stats.Update()
+	if err != nil {
+		panic(err)
+	}
+
+	if stats.Vsize < minVirtualBytes {
+		t.Errorf("unexpected virtual memory size. expected: >=%d, got: %d", minVirtualBytes, stats.Vsize)
+	}
+
+	if stats.Rss > maxRssBytes {
+		t.Errorf("unexpected RSS size. expected: <%d, got: %d", maxRssBytes, stats.Rss)
+	}
+
+	close(app.stopTestChan)
+	<-appDone
+}
+
+// TestApplication_MemBallast starts a new instance of collector with different
+// mem ballast sizes and ensures that ballast consumes virtual memory but does
+// not count towards RSS mem
+func TestApplication_MemBallast(t *testing.T) {
+	cases := []int{0, 500, 1000}
+	for i := 0; i < len(cases); i++ {
+		runtime.GC()
+		app := newApp()
+		testMemBallast(t, app, cases[i])
+	}
+}
+
 // isAppAvailable checks if the healthcheck server at the given endpoint is
 // returning `available`.
 func isAppAvailable(t *testing.T, healthCheckEndPoint string) bool {
@@ -118,3 +188,7 @@ func getMultipleAvailableLocalAddresses(t *testing.T, numAddresses uint) []strin
 	}
 	return addresses
 }
+
+func mibToBytes(mib int) uint64 {
+	return uint64(mib) * 1024 * 1024
+}
diff --git a/cmd/occollector/app/collector/telemetry.go b/cmd/occollector/app/collector/telemetry.go
index adcb020a401..9e8395b07cc 100644
--- a/cmd/occollector/app/collector/telemetry.go
+++ b/cmd/occollector/app/collector/telemetry.go
@@ -53,7 +53,7 @@ func telemetryFlags(flags *flag.FlagSet) {
 	flags.Uint(metricsPortCfg, 8888, "Port exposing collector telemetry.")
 }
 
-func (tel *appTelemetry) init(asyncErrorChannel chan<- error, v *viper.Viper, logger *zap.Logger) error {
+func (tel *appTelemetry) init(asyncErrorChannel chan<- error, ballastSizeBytes uint64, v *viper.Viper, logger *zap.Logger) error {
 	level, err := telemetry.ParseLevel(v.GetString(metricsLevelCfg))
 	if err != nil {
 		log.Fatalf("Failed to parse metrics level: %v", err)
@@ -70,7 +70,7 @@ func (tel *appTelemetry) init(asyncErrorChannel chan<- error, v *viper.Viper, lo
 	views = append(views, nodebatcher.MetricViews(level)...)
 	views = append(views, observability.AllViews...)
 	views = append(views, tailsampling.SamplingProcessorMetricViews(level)...)
-	processMetricsViews := telemetry.NewProcessMetricsViews()
+	processMetricsViews := telemetry.NewProcessMetricsViews(ballastSizeBytes)
 	views = append(views, processMetricsViews.Views()...)
 	tel.views = views
 	if err := view.Register(views...); err != nil {
diff --git a/go.mod b/go.mod
index 5e92ca2a617..48db9291903 100644
--- a/go.mod
+++ b/go.mod
@@ -17,6 +17,7 @@ require (
 	github.com/google/go-cmp v0.3.0
 	github.com/gorilla/mux v1.6.2
 	github.com/grpc-ecosystem/grpc-gateway v1.8.5
+	github.com/guillermo/go.procstat v0.0.0-20131123175440-34c2813d2e7f
 	github.com/inconshreveable/mousetrap v1.0.0 // indirect
 	github.com/jaegertracing/jaeger v1.9.0
 	github.com/omnition/scribe-go v0.0.0-20190131012523-9e3c68f31124
diff --git a/go.sum b/go.sum
index 0734df1fc36..32a24527ec4 100644
--- a/go.sum
+++ b/go.sum
@@ -145,6 +145,8 @@ github.com/grpc-ecosystem/grpc-gateway v1.6.3/go.mod h1:RSKVYQBd5MCa4OVpNdGskqpg
 github.com/grpc-ecosystem/grpc-gateway v1.8.5 h1:2+KSC78XiO6Qy0hIjfc1OD9H+hsaJdJlb8Kqsd41CTE=
 github.com/grpc-ecosystem/grpc-gateway v1.8.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY=
 github.com/grpc-ecosystem/grpc-opentracing v0.0.0-20180507213350-8e809c8a8645/go.mod h1:6iZfnjpejD4L/4DwD7NryNaJyCQdzwWwH2MWhCA90Kw=
+github.com/guillermo/go.procstat v0.0.0-20131123175440-34c2813d2e7f h1:5qK7cub9F9wqib56+0HZlXgPn24GtmEVRoETcwQoOyA=
+github.com/guillermo/go.procstat v0.0.0-20131123175440-34c2813d2e7f/go.mod h1:ovoU5+mwafQ5XoEAuIEA9EMocbfVJ0vDacPD67dpL4k=
 github.com/hashicorp/consul v0.0.0-20180615161029-bed22a81e9fd h1:auIpcMc3+//R94n6tzTN+sJDiNvL3k5+Rus62AtvO4M=
 github.com/hashicorp/consul v0.0.0-20180615161029-bed22a81e9fd/go.mod h1:mFrjN1mfidgJfYP1xrJCF+AfRhr6Eaqhb2+sfyn/OOI=
 github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA=
diff --git a/internal/collector/telemetry/process_telemetry.go b/internal/collector/telemetry/process_telemetry.go
index 3b33e0c0921..af46a4bfc8d 100644
--- a/internal/collector/telemetry/process_telemetry.go
+++ b/internal/collector/telemetry/process_telemetry.go
@@ -13,8 +13,9 @@ import (
 
 // ProcessMetricsViews is a struct that contains views related to process metrics (cpu, mem, etc)
 type ProcessMetricsViews struct {
-	views []*view.View
-	done  chan struct{}
+	ballastSizeBytes uint64
+	views            []*view.View
+	done             chan struct{}
 }
 
 var mRuntimeAllocMem = stats.Int64("oc.io/process/memory_alloc", "Number of bytes currently allocated in use", "By")
@@ -55,10 +56,11 @@ var viewCPUSeconds = &view.View{
 
 // NewProcessMetricsViews creates a new set of ProcessMetrics (mem, cpu) that can be used to measure
 // basic information about this process.
-func NewProcessMetricsViews() *ProcessMetricsViews {
+func NewProcessMetricsViews(ballastSizeBytes uint64) *ProcessMetricsViews {
 	return &ProcessMetricsViews{
-		views: []*view.View{viewAllocMem, viewTotalAllocMem, viewSysMem, viewCPUSeconds},
-		done:  make(chan struct{}),
+		ballastSizeBytes: ballastSizeBytes,
+		views:            []*view.View{viewAllocMem, viewTotalAllocMem, viewSysMem, viewCPUSeconds},
+		done:             make(chan struct{}),
 	}
 }
 
@@ -89,7 +91,7 @@ func (pmv *ProcessMetricsViews) StopCollection() {
 
 func (pmv *ProcessMetricsViews) updateViews() {
 	ms := &runtime.MemStats{}
-	runtime.ReadMemStats(ms)
+	pmv.readMemStats(ms)
 	stats.Record(context.Background(), mRuntimeAllocMem.M(int64(ms.Alloc)))
 	stats.Record(context.Background(), mRuntimeTotalAllocMem.M(int64(ms.TotalAlloc)))
 	stats.Record(context.Background(), mRuntimeSysMem.M(int64(ms.Sys)))
@@ -102,3 +104,11 @@
 		}
 	}
 }
+
+func (pmv *ProcessMetricsViews) readMemStats(ms *runtime.MemStats) {
+	runtime.ReadMemStats(ms)
+	ms.Alloc -= pmv.ballastSizeBytes
+	ms.HeapAlloc -= pmv.ballastSizeBytes
+	ms.HeapSys -= pmv.ballastSizeBytes
+	ms.HeapInuse -= pmv.ballastSizeBytes
+}
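
Reviewer note (not part of the patch): the standalone Go sketch below illustrates the ballast technique this change applies, assuming default GOGC behavior. A large, never-written byte slice raises the live heap the garbage collector targets, so collections run less often, while its untouched pages stay virtual and do not count toward RSS (which is what TestApplication_MemBallast asserts via procstat). runtime.KeepAlive keeps the slice reachable until shutdown, and its size is subtracted from runtime.MemStats before reporting, mirroring createMemoryBallast and readMemStats above. The 64 MiB size is arbitrary and only for illustration; the collector takes the size from the --mem-ballast-size-mib flag.

package main

import (
	"fmt"
	"runtime"
)

func main() {
	// Arbitrary size for this sketch; the collector derives it from the flag instead.
	const ballastSizeBytes uint64 = 64 * 1024 * 1024
	ballast := make([]byte, ballastSizeBytes)

	// ... the application would do its real work here ...

	// Report heap usage net of the ballast, as readMemStats does in the patch.
	ms := &runtime.MemStats{}
	runtime.ReadMemStats(ms)
	fmt.Printf("alloc excluding ballast: %d MiB\n", (ms.Alloc-ballastSizeBytes)/1024/1024)

	// Keep the ballast reachable until the very end so the GC cannot reclaim it.
	runtime.KeepAlive(ballast)
}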