Skip to content
This repository has been archived by the owner on Feb 27, 2023. It is now read-only.

Commit

Permalink
Merge pull request #1375 from lowzj/fix-ByteBuffer-gc
Browse files Browse the repository at this point in the history
optimize: release allocated ByteBuffer explicitly
  • Loading branch information
starnop authored Jul 2, 2020
2 parents f01641e + 764b4ed commit 88cb000
Show file tree
Hide file tree
Showing 12 changed files with 487 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@
package downloader

import (
"bytes"
"io"
"sort"

"github.com/dragonflyoss/Dragonfly/dfget/config"
"github.com/dragonflyoss/Dragonfly/pkg/pool"

"github.com/go-check/check"
)
Expand Down Expand Up @@ -49,7 +49,7 @@ func (s *ClientStreamWriterTestSuite) TestWrite(c *check.C) {
piece: &Piece{
PieceNum: 0,
PieceSize: 6,
Content: bytes.NewBufferString("000010"),
Content: pool.NewBufferString("000010"),
},
noWrapper: false,
expected: "1",
Expand All @@ -58,7 +58,7 @@ func (s *ClientStreamWriterTestSuite) TestWrite(c *check.C) {
piece: &Piece{
PieceNum: 1,
PieceSize: 6,
Content: bytes.NewBufferString("000020"),
Content: pool.NewBufferString("000020"),
},
noWrapper: false,
expected: "2",
Expand All @@ -67,7 +67,7 @@ func (s *ClientStreamWriterTestSuite) TestWrite(c *check.C) {
piece: &Piece{
PieceNum: 3,
PieceSize: 6,
Content: bytes.NewBufferString("000040"),
Content: pool.NewBufferString("000040"),
},
noWrapper: false,
expected: "4",
Expand All @@ -76,7 +76,7 @@ func (s *ClientStreamWriterTestSuite) TestWrite(c *check.C) {
piece: &Piece{
PieceNum: 4,
PieceSize: 6,
Content: bytes.NewBufferString("000050"),
Content: pool.NewBufferString("000050"),
},
noWrapper: false,
expected: "5",
Expand All @@ -85,7 +85,7 @@ func (s *ClientStreamWriterTestSuite) TestWrite(c *check.C) {
piece: &Piece{
PieceNum: 2,
PieceSize: 6,
Content: bytes.NewBufferString("000030"),
Content: pool.NewBufferString("000030"),
},
noWrapper: false,
expected: "3",
Expand Down
9 changes: 5 additions & 4 deletions dfget/core/downloader/p2p_downloader/client_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package downloader

import (
"bufio"
"context"
"io"
"math/rand"
Expand All @@ -31,6 +30,7 @@ import (
"github.com/dragonflyoss/Dragonfly/dfget/core/helper"
"github.com/dragonflyoss/Dragonfly/dfget/types"
"github.com/dragonflyoss/Dragonfly/pkg/fileutils"
"github.com/dragonflyoss/Dragonfly/pkg/pool"
"github.com/dragonflyoss/Dragonfly/pkg/queue"

"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -244,9 +244,10 @@ func writePieceToFile(piece *Piece, file *os.File, cdnSource apiTypes.CdnSource)
return err
}

buf := bufio.NewWriterSize(file, 4*1024*1024)
_, err := io.Copy(buf, piece.RawContent(noWrapper))
buf.Flush()
writer := pool.AcquireWriter(file)
_, err := io.Copy(writer, piece.RawContent(noWrapper))
pool.ReleaseWriter(writer)
writer = nil
return err
}

Expand Down
8 changes: 4 additions & 4 deletions dfget/core/downloader/p2p_downloader/client_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@
package downloader

import (
"bytes"
"fmt"
"io/ioutil"
"os"
"path/filepath"

apiTypes "github.com/dragonflyoss/Dragonfly/apis/types"
"github.com/dragonflyoss/Dragonfly/pkg/fileutils"
"github.com/dragonflyoss/Dragonfly/pkg/pool"

"github.com/go-check/check"
)
Expand Down Expand Up @@ -63,7 +63,7 @@ func (s *ClientWriterTestSuite) TestWrite(c *check.C) {
piece: &Piece{
PieceNum: 0,
PieceSize: 6,
Content: bytes.NewBufferString("000010"),
Content: pool.NewBufferString("000010"),
},
cdnSource: apiTypes.CdnSourceSupernode,
expected: "1",
Expand All @@ -72,7 +72,7 @@ func (s *ClientWriterTestSuite) TestWrite(c *check.C) {
piece: &Piece{
PieceNum: 1,
PieceSize: 6,
Content: bytes.NewBufferString("000020"),
Content: pool.NewBufferString("000020"),
},
cdnSource: apiTypes.CdnSourceSupernode,
expected: "2",
Expand All @@ -81,7 +81,7 @@ func (s *ClientWriterTestSuite) TestWrite(c *check.C) {
piece: &Piece{
PieceNum: 1,
PieceSize: 6,
Content: bytes.NewBufferString("000030"),
Content: pool.NewBufferString("000030"),
},
cdnSource: apiTypes.CdnSourceSource,
expected: "000030",
Expand Down
5 changes: 2 additions & 3 deletions dfget/core/downloader/p2p_downloader/p2p_downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package downloader

import (
"bytes"
"context"
"fmt"
"io"
Expand Down Expand Up @@ -212,7 +211,7 @@ func (p2p *P2PDownloader) run(ctx context.Context, pieceWriter PieceWriter) erro
logrus.Infof("downloading piece:%v", lastItem)

curItem := *lastItem
curItem.Content = &bytes.Buffer{}
curItem.Content = nil
lastItem = nil

response, err := p2p.pullPieceTask(&curItem)
Expand Down Expand Up @@ -412,7 +411,7 @@ func (p2p *P2PDownloader) getItem(latestItem *Piece) (bool, *Piece) {
}
if !v && (item.Result == constants.ResultSemiSuc ||
item.Result == constants.ResultSuc) {
p2p.total += int64(item.Content.Len())
p2p.total += item.ContentLength()
p2p.pieceSet[item.Range] = true
} else if !v {
delete(p2p.pieceSet, item.Range)
Expand Down
47 changes: 40 additions & 7 deletions dfget/core/downloader/p2p_downloader/piece.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

apiTypes "github.com/dragonflyoss/Dragonfly/apis/types"
"github.com/dragonflyoss/Dragonfly/pkg/constants"
"github.com/dragonflyoss/Dragonfly/pkg/pool"
)

// Piece contains all information of a piece.
Expand Down Expand Up @@ -51,14 +52,25 @@ type Piece struct {
PieceNum int `json:"pieceNum"`

// Content uses a buffer to temporarily store the piece content.
Content *bytes.Buffer `json:"-"`
Content *pool.Buffer `json:"-"`

// length the length of the content.
length int64

// autoReset automatically reset content after reading.
autoReset bool
}

// RawContent returns raw contents,
// If the piece has wrapper, and the piece content will remove the head and tail.
func (p *Piece) RawContent(noWrapper bool) *bytes.Buffer {
contents := p.Content.Bytes()
length := len(contents)
defer func() {
if p.autoReset {
p.ResetContent()
}
}()

if noWrapper {
return bytes.NewBuffer(contents[:])
Expand All @@ -69,13 +81,33 @@ func (p *Piece) RawContent(noWrapper bool) *bytes.Buffer {
return nil
}

// ContentLength returns the content length.
func (p *Piece) ContentLength() int64 {
if p.length <= 0 && p.Content != nil {
p.length = int64(p.Content.Len())
}
return p.length
}

func (p *Piece) String() string {
if b, e := json.Marshal(p); e == nil {
return string(b)
}
return ""
}

// ResetContent reset contents and returns it back to buffer pool.
func (p *Piece) ResetContent() {
if p.Content == nil {
return
}
if p.length == 0 {
p.length = int64(p.Content.Len())
}
pool.ReleaseBuffer(p.Content)
p.Content = nil
}

// NewPiece creates a Piece.
func NewPiece(taskID, node, dstCid, pieceRange string, result, status int, cdnSource apiTypes.CdnSource) *Piece {
return &Piece{
Expand All @@ -85,7 +117,8 @@ func NewPiece(taskID, node, dstCid, pieceRange string, result, status int, cdnSo
Range: pieceRange,
Result: result,
Status: status,
Content: &bytes.Buffer{},
Content: nil,
autoReset: true,
}
}

Expand All @@ -96,16 +129,14 @@ func NewPieceSimple(taskID string, node string, status int, cdnSource apiTypes.C
SuperNode: node,
Status: status,
Result: constants.ResultInvalid,
Content: &bytes.Buffer{},
Content: nil,
autoReset: true,
}
}

// NewPieceContent creates a Piece with specified content.
func NewPieceContent(taskID, node, dstCid, pieceRange string,
result, status int, contents *bytes.Buffer, cdnSource apiTypes.CdnSource) *Piece {
if contents == nil {
contents = &bytes.Buffer{}
}
result, status int, contents *pool.Buffer, cdnSource apiTypes.CdnSource) *Piece {
return &Piece{
TaskID: taskID,
SuperNode: node,
Expand All @@ -114,5 +145,7 @@ func NewPieceContent(taskID, node, dstCid, pieceRange string,
Result: result,
Status: status,
Content: contents,
length: int64(contents.Len()),
autoReset: true,
}
}
8 changes: 5 additions & 3 deletions dfget/core/downloader/p2p_downloader/piece_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"bytes"

"github.com/go-check/check"

"github.com/dragonflyoss/Dragonfly/pkg/pool"
)

type PieceTestSuite struct {
Expand All @@ -35,9 +37,9 @@ func (s *PieceTestSuite) TestRawContent(c *check.C) {
noWrapper bool
expected *bytes.Buffer
}{
{piece: &Piece{Content: bytes.NewBufferString("")}, noWrapper: false, expected: nil},
{piece: &Piece{Content: bytes.NewBufferString("000010")}, noWrapper: false, expected: bytes.NewBufferString("1")},
{piece: &Piece{Content: bytes.NewBufferString("000020")}, noWrapper: true, expected: bytes.NewBufferString("000020")},
{piece: &Piece{Content: pool.NewBufferString("")}, noWrapper: false, expected: nil},
{piece: &Piece{Content: pool.NewBufferString("000010")}, noWrapper: false, expected: bytes.NewBufferString("1")},
{piece: &Piece{Content: pool.NewBufferString("000020")}, noWrapper: true, expected: bytes.NewBufferString("000020")},
}

for _, v := range cases {
Expand Down
14 changes: 11 additions & 3 deletions dfget/core/downloader/p2p_downloader/power_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/dragonflyoss/Dragonfly/pkg/httputils"
"github.com/dragonflyoss/Dragonfly/pkg/limitreader"
"github.com/dragonflyoss/Dragonfly/pkg/netutils"
"github.com/dragonflyoss/Dragonfly/pkg/pool"
"github.com/dragonflyoss/Dragonfly/pkg/queue"
"github.com/dragonflyoss/Dragonfly/pkg/ratelimiter"

Expand Down Expand Up @@ -113,7 +114,7 @@ func (pc *PowerClient) ClientError() *types.ClientErrorRequest {
return pc.clientError
}

func (pc *PowerClient) downloadPiece() (content *bytes.Buffer, e error) {
func (pc *PowerClient) downloadPiece() (content *pool.Buffer, e error) {
dstIP := pc.pieceTask.PeerIP
peerPort := pc.pieceTask.PeerPort

Expand Down Expand Up @@ -149,7 +150,14 @@ func (pc *PowerClient) downloadPiece() (content *bytes.Buffer, e error) {
// start to read data from resp
// use limitReader to limit the download speed
limitReader := limitreader.NewLimitReaderWithLimiter(pc.rateLimiter, resp.Body, pieceMD5 != "")
content = &bytes.Buffer{}
content = pool.AcquireBufferSize(int(pc.pieceTask.PieceSize))
defer func() {
// if an error happened, the content cannot be released outside.
if e != nil {
pool.ReleaseBuffer(content)
content = nil
}
}()
if pc.total, e = content.ReadFrom(limitReader); e != nil {
return nil, e
}
Expand Down Expand Up @@ -193,7 +201,7 @@ func (pc *PowerClient) createDownloadRequest() *api.DownloadRequest {
}
}

func (pc *PowerClient) successPiece(content *bytes.Buffer) *Piece {
func (pc *PowerClient) successPiece(content *pool.Buffer) *Piece {
piece := NewPieceContent(pc.taskID, pc.node, pc.pieceTask.Cid, pc.pieceTask.Range,
constants.ResultSemiSuc, constants.TaskStatusRunning, content, pc.cdnSource)
piece.PieceSize = pc.pieceTask.PieceSize
Expand Down
3 changes: 2 additions & 1 deletion dfget/core/downloader/p2p_downloader/power_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,8 @@ func (s *PowerClientTestSuite) TestDownloadPiece(c *check.C) {
return resp, nil
}
content, err = s.powerClient.downloadPiece()
c.Check(content, check.DeepEquals, bytes.NewBufferString("hello"))
c.Check(content, check.NotNil)
c.Check(content.String(), check.Equals, "hello")
c.Check(err, check.IsNil)
}

Expand Down
Loading

0 comments on commit 88cb000

Please sign in to comment.