Skip to content
This repository has been archived by the owner on Feb 27, 2023. It is now read-only.

Commit

Permalink
add register peers,tasks,dfgettasks counter metrics
Browse files Browse the repository at this point in the history
Signed-off-by: yeya24 <[email protected]>
  • Loading branch information
yeya24 committed Aug 14, 2019
1 parent 4053d7a commit f091250
Show file tree
Hide file tree
Showing 10 changed files with 81 additions and 34 deletions.
5 changes: 4 additions & 1 deletion docs/user_guide/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ This doc contains all the metrics that Dragonfly components currently support. N
- dragonfly_supernode_http_response_size_bytes{code, handler, method} - http response size in bytes
- dragonfly_supernode_peers{peer} - dragonfly peers, the label peer consists of the hostname and ip address of one peer.
- dragonfly_supernode_tasks{cdnstatus} - dragonfly tasks
- dragonfly_supernode_dfgettasks{callsystem} - dragonfly dfget tasks
- dragonfly_supernode_tasks_registered_total{} - total times of registering new tasks. counter type.
- dragonfly_supernode_dfgettasks{callsystem, status} - dragonfly dfget tasks
- dragonfly_supernode_dfgettasks_registered_total{callsystem} - total times of registering new dfgettasks. counter type.
- dragonfly_supernode_dfgettasks_failed_total{callsystem} - total times of failed dfgettasks. counter type.
- dragonfly_supernode_schedule_duration_milliseconds{peer} - duration for task scheduling in milliseconds
- dragonfly_supernode_trigger_cdn_total{} - total times of triggering cdn.
- dragonfly_supernode_trigger_cdn_failed_total{} - total failed times of triggering cdn.
Expand Down
32 changes: 27 additions & 5 deletions supernode/daemon/mgr/dfgettask/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,26 +36,36 @@ import (
var _ mgr.DfgetTaskMgr = &Manager{}

type metrics struct {
dfgetTasks *prometheus.GaugeVec
dfgetTasks *prometheus.GaugeVec
dfgetTasksRegisterCount *prometheus.CounterVec
dfgetTasksFailCount *prometheus.CounterVec
}

func newMetrics(register prometheus.Registerer) *metrics {
return &metrics{
dfgetTasks: metricsutils.NewGauge(config.SubsystemSupernode, "dfgettasks",
"The number of dfget tasks", []string{"callsystem"}, register),
"Current status of dfgettasks", []string{"callsystem", "status"}, register),

dfgetTasksRegisterCount: metricsutils.NewCounter(config.SubsystemSupernode, "dfgettasks_registered_total",
"Total times of registering dfgettasks", []string{"callsystem"}, register),

dfgetTasksFailCount: metricsutils.NewCounter(config.SubsystemSupernode, "dfgettasks_failed_total",
"Total failure times of dfgettasks", []string{"callsystem"}, register),
}
}

// Manager is an implementation of the interface of DfgetTaskMgr.
type Manager struct {
cfg *config.Config
dfgetTaskStore *dutil.Store
ptoc *syncmap.SyncMap
metrics *metrics
}

// NewManager returns a new Manager.
func NewManager(register prometheus.Registerer) (*Manager, error) {
func NewManager(cfg *config.Config, register prometheus.Registerer) (*Manager, error) {
return &Manager{
cfg: cfg,
dfgetTaskStore: dutil.NewStore(),
ptoc: syncmap.NewSyncMap(),
metrics: newMetrics(register),
Expand Down Expand Up @@ -89,7 +99,12 @@ func (dtm *Manager) Add(ctx context.Context, dfgetTask *types.DfGetTask) error {

dtm.ptoc.Add(generatePeerKey(dfgetTask.PeerID, dfgetTask.TaskID), dfgetTask.CID)
dtm.dfgetTaskStore.Put(key, dfgetTask)
dtm.metrics.dfgetTasks.WithLabelValues(dfgetTask.CallSystem).Inc()

// If dfget task is created by supernode cdn, don't update metrics.
if !dtm.cfg.IsSuperPID(dfgetTask.PeerID) || !dtm.cfg.IsSuperCID(dfgetTask.CID) {
dtm.metrics.dfgetTasks.WithLabelValues(dfgetTask.CallSystem, dfgetTask.Status).Inc()
dtm.metrics.dfgetTasksRegisterCount.WithLabelValues(dfgetTask.CallSystem).Inc()
}

return nil
}
Expand Down Expand Up @@ -121,7 +136,7 @@ func (dtm *Manager) Delete(ctx context.Context, clientID, taskID string) error {
return err
}
dtm.ptoc.Delete(generatePeerKey(dfgetTask.PeerID, dfgetTask.TaskID))
dtm.metrics.dfgetTasks.WithLabelValues(dfgetTask.CallSystem).Dec()
dtm.metrics.dfgetTasks.WithLabelValues(dfgetTask.CallSystem, dfgetTask.Status).Dec()
return dtm.dfgetTaskStore.Delete(key)
}

Expand All @@ -133,9 +148,16 @@ func (dtm *Manager) UpdateStatus(ctx context.Context, clientID, taskID, status s
}

if dfgetTask.Status != types.DfGetTaskStatusSUCCESS {
dtm.metrics.dfgetTasks.WithLabelValues(dfgetTask.CallSystem, dfgetTask.Status).Dec()
dtm.metrics.dfgetTasks.WithLabelValues(dfgetTask.CallSystem, status).Inc()
dfgetTask.Status = status
}

// Add the total failed count.
if dfgetTask.Status == types.DfGetTaskStatusFAILED {
dtm.metrics.dfgetTasksFailCount.WithLabelValues(dfgetTask.CallSystem).Inc()
}

return nil
}

Expand Down
30 changes: 25 additions & 5 deletions supernode/daemon/mgr/dfgettask/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/dragonflyoss/Dragonfly/apis/types"
"github.com/dragonflyoss/Dragonfly/pkg/errortypes"
"github.com/dragonflyoss/Dragonfly/supernode/config"

"github.com/go-check/check"
"github.com/prometheus/client_golang/prometheus"
Expand All @@ -37,11 +38,18 @@ func init() {
}

type DfgetTaskMgrTestSuite struct {
cfg *config.Config
}

func (s *DfgetTaskMgrTestSuite) SetUpSuite(c *check.C) {
s.cfg = config.NewConfig()
s.cfg.SetCIDPrefix("127.0.0.1")
}

func (s *DfgetTaskMgrTestSuite) TestDfgetTaskAdd(c *check.C) {
manager, _ := NewManager(prometheus.NewRegistry())
manager, _ := NewManager(s.cfg, prometheus.NewRegistry())
dfgetTasks := manager.metrics.dfgetTasks
dfgetTasksRegisterCount := manager.metrics.dfgetTasksRegisterCount

var testCases = []struct {
dfgetTask *types.DfGetTask
Expand Down Expand Up @@ -96,15 +104,21 @@ func (s *DfgetTaskMgrTestSuite) TestDfgetTaskAdd(c *check.C) {
c.Check(err, check.IsNil)
c.Assert(1, check.Equals,
int(prom_testutil.ToFloat64(
dfgetTasks.WithLabelValues(tc.dfgetTask.CallSystem))))
dfgetTasks.WithLabelValues(tc.dfgetTask.CallSystem, tc.dfgetTask.Status))))

c.Assert(1, check.Equals,
int(prom_testutil.ToFloat64(
dfgetTasksRegisterCount.WithLabelValues(tc.dfgetTask.CallSystem))))
dt, err := manager.Get(context.Background(), tc.dfgetTask.CID, tc.dfgetTask.TaskID)
c.Check(err, check.IsNil)
c.Check(dt, check.DeepEquals, tc.Expect)
}
}

func (s *DfgetTaskMgrTestSuite) TestDfgetTaskUpdate(c *check.C) {
manager, _ := NewManager(prometheus.NewRegistry())
manager, _ := NewManager(s.cfg, prometheus.NewRegistry())
dfgetTasksFailCount := manager.metrics.dfgetTasksFailCount

var testCases = []struct {
dfgetTask *types.DfGetTask
taskStatus string
Expand Down Expand Up @@ -163,13 +177,19 @@ func (s *DfgetTaskMgrTestSuite) TestDfgetTaskUpdate(c *check.C) {
err = manager.UpdateStatus(context.Background(), tc.dfgetTask.CID, tc.dfgetTask.TaskID, tc.taskStatus)
c.Check(err, check.IsNil)

if tc.taskStatus == types.DfGetTaskStatusFAILED {
c.Assert(1, check.Equals,
int(prom_testutil.ToFloat64(
dfgetTasksFailCount.WithLabelValues(tc.dfgetTask.CallSystem))))
}

dt, err := manager.Get(context.Background(), tc.dfgetTask.CID, tc.dfgetTask.TaskID)
c.Check(dt, check.DeepEquals, tc.Expect)
}
}

func (s *DfgetTaskMgrTestSuite) TestDfgetTaskDelete(c *check.C) {
manager, _ := NewManager(prometheus.NewRegistry())
manager, _ := NewManager(s.cfg, prometheus.NewRegistry())
dfgetTasks := manager.metrics.dfgetTasks

var testCases = []struct {
Expand Down Expand Up @@ -207,7 +227,7 @@ func (s *DfgetTaskMgrTestSuite) TestDfgetTaskDelete(c *check.C) {
c.Check(err, check.IsNil)
c.Assert(0, check.Equals,
int(prom_testutil.ToFloat64(
dfgetTasks.WithLabelValues(tc.dfgetTask.CallSystem))))
dfgetTasks.WithLabelValues(tc.dfgetTask.CallSystem, tc.dfgetTask.Status))))

_, err = manager.Get(context.Background(), tc.dfgetTask.CID, tc.dfgetTask.TaskID)
c.Check(errortypes.IsDataNotFound(err), check.Equals, true)
Expand Down
11 changes: 3 additions & 8 deletions supernode/daemon/mgr/peer/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type metrics struct {
func newMetrics(register prometheus.Registerer) *metrics {
return &metrics{
peers: metricsutils.NewGauge(config.SubsystemSupernode, "peers",
"The number of supernode peers", []string{"peer"}, register),
"Current status of peers", []string{"peer"}, register),
}
}

Expand Down Expand Up @@ -83,7 +83,7 @@ func (pm *Manager) Register(ctx context.Context, peerCreateRequest *types.PeerCr
Created: strfmt.DateTime(time.Now()),
}
pm.peerStore.Put(id, peerInfo)
pm.metrics.peers.WithLabelValues(GeneratePeerName(peerInfo)).Inc()
pm.metrics.peers.WithLabelValues(peerInfo.IP.String()).Inc()

return &types.PeerCreateResponse{
ID: id,
Expand All @@ -98,7 +98,7 @@ func (pm *Manager) DeRegister(ctx context.Context, peerID string) error {
}

pm.peerStore.Delete(peerID)
pm.metrics.peers.WithLabelValues(GeneratePeerName(peerInfo)).Dec()
pm.metrics.peers.WithLabelValues(peerInfo.IP.String()).Dec()
return nil
}

Expand Down Expand Up @@ -200,8 +200,3 @@ func getLessFunc(listResult []interface{}, desc bool) (less func(i, j int) bool)
func generatePeerID(peerInfo *types.PeerCreateRequest) string {
return fmt.Sprintf("%s-%s-%d", peerInfo.HostName.String(), peerInfo.IP.String(), time.Now().UnixNano())
}

// GeneratePeerName extracts the hostname and ip from peerInfo.
func GeneratePeerName(info *types.PeerInfo) string {
return info.HostName.String() + "-" + info.IP.String()
}
5 changes: 2 additions & 3 deletions supernode/daemon/mgr/peer/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,7 @@ func (s *PeerMgrTestSuite) TestPeerMgr(c *check.C) {
c.Check(err, check.IsNil)

c.Assert(1, check.Equals,
int(prom_testutil.ToFloat64(peers.WithLabelValues("foo-192.168.10.11"))))

int(prom_testutil.ToFloat64(peers.WithLabelValues("192.168.10.11"))))
// get
id := resp.ID
info, err := manager.Get(context.Background(), id)
Expand All @@ -82,7 +81,7 @@ func (s *PeerMgrTestSuite) TestPeerMgr(c *check.C) {
c.Check(err, check.IsNil)

c.Assert(0, check.Equals,
int(prom_testutil.ToFloat64(peers.WithLabelValues("foo-192.168.10.11"))))
int(prom_testutil.ToFloat64(peers.WithLabelValues("192.168.10.11"))))

// get
info, err = manager.Get(context.Background(), id)
Expand Down
14 changes: 10 additions & 4 deletions supernode/daemon/mgr/task/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ var _ mgr.TaskMgr = &Manager{}

type metrics struct {
tasks *prometheus.GaugeVec
tasksRegisterCount *prometheus.CounterVec
triggerCdnCount *prometheus.CounterVec
triggerCdnFailCount *prometheus.CounterVec
scheduleDurationMilliSeconds *prometheus.HistogramVec
Expand All @@ -53,13 +54,16 @@ type metrics struct {
func newMetrics(register prometheus.Registerer) *metrics {
return &metrics{
tasks: metricsutils.NewGauge(config.SubsystemSupernode, "tasks",
"The status of Supernode tasks", []string{"cdnstatus"}, register),
"Current status of Supernode tasks", []string{"cdnstatus"}, register),

tasksRegisterCount: metricsutils.NewCounter(config.SubsystemSupernode, "tasks_registered_total",
"Total times of registering tasks", []string{}, register),

triggerCdnCount: metricsutils.NewCounter(config.SubsystemSupernode, "trigger_cdn_total",
"The number of triggering cdn", []string{}, register),
"Total times of triggering cdn", []string{}, register),

triggerCdnFailCount: metricsutils.NewCounter(config.SubsystemSupernode, "trigger_cdn_failed_total",
"The number of triggering cdn failure", []string{}, register),
"Total failure times of triggering cdn", []string{}, register),

scheduleDurationMilliSeconds: metricsutils.NewHistogram(config.SubsystemSupernode, "schedule_duration_milliseconds",
"Duration for task scheduling in milliseconds", []string{"peer"},
Expand Down Expand Up @@ -87,7 +91,8 @@ type Manager struct {

// NewManager returns a new Manager Object.
func NewManager(cfg *config.Config, peerMgr mgr.PeerMgr, dfgetTaskMgr mgr.DfgetTaskMgr,
progressMgr mgr.ProgressMgr, cdnMgr mgr.CDNMgr, schedulerMgr mgr.SchedulerMgr, originClient httpclient.OriginHTTPClient, register prometheus.Registerer) (*Manager, error) {
progressMgr mgr.ProgressMgr, cdnMgr mgr.CDNMgr, schedulerMgr mgr.SchedulerMgr,
originClient httpclient.OriginHTTPClient, register prometheus.Registerer) (*Manager, error) {
return &Manager{
cfg: cfg,
taskStore: dutil.NewStore(),
Expand Down Expand Up @@ -118,6 +123,7 @@ func (tm *Manager) Register(ctx context.Context, req *types.TaskCreateRequest) (
logrus.Infof("failed to add or update task with req %+v: %v", req, err)
return nil, err
}
tm.metrics.tasksRegisterCount.WithLabelValues().Inc()
logrus.Debugf("success to get task info: %+v", task)
// TODO: defer rollback the task update

Expand Down
4 changes: 4 additions & 0 deletions supernode/daemon/mgr/task/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/go-check/check"
"github.com/golang/mock/gomock"
"github.com/prometheus/client_golang/prometheus"
prom_testutil "github.com/prometheus/client_golang/prometheus/testutil"
)

func Test(t *testing.T) {
Expand Down Expand Up @@ -76,6 +77,7 @@ func (s *TaskMgrTestSuite) TearDownSuite(c *check.C) {
}

func (s *TaskMgrTestSuite) TestCheckTaskStatus(c *check.C) {
tasksRegisterCount := s.taskManager.metrics.tasksRegisterCount
s.taskManager.taskStore = dutil.NewStore()
req := &types.TaskCreateRequest{
CID: "cid",
Expand All @@ -87,6 +89,8 @@ func (s *TaskMgrTestSuite) TestCheckTaskStatus(c *check.C) {
}
resp, err := s.taskManager.Register(context.Background(), req)
c.Check(err, check.IsNil)
c.Assert(1, check.Equals,
int(prom_testutil.ToFloat64(tasksRegisterCount.WithLabelValues())))

isSuccess, err := s.taskManager.CheckTaskStatus(context.Background(), resp.ID)
c.Check(err, check.IsNil)
Expand Down
7 changes: 2 additions & 5 deletions supernode/daemon/mgr/task/manager_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"github.com/dragonflyoss/Dragonfly/pkg/timeutils"
"github.com/dragonflyoss/Dragonfly/supernode/config"
"github.com/dragonflyoss/Dragonfly/supernode/daemon/mgr"
"github.com/dragonflyoss/Dragonfly/supernode/daemon/mgr/peer"
"github.com/dragonflyoss/Dragonfly/supernode/util"

"github.com/pkg/errors"
Expand Down Expand Up @@ -332,17 +331,15 @@ func (tm *Manager) parseAvailablePeers(ctx context.Context, clientID string, tas
}

// Get peerName to represent peer in metrics.
p, _ := tm.peerMgr.Get(context.Background(), dfgetTask.PeerID)
peerName := peer.GeneratePeerName(p)

peer, _ := tm.peerMgr.Get(context.Background(), dfgetTask.PeerID)
// get scheduler pieceResult
logrus.Debugf("start scheduler for taskID: %s clientID: %s", task.ID, clientID)
startTime := time.Now()
pieceResult, err := tm.schedulerMgr.Schedule(ctx, task.ID, clientID, dfgetTask.PeerID)
if err != nil {
return false, nil, err
}
tm.metrics.scheduleDurationMilliSeconds.WithLabelValues(peerName).Observe(timeutils.SinceInMilliseconds(startTime))
tm.metrics.scheduleDurationMilliSeconds.WithLabelValues(peer.IP.String()).Observe(timeutils.SinceInMilliseconds(startTime))
logrus.Debugf("get scheduler result length(%d) with taskID(%s) and clientID(%s)", len(pieceResult), task.ID, clientID)

var pieceInfos []*types.PieceInfo
Expand Down
2 changes: 1 addition & 1 deletion supernode/server/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
"github.com/prometheus/client_golang/prometheus/promhttp"
)

// metrics defines three prometheus metrics for monitoring http handler status
// metrics defines some prometheus metrics for monitoring supernode
type metrics struct {
requestCounter *prometheus.CounterVec
requestDuration *prometheus.HistogramVec
Expand Down
5 changes: 3 additions & 2 deletions supernode/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func New(cfg *config.Config, register prometheus.Registerer) (*Server, error) {
return nil, err
}

dfgetTaskMgr, err := dfgettask.NewManager(register)
dfgetTaskMgr, err := dfgettask.NewManager(cfg, register)
if err != nil {
return nil, err
}
Expand All @@ -88,7 +88,8 @@ func New(cfg *config.Config, register prometheus.Registerer) (*Server, error) {
return nil, err
}

taskMgr, err := task.NewManager(cfg, peerMgr, dfgetTaskMgr, progressMgr, cdnMgr, schedulerMgr, originClient, register)
taskMgr, err := task.NewManager(cfg, peerMgr, dfgetTaskMgr, progressMgr, cdnMgr,
schedulerMgr, originClient, register)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit f091250

Please sign in to comment.