Skip to content

Commit

Permalink
Merge pull request #284 from kfarnung/chunkwriting_buffers
Browse files Browse the repository at this point in the history
When the buffer isn't filled to capacity a new slice is created that's
smaller than the original. In that case the smaller slice is returned to
the pool which prevents the rest of the capacity from being used again.

The solution is to pass the original slice through and attach the
length. This allows the original slice to be returned to the pool once
the operation is complete.

This change also simplifies the `sendChunk` method, ensuring that the
buffer is returned to the TransferManager even when no bytes were read
from the reader.
  • Loading branch information
zezha-msft authored Apr 27, 2022
2 parents 609d45c + 7a7d02e commit 69bf9eb
Showing 1 changed file with 15 additions and 20 deletions.
35 changes: 15 additions & 20 deletions azblob/chunkwriting.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ type copier struct {
type copierChunk struct {
buffer []byte
id string
length int
}

// getErr returns an error by priority. First, if a function set an error, it returns that error. Next, if the Context has an error
Expand All @@ -125,37 +126,31 @@ func (c *copier) sendChunk() error {
}

n, err := io.ReadFull(c.reader, buffer)
switch {
case err == nil && n == 0:
return nil
case err == nil:
if n > 0 {
// Some data was read, schedule the write.
id := c.id.next()
c.wg.Add(1)
c.o.TransferManager.Run(
func() {
defer c.wg.Done()
c.write(copierChunk{buffer: buffer[0:n], id: id})
c.write(copierChunk{buffer: buffer, id: id, length: n})
},
)
return nil
case err != nil && (err == io.EOF || err == io.ErrUnexpectedEOF) && n == 0:
return io.EOF
} else {
// Return the unused buffer to the manager.
c.o.TransferManager.Put(buffer)
}

if err == io.EOF || err == io.ErrUnexpectedEOF {
id := c.id.next()
c.wg.Add(1)
c.o.TransferManager.Run(
func() {
defer c.wg.Done()
c.write(copierChunk{buffer: buffer[0:n], id: id})
},
)
if err == nil {
return nil
} else if err == io.EOF || err == io.ErrUnexpectedEOF {
return io.EOF
}
if err := c.getErr(); err != nil {
return err

if cerr := c.getErr(); cerr != nil {
return cerr
}

return err
}

Expand All @@ -167,7 +162,7 @@ func (c *copier) write(chunk copierChunk) {
return
}

_, err := c.to.StageBlock(c.ctx, chunk.id, bytes.NewReader(chunk.buffer), c.o.AccessConditions.LeaseAccessConditions, nil, c.o.ClientProvidedKeyOptions)
_, err := c.to.StageBlock(c.ctx, chunk.id, bytes.NewReader(chunk.buffer[:chunk.length]), c.o.AccessConditions.LeaseAccessConditions, nil, c.o.ClientProvidedKeyOptions)
if err != nil {
c.errCh <- fmt.Errorf("write error: %w", err)
return
Expand Down

0 comments on commit 69bf9eb

Please sign in to comment.