From 3dc9888d7f3bdcdb03f5538f2b2f94fb397907ee Mon Sep 17 00:00:00 2001 From: yeya24 Date: Wed, 31 Jul 2019 23:59:02 +0800 Subject: [PATCH 1/4] add some supernode metrics Signed-off-by: yeya24 --- dfdaemon/constant/constant.go | 4 +- dfdaemon/handler/root_handler.go | 2 +- docs/user_guide/metrics.md | 7 + go.mod | 1 + go.sum | 2 + pkg/{util => metricsutils}/metrics_util.go | 3 +- pkg/timeutils/time_util.go | 5 + supernode/config/constants.go | 4 +- supernode/daemon/mgr/dfgettask/manager.go | 33 ++- .../daemon/mgr/dfgettask/manager_test.go | 245 ++++++++++++++---- supernode/daemon/mgr/peer/manager.go | 21 +- supernode/daemon/mgr/peer/manager_test.go | 26 +- supernode/daemon/mgr/task/manager.go | 28 ++ supernode/daemon/mgr/task/manager_test.go | 2 + supernode/daemon/mgr/task/manager_util.go | 44 ++-- .../daemon/mgr/task/manager_util_test.go | 56 ++++ supernode/server/metrics.go | 14 +- supernode/server/router.go | 2 +- supernode/server/router_test.go | 6 +- test/api_metrics_test.go | 8 +- test/util_api.go | 6 +- version/version.go | 6 +- 22 files changed, 425 insertions(+), 100 deletions(-) rename pkg/{util => metricsutils}/metrics_util.go (95%) diff --git a/dfdaemon/constant/constant.go b/dfdaemon/constant/constant.go index c03aa0bf2..b63c40442 100644 --- a/dfdaemon/constant/constant.go +++ b/dfdaemon/constant/constant.go @@ -46,8 +46,8 @@ const ( ) const ( - // Namespace is the prefix of the metrics' name of dragonfly + // Namespace is the prefix of the names of dragonfly metrics Namespace = "dragonfly" - // Subsystem represents metrics for dfdaemon + // Subsystem represents the metrics of dfdaemon Subsystem = "dfdaemon" ) diff --git a/dfdaemon/handler/root_handler.go b/dfdaemon/handler/root_handler.go index 5473851b7..e2eb4f512 100644 --- a/dfdaemon/handler/root_handler.go +++ b/dfdaemon/handler/root_handler.go @@ -32,6 +32,6 @@ func New() *http.ServeMux { s.HandleFunc("/args", getArgs) s.HandleFunc("/env", getEnv) s.HandleFunc("/debug/version", 
version.Handler) - s.HandleFunc("/metrics", promhttp.Handler().ServeHTTP) + s.HandleFunc("/metricsutils", promhttp.Handler().ServeHTTP) return s } diff --git a/docs/user_guide/metrics.md b/docs/user_guide/metrics.md index cfbd8a722..c5ef0dd2b 100644 --- a/docs/user_guide/metrics.md +++ b/docs/user_guide/metrics.md @@ -9,6 +9,13 @@ This doc contains all the metrics that Dragonfly components currently support. N - dragonfly_supernode_http_request_duration_seconds{code, handler, method} - http request latency in seconds - dragonfly_supernode_http_request_size_bytes{code, handler, method} - http request size in bytes - dragonfly_supernode_http_response_size_bytes{code, handler, method} - http response size in bytes +- dragonfly_supernode_peers{hostname} - dragonfly peers +- dragonfly_supernode_tasks{taskid, cdnstatus} - dragonfly tasks +- dragonfly_supernode_dfgettasks{taskid, callsystem} - dragonfly dfget tasks +- dragonfly_supernode_daemon_dfgettasks{taskid, callsystem} - dragonfly current dfget tasks, which are called from dfdaemon +- dragonfly_supernode_schedule_duration_milliseconds{taskid} - duration for task scheduling in milliseconds +- dragonfly_supernode_trigger_cdn_total{} - total times of triggering cdn. +- dragonfly_supernode_trigger_cdn_failed_total{} - total failed times of triggering cdn. 
## Dfdaemon diff --git a/go.mod b/go.mod index 7f5675924..18dd7feb3 100644 --- a/go.mod +++ b/go.mod @@ -38,6 +38,7 @@ require ( github.com/stretchr/testify v1.2.2 github.com/valyala/fasthttp v1.3.0 github.com/willf/bitset v0.0.0-20190228212526-18bd95f470f9 + golang.org/x/net v0.0.0-20190620200207-3b0461eec859 // indirect gopkg.in/gcfg.v1 v1.2.3 gopkg.in/mgo.v2 v2.0.0-20160818020120-3f83fa500528 // indirect gopkg.in/warnings.v0 v0.1.2 diff --git a/go.sum b/go.sum index 405f174ed..583d2a606 100644 --- a/go.sum +++ b/go.sum @@ -182,6 +182,8 @@ golang.org/x/net v0.0.0-20181220203305-927f97764cc3/go.mod h1:mL1N/T3taQHkDXs73r golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190522155817-f3200d17e092 h1:4QSRKanuywn15aTZvI/mIDEgPQpswuFndXpOj3rKEco= golang.org/x/net v0.0.0-20190522155817-f3200d17e092/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859 h1:R/3boaszxrf1GEUWTVDzSKVwLmSJpwZ1yqXm8j0v2QI= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= diff --git a/pkg/util/metrics_util.go b/pkg/metricsutils/metrics_util.go similarity index 95% rename from pkg/util/metrics_util.go rename to pkg/metricsutils/metrics_util.go index 9d1d27a2a..98a3dfd5f 100644 --- a/pkg/util/metrics_util.go +++ b/pkg/metricsutils/metrics_util.go @@ -14,7 +14,7 @@ * limitations under the License. 
*/ -package util +package metricsutils import ( "github.com/prometheus/client_golang/prometheus" @@ -26,6 +26,7 @@ const ( ) // NewCounter will auto-register a Counter metric to prometheus default registry and return it. +// TODO(yeya24): Stop using default registry, add registry as a parameter instead. func NewCounter(subsystem, name, help string, labels []string) *prometheus.CounterVec { return promauto.NewCounterVec( prometheus.CounterOpts{ diff --git a/pkg/timeutils/time_util.go b/pkg/timeutils/time_util.go index 53a6886bd..f7526b3c8 100644 --- a/pkg/timeutils/time_util.go +++ b/pkg/timeutils/time_util.go @@ -24,3 +24,8 @@ import ( func GetCurrentTimeMillis() int64 { return time.Now().UnixNano() / time.Millisecond.Nanoseconds() } + +// GetCurrentTimeMillisFloat returns the current time in milliseconds as a float64. +func GetCurrentTimeMillisFloat() float64 { + return float64(time.Now().UnixNano()) / float64(time.Millisecond.Nanoseconds()) +} diff --git a/supernode/config/constants.go b/supernode/config/constants.go index 2b7f4ad09..988ef41fb 100644 --- a/supernode/config/constants.go +++ b/supernode/config/constants.go @@ -70,8 +70,8 @@ const ( ) const ( - // SubsystemSupernode represents metrics from supernode + // SubsystemSupernode represents the metrics of supernode SubsystemSupernode = "supernode" - // SubsystemDfget represents metrics from dfget + // SubsystemDfget represents the metrics of dfget SubsystemDfget = "dfget" ) diff --git a/supernode/daemon/mgr/dfgettask/manager.go b/supernode/daemon/mgr/dfgettask/manager.go index b417e6fcb..684ebadd0 100644 --- a/supernode/daemon/mgr/dfgettask/manager.go +++ b/supernode/daemon/mgr/dfgettask/manager.go @@ -19,6 +19,8 @@ package dfgettask import ( "context" "fmt" + "github.com/dragonflyoss/Dragonfly/pkg/metricsutils" + "github.com/dragonflyoss/Dragonfly/supernode/config" "github.com/dragonflyoss/Dragonfly/apis/types" "github.com/dragonflyoss/Dragonfly/pkg/errortypes" @@ -28,14 +30,31 @@ import ( dutil 
"github.com/dragonflyoss/Dragonfly/supernode/daemon/util" "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" ) var _ mgr.DfgetTaskMgr = &Manager{} +type metrics struct { + dfgetTasks *prometheus.GaugeVec + dfgetTasksDaemon *prometheus.GaugeVec +} + +func newMetrics() *metrics { + return &metrics{ + dfgetTasks: metricsutils.NewGauge(config.SubsystemSupernode, "dfgettasks", + "The number of dfget tasks", []string{"taskid", "callsystem"}), + + dfgetTasksDaemon: metricsutils.NewGauge(config.SubsystemSupernode, "daemon_dfgettasks", + "The number of dfget tasks from dfdaemon", []string{"taskid", "callsystem"}), + } +} + // Manager is an implementation of the interface of DfgetTaskMgr. type Manager struct { dfgetTaskStore *dutil.Store ptoc *syncmap.SyncMap + metrics *metrics } // NewManager returns a new Manager. @@ -43,6 +62,7 @@ func NewManager() (*Manager, error) { return &Manager{ dfgetTaskStore: dutil.NewStore(), ptoc: syncmap.NewSyncMap(), + metrics: newMetrics(), }, nil } @@ -73,6 +93,13 @@ func (dtm *Manager) Add(ctx context.Context, dfgetTask *types.DfGetTask) error { dtm.ptoc.Add(generatePeerKey(dfgetTask.PeerID, dfgetTask.TaskID), dfgetTask.CID) dtm.dfgetTaskStore.Put(key, dfgetTask) + + if dfgetTask.Dfdaemon { + dtm.metrics.dfgetTasksDaemon.WithLabelValues(dfgetTask.TaskID, dfgetTask.CallSystem).Inc() + } else { + dtm.metrics.dfgetTasks.WithLabelValues(dfgetTask.TaskID, dfgetTask.CallSystem).Inc() + } + return nil } @@ -103,7 +130,11 @@ func (dtm *Manager) Delete(ctx context.Context, clientID, taskID string) error { return err } dtm.ptoc.Delete(generatePeerKey(dfgetTask.PeerID, dfgetTask.TaskID)) - + if dfgetTask.Dfdaemon { + dtm.metrics.dfgetTasksDaemon.WithLabelValues(dfgetTask.TaskID, dfgetTask.CallSystem).Dec() + } else { + dtm.metrics.dfgetTasks.WithLabelValues(dfgetTask.TaskID, dfgetTask.CallSystem).Dec() + } return dtm.dfgetTaskStore.Delete(key) } diff --git a/supernode/daemon/mgr/dfgettask/manager_test.go 
b/supernode/daemon/mgr/dfgettask/manager_test.go index 83400099c..9dc7b762b 100644 --- a/supernode/daemon/mgr/dfgettask/manager_test.go +++ b/supernode/daemon/mgr/dfgettask/manager_test.go @@ -24,6 +24,8 @@ import ( "github.com/dragonflyoss/Dragonfly/pkg/errortypes" "github.com/go-check/check" + "github.com/prometheus/client_golang/prometheus" + prom_testutil "github.com/prometheus/client_golang/prometheus/testutil" ) func Test(t *testing.T) { @@ -35,61 +37,202 @@ func init() { } type DfgetTaskMgrTestSuite struct { - manager *Manager } -// SetUpSuite does common setup in the beginning of each test. -func (s *DfgetTaskMgrTestSuite) SetUpSuite(c *check.C) { - s.manager, _ = NewManager() +// SetUpTest does common setup in the beginning of each test. +func (s *DfgetTaskMgrTestSuite) SetUpTest(c *check.C) { + // In every test, we should reset Prometheus default registry, otherwise + // it will panic because of duplicate metricsutils. + prometheus.DefaultRegisterer = prometheus.NewRegistry() } -func (s *DfgetTaskMgrTestSuite) TestDfgetTaskMgr(c *check.C) { - clientID := "foo" - taskID := "00c4e7b174af7ed61c414b36ef82810ac0c98142c03e5748c00e1d1113f3c882" - - // Add - dfgetTask := &types.DfGetTask{ - CID: clientID, - Path: "/peer/file/taskFileName", - PieceSize: 4 * 1024 * 1024, - TaskID: taskID, - PeerID: "foo-192.168.10.11-1553838710990554281", +func (s *DfgetTaskMgrTestSuite) TestDfgetTaskAdd(c *check.C) { + manager, _ := NewManager() + dfgetNum := manager.metrics.dfgetTasks + dfgetDaemonNum := manager.metrics.dfgetTasksDaemon + + var testCases = []struct { + dfgetTask *types.DfGetTask + Expect *types.DfGetTask + }{ + { + dfgetTask: &types.DfGetTask{ + CID: "foo", + CallSystem: "foo", + Dfdaemon: true, + Path: "/peer/file/taskFileName", + PieceSize: 4 * 1024 * 1024, + TaskID: "test1", + PeerID: "peer1", + }, + Expect: &types.DfGetTask{ + CID: "foo", + CallSystem: "foo", + Dfdaemon: true, + Path: "/peer/file/taskFileName", + PieceSize: 4 * 1024 * 1024, + TaskID: "test1", 
+ PeerID: "peer1", + Status: types.DfGetTaskStatusWAITING, + }, + }, + { + dfgetTask: &types.DfGetTask{ + CID: "bar", + CallSystem: "bar", + Dfdaemon: true, + Path: "/peer/file/taskFileName", + PieceSize: 4 * 1024 * 1024, + TaskID: "test2", + PeerID: "peer2", + }, + Expect: &types.DfGetTask{ + CID: "bar", + CallSystem: "bar", + Dfdaemon: true, + Path: "/peer/file/taskFileName", + PieceSize: 4 * 1024 * 1024, + TaskID: "test2", + PeerID: "peer2", + Status: types.DfGetTaskStatusWAITING, + }, + }, } - err := s.manager.Add(context.Background(), dfgetTask) - c.Check(err, check.IsNil) - - // Get - dt, err := s.manager.Get(context.Background(), clientID, taskID) - c.Check(err, check.IsNil) - c.Check(dt, check.DeepEquals, &types.DfGetTask{ - CID: clientID, - Path: "/peer/file/taskFileName", - PieceSize: 4 * 1024 * 1024, - TaskID: taskID, - Status: types.DfGetTaskStatusWAITING, - PeerID: "foo-192.168.10.11-1553838710990554281", - }) - - // UpdateStatus - err = s.manager.UpdateStatus(context.Background(), clientID, taskID, types.DfGetTaskStatusSUCCESS) - c.Check(err, check.IsNil) - - dt, err = s.manager.Get(context.Background(), clientID, taskID) - c.Check(err, check.IsNil) - c.Check(dt, check.DeepEquals, &types.DfGetTask{ - CID: clientID, - Path: "/peer/file/taskFileName", - PieceSize: 4 * 1024 * 1024, - TaskID: taskID, - Status: types.DfGetTaskStatusSUCCESS, - PeerID: "foo-192.168.10.11-1553838710990554281", - }) - - // Delete - err = s.manager.Delete(context.Background(), clientID, taskID) - c.Check(err, check.IsNil) - - _, err = s.manager.Get(context.Background(), clientID, taskID) - c.Check(errortypes.IsDataNotFound(err), check.Equals, true) + for _, tc := range testCases { + err := manager.Add(context.Background(), tc.dfgetTask) + c.Check(err, check.IsNil) + if tc.dfgetTask.Dfdaemon { + c.Assert(1, check.Equals, + int(prom_testutil.ToFloat64( + dfgetDaemonNum.WithLabelValues(tc.dfgetTask.TaskID, tc.dfgetTask.CallSystem)))) + } else { + c.Assert(1, check.Equals, + 
int(prom_testutil.ToFloat64( + dfgetNum.WithLabelValues(tc.dfgetTask.TaskID, tc.dfgetTask.CallSystem)))) + } + + dt, err := manager.Get(context.Background(), tc.dfgetTask.CID, tc.dfgetTask.TaskID) + c.Check(err, check.IsNil) + c.Check(dt, check.DeepEquals, tc.Expect) + } +} + +func (s *DfgetTaskMgrTestSuite) TestDfgetTaskUpdate(c *check.C) { + manager, _ := NewManager() + var testCases = []struct { + dfgetTask *types.DfGetTask + taskStatus string + Expect *types.DfGetTask + }{ + { + dfgetTask: &types.DfGetTask{ + CID: "foo", + CallSystem: "foo", + Dfdaemon: true, + Path: "/peer/file/taskFileName", + PieceSize: 4 * 1024 * 1024, + TaskID: "test1", + PeerID: "peer1", + }, + taskStatus: types.DfGetTaskStatusFAILED, + Expect: &types.DfGetTask{ + CID: "foo", + CallSystem: "foo", + Dfdaemon: true, + Path: "/peer/file/taskFileName", + PieceSize: 4 * 1024 * 1024, + TaskID: "test1", + PeerID: "peer1", + Status: types.DfGetTaskStatusFAILED, + }, + }, + { + dfgetTask: &types.DfGetTask{ + CID: "bar", + CallSystem: "bar", + Dfdaemon: true, + Path: "/peer/file/taskFileName", + PieceSize: 4 * 1024 * 1024, + TaskID: "test2", + PeerID: "peer2", + }, + taskStatus: types.DfGetTaskStatusSUCCESS, + Expect: &types.DfGetTask{ + CID: "bar", + CallSystem: "bar", + Dfdaemon: true, + Path: "/peer/file/taskFileName", + PieceSize: 4 * 1024 * 1024, + TaskID: "test2", + PeerID: "peer2", + Status: types.DfGetTaskStatusSUCCESS, + }, + }, + } + + for _, tc := range testCases { + err := manager.Add(context.Background(), tc.dfgetTask) + c.Check(err, check.IsNil) + + err = manager.UpdateStatus(context.Background(), tc.dfgetTask.CID, tc.dfgetTask.TaskID, tc.taskStatus) + c.Check(err, check.IsNil) + + dt, err := manager.Get(context.Background(), tc.dfgetTask.CID, tc.dfgetTask.TaskID) + c.Check(dt, check.DeepEquals, tc.Expect) + } +} + +func (s *DfgetTaskMgrTestSuite) TestDfgetTaskDelete(c *check.C) { + manager, _ := NewManager() + dfgetNum := manager.metrics.dfgetTasks + dfgetDaemonNum := 
manager.metrics.dfgetTasksDaemon + + var testCases = []struct { + dfgetTask *types.DfGetTask + }{ + { + dfgetTask: &types.DfGetTask{ + CID: "foo", + CallSystem: "foo", + Dfdaemon: true, + Path: "/peer/file/taskFileName", + PieceSize: 4 * 1024 * 1024, + TaskID: "test1", + PeerID: "peer1", + }, + }, + { + dfgetTask: &types.DfGetTask{ + CID: "bar", + CallSystem: "bar", + Dfdaemon: true, + Path: "/peer/file/taskFileName", + PieceSize: 4 * 1024 * 1024, + TaskID: "test2", + PeerID: "peer2", + }, + }, + } + + for _, tc := range testCases { + err := manager.Add(context.Background(), tc.dfgetTask) + c.Check(err, check.IsNil) + + err = manager.Delete(context.Background(), tc.dfgetTask.CID, tc.dfgetTask.TaskID) + c.Check(err, check.IsNil) + if tc.dfgetTask.Dfdaemon { + c.Assert(0, check.Equals, + int(prom_testutil.ToFloat64( + dfgetDaemonNum.WithLabelValues(tc.dfgetTask.TaskID, tc.dfgetTask.CallSystem)))) + } else { + c.Assert(0, check.Equals, + int(prom_testutil.ToFloat64( + dfgetNum.WithLabelValues(tc.dfgetTask.TaskID, tc.dfgetTask.CallSystem)))) + } + + _, err = manager.Get(context.Background(), tc.dfgetTask.CID, tc.dfgetTask.TaskID) + c.Check(errors.IsDataNotFound(err), check.Equals, true) + } +>>>>>>> 40bbeb8... 
add some supernode metrics } diff --git a/supernode/daemon/mgr/peer/manager.go b/supernode/daemon/mgr/peer/manager.go index 969452fc1..1aaca1584 100644 --- a/supernode/daemon/mgr/peer/manager.go +++ b/supernode/daemon/mgr/peer/manager.go @@ -19,10 +19,12 @@ package peer import ( "context" "fmt" + "github.com/dragonflyoss/Dragonfly/supernode/config" "time" "github.com/dragonflyoss/Dragonfly/apis/types" "github.com/dragonflyoss/Dragonfly/pkg/errortypes" + metrics_util "github.com/dragonflyoss/Dragonfly/pkg/metricsutils" "github.com/dragonflyoss/Dragonfly/pkg/netutils" "github.com/dragonflyoss/Dragonfly/pkg/stringutils" "github.com/dragonflyoss/Dragonfly/supernode/daemon/mgr" @@ -30,19 +32,33 @@ import ( "github.com/go-openapi/strfmt" "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" ) var _ mgr.PeerMgr = &Manager{} +type metrics struct { + peers *prometheus.GaugeVec +} + +func newMetrics() *metrics { + return &metrics{ + peers: metrics_util.NewGauge(config.SubsystemSupernode, "peers", + "The number of supernode peers", []string{"hostname"}), + } +} + // Manager is an implement of the interface of PeerMgr. type Manager struct { peerStore *dutil.Store + metrics *metrics } // NewManager return a new Manager Object. func NewManager() (*Manager, error) { return &Manager{ peerStore: dutil.NewStore(), + metrics: newMetrics(), }, nil } @@ -67,6 +83,7 @@ func (pm *Manager) Register(ctx context.Context, peerCreateRequest *types.PeerCr Created: strfmt.DateTime(time.Now()), } pm.peerStore.Put(id, peerInfo) + pm.metrics.peers.WithLabelValues(peerInfo.HostName.String()).Inc() return &types.PeerCreateResponse{ ID: id, @@ -75,11 +92,13 @@ func (pm *Manager) Register(ctx context.Context, peerCreateRequest *types.PeerCr // DeRegister a peer from p2p network. 
func (pm *Manager) DeRegister(ctx context.Context, peerID string) error { - if _, err := pm.getPeerInfo(peerID); err != nil { + peerInfo, err := pm.getPeerInfo(peerID) + if err != nil { return err } pm.peerStore.Delete(peerID) + pm.metrics.peers.WithLabelValues(peerInfo.HostName.String()).Dec() return nil } diff --git a/supernode/daemon/mgr/peer/manager_test.go b/supernode/daemon/mgr/peer/manager_test.go index c8b90cfb5..4e9678337 100644 --- a/supernode/daemon/mgr/peer/manager_test.go +++ b/supernode/daemon/mgr/peer/manager_test.go @@ -24,8 +24,11 @@ import ( "github.com/dragonflyoss/Dragonfly/apis/types" "github.com/dragonflyoss/Dragonfly/pkg/errortypes" dutil "github.com/dragonflyoss/Dragonfly/supernode/daemon/util" + "github.com/dragonflyoss/Dragonfly/version" "github.com/go-check/check" + "github.com/prometheus/client_golang/prometheus" + prom_testutil "github.com/prometheus/client_golang/prometheus/testutil" ) func Test(t *testing.T) { @@ -39,19 +42,29 @@ func init() { type PeerMgrTestSuite struct { } +// SetUpTest does common setup in the beginning of each test. +func (s *PeerMgrTestSuite) SetUpTest(c *check.C) { + // In every test, we should reset the Prometheus default registry, otherwise + // it will panic because of duplicate metrics registration. 
+ prometheus.DefaultRegisterer = prometheus.NewRegistry() +} + func (s *PeerMgrTestSuite) TestPeerMgr(c *check.C) { manager, _ := NewManager() - + peersNumCounter := manager.metrics.peers // register request := &types.PeerCreateRequest{ IP: "192.168.10.11", HostName: "foo", Port: 65001, - Version: "v0.3.0", + Version: version.DFGetVersion, } resp, err := manager.Register(context.Background(), request) c.Check(err, check.IsNil) + c.Assert(1, check.Equals, + int(prom_testutil.ToFloat64(peersNumCounter.WithLabelValues("foo")))) + // get id := resp.ID info, err := manager.Get(context.Background(), id) @@ -75,6 +88,9 @@ func (s *PeerMgrTestSuite) TestPeerMgr(c *check.C) { err = manager.DeRegister(context.Background(), id) c.Check(err, check.IsNil) + c.Assert(0, check.Equals, + int(prom_testutil.ToFloat64(peersNumCounter.WithLabelValues("foo")))) + // get info, err = manager.Get(context.Background(), id) c.Check(errortypes.IsDataNotFound(err), check.Equals, true) @@ -89,7 +105,7 @@ func (s *PeerMgrTestSuite) TestGet(c *check.C) { IP: "192.168.10.11", HostName: "foo", Port: 65001, - Version: "v0.3.0", + Version: version.DFGetVersion, } resp, err := manager.Register(context.Background(), request) c.Check(err, check.IsNil) @@ -126,7 +142,7 @@ func (s *PeerMgrTestSuite) TestList(c *check.C) { IP: "192.168.10.11", HostName: "foo", Port: 65001, - Version: "v0.3.0", + Version: version.DFGetVersion, } resp, err := manager.Register(context.Background(), request) c.Check(err, check.IsNil) @@ -139,7 +155,7 @@ func (s *PeerMgrTestSuite) TestList(c *check.C) { IP: "192.168.10.11", HostName: "foo2", Port: 65001, - Version: "v0.3.0", + Version: version.DFGetVersion, } resp, err = manager.Register(context.Background(), request) c.Check(err, check.IsNil) diff --git a/supernode/daemon/mgr/task/manager.go b/supernode/daemon/mgr/task/manager.go index d62b5e98a..341768105 100644 --- a/supernode/daemon/mgr/task/manager.go +++ b/supernode/daemon/mgr/task/manager.go @@ -22,6 +22,7 @@ import ( 
"github.com/dragonflyoss/Dragonfly/apis/types" "github.com/dragonflyoss/Dragonfly/pkg/errortypes" + "github.com/dragonflyoss/Dragonfly/pkg/metricsutils" "github.com/dragonflyoss/Dragonfly/pkg/stringutils" "github.com/dragonflyoss/Dragonfly/pkg/syncmap" "github.com/dragonflyoss/Dragonfly/pkg/timeutils" @@ -32,6 +33,7 @@ import ( "github.com/dragonflyoss/Dragonfly/supernode/util" "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" "github.com/sirupsen/logrus" ) @@ -41,6 +43,30 @@ const ( var _ mgr.TaskMgr = &Manager{} +type metrics struct { + tasks *prometheus.GaugeVec + triggerCdnCount *prometheus.CounterVec + triggerCdnFailCount *prometheus.CounterVec + scheduleDurationMilliSeconds *prometheus.HistogramVec +} + +func newMetrics() *metrics { + return &metrics{ + tasks: metricsutils.NewGauge(config.SubsystemSupernode, "tasks", + "The status of Supernode tasks", []string{"taskid", "cdnstatus"}), + + triggerCdnCount: metricsutils.NewCounter(config.SubsystemSupernode, "trigger_cdn_total", + "The number of triggering cdn", []string{}), + + triggerCdnFailCount: metricsutils.NewCounter(config.SubsystemSupernode, "trigger_cdn_failed_total", + "The number of triggering cdn failure", []string{}), + + scheduleDurationMilliSeconds: metricsutils.NewHistogram(config.SubsystemSupernode, "schedule_duration_milliseconds", + "duration for task scheduling in milliseconds", []string{"taskid"}, + prometheus.ExponentialBuckets(0.02, 2, 7)), + } +} + // Manager is an implementation of the interface of TaskMgr. type Manager struct { cfg *config.Config @@ -56,6 +82,7 @@ type Manager struct { cdnMgr mgr.CDNMgr schedulerMgr mgr.SchedulerMgr OriginClient httpclient.OriginHTTPClient + metrics *metrics } // NewManager returns a new Manager Object. 
@@ -73,6 +100,7 @@ func NewManager(cfg *config.Config, peerMgr mgr.PeerMgr, dfgetTaskMgr mgr.DfgetT accessTimeMap: syncmap.NewSyncMap(), taskURLUnReachableStore: syncmap.NewSyncMap(), OriginClient: originClient, + metrics: newMetrics(), }, nil } diff --git a/supernode/daemon/mgr/task/manager_test.go b/supernode/daemon/mgr/task/manager_test.go index e79c830f1..e9c42c8e9 100644 --- a/supernode/daemon/mgr/task/manager_test.go +++ b/supernode/daemon/mgr/task/manager_test.go @@ -29,6 +29,7 @@ import ( "github.com/go-check/check" "github.com/golang/mock/gomock" + "github.com/prometheus/client_golang/prometheus" ) func Test(t *testing.T) { @@ -73,6 +74,7 @@ func (s *TaskMgrTestSuite) SetUpSuite(c *check.C) { func (s *TaskMgrTestSuite) TearDownSuite(c *check.C) { s.mockCtl.Finish() + prometheus.DefaultRegisterer = prometheus.NewRegistry() } func (s *TaskMgrTestSuite) TestCheckTaskStatus(c *check.C) { diff --git a/supernode/daemon/mgr/task/manager_util.go b/supernode/daemon/mgr/task/manager_util.go index a70db67c0..0ae90027a 100644 --- a/supernode/daemon/mgr/task/manager_util.go +++ b/supernode/daemon/mgr/task/manager_util.go @@ -19,6 +19,7 @@ package task import ( "context" "fmt" + "github.com/dragonflyoss/Dragonfly/pkg/timeutils" "net/http" "time" @@ -109,6 +110,7 @@ func (tm *Manager) addOrUpdateTask(ctx context.Context, req *types.TaskCreateReq task.PieceTotal = int32((fileLength + (int64(pieceSize) - 1)) / int64(pieceSize)) tm.taskStore.Put(taskID, task) + tm.metrics.tasks.WithLabelValues(taskID, task.CdnStatus).Inc() return task, nil } @@ -152,14 +154,11 @@ func (tm *Manager) updateTask(taskID string, updateTaskInfo *types.TaskInfo) err return err } - if !isSuccessCDN(updateTaskInfo.CdnStatus) { - // when the origin CDNStatus equals success, do not update it to unsuccessful - if isSuccessCDN(task.CdnStatus) { - return nil - } - - // only update the task CdnStatus when the new CDNStatus and - // the origin CDNStatus both not equals success + // Update the origin task 
CDNStatus when the origin CDNStatus not equals success. + if !isSuccessCDN(task.CdnStatus) { + // In order to update CDNStatus we should reset the origin CDNStatus to 0. + tm.metrics.tasks.WithLabelValues(taskID, task.CdnStatus).Dec() + tm.metrics.tasks.WithLabelValues(taskID, updateTaskInfo.CdnStatus).Inc() task.CdnStatus = updateTaskInfo.CdnStatus return nil } @@ -174,14 +173,25 @@ func (tm *Manager) updateTask(taskID string, updateTaskInfo *types.TaskInfo) err task.RealMd5 = updateTaskInfo.RealMd5 } - var pieceTotal int32 - if updateTaskInfo.FileLength > 0 { - pieceTotal = int32((updateTaskInfo.FileLength + int64(task.PieceSize-1)) / int64(task.PieceSize)) - } - if pieceTotal != 0 { - task.PieceTotal = pieceTotal + // only update the task info when the new CDNStatus equals success + // and the origin CDNStatus not equals success. + if isSuccessCDN(updateTaskInfo.CdnStatus) { + if updateTaskInfo.FileLength != 0 { + task.FileLength = updateTaskInfo.FileLength + } + + if !stringutils.IsEmptyStr(updateTaskInfo.RealMd5) { + task.RealMd5 = updateTaskInfo.RealMd5 + } + + var pieceTotal int32 + if updateTaskInfo.FileLength > 0 { + pieceTotal = int32((updateTaskInfo.FileLength + int64(task.PieceSize-1)) / int64(task.PieceSize)) + } + if pieceTotal != 0 { + task.PieceTotal = pieceTotal + } } - task.CdnStatus = updateTaskInfo.CdnStatus return nil } @@ -227,7 +237,9 @@ func (tm *Manager) triggerCdnSyncAction(ctx context.Context, task *types.TaskInf go func() { updateTaskInfo, err := tm.cdnMgr.TriggerCDN(ctx, task) + tm.metrics.triggerCdnCount.WithLabelValues().Inc() if err != nil { + tm.metrics.triggerCdnFailCount.WithLabelValues().Inc() logrus.Errorf("taskID(%s) trigger cdn get error: %v", task.ID, err) } tm.updateTask(task.ID, updateTaskInfo) @@ -324,10 +336,12 @@ func (tm *Manager) parseAvailablePeers(ctx context.Context, clientID string, tas // get scheduler pieceResult logrus.Debugf("start scheduler for taskID: %s clientID: %s", task.ID, clientID) + startTime := 
timeutils.GetCurrentTimeMillisFloat() pieceResult, err := tm.schedulerMgr.Schedule(ctx, task.ID, clientID, dfgetTask.PeerID) if err != nil { return false, nil, err } + tm.metrics.scheduleDurationMilliSeconds.WithLabelValues(task.ID).Observe(timeutils.GetCurrentTimeMillisFloat() - startTime) logrus.Debugf("get scheduler result length(%d) with taskID(%s) and clientID(%s)", len(pieceResult), task.ID, clientID) var pieceInfos []*types.PieceInfo diff --git a/supernode/daemon/mgr/task/manager_util_test.go b/supernode/daemon/mgr/task/manager_util_test.go index 5edfc21b8..9ea5a2702 100644 --- a/supernode/daemon/mgr/task/manager_util_test.go +++ b/supernode/daemon/mgr/task/manager_util_test.go @@ -17,6 +17,7 @@ package task import ( + "context" "github.com/dragonflyoss/Dragonfly/apis/types" "github.com/dragonflyoss/Dragonfly/supernode/config" "github.com/dragonflyoss/Dragonfly/supernode/daemon/mgr/mock" @@ -24,6 +25,7 @@ import ( "github.com/go-check/check" "github.com/golang/mock/gomock" + prom_testutil "github.com/prometheus/client_golang/prometheus/testutil" ) func init() { @@ -123,3 +125,57 @@ func (s *TaskUtilTestSuite) TestEqualsTask(c *check.C) { c.Check(result, check.DeepEquals, v.result) } } + +func (s *TaskUtilTestSuite) TestTriggerCdnSyncAction(c *check.C) { + var err error + totalCounter := s.taskManager.metrics.triggerCdnCount + + var cases = []struct { + task *types.TaskInfo + err error + skip bool + total float64 + }{ + { + task: &types.TaskInfo{ + CdnStatus: types.TaskInfoCdnStatusRUNNING, + }, + err: nil, + skip: true, + }, + { + task: &types.TaskInfo{ + CdnStatus: types.TaskInfoCdnStatusSUCCESS, + }, + err: nil, + skip: true, + }, + { + task: &types.TaskInfo{ + ID: "foo", + CdnStatus: types.TaskInfoCdnStatusWAITING, + }, + err: nil, + skip: false, + total: 1, + }, + { + task: &types.TaskInfo{ + ID: "foo1", + CdnStatus: types.TaskInfoCdnStatusWAITING, + }, + err: nil, + skip: false, + total: 2, + }, + } + + for _, tc := range cases { + err = 
s.taskManager.triggerCdnSyncAction(context.Background(), tc.task) + c.Assert(err, check.Equals, tc.err) + if !tc.skip { + c.Assert(tc.total, check.Equals, + int(prom_testutil.ToFloat64(totalCounter.WithLabelValues()))) + } + } +} diff --git a/supernode/server/metrics.go b/supernode/server/metrics.go index da0b70850..fd333cb11 100644 --- a/supernode/server/metrics.go +++ b/supernode/server/metrics.go @@ -19,14 +19,14 @@ package server import ( "net/http" - "github.com/dragonflyoss/Dragonfly/pkg/util" + "github.com/dragonflyoss/Dragonfly/pkg/metricsutils" "github.com/dragonflyoss/Dragonfly/supernode/config" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" ) -// metrics defines three prometheus metrics for monitoring http handler status +// metrics defines the prometheus metrics for monitoring http handler status type metrics struct { requestCounter *prometheus.CounterVec requestDuration *prometheus.HistogramVec @@ -36,25 +36,25 @@ type metrics struct { func newMetrics() *metrics { return &metrics{ - requestCounter: util.NewCounter(config.SubsystemSupernode, "http_requests_total", + requestCounter: metricsutils.NewCounter(config.SubsystemSupernode, "http_requests_total", "Counter of HTTP requests.", []string{"code", "handler", "method"}, ), - requestDuration: util.NewHistogram(config.SubsystemSupernode, "http_request_duration_seconds", + requestDuration: metricsutils.NewHistogram(config.SubsystemSupernode, "http_request_duration_seconds", "Histogram of latencies for HTTP requests.", []string{"code", "handler", "method"}, []float64{.1, .2, .4, 1, 3, 8, 20, 60, 120}, ), - requestSize: util.NewHistogram(config.SubsystemSupernode, "http_request_size_bytes", + requestSize: metricsutils.NewHistogram(config.SubsystemSupernode, "http_request_size_bytes", "Histogram of request size for HTTP requests.", []string{"code", "handler", "method"}, prometheus.ExponentialBuckets(100, 10, 8), ), - responseSize: 
util.NewHistogram(config.SubsystemSupernode, "http_response_size_bytes", + responseSize: metricsutils.NewHistogram(config.SubsystemSupernode, "http_response_size_bytes", "Histogram of response size for HTTP requests.", []string{"code", "handler", "method"}, prometheus.ExponentialBuckets(100, 10, 8), ), } } -// instrumentHandler will update metrics for every http request +// instrumentHandler updates the metrics for every http request func (m *metrics) instrumentHandler(handlerName string, handler http.HandlerFunc) http.HandlerFunc { return promhttp.InstrumentHandlerDuration( m.requestDuration.MustCurryWith(prometheus.Labels{"handler": handlerName}), diff --git a/supernode/server/router.go b/supernode/server/router.go index 6f9f99440..d57af4bcd 100644 --- a/supernode/server/router.go +++ b/supernode/server/router.go @@ -54,7 +54,7 @@ func initRoute(s *Server) *mux.Router { {Method: http.MethodGet, Path: "/peers/{id}", HandlerFunc: s.getPeer}, {Method: http.MethodGet, Path: "/peers", HandlerFunc: s.listPeers}, - {Method: http.MethodGet, Path: "/metrics", HandlerFunc: handleMetrics}, + {Method: http.MethodGet, Path: "/metricsutils", HandlerFunc: handleMetrics}, } // register API diff --git a/supernode/server/router_test.go b/supernode/server/router_test.go index 3c176e48f..e53071ecd 100644 --- a/supernode/server/router_test.go +++ b/supernode/server/router_test.go @@ -128,14 +128,14 @@ func (rs *RouterTestSuite) TestVersionHandler(c *check.C) { } func (rs *RouterTestSuite) TestHTTPMetrics(c *check.C) { - // ensure /metrics is accessible - code, _, err := httputils.Get("http://"+rs.addr+"/metrics", 0) + // ensure /metricsutils is accessible + code, _, err := httputils.Get("http://"+rs.addr+"/metricsutils", 0) c.Check(err, check.IsNil) c.Assert(code, check.Equals, 200) counter := m.requestCounter c.Assert(1, check.Equals, - int(prom_testutil.ToFloat64(counter.WithLabelValues(strconv.Itoa(http.StatusOK), "/metrics", "get")))) + 
int(prom_testutil.ToFloat64(counter.WithLabelValues(strconv.Itoa(http.StatusOK), "/metricsutils", "get")))) for i := 0; i < 5; i++ { code, _, err := httputils.Get("http://"+rs.addr+"/_ping", 0) diff --git a/test/api_metrics_test.go b/test/api_metrics_test.go index 09040de63..969125a87 100644 --- a/test/api_metrics_test.go +++ b/test/api_metrics_test.go @@ -25,7 +25,7 @@ import ( "github.com/go-check/check" ) -// APIMetricsSuite is the test suite for Prometheus metrics. +// APIMetricsSuite is the test suite for the Prometheus metrics. type APIMetricsSuite struct { starter *command.Starter } @@ -46,16 +46,16 @@ func (s *APIMetricsSuite) TearDownSuite(c *check.C) { s.starter.Clean() } -// TestMetrics tests /metrics API. +// TestMetrics tests /metricsutils API. func (s *APIMetricsSuite) TestMetrics(c *check.C) { - resp, err := request.Get("/metrics") + resp, err := request.Get("/metricsutils") c.Assert(err, check.IsNil) defer resp.Body.Close() CheckRespStatus(c, resp, 200) } -// TestMetricsRequestTotal tests http-related metrics. +// TestHttpMetrics tests http-related metrics. func (s *APIMetricsSuite) TestHttpMetrics(c *check.C) { requestCounter := `dragonfly_supernode_http_requests_total{code="%d",handler="%s",method="%s"}` responseSizeSum := `dragonfly_supernode_http_response_size_bytes_sum{code="%d",handler="%s",method="%s"}` diff --git a/test/util_api.go b/test/util_api.go index a4d4d1e72..b6901ba86 100644 --- a/test/util_api.go +++ b/test/util_api.go @@ -36,11 +36,11 @@ func CheckRespStatus(c *check.C, resp *http.Response, status int) { } } -// CheckMetric find the specific metric from /metrics endpoint and it will compare the metric +// CheckMetric finds the specific metric from /metricsutils endpoint and it will compare the metric // value with expected value. 
func CheckMetric(c *check.C, metric string, value float64) { var val float64 - resp, err := request.Get("/metrics") + resp, err := request.Get("/metricsutils") c.Assert(err, check.IsNil) defer resp.Body.Close() data, err := ioutil.ReadAll(resp.Body) @@ -50,7 +50,7 @@ func CheckMetric(c *check.C, metric string, value float64) { if strings.Contains(line, metric) { vals := strings.Split(line, " ") if len(vals) != 2 { - c.Errorf("bad metrics format") + c.Errorf("bad metricsutils format") } val, err = strconv.ParseFloat(vals[1], 64) c.Assert(err, check.IsNil) diff --git a/version/version.go b/version/version.go index fa850ab6b..06692a288 100644 --- a/version/version.go +++ b/version/version.go @@ -28,7 +28,7 @@ import ( "text/template" "github.com/dragonflyoss/Dragonfly/apis/types" - "github.com/dragonflyoss/Dragonfly/pkg/util" + metricsutils "github.com/dragonflyoss/Dragonfly/pkg/metricsutils" ) var ( @@ -105,9 +105,9 @@ func Print(program string) string { return strings.TrimSpace(buf.String()) } -// NewBuildInfo register a collector which exports metrics about version and build information. +// NewBuildInfo register a collector which exports metricsutils about version and build information. 
func NewBuildInfo(program string) { - buildInfo := util.NewGauge(program, "build_info", + buildInfo := metricsutils.NewGauge(program, "build_info", fmt.Sprintf("A metric with a constant '1' value labeled by version, revision, os, "+ "arch and goversion from which %s was built.", program), []string{"version", "revision", "os", "arch", "goversion"}, From aba48ed6749b16a2c30b742e3bce404b439b049b Mon Sep 17 00:00:00 2001 From: yeya24 Date: Thu, 1 Aug 2019 12:15:00 +0800 Subject: [PATCH 2/4] add register parameter when register metrics Signed-off-by: yeya24 --- dfdaemon/server.go | 5 +- docs/user_guide/metrics.md | 1 - docs/user_guide/monitoring.md | 2 +- pkg/metricsutils/metrics_util.go | 50 +++++++++++----- pkg/timeutils/time_util.go | 6 +- supernode/daemon/daemon.go | 3 +- supernode/daemon/mgr/dfgettask/manager.go | 31 +++------- .../daemon/mgr/dfgettask/manager_test.go | 57 ++++++------------- supernode/daemon/mgr/peer/manager.go | 14 ++--- supernode/daemon/mgr/peer/manager_test.go | 19 ++----- supernode/daemon/mgr/task/manager.go | 14 ++--- supernode/daemon/mgr/task/manager_test.go | 4 +- supernode/daemon/mgr/task/manager_util.go | 20 +++++-- .../daemon/mgr/task/manager_util_test.go | 5 +- supernode/server/metrics.go | 10 ++-- supernode/server/router.go | 3 +- supernode/server/router_test.go | 3 +- supernode/server/server.go | 15 ++--- version/version.go | 9 ++- 19 files changed, 132 insertions(+), 139 deletions(-) diff --git a/dfdaemon/server.go b/dfdaemon/server.go index e8d6fa751..c41f43179 100644 --- a/dfdaemon/server.go +++ b/dfdaemon/server.go @@ -28,6 +28,7 @@ import ( "github.com/dragonflyoss/Dragonfly/version" "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" "github.com/sirupsen/logrus" ) @@ -83,6 +84,8 @@ func New(opts ...Option) (*Server, error) { }, proxy: p, } + // register dfdaemon build information + version.NewBuildInfo("dfdaemon", prometheus.DefaultRegisterer) for _, opt := range opts { if err := opt(s); err != nil { @@ 
-121,8 +124,6 @@ func (s *Server) Start() error { } else { logrus.Infof("start dfdaemon http server on %s", s.server.Addr) } - // register dfdaemon build information - version.NewBuildInfo("dfdaemon") return s.server.ListenAndServe() } diff --git a/docs/user_guide/metrics.md b/docs/user_guide/metrics.md index c5ef0dd2b..f7cc36e8d 100644 --- a/docs/user_guide/metrics.md +++ b/docs/user_guide/metrics.md @@ -12,7 +12,6 @@ This doc contains all the metrics that Dragonfly components currently support. N - dragonfly_supernode_peers{hostname} - dragonfly peers - dragonfly_supernode_tasks{taskid, cdnstatus} - dragonfly tasks - dragonfly_supernode_dfgettasks{taskid, callsystem} - dragonfly dfget tasks -- dragonfly_supernode_daemon_dfgettasks{taskid, callsystem} - dragonfly current dfget tasks, which are called from dfdaemon - dragonfly_supernode_schedule_duration_milliseconds{taskid} - duration for task scheduling in milliseconds - dragonfly_supernode_trigger_cdn_total{} - total times of triggering cdn. - dragonfly_supernode_trigger_cdn_failed_total{} - total failed times of triggering cdn. diff --git a/docs/user_guide/monitoring.md b/docs/user_guide/monitoring.md index 5b327ff8e..72429958d 100644 --- a/docs/user_guide/monitoring.md +++ b/docs/user_guide/monitoring.md @@ -125,7 +125,7 @@ We provide several functions to add metrics easily. 
Here is an example to add a import "github.com/dragonflyoss/Dragonfly/pkg/util" requestCounter := util.NewCounter("supernode", "http_requests_total", - "Counter of HTTP requests.", []string{"code"}) + "Counter of HTTP requests.", []string{"code"}, nil) requestCounter.WithLabelValues("200").Inc() ``` diff --git a/pkg/metricsutils/metrics_util.go b/pkg/metricsutils/metrics_util.go index 98a3dfd5f..508a99b3f 100644 --- a/pkg/metricsutils/metrics_util.go +++ b/pkg/metricsutils/metrics_util.go @@ -18,17 +18,19 @@ package metricsutils import ( "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" ) const ( namespace = "dragonfly" ) -// NewCounter will auto-register a Counter metric to prometheus default registry and return it. -// TODO(yeya24): Stop using default registry, add registry as a parameter instead. -func NewCounter(subsystem, name, help string, labels []string) *prometheus.CounterVec { - return promauto.NewCounterVec( +// NewCounter will register a Counter metric to specified registry and return it. +// If registry is not specified, it will register metric to default prometheus registry. +func NewCounter(subsystem, name, help string, labels []string, register prometheus.Registerer) *prometheus.CounterVec { + if register == nil { + register = prometheus.DefaultRegisterer + } + m := prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: namespace, Subsystem: subsystem, @@ -37,11 +39,17 @@ func NewCounter(subsystem, name, help string, labels []string) *prometheus.Count }, labels, ) + register.MustRegister(m) + return m } -// NewGauge will auto-register a Gauge metric to prometheus default registry and return it. -func NewGauge(subsystem, name, help string, labels []string) *prometheus.GaugeVec { - return promauto.NewGaugeVec( +// NewGauge will register a Gauge metric to specified registry and return it. +// If registry is not specified, it will register metric to default prometheus registry. 
+func NewGauge(subsystem, name, help string, labels []string, register prometheus.Registerer) *prometheus.GaugeVec { + if register == nil { + register = prometheus.DefaultRegisterer + } + m := prometheus.NewGaugeVec( prometheus.GaugeOpts{ Namespace: namespace, Subsystem: subsystem, @@ -50,11 +58,17 @@ func NewGauge(subsystem, name, help string, labels []string) *prometheus.GaugeVe }, labels, ) + register.MustRegister(m) + return m } -// NewSummary will auto-register a Summary metric to prometheus default registry and return it. -func NewSummary(subsystem, name, help string, labels []string, objectives map[float64]float64) *prometheus.SummaryVec { - return promauto.NewSummaryVec( +// NewSummary will register a Summary metric to specified registry and return it. +// If registry is not specified, it will register metric to default prometheus registry. +func NewSummary(subsystem, name, help string, labels []string, objectives map[float64]float64, register prometheus.Registerer) *prometheus.SummaryVec { + if register == nil { + register = prometheus.DefaultRegisterer + } + m := prometheus.NewSummaryVec( prometheus.SummaryOpts{ Namespace: namespace, Subsystem: subsystem, @@ -64,11 +78,17 @@ func NewSummary(subsystem, name, help string, labels []string, objectives map[fl }, labels, ) + register.MustRegister(m) + return m } -// NewHistogram will auto-register a Histogram metric to prometheus default registry and return it. -func NewHistogram(subsystem, name, help string, labels []string, buckets []float64) *prometheus.HistogramVec { - return promauto.NewHistogramVec( +// NewHistogram will register a Histogram metric to specified registry and return it. +// If registry is not specified, it will register metric to default prometheus registry. 
+func NewHistogram(subsystem, name, help string, labels []string, buckets []float64, register prometheus.Registerer) *prometheus.HistogramVec { + if register == nil { + register = prometheus.DefaultRegisterer + } + m := prometheus.NewHistogramVec( prometheus.HistogramOpts{ Namespace: namespace, Subsystem: subsystem, @@ -78,4 +98,6 @@ func NewHistogram(subsystem, name, help string, labels []string, buckets []float }, labels, ) + register.MustRegister(m) + return m } diff --git a/pkg/timeutils/time_util.go b/pkg/timeutils/time_util.go index f7526b3c8..c18c0a956 100644 --- a/pkg/timeutils/time_util.go +++ b/pkg/timeutils/time_util.go @@ -25,7 +25,7 @@ func GetCurrentTimeMillis() int64 { return time.Now().UnixNano() / time.Millisecond.Nanoseconds() } -// GetCurrentTimeMillisFloat returns the time in millis for now in float64 format. -func GetCurrentTimeMillisFloat() float64 { - return float64(time.Now().UnixNano()) / float64(time.Millisecond.Nanoseconds()) +// SinceInMilliseconds gets the time since the specified start in milliseconds. 
+func SinceInMilliseconds(start time.Time) float64 { + return float64(time.Since(start).Nanoseconds()) / float64(time.Millisecond.Nanoseconds()) } diff --git a/supernode/daemon/daemon.go b/supernode/daemon/daemon.go index 21490476b..13e2ca8a9 100644 --- a/supernode/daemon/daemon.go +++ b/supernode/daemon/daemon.go @@ -26,6 +26,7 @@ import ( "github.com/dragonflyoss/Dragonfly/supernode/server" "github.com/go-openapi/strfmt" + "github.com/prometheus/client_golang/prometheus" "github.com/sirupsen/logrus" ) @@ -47,7 +48,7 @@ func New(cfg *config.Config) (*Daemon, error) { return nil, err } - s, err := server.New(cfg) + s, err := server.New(cfg, prometheus.DefaultRegisterer) if err != nil { return nil, err } diff --git a/supernode/daemon/mgr/dfgettask/manager.go b/supernode/daemon/mgr/dfgettask/manager.go index 684ebadd0..3e8cc2616 100644 --- a/supernode/daemon/mgr/dfgettask/manager.go +++ b/supernode/daemon/mgr/dfgettask/manager.go @@ -19,13 +19,13 @@ package dfgettask import ( "context" "fmt" - "github.com/dragonflyoss/Dragonfly/pkg/metricsutils" - "github.com/dragonflyoss/Dragonfly/supernode/config" "github.com/dragonflyoss/Dragonfly/apis/types" "github.com/dragonflyoss/Dragonfly/pkg/errortypes" + "github.com/dragonflyoss/Dragonfly/pkg/metricsutils" "github.com/dragonflyoss/Dragonfly/pkg/stringutils" "github.com/dragonflyoss/Dragonfly/pkg/syncmap" + "github.com/dragonflyoss/Dragonfly/supernode/config" "github.com/dragonflyoss/Dragonfly/supernode/daemon/mgr" dutil "github.com/dragonflyoss/Dragonfly/supernode/daemon/util" @@ -36,17 +36,13 @@ import ( var _ mgr.DfgetTaskMgr = &Manager{} type metrics struct { - dfgetTasks *prometheus.GaugeVec - dfgetTasksDaemon *prometheus.GaugeVec + dfgetTasks *prometheus.GaugeVec } -func newMetrics() *metrics { +func newMetrics(register prometheus.Registerer) *metrics { return &metrics{ dfgetTasks: metricsutils.NewGauge(config.SubsystemSupernode, "dfgettasks", - "The number of dfget tasks", []string{"taskid", "callsystem"}), - - 
dfgetTasksDaemon: metricsutils.NewGauge(config.SubsystemSupernode, "daemon_dfgettasks", - "The number of dfget tasks from dfdaemon", []string{"taskid", "callsystem"}), + "The number of dfget tasks", []string{"taskid", "callsystem"}, register), } } @@ -58,11 +54,11 @@ type Manager struct { } // NewManager returns a new Manager. -func NewManager() (*Manager, error) { +func NewManager(register prometheus.Registerer) (*Manager, error) { return &Manager{ dfgetTaskStore: dutil.NewStore(), ptoc: syncmap.NewSyncMap(), - metrics: newMetrics(), + metrics: newMetrics(register), }, nil } @@ -93,12 +89,7 @@ func (dtm *Manager) Add(ctx context.Context, dfgetTask *types.DfGetTask) error { dtm.ptoc.Add(generatePeerKey(dfgetTask.PeerID, dfgetTask.TaskID), dfgetTask.CID) dtm.dfgetTaskStore.Put(key, dfgetTask) - - if dfgetTask.Dfdaemon { - dtm.metrics.dfgetTasksDaemon.WithLabelValues(dfgetTask.TaskID, dfgetTask.CallSystem).Inc() - } else { - dtm.metrics.dfgetTasks.WithLabelValues(dfgetTask.TaskID, dfgetTask.CallSystem).Inc() - } + dtm.metrics.dfgetTasks.WithLabelValues(dfgetTask.TaskID, dfgetTask.CallSystem).Inc() return nil } @@ -130,11 +121,7 @@ func (dtm *Manager) Delete(ctx context.Context, clientID, taskID string) error { return err } dtm.ptoc.Delete(generatePeerKey(dfgetTask.PeerID, dfgetTask.TaskID)) - if dfgetTask.Dfdaemon { - dtm.metrics.dfgetTasksDaemon.WithLabelValues(dfgetTask.TaskID, dfgetTask.CallSystem).Dec() - } else { - dtm.metrics.dfgetTasks.WithLabelValues(dfgetTask.TaskID, dfgetTask.CallSystem).Dec() - } + dtm.metrics.dfgetTasks.WithLabelValues(dfgetTask.TaskID, dfgetTask.CallSystem).Dec() return dtm.dfgetTaskStore.Delete(key) } diff --git a/supernode/daemon/mgr/dfgettask/manager_test.go b/supernode/daemon/mgr/dfgettask/manager_test.go index 9dc7b762b..b95c8e30c 100644 --- a/supernode/daemon/mgr/dfgettask/manager_test.go +++ b/supernode/daemon/mgr/dfgettask/manager_test.go @@ -39,17 +39,9 @@ func init() { type DfgetTaskMgrTestSuite struct { } -// SetUpTest does 
common setup in the beginning of each test. -func (s *DfgetTaskMgrTestSuite) SetUpTest(c *check.C) { - // In every test, we should reset Prometheus default registry, otherwise - // it will panic because of duplicate metricsutils. - prometheus.DefaultRegisterer = prometheus.NewRegistry() -} - func (s *DfgetTaskMgrTestSuite) TestDfgetTaskAdd(c *check.C) { - manager, _ := NewManager() - dfgetNum := manager.metrics.dfgetTasks - dfgetDaemonNum := manager.metrics.dfgetTasksDaemon + manager, _ := NewManager(prometheus.NewRegistry()) + dfgetTasks := manager.metrics.dfgetTasks var testCases = []struct { dfgetTask *types.DfGetTask @@ -80,7 +72,7 @@ func (s *DfgetTaskMgrTestSuite) TestDfgetTaskAdd(c *check.C) { dfgetTask: &types.DfGetTask{ CID: "bar", CallSystem: "bar", - Dfdaemon: true, + Dfdaemon: false, Path: "/peer/file/taskFileName", PieceSize: 4 * 1024 * 1024, TaskID: "test2", @@ -89,7 +81,7 @@ func (s *DfgetTaskMgrTestSuite) TestDfgetTaskAdd(c *check.C) { Expect: &types.DfGetTask{ CID: "bar", CallSystem: "bar", - Dfdaemon: true, + Dfdaemon: false, Path: "/peer/file/taskFileName", PieceSize: 4 * 1024 * 1024, TaskID: "test2", @@ -102,16 +94,9 @@ func (s *DfgetTaskMgrTestSuite) TestDfgetTaskAdd(c *check.C) { for _, tc := range testCases { err := manager.Add(context.Background(), tc.dfgetTask) c.Check(err, check.IsNil) - if tc.dfgetTask.Dfdaemon { - c.Assert(1, check.Equals, - int(prom_testutil.ToFloat64( - dfgetDaemonNum.WithLabelValues(tc.dfgetTask.TaskID, tc.dfgetTask.CallSystem)))) - } else { - c.Assert(1, check.Equals, - int(prom_testutil.ToFloat64( - dfgetNum.WithLabelValues(tc.dfgetTask.TaskID, tc.dfgetTask.CallSystem)))) - } - + c.Assert(1, check.Equals, + int(prom_testutil.ToFloat64( + dfgetTasks.WithLabelValues(tc.dfgetTask.TaskID, tc.dfgetTask.CallSystem)))) dt, err := manager.Get(context.Background(), tc.dfgetTask.CID, tc.dfgetTask.TaskID) c.Check(err, check.IsNil) c.Check(dt, check.DeepEquals, tc.Expect) @@ -119,7 +104,7 @@ func (s *DfgetTaskMgrTestSuite) 
TestDfgetTaskAdd(c *check.C) { } func (s *DfgetTaskMgrTestSuite) TestDfgetTaskUpdate(c *check.C) { - manager, _ := NewManager() + manager, _ := NewManager(prometheus.NewRegistry()) var testCases = []struct { dfgetTask *types.DfGetTask taskStatus string @@ -151,7 +136,7 @@ func (s *DfgetTaskMgrTestSuite) TestDfgetTaskUpdate(c *check.C) { dfgetTask: &types.DfGetTask{ CID: "bar", CallSystem: "bar", - Dfdaemon: true, + Dfdaemon: false, Path: "/peer/file/taskFileName", PieceSize: 4 * 1024 * 1024, TaskID: "test2", @@ -161,7 +146,7 @@ func (s *DfgetTaskMgrTestSuite) TestDfgetTaskUpdate(c *check.C) { Expect: &types.DfGetTask{ CID: "bar", CallSystem: "bar", - Dfdaemon: true, + Dfdaemon: false, Path: "/peer/file/taskFileName", PieceSize: 4 * 1024 * 1024, TaskID: "test2", @@ -184,9 +169,8 @@ func (s *DfgetTaskMgrTestSuite) TestDfgetTaskUpdate(c *check.C) { } func (s *DfgetTaskMgrTestSuite) TestDfgetTaskDelete(c *check.C) { - manager, _ := NewManager() - dfgetNum := manager.metrics.dfgetTasks - dfgetDaemonNum := manager.metrics.dfgetTasksDaemon + manager, _ := NewManager(prometheus.NewRegistry()) + dfgetTasks := manager.metrics.dfgetTasks var testCases = []struct { dfgetTask *types.DfGetTask @@ -195,7 +179,7 @@ func (s *DfgetTaskMgrTestSuite) TestDfgetTaskDelete(c *check.C) { dfgetTask: &types.DfGetTask{ CID: "foo", CallSystem: "foo", - Dfdaemon: true, + Dfdaemon: false, Path: "/peer/file/taskFileName", PieceSize: 4 * 1024 * 1024, TaskID: "test1", @@ -221,18 +205,11 @@ func (s *DfgetTaskMgrTestSuite) TestDfgetTaskDelete(c *check.C) { err = manager.Delete(context.Background(), tc.dfgetTask.CID, tc.dfgetTask.TaskID) c.Check(err, check.IsNil) - if tc.dfgetTask.Dfdaemon { - c.Assert(0, check.Equals, - int(prom_testutil.ToFloat64( - dfgetDaemonNum.WithLabelValues(tc.dfgetTask.TaskID, tc.dfgetTask.CallSystem)))) - } else { - c.Assert(0, check.Equals, - int(prom_testutil.ToFloat64( - dfgetNum.WithLabelValues(tc.dfgetTask.TaskID, tc.dfgetTask.CallSystem)))) - } + c.Assert(0, 
check.Equals, + int(prom_testutil.ToFloat64( + dfgetTasks.WithLabelValues(tc.dfgetTask.TaskID, tc.dfgetTask.CallSystem)))) _, err = manager.Get(context.Background(), tc.dfgetTask.CID, tc.dfgetTask.TaskID) - c.Check(errors.IsDataNotFound(err), check.Equals, true) + c.Check(errortypes.IsDataNotFound(err), check.Equals, true) } ->>>>>>> 40bbeb8... add some supernode metrics } diff --git a/supernode/daemon/mgr/peer/manager.go b/supernode/daemon/mgr/peer/manager.go index 1aaca1584..b474d3411 100644 --- a/supernode/daemon/mgr/peer/manager.go +++ b/supernode/daemon/mgr/peer/manager.go @@ -19,14 +19,14 @@ package peer import ( "context" "fmt" - "github.com/dragonflyoss/Dragonfly/supernode/config" "time" "github.com/dragonflyoss/Dragonfly/apis/types" "github.com/dragonflyoss/Dragonfly/pkg/errortypes" - metrics_util "github.com/dragonflyoss/Dragonfly/pkg/metricsutils" + "github.com/dragonflyoss/Dragonfly/pkg/metricsutils" "github.com/dragonflyoss/Dragonfly/pkg/netutils" "github.com/dragonflyoss/Dragonfly/pkg/stringutils" + "github.com/dragonflyoss/Dragonfly/supernode/config" "github.com/dragonflyoss/Dragonfly/supernode/daemon/mgr" dutil "github.com/dragonflyoss/Dragonfly/supernode/daemon/util" @@ -41,10 +41,10 @@ type metrics struct { peers *prometheus.GaugeVec } -func newMetrics() *metrics { +func newMetrics(register prometheus.Registerer) *metrics { return &metrics{ - peers: metrics_util.NewGauge(config.SubsystemSupernode, "peers", - "The number of supernode peers", []string{"hostname"}), + peers: metricsutils.NewGauge(config.SubsystemSupernode, "peers", + "The number of supernode peers", []string{"hostname"}, register), } } @@ -55,10 +55,10 @@ type Manager struct { } // NewManager return a new Manager Object. 
-func NewManager() (*Manager, error) { +func NewManager(register prometheus.Registerer) (*Manager, error) { return &Manager{ peerStore: dutil.NewStore(), - metrics: newMetrics(), + metrics: newMetrics(register), }, nil } diff --git a/supernode/daemon/mgr/peer/manager_test.go b/supernode/daemon/mgr/peer/manager_test.go index 4e9678337..3bbbb02d7 100644 --- a/supernode/daemon/mgr/peer/manager_test.go +++ b/supernode/daemon/mgr/peer/manager_test.go @@ -42,16 +42,9 @@ func init() { type PeerMgrTestSuite struct { } -// SetUpTest does common setup in the beginning of each test. -func (s *PeerMgrTestSuite) SetUpTest(c *check.C) { - // In every test, we should reset Prometheus default registry, otherwise - // it will panic because of duplicate metricsutils. - prometheus.DefaultRegisterer = prometheus.NewRegistry() -} - func (s *PeerMgrTestSuite) TestPeerMgr(c *check.C) { - manager, _ := NewManager() - peersNumCounter := manager.metrics.peers + manager, _ := NewManager(prometheus.NewRegistry()) + peers := manager.metrics.peers // register request := &types.PeerCreateRequest{ IP: "192.168.10.11", @@ -63,7 +56,7 @@ func (s *PeerMgrTestSuite) TestPeerMgr(c *check.C) { c.Check(err, check.IsNil) c.Assert(1, check.Equals, - int(prom_testutil.ToFloat64(peersNumCounter.WithLabelValues("foo")))) + int(prom_testutil.ToFloat64(peers.WithLabelValues("foo")))) // get id := resp.ID @@ -89,7 +82,7 @@ func (s *PeerMgrTestSuite) TestPeerMgr(c *check.C) { c.Check(err, check.IsNil) c.Assert(0, check.Equals, - int(prom_testutil.ToFloat64(peersNumCounter.WithLabelValues("foo")))) + int(prom_testutil.ToFloat64(peers.WithLabelValues("foo")))) // get info, err = manager.Get(context.Background(), id) @@ -98,7 +91,7 @@ func (s *PeerMgrTestSuite) TestPeerMgr(c *check.C) { } func (s *PeerMgrTestSuite) TestGet(c *check.C) { - manager, _ := NewManager() + manager, _ := NewManager(prometheus.NewRegistry()) // register request := &types.PeerCreateRequest{ @@ -136,7 +129,7 @@ func (s *PeerMgrTestSuite) 
TestGet(c *check.C) { } func (s *PeerMgrTestSuite) TestList(c *check.C) { - manager, _ := NewManager() + manager, _ := NewManager(prometheus.NewRegistry()) // the first data request := &types.PeerCreateRequest{ IP: "192.168.10.11", diff --git a/supernode/daemon/mgr/task/manager.go b/supernode/daemon/mgr/task/manager.go index 341768105..48a14a86e 100644 --- a/supernode/daemon/mgr/task/manager.go +++ b/supernode/daemon/mgr/task/manager.go @@ -50,20 +50,20 @@ type metrics struct { scheduleDurationMilliSeconds *prometheus.HistogramVec } -func newMetrics() *metrics { +func newMetrics(register prometheus.Registerer) *metrics { return &metrics{ tasks: metricsutils.NewGauge(config.SubsystemSupernode, "tasks", - "The status of Supernode tasks", []string{"taskid", "cdnstatus"}), + "The status of Supernode tasks", []string{"taskid", "cdnstatus"}, register), triggerCdnCount: metricsutils.NewCounter(config.SubsystemSupernode, "trigger_cdn_total", - "The number of triggering cdn", []string{}), + "The number of triggering cdn", []string{}, register), triggerCdnFailCount: metricsutils.NewCounter(config.SubsystemSupernode, "trigger_cdn_failed_total", - "The number of triggering cdn failure", []string{}), + "The number of triggering cdn failure", []string{}, register), scheduleDurationMilliSeconds: metricsutils.NewHistogram(config.SubsystemSupernode, "schedule_duration_milliseconds", "duration for task scheduling in milliseconds", []string{"taskid"}, - prometheus.ExponentialBuckets(0.02, 2, 7)), + prometheus.ExponentialBuckets(0.02, 2, 7), register), } } @@ -87,7 +87,7 @@ type Manager struct { // NewManager returns a new Manager Object. 
func NewManager(cfg *config.Config, peerMgr mgr.PeerMgr, dfgetTaskMgr mgr.DfgetTaskMgr, - progressMgr mgr.ProgressMgr, cdnMgr mgr.CDNMgr, schedulerMgr mgr.SchedulerMgr, originClient httpclient.OriginHTTPClient) (*Manager, error) { + progressMgr mgr.ProgressMgr, cdnMgr mgr.CDNMgr, schedulerMgr mgr.SchedulerMgr, originClient httpclient.OriginHTTPClient, register prometheus.Registerer) (*Manager, error) { return &Manager{ cfg: cfg, taskStore: dutil.NewStore(), @@ -100,7 +100,7 @@ func NewManager(cfg *config.Config, peerMgr mgr.PeerMgr, dfgetTaskMgr mgr.DfgetT accessTimeMap: syncmap.NewSyncMap(), taskURLUnReachableStore: syncmap.NewSyncMap(), OriginClient: originClient, - metrics: newMetrics(), + metrics: newMetrics(register), }, nil } diff --git a/supernode/daemon/mgr/task/manager_test.go b/supernode/daemon/mgr/task/manager_test.go index e9c42c8e9..eb0dcdccd 100644 --- a/supernode/daemon/mgr/task/manager_test.go +++ b/supernode/daemon/mgr/task/manager_test.go @@ -67,14 +67,12 @@ func (s *TaskMgrTestSuite) SetUpSuite(c *check.C) { s.mockProgressMgr.EXPECT().InitProgress(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes() s.mockOriginClient.EXPECT().GetContentLength(gomock.Any(), gomock.Any()).Return(int64(1000), 200, nil) cfg := config.NewConfig() - s.taskManager, _ = NewManager(cfg, s.mockPeerMgr, s.mockDfgetTaskMgr, - s.mockProgressMgr, s.mockCDNMgr, s.mockSchedulerMgr, s.mockOriginClient) + s.mockProgressMgr, s.mockCDNMgr, s.mockSchedulerMgr, s.mockOriginClient, prometheus.NewRegistry()) } func (s *TaskMgrTestSuite) TearDownSuite(c *check.C) { s.mockCtl.Finish() - prometheus.DefaultRegisterer = prometheus.NewRegistry() } func (s *TaskMgrTestSuite) TestCheckTaskStatus(c *check.C) { diff --git a/supernode/daemon/mgr/task/manager_util.go b/supernode/daemon/mgr/task/manager_util.go index 0ae90027a..bc45a307b 100644 --- a/supernode/daemon/mgr/task/manager_util.go +++ b/supernode/daemon/mgr/task/manager_util.go @@ -19,7 +19,6 @@ package task 
import ( "context" "fmt" - "github.com/dragonflyoss/Dragonfly/pkg/timeutils" "net/http" "time" @@ -28,6 +27,7 @@ import ( "github.com/dragonflyoss/Dragonfly/pkg/errortypes" "github.com/dragonflyoss/Dragonfly/pkg/netutils" "github.com/dragonflyoss/Dragonfly/pkg/stringutils" + "github.com/dragonflyoss/Dragonfly/pkg/timeutils" "github.com/dragonflyoss/Dragonfly/supernode/config" "github.com/dragonflyoss/Dragonfly/supernode/daemon/mgr" "github.com/dragonflyoss/Dragonfly/supernode/util" @@ -154,9 +154,14 @@ func (tm *Manager) updateTask(taskID string, updateTaskInfo *types.TaskInfo) err return err } - // Update the origin task CDNStatus when the origin CDNStatus not equals success. - if !isSuccessCDN(task.CdnStatus) { - // In order to update CDNStatus we should reset the origin CDNStatus to 0. + if !isSuccessCDN(updateTaskInfo.CdnStatus) { + // when the origin CDNStatus equals success, do not update it to unsuccessful + if isSuccessCDN(task.CdnStatus) { + return nil + } + + // only update the task CdnStatus when the new CDNStatus and + // the origin CDNStatus both not equals success tm.metrics.tasks.WithLabelValues(taskID, task.CdnStatus).Dec() tm.metrics.tasks.WithLabelValues(taskID, updateTaskInfo.CdnStatus).Inc() task.CdnStatus = updateTaskInfo.CdnStatus @@ -192,6 +197,9 @@ func (tm *Manager) updateTask(taskID string, updateTaskInfo *types.TaskInfo) err task.PieceTotal = pieceTotal } } + tm.metrics.tasks.WithLabelValues(taskID, task.CdnStatus).Dec() + tm.metrics.tasks.WithLabelValues(taskID, updateTaskInfo.CdnStatus).Inc() + task.CdnStatus = updateTaskInfo.CdnStatus return nil } @@ -336,12 +344,12 @@ func (tm *Manager) parseAvailablePeers(ctx context.Context, clientID string, tas // get scheduler pieceResult logrus.Debugf("start scheduler for taskID: %s clientID: %s", task.ID, clientID) - startTime := timeutils.GetCurrentTimeMillisFloat() + startTime := time.Now() pieceResult, err := tm.schedulerMgr.Schedule(ctx, task.ID, clientID, dfgetTask.PeerID) if err != nil { 
return false, nil, err } - tm.metrics.scheduleDurationMilliSeconds.WithLabelValues(task.ID).Observe(timeutils.GetCurrentTimeMillisFloat() - startTime) + tm.metrics.scheduleDurationMilliSeconds.WithLabelValues(task.ID).Observe(timeutils.SinceInMilliseconds(startTime)) logrus.Debugf("get scheduler result length(%d) with taskID(%s) and clientID(%s)", len(pieceResult), task.ID, clientID) var pieceInfos []*types.PieceInfo diff --git a/supernode/daemon/mgr/task/manager_util_test.go b/supernode/daemon/mgr/task/manager_util_test.go index 9ea5a2702..11fc54eef 100644 --- a/supernode/daemon/mgr/task/manager_util_test.go +++ b/supernode/daemon/mgr/task/manager_util_test.go @@ -25,6 +25,7 @@ import ( "github.com/go-check/check" "github.com/golang/mock/gomock" + "github.com/prometheus/client_golang/prometheus" prom_testutil "github.com/prometheus/client_golang/prometheus/testutil" ) @@ -53,9 +54,9 @@ func (s *TaskUtilTestSuite) SetUpSuite(c *check.C) { s.mockProgressMgr = mock.NewMockProgressMgr(s.mockCtl) s.mockSchedulerMgr = mock.NewMockSchedulerMgr(s.mockCtl) s.mockOriginClient = cMock.NewMockOriginHTTPClient(s.mockCtl) - s.taskManager, _ = NewManager(config.NewConfig(), s.mockPeerMgr, s.mockDfgetTaskMgr, - s.mockProgressMgr, s.mockCDNMgr, s.mockSchedulerMgr, s.mockOriginClient) + s.mockProgressMgr, s.mockCDNMgr, s.mockSchedulerMgr, s.mockOriginClient, prometheus.NewRegistry()) + s.mockOriginClient.EXPECT().GetContentLength(gomock.Any(), gomock.Any()).Return(int64(1000), 200, nil) } diff --git a/supernode/server/metrics.go b/supernode/server/metrics.go index fd333cb11..6045d7cda 100644 --- a/supernode/server/metrics.go +++ b/supernode/server/metrics.go @@ -34,22 +34,22 @@ type metrics struct { responseSize *prometheus.HistogramVec } -func newMetrics() *metrics { +func newMetrics(register prometheus.Registerer) *metrics { return &metrics{ requestCounter: metricsutils.NewCounter(config.SubsystemSupernode, "http_requests_total", - "Counter of HTTP requests.", []string{"code", 
"handler", "method"}, + "Counter of HTTP requests.", []string{"code", "handler", "method"}, register, ), requestDuration: metricsutils.NewHistogram(config.SubsystemSupernode, "http_request_duration_seconds", "Histogram of latencies for HTTP requests.", []string{"code", "handler", "method"}, - []float64{.1, .2, .4, 1, 3, 8, 20, 60, 120}, + []float64{.1, .2, .4, 1, 3, 8, 20, 60, 120}, register, ), requestSize: metricsutils.NewHistogram(config.SubsystemSupernode, "http_request_size_bytes", "Histogram of request size for HTTP requests.", []string{"code", "handler", "method"}, - prometheus.ExponentialBuckets(100, 10, 8), + prometheus.ExponentialBuckets(100, 10, 8), register, ), responseSize: metricsutils.NewHistogram(config.SubsystemSupernode, "http_response_size_bytes", "Histogram of response size for HTTP requests.", []string{"code", "handler", "method"}, - prometheus.ExponentialBuckets(100, 10, 8), + prometheus.ExponentialBuckets(100, 10, 8), register, ), } } diff --git a/supernode/server/router.go b/supernode/server/router.go index d57af4bcd..f5edf3734 100644 --- a/supernode/server/router.go +++ b/supernode/server/router.go @@ -26,13 +26,14 @@ import ( "github.com/dragonflyoss/Dragonfly/version" "github.com/gorilla/mux" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" ) // versionMatcher defines to parse version url path. 
const versionMatcher = "/v{version:[0-9.]+}" -var m = newMetrics() +var m = newMetrics(prometheus.DefaultRegisterer) func initRoute(s *Server) *mux.Router { r := mux.NewRouter() diff --git a/supernode/server/router_test.go b/supernode/server/router_test.go index e53071ecd..b8feaa80d 100644 --- a/supernode/server/router_test.go +++ b/supernode/server/router_test.go @@ -34,6 +34,7 @@ import ( "github.com/go-check/check" "github.com/gorilla/mux" + "github.com/prometheus/client_golang/prometheus" prom_testutil "github.com/prometheus/client_golang/prometheus/testutil" ) @@ -67,7 +68,7 @@ func (rs *RouterTestSuite) SetUpSuite(c *check.C) { Plugins: nil, Storages: nil, } - s, err := New(testConf) + s, err := New(testConf, prometheus.NewRegistry()) c.Check(err, check.IsNil) version.DFVersion = &types.DragonflyVersion{ Version: "test", diff --git a/supernode/server/server.go b/supernode/server/server.go index 90ed89202..c068aceb9 100644 --- a/supernode/server/server.go +++ b/supernode/server/server.go @@ -34,6 +34,7 @@ import ( "github.com/dragonflyoss/Dragonfly/supernode/store" "github.com/dragonflyoss/Dragonfly/version" + "github.com/prometheus/client_golang/prometheus" "github.com/sirupsen/logrus" ) @@ -48,7 +49,10 @@ type Server struct { } // New creates a brand new server instance. 
-func New(cfg *config.Config) (*Server, error) { +func New(cfg *config.Config, register prometheus.Registerer) (*Server, error) { + // register supernode build information + version.NewBuildInfo("supernode", register) + sm, err := store.NewManager(cfg) if err != nil { return nil, err @@ -59,13 +63,12 @@ func New(cfg *config.Config) (*Server, error) { } originClient := httpclient.NewOriginClient() - - peerMgr, err := peer.NewManager() + peerMgr, err := peer.NewManager(register) if err != nil { return nil, err } - dfgetTaskMgr, err := dfgettask.NewManager() + dfgetTaskMgr, err := dfgettask.NewManager(register) if err != nil { return nil, err } @@ -85,7 +88,7 @@ func New(cfg *config.Config) (*Server, error) { return nil, err } - taskMgr, err := task.NewManager(cfg, peerMgr, dfgetTaskMgr, progressMgr, cdnMgr, schedulerMgr, originClient) + taskMgr, err := task.NewManager(cfg, peerMgr, dfgetTaskMgr, progressMgr, cdnMgr, schedulerMgr, originClient, register) if err != nil { return nil, err } @@ -112,8 +115,6 @@ func (s *Server) Start() error { return err } - // register supernode build information - version.NewBuildInfo("supernode") server := &http.Server{ Handler: router, ReadTimeout: time.Minute * 10, diff --git a/version/version.go b/version/version.go index 06692a288..7d9a91c91 100644 --- a/version/version.go +++ b/version/version.go @@ -28,7 +28,9 @@ import ( "text/template" "github.com/dragonflyoss/Dragonfly/apis/types" - metricsutils "github.com/dragonflyoss/Dragonfly/pkg/metricsutils" + "github.com/dragonflyoss/Dragonfly/pkg/metricsutils" + + "github.com/prometheus/client_golang/prometheus" ) var ( @@ -105,12 +107,13 @@ func Print(program string) string { return strings.TrimSpace(buf.String()) } -// NewBuildInfo register a collector which exports metricsutils about version and build information. -func NewBuildInfo(program string) { +// NewBuildInfo register a collector which exports metrics about version and build information. 
+func NewBuildInfo(program string, registerer prometheus.Registerer) { buildInfo := metricsutils.NewGauge(program, "build_info", fmt.Sprintf("A metric with a constant '1' value labeled by version, revision, os, "+ "arch and goversion from which %s was built.", program), []string{"version", "revision", "os", "arch", "goversion"}, + registerer, ) buildInfo.WithLabelValues(version, revision, os, arch, goVersion).Set(1) } From 4053d7a382948d16a697a211bfc23f2fd496b598 Mon Sep 17 00:00:00 2001 From: yeya24 Date: Fri, 2 Aug 2019 18:27:12 +0800 Subject: [PATCH 3/4] add peer label remove some labels which are possibly cause cardinality problem Signed-off-by: yeya24 --- dfdaemon/constant/constant.go | 4 +- dfdaemon/handler/root_handler.go | 2 +- docs/user_guide/metrics.md | 8 ++-- supernode/config/constants.go | 4 +- supernode/daemon/mgr/dfgettask/manager.go | 6 +-- .../daemon/mgr/dfgettask/manager_test.go | 4 +- supernode/daemon/mgr/peer/manager.go | 11 +++-- supernode/daemon/mgr/peer/manager_test.go | 4 +- supernode/daemon/mgr/task/manager.go | 6 +-- supernode/daemon/mgr/task/manager_util.go | 41 ++++++++----------- .../daemon/mgr/task/manager_util_test.go | 1 + supernode/server/metrics.go | 12 +++--- supernode/server/router.go | 3 +- supernode/server/router_test.go | 8 ++-- test/api_metrics_test.go | 24 +++++------ test/util_api.go | 6 +-- 16 files changed, 72 insertions(+), 72 deletions(-) diff --git a/dfdaemon/constant/constant.go b/dfdaemon/constant/constant.go index b63c40442..df53a8930 100644 --- a/dfdaemon/constant/constant.go +++ b/dfdaemon/constant/constant.go @@ -46,8 +46,8 @@ const ( ) const ( - // Namespace is the prefix of the metricsutils' name of dragonfly + // Namespace is the prefix of metrics namespace of dragonfly Namespace = "dragonfly" - // Subsystem represents metricsutils for dfdaemon + // Subsystem represents metrics for dfdaemon Subsystem = "dfdaemon" ) diff --git a/dfdaemon/handler/root_handler.go b/dfdaemon/handler/root_handler.go index 
e2eb4f512..5473851b7 100644 --- a/dfdaemon/handler/root_handler.go +++ b/dfdaemon/handler/root_handler.go @@ -32,6 +32,6 @@ func New() *http.ServeMux { s.HandleFunc("/args", getArgs) s.HandleFunc("/env", getEnv) s.HandleFunc("/debug/version", version.Handler) - s.HandleFunc("/metricsutils", promhttp.Handler().ServeHTTP) + s.HandleFunc("/metrics", promhttp.Handler().ServeHTTP) return s } diff --git a/docs/user_guide/metrics.md b/docs/user_guide/metrics.md index f7cc36e8d..3e50461c2 100644 --- a/docs/user_guide/metrics.md +++ b/docs/user_guide/metrics.md @@ -9,10 +9,10 @@ This doc contains all the metrics that Dragonfly components currently support. N - dragonfly_supernode_http_request_duration_seconds{code, handler, method} - http request latency in seconds - dragonfly_supernode_http_request_size_bytes{code, handler, method} - http request size in bytes - dragonfly_supernode_http_response_size_bytes{code, handler, method} - http response size in bytes -- dragonfly_supernode_peers{hostname} - dragonfly peers -- dragonfly_supernode_tasks{taskid, cdnstatus} - dragonfly tasks -- dragonfly_supernode_dfgettasks{taskid, callsystem} - dragonfly dfget tasks -- dragonfly_supernode_schedule_duration_milliseconds{taskid} - duration for task scheduling in milliseconds +- dragonfly_supernode_peers{peer} - dragonfly peers, the label peer consists of the hostname and ip address of one peer. +- dragonfly_supernode_tasks{cdnstatus} - dragonfly tasks +- dragonfly_supernode_dfgettasks{callsystem} - dragonfly dfget tasks +- dragonfly_supernode_schedule_duration_milliseconds{peer} - duration for task scheduling in milliseconds - dragonfly_supernode_trigger_cdn_total{} - total times of triggering cdn. - dragonfly_supernode_trigger_cdn_failed_total{} - total failed times of triggering cdn. 
diff --git a/supernode/config/constants.go b/supernode/config/constants.go index 988ef41fb..2b7f4ad09 100644 --- a/supernode/config/constants.go +++ b/supernode/config/constants.go @@ -70,8 +70,8 @@ const ( ) const ( - // SubsystemSupernode represents metricsutils from supernode + // SubsystemSupernode represents metrics from supernode SubsystemSupernode = "supernode" - // SubsystemDfget represents metricsutils from dfget + // SubsystemDfget represents metrics from dfget SubsystemDfget = "dfget" ) diff --git a/supernode/daemon/mgr/dfgettask/manager.go b/supernode/daemon/mgr/dfgettask/manager.go index 3e8cc2616..1173d98d8 100644 --- a/supernode/daemon/mgr/dfgettask/manager.go +++ b/supernode/daemon/mgr/dfgettask/manager.go @@ -42,7 +42,7 @@ type metrics struct { func newMetrics(register prometheus.Registerer) *metrics { return &metrics{ dfgetTasks: metricsutils.NewGauge(config.SubsystemSupernode, "dfgettasks", - "The number of dfget tasks", []string{"taskid", "callsystem"}, register), + "The number of dfget tasks", []string{"callsystem"}, register), } } @@ -89,7 +89,7 @@ func (dtm *Manager) Add(ctx context.Context, dfgetTask *types.DfGetTask) error { dtm.ptoc.Add(generatePeerKey(dfgetTask.PeerID, dfgetTask.TaskID), dfgetTask.CID) dtm.dfgetTaskStore.Put(key, dfgetTask) - dtm.metrics.dfgetTasks.WithLabelValues(dfgetTask.TaskID, dfgetTask.CallSystem).Inc() + dtm.metrics.dfgetTasks.WithLabelValues(dfgetTask.CallSystem).Inc() return nil } @@ -121,7 +121,7 @@ func (dtm *Manager) Delete(ctx context.Context, clientID, taskID string) error { return err } dtm.ptoc.Delete(generatePeerKey(dfgetTask.PeerID, dfgetTask.TaskID)) - dtm.metrics.dfgetTasks.WithLabelValues(dfgetTask.TaskID, dfgetTask.CallSystem).Dec() + dtm.metrics.dfgetTasks.WithLabelValues(dfgetTask.CallSystem).Dec() return dtm.dfgetTaskStore.Delete(key) } diff --git a/supernode/daemon/mgr/dfgettask/manager_test.go b/supernode/daemon/mgr/dfgettask/manager_test.go index b95c8e30c..15fc822c4 100644 --- 
a/supernode/daemon/mgr/dfgettask/manager_test.go +++ b/supernode/daemon/mgr/dfgettask/manager_test.go @@ -96,7 +96,7 @@ func (s *DfgetTaskMgrTestSuite) TestDfgetTaskAdd(c *check.C) { c.Check(err, check.IsNil) c.Assert(1, check.Equals, int(prom_testutil.ToFloat64( - dfgetTasks.WithLabelValues(tc.dfgetTask.TaskID, tc.dfgetTask.CallSystem)))) + dfgetTasks.WithLabelValues(tc.dfgetTask.CallSystem)))) dt, err := manager.Get(context.Background(), tc.dfgetTask.CID, tc.dfgetTask.TaskID) c.Check(err, check.IsNil) c.Check(dt, check.DeepEquals, tc.Expect) @@ -207,7 +207,7 @@ func (s *DfgetTaskMgrTestSuite) TestDfgetTaskDelete(c *check.C) { c.Check(err, check.IsNil) c.Assert(0, check.Equals, int(prom_testutil.ToFloat64( - dfgetTasks.WithLabelValues(tc.dfgetTask.TaskID, tc.dfgetTask.CallSystem)))) + dfgetTasks.WithLabelValues(tc.dfgetTask.CallSystem)))) _, err = manager.Get(context.Background(), tc.dfgetTask.CID, tc.dfgetTask.TaskID) c.Check(errortypes.IsDataNotFound(err), check.Equals, true) diff --git a/supernode/daemon/mgr/peer/manager.go b/supernode/daemon/mgr/peer/manager.go index b474d3411..143587850 100644 --- a/supernode/daemon/mgr/peer/manager.go +++ b/supernode/daemon/mgr/peer/manager.go @@ -44,7 +44,7 @@ type metrics struct { func newMetrics(register prometheus.Registerer) *metrics { return &metrics{ peers: metricsutils.NewGauge(config.SubsystemSupernode, "peers", - "The number of supernode peers", []string{"hostname"}, register), + "The number of supernode peers", []string{"peer"}, register), } } @@ -83,7 +83,7 @@ func (pm *Manager) Register(ctx context.Context, peerCreateRequest *types.PeerCr Created: strfmt.DateTime(time.Now()), } pm.peerStore.Put(id, peerInfo) - pm.metrics.peers.WithLabelValues(peerInfo.HostName.String()).Inc() + pm.metrics.peers.WithLabelValues(GeneratePeerName(peerInfo)).Inc() return &types.PeerCreateResponse{ ID: id, @@ -98,7 +98,7 @@ func (pm *Manager) DeRegister(ctx context.Context, peerID string) error { } pm.peerStore.Delete(peerID) - 
pm.metrics.peers.WithLabelValues(peerInfo.HostName.String()).Dec() + pm.metrics.peers.WithLabelValues(GeneratePeerName(peerInfo)).Dec() return nil } @@ -200,3 +200,8 @@ func getLessFunc(listResult []interface{}, desc bool) (less func(i, j int) bool) func generatePeerID(peerInfo *types.PeerCreateRequest) string { return fmt.Sprintf("%s-%s-%d", peerInfo.HostName.String(), peerInfo.IP.String(), time.Now().UnixNano()) } + +// GeneratePeerName extracts the hostname and ip from peerInfo. +func GeneratePeerName(info *types.PeerInfo) string { + return info.HostName.String() + "-" + info.IP.String() +} diff --git a/supernode/daemon/mgr/peer/manager_test.go b/supernode/daemon/mgr/peer/manager_test.go index 3bbbb02d7..6af1f2457 100644 --- a/supernode/daemon/mgr/peer/manager_test.go +++ b/supernode/daemon/mgr/peer/manager_test.go @@ -56,7 +56,7 @@ func (s *PeerMgrTestSuite) TestPeerMgr(c *check.C) { c.Check(err, check.IsNil) c.Assert(1, check.Equals, - int(prom_testutil.ToFloat64(peers.WithLabelValues("foo")))) + int(prom_testutil.ToFloat64(peers.WithLabelValues("foo-192.168.10.11")))) // get id := resp.ID @@ -82,7 +82,7 @@ func (s *PeerMgrTestSuite) TestPeerMgr(c *check.C) { c.Check(err, check.IsNil) c.Assert(0, check.Equals, - int(prom_testutil.ToFloat64(peers.WithLabelValues("foo")))) + int(prom_testutil.ToFloat64(peers.WithLabelValues("foo-192.168.10.11")))) // get info, err = manager.Get(context.Background(), id) diff --git a/supernode/daemon/mgr/task/manager.go b/supernode/daemon/mgr/task/manager.go index 48a14a86e..b6d313893 100644 --- a/supernode/daemon/mgr/task/manager.go +++ b/supernode/daemon/mgr/task/manager.go @@ -53,7 +53,7 @@ type metrics struct { func newMetrics(register prometheus.Registerer) *metrics { return &metrics{ tasks: metricsutils.NewGauge(config.SubsystemSupernode, "tasks", - "The status of Supernode tasks", []string{"taskid", "cdnstatus"}, register), + "The status of Supernode tasks", []string{"cdnstatus"}, register), triggerCdnCount: 
metricsutils.NewCounter(config.SubsystemSupernode, "trigger_cdn_total", "The number of triggering cdn", []string{}, register), @@ -62,8 +62,8 @@ func newMetrics(register prometheus.Registerer) *metrics { "The number of triggering cdn failure", []string{}, register), scheduleDurationMilliSeconds: metricsutils.NewHistogram(config.SubsystemSupernode, "schedule_duration_milliseconds", - "duration for task scheduling in milliseconds", []string{"taskid"}, - prometheus.ExponentialBuckets(0.02, 2, 7), register), + "Duration for task scheduling in milliseconds", []string{"peer"}, + prometheus.ExponentialBuckets(0.02, 2, 6), register), } } diff --git a/supernode/daemon/mgr/task/manager_util.go b/supernode/daemon/mgr/task/manager_util.go index bc45a307b..c7e7fdc1f 100644 --- a/supernode/daemon/mgr/task/manager_util.go +++ b/supernode/daemon/mgr/task/manager_util.go @@ -30,6 +30,7 @@ import ( "github.com/dragonflyoss/Dragonfly/pkg/timeutils" "github.com/dragonflyoss/Dragonfly/supernode/config" "github.com/dragonflyoss/Dragonfly/supernode/daemon/mgr" + "github.com/dragonflyoss/Dragonfly/supernode/daemon/mgr/peer" "github.com/dragonflyoss/Dragonfly/supernode/util" "github.com/pkg/errors" @@ -110,7 +111,7 @@ func (tm *Manager) addOrUpdateTask(ctx context.Context, req *types.TaskCreateReq task.PieceTotal = int32((fileLength + (int64(pieceSize) - 1)) / int64(pieceSize)) tm.taskStore.Put(taskID, task) - tm.metrics.tasks.WithLabelValues(taskID, task.CdnStatus).Inc() + tm.metrics.tasks.WithLabelValues(task.CdnStatus).Inc() return task, nil } @@ -162,8 +163,8 @@ func (tm *Manager) updateTask(taskID string, updateTaskInfo *types.TaskInfo) err // only update the task CdnStatus when the new CDNStatus and // the origin CDNStatus both not equals success - tm.metrics.tasks.WithLabelValues(taskID, task.CdnStatus).Dec() - tm.metrics.tasks.WithLabelValues(taskID, updateTaskInfo.CdnStatus).Inc() + tm.metrics.tasks.WithLabelValues(task.CdnStatus).Dec() + 
tm.metrics.tasks.WithLabelValues(updateTaskInfo.CdnStatus).Inc() task.CdnStatus = updateTaskInfo.CdnStatus return nil } @@ -178,27 +179,15 @@ func (tm *Manager) updateTask(taskID string, updateTaskInfo *types.TaskInfo) err task.RealMd5 = updateTaskInfo.RealMd5 } - // only update the task info when the new CDNStatus equals success - // and the origin CDNStatus not equals success. - if isSuccessCDN(updateTaskInfo.CdnStatus) { - if updateTaskInfo.FileLength != 0 { - task.FileLength = updateTaskInfo.FileLength - } - - if !stringutils.IsEmptyStr(updateTaskInfo.RealMd5) { - task.RealMd5 = updateTaskInfo.RealMd5 - } - - var pieceTotal int32 - if updateTaskInfo.FileLength > 0 { - pieceTotal = int32((updateTaskInfo.FileLength + int64(task.PieceSize-1)) / int64(task.PieceSize)) - } - if pieceTotal != 0 { - task.PieceTotal = pieceTotal - } + var pieceTotal int32 + if updateTaskInfo.FileLength > 0 { + pieceTotal = int32((updateTaskInfo.FileLength + int64(task.PieceSize-1)) / int64(task.PieceSize)) } - tm.metrics.tasks.WithLabelValues(taskID, task.CdnStatus).Dec() - tm.metrics.tasks.WithLabelValues(taskID, updateTaskInfo.CdnStatus).Inc() + if pieceTotal != 0 { + task.PieceTotal = pieceTotal + } + tm.metrics.tasks.WithLabelValues(task.CdnStatus).Dec() + tm.metrics.tasks.WithLabelValues(updateTaskInfo.CdnStatus).Inc() task.CdnStatus = updateTaskInfo.CdnStatus return nil @@ -342,6 +331,10 @@ func (tm *Manager) parseAvailablePeers(ctx context.Context, clientID string, tas return true, finishInfo, nil } + // Get peerName to represent peer in metrics. 
+ p, _ := tm.peerMgr.Get(context.Background(), dfgetTask.PeerID) + peerName := peer.GeneratePeerName(p) + // get scheduler pieceResult logrus.Debugf("start scheduler for taskID: %s clientID: %s", task.ID, clientID) startTime := time.Now() @@ -349,7 +342,7 @@ func (tm *Manager) parseAvailablePeers(ctx context.Context, clientID string, tas if err != nil { return false, nil, err } - tm.metrics.scheduleDurationMilliSeconds.WithLabelValues(task.ID).Observe(timeutils.SinceInMilliseconds(startTime)) + tm.metrics.scheduleDurationMilliSeconds.WithLabelValues(peerName).Observe(timeutils.SinceInMilliseconds(startTime)) logrus.Debugf("get scheduler result length(%d) with taskID(%s) and clientID(%s)", len(pieceResult), task.ID, clientID) var pieceInfos []*types.PieceInfo diff --git a/supernode/daemon/mgr/task/manager_util_test.go b/supernode/daemon/mgr/task/manager_util_test.go index 11fc54eef..8de07404d 100644 --- a/supernode/daemon/mgr/task/manager_util_test.go +++ b/supernode/daemon/mgr/task/manager_util_test.go @@ -18,6 +18,7 @@ package task import ( "context" + "github.com/dragonflyoss/Dragonfly/apis/types" "github.com/dragonflyoss/Dragonfly/supernode/config" "github.com/dragonflyoss/Dragonfly/supernode/daemon/mgr/mock" diff --git a/supernode/server/metrics.go b/supernode/server/metrics.go index 6045d7cda..2bb4e6ff2 100644 --- a/supernode/server/metrics.go +++ b/supernode/server/metrics.go @@ -26,7 +26,7 @@ import ( "github.com/prometheus/client_golang/prometheus/promhttp" ) -// metricsutils defines three prometheus metricsutils for monitoring http handler status +// metrics defines three prometheus metrics for monitoring http handler status type metrics struct { requestCounter *prometheus.CounterVec requestDuration *prometheus.HistogramVec @@ -37,24 +37,24 @@ type metrics struct { func newMetrics(register prometheus.Registerer) *metrics { return &metrics{ requestCounter: metricsutils.NewCounter(config.SubsystemSupernode, "http_requests_total", - "Counter of HTTP 
requests.", []string{"code", "handler", "method"}, register, + "Counter of HTTP requests.", []string{"code", "handler"}, register, ), requestDuration: metricsutils.NewHistogram(config.SubsystemSupernode, "http_request_duration_seconds", - "Histogram of latencies for HTTP requests.", []string{"code", "handler", "method"}, + "Histogram of latencies for HTTP requests.", []string{"handler"}, []float64{.1, .2, .4, 1, 3, 8, 20, 60, 120}, register, ), requestSize: metricsutils.NewHistogram(config.SubsystemSupernode, "http_request_size_bytes", - "Histogram of request size for HTTP requests.", []string{"code", "handler", "method"}, + "Histogram of request size for HTTP requests.", []string{"handler"}, prometheus.ExponentialBuckets(100, 10, 8), register, ), responseSize: metricsutils.NewHistogram(config.SubsystemSupernode, "http_response_size_bytes", - "Histogram of response size for HTTP requests.", []string{"code", "handler", "method"}, + "Histogram of response size for HTTP requests.", []string{"handler"}, prometheus.ExponentialBuckets(100, 10, 8), register, ), } } -// instrumentHandler will update metricsutils for every http request +// instrumentHandler will update metrics for every http request func (m *metrics) instrumentHandler(handlerName string, handler http.HandlerFunc) http.HandlerFunc { return promhttp.InstrumentHandlerDuration( m.requestDuration.MustCurryWith(prometheus.Labels{"handler": handlerName}), diff --git a/supernode/server/router.go b/supernode/server/router.go index f5edf3734..b4cc49428 100644 --- a/supernode/server/router.go +++ b/supernode/server/router.go @@ -55,7 +55,8 @@ func initRoute(s *Server) *mux.Router { {Method: http.MethodGet, Path: "/peers/{id}", HandlerFunc: s.getPeer}, {Method: http.MethodGet, Path: "/peers", HandlerFunc: s.listPeers}, - {Method: http.MethodGet, Path: "/metricsutils", HandlerFunc: handleMetrics}, + // metrics + {Method: http.MethodGet, Path: "/metrics", HandlerFunc: handleMetrics}, } // register API diff --git 
a/supernode/server/router_test.go b/supernode/server/router_test.go index b8feaa80d..83711cdfb 100644 --- a/supernode/server/router_test.go +++ b/supernode/server/router_test.go @@ -129,20 +129,20 @@ func (rs *RouterTestSuite) TestVersionHandler(c *check.C) { } func (rs *RouterTestSuite) TestHTTPMetrics(c *check.C) { - // ensure /metricsutils is accessible - code, _, err := httputils.Get("http://"+rs.addr+"/metricsutils", 0) + // ensure /metrics is accessible + code, _, err := httputils.Get("http://"+rs.addr+"/metrics", 0) c.Check(err, check.IsNil) c.Assert(code, check.Equals, 200) counter := m.requestCounter c.Assert(1, check.Equals, - int(prom_testutil.ToFloat64(counter.WithLabelValues(strconv.Itoa(http.StatusOK), "/metricsutils", "get")))) + int(prom_testutil.ToFloat64(counter.WithLabelValues(strconv.Itoa(http.StatusOK), "/metrics")))) for i := 0; i < 5; i++ { code, _, err := httputils.Get("http://"+rs.addr+"/_ping", 0) c.Check(err, check.IsNil) c.Assert(code, check.Equals, 200) c.Assert(i+1, check.Equals, - int(prom_testutil.ToFloat64(counter.WithLabelValues(strconv.Itoa(http.StatusOK), "/_ping", "get")))) + int(prom_testutil.ToFloat64(counter.WithLabelValues(strconv.Itoa(http.StatusOK), "/_ping")))) } } diff --git a/test/api_metrics_test.go b/test/api_metrics_test.go index 969125a87..1aab3609c 100644 --- a/test/api_metrics_test.go +++ b/test/api_metrics_test.go @@ -25,7 +25,7 @@ import ( "github.com/go-check/check" ) -// APIMetricsSuite is the test suite for Prometheus metricsutils. +// APIMetricsSuite is the test suite for Prometheus metrics. type APIMetricsSuite struct { starter *command.Starter } @@ -46,37 +46,37 @@ func (s *APIMetricsSuite) TearDownSuite(c *check.C) { s.starter.Clean() } -// TestMetrics tests /metricsutils API. +// TestMetrics tests /metrics API. 
func (s *APIMetricsSuite) TestMetrics(c *check.C) { - resp, err := request.Get("/metricsutils") + resp, err := request.Get("/metrics") c.Assert(err, check.IsNil) defer resp.Body.Close() CheckRespStatus(c, resp, 200) } -// TestMetricsRequestTotal tests http-related metricsutils. +// TestMetricsRequestTotal tests http-related metrics. func (s *APIMetricsSuite) TestHttpMetrics(c *check.C) { - requestCounter := `dragonfly_supernode_http_requests_total{code="%d",handler="%s",method="%s"}` - responseSizeSum := `dragonfly_supernode_http_response_size_bytes_sum{code="%d",handler="%s",method="%s"}` - responseSizeCount := `dragonfly_supernode_http_response_size_bytes_count{code="%d",handler="%s",method="%s"}` - requestSizeCount := `dragonfly_supernode_http_request_size_bytes_count{code="%d",handler="%s",method="%s"}` + requestCounter := `dragonfly_supernode_http_requests_total{code="%d",handler="%s"}` + responseSizeSum := `dragonfly_supernode_http_response_size_bytes_sum{handler="%s"}` + responseSizeCount := `dragonfly_supernode_http_response_size_bytes_count{handler="%s"}` + requestSizeCount := `dragonfly_supernode_http_request_size_bytes_count{handler="%s"}` resp, err := request.Get("/_ping") c.Assert(err, check.IsNil) CheckRespStatus(c, resp, 200) // Get httpRequest counter value equals 1. - CheckMetric(c, fmt.Sprintf(requestCounter, 200, "/_ping", "get"), 1) + CheckMetric(c, fmt.Sprintf(requestCounter, 200, "/_ping"), 1) // Get httpResponse size sum value equals 2. - CheckMetric(c, fmt.Sprintf(responseSizeSum, 200, "/_ping", "get"), 2) + CheckMetric(c, fmt.Sprintf(responseSizeSum, "/_ping"), 2) // Get httpResponse size count value equals 1. - CheckMetric(c, fmt.Sprintf(responseSizeCount, 200, "/_ping", "get"), 1) + CheckMetric(c, fmt.Sprintf(responseSizeCount, "/_ping"), 1) // Get httpRequest size count value equals 1. 
- CheckMetric(c, fmt.Sprintf(requestSizeCount, 200, "/_ping", "get"), 1) + CheckMetric(c, fmt.Sprintf(requestSizeCount, "/_ping"), 1) } // TestBuildInfoMetrics tests build info metric. diff --git a/test/util_api.go b/test/util_api.go index b6901ba86..a4d4d1e72 100644 --- a/test/util_api.go +++ b/test/util_api.go @@ -36,11 +36,11 @@ func CheckRespStatus(c *check.C, resp *http.Response, status int) { } } -// CheckMetric find the specific metric from /metricsutils endpoint and it will compare the metric +// CheckMetric find the specific metric from /metrics endpoint and it will compare the metric // value with expected value. func CheckMetric(c *check.C, metric string, value float64) { var val float64 - resp, err := request.Get("/metricsutils") + resp, err := request.Get("/metrics") c.Assert(err, check.IsNil) defer resp.Body.Close() data, err := ioutil.ReadAll(resp.Body) @@ -50,7 +50,7 @@ func CheckMetric(c *check.C, metric string, value float64) { if strings.Contains(line, metric) { vals := strings.Split(line, " ") if len(vals) != 2 { - c.Errorf("bad metricsutils format") + c.Errorf("bad metrics format") } val, err = strconv.ParseFloat(vals[1], 64) c.Assert(err, check.IsNil) From f0912509bf2351df41ef815641b9d134a589b615 Mon Sep 17 00:00:00 2001 From: yeya24 Date: Mon, 5 Aug 2019 21:28:25 +0800 Subject: [PATCH 4/4] add register peers,tasks,dfgettasks counter metrics Signed-off-by: yeya24 --- docs/user_guide/metrics.md | 5 ++- supernode/daemon/mgr/dfgettask/manager.go | 32 ++++++++++++++++--- .../daemon/mgr/dfgettask/manager_test.go | 30 ++++++++++++++--- supernode/daemon/mgr/peer/manager.go | 11 ++----- supernode/daemon/mgr/peer/manager_test.go | 5 ++- supernode/daemon/mgr/task/manager.go | 14 +++++--- supernode/daemon/mgr/task/manager_test.go | 4 +++ supernode/daemon/mgr/task/manager_util.go | 7 ++-- supernode/server/metrics.go | 2 +- supernode/server/server.go | 5 +-- 10 files changed, 81 insertions(+), 34 deletions(-) diff --git a/docs/user_guide/metrics.md 
b/docs/user_guide/metrics.md index 3e50461c2..8c8fddbb6 100644 --- a/docs/user_guide/metrics.md +++ b/docs/user_guide/metrics.md @@ -11,7 +11,10 @@ This doc contains all the metrics that Dragonfly components currently support. N - dragonfly_supernode_http_response_size_bytes{code, handler, method} - http response size in bytes - dragonfly_supernode_peers{peer} - dragonfly peers, the label peer consists of the hostname and ip address of one peer. - dragonfly_supernode_tasks{cdnstatus} - dragonfly tasks -- dragonfly_supernode_dfgettasks{callsystem} - dragonfly dfget tasks +- dragonfly_supernode_tasks_registered_total{} - total times of registering new tasks. counter type. +- dragonfly_supernode_dfgettasks{callsystem, status} - dragonfly dfget tasks +- dragonfly_supernode_dfgettasks_registered_total{callsystem} - total times of registering new dfgettasks. counter type. +- dragonfly_supernode_dfgettasks_failed_total{callsystem} - total times of failed dfgettasks. counter type. - dragonfly_supernode_schedule_duration_milliseconds{peer} - duration for task scheduling in milliseconds - dragonfly_supernode_trigger_cdn_total{} - total times of triggering cdn. - dragonfly_supernode_trigger_cdn_failed_total{} - total failed times of triggering cdn. 
diff --git a/supernode/daemon/mgr/dfgettask/manager.go b/supernode/daemon/mgr/dfgettask/manager.go index 1173d98d8..4807264ff 100644 --- a/supernode/daemon/mgr/dfgettask/manager.go +++ b/supernode/daemon/mgr/dfgettask/manager.go @@ -36,26 +36,36 @@ import ( var _ mgr.DfgetTaskMgr = &Manager{} type metrics struct { - dfgetTasks *prometheus.GaugeVec + dfgetTasks *prometheus.GaugeVec + dfgetTasksRegisterCount *prometheus.CounterVec + dfgetTasksFailCount *prometheus.CounterVec } func newMetrics(register prometheus.Registerer) *metrics { return &metrics{ dfgetTasks: metricsutils.NewGauge(config.SubsystemSupernode, "dfgettasks", - "The number of dfget tasks", []string{"callsystem"}, register), + "Current status of dfgettasks", []string{"callsystem", "status"}, register), + + dfgetTasksRegisterCount: metricsutils.NewCounter(config.SubsystemSupernode, "dfgettasks_registered_total", + "Total times of registering dfgettasks", []string{"callsystem"}, register), + + dfgetTasksFailCount: metricsutils.NewCounter(config.SubsystemSupernode, "dfgettasks_failed_total", + "Total failure times of dfgettasks", []string{"callsystem"}, register), } } // Manager is an implementation of the interface of DfgetTaskMgr. type Manager struct { + cfg *config.Config dfgetTaskStore *dutil.Store ptoc *syncmap.SyncMap metrics *metrics } // NewManager returns a new Manager. 
-func NewManager(register prometheus.Registerer) (*Manager, error) { +func NewManager(cfg *config.Config, register prometheus.Registerer) (*Manager, error) { return &Manager{ + cfg: cfg, dfgetTaskStore: dutil.NewStore(), ptoc: syncmap.NewSyncMap(), metrics: newMetrics(register), @@ -89,7 +99,12 @@ func (dtm *Manager) Add(ctx context.Context, dfgetTask *types.DfGetTask) error { dtm.ptoc.Add(generatePeerKey(dfgetTask.PeerID, dfgetTask.TaskID), dfgetTask.CID) dtm.dfgetTaskStore.Put(key, dfgetTask) - dtm.metrics.dfgetTasks.WithLabelValues(dfgetTask.CallSystem).Inc() + + // If dfget task is created by supernode cdn, don't update metrics. + if !dtm.cfg.IsSuperPID(dfgetTask.PeerID) || !dtm.cfg.IsSuperCID(dfgetTask.CID) { + dtm.metrics.dfgetTasks.WithLabelValues(dfgetTask.CallSystem, dfgetTask.Status).Inc() + dtm.metrics.dfgetTasksRegisterCount.WithLabelValues(dfgetTask.CallSystem).Inc() + } return nil } @@ -121,7 +136,7 @@ func (dtm *Manager) Delete(ctx context.Context, clientID, taskID string) error { return err } dtm.ptoc.Delete(generatePeerKey(dfgetTask.PeerID, dfgetTask.TaskID)) - dtm.metrics.dfgetTasks.WithLabelValues(dfgetTask.CallSystem).Dec() + dtm.metrics.dfgetTasks.WithLabelValues(dfgetTask.CallSystem, dfgetTask.Status).Dec() return dtm.dfgetTaskStore.Delete(key) } @@ -133,9 +148,16 @@ func (dtm *Manager) UpdateStatus(ctx context.Context, clientID, taskID, status s } if dfgetTask.Status != types.DfGetTaskStatusSUCCESS { + dtm.metrics.dfgetTasks.WithLabelValues(dfgetTask.CallSystem, dfgetTask.Status).Dec() + dtm.metrics.dfgetTasks.WithLabelValues(dfgetTask.CallSystem, status).Inc() dfgetTask.Status = status } + // Add the total failed count. 
+ if dfgetTask.Status == types.DfGetTaskStatusFAILED { + dtm.metrics.dfgetTasksFailCount.WithLabelValues(dfgetTask.CallSystem).Inc() + } + return nil } diff --git a/supernode/daemon/mgr/dfgettask/manager_test.go b/supernode/daemon/mgr/dfgettask/manager_test.go index 15fc822c4..e21ea0a36 100644 --- a/supernode/daemon/mgr/dfgettask/manager_test.go +++ b/supernode/daemon/mgr/dfgettask/manager_test.go @@ -22,6 +22,7 @@ import ( "github.com/dragonflyoss/Dragonfly/apis/types" "github.com/dragonflyoss/Dragonfly/pkg/errortypes" + "github.com/dragonflyoss/Dragonfly/supernode/config" "github.com/go-check/check" "github.com/prometheus/client_golang/prometheus" @@ -37,11 +38,18 @@ func init() { } type DfgetTaskMgrTestSuite struct { + cfg *config.Config +} + +func (s *DfgetTaskMgrTestSuite) SetUpSuite(c *check.C) { + s.cfg = config.NewConfig() + s.cfg.SetCIDPrefix("127.0.0.1") } func (s *DfgetTaskMgrTestSuite) TestDfgetTaskAdd(c *check.C) { - manager, _ := NewManager(prometheus.NewRegistry()) + manager, _ := NewManager(s.cfg, prometheus.NewRegistry()) dfgetTasks := manager.metrics.dfgetTasks + dfgetTasksRegisterCount := manager.metrics.dfgetTasksRegisterCount var testCases = []struct { dfgetTask *types.DfGetTask @@ -96,7 +104,11 @@ func (s *DfgetTaskMgrTestSuite) TestDfgetTaskAdd(c *check.C) { c.Check(err, check.IsNil) c.Assert(1, check.Equals, int(prom_testutil.ToFloat64( - dfgetTasks.WithLabelValues(tc.dfgetTask.CallSystem)))) + dfgetTasks.WithLabelValues(tc.dfgetTask.CallSystem, tc.dfgetTask.Status)))) + + c.Assert(1, check.Equals, + int(prom_testutil.ToFloat64( + dfgetTasksRegisterCount.WithLabelValues(tc.dfgetTask.CallSystem)))) dt, err := manager.Get(context.Background(), tc.dfgetTask.CID, tc.dfgetTask.TaskID) c.Check(err, check.IsNil) c.Check(dt, check.DeepEquals, tc.Expect) @@ -104,7 +116,9 @@ func (s *DfgetTaskMgrTestSuite) TestDfgetTaskAdd(c *check.C) { } func (s *DfgetTaskMgrTestSuite) TestDfgetTaskUpdate(c *check.C) { - manager, _ := 
NewManager(prometheus.NewRegistry()) + manager, _ := NewManager(s.cfg, prometheus.NewRegistry()) + dfgetTasksFailCount := manager.metrics.dfgetTasksFailCount + var testCases = []struct { dfgetTask *types.DfGetTask taskStatus string @@ -163,13 +177,19 @@ func (s *DfgetTaskMgrTestSuite) TestDfgetTaskUpdate(c *check.C) { err = manager.UpdateStatus(context.Background(), tc.dfgetTask.CID, tc.dfgetTask.TaskID, tc.taskStatus) c.Check(err, check.IsNil) + if tc.taskStatus == types.DfGetTaskStatusFAILED { + c.Assert(1, check.Equals, + int(prom_testutil.ToFloat64( + dfgetTasksFailCount.WithLabelValues(tc.dfgetTask.CallSystem)))) + } + dt, err := manager.Get(context.Background(), tc.dfgetTask.CID, tc.dfgetTask.TaskID) c.Check(dt, check.DeepEquals, tc.Expect) } } func (s *DfgetTaskMgrTestSuite) TestDfgetTaskDelete(c *check.C) { - manager, _ := NewManager(prometheus.NewRegistry()) + manager, _ := NewManager(s.cfg, prometheus.NewRegistry()) dfgetTasks := manager.metrics.dfgetTasks var testCases = []struct { @@ -207,7 +227,7 @@ func (s *DfgetTaskMgrTestSuite) TestDfgetTaskDelete(c *check.C) { c.Check(err, check.IsNil) c.Assert(0, check.Equals, int(prom_testutil.ToFloat64( - dfgetTasks.WithLabelValues(tc.dfgetTask.CallSystem)))) + dfgetTasks.WithLabelValues(tc.dfgetTask.CallSystem, tc.dfgetTask.Status)))) _, err = manager.Get(context.Background(), tc.dfgetTask.CID, tc.dfgetTask.TaskID) c.Check(errortypes.IsDataNotFound(err), check.Equals, true) diff --git a/supernode/daemon/mgr/peer/manager.go b/supernode/daemon/mgr/peer/manager.go index 143587850..6dffac9bf 100644 --- a/supernode/daemon/mgr/peer/manager.go +++ b/supernode/daemon/mgr/peer/manager.go @@ -44,7 +44,7 @@ type metrics struct { func newMetrics(register prometheus.Registerer) *metrics { return &metrics{ peers: metricsutils.NewGauge(config.SubsystemSupernode, "peers", - "The number of supernode peers", []string{"peer"}, register), + "Current status of peers", []string{"peer"}, register), } } @@ -83,7 +83,7 @@ func (pm 
*Manager) Register(ctx context.Context, peerCreateRequest *types.PeerCr Created: strfmt.DateTime(time.Now()), } pm.peerStore.Put(id, peerInfo) - pm.metrics.peers.WithLabelValues(GeneratePeerName(peerInfo)).Inc() + pm.metrics.peers.WithLabelValues(peerInfo.IP.String()).Inc() return &types.PeerCreateResponse{ ID: id, @@ -98,7 +98,7 @@ func (pm *Manager) DeRegister(ctx context.Context, peerID string) error { } pm.peerStore.Delete(peerID) - pm.metrics.peers.WithLabelValues(GeneratePeerName(peerInfo)).Dec() + pm.metrics.peers.WithLabelValues(peerInfo.IP.String()).Dec() return nil } @@ -200,8 +200,3 @@ func getLessFunc(listResult []interface{}, desc bool) (less func(i, j int) bool) func generatePeerID(peerInfo *types.PeerCreateRequest) string { return fmt.Sprintf("%s-%s-%d", peerInfo.HostName.String(), peerInfo.IP.String(), time.Now().UnixNano()) } - -// GeneratePeerName extracts the hostname and ip from peerInfo. -func GeneratePeerName(info *types.PeerInfo) string { - return info.HostName.String() + "-" + info.IP.String() -} diff --git a/supernode/daemon/mgr/peer/manager_test.go b/supernode/daemon/mgr/peer/manager_test.go index 6af1f2457..db053e15d 100644 --- a/supernode/daemon/mgr/peer/manager_test.go +++ b/supernode/daemon/mgr/peer/manager_test.go @@ -56,8 +56,7 @@ func (s *PeerMgrTestSuite) TestPeerMgr(c *check.C) { c.Check(err, check.IsNil) c.Assert(1, check.Equals, - int(prom_testutil.ToFloat64(peers.WithLabelValues("foo-192.168.10.11")))) - + int(prom_testutil.ToFloat64(peers.WithLabelValues("192.168.10.11")))) // get id := resp.ID info, err := manager.Get(context.Background(), id) @@ -82,7 +81,7 @@ func (s *PeerMgrTestSuite) TestPeerMgr(c *check.C) { c.Check(err, check.IsNil) c.Assert(0, check.Equals, - int(prom_testutil.ToFloat64(peers.WithLabelValues("foo-192.168.10.11")))) + int(prom_testutil.ToFloat64(peers.WithLabelValues("192.168.10.11")))) // get info, err = manager.Get(context.Background(), id) diff --git a/supernode/daemon/mgr/task/manager.go 
b/supernode/daemon/mgr/task/manager.go index b6d313893..e55dc44be 100644 --- a/supernode/daemon/mgr/task/manager.go +++ b/supernode/daemon/mgr/task/manager.go @@ -45,6 +45,7 @@ var _ mgr.TaskMgr = &Manager{} type metrics struct { tasks *prometheus.GaugeVec + tasksRegisterCount *prometheus.CounterVec triggerCdnCount *prometheus.CounterVec triggerCdnFailCount *prometheus.CounterVec scheduleDurationMilliSeconds *prometheus.HistogramVec @@ -53,13 +54,16 @@ type metrics struct { func newMetrics(register prometheus.Registerer) *metrics { return &metrics{ tasks: metricsutils.NewGauge(config.SubsystemSupernode, "tasks", - "The status of Supernode tasks", []string{"cdnstatus"}, register), + "Current status of Supernode tasks", []string{"cdnstatus"}, register), + + tasksRegisterCount: metricsutils.NewCounter(config.SubsystemSupernode, "tasks_registered_total", + "Total times of registering tasks", []string{}, register), triggerCdnCount: metricsutils.NewCounter(config.SubsystemSupernode, "trigger_cdn_total", - "The number of triggering cdn", []string{}, register), + "Total times of triggering cdn", []string{}, register), triggerCdnFailCount: metricsutils.NewCounter(config.SubsystemSupernode, "trigger_cdn_failed_total", - "The number of triggering cdn failure", []string{}, register), + "Total failure times of triggering cdn", []string{}, register), scheduleDurationMilliSeconds: metricsutils.NewHistogram(config.SubsystemSupernode, "schedule_duration_milliseconds", "Duration for task scheduling in milliseconds", []string{"peer"}, @@ -87,7 +91,8 @@ type Manager struct { // NewManager returns a new Manager Object. 
func NewManager(cfg *config.Config, peerMgr mgr.PeerMgr, dfgetTaskMgr mgr.DfgetTaskMgr, - progressMgr mgr.ProgressMgr, cdnMgr mgr.CDNMgr, schedulerMgr mgr.SchedulerMgr, originClient httpclient.OriginHTTPClient, register prometheus.Registerer) (*Manager, error) { + progressMgr mgr.ProgressMgr, cdnMgr mgr.CDNMgr, schedulerMgr mgr.SchedulerMgr, + originClient httpclient.OriginHTTPClient, register prometheus.Registerer) (*Manager, error) { return &Manager{ cfg: cfg, taskStore: dutil.NewStore(), @@ -118,6 +123,7 @@ func (tm *Manager) Register(ctx context.Context, req *types.TaskCreateRequest) ( logrus.Infof("failed to add or update task with req %+v: %v", req, err) return nil, err } + tm.metrics.tasksRegisterCount.WithLabelValues().Inc() logrus.Debugf("success to get task info: %+v", task) // TODO: defer rollback the task update diff --git a/supernode/daemon/mgr/task/manager_test.go b/supernode/daemon/mgr/task/manager_test.go index eb0dcdccd..59c2b604d 100644 --- a/supernode/daemon/mgr/task/manager_test.go +++ b/supernode/daemon/mgr/task/manager_test.go @@ -30,6 +30,7 @@ import ( "github.com/go-check/check" "github.com/golang/mock/gomock" "github.com/prometheus/client_golang/prometheus" + prom_testutil "github.com/prometheus/client_golang/prometheus/testutil" ) func Test(t *testing.T) { @@ -76,6 +77,7 @@ func (s *TaskMgrTestSuite) TearDownSuite(c *check.C) { } func (s *TaskMgrTestSuite) TestCheckTaskStatus(c *check.C) { + tasksRegisterCount := s.taskManager.metrics.tasksRegisterCount s.taskManager.taskStore = dutil.NewStore() req := &types.TaskCreateRequest{ CID: "cid", @@ -87,6 +89,8 @@ func (s *TaskMgrTestSuite) TestCheckTaskStatus(c *check.C) { } resp, err := s.taskManager.Register(context.Background(), req) c.Check(err, check.IsNil) + c.Assert(1, check.Equals, + int(prom_testutil.ToFloat64(tasksRegisterCount.WithLabelValues()))) isSuccess, err := s.taskManager.CheckTaskStatus(context.Background(), resp.ID) c.Check(err, check.IsNil) diff --git 
a/supernode/daemon/mgr/task/manager_util.go b/supernode/daemon/mgr/task/manager_util.go index c7e7fdc1f..4e1fe414b 100644 --- a/supernode/daemon/mgr/task/manager_util.go +++ b/supernode/daemon/mgr/task/manager_util.go @@ -30,7 +30,6 @@ import ( "github.com/dragonflyoss/Dragonfly/pkg/timeutils" "github.com/dragonflyoss/Dragonfly/supernode/config" "github.com/dragonflyoss/Dragonfly/supernode/daemon/mgr" - "github.com/dragonflyoss/Dragonfly/supernode/daemon/mgr/peer" "github.com/dragonflyoss/Dragonfly/supernode/util" "github.com/pkg/errors" @@ -332,9 +331,7 @@ func (tm *Manager) parseAvailablePeers(ctx context.Context, clientID string, tas } // Get peerName to represent peer in metrics. - p, _ := tm.peerMgr.Get(context.Background(), dfgetTask.PeerID) - peerName := peer.GeneratePeerName(p) - + peer, _ := tm.peerMgr.Get(context.Background(), dfgetTask.PeerID) // get scheduler pieceResult logrus.Debugf("start scheduler for taskID: %s clientID: %s", task.ID, clientID) startTime := time.Now() @@ -342,7 +339,7 @@ func (tm *Manager) parseAvailablePeers(ctx context.Context, clientID string, tas if err != nil { return false, nil, err } - tm.metrics.scheduleDurationMilliSeconds.WithLabelValues(peerName).Observe(timeutils.SinceInMilliseconds(startTime)) + tm.metrics.scheduleDurationMilliSeconds.WithLabelValues(peer.IP.String()).Observe(timeutils.SinceInMilliseconds(startTime)) logrus.Debugf("get scheduler result length(%d) with taskID(%s) and clientID(%s)", len(pieceResult), task.ID, clientID) var pieceInfos []*types.PieceInfo diff --git a/supernode/server/metrics.go b/supernode/server/metrics.go index 2bb4e6ff2..20aa7eb8d 100644 --- a/supernode/server/metrics.go +++ b/supernode/server/metrics.go @@ -26,7 +26,7 @@ import ( "github.com/prometheus/client_golang/prometheus/promhttp" ) -// metrics defines three prometheus metrics for monitoring http handler status +// metrics defines some prometheus metrics for monitoring supernode type metrics struct { requestCounter 
*prometheus.CounterVec requestDuration *prometheus.HistogramVec diff --git a/supernode/server/server.go b/supernode/server/server.go index c068aceb9..486b71457 100644 --- a/supernode/server/server.go +++ b/supernode/server/server.go @@ -68,7 +68,7 @@ func New(cfg *config.Config, register prometheus.Registerer) (*Server, error) { return nil, err } - dfgetTaskMgr, err := dfgettask.NewManager(register) + dfgetTaskMgr, err := dfgettask.NewManager(cfg, register) if err != nil { return nil, err } @@ -88,7 +88,8 @@ func New(cfg *config.Config, register prometheus.Registerer) (*Server, error) { return nil, err } - taskMgr, err := task.NewManager(cfg, peerMgr, dfgetTaskMgr, progressMgr, cdnMgr, schedulerMgr, originClient, register) + taskMgr, err := task.NewManager(cfg, peerMgr, dfgetTaskMgr, progressMgr, cdnMgr, + schedulerMgr, originClient, register) if err != nil { return nil, err }