Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cmd/sync: add p8s metrics for sync command #4119

Merged
merged 8 commits into from
Oct 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions cmd/mdtest.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@ import (
"sync"
"time"

"github.com/juicedata/juicefs/pkg/utils"
"github.com/mattn/go-isatty"

"github.com/juicedata/juicefs/pkg/chunk"
"github.com/juicedata/juicefs/pkg/fs"
"github.com/juicedata/juicefs/pkg/meta"
"github.com/juicedata/juicefs/pkg/metric"
"github.com/juicedata/juicefs/pkg/utils"
"github.com/juicedata/juicefs/pkg/vfs"
"github.com/mattn/go-isatty"
"github.com/urfave/cli/v2"
)

Expand Down Expand Up @@ -221,9 +221,13 @@ func initForMdtest(c *cli.Context, mp string, metaUrl string) *fs.FileSystem {
conf.EntryTimeout = time.Millisecond * time.Duration(c.Float64("entry-cache")*1000)
conf.DirEntryTimeout = time.Millisecond * time.Duration(c.Float64("dir-entry-cache")*1000)

metricsAddr := exposeMetrics(c, m, registerer, registry)
metricsAddr := exposeMetrics(c, registerer, registry)
m.InitMetrics(registerer)
vfs.InitMetrics(registerer)
if c.IsSet("consul") {
metric.RegisterToConsul(c.String("consul"), metricsAddr, conf.Meta.MountPoint)
metadata := make(map[string]string)
metadata["mountPoint"] = conf.Meta.MountPoint
metric.RegisterToConsul(c.String("consul"), metricsAddr, metadata)
}
jfs, err := fs.NewFileSystem(conf, m, store)
if err != nil {
Expand Down
15 changes: 8 additions & 7 deletions cmd/mount.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,17 +103,14 @@ func installHandler(mp string) {
}()
}

func exposeMetrics(c *cli.Context, m meta.Meta, registerer prometheus.Registerer, registry *prometheus.Registry) string {
func exposeMetrics(c *cli.Context, registerer prometheus.Registerer, registry *prometheus.Registry) string {
var ip, port string
//default set
ip, port, err := net.SplitHostPort(c.String("metrics"))
if err != nil {
logger.Fatalf("metrics format error: %v", err)
}

m.InitMetrics(registerer)
vfs.InitMetrics(registerer)
go metric.UpdateMetrics(m, registerer)
go metric.UpdateMetrics(registerer)
http.Handle("/metrics", promhttp.HandlerFor(
registry,
promhttp.HandlerOpts{
Expand Down Expand Up @@ -424,10 +421,14 @@ func getChunkConf(c *cli.Context, format *meta.Format) *chunk.Config {
}

func initBackgroundTasks(c *cli.Context, vfsConf *vfs.Config, metaConf *meta.Config, m meta.Meta, blob object.ObjectStorage, registerer prometheus.Registerer, registry *prometheus.Registry) {
metricsAddr := exposeMetrics(c, m, registerer, registry)
metricsAddr := exposeMetrics(c, registerer, registry)
m.InitMetrics(registerer)
vfs.InitMetrics(registerer)
vfsConf.Port.PrometheusAgent = metricsAddr
if c.IsSet("consul") {
metric.RegisterToConsul(c.String("consul"), metricsAddr, vfsConf.Meta.MountPoint)
metadata := make(map[string]string)
metadata["mountPoint"] = vfsConf.Meta.MountPoint
metric.RegisterToConsul(c.String("consul"), metricsAddr, metadata)
vfsConf.Port.ConsulAddr = c.String("consul")
}
if !metaConf.ReadOnly && !metaConf.NoBGJob && vfsConf.BackupMeta > 0 {
Expand Down
5 changes: 3 additions & 2 deletions cmd/mount_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,9 @@ func Test_exposeMetrics(t *testing.T) {
defer isSetPatches.Reset()
ResetHttp()
registerer, registry := wrapRegister("test", "test")
metricsAddr := exposeMetrics(appCtx, client, registerer, registry)

metricsAddr := exposeMetrics(appCtx, registerer, registry)
client.InitMetrics(registerer)
vfs.InitMetrics(registerer)
u := url.URL{Scheme: "http", Host: metricsAddr, Path: "/metrics"}
resp, err := http.Get(u.String())
So(err, ShouldBeNil)
Expand Down
48 changes: 41 additions & 7 deletions cmd/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,15 @@ import (
"path/filepath"
"regexp"
"runtime"
"strconv"
"strings"

"github.com/juicedata/juicefs/pkg/metric"
"github.com/juicedata/juicefs/pkg/object"
"github.com/juicedata/juicefs/pkg/sync"
"github.com/juicedata/juicefs/pkg/utils"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
"github.com/urfave/cli/v2"
)

Expand Down Expand Up @@ -78,14 +83,18 @@ Supported storage systems: https://juicefs.com/docs/community/how_to_setup_objec
syncActionFlags(),
syncStorageFlags(),
clusterFlags(),
[]cli.Flag{
&cli.IntFlag{
Name: "http-port",
Value: 6070,
Hidden: true,
Usage: "HTTP `PORT` to listen to",
addCategories("METRICS", []cli.Flag{
&cli.StringFlag{
Name: "metrics",
Value: "127.0.0.1:9567",
Usage: "address to export metrics",
},
},
&cli.StringFlag{
Name: "consul",
Value: "127.0.0.1:8500",
Usage: "consul address to register",
},
}),
),
}
}
Expand Down Expand Up @@ -423,5 +432,30 @@ func doSync(c *cli.Context) error {
os.SetStorageClass(config.StorageClass)
}
}

if config.Manager == "" && !config.Dry {
var srcPath, dstPath string
if strings.HasPrefix(src.String(), "file://") {
srcPath = src.String()
}
if strings.HasPrefix(dst.String(), "file://") {
dstPath = dst.String()
}
srcPath = utils.RemovePassword(srcPath)
dstPath = utils.RemovePassword(dstPath)
registry := prometheus.NewRegistry()
config.Registerer = prometheus.WrapRegistererWithPrefix("juicefs_sync_",
prometheus.WrapRegistererWith(prometheus.Labels{"cmd": "sync", "pid": strconv.Itoa(os.Getpid())}, registry))
config.Registerer.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}))
config.Registerer.MustRegister(collectors.NewGoCollector())
metricsAddr := exposeMetrics(c, config.Registerer, registry)
if c.IsSet("consul") {
metadata := make(map[string]string)
metadata["src"] = srcPath
metadata["dst"] = dstPath
metadata["pid"] = strconv.Itoa(os.Getpid())
metric.RegisterToConsul(c.String("consul"), metricsAddr, metadata)
}
}
return sync.Sync(src, dst, config)
}
9 changes: 3 additions & 6 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,6 @@ require (
xorm.io/xorm v1.0.7
)

require (
github.com/rasky/go-xdr v0.0.0-20170124162913-1a41d1a06c93 // indirect
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect
)

require (
cloud.google.com/go v0.102.1 // indirect
cloud.google.com/go/iam v0.3.0 // indirect
Expand Down Expand Up @@ -205,6 +200,7 @@ require (
github.com/pquerna/ffjson v0.0.0-20190930134022-aa0246cd15f7 // indirect
github.com/prometheus/procfs v0.9.0 // indirect
github.com/pyroscope-io/godeltaprof v0.1.2 // indirect
github.com/rasky/go-xdr v0.0.0-20170124162913-1a41d1a06c93 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 // indirect
github.com/rivo/uniseg v0.2.0 // indirect
github.com/rjeczalik/notify v0.9.2 // indirect
Expand All @@ -231,6 +227,7 @@ require (
github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a // indirect
github.com/willf/bitset v1.1.11 // indirect
github.com/willf/bloom v2.0.3+incompatible // indirect
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect
go.etcd.io/etcd/api/v3 v3.5.9 // indirect
go.etcd.io/etcd/client/pkg/v3 v3.5.9 // indirect
go.opencensus.io v0.23.0 // indirect
Expand All @@ -254,7 +251,7 @@ replace github.com/hanwen/go-fuse/v2 v2.1.1-0.20210611132105-24a1dfe6b4f8 => git

replace github.com/dgrijalva/jwt-go v3.2.0+incompatible => github.com/golang-jwt/jwt v3.2.1+incompatible

replace github.com/vbauerster/mpb/v7 v7.0.3 => github.com/juicedata/mpb/v7 v7.0.4-0.20220719014258-68df1356cfba
replace github.com/vbauerster/mpb/v7 v7.0.3 => github.com/juicedata/mpb/v7 v7.0.4-0.20231024073412-2b8d31be510b

replace google.golang.org/grpc v1.43.0 => google.golang.org/grpc v1.29.0

Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -634,8 +634,8 @@ github.com/juicedata/huaweicloud-sdk-go-obs v3.22.12-0.20230228031208-386e87b5c0
github.com/juicedata/huaweicloud-sdk-go-obs v3.22.12-0.20230228031208-386e87b5c091+incompatible/go.mod h1:Ukwa8ffRQLV6QRwpqGioPjn2Wnf7TBDA4DbennDOqHE=
github.com/juicedata/minio v0.0.0-20221113011458-8866d5c9df8c h1:w+4eiZLSLd6aQcy+7wn++hI1caDAm+rNOG7Me5qO7Sw=
github.com/juicedata/minio v0.0.0-20221113011458-8866d5c9df8c/go.mod h1:8oMBmyEWA8aYwMwO7eUNOjvVNOhDeqDlio2RSv6T/4Q=
github.com/juicedata/mpb/v7 v7.0.4-0.20220719014258-68df1356cfba h1:YSCPvyONPDp/ivKgRanFpNEHh3N5/0UsKmwbbKQIuGE=
github.com/juicedata/mpb/v7 v7.0.4-0.20220719014258-68df1356cfba/go.mod h1:NXGsfPGx6G2JssqvEcULtDqUrxuuYs4llpv8W6ZUpzk=
github.com/juicedata/mpb/v7 v7.0.4-0.20231024073412-2b8d31be510b h1:0/6suPNZnrOlRlBaU/Bnitu8HiKkkLSzQhHbwQ9AysM=
github.com/juicedata/mpb/v7 v7.0.4-0.20231024073412-2b8d31be510b/go.mod h1:NXGsfPGx6G2JssqvEcULtDqUrxuuYs4llpv8W6ZUpzk=
github.com/juju/ratelimit v1.0.2 h1:sRxmtRiajbvrcLQT7S+JbqU0ntsb9W2yhSdNN8tWfaI=
github.com/juju/ratelimit v1.0.2/go.mod h1:qapgC/Gy+xNh9UxzV13HGGl/6UXNN+ct+vwSgWNm/qk=
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
Expand Down
27 changes: 17 additions & 10 deletions pkg/metric/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ import (

consulapi "github.com/hashicorp/consul/api"
"github.com/hashicorp/go-hclog"

"github.com/juicedata/juicefs/pkg/meta"
"github.com/juicedata/juicefs/pkg/utils"
"github.com/prometheus/client_golang/prometheus"
)
Expand Down Expand Up @@ -57,7 +55,7 @@ var (
})
)

func UpdateMetrics(m meta.Meta, registerer prometheus.Registerer) {
func UpdateMetrics(registerer prometheus.Registerer) {
if registerer == nil {
return
}
Expand All @@ -66,7 +64,7 @@ func UpdateMetrics(m meta.Meta, registerer prometheus.Registerer) {
registerer.MustRegister(uptime)
}

func RegisterToConsul(consulAddr, metricsAddr, mountPoint string) {
func RegisterToConsul(consulAddr, metricsAddr string, metadata map[string]string) {
if metricsAddr == "" {
logger.Errorf("Metrics server start err,so can't register to consul")
return
Expand Down Expand Up @@ -101,14 +99,23 @@ func RegisterToConsul(consulAddr, metricsAddr, mountPoint string) {
return
}

localMeta := make(map[string]string)
hostname, err := os.Hostname()
if err != nil {
logger.Errorf("Get hostname failed:%s", err)
return
}
localMeta["hostName"] = hostname
localMeta["mountPoint"] = mountPoint
metadata["hostName"] = hostname
var id, name string
if mp, ok := metadata["mountPoint"]; ok {
id = fmt.Sprintf("%s:%s", localIp, mp)
name = "juicefs"
} else {
// for sync metrics, id format: 127.0.0.1;src->dst;pid=6666
id = fmt.Sprintf("%s;%s->%s;pid=%s", localIp, metadata["src"], metadata["dst"], metadata["pid"])
delete(metadata, "src")
delete(metadata, "dst")
name = "juicefs-sync"
}

check := &consulapi.AgentServiceCheck{
HTTP: fmt.Sprintf("http://%s:%d/metrics", localIp, port),
Expand All @@ -118,11 +125,11 @@ func RegisterToConsul(consulAddr, metricsAddr, mountPoint string) {
}

registration := consulapi.AgentServiceRegistration{
ID: fmt.Sprintf("%s:%s", localIp, mountPoint),
Name: "juicefs",
ID: id,
Name: name,
Port: port,
Address: localIp,
Meta: localMeta,
Meta: metadata,
Check: check,
}
if err = client.Agent().ServiceRegister(&registration); err != nil {
Expand Down
2 changes: 2 additions & 0 deletions pkg/sync/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"os"
"strings"

"github.com/prometheus/client_golang/prometheus"
"github.com/urfave/cli/v2"
)

Expand Down Expand Up @@ -57,6 +58,7 @@ type Config struct {

rules []rule
concurrentList chan int
Registerer prometheus.Registerer
}

func envList() []string {
Expand Down
76 changes: 76 additions & 0 deletions pkg/sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/juicedata/juicefs/pkg/object"
"github.com/juicedata/juicefs/pkg/utils"
"github.com/juju/ratelimit"
"github.com/prometheus/client_golang/prometheus"
)

// The max number of key per listing request
Expand Down Expand Up @@ -1032,6 +1033,7 @@ func Sync(src, dst object.ObjectStorage, config *Config) error {
}
}()

initSyncMetrics(config)
for i := 0; i < config.Threads; i++ {
wg.Add(1)
go func() {
Expand Down Expand Up @@ -1108,3 +1110,77 @@ func Sync(src, dst object.ObjectStorage, config *Config) error {
}
return nil
}

func initSyncMetrics(config *Config) {
if config.Registerer != nil {
config.Registerer.MustRegister(
prometheus.NewCounterFunc(prometheus.CounterOpts{
Name: "scanned",
Help: "Scanned objects",
}, func() float64 {
return float64(handled.Total())
}),
prometheus.NewCounterFunc(prometheus.CounterOpts{
Name: "handled",
Help: "Handled objects",
}, func() float64 {
return float64(handled.Current())
}),
prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Name: "pending",
Help: "Pending objects",
}, func() float64 {
return float64(pending.Current())
}),
prometheus.NewCounterFunc(prometheus.CounterOpts{
Name: "copied",
Help: "Copied objects",
}, func() float64 {
return float64(copied.Current())
}),
prometheus.NewCounterFunc(prometheus.CounterOpts{
Name: "copied_bytes",
Help: "Copied bytes",
}, func() float64 {
return float64(copiedBytes.Current())
}),
prometheus.NewCounterFunc(prometheus.CounterOpts{
Name: "skipped",
Help: "Skipped objects",
}, func() float64 {
return float64(skipped.Current())
}),
)
if failed != nil {
config.Registerer.MustRegister(prometheus.NewCounterFunc(prometheus.CounterOpts{
Name: "failed",
Help: "Failed objects",
}, func() float64 {
return float64(failed.Current())
}))
}
if deleted != nil {
config.Registerer.MustRegister(prometheus.NewCounterFunc(prometheus.CounterOpts{
Name: "deleted",
Help: "Deleted objects",
}, func() float64 {
return float64(deleted.Current())
}))
}
if checked != nil && checkedBytes != nil {
config.Registerer.MustRegister(
prometheus.NewCounterFunc(prometheus.CounterOpts{
Name: "checked",
Help: "Checked objects",
}, func() float64 {
return float64(checked.Current())
}),
prometheus.NewCounterFunc(prometheus.CounterOpts{
Name: "checked_bytes",
Help: "Checked bytes",
}, func() float64 {
return float64(checkedBytes.Current())
}))
}
}
}
2 changes: 1 addition & 1 deletion sdk/java/libjfs/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,7 @@ func jfs_init(cname, jsonConf, user, group, superuser, supergroup *C.char) uintp
}
m.InitMetrics(registerer)
vfs.InitMetrics(registerer)
go metric.UpdateMetrics(m, registerer)
go metric.UpdateMetrics(registerer)
}

blob, err := cmd.NewReloadableStorage(format, m, func(f *meta.Format) {
Expand Down