From d1b8ee66e42ffffd169fd00297d717506220d569 Mon Sep 17 00:00:00 2001 From: Gaius Date: Thu, 3 Mar 2022 15:18:37 +0800 Subject: [PATCH] fix: scheduler piece cost time (#1118) Signed-off-by: Gaius --- pkg/util/timeutils/time_utils.go | 10 +++++ pkg/util/timeutils/time_utils_test.go | 58 ++++++++++++++++++++++++++- scheduler/resource/cdn.go | 4 +- scheduler/service/service.go | 3 +- 4 files changed, 71 insertions(+), 4 deletions(-) diff --git a/pkg/util/timeutils/time_utils.go b/pkg/util/timeutils/time_utils.go index 9bcc6faa371..6538f5f2807 100644 --- a/pkg/util/timeutils/time_utils.go +++ b/pkg/util/timeutils/time_utils.go @@ -65,3 +65,13 @@ func UnixSeconds(timeString string) int64 { func SecondsUnixTime(seconds int64) time.Time { return time.Unix(seconds, 0) } + +// NanoToTime converts an int64 nanoseconds to a time +func NanoToTime(nsec int64) time.Time { + return time.Unix(0, nsec) +} + +// SubNano returns the difference between two nanoseconds +func SubNano(x int64, y int64) int64 { + return NanoToTime(x).Sub(NanoToTime(y)).Nanoseconds() +} diff --git a/pkg/util/timeutils/time_utils_test.go b/pkg/util/timeutils/time_utils_test.go index 99c4c689725..33b5566cce1 100644 --- a/pkg/util/timeutils/time_utils_test.go +++ b/pkg/util/timeutils/time_utils_test.go @@ -139,7 +139,63 @@ func TestSecondsUnixTime(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { if got := SecondsUnixTime(tt.args); got != tt.want { - t.Errorf("MillisUnixTime() = %v, want %v", got, tt.want) + t.Errorf("SecondsUnixTime() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestNanoToTime(t *testing.T) { + const Layout = "2006-01-02 15:04:05" + sample, _ := time.ParseInLocation(Layout, "2021-01-02 12:04:05", time.Local) + now := time.Now() + tests := []struct { + name string + args int64 + want time.Time + }{ + { + name: "convert an int64 nanosecond to a time", + args: sample.UnixNano(), + want: sample.Local(), + }, + { + name: "convert now", + args: now.UnixNano(), + want: time.Unix(0, now.UnixNano()), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := NanoToTime(tt.args); got != tt.want { + t.Errorf("NanoToTime() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestSubNano(t *testing.T) { + now := time.Now() + tests := []struct { + name string + args []int64 + want int64 + }{ + { + name: "nanoseconds are not equal", + args: []int64{now.Add(1 * time.Nanosecond).UnixNano(), now.UnixNano()}, + want: 1, + }, + { + name: "nanoseconds are equal", + args: []int64{now.UnixNano(), now.UnixNano()}, + want: 0, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := SubNano(tt.args[0], tt.args[1]); got != tt.want { + t.Errorf("SubNano() = %v, want %v", got, tt.want) } }) } diff --git a/scheduler/resource/cdn.go b/scheduler/resource/cdn.go index f8632d65ad7..1ac3b993f98 100644 --- a/scheduler/resource/cdn.go +++ b/scheduler/resource/cdn.go @@ -33,6 +33,7 @@ import ( "d7y.io/dragonfly/v2/pkg/rpc/cdnsystem" cdnclient "d7y.io/dragonfly/v2/pkg/rpc/cdnsystem/client" rpcscheduler "d7y.io/dragonfly/v2/pkg/rpc/scheduler" + "d7y.io/dragonfly/v2/pkg/util/timeutils" "d7y.io/dragonfly/v2/scheduler/config" ) @@ -123,8 +124,7 @@ func (c *cdn) TriggerTask(ctx context.Context, task *Task) (*Peer, *rpcscheduler // Handle piece download successfully peer.Log.Infof("receive piece from cdn: %#v %#v", piece, piece.PieceInfo) peer.Pieces.Set(uint(piece.PieceInfo.PieceNum)) - // TODO(244372610) CDN should set piece cost - peer.AppendPieceCost(0) + peer.AppendPieceCost(timeutils.SubNano(int64(piece.EndTime), int64(piece.BeginTime))) task.StorePiece(piece.PieceInfo) } } diff --git a/scheduler/service/service.go b/scheduler/service/service.go index 043e8934e6d..9c15dbea98f 100644 --- a/scheduler/service/service.go +++ b/scheduler/service/service.go @@ -28,6 +28,7 @@ import ( "d7y.io/dragonfly/v2/pkg/rpc/base" "d7y.io/dragonfly/v2/pkg/rpc/base/common" rpcscheduler "d7y.io/dragonfly/v2/pkg/rpc/scheduler" + "d7y.io/dragonfly/v2/pkg/util/timeutils" "d7y.io/dragonfly/v2/scheduler/config" "d7y.io/dragonfly/v2/scheduler/metrics" "d7y.io/dragonfly/v2/scheduler/resource" @@ -479,7 +480,7 @@ func (s *Service) handleEndOfPiece(ctx context.Context, peer *resource.Peer) {} func (s *Service) handlePieceSuccess(ctx context.Context, peer *resource.Peer, piece *rpcscheduler.PieceResult) { // Update peer piece info peer.Pieces.Set(uint(piece.PieceInfo.PieceNum)) - peer.AppendPieceCost(int64(piece.EndTime - piece.BeginTime)) + peer.AppendPieceCost(timeutils.SubNano(int64(piece.EndTime), int64(piece.BeginTime))) // When the peer downloads back-to-source, // piece downloads successfully updates the task piece info