Skip to content

Commit

Permalink
add CDN piece download cost (#966)
Browse files Browse the repository at this point in the history
Signed-off-by: sunwp <[email protected]>
Co-authored-by: Gaius <[email protected]>
  • Loading branch information
244372610 and gaius-qi authored Mar 3, 2022
1 parent d7ab722 commit e77b5d9
Show file tree
Hide file tree
Showing 12 changed files with 216 additions and 124 deletions.
28 changes: 16 additions & 12 deletions cdn/rpcserver/rpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,16 +106,19 @@ func (css *Server) ObtainSeeds(ctx context.Context, req *cdnsystem.SeedRequest,
PeerId: peerID,
HostUuid: hostID,
PieceInfo: &base.PieceInfo{
PieceNum: int32(piece.PieceNum),
RangeStart: piece.PieceRange.StartIndex,
RangeSize: piece.PieceLen,
PieceMd5: piece.PieceMd5,
PieceOffset: piece.OriginRange.StartIndex,
PieceStyle: piece.PieceStyle,
PieceNum: int32(piece.PieceNum),
RangeStart: piece.PieceRange.StartIndex,
RangeSize: piece.PieceLen,
PieceMd5: piece.PieceMd5,
PieceOffset: piece.OriginRange.StartIndex,
PieceStyle: piece.PieceStyle,
DownloadCost: piece.DownloadCost,
},
Done: false,
ContentLength: registeredTask.SourceFileLength,
TotalPieceCount: registeredTask.TotalPieceCount,
BeginTime: piece.BeginDownloadTime,
EndTime: piece.EndDownloadTime,
}
psc <- pieceSeed
jsonPiece, err := json.Marshal(pieceSeed)
Expand Down Expand Up @@ -205,12 +208,13 @@ func (css *Server) GetPieceTasks(ctx context.Context, req *base.PieceTaskRequest
for _, piece := range pieces {
if piece.PieceNum >= req.StartNum && (count < req.Limit || req.Limit <= 0) {
p := &base.PieceInfo{
PieceNum: int32(piece.PieceNum),
RangeStart: piece.PieceRange.StartIndex,
RangeSize: piece.PieceLen,
PieceMd5: piece.PieceMd5,
PieceOffset: piece.OriginRange.StartIndex,
PieceStyle: piece.PieceStyle,
PieceNum: int32(piece.PieceNum),
RangeStart: piece.PieceRange.StartIndex,
RangeSize: piece.PieceLen,
PieceMd5: piece.PieceMd5,
PieceOffset: piece.OriginRange.StartIndex,
PieceStyle: piece.PieceStyle,
DownloadCost: piece.DownloadCost,
}
pieceInfos = append(pieceInfos, p)
count++
Expand Down
31 changes: 21 additions & 10 deletions cdn/supervisor/cdn/cache_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"encoding/json"
"io"
"sync"
"time"

"github.com/pkg/errors"
"go.opentelemetry.io/otel/trace"
Expand All @@ -39,10 +40,13 @@ import (
)

type piece struct {
taskID string
pieceNum uint32
pieceSize uint32
pieceContent *bytes.Buffer
taskID string
pieceNum uint32
pieceSize uint32
pieceContent *bytes.Buffer
downloadCost uint64
beginDownloadTime uint64
endDownloadTime uint64
}

type downloadMetadata struct {
Expand Down Expand Up @@ -126,6 +130,7 @@ loop:
case <-writeCtx.Done():
break loop
default:
start := time.Now()
var bb = bufPool.Get().(*bytes.Buffer)
bb.Reset()
limitReader := io.LimitReader(reader, int64(seedTask.PieceSize))
Expand All @@ -139,12 +144,15 @@ loop:
break loop
}
backSourceLength += n

end := time.Now()
jobCh <- &piece{
taskID: seedTask.ID,
pieceNum: uint32(curPieceNum),
pieceSize: uint32(seedTask.PieceSize),
pieceContent: bb,
taskID: seedTask.ID,
pieceNum: uint32(curPieceNum),
pieceSize: uint32(seedTask.PieceSize),
pieceContent: bb,
downloadCost: uint64(end.Sub(start).Nanoseconds()),
beginDownloadTime: uint64(start.UnixNano()),
endDownloadTime: uint64(end.UnixNano()),
}
curPieceNum++
if n < int64(seedTask.PieceSize) {
Expand Down Expand Up @@ -197,7 +205,10 @@ func (cw *cacheWriter) writerPool(ctx context.Context, g *errgroup.Group, routin
StartIndex: start,
EndIndex: end,
},
PieceStyle: pieceStyle,
PieceStyle: pieceStyle,
DownloadCost: p.downloadCost,
BeginDownloadTime: p.beginDownloadTime,
EndDownloadTime: p.endDownloadTime,
}
// write piece meta to storage
if err = cw.metadataManager.appendPieceMetadata(p.taskID, pieceRecord); err != nil {
Expand Down
15 changes: 9 additions & 6 deletions cdn/supervisor/cdn/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,14 @@ func (re *reporter) reportPieceMetaRecord(ctx context.Context, taskID string, re
*/
func convertPieceMeta2SeedPiece(record *storage.PieceMetaRecord) *task.PieceInfo {
return &task.PieceInfo{
PieceStyle: base.PieceStyle(record.PieceStyle),
PieceNum: record.PieceNum,
PieceMd5: record.Md5,
PieceRange: record.Range,
OriginRange: record.OriginRange,
PieceLen: record.PieceLen,
PieceNum: record.PieceNum,
PieceMd5: record.Md5,
PieceRange: record.Range,
OriginRange: record.OriginRange,
PieceLen: record.PieceLen,
PieceStyle: base.PieceStyle(record.PieceStyle),
DownloadCost: record.DownloadCost,
BeginDownloadTime: record.BeginDownloadTime,
EndDownloadTime: record.EndDownloadTime,
}
}
34 changes: 27 additions & 7 deletions cdn/supervisor/cdn/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,13 +111,18 @@ type PieceMetaRecord struct {
OriginRange *rangeutils.Range `json:"originRange"`
// 0: PlainUnspecified
PieceStyle int32 `json:"pieceStyle"`
// total time(millisecond) consumed
DownloadCost uint64 `json:"downloadCost"`
BeginDownloadTime uint64 `json:"beginDownloadTime"`
EndDownloadTime uint64 `json:"endDownloadTime"`
}

const fieldSeparator = ":"

func (record PieceMetaRecord) String() string {
return fmt.Sprint(record.PieceNum, fieldSeparator, record.PieceLen, fieldSeparator, record.Md5, fieldSeparator, record.Range, fieldSeparator,
record.OriginRange, fieldSeparator, record.PieceStyle)
record.OriginRange, fieldSeparator, record.PieceStyle, fieldSeparator, record.DownloadCost, fieldSeparator, record.BeginDownloadTime, fieldSeparator,
record.EndDownloadTime)
}

func ParsePieceMetaRecord(value string) (record *PieceMetaRecord, err error) {
Expand Down Expand Up @@ -148,13 +153,28 @@ func ParsePieceMetaRecord(value string) (record *PieceMetaRecord, err error) {
if err != nil {
return nil, errors.Wrapf(err, "invalid pieceStyle: %s", fields[5])
}
downloadCost, err := strconv.ParseUint(fields[6], 10, 64)
if err != nil {
return nil, errors.Wrapf(err, "invalid download cost: %s", fields[6])
}
beginDownloadTime, err := strconv.ParseUint(fields[7], 10, 64)
if err != nil {
return nil, errors.Wrapf(err, "invalid begin download time: %s", fields[7])
}
endDownloadTime, err := strconv.ParseUint(fields[8], 10, 64)
if err != nil {
return nil, errors.Wrapf(err, "invalid end download time: %s", fields[8])
}
return &PieceMetaRecord{
PieceNum: uint32(pieceNum),
PieceLen: uint32(pieceLen),
Md5: md5,
Range: pieceRange,
OriginRange: originRange,
PieceStyle: int32(pieceStyle),
PieceNum: uint32(pieceNum),
PieceLen: uint32(pieceLen),
Md5: md5,
Range: pieceRange,
OriginRange: originRange,
PieceStyle: int32(pieceStyle),
DownloadCost: downloadCost,
BeginDownloadTime: beginDownloadTime,
EndDownloadTime: endDownloadTime,
}, nil
}

Expand Down
15 changes: 9 additions & 6 deletions cdn/supervisor/task/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,15 @@ type SeedTask struct {
}

type PieceInfo struct {
PieceNum uint32 `json:"piece_num"`
PieceMd5 string `json:"piece_md5"`
PieceRange *rangeutils.Range `json:"piece_range"`
OriginRange *rangeutils.Range `json:"origin_range"`
PieceLen uint32 `json:"piece_len"`
PieceStyle base.PieceStyle `json:"piece_style"`
PieceNum uint32 `json:"piece_num"`
PieceMd5 string `json:"piece_md5"`
PieceRange *rangeutils.Range `json:"piece_range"`
OriginRange *rangeutils.Range `json:"origin_range"`
PieceLen uint32 `json:"piece_len"`
PieceStyle base.PieceStyle `json:"piece_style"`
DownloadCost uint64 `json:"download_cost"`
BeginDownloadTime uint64 `json:"begin_download_time"`
EndDownloadTime uint64 `json:"end_download_time"`
}

const (
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ require (
github.com/phayes/freeport v0.0.0-20180830031419-95f893ade6f2
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.11.0
github.com/prometheus/common v0.28.0
github.com/schollz/progressbar/v3 v3.8.2
github.com/serialx/hashring v0.0.0-20200727003509-22c0c7ab6b1b
github.com/shirou/gopsutil/v3 v3.21.11
Expand All @@ -68,7 +67,6 @@ require (
go.uber.org/atomic v1.9.0
go.uber.org/zap v1.19.0
golang.org/x/crypto v0.0.0-20211117183948-ae814b36b871
golang.org/x/net v0.0.0-20211209124913-491a49abca63
golang.org/x/oauth2 v0.0.0-20210819190943-2bc19b11175f
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e
Expand Down Expand Up @@ -164,6 +162,7 @@ require (
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.28.0 // indirect
github.com/prometheus/procfs v0.6.0 // indirect
github.com/rivo/uniseg v0.2.0 // indirect
github.com/robfig/cron/v3 v3.0.1 // indirect
Expand Down Expand Up @@ -193,6 +192,7 @@ require (
golang.org/x/exp v0.0.0-20201221025956-e89b829e73ea // indirect
golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 // indirect
golang.org/x/mod v0.4.2 // indirect
golang.org/x/net v0.0.0-20211209124913-491a49abca63 // indirect
golang.org/x/term v0.0.0-20210615171337-6886f2dfbf5b // indirect
golang.org/x/text v0.3.7 // indirect
golang.org/x/tools v0.1.6-0.20210820212750-d4cc65f0b2ff // indirect
Expand Down
Loading

0 comments on commit e77b5d9

Please sign in to comment.