Skip to content

Commit

Permalink
Implement pprof module extension
Browse files Browse the repository at this point in the history
  • Loading branch information
niklr committed Jun 14, 2021
1 parent ba08520 commit 5b35c1c
Show file tree
Hide file tree
Showing 12 changed files with 525 additions and 22 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -91,3 +91,6 @@ explorer_storage_*

# local blskeys for testing
.hmy/blskeys

# pprof profiles
profiles/*.pb.gz
3 changes: 3 additions & 0 deletions api/service/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ const (
Consensus
BlockProposal
NetworkInfo
Pprof
Prometheus
Synchronize
)
Expand All @@ -36,6 +37,8 @@ func (t Type) String() string {
return "BlockProposal"
case NetworkInfo:
return "NetworkInfo"
case Pprof:
return "Pprof"
case Prometheus:
return "Prometheus"
case Synchronize:
Expand Down
235 changes: 235 additions & 0 deletions api/service/pprof/service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,235 @@
package pprof

import (
"errors"
"fmt"
"net/http"
"os"
"path/filepath"
"runtime"
"runtime/pprof"
"strings"
"sync"
"time"

"github.com/ethereum/go-ethereum/rpc"
"github.com/harmony-one/harmony/internal/utils"
)

// Config is the config for the pprof service
type Config struct {
Enabled bool
ListenAddr string
Folder string
ProfileNames []string
ProfileIntervals []int
ProfileDebugValues []int
}

type Profile struct {
Name string
Interval int
Debug int
}

func (p Config) String() string {
return fmt.Sprintf("%v, %v, %v, %v/%v/%v", p.Enabled, p.ListenAddr, p.Folder, p.ProfileNames, p.ProfileIntervals, p.ProfileDebugValues)
}

// Constants for profile names
const (
CPU = "cpu"
)

// Service provides access to pprof profiles via HTTP and can save them to local disk periodically as user settings.
type Service struct {
config Config
profiles map[string]Profile
}

var (
initOnce sync.Once
svc = &Service{}
)

// NewService creates the new pprof service
func NewService(cfg Config) *Service {
initOnce.Do(func() {
svc = newService(cfg)
})
return svc
}

func newService(cfg Config) *Service {
if !cfg.Enabled {
utils.Logger().Info().Msg("Pprof service disabled...")
return nil
}

utils.Logger().Debug().Str("cfg", cfg.String()).Msg("Pprof")
svc.config = cfg

if profiles, err := cfg.unpackProfilesIntoMap(); err != nil {
utils.Logger().Error().Err(err).Msg("Could not unpack profiles into map")
return nil
} else {
svc.profiles = profiles
}

go func() {
utils.Logger().Info().Str("address", cfg.ListenAddr).Msg("Starting pprof HTTP service")
http.ListenAndServe(cfg.ListenAddr, nil)
}()

return svc
}

// Start start the service
func (s *Service) Start() error {
dir := getBasePath(s.config.Folder)
err := os.MkdirAll(dir, os.FileMode(0755))
if err != nil {
return err
}

if cpuProfile, ok := s.profiles[CPU]; ok {
handleCpuProfile(cpuProfile, dir, false)
}

for _, profile := range s.profiles {
scheduleProfile(profile, dir)
}

return nil
}

// Stop stop the service
func (s *Service) Stop() error {
dir := getBasePath(s.config.Folder)
for _, profile := range s.profiles {
if profile.Name == CPU {
pprof.StopCPUProfile()
} else {
saveProfile(profile, dir)
}
}
return nil
}

// APIs return all APIs of the service
func (s *Service) APIs() []rpc.API {
return nil
}

// scheduleProfile schedules the provided profile based on the specified interval (e.g. saves the profile every x seconds)
func scheduleProfile(profile Profile, dir string) {
go func() {
if profile.Interval > 0 {
ticker := time.NewTicker(time.Second * time.Duration(profile.Interval))
defer ticker.Stop()
for {
select {
case <-ticker.C:
if profile.Name == CPU {
handleCpuProfile(profile, dir, true)
} else {
saveProfile(profile, dir)
}
}
}
}
}()
}

// saveProfile saves the provided profile in the specified directory
func saveProfile(profile Profile, dir string) {
f, err := newTempFile(dir, profile.Name, ".pb.gz")
if err == nil {
defer f.Close()
if p := pprof.Lookup(profile.Name); p == nil {
utils.Logger().Error().Msg(fmt.Sprintf("Profile does not exist: %s", profile.Name))
} else {
runtime.GC() // get up-to-date statistics
if p.WriteTo(f, profile.Debug); err == nil {
utils.Logger().Info().Msg(fmt.Sprintf("Saved profile in: %s", f.Name()))
}
}
} else {
utils.Logger().Error().Err(err).Msg(fmt.Sprintf("Could not save profile: %s", profile.Name))
}
}

// handleCpuProfile handles the provided CPU profile by stopping or starting it as specified
func handleCpuProfile(profile Profile, dir string, shouldStop bool) {
if shouldStop {
pprof.StopCPUProfile()
}
f, err := newTempFile(dir, profile.Name, ".pb.gz")
if err == nil {
pprof.StartCPUProfile(f)
utils.Logger().Info().Msg(fmt.Sprintf("Saved CPU profile in: %s", f.Name()))
} else {
utils.Logger().Error().Err(err).Msg("Could not start CPU profile")
}
}

// unpackProfilesIntoMap unpacks the profiles specified in the configuration into a map
func (config *Config) unpackProfilesIntoMap() (map[string]Profile, error) {
result := make(map[string]Profile)
if len(config.ProfileNames) > 0 {
for index, name := range config.ProfileNames {
if _, ok := result[name]; ok {
return nil, errors.New(fmt.Sprintf("Pprof profile names contains duplicate: %v", name))
} else {
profile := &Profile{
Name: name,
Interval: 0,
Debug: 0,
}
// Try set interval value
if len(config.ProfileIntervals) == len(config.ProfileNames) {
profile.Interval = config.ProfileIntervals[index]
} else if len(config.ProfileIntervals) > 0 {
profile.Interval = config.ProfileIntervals[0]
}
// Try set debug value
if len(config.ProfileDebugValues) == len(config.ProfileNames) {
profile.Debug = config.ProfileDebugValues[index]
} else if len(config.ProfileDebugValues) > 0 {
profile.Debug = config.ProfileDebugValues[0]
}
result[name] = *profile
}
}
}
return result, nil
}

func getBasePath(origPath string) string {
modifiedPath := strings.Replace(origPath, "\\", "/", -1)
if modifiedPath == "" || strings.HasSuffix(modifiedPath, string(os.PathSeparator)) {
return filepath.Dir(modifiedPath)
} else {
return filepath.Dir(modifiedPath + string(os.PathSeparator))
}
}

func joinPath(dir, name string) string {
return filepath.Join(append(strings.Split(dir, string(os.PathSeparator)), []string{name}...)...)
}

// newTempFile returns a new output file in dir with the provided prefix and suffix.
// Similar to https://github.com/google/pprof/blob/a478d1d731e942fbe69291748caafefe749a19ea/internal/driver/tempfile.go#L25
func newTempFile(dir, name, suffix string) (*os.File, error) {
prefix := name + "."
for index := 1; index < 10000; index++ {
switch f, err := os.OpenFile(joinPath(dir, fmt.Sprintf("%s%03d%s", prefix, index, suffix)), os.O_RDWR|os.O_CREATE|os.O_EXCL, 0666); {
case err == nil:
return f, nil
case !os.IsExist(err):
return nil, err
}
}
// Give up
return nil, fmt.Errorf("could not create file of the form %s%03d%s", prefix, 1, suffix)
}
113 changes: 113 additions & 0 deletions api/service/pprof/service_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package pprof

import (
"errors"
"fmt"
"os"
"reflect"
"strings"
"testing"
)

var Separator = string(os.PathSeparator)

func exerciseGetBasePath(t *testing.T, source, expected string) {
if actual := getBasePath(source); actual != expected {
t.Errorf("expected basePath %#v; got %#v", expected, actual)
}
}

func TestGetBasePath(t *testing.T) {
exerciseGetBasePath(t, "", ".")
exerciseGetBasePath(t, "./test", "test")
exerciseGetBasePath(t, "./test/", "test")
exerciseGetBasePath(t, "./test1/test2", fmt.Sprintf("test1%stest2", Separator))
exerciseGetBasePath(t, ".\\test", "test")
exerciseGetBasePath(t, ".\\test\\", "test")
exerciseGetBasePath(t, ".\\test1\\test2\\", fmt.Sprintf("test1%stest2", Separator))
}

func exerciseJoinPath(t *testing.T, dir, name, expected string) {
if actual := joinPath(dir, name); actual != expected {
t.Errorf("expected joinPath %#v; got %#v", expected, actual)
}
}

func TestJoinPath(t *testing.T) {
exerciseJoinPath(t, "", "", "")
exerciseJoinPath(t, "test1", "", "test1")
exerciseJoinPath(t, "", "test2", "test2")
exerciseJoinPath(t, "test1", Separator, "test1")
exerciseJoinPath(t, "test1", "test2", fmt.Sprintf("test1%stest2", Separator))
}

func TestUnpackProfilesIntoMap(t *testing.T) {
tests := []struct {
input *Config
expMap map[string]Profile
expErr error
}{
{
input: &Config{},
expMap: make(map[string]Profile),
},
{
input: &Config{
ProfileNames: []string{"test", "test"},
},
expMap: nil,
expErr: errors.New("Pprof profile names contains duplicate: test"),
},
{
input: &Config{
ProfileNames: []string{"test"},
},
expMap: map[string]Profile{
"test": {
Name: "test",
},
},
},
{
input: &Config{
ProfileNames: []string{"test1", "test2"},
ProfileIntervals: []int{0, 60},
ProfileDebugValues: []int{1},
},
expMap: map[string]Profile{
"test1": {
Name: "test1",
Interval: 0,
Debug: 1,
},
"test2": {
Name: "test2",
Interval: 60,
Debug: 1,
},
},
},
}
for i, test := range tests {
actual, err := test.input.unpackProfilesIntoMap()
if assErr := assertError(err, test.expErr); assErr != nil {
t.Fatalf("Test %v: %v", i, assErr)
}
if !reflect.DeepEqual(actual, test.expMap) {
t.Errorf("Test %v: unexpected map\n\t%+v\n\t%+v", i, actual, test.expMap)
}
}
}

func assertError(gotErr, expErr error) error {
if (gotErr == nil) != (expErr == nil) {
return fmt.Errorf("error unexpected [%v] / [%v]", gotErr, expErr)
}
if gotErr == nil {
return nil
}
if !strings.Contains(gotErr.Error(), expErr.Error()) {
return fmt.Errorf("error unexpected [%v] / [%v]", gotErr, expErr)
}
return nil
}
8 changes: 6 additions & 2 deletions cmd/harmony/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,12 @@ type txPoolConfig struct {
}

type pprofConfig struct {
Enabled bool
ListenAddr string
Enabled bool
ListenAddr string
Folder string
ProfileNames []string `toml:",omitempty"`
ProfileIntervals []int `toml:",omitempty"`
ProfileDebugValues []int `toml:",omitempty"`
}

type logConfig struct {
Expand Down
Loading

0 comments on commit 5b35c1c

Please sign in to comment.