Merge pull request #344 from drakkan/allocator_pr
add optional allocator
puellanivis authored Jul 16, 2020
2 parents 7d72039 + 118ca57 commit 95fa324
Showing 16 changed files with 471 additions and 66 deletions.
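
Note: the server-side option that switches the allocator on is not part of the excerpt below, so the following is only a rough sketch of how a consumer might opt in. It assumes the PR exposes a WithAllocator() ServerOption; that name is an assumption, not something shown in this diff.

package main

import (
	"io"
	"log"
	"os"

	"github.com/pkg/sftp"
)

// stdioConn glues stdin/stdout into the io.ReadWriteCloser that sftp.NewServer
// expects, e.g. when the binary runs as an OpenSSH "Subsystem sftp".
type stdioConn struct {
	io.Reader
	io.WriteCloser
}

func main() {
	srv, err := sftp.NewServer(
		stdioConn{os.Stdin, os.Stdout},
		sftp.WithAllocator(), // assumed name of the option this PR adds to enable buffer reuse
	)
	if err != nil {
		log.Fatal(err)
	}

	if err := srv.Serve(); err != nil && err != io.EOF {
		log.Fatal(err)
	}
}
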
.travis.yml (10 changes: 8 additions & 2 deletions)
@@ -4,8 +4,8 @@ go_import_path: github.com/pkg/sftp
# current and previous stable releases, plus tip
# remember to exclude previous and tip for macs below
go:
- 1.12.x
- 1.13.x
- 1.14.x
- tip

os:
@@ -15,7 +15,7 @@ matrix:
matrix:
exclude:
- os: osx
go: 1.12.x
go: 1.13.x
- os: osx
go: tip

@@ -35,6 +35,12 @@ script:
- go test -integration -v ./...
- go test -testserver -v ./...
- go test -integration -testserver -v ./...
- go test -integration -allocator -v ./...
- go test -testserver -allocator -v ./...
- go test -integration -testserver -allocator -v ./...
- go test -race -integration -v ./...
- go test -race -testserver -v ./...
- go test -race -integration -testserver -v ./...
- go test -race -integration -allocator -v ./...
- go test -race -testserver -allocator -v ./...
- go test -race -integration -allocator -testserver -v ./...
Makefile (18 changes: 12 additions & 6 deletions)
@@ -1,11 +1,17 @@
integration:
go test -integration -v
go test -testserver -v
go test -integration -testserver -v
go test -integration -v ./...
go test -testserver -v ./...
go test -integration -testserver -v ./...
go test -integration -allocator -v ./...
go test -testserver -allocator -v ./...
go test -integration -testserver -allocator -v ./...

integration_w_race:
go test -race -integration -v
go test -race -testserver -v
go test -race -integration -testserver -v
go test -race -integration -v ./...
go test -race -testserver -v ./...
go test -race -integration -testserver -v ./...
go test -race -integration -allocator -v ./...
go test -race -testserver -allocator -v ./...
go test -race -integration -allocator -testserver -v ./...


allocator.go (96 changes: 96 additions & 0 deletions)
@@ -0,0 +1,96 @@
package sftp

import (
"sync"
)

type allocator struct {
sync.Mutex
available [][]byte
// map key is the request order ID
used map[uint32][][]byte
}

func newAllocator() *allocator {
return &allocator{
// micro optimization: initialize available pages with an initial capacity
available: make([][]byte, 0, SftpServerWorkerCount*2),
used: make(map[uint32][][]byte),
}
}

// GetPage returns a previously allocated and unused []byte or creates a new one.
// The returned slice has a fixed size = maxMsgLength; this value is suitable both for
// receiving new packets and for reading the files to serve.
func (a *allocator) GetPage(requestOrderID uint32) []byte {
a.Lock()
defer a.Unlock()

var result []byte

// get an available page and remove it from the available ones.
if len(a.available) > 0 {
truncLength := len(a.available) - 1
result = a.available[truncLength]

a.available[truncLength] = nil // clear out the internal pointer
a.available = a.available[:truncLength] // truncate the slice
}

// no preallocated slice found, just allocate a new one
if result == nil {
result = make([]byte, maxMsgLength)
}

// put result in used pages
a.used[requestOrderID] = append(a.used[requestOrderID], result)

return result
}

// ReleasePages marks as unused all the pages in use for the given requestOrderID.
func (a *allocator) ReleasePages(requestOrderID uint32) {
a.Lock()
defer a.Unlock()

if used := a.used[requestOrderID]; len(used) > 0 {
a.available = append(a.available, used...)
}
delete(a.used, requestOrderID)
}

// Free removes all the used and available pages.
// Call this method when the allocator is not needed anymore
func (a *allocator) Free() {
a.Lock()
defer a.Unlock()

a.available = nil
a.used = make(map[uint32][][]byte)
}

func (a *allocator) countUsedPages() int {
a.Lock()
defer a.Unlock()

num := 0
for _, p := range a.used {
num += len(p)
}
return num
}

func (a *allocator) countAvailablePages() int {
a.Lock()
defer a.Unlock()

return len(a.available)
}

func (a *allocator) isRequestOrderIDUsed(requestOrderID uint32) bool {
a.Lock()
defer a.Unlock()

_, ok := a.used[requestOrderID]
return ok
}
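
A minimal sketch of the intended lifecycle, using only the methods defined above; the order ID 42 and the two-pages-per-request pattern are illustrative, not taken from the commit.

func exampleAllocatorLifecycle() {
	a := newAllocator()

	// One request may need several pages (for example the received packet and
	// the data read from the file being downloaded); all of them are keyed by
	// the same request order ID.
	hdr := a.GetPage(42)  // len(hdr) == maxMsgLength
	data := a.GetPage(42) // second page tracked under the same order ID
	_, _ = hdr, data

	// Once the response for order ID 42 has been sent, its pages return to the
	// available pool and will be handed out again by later GetPage calls.
	a.ReleasePages(42)

	// When the connection shuts down, drop everything so the GC can reclaim it.
	a.Free()
}
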
allocator_test.go (135 changes: 135 additions & 0 deletions)
@@ -0,0 +1,135 @@
package sftp

import (
"strconv"
"sync/atomic"
"testing"

"github.com/stretchr/testify/assert"
)

func TestAllocator(t *testing.T) {
allocator := newAllocator()
// get a page for request order id 1
page := allocator.GetPage(1)
page[1] = uint8(1)
assert.Equal(t, maxMsgLength, len(page))
assert.Equal(t, 1, allocator.countUsedPages())
// get another page for request order id 1, we now have 2 used pages
page = allocator.GetPage(1)
page[0] = uint8(2)
assert.Equal(t, 2, allocator.countUsedPages())
// get another page for request order id 1, we now have 3 used pages
page = allocator.GetPage(1)
page[2] = uint8(3)
assert.Equal(t, 3, allocator.countUsedPages())
// release the pages for request order id 1; we now have 3 available pages
allocator.ReleasePages(1)
assert.NotContains(t, allocator.used, 1)
assert.Equal(t, 3, allocator.countAvailablePages())
// get a page for request order id 2
// we get the most recently released page; verify that by checking the previously written values,
// so we are sure we are reusing a previously allocated page
page = allocator.GetPage(2)
assert.Equal(t, uint8(3), page[2])
assert.Equal(t, 2, allocator.countAvailablePages())
assert.Equal(t, 1, allocator.countUsedPages())
page = allocator.GetPage(2)
assert.Equal(t, uint8(2), page[0])
assert.Equal(t, 1, allocator.countAvailablePages())
assert.Equal(t, 2, allocator.countUsedPages())
page = allocator.GetPage(2)
assert.Equal(t, uint8(1), page[1])
// we now have 3 used pages for request order id 2 and no available pages
assert.Equal(t, 0, allocator.countAvailablePages())
assert.Equal(t, 3, allocator.countUsedPages())
assert.True(t, allocator.isRequestOrderIDUsed(2), "page with request order id 2 must be used")
assert.False(t, allocator.isRequestOrderIDUsed(1), "page with request order id 1 must not be used")
// release some request order IDs with no allocated pages; this should have no effect
allocator.ReleasePages(1)
allocator.ReleasePages(3)
assert.Equal(t, 0, allocator.countAvailablePages())
assert.Equal(t, 3, allocator.countUsedPages())
assert.True(t, allocator.isRequestOrderIDUsed(2), "page with request order id 2 must be used")
assert.False(t, allocator.isRequestOrderIDUsed(1), "page with request order id 1 must not be used")
// now get some pages for another request order id
allocator.GetPage(3)
// we must now have 3 used pages for request order id 2 and 1 used page for request order id 3
assert.Equal(t, 0, allocator.countAvailablePages())
assert.Equal(t, 4, allocator.countUsedPages())
assert.True(t, allocator.isRequestOrderIDUsed(2), "page with request order id 2 must be used")
assert.True(t, allocator.isRequestOrderIDUsed(3), "page with request order id 3 must be used")
assert.False(t, allocator.isRequestOrderIDUsed(1), "page with request order id 1 must not be used")
// get another page for request order id 3
allocator.GetPage(3)
assert.Equal(t, 0, allocator.countAvailablePages())
assert.Equal(t, 5, allocator.countUsedPages())
assert.True(t, allocator.isRequestOrderIDUsed(2), "page with request order id 2 must be used")
assert.True(t, allocator.isRequestOrderIDUsed(3), "page with request order id 3 must be used")
assert.False(t, allocator.isRequestOrderIDUsed(1), "page with request order id 1 must not be used")
// now release the pages for request order id 3
allocator.ReleasePages(3)
assert.Equal(t, 2, allocator.countAvailablePages())
assert.Equal(t, 3, allocator.countUsedPages())
assert.True(t, allocator.isRequestOrderIDUsed(2), "page with request order id 2 must be used")
assert.False(t, allocator.isRequestOrderIDUsed(1), "page with request order id 1 must not be used")
assert.False(t, allocator.isRequestOrderIDUsed(3), "page with request order id 3 must not be used")
// again check that we are reusing previously allocated pages.
// We have written nothing to the last 2 requested pages, so release them and get the third one
allocator.ReleasePages(2)
assert.Equal(t, 5, allocator.countAvailablePages())
assert.Equal(t, 0, allocator.countUsedPages())
assert.False(t, allocator.isRequestOrderIDUsed(2), "page with request order id 2 must not be used")
allocator.GetPage(4)
allocator.GetPage(4)
page = allocator.GetPage(4)
assert.Equal(t, uint8(3), page[2])
assert.Equal(t, 2, allocator.countAvailablePages())
assert.Equal(t, 3, allocator.countUsedPages())
assert.True(t, allocator.isRequestOrderIDUsed(4), "page with request order id 4 must be used")
// free the allocator
allocator.Free()
assert.Equal(t, 0, allocator.countAvailablePages())
assert.Equal(t, 0, allocator.countUsedPages())
}

func BenchmarkAllocatorSerial(b *testing.B) {
allocator := newAllocator()
for i := 0; i < b.N; i++ {
benchAllocator(allocator, uint32(i))
}
}

func BenchmarkAllocatorParallel(b *testing.B) {
var counter uint32
allocator := newAllocator()
for i := 1; i <= 8; i *= 2 {
b.Run(strconv.Itoa(i), func(b *testing.B) {
b.SetParallelism(i)
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
benchAllocator(allocator, atomic.AddUint32(&counter, 1))
}
})
})
}
}

func benchAllocator(allocator *allocator, requestOrderID uint32) {
// simulates the page requested in recvPacket
allocator.GetPage(requestOrderID)
// simulates the page requested in fileget for downloads
allocator.GetPage(requestOrderID)
// release the allocated pages
allocator.ReleasePages(requestOrderID)
}

// useful for debugging
func printAllocatorContents(allocator *allocator) {
for o, u := range allocator.used {
debug("used order id: %v, values: %+v", o, u)
}
for _, v := range allocator.available {
debug("available, values: %+v", v)
}
}
client.go (2 changes: 1 addition & 1 deletion)
@@ -214,7 +214,7 @@ func (c *Client) nextID() uint32 {
}

func (c *Client) recvVersion() error {
typ, data, err := c.recvPacket()
typ, data, err := c.recvPacket(0)
if err != nil {
return err
}
conn.go (10 changes: 7 additions & 3 deletions)
@@ -13,13 +13,17 @@ import (
type conn struct {
io.Reader
io.WriteCloser
// this is the same allocator used in the packet manager
alloc *allocator
sync.Mutex // used to serialise writes to sendPacket
// sendPacketTest is needed to replicate packet issues in testing
sendPacketTest func(w io.Writer, m encoding.BinaryMarshaler) error
}

func (c *conn) recvPacket() (uint8, []byte, error) {
return recvPacket(c)
// The orderID is used in server mode when the allocator is enabled;
// in client mode just pass 0.
func (c *conn) recvPacket(orderID uint32) (uint8, []byte, error) {
return recvPacket(c, c.alloc, orderID)
}

func (c *conn) sendPacket(m encoding.BinaryMarshaler) error {
@@ -76,7 +80,7 @@ func (c *clientConn) recv() error {
c.conn.Close()
}()
for {
typ, data, err := c.recvPacket()
typ, data, err := c.recvPacket(0)
if err != nil {
return err
}
packet-manager.go (14 changes: 14 additions & 0 deletions)
@@ -18,6 +18,8 @@ type packetManager struct {
sender packetSender // connection object
working *sync.WaitGroup
packetCount uint32
// alloc is not nil if the allocator is enabled
alloc *allocator
}

type packetSender interface {
@@ -44,6 +46,14 @@ func (s *packetManager) newOrderID() uint32 {
return s.packetCount
}

// getNextOrderID returns the next orderID without incrementing it.
// It is called before receiving a new packet, when the allocator is enabled, to associate
// the slice allocated for the incoming packet with the orderID that will later be used
// to mark the allocated slices for reuse once the request has been served.
func (s *packetManager) getNextOrderID() uint32 {
return s.packetCount + 1
}

type orderedRequest struct {
requestPacket
orderid uint32
@@ -174,6 +184,10 @@ func (s *packetManager) maybeSendPackets() {
if in.orderID() == out.orderID() {
debug("Sending packet: %v", out.id())
s.sender.sendPacket(out.(encoding.BinaryMarshaler))
if s.alloc != nil {
// mark for reuse the slices allocated for this request
s.alloc.ReleasePages(in.orderID())
}
// pop off heads
copy(s.incoming, s.incoming[1:]) // shift left
s.incoming[len(s.incoming)-1] = nil // clear last
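
Tying the hunks together, here is a sketch of how the server receive loop would use these pieces. It is written as if it lived inside the package so it can reuse the identifiers shown in this diff, but the real read loop is not part of the excerpt and the function name is invented.

func serveLoopSketch(pm *packetManager, c *conn) error {
	for {
		// Reserve the order ID that will be assigned to the next incoming request,
		// so the receive buffer is tracked under the same key as the request itself.
		orderID := pm.getNextOrderID()

		// With the allocator enabled, recvPacket pulls its buffer from the pool
		// instead of allocating a fresh maxMsgLength slice for every packet.
		typ, data, err := c.recvPacket(orderID)
		if err != nil {
			return err
		}

		// ... unmarshal typ/data, run the request through the worker pool, and
		// hand the ordered response back to the packet manager ...
		_ = typ
		_ = data

		// maybeSendPackets (hunk above) then calls alloc.ReleasePages(orderID)
		// right after the response for this order ID is written, returning every
		// page used by the request to the available pool.
	}
}
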