From 57adbe36211035327c8de52f30cc2c04d0580689 Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Tue, 14 May 2019 14:42:23 -0500 Subject: [PATCH 01/12] cleanup and connect packages --- src/x/instrument/extended.go | 16 ++-- src/x/instrument/process.go | 5 +- src/x/process/count_dirent_linux.go | 115 ++++++++++++++++++++++++ src/x/process/process_linux.go | 90 ++++++++++++++++++- src/x/process/process_linux_test.go | 133 ++++++++++++++++++++++++++++ src/x/process/process_notlinux.go | 15 +++- 6 files changed, 358 insertions(+), 16 deletions(-) create mode 100644 src/x/process/count_dirent_linux.go create mode 100644 src/x/process/process_linux_test.go diff --git a/src/x/instrument/extended.go b/src/x/instrument/extended.go index af6341b0c7..69ff51008b 100644 --- a/src/x/instrument/extended.go +++ b/src/x/instrument/extended.go @@ -22,7 +22,6 @@ package instrument import ( "fmt" - "os" "runtime" "strings" "sync/atomic" @@ -177,10 +176,10 @@ func (r *runtimeMetrics) report(metricsType ExtendedMetricsType) { type extendedMetricsReporter struct { baseReporter + processReporter Reporter metricsType ExtendedMetricsType runtime runtimeMetrics - process processMetrics } // NewExtendedMetricsReporter creates a new extended metrics reporter @@ -194,21 +193,20 @@ func NewExtendedMetricsReporter( r.metricsType = metricsType r.init(reportInterval, func() { r.runtime.report(r.metricsType) - if r.metricsType >= ModerateExtendedMetrics { - r.process.report() - } }) + if r.metricsType >= ModerateExtendedMetrics { + // ProcessReporter can be quite slow in some situations (specifically + // counting FDs for processes that have many of them) so it runs on + // its own report loop. 
+ r.processReporter = NewProcessReporter(scope, reportInterval) + } if r.metricsType == NoExtendedMetrics { return r } runtimeScope := scope.SubScope("runtime") - processScope := scope.SubScope("process") r.runtime.NumGoRoutines = runtimeScope.Gauge("num-goroutines") r.runtime.GoMaxProcs = runtimeScope.Gauge("gomaxprocs") - r.process.NumFDs = processScope.Gauge("num-fds") - r.process.NumFDErrors = processScope.Counter("num-fd-errors") - r.process.pid = os.Getpid() if r.metricsType < DetailedExtendedMetrics { return r } diff --git a/src/x/instrument/process.go b/src/x/instrument/process.go index 6bd2855543..d3b2f9b164 100644 --- a/src/x/instrument/process.go +++ b/src/x/instrument/process.go @@ -38,11 +38,12 @@ type processReporter struct { type processMetrics struct { NumFDs tally.Gauge NumFDErrors tally.Counter - pid int + + pid int } func (r *processMetrics) report() { - numFDs, err := process.NumFDs(r.pid) + numFDs, err := process.NumFDsWithDefaultBatchSleep(r.pid) if err == nil { r.NumFDs.Update(float64(numFDs)) } else { diff --git a/src/x/process/count_dirent_linux.go b/src/x/process/count_dirent_linux.go new file mode 100644 index 0000000000..0788f5abc7 --- /dev/null +++ b/src/x/process/count_dirent_linux.go @@ -0,0 +1,115 @@ +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. 
+// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +// This package is mostly copy-pasted from the standard library, specifically +// this file: https://golang.org/src/os/dir_unix.go + +package process + +import ( + "bytes" + "syscall" + "unsafe" +) + +func countDirent(buf []byte) (consumed int, count int) { + origlen := len(buf) + count = 0 + for len(buf) > 0 { + reclen, ok := direntReclen(buf) + if !ok || reclen > uint64(len(buf)) { + return origlen, count + } + + rec := buf[:reclen] + buf = buf[reclen:] + + ino, ok := direntIno(rec) + if !ok { + break + } + if ino == 0 { // File absent in directory. + continue + } + const namoff = uint64(unsafe.Offsetof(syscall.Dirent{}.Name)) + namlen, ok := direntNamlen(rec) + if !ok || namoff+namlen > uint64(len(rec)) { + break + } + name := rec[namoff : namoff+namlen] + for i, c := range name { + if c == 0 { + name = name[:i] + break + } + } + + if bytes.Equal(name, dotBytes) || bytes.Equal(name, doubleDotBytes) { + // Check for useless names before allocating a string. 
+ continue + } + count++ + } + + return origlen - len(buf), count +} + +func direntReclen(buf []byte) (uint64, bool) { + return readInt(buf, unsafe.Offsetof(syscall.Dirent{}.Reclen), unsafe.Sizeof(syscall.Dirent{}.Reclen)) +} + +func direntIno(buf []byte) (uint64, bool) { + return readInt(buf, unsafe.Offsetof(syscall.Dirent{}.Ino), unsafe.Sizeof(syscall.Dirent{}.Ino)) +} + +func direntNamlen(buf []byte) (uint64, bool) { + reclen, ok := direntReclen(buf) + if !ok { + return 0, false + } + return reclen - uint64(unsafe.Offsetof(syscall.Dirent{}.Name)), true +} + +// readInt returns the size-bytes unsigned integer in native byte order at offset off. +func readInt(b []byte, off, size uintptr) (u uint64, ok bool) { + if len(b) < int(off+size) { + return 0, false + } + return readIntLE(b[off:], size), true +} + +func readIntLE(b []byte, size uintptr) uint64 { + switch size { + case 1: + return uint64(b[0]) + case 2: + _ = b[1] // bounds check hint to compiler; see golang.org/issue/14808 + return uint64(b[0]) | uint64(b[1])<<8 + case 4: + _ = b[3] // bounds check hint to compiler; see golang.org/issue/14808 + return uint64(b[0]) | uint64(b[1])<<8 | uint64(b[2])<<16 | uint64(b[3])<<24 + case 8: + _ = b[7] // bounds check hint to compiler; see golang.org/issue/14808 + return uint64(b[0]) | uint64(b[1])<<8 | uint64(b[2])<<16 | uint64(b[3])<<24 | + uint64(b[4])<<32 | uint64(b[5])<<40 | uint64(b[6])<<48 | uint64(b[7])<<56 + default: + panic("syscall: readInt with unsupported size") + } +} diff --git a/src/x/process/process_linux.go b/src/x/process/process_linux.go index 3c50a654b9..a0bc8e72a6 100644 --- a/src/x/process/process_linux.go +++ b/src/x/process/process_linux.go @@ -24,12 +24,26 @@ package process import ( "fmt" "os" + "syscall" + "time" ) -// NumFDs returns the number of file descriptors for a given process. -// This is more efficient than the NumFDs() method in the psutils package -// by avoiding reading the destination of the symlinks in the proc directory. 
-func NumFDs(pid int) (int, error) { +const ( + // syscallBatchSize controls the number of syscalls to perform before + // triggering a sleep. + syscallBatchSize = 10 + defaultSyscallBatchDurationSleepMultiplier = 10 +) + +var ( + dotBytes = []byte(".") + doubleDotBytes = []byte("..") +) + +// NumFDsReference returns the number of file descriptors for a given process. +// This is a reference implementation that can be used to compare against for +// correctness. +func NumFDsReference(pid int) (int, error) { statPath := fmt.Sprintf("/proc/%d/fd", pid) d, err := os.Open(statPath) if err != nil { @@ -39,3 +53,71 @@ func NumFDs(pid int) (int, error) { d.Close() return len(fnames), err } + +// NumFDs returns the number of file descriptors for a given process. +// This is an optimized implementation that avoids allocations as much as +// possible. In terms of wall-clock time it is not much faster than +// NumFDsReference due to the fact that the syscall overhead dominates, +// however, it produces significantly less garbage. +func NumFDs(pid int) (int, error) { + // Multiplier of zero means no throttling. + return NumFDsWithBatchSleep(pid, 0) +} + +// NumFDsWithBatchSleep is the same as NumFDs but it throttles itself to prevent excessive +// CPU usage for processes with a lot of file descriptors. +// +// batchDurationSleepMultiplier is the multiplier by which the amount of time spent performing +// a single batch of syscalls will be multiplied by to determine the amount of time that the +// function will spend sleeping. +// +// For example, if performing syscallBatchSize syscalls takes 500 nanoseconds and +// batchDurationSleepMultiplier is 10 then the function will sleep for ~500 * 10 nanoseconds +// in between batches. +// +// In other words, a batchDurationSleepMultiplier will cause the function to take approximately +// 10x longer but require 10x less CPU utilization at any given moment in time. 
+func NumFDsWithBatchSleep(pid int, batchDurationSleepMultiplier float64) { + statPath := fmt.Sprintf("/proc/%d/fd", pid) + d, err := os.Open(statPath) + if err != nil { + return 0, err + } + defer d.Close() + + var ( + b = make([]byte, 4096) + count = 0 + lastSleep = time.Now() + ) + for i := 0; ; i++ { + if i%syscallBatchSize == 0 && i != 0 { + // Throttle loop to prevent excessive CPU usage. + syscallBatchCompletionDuration := time.Now().Sub(lastSleep) + timeToSleep := time.Duration(float64(syscallBatchCompletionDuration) * batchDurationSleepMultiplier) + if timeToSleep > 0 { + time.Sleep(timeToSleep) + } + lastSleep = time.Now() + } + + n, err := syscall.ReadDirent(int(d.Fd()), b) + if err != nil { + return 0, err + } + if n <= 0 { + break + } + + _, numDirs := countDirent(b[:n]) + count += numDirs + } + + return count, nil +} + +// NumFDsWithDefaultBatchSleep is the same as NumFDsWithBatchSleep except it uses the default value +// for the batchDurationSleepMultiplier. +func NumFDsWithDefaultBatchSleep(pid int) (int, error) { + return NumFDsWithBatchSleep(pid, defaultSyscallBatchDurationSleepMultiplier) +} diff --git a/src/x/process/process_linux_test.go b/src/x/process/process_linux_test.go new file mode 100644 index 0000000000..bf888c7372 --- /dev/null +++ b/src/x/process/process_linux_test.go @@ -0,0 +1,133 @@ +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. 
+// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package process + +import ( + "fmt" + "io/ioutil" + "math" + "os" + "path/filepath" + "testing" + + "github.com/stretchr/testify/require" +) + +type cleanupFn func() + +func TestNumFDs(t *testing.T) { + for i := 0; i <= 8; i++ { + var numFiles int + if i == 0 { + numFiles = 0 + } else { + numFiles = int(math.Pow(float64(2), float64(i))) + } + + func() { + numExpectedFds := numFiles + 5 + cleanupFn := createTempFiles(numFiles) + defer cleanupFn() + + selfPID := os.Getpid() + + t.Run(fmt.Sprintf("func: %s, numFiles: %d", "NumFDsReference", numFiles), func(t *testing.T) { + numFDs, err := NumFDsReference(selfPID) + require.NoError(t, err) + require.Equal(t, numExpectedFds, numFDs) + }) + + t.Run(fmt.Sprintf("func: %s, numFiles: %d", "NumFDs", numFiles), func(t *testing.T) { + numFDs, err := NumFDs(selfPID) + require.NoError(t, err) + require.Equal(t, numExpectedFds, numFDs) + }) + + t.Run(fmt.Sprintf("func: %s, numFiles: %d", "NumFDsWithDefaultBatchSleep", numFiles), func(t *testing.T) { + numFDs, err := NumFDsWithDefaultBatchSleep(selfPID) + require.NoError(t, err) + require.Equal(t, numExpectedFds, numFDs) + }) + }() + } +} + +func BenchmarkNumFDs(b *testing.B) { + var ( + // Low for C.I and local testing, bump this up to a much larger number + // when performing actual benchmarking. + numFiles = 16000 + // +5 to account for standard F.Ds that each process gets. 
+ numExpectedFds = numFiles + 5 + ) + cleanupFn := createTempFiles(numFiles) + defer cleanupFn() + + selfPID := os.Getpid() + b.Run("NumFDs", func(b *testing.B) { + for i := 0; i < b.N; i++ { + numFDs, err := NumFDsReference(selfPID) + if err != nil { + b.Fatal(err) + } + if numFDs != numExpectedFds { + b.Fatalf("expected %d files but got %d", numExpectedFds, numFDs) + } + } + }) + + b.Run("NumFDsFast", func(b *testing.B) { + for i := 0; i < b.N; i++ { + numFDs, err := NumFDs(selfPID) + if err != nil { + b.Fatal(err) + } + if numFDs != numExpectedFds { + b.Fatalf("expected %d files but got %d", numExpectedFds, numFDs) + } + } + }) +} + +func createTempFiles(numFiles int) cleanupFn { + tempDir, err := ioutil.TempDir("", "test") + if err != nil { + panic(err) + } + + files := make([]*os.File, 0, numFiles) + for i := 0; i < numFiles; i++ { + tempFilePath := filepath.Join(tempDir, fmt.Sprintf("%d.txt", i)) + f, err := os.OpenFile(tempFilePath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644) + if err != nil { + panic(err) + } + files = append(files, f) + } + + return func() { + for _, f := range files { + f.Close() + } + os.RemoveAll(tempDir) + } + +} diff --git a/src/x/process/process_notlinux.go b/src/x/process/process_notlinux.go index a5ad63a424..5e4000773b 100644 --- a/src/x/process/process_notlinux.go +++ b/src/x/process/process_notlinux.go @@ -29,7 +29,20 @@ import ( var errNotAvailable = errors.New( "cannot get process file descriptors, only available on linux") -// NumFDs returns the number of file descriptors for a given process. +// NumFDs returns the number of file descriptors for a given process and is not available +// on non-linux systems. func NumFDs(pid int) (int, error) { return 0, errNotAvailable } + +// NumFDsWithDefaultBatchSleep returns the number of file descriptors for a given process +// and is not available on non-linux systems. 
+func NumFDsWithDefaultBatchSleep(pid int) (int, error) { + return 0, errNotAvailable +} + +// NumFDsWithBatchSleep returns the number of file descriptors for a given process and is +// not available on non-linux systems. +func NumFDsWithBatchSleep(pid int) (int, error) { + return 0, errNotAvailable +} From c84d36be15ad1744982fbd8fef28df8c8199c52e Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Tue, 14 May 2019 14:45:52 -0500 Subject: [PATCH 02/12] update comment --- src/x/process/count_dirent_linux.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/x/process/count_dirent_linux.go b/src/x/process/count_dirent_linux.go index 0788f5abc7..657555f1d9 100644 --- a/src/x/process/count_dirent_linux.go +++ b/src/x/process/count_dirent_linux.go @@ -19,7 +19,8 @@ // THE SOFTWARE. // This package is mostly copy-pasted from the standard library, specifically -// this file: https://golang.org/src/os/dir_unix.go +// this file: https://golang.org/src/os/dir_unix.go with some changes to prevent +// allocations. 
package process From 600bd8f2f5e72e67af72f00aa0c9167c40f01e39 Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Tue, 14 May 2019 14:47:09 -0500 Subject: [PATCH 03/12] update benchmark --- src/x/process/process_linux_test.go | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/src/x/process/process_linux_test.go b/src/x/process/process_linux_test.go index bf888c7372..269eb874a3 100644 --- a/src/x/process/process_linux_test.go +++ b/src/x/process/process_linux_test.go @@ -82,7 +82,7 @@ func BenchmarkNumFDs(b *testing.B) { defer cleanupFn() selfPID := os.Getpid() - b.Run("NumFDs", func(b *testing.B) { + b.Run("NumFDsReference", func(b *testing.B) { for i := 0; i < b.N; i++ { numFDs, err := NumFDsReference(selfPID) if err != nil { @@ -94,7 +94,7 @@ func BenchmarkNumFDs(b *testing.B) { } }) - b.Run("NumFDsFast", func(b *testing.B) { + b.Run("NumFDs", func(b *testing.B) { for i := 0; i < b.N; i++ { numFDs, err := NumFDs(selfPID) if err != nil { @@ -105,6 +105,18 @@ func BenchmarkNumFDs(b *testing.B) { } } }) + + b.Run("NumFDsWithDefaultBatchSleep", func(b *testing.B) { + for i := 0; i < b.N; i++ { + numFDs, err := NumFDsWithDefaultBatchSleep(selfPID) + if err != nil { + b.Fatal(err) + } + if numFDs != numExpectedFds { + b.Fatalf("expected %d files but got %d", numExpectedFds, numFDs) + } + } + }) } func createTempFiles(numFiles int) cleanupFn { From 6ae5b9f85256bc776dc4503b2a035b1faf87d8bd Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Tue, 14 May 2019 14:59:15 -0500 Subject: [PATCH 04/12] disable prometheus process reporter --- src/x/instrument/config.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/x/instrument/config.go b/src/x/instrument/config.go index e727f9e903..d0b4bcfd30 100644 --- a/src/x/instrument/config.go +++ b/src/x/instrument/config.go @@ -23,8 +23,10 @@ package instrument import ( "errors" "io" + "os" "time" + promFork "github.com/m3db/prometheus_client_golang/prometheus" 
"github.com/uber-go/tally" "github.com/uber-go/tally/m3" "github.com/uber-go/tally/multi" @@ -85,6 +87,14 @@ func (mc *MetricsConfiguration) NewRootScope() (tally.Scope, io.Closer, error) { if err != nil { return nil, nil, err } + + // Turn off the default Prometheus process collector because it can be very + // slow if a lot of file descriptors are open. + toUnregister := promFork.NewProcessCollector(os.Getpid(), "") + if unregistered := promFork.Unregister(toUnregister); !unregistered { + return nil, nil, errors.New("unable to unregister prometheus process collector") + } + reporters = append(reporters, r) } if len(reporters) == 0 { From be08c5eb0d535bf2d52a3339a0da4436b52ab8de Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Tue, 14 May 2019 15:03:22 -0500 Subject: [PATCH 05/12] fix return args --- src/x/process/process_linux.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/x/process/process_linux.go b/src/x/process/process_linux.go index a0bc8e72a6..9ce1fe3d50 100644 --- a/src/x/process/process_linux.go +++ b/src/x/process/process_linux.go @@ -31,7 +31,7 @@ import ( const ( // syscallBatchSize controls the number of syscalls to perform before // triggering a sleep. - syscallBatchSize = 10 + syscallBatchSize = 10 defaultSyscallBatchDurationSleepMultiplier = 10 ) @@ -77,7 +77,7 @@ func NumFDs(pid int) (int, error) { // // In other words, a batchDurationSleepMultiplier will cause the function to take approximately // 10x longer but require 10x less CPU utilization at any given moment in time. 
-func NumFDsWithBatchSleep(pid int, batchDurationSleepMultiplier float64) { +func NumFDsWithBatchSleep(pid int, batchDurationSleepMultiplier float64) (int, error) { statPath := fmt.Sprintf("/proc/%d/fd", pid) d, err := os.Open(statPath) if err != nil { From ef52d4e62c61be05913ef15e049fde9d519fcd19 Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Tue, 14 May 2019 15:23:44 -0500 Subject: [PATCH 06/12] undo unregistering collector --- src/x/instrument/config.go | 9 --------- 1 file changed, 9 deletions(-) diff --git a/src/x/instrument/config.go b/src/x/instrument/config.go index d0b4bcfd30..2012bab206 100644 --- a/src/x/instrument/config.go +++ b/src/x/instrument/config.go @@ -23,10 +23,8 @@ package instrument import ( "errors" "io" - "os" "time" - promFork "github.com/m3db/prometheus_client_golang/prometheus" "github.com/uber-go/tally" "github.com/uber-go/tally/m3" "github.com/uber-go/tally/multi" @@ -88,13 +86,6 @@ func (mc *MetricsConfiguration) NewRootScope() (tally.Scope, io.Closer, error) { return nil, nil, err } - // Turn off the default Prometheus process collector because it can be very - // slow if a lot of file descriptors are open. 
- toUnregister := promFork.NewProcessCollector(os.Getpid(), "") - if unregistered := promFork.Unregister(toUnregister); !unregistered { - return nil, nil, errors.New("unable to unregister prometheus process collector") - } - reporters = append(reporters, r) } if len(reporters) == 0 { From c9a90c2a720688dd6c49e8ee7c769b08e741a75b Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Wed, 15 May 2019 17:28:07 -0400 Subject: [PATCH 07/12] Override prometheus registry --- src/x/instrument/config.go | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/src/x/instrument/config.go b/src/x/instrument/config.go index 2012bab206..540aaf09a2 100644 --- a/src/x/instrument/config.go +++ b/src/x/instrument/config.go @@ -25,6 +25,7 @@ import ( "io" "time" + promfork "github.com/m3db/prometheus_client_golang/prometheus" "github.com/uber-go/tally" "github.com/uber-go/tally/m3" "github.com/uber-go/tally/multi" @@ -80,7 +81,15 @@ func (mc *MetricsConfiguration) NewRootScope() (tally.Scope, io.Closer, error) { reporters = append(reporters, r) } if mc.PrometheusReporter != nil { - var opts prometheus.ConfigurationOptions + opts := prometheus.ConfigurationOptions{ + // Override the default registry with an empty one that does not have the default + // registered collectors (Go and Process) because the M3 reporters will emit those + // metrics anyways and some of the metrics can be expensive to collect. For example, + // collecting the number of F.Ds for a process that has many of them can take a long + // time and be very CPU intensive, especially the Prometheus implementation which is + // less optimized than the M3 implementation. 
+ Registry: promfork.NewRegistry(), + } r, err := mc.PrometheusReporter.NewReporter(opts) if err != nil { return nil, nil, err From 1748f1b7c1fec95379f9b64f270fb87e6bd1fe00 Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Wed, 15 May 2019 17:29:01 -0400 Subject: [PATCH 08/12] dont add space --- src/x/instrument/config.go | 1 - 1 file changed, 1 deletion(-) diff --git a/src/x/instrument/config.go b/src/x/instrument/config.go index 540aaf09a2..ed1c2c5198 100644 --- a/src/x/instrument/config.go +++ b/src/x/instrument/config.go @@ -94,7 +94,6 @@ func (mc *MetricsConfiguration) NewRootScope() (tally.Scope, io.Closer, error) { if err != nil { return nil, nil, err } - reporters = append(reporters, r) } if len(reporters) == 0 { From 898467237556e84600488036d3b6b2a112e23cfc Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Wed, 15 May 2019 17:34:13 -0400 Subject: [PATCH 09/12] fix extended metrics methods --- src/x/instrument/extended.go | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/src/x/instrument/extended.go b/src/x/instrument/extended.go index 69ff51008b..0d836cd277 100644 --- a/src/x/instrument/extended.go +++ b/src/x/instrument/extended.go @@ -27,6 +27,8 @@ import ( "sync/atomic" "time" + xerrors "github.com/m3db/m3/src/x/errors" + "github.com/uber-go/tally" ) @@ -182,6 +184,29 @@ type extendedMetricsReporter struct { runtime runtimeMetrics } +func (e *extendedMetricsReporter) Start() error { + if err := e.baseReporter.Start(); err != nil { + return err + } + if err := e.processReporter.Start(); err != nil { + return err + } + return nil +} + +func (e *extendedMetricsReporter) Stop() error { + multiErr := xerrors.NewMultiError() + + if err := e.baseReporter.Stop(); err != nil { + multiErr = multiErr.Add(err) + } + if err := e.processReporter.Stop(); err != nil { + multiErr = multiErr.Add(err) + } + + return multiErr.FinalError() +} + // NewExtendedMetricsReporter creates a new extended metrics reporter // that reports runtime and 
process metrics. func NewExtendedMetricsReporter( From e67079266d1ab97a309480cfb100eaa6555344d5 Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Wed, 15 May 2019 17:35:41 -0400 Subject: [PATCH 10/12] handle nil process reporter --- src/x/instrument/extended.go | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/src/x/instrument/extended.go b/src/x/instrument/extended.go index 0d836cd277..b08a77f829 100644 --- a/src/x/instrument/extended.go +++ b/src/x/instrument/extended.go @@ -188,9 +188,13 @@ func (e *extendedMetricsReporter) Start() error { if err := e.baseReporter.Start(); err != nil { return err } - if err := e.processReporter.Start(); err != nil { - return err + + if e.processReporter != nil { + if err := e.processReporter.Start(); err != nil { + return err + } } + return nil } @@ -200,8 +204,11 @@ func (e *extendedMetricsReporter) Stop() error { if err := e.baseReporter.Stop(); err != nil { multiErr = multiErr.Add(err) } - if err := e.processReporter.Stop(); err != nil { - multiErr = multiErr.Add(err) + + if e.processReporter != nil { + if err := e.processReporter.Stop(); err != nil { + multiErr = multiErr.Add(err) + } } return multiErr.FinalError() From e1be759fd32edfb016388e519d06120188f174e3 Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Wed, 15 May 2019 17:36:49 -0400 Subject: [PATCH 11/12] dont add whitespace --- src/x/instrument/process.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/x/instrument/process.go b/src/x/instrument/process.go index d3b2f9b164..487f4cdb6a 100644 --- a/src/x/instrument/process.go +++ b/src/x/instrument/process.go @@ -38,8 +38,7 @@ type processReporter struct { type processMetrics struct { NumFDs tally.Gauge NumFDErrors tally.Counter - - pid int + pid int } func (r *processMetrics) report() { From f7d36d9fad62d5f06d27df16e39b4d69150f2657 Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Thu, 16 May 2019 11:41:36 -0400 Subject: [PATCH 12/12] address feedback --- 
src/x/instrument/config.go | 4 +- src/x/instrument/extended.go | 60 ++++++++++++++--------------- src/x/process/process_linux.go | 4 +- src/x/process/process_linux_test.go | 19 ++++++--- 4 files changed, 47 insertions(+), 40 deletions(-) diff --git a/src/x/instrument/config.go b/src/x/instrument/config.go index ed1c2c5198..e62e53477d 100644 --- a/src/x/instrument/config.go +++ b/src/x/instrument/config.go @@ -25,7 +25,7 @@ import ( "io" "time" - promfork "github.com/m3db/prometheus_client_golang/prometheus" + prom "github.com/m3db/prometheus_client_golang/prometheus" "github.com/uber-go/tally" "github.com/uber-go/tally/m3" "github.com/uber-go/tally/multi" @@ -88,7 +88,7 @@ func (mc *MetricsConfiguration) NewRootScope() (tally.Scope, io.Closer, error) { // collecting the number of F.Ds for a process that has many of them can take a long // time and be very CPU intensive, especially the Prometheus implementation which is // less optimized than the M3 implementation. - Registry: promfork.NewRegistry(), + Registry: prom.NewRegistry(), } r, err := mc.PrometheusReporter.NewReporter(opts) if err != nil { diff --git a/src/x/instrument/extended.go b/src/x/instrument/extended.go index b08a77f829..c5ce55aab2 100644 --- a/src/x/instrument/extended.go +++ b/src/x/instrument/extended.go @@ -184,36 +184,6 @@ type extendedMetricsReporter struct { runtime runtimeMetrics } -func (e *extendedMetricsReporter) Start() error { - if err := e.baseReporter.Start(); err != nil { - return err - } - - if e.processReporter != nil { - if err := e.processReporter.Start(); err != nil { - return err - } - } - - return nil -} - -func (e *extendedMetricsReporter) Stop() error { - multiErr := xerrors.NewMultiError() - - if err := e.baseReporter.Stop(); err != nil { - multiErr = multiErr.Add(err) - } - - if e.processReporter != nil { - if err := e.processReporter.Stop(); err != nil { - multiErr = multiErr.Add(err) - } - } - - return multiErr.FinalError() -} - // NewExtendedMetricsReporter creates a 
new extended metrics reporter // that reports runtime and process metrics. func NewExtendedMetricsReporter( @@ -258,3 +228,33 @@ func NewExtendedMetricsReporter( return r } + +func (e *extendedMetricsReporter) Start() error { + if err := e.baseReporter.Start(); err != nil { + return err + } + + if e.processReporter != nil { + if err := e.processReporter.Start(); err != nil { + return err + } + } + + return nil +} + +func (e *extendedMetricsReporter) Stop() error { + multiErr := xerrors.NewMultiError() + + if err := e.baseReporter.Stop(); err != nil { + multiErr = multiErr.Add(err) + } + + if e.processReporter != nil { + if err := e.processReporter.Stop(); err != nil { + multiErr = multiErr.Add(err) + } + } + + return multiErr.FinalError() +} diff --git a/src/x/process/process_linux.go b/src/x/process/process_linux.go index 9ce1fe3d50..b18eb2cede 100644 --- a/src/x/process/process_linux.go +++ b/src/x/process/process_linux.go @@ -40,10 +40,10 @@ var ( doubleDotBytes = []byte("..") ) -// NumFDsReference returns the number of file descriptors for a given process. +// numFDsSlow returns the number of file descriptors for a given process. // This is a reference implementation that can be used to compare against for // correctness. -func NumFDsReference(pid int) (int, error) { +func numFDsSlow(pid int) (int, error) { statPath := fmt.Sprintf("/proc/%d/fd", pid) d, err := os.Open(statPath) if err != nil { diff --git a/src/x/process/process_linux_test.go b/src/x/process/process_linux_test.go index 269eb874a3..ee2cc048a1 100644 --- a/src/x/process/process_linux_test.go +++ b/src/x/process/process_linux_test.go @@ -33,6 +33,13 @@ import ( type cleanupFn func() +// stdin +// stdout +// stderr +// /proc/<pid>/fd +// One more (not sure what it is, probably something related to the Go test runner.) 
+const numStdProcessFiles = 5 + func TestNumFDs(t *testing.T) { for i := 0; i <= 8; i++ { var numFiles int @@ -43,14 +50,14 @@ func TestNumFDs(t *testing.T) { } func() { - numExpectedFds := numFiles + 5 + numExpectedFds := numFiles + numStdProcessFiles cleanupFn := createTempFiles(numFiles) defer cleanupFn() selfPID := os.Getpid() - t.Run(fmt.Sprintf("func: %s, numFiles: %d", "NumFDsReference", numFiles), func(t *testing.T) { - numFDs, err := NumFDsReference(selfPID) + t.Run(fmt.Sprintf("func: %s, numFiles: %d", "numFDsSlow", numFiles), func(t *testing.T) { + numFDs, err := numFDsSlow(selfPID) require.NoError(t, err) require.Equal(t, numExpectedFds, numFDs) }) @@ -76,15 +83,15 @@ func BenchmarkNumFDs(b *testing.B) { // when performing actual benchmarking. numFiles = 16000 // +5 to account for standard F.Ds that each process gets. - numExpectedFds = numFiles + 5 + numExpectedFds = numFiles + numStdProcessFiles ) cleanupFn := createTempFiles(numFiles) defer cleanupFn() selfPID := os.Getpid() - b.Run("NumFDsReference", func(b *testing.B) { + b.Run("numFDsSlow", func(b *testing.B) { for i := 0; i < b.N; i++ { - numFDs, err := NumFDsReference(selfPID) + numFDs, err := numFDsSlow(selfPID) if err != nil { b.Fatal(err) }