Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Append Service Metrics #318

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 59 additions & 0 deletions broker/client/append_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@ import (
"context"
"fmt"
"io"
"os"
"sync"
"time"

"github.com/prometheus/client_golang/prometheus"

"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
pb "go.gazette.dev/core/broker/protocol"
Expand Down Expand Up @@ -422,10 +425,13 @@ var serveAppends = func(s *AppendService, aa *AsyncAppend, err error) {
// appendBuffer composes a backing File with a bufio.Writer, and additionally
// tracks the offset through which the file is written.
type appendBuffer struct {
// appendBuffer's file presents a limited file interface to not assume more than necessary
// about what it can be used for
file interface {
io.ReaderAt
io.Seeker
io.Writer
Size() int64
}
offset int64
buf *bufio.Writer
Expand Down Expand Up @@ -517,6 +523,7 @@ func newAppendBufferPool() *sync.Pool {
}, "", "failed to create appendBuffer")

fb.pool = pool
AppendServiceCollector.Register(fb.file)
return fb
}

Expand Down Expand Up @@ -544,3 +551,55 @@ var (
appendBufferSize = 8 * 1024 // 8KB.
appendBufferCutoff int64 = 1 << 26 // 64MB.
)

// appendServiceCollector implements prometheus.Collector - registers the files associated with
// each appendBuffer created and emits the total disk usage when asked by prometheus
type appendServiceCollector struct {
files []interface {
Size() int64
}
}

func (asc *appendServiceCollector) Register(file interface{ Size() int64 }) {
asc.files = append(asc.files, file)
}

// Describe implements prometheus.Collector
func (asc *appendServiceCollector) Describe(ch chan<- *prometheus.Desc) {
prometheus.DescribeByCollect(asc, ch)
}

// Collect implements prometheus.Collector
func (asc *appendServiceCollector) Collect(ch chan<- prometheus.Metric) {
var size int64
for _, f := range asc.files {
size += f.Size()
}
ch <- prometheus.MustNewConstMetric(
appendServiceDiskBufferDesc,
prometheus.GaugeValue,
float64(size))
}

var (
AppendServiceCollector = &appendServiceCollector{}

appendServiceDiskBufferDesc = prometheus.NewDesc(
"gazette_append_service_disk_buffer_bytes",
"The total size in bytes on disk used by file-backed appendBuffers.",
[]string{}, nil)
)

// file provides an implementation of appendBuffer.file interface with necessary Size() method for collecting
// metrics
type file struct {
*os.File
}

func (f *file) Size() int64 {
if stat, err := f.Stat(); err != nil {
return 0
} else {
return stat.Size()
}
}
3 changes: 2 additions & 1 deletion broker/client/append_service_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ var newAppendBuffer = func() (*appendBuffer, error) {
} else if err = os.Remove(f.Name()); err != nil {
return nil, err
} else {
var fb = &appendBuffer{file: f}
var fb = &appendBuffer{file: &file{f}}
fb.buf = bufio.NewWriterSize(fb, appendBufferSize)
return fb, nil
}
}

2 changes: 1 addition & 1 deletion broker/client/append_service_win.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ var newAppendBuffer = func() (*appendBuffer, error) {
return nil, err
} else {
runtime.SetFinalizer(f, removeFileFinalizer)
var fb = &appendBuffer{file: f}
var fb = &appendBuffer{file: &file{f}}
fb.buf = bufio.NewWriterSize(fb, appendBufferSize)
return fb, nil
}
Expand Down
3 changes: 1 addition & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ require (
github.com/dustin/go-humanize v1.0.0
github.com/dustinkirkland/golang-petname v0.0.0-20191129215211-8e5a1ed0cff0
github.com/gogo/protobuf v1.3.2
github.com/golang/protobuf v1.5.2
github.com/golang/snappy v0.0.4
github.com/google/uuid v1.3.0
github.com/gorilla/schema v1.2.0
Expand Down Expand Up @@ -52,7 +51,7 @@ require (
golang.org/x/time v0.0.0-20200630173020-3af7569d3a1e // indirect
google.golang.org/api v0.56.0
google.golang.org/grpc v1.40.0
google.golang.org/protobuf v1.27.1 // indirect
google.golang.org/protobuf v1.27.1
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c
gopkg.in/yaml.v2 v2.4.0
k8s.io/api v0.0.0-20190620073856-dcce3486da33
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,6 @@ github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4er
github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e h1:1r7pUrabqp18hOBcwBwiTsbnFeTZHV9eER/QT5JVZxY=
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE=
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFUx0Y=
Expand Down
2 changes: 1 addition & 1 deletion mainboilerplate/runconsumer/run_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func (sc Cmd) Execute(args []string) error {
ks.WatchApplyDelay = bc.Consumer.WatchDelay

// Register Resolver as a prometheus.Collector for tracking shard status
prometheus.MustRegister(service.Resolver)
prometheus.MustRegister(service.Resolver, client.AppendServiceCollector)

log.WithFields(log.Fields{
"zone": spec.Id.Zone,
Expand Down