Skip to content

Commit

Permalink
fix: return disk space check (#751)
Browse files Browse the repository at this point in the history
  • Loading branch information
kolesnikovae authored Jan 21, 2022
1 parent d005a73 commit 0641244
Show file tree
Hide file tree
Showing 16 changed files with 62 additions and 32 deletions.
3 changes: 2 additions & 1 deletion pkg/adhoc/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/fatih/color"
"github.com/prometheus/client_golang/prometheus"
"github.com/pyroscope-io/pyroscope/pkg/health"
"github.com/sirupsen/logrus"

"github.com/pyroscope-io/pyroscope/pkg/agent/spy"
Expand Down Expand Up @@ -81,7 +82,7 @@ func Cli(cfg *config.Adhoc, args []string) error {
return fmt.Errorf("invalid output format '%s', the only supported output formats are 'html', 'pprof' and 'collapsed'", cfg.OutputFormat)
}

st, err := storage.New(newStorageConfig(cfg), logger, prometheus.DefaultRegisterer)
st, err := storage.New(newStorageConfig(cfg), logger, prometheus.DefaultRegisterer, new(health.Controller))
if err != nil {
return fmt.Errorf("could not initialize storage: %w", err)
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/analytics/analytics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/prometheus/client_golang/prometheus"
"github.com/pyroscope-io/pyroscope/pkg/health"
"github.com/sirupsen/logrus"

"github.com/pyroscope-io/pyroscope/pkg/config"
Expand Down Expand Up @@ -62,7 +63,7 @@ var _ = Describe("analytics", func() {
defer httpServer.Close()
url = httpServer.URL + "/api/events"

s, err := storage.New(storage.NewConfig(&(*cfg).Server), logrus.StandardLogger(), prometheus.NewRegistry())
s, err := storage.New(storage.NewConfig(&(*cfg).Server), logrus.StandardLogger(), prometheus.NewRegistry(), new(health.Controller))
Expect(err).ToNot(HaveOccurred())

analytics := NewService(&(*cfg).Server, s, mockStatsProvider{})
Expand Down
14 changes: 7 additions & 7 deletions pkg/cli/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,13 @@ func newServerService(c *config.Server) (*serverService, error) {
done: make(chan struct{}),
}

svc.storage, err = storage.New(storage.NewConfig(svc.config), svc.logger, prometheus.DefaultRegisterer)
diskPressure := health.DiskPressure{
Threshold: 512 * bytesize.MB,
Path: c.StoragePath,
}

svc.healthController = health.NewController(svc.logger, time.Minute, diskPressure)
svc.storage, err = storage.New(storage.NewConfig(svc.config), svc.logger, prometheus.DefaultRegisterer, svc.healthController)
if err != nil {
return nil, fmt.Errorf("new storage: %w", err)
}
Expand Down Expand Up @@ -120,12 +126,6 @@ func newServerService(c *config.Server) (*serverService, error) {
return nil, fmt.Errorf("new metric exporter: %w", err)
}

diskPressure := health.DiskPressure{
Threshold: 512 * bytesize.MB,
Path: c.StoragePath,
}

svc.healthController = health.NewController(svc.logger, time.Minute, diskPressure)
svc.debugReporter = debug.NewReporter(svc.logger, svc.storage, prometheus.DefaultRegisterer)
svc.directUpstream = direct.New(svc.storage, metricsExporter)
svc.directScrapeUpstream = direct.New(svc.storage, metricsExporter)
Expand Down
3 changes: 2 additions & 1 deletion pkg/dbmanager/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/cheggaaa/pb/v3"
"github.com/prometheus/client_golang/prometheus"
"github.com/pyroscope-io/pyroscope/pkg/exporter"
"github.com/pyroscope-io/pyroscope/pkg/health"
"github.com/sirupsen/logrus"

"github.com/pyroscope-io/pyroscope/pkg/agent"
Expand Down Expand Up @@ -60,7 +61,7 @@ func copyData(dbCfg *config.DbManager, stCfg *storage.Config) error {
"src start: %q end: %q, dst start: %q end: %q", srcSt, srcEt, dstSt, dstEt)
}

s, err := storage.New(stCfg, logrus.StandardLogger(), prometheus.DefaultRegisterer)
s, err := storage.New(stCfg, logrus.StandardLogger(), prometheus.DefaultRegisterer, new(health.Controller))
if err != nil {
return err
}
Expand Down
11 changes: 11 additions & 0 deletions pkg/health/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,3 +103,14 @@ func (c *Controller) NotificationText() string {
}
return ""
}

func (c *Controller) IsOutOfDiskSpace() bool {
c.m.RLock()
defer c.m.RUnlock()
for i := range c.conditions {
if _, ok := c.conditions[i].(DiskPressure); ok && c.current[i].Status > Healthy {
return true
}
}
return false
}
3 changes: 2 additions & 1 deletion pkg/server/build_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/pyroscope-io/pyroscope/pkg/config"
"github.com/pyroscope-io/pyroscope/pkg/exporter"
"github.com/pyroscope-io/pyroscope/pkg/health"
"github.com/pyroscope-io/pyroscope/pkg/storage"
"github.com/pyroscope-io/pyroscope/pkg/testing"
)
Expand All @@ -27,7 +28,7 @@ var _ = Describe("server", func() {
defer GinkgoRecover()

(*cfg).Server.APIBindAddr = ":10044"
s, err := storage.New(storage.NewConfig(&(*cfg).Server), logrus.StandardLogger(), prometheus.NewRegistry())
s, err := storage.New(storage.NewConfig(&(*cfg).Server), logrus.StandardLogger(), prometheus.NewRegistry(), new(health.Controller))
Expect(err).ToNot(HaveOccurred())
e, _ := exporter.NewExporter(nil, nil)
c, _ := New(Config{
Expand Down
3 changes: 2 additions & 1 deletion pkg/server/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/pyroscope-io/pyroscope/pkg/config"
"github.com/pyroscope-io/pyroscope/pkg/exporter"
"github.com/pyroscope-io/pyroscope/pkg/health"
"github.com/pyroscope-io/pyroscope/pkg/storage"
"github.com/pyroscope-io/pyroscope/pkg/testing"
)
Expand All @@ -26,7 +27,7 @@ var _ = Describe("server", func() {
defer GinkgoRecover()

(*cfg).Server.APIBindAddr = ":10045"
s, err := storage.New(storage.NewConfig(&(*cfg).Server), logrus.StandardLogger(), prometheus.NewRegistry())
s, err := storage.New(storage.NewConfig(&(*cfg).Server), logrus.StandardLogger(), prometheus.NewRegistry(), new(health.Controller))
Expect(err).ToNot(HaveOccurred())
e, _ := exporter.NewExporter(nil, nil)
c, _ := New(Config{
Expand Down
3 changes: 2 additions & 1 deletion pkg/server/controller_gzip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/pyroscope-io/pyroscope/pkg/config"
"github.com/pyroscope-io/pyroscope/pkg/exporter"
"github.com/pyroscope-io/pyroscope/pkg/health"
"github.com/pyroscope-io/pyroscope/pkg/storage"
"github.com/pyroscope-io/pyroscope/pkg/testing"
)
Expand All @@ -40,7 +41,7 @@ var _ = Describe("server", func() {
defer GinkgoRecover()

(*cfg).Server.APIBindAddr = ":10045"
s, err := storage.New(storage.NewConfig(&(*cfg).Server), logrus.StandardLogger(), prometheus.NewRegistry())
s, err := storage.New(storage.NewConfig(&(*cfg).Server), logrus.StandardLogger(), prometheus.NewRegistry(), new(health.Controller))
Expect(err).ToNot(HaveOccurred())
e, _ := exporter.NewExporter(nil, nil)
c, _ := New(Config{
Expand Down
5 changes: 3 additions & 2 deletions pkg/server/controller_https_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/pyroscope-io/pyroscope/pkg/config"
"github.com/pyroscope-io/pyroscope/pkg/exporter"
"github.com/pyroscope-io/pyroscope/pkg/health"
"github.com/pyroscope-io/pyroscope/pkg/storage"
"github.com/pyroscope-io/pyroscope/pkg/testing"
)
Expand All @@ -32,7 +33,7 @@ var _ = Describe("server", func() {
(*cfg).Server.TLSCertificateFile = filepath.Join(testDataDir, tlsCertificateFile)
(*cfg).Server.TLSKeyFile = filepath.Join(testDataDir, tlsKeyFile)

s, err := storage.New(storage.NewConfig(&(*cfg).Server), logrus.StandardLogger(), prometheus.NewRegistry())
s, err := storage.New(storage.NewConfig(&(*cfg).Server), logrus.StandardLogger(), prometheus.NewRegistry(), new(health.Controller))
Expect(err).ToNot(HaveOccurred())
e, _ := exporter.NewExporter(nil, nil)
c, _ := New(Config{
Expand Down Expand Up @@ -72,7 +73,7 @@ var _ = Describe("server", func() {
const addr = ":10046"
(*cfg).Server.APIBindAddr = addr

s, err := storage.New(storage.NewConfig(&(*cfg).Server), logrus.StandardLogger(), prometheus.NewRegistry())
s, err := storage.New(storage.NewConfig(&(*cfg).Server), logrus.StandardLogger(), prometheus.NewRegistry(), new(health.Controller))
Expect(err).ToNot(HaveOccurred())
e, _ := exporter.NewExporter(nil, nil)
c, _ := New(Config{
Expand Down
3 changes: 2 additions & 1 deletion pkg/server/ingest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/pyroscope-io/pyroscope/pkg/config"
"github.com/pyroscope-io/pyroscope/pkg/exporter"
"github.com/pyroscope-io/pyroscope/pkg/health"
"github.com/pyroscope-io/pyroscope/pkg/storage"
"github.com/pyroscope-io/pyroscope/pkg/storage/segment"
"github.com/pyroscope-io/pyroscope/pkg/testing"
Expand All @@ -39,7 +40,7 @@ var _ = Describe("server", func() {
done := make(chan interface{})
go func() {
defer GinkgoRecover()
s, err := storage.New(storage.NewConfig(&(*cfg).Server), logrus.StandardLogger(), prometheus.NewRegistry())
s, err := storage.New(storage.NewConfig(&(*cfg).Server), logrus.StandardLogger(), prometheus.NewRegistry(), new(health.Controller))
Expect(err).ToNot(HaveOccurred())
e, _ := exporter.NewExporter(nil, nil)
c, _ := New(Config{
Expand Down
3 changes: 2 additions & 1 deletion pkg/server/render_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (

"github.com/pyroscope-io/pyroscope/pkg/config"
"github.com/pyroscope-io/pyroscope/pkg/exporter"
"github.com/pyroscope-io/pyroscope/pkg/health"
"github.com/pyroscope-io/pyroscope/pkg/storage"
"github.com/pyroscope-io/pyroscope/pkg/storage/tree"
"github.com/pyroscope-io/pyroscope/pkg/testing"
Expand Down Expand Up @@ -68,7 +69,7 @@ var _ = Describe("server", func() {
testing.WithConfig(func(cfg **config.Config) {
BeforeEach(func() {
(*cfg).Server.APIBindAddr = ":10044"
s, err := storage.New(storage.NewConfig(&(*cfg).Server), logrus.StandardLogger(), prometheus.NewRegistry())
s, err := storage.New(storage.NewConfig(&(*cfg).Server), logrus.StandardLogger(), prometheus.NewRegistry(), new(health.Controller))
Expect(err).ToNot(HaveOccurred())
e, _ := exporter.NewExporter(nil, nil)
c, _ := New(Config{
Expand Down
11 changes: 8 additions & 3 deletions pkg/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,16 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"

"github.com/pyroscope-io/pyroscope/pkg/health"
"github.com/pyroscope-io/pyroscope/pkg/storage/labels"
"github.com/pyroscope-io/pyroscope/pkg/storage/segment"
"github.com/pyroscope-io/pyroscope/pkg/util/bytesize"
)

var (
errRetention = errors.New("could not write because of retention settings")
errClosed = errors.New("storage closed")
errRetention = errors.New("could not write because of retention settings")
errOutOfSpace = errors.New("running out of space")
errClosed = errors.New("storage closed")
)

type Storage struct {
Expand All @@ -35,6 +37,8 @@ type Storage struct {
main *db
labels *labels.Labels

hc *health.Controller

// Maintenance tasks are executed exclusively to avoid competition:
// extensive writing during GC is harmful and deteriorates the
// overall performance. Same for write back, eviction, and retention
Expand Down Expand Up @@ -79,7 +83,7 @@ type SampleObserver interface {
Observe(k []byte, v int)
}

func New(c *Config, logger *logrus.Logger, reg prometheus.Registerer) (*Storage, error) {
func New(c *Config, logger *logrus.Logger, reg prometheus.Registerer, hc *health.Controller) (*Storage, error) {
s := &Storage{
config: c,
storageOptions: &storageOptions{
Expand All @@ -100,6 +104,7 @@ func New(c *Config, logger *logrus.Logger, reg prometheus.Registerer) (*Storage,
queueWorkers: runtime.NumCPU(),
},

hc: hc,
logger: logger,
metrics: newMetrics(reg),
stop: make(chan struct{}),
Expand Down
3 changes: 2 additions & 1 deletion pkg/storage/storage_delete_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/sirupsen/logrus"

"github.com/pyroscope-io/pyroscope/pkg/config"
"github.com/pyroscope-io/pyroscope/pkg/health"
"github.com/pyroscope-io/pyroscope/pkg/storage/dict"
"github.com/pyroscope-io/pyroscope/pkg/storage/dimension"
"github.com/pyroscope-io/pyroscope/pkg/storage/segment"
Expand All @@ -22,7 +23,7 @@ var _ = Describe("storage package", func() {
testing.WithConfig(func(cfg **config.Config) {
JustBeforeEach(func() {
var err error
s, err = New(NewConfig(&(*cfg).Server), logrus.StandardLogger(), prometheus.NewRegistry())
s, err = New(NewConfig(&(*cfg).Server), logrus.StandardLogger(), prometheus.NewRegistry(), new(health.Controller))
Expect(err).ToNot(HaveOccurred())
})
})
Expand Down
4 changes: 3 additions & 1 deletion pkg/storage/storage_put.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ func (s *Storage) Put(pi *PutInput) error {
// TODO: This is a pretty broad lock. We should find a way to make these locks more selective.
s.putMutex.Lock()
defer s.putMutex.Unlock()

if s.hc.IsOutOfDiskSpace() {
return errOutOfSpace
}
if pi.StartTime.Before(s.retentionPolicy().LowerTimeBoundary()) {
return errRetention
}
Expand Down
19 changes: 10 additions & 9 deletions pkg/storage/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (

"github.com/pyroscope-io/pyroscope/pkg/config"
"github.com/pyroscope-io/pyroscope/pkg/flameql"
"github.com/pyroscope-io/pyroscope/pkg/health"
"github.com/pyroscope-io/pyroscope/pkg/storage/dimension"
"github.com/pyroscope-io/pyroscope/pkg/storage/segment"
"github.com/pyroscope-io/pyroscope/pkg/storage/tree"
Expand Down Expand Up @@ -301,7 +302,7 @@ var _ = Describe("storage package", func() {
testing.WithConfig(func(cfg **config.Config) {
JustBeforeEach(func() {
var err error
s, err = New(NewConfig(&(*cfg).Server), logrus.StandardLogger(), prometheus.NewRegistry())
s, err = New(NewConfig(&(*cfg).Server), logrus.StandardLogger(), prometheus.NewRegistry(), new(health.Controller))
Expect(err).ToNot(HaveOccurred())
})
suite()
Expand All @@ -311,7 +312,7 @@ var _ = Describe("storage package", func() {
testing.WithConfig(func(cfg **config.Config) {
JustBeforeEach(func() {
var err error
s, err = New(NewConfig(&(*cfg).Server).WithInMemory(), logrus.StandardLogger(), prometheus.NewRegistry())
s, err = New(NewConfig(&(*cfg).Server).WithInMemory(), logrus.StandardLogger(), prometheus.NewRegistry(), new(health.Controller))
Expect(err).ToNot(HaveOccurred())
})
suite()
Expand All @@ -323,7 +324,7 @@ var _ = Describe("persistence", func() {
testing.WithConfig(func(cfg **config.Config) {
JustBeforeEach(func() {
var err error
s, err = New(NewConfig(&(*cfg).Server), logrus.StandardLogger(), prometheus.NewRegistry())
s, err = New(NewConfig(&(*cfg).Server), logrus.StandardLogger(), prometheus.NewRegistry(), new(health.Controller))
Expect(err).ToNot(HaveOccurred())
})
Context("persist data between restarts", func() {
Expand Down Expand Up @@ -360,7 +361,7 @@ var _ = Describe("persistence", func() {
Expect(o.Tree.String()).To(Equal(tree.String()))
Expect(s.Close()).ToNot(HaveOccurred())

s2, err := New(NewConfig(&(*cfg).Server), logrus.StandardLogger(), prometheus.NewRegistry())
s2, err := New(NewConfig(&(*cfg).Server), logrus.StandardLogger(), prometheus.NewRegistry(), new(health.Controller))
Expect(err).ToNot(HaveOccurred())

o2, err := s2.Get(&GetInput{
Expand Down Expand Up @@ -542,7 +543,7 @@ var _ = Describe("querying", func() {
testing.WithConfig(func(cfg **config.Config) {
JustBeforeEach(func() {
var err error
s, err = New(NewConfig(&(*cfg).Server), logrus.StandardLogger(), prometheus.NewRegistry())
s, err = New(NewConfig(&(*cfg).Server), logrus.StandardLogger(), prometheus.NewRegistry(), new(health.Controller))
Expect(err).ToNot(HaveOccurred())
setup()
})
Expand All @@ -553,7 +554,7 @@ var _ = Describe("querying", func() {
testing.WithConfig(func(cfg **config.Config) {
JustBeforeEach(func() {
var err error
s, err = New(NewConfig(&(*cfg).Server).WithInMemory(), logrus.StandardLogger(), prometheus.NewRegistry())
s, err = New(NewConfig(&(*cfg).Server).WithInMemory(), logrus.StandardLogger(), prometheus.NewRegistry(), new(health.Controller))
Expect(err).ToNot(HaveOccurred())
setup()
})
Expand Down Expand Up @@ -621,7 +622,7 @@ var _ = Describe("CollectGarbage", func() {
testing.WithConfig(func(cfg **config.Config) {
JustBeforeEach(func() {
var err error
s, err = New(NewConfig(&(*cfg).Server), logrus.StandardLogger(), prometheus.NewRegistry())
s, err = New(NewConfig(&(*cfg).Server), logrus.StandardLogger(), prometheus.NewRegistry(), new(health.Controller))
Expect(err).ToNot(HaveOccurred())
})
suite()
Expand All @@ -631,7 +632,7 @@ var _ = Describe("CollectGarbage", func() {
testing.WithConfig(func(cfg **config.Config) {
JustBeforeEach(func() {
var err error
s, err = New(NewConfig(&(*cfg).Server).WithInMemory(), logrus.StandardLogger(), prometheus.NewRegistry())
s, err = New(NewConfig(&(*cfg).Server).WithInMemory(), logrus.StandardLogger(), prometheus.NewRegistry(), new(health.Controller))
Expect(err).ToNot(HaveOccurred())
})
suite()
Expand All @@ -642,7 +643,7 @@ var _ = Describe("Getters", func() {
testing.WithConfig(func(cfg **config.Config) {
JustBeforeEach(func() {
var err error
s, err = New(NewConfig(&(*cfg).Server), logrus.StandardLogger(), prometheus.NewRegistry())
s, err = New(NewConfig(&(*cfg).Server), logrus.StandardLogger(), prometheus.NewRegistry(), new(health.Controller))
Expect(err).ToNot(HaveOccurred())
})

Expand Down
3 changes: 2 additions & 1 deletion pkg/testing/load/cmd/dataloader/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/prometheus/client_golang/prometheus"
"github.com/pyroscope-io/pyroscope/pkg/config"
"github.com/pyroscope-io/pyroscope/pkg/health"
"github.com/pyroscope-io/pyroscope/pkg/storage"
"github.com/pyroscope-io/pyroscope/pkg/testing/load"
)
Expand Down Expand Up @@ -47,7 +48,7 @@ func openStorage(path string) (*storage.Storage, error) {
CacheEvictThreshold: 0.02,
CacheEvictVolume: 0.10,
MaxNodesSerialization: 2048,
}), logrus.StandardLogger(), prometheus.NewRegistry())
}), logrus.StandardLogger(), prometheus.NewRegistry(), new(health.Controller))
}

func main() {
Expand Down

0 comments on commit 0641244

Please sign in to comment.