Introduce optional memory ballast (#45)
This change adds an optional memory ballast to help ease GC pressure
in high-throughput scenarios; a short standalone sketch of the technique is
included just before the file diffs below.

- Added a new CLI flag, `--mem-ballast-size-mib`, which specifies the size
of the memory ballast in MiB. Omitting the flag or setting it to zero
disables the ballast.

- Updated the process telemetry code to account for the ballast and report
memory usage statistics after subtracting the ballast size. I'm not sure
this makes sense in every scenario; if not, we can add another flag to
disable the behaviour. Another option is to add one more metric that just
exports the memory ballast size as a static number and let metric consumers
adjust the numbers.

https://blog.twitch.tv/go-memory-ballast-how-i-learnt-to-stop-worrying-and-love-the-heap-26c2462549a2

golang/go#23044
owais authored and Paulo Janotti committed Jun 25, 2019
1 parent 4bd705a commit 6566e78
Showing 7 changed files with 127 additions and 18 deletions.
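
For readers new to the technique: a memory ballast is a large allocation that is never read or written, whose only purpose is to raise the live-heap size the Go garbage collector paces itself against. With the default GOGC=100 a collection is triggered roughly when the heap doubles since the last GC, so a ballast of B bytes pushes collections out to around 2×B of live heap and makes them far less frequent under bursty allocation. A minimal standalone sketch of the idea, separate from the collector code in this commit:

package main

// ballast is never read or written; holding it in a package-level variable
// keeps it reachable (and therefore uncollected) for the life of the process.
var ballast []byte

func main() {
    ballast = make([]byte, 1<<30) // 1 GiB of untouched, mostly-virtual memory

    runCollector() // placeholder for the real work of the process
}

func runCollector() {
    // ... receivers, processors, exporters ...
}
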
9 changes: 9 additions & 0 deletions cmd/occollector/app/builder/builder.go
@@ -39,6 +39,7 @@ const (
zipkinScribeReceiverFlg = "receive-zipkin-scribe"
loggingExporterFlg = "logging-exporter"
useTailSamplingAlwaysSample = "tail-sampling-always-sample"
memBallastFlag = "mem-ballast-size-mib"
)

// Flags adds flags related to basic building of the collector application to the given flagset.
@@ -55,6 +56,9 @@ func Flags(flags *flag.FlagSet) {
flags.Bool(loggingExporterFlg, false, "Flag to add a logging exporter (combine with log level DEBUG to log incoming spans)")
flags.Bool(useTailSamplingAlwaysSample, false, "Flag to use a tail-based sampling processor with an always sample policy, "+
"unless tail sampling setting is present on configuration file.")
flags.Uint(memBallastFlag, 0,
fmt.Sprintf("Flag to specify size of memory (MiB) ballast to set. Ballast is not used when this is not specified. "+
"default settings: 0"))
}

// GetConfigFile gets the config file from the config file flag.
@@ -72,6 +76,11 @@ func DebugTailSamplingEnabled(v *viper.Viper) bool {
return v.GetBool(useTailSamplingAlwaysSample)
}

// MemBallastSize returns the size of memory ballast to use in MBs
func MemBallastSize(v *viper.Viper) int {
return v.GetInt(memBallastFlag)
}

// JaegerReceiverCfg holds configuration for Jaeger receivers.
type JaegerReceiverCfg struct {
// ThriftTChannelPort is the port that the relay receives on for jaeger thrift tchannel requests
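
With the flag above registered, the ballast is opt-in from the command line. A hypothetical invocation (the binary name is illustrative and any required configuration flags are omitted; only --mem-ballast-size-mib is defined by this change), with GODEBUG=gctrace=1 added as one way to confirm that collections become less frequent:

GODEBUG=gctrace=1 ./occollector --mem-ballast-size-mib=1024
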
33 changes: 23 additions & 10 deletions cmd/occollector/app/collector/collector.go
@@ -123,9 +123,9 @@ func (app *Application) setupZPages() {
}
}

func (app *Application) setupTelemetry() {
func (app *Application) setupTelemetry(ballastSizeBytes uint64) {
app.logger.Info("Setting up own telemetry...")
err := AppTelemetry.init(app.asyncErrorChannel, app.v, app.logger)
err := AppTelemetry.init(app.asyncErrorChannel, ballastSizeBytes, app.v, app.logger)
if err != nil {
app.logger.Error("Failed to initialize telemetry", zap.Error(err))
os.Exit(1)
@@ -173,23 +173,24 @@ func (app *Application) shutdownClosableComponents() {
func (app *Application) execute() {
app.logger.Info("Starting...", zap.Int("NumCPU", runtime.NumCPU()))

// Set memory ballast
ballast, ballastSizeBytes := app.createMemoryBallast()

app.asyncErrorChannel = make(chan error)

// Setup everything.

app.setupPProf()
app.setupHealthCheck()
app.processor, app.closeFns = startProcessor(app.v, app.logger)
app.setupZPages()
app.receivers = createReceivers(app.v, app.logger, app.processor, app.asyncErrorChannel)
app.setupTelemetry()
app.setupTelemetry(ballastSizeBytes)

// Everything is ready, now run until an event requiring shutdown happens.

app.runAndWaitForShutdownEvent()

// Begin shutdown sequence.

runtime.KeepAlive(ballast)
app.healthCheck.Set(healthcheck.Unavailable)
app.logger.Info("Starting shutdown...")

@@ -284,22 +285,23 @@ func (app *Application) shutdownPipelines() {
func (app *Application) executeUnified() {
app.logger.Info("Starting...", zap.Int("NumCPU", runtime.NumCPU()))

// Set memory ballast
ballast, ballastSizeBytes := app.createMemoryBallast()

app.asyncErrorChannel = make(chan error)

// Setup everything.

app.setupPProf()
app.setupHealthCheck()
app.setupZPages()
app.setupTelemetry()
app.setupTelemetry(ballastSizeBytes)
app.setupPipelines()

// Everything is ready, now run until an event requiring shutdown happens.

app.runAndWaitForShutdownEvent()

// Begin shutdown sequence.

runtime.KeepAlive(ballast)
app.healthCheck.Set(healthcheck.Unavailable)
app.logger.Info("Starting shutdown...")

@@ -333,3 +335,14 @@ func (app *Application) StartUnified() error {

return rootCmd.Execute()
}

func (app *Application) createMemoryBallast() ([]byte, uint64) {
ballastSizeMiB := builder.MemBallastSize(app.v)
if ballastSizeMiB > 0 {
ballastSizeBytes := uint64(ballastSizeMiB) * 1024 * 1024
ballast := make([]byte, ballastSizeBytes)
app.logger.Info("Using memory ballast", zap.Int("MiBs", ballastSizeMiB))
return ballast, ballastSizeBytes
}
return nil, 0
}
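
A note on the runtime.KeepAlive(ballast) calls in execute() and executeUnified() above: because the ballast is held only in a local variable, that final reference after runAndWaitForShutdownEvent() is what guarantees the slice stays reachable, and therefore unfreed, for the whole run. A stripped-down sketch of the same pattern (function names here are illustrative, not the collector's API):

package main

import (
    "runtime"
    "time"
)

func main() {
    ballast := make([]byte, 512*1024*1024) // 512 MiB held in a local variable

    serveUntilShutdown() // stands in for runAndWaitForShutdownEvent()

    // Without this final reference the compiler could treat the slice as dead
    // after its last use, allowing the GC to reclaim the ballast while the
    // process is still running.
    runtime.KeepAlive(ballast)
}

func serveUntilShutdown() {
    time.Sleep(time.Minute) // a real server would block until a shutdown signal
}
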
74 changes: 74 additions & 0 deletions cmd/occollector/app/collector/collector_test.go
@@ -18,8 +18,12 @@ package collector
import (
"net"
"net/http"
"os"
"runtime"
"testing"

stats "github.com/guillermo/go.procstat"

"github.com/open-telemetry/opentelemetry-service/internal/testutils"

"github.com/open-telemetry/opentelemetry-service/internal/zpagesserver"
@@ -99,6 +103,72 @@ func TestApplication_StartUnified(t *testing.T) {
<-appDone
}

func testMemBallast(t *testing.T, app *Application, ballastSizeMiB int) {
maxRssBytes := mibToBytes(50)
minVirtualBytes := mibToBytes(ballastSizeMiB)

portArg := []string{
healthCheckHTTPPort, // Keep it as first since its address is used later.
zpagesserver.ZPagesHTTPPort,
"metrics-port",
"receivers.opencensus.port",
}

addresses := getMultipleAvailableLocalAddresses(t, uint(len(portArg)))
for i, addr := range addresses {
_, port, err := net.SplitHostPort(addr)
if err != nil {
t.Fatalf("failed to split host and port from %q: %v", addr, err)
}
app.v.Set(portArg[i], port)
}

// Without exporters the collector will start and just shutdown, no error is expected.
app.v.Set("logging-exporter", true)
app.v.Set("mem-ballast-size-mib", ballastSizeMiB)

appDone := make(chan struct{})
go func() {
defer close(appDone)
if err := app.Start(); err != nil {
t.Fatalf("app.Start() got %v, want nil", err)
}
}()

<-app.readyChan
if !isAppAvailable(t, "http://"+addresses[0]) {
t.Fatalf("app didn't reach ready state")
}
stats := stats.Stat{Pid: os.Getpid()}
err := stats.Update()
if err != nil {
panic(err)
}

if stats.Vsize < minVirtualBytes {
t.Errorf("unexpected virtual memory size. expected: >=%d, got: %d", minVirtualBytes, stats.Vsize)
}

if stats.Rss > maxRssBytes {
t.Errorf("unexpected RSS size. expected: <%d, got: %d", maxRssBytes, stats.Rss)
}

close(app.stopTestChan)
<-appDone
}

// TestApplication_MemBallast starts a new instance of collector with different
// mem ballast sizes and ensures that ballast consumes virtual memory but does
// not count towards RSS mem
func TestApplication_MemBallast(t *testing.T) {
cases := []int{0, 500, 1000}
for i := 0; i < len(cases); i++ {
runtime.GC()
app := newApp()
testMemBallast(t, app, cases[i])
}
}

// isAppAvailable checks if the healthcheck server at the given endpoint is
// returning `available`.
func isAppAvailable(t *testing.T, healthCheckEndPoint string) bool {
@@ -118,3 +188,7 @@ func getMultipleAvailableLocalAddresses(t *testing.T, numAddresses uint) []string {
}
return addresses
}

func mibToBytes(mib int) uint64 {
return uint64(mib) * 1024 * 1024
}
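
The test above relies on the fact that the ballast's pages are never written, so the OS never faults them in: virtual size (Vsize) grows by at least the ballast while resident memory (Rss) stays small. A minimal standalone sketch of the same measurement, using the go.procstat package the same way the test does (the ballast size and the lack of thresholds are illustrative):

package main

import (
    "fmt"
    "os"

    stats "github.com/guillermo/go.procstat"
)

func main() {
    ballast := make([]byte, 500*1024*1024) // 500 MiB, never written

    s := stats.Stat{Pid: os.Getpid()}
    if err := s.Update(); err != nil {
        panic(err)
    }

    // Vsize includes the untouched ballast pages; Rss should stay far below
    // the ballast size because those pages are never faulted in.
    fmt.Printf("vsize=%d rss=%d\n", s.Vsize, s.Rss)

    _ = ballast // keep the ballast referenced past the measurement
}
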
4 changes: 2 additions & 2 deletions cmd/occollector/app/collector/telemetry.go
Expand Up @@ -53,7 +53,7 @@ func telemetryFlags(flags *flag.FlagSet) {
flags.Uint(metricsPortCfg, 8888, "Port exposing collector telemetry.")
}

func (tel *appTelemetry) init(asyncErrorChannel chan<- error, v *viper.Viper, logger *zap.Logger) error {
func (tel *appTelemetry) init(asyncErrorChannel chan<- error, ballastSizeBytes uint64, v *viper.Viper, logger *zap.Logger) error {
level, err := telemetry.ParseLevel(v.GetString(metricsLevelCfg))
if err != nil {
log.Fatalf("Failed to parse metrics level: %v", err)
@@ -70,7 +70,7 @@ func (tel *appTelemetry) init(asyncErrorChannel chan<- error, v *viper.Viper, logger *zap.Logger) error {
views = append(views, nodebatcher.MetricViews(level)...)
views = append(views, observability.AllViews...)
views = append(views, tailsampling.SamplingProcessorMetricViews(level)...)
processMetricsViews := telemetry.NewProcessMetricsViews()
processMetricsViews := telemetry.NewProcessMetricsViews(ballastSizeBytes)
views = append(views, processMetricsViews.Views()...)
tel.views = views
if err := view.Register(views...); err != nil {
1 change: 1 addition & 0 deletions go.mod
@@ -17,6 +17,7 @@ require (
github.com/google/go-cmp v0.3.0
github.com/gorilla/mux v1.6.2
github.com/grpc-ecosystem/grpc-gateway v1.8.5
github.com/guillermo/go.procstat v0.0.0-20131123175440-34c2813d2e7f
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/jaegertracing/jaeger v1.9.0
github.com/omnition/scribe-go v0.0.0-20190131012523-9e3c68f31124
2 changes: 2 additions & 0 deletions go.sum
@@ -145,6 +145,8 @@ github.com/grpc-ecosystem/grpc-gateway v1.6.3/go.mod h1:RSKVYQBd5MCa4OVpNdGskqpg
github.com/grpc-ecosystem/grpc-gateway v1.8.5 h1:2+KSC78XiO6Qy0hIjfc1OD9H+hsaJdJlb8Kqsd41CTE=
github.com/grpc-ecosystem/grpc-gateway v1.8.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY=
github.com/grpc-ecosystem/grpc-opentracing v0.0.0-20180507213350-8e809c8a8645/go.mod h1:6iZfnjpejD4L/4DwD7NryNaJyCQdzwWwH2MWhCA90Kw=
github.com/guillermo/go.procstat v0.0.0-20131123175440-34c2813d2e7f h1:5qK7cub9F9wqib56+0HZlXgPn24GtmEVRoETcwQoOyA=
github.com/guillermo/go.procstat v0.0.0-20131123175440-34c2813d2e7f/go.mod h1:ovoU5+mwafQ5XoEAuIEA9EMocbfVJ0vDacPD67dpL4k=
github.com/hashicorp/consul v0.0.0-20180615161029-bed22a81e9fd h1:auIpcMc3+//R94n6tzTN+sJDiNvL3k5+Rus62AtvO4M=
github.com/hashicorp/consul v0.0.0-20180615161029-bed22a81e9fd/go.mod h1:mFrjN1mfidgJfYP1xrJCF+AfRhr6Eaqhb2+sfyn/OOI=
github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA=
22 changes: 16 additions & 6 deletions internal/collector/telemetry/process_telemetry.go
@@ -13,8 +13,9 @@ import (

// ProcessMetricsViews is a struct that contains views related to process metrics (cpu, mem, etc)
type ProcessMetricsViews struct {
views []*view.View
done chan struct{}
ballastSizeBytes uint64
views []*view.View
done chan struct{}
}

var mRuntimeAllocMem = stats.Int64("oc.io/process/memory_alloc", "Number of bytes currently allocated in use", "By")
@@ -55,10 +56,11 @@ var viewCPUSeconds = &view.View{

// NewProcessMetricsViews creates a new set of ProcessMetrics (mem, cpu) that can be used to measure
// basic information about this process.
func NewProcessMetricsViews() *ProcessMetricsViews {
func NewProcessMetricsViews(ballastSizeBytes uint64) *ProcessMetricsViews {
return &ProcessMetricsViews{
views: []*view.View{viewAllocMem, viewTotalAllocMem, viewSysMem, viewCPUSeconds},
done: make(chan struct{}),
ballastSizeBytes: ballastSizeBytes,
views: []*view.View{viewAllocMem, viewTotalAllocMem, viewSysMem, viewCPUSeconds},
done: make(chan struct{}),
}
}

@@ -89,7 +91,7 @@ func (pmv *ProcessMetricsViews) StopCollection() {

func (pmv *ProcessMetricsViews) updateViews() {
ms := &runtime.MemStats{}
runtime.ReadMemStats(ms)
pmv.readMemStats(ms)
stats.Record(context.Background(), mRuntimeAllocMem.M(int64(ms.Alloc)))
stats.Record(context.Background(), mRuntimeTotalAllocMem.M(int64(ms.TotalAlloc)))
stats.Record(context.Background(), mRuntimeSysMem.M(int64(ms.Sys)))
@@ -102,3 +104,11 @@ }
}
}
}

func (pmv *ProcessMetricsViews) readMemStats(ms *runtime.MemStats) {
runtime.ReadMemStats(ms)
ms.Alloc -= pmv.ballastSizeBytes
ms.HeapAlloc -= pmv.ballastSizeBytes
ms.HeapSys -= pmv.ballastSizeBytes
ms.HeapInuse -= pmv.ballastSizeBytes
}
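
As the commit message notes, an alternative to adjusting the reported numbers here would be to publish the ballast size as its own metric and let consumers do the subtraction. A hedged sketch of what that could look like with the same OpenCensus primitives this file already uses (the measure and view names are illustrative, not part of this change; imports match process_telemetry.go):

var mMemBallast = stats.Int64("oc.io/process/memory_ballast", "Size of the configured memory ballast", "By")

var viewMemBallast = &view.View{
    Name:        "process/memory_ballast",
    Description: "Size of the configured memory ballast",
    Measure:     mMemBallast,
    Aggregation: view.LastValue(),
}

// recordBallastSize would be called once at startup; dashboards and alerts
// could then subtract this static value from the memory views themselves.
func recordBallastSize(ballastSizeBytes uint64) {
    stats.Record(context.Background(), mMemBallast.M(int64(ballastSizeBytes)))
}
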
