Skip to content

Commit

Permalink
Add slice pool for memory efficiency #33
Browse files Browse the repository at this point in the history
  • Loading branch information
rlmcpherson committed Oct 7, 2014
1 parent 0706fb0 commit 7138fa9
Show file tree
Hide file tree
Showing 3 changed files with 151 additions and 65 deletions.
74 changes: 42 additions & 32 deletions getter.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package s3gof3r

import (
"bytes"
"crypto/md5"
"fmt"
"hash"
Expand Down Expand Up @@ -37,21 +36,21 @@ type getter struct {
quit chan struct{}
qWait map[int]*chunk

bp *bp
sp *sp

closed bool
c *Config

md5 hash.Hash
md5 hash.Hash
cIdx int64
}

type chunk struct {
id int
header http.Header
start int64
size int64
b *bytes.Buffer
len int64
b []byte
}

func newGetter(getURL url.URL, c *Config, b *Bucket) (io.ReadCloser, http.Header, error) {
Expand Down Expand Up @@ -82,11 +81,12 @@ func newGetter(getURL url.URL, c *Config, b *Bucket) (io.ReadCloser, http.Header
g.chunkTotal = int((g.contentLen + g.bufsz - 1) / g.bufsz) // round up, integer division
logger.debugPrintf("object size: %3.2g MB", float64(g.contentLen)/float64((1*mb)))

g.bp = newBufferPool(g.bufsz)
g.sp = newSlicePool(g.bufsz)

for i := 0; i < g.c.Concurrency; i++ {
go g.worker()
}
logger.debugPrintf("started %d workers", g.c.Concurrency)
go g.initChunks()
return g, resp.Header, nil
}
Expand Down Expand Up @@ -130,10 +130,11 @@ func (g *getter) initChunks() {
start: i,
size: size,
b: nil,
len: 0}
}
i += size
id++
g.wg.Add(1)
logger.debugPrintf("initialized chunk %d to get range %v", c.id, c.header)
g.getCh <- c
}
close(g.getCh)
Expand All @@ -149,22 +150,21 @@ func (g *getter) worker() {
func (g *getter) retryGetChunk(c *chunk) {
defer g.wg.Done()
var err error
c.b = <-g.bp.get
c.b = <-g.sp.get
for i := 0; i < g.c.NTry; i++ {
time.Sleep(time.Duration(math.Exp2(float64(i))) * 100 * time.Millisecond) // exponential back-off
err = g.getChunk(c)
if err == nil {
return
}
logger.debugPrintf("error on attempt %d: retrying chunk: %v, error: %s", i, c, err)
logger.debugPrintf("error on attempt %d: retrying chunk: %v, error: %s", i, c.id, err)
}
g.err = err
close(g.quit) // out of tries, ensure quit by closing channel
}

func (g *getter) getChunk(c *chunk) error {
// ensure buffer is empty
c.b.Reset()

r, err := http.NewRequest("GET", g.url.String(), nil)
if err != nil {
Expand All @@ -180,11 +180,12 @@ func (g *getter) getChunk(c *chunk) error {
if resp.StatusCode != 206 {
return newRespError(resp)
}
n, err := c.b.ReadFrom(resp.Body)
n, err := io.ReadAtLeast(resp.Body, c.b, int(c.size))
if err != nil {
return err
}
if n != c.size {
logger.debugPrintf("downloaded chunk %d", c.id)
if int64(n) != c.size {
return fmt.Errorf("chunk %d: Expected %d bytes, received %d",
c.id, c.size, n)
}
Expand All @@ -200,30 +201,34 @@ func (g *getter) Read(p []byte) (int, error) {
if g.err != nil {
return 0, g.err
}
if g.rChunk == nil {
g.rChunk, err = g.nextChunk()
if err != nil {
return 0, err
nw := 0
for nw < len(p) {
if g.rChunk == nil { // get next chunk
logger.debugPrintf("getting chunk %d", g.chunkID)
g.rChunk, err = g.nextChunk()
if err != nil {
return 0, err
}
g.cIdx = 0
}
}

n, err := g.rChunk.b.Read(p)
if g.c.Md5Check {
g.md5.Write(p[0:n])
}
n := copy(p[nw:], g.rChunk.b[g.cIdx:g.rChunk.size])
g.cIdx += int64(n)
nw += n
g.bytesRead += int64(n)

// Empty buffer, move on to next
if err == io.EOF {
// Do not send EOF for each chunk.
if !(g.rChunk.id == g.chunkTotal-1 && g.rChunk.b.Len() == 0) {
err = nil
if g.bytesRead == g.contentLen {
logger.debugPrintf("complete: %d bytes read", g.bytesRead)
return n, io.EOF
}
if g.cIdx >= g.rChunk.size-1 {
g.sp.give <- g.rChunk.b // recycle buffer
g.chunkID++
g.rChunk = nil
}
g.bp.give <- g.rChunk.b // recycle buffer
g.rChunk = nil
g.chunkID++
}
g.bytesRead = g.bytesRead + int64(n)
return n, err
return nw, nil

}

func (g *getter) nextChunk() (*chunk, error) {
Expand All @@ -233,6 +238,11 @@ func (g *getter) nextChunk() (*chunk, error) {
c := g.qWait[g.chunkID]
if c != nil {
delete(g.qWait, g.chunkID)
if g.c.Md5Check {
if _, err := g.md5.Write(c.b[:c.size]); err != nil {
return nil, err
}
}
return c, nil
}
// if next chunk not in qWait, read from channel
Expand All @@ -254,7 +264,7 @@ func (g *getter) Close() error {
}
g.wg.Wait()
g.closed = true
close(g.bp.quit)
close(g.sp.quit)
if g.bytesRead != g.contentLen {
return fmt.Errorf("read error: %d bytes read. expected: %d", g.bytesRead, g.contentLen)
}
Expand Down
71 changes: 38 additions & 33 deletions putter.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"math"
"net/http"
"net/url"
"runtime"
"strconv"
"strings"
"sync"
Expand All @@ -30,7 +31,7 @@ const (
type part struct {
r io.ReadSeeker
len int64
b *bytes.Buffer
b []byte

// read by xml encoder
PartNumber int
Expand All @@ -46,7 +47,8 @@ type putter struct {
c *Config

bufsz int64
buf *bytes.Buffer
buf []byte
bufIdx int
ch chan *part
part int
closed bool
Expand All @@ -56,14 +58,15 @@ type putter struct {
md5 hash.Hash
ETag string

bp *bp
sp *sp

makes int
UploadId string // casing matches s3 xml
xml struct {
XMLName string `xml:"CompleteMultipartUpload"`
Part []*part
}
putsz int64
}

// Sends an S3 multipart upload initiation request.
Expand Down Expand Up @@ -97,7 +100,7 @@ func newPutter(url url.URL, h http.Header, c *Config, b *Bucket) (p *putter, err
p.md5OfParts = md5.New()
p.md5 = md5.New()

p.bp = newBufferPool(p.bufsz)
p.sp = newSlicePool(p.bufsz)

return p, nil
}
Expand All @@ -111,28 +114,32 @@ func (p *putter) Write(b []byte) (int, error) {
p.abort()
return 0, p.err
}
if p.buf == nil {
p.buf = <-p.bp.get
// grow to bufsz, allocating overhead to avoid slice growth
p.buf.Grow(int(p.bufsz + 100*kb))
}
n, err := p.buf.Write(b)
if err != nil {
p.abort()
return n, err
}
nw := 0
for nw < len(b) {
if p.buf == nil {
p.buf = <-p.sp.get
p.bufIdx = 0
if int64(cap(p.buf)) < p.bufsz {
p.buf = make([]byte, p.bufsz)
runtime.GC()
}
}
n := copy(p.buf[p.bufIdx:], b[nw:])
p.bufIdx += n
nw += n

if int64(p.buf.Len()) >= p.bufsz {
p.flush()
if len(p.buf) == p.bufIdx {
p.flush()
}
}
return n, nil
return nw, nil
}

func (p *putter) flush() {
p.wg.Add(1)
p.part++
b := *p.buf
part := &part{bytes.NewReader(b.Bytes()), int64(b.Len()), p.buf, p.part, "", ""}
p.putsz += int64(p.bufIdx)
part := &part{bytes.NewReader(p.buf[:p.bufIdx]), int64(p.bufIdx), p.buf, p.part, "", ""}
var err error
part.contentMd5, part.ETag, err = p.md5Content(part.r)
if err != nil {
Expand All @@ -142,12 +149,11 @@ func (p *putter) flush() {
p.xml.Part = append(p.xml.Part, part)
p.ch <- part
p.buf = nil
// double buffer size every 1000 parts to
// avoid exceeding the 10000-part AWS limit
// while still reaching the 5 Terabyte max object size
if p.part%1000 == 0 && growPartSize(p.part, p.bufsz) {
// if necessary, double the buffer size (checked every 100 parts, per the condition
// below) to stay under the 10,000-part AWS limit; to reach the 5 TB maximum object
// size, the initial part size must be ~85 MB
if p.part%100 == 0 && growPartSize(p.part, p.bufsz, p.putsz) {
p.bufsz = min64(p.bufsz*2, maxPartSize)
p.bp.makeSize = p.bufsz
p.sp.sizech <- p.bufsz // update pool buffer size
logger.debugPrintf("part size doubled to %d", p.bufsz)

}
Expand All @@ -168,10 +174,11 @@ func (p *putter) retryPutPart(part *part) {
time.Sleep(time.Duration(math.Exp2(float64(i))) * 100 * time.Millisecond) // exponential back-off
err = p.putPart(part)
if err == nil {
p.bp.give <- part.b
p.sp.give <- part.b
part.b = nil
return
}
logger.debugPrintf("Error on attempt %d: Retrying part: %v, Error: %s", i, part, err)
logger.debugPrintf("Error on attempt %d: Retrying part: %d, Error: %s", i, part.PartNumber, err)
}
p.err = err
}
Expand Down Expand Up @@ -213,15 +220,14 @@ func (p *putter) Close() (err error) {
return syscall.EINVAL
}
if p.buf != nil {
buf := *p.buf
if buf.Len() > 0 {
if p.bufIdx > 0 {
p.flush()
}
}
p.wg.Wait()
close(p.ch)
p.closed = true
close(p.bp.quit)
close(p.sp.quit)

if p.part == 0 {
p.abort()
Expand Down Expand Up @@ -274,7 +280,6 @@ func (p *putter) Close() (err error) {
break
}
}
return
}
return
}
Expand Down Expand Up @@ -370,7 +375,7 @@ func (p *putter) retryRequest(method, urlStr string, body io.ReadSeeker, h http.
}

// returns true unless partSize is large enough
// to achieve maxObjSize
func growPartSize(partIndex int, partSize int64) bool {
return maxObjSize/(maxNPart-int64(partIndex)) > partSize
// to achieve maxObjSize with remaining parts
func growPartSize(partIndex int, partSize, putsz int64) bool {
return (maxObjSize-putsz)/(maxNPart-int64(partIndex)) > partSize
}
Loading

0 comments on commit 7138fa9

Please sign in to comment.