Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add optional allocator #344

Merged
merged 5 commits into from
Jul 16, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ go_import_path: github.com/pkg/sftp
# current and previous stable releases, plus tip
# remember to exclude previous and tip for macs below
go:
- 1.12.x
- 1.13.x
- 1.14.x
- tip

os:
Expand All @@ -15,7 +15,7 @@ os:
matrix:
exclude:
- os: osx
go: 1.12.x
go: 1.13.x
- os: osx
go: tip

Expand All @@ -35,6 +35,12 @@ script:
- go test -integration -v ./...
- go test -testserver -v ./...
- go test -integration -testserver -v ./...
- go test -integration -allocator -v ./...
- go test -testserver -allocator -v ./...
- go test -integration -testserver -allocator -v ./...
- go test -race -integration -v ./...
- go test -race -testserver -v ./...
- go test -race -integration -testserver -v ./...
- go test -race -integration -allocator -v ./...
- go test -race -testserver -allocator -v ./...
- go test -race -integration -allocator -testserver -v ./...
18 changes: 12 additions & 6 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
integration:
go test -integration -v
go test -testserver -v
go test -integration -testserver -v
go test -integration -v ./...
go test -testserver -v ./...
go test -integration -testserver -v ./...
go test -integration -allocator -v ./...
go test -testserver -allocator -v ./...
go test -integration -testserver -allocator -v ./...

integration_w_race:
go test -race -integration -v
go test -race -testserver -v
go test -race -integration -testserver -v
go test -race -integration -v ./...
go test -race -testserver -v ./...
go test -race -integration -testserver -v ./...
go test -race -integration -allocator -v ./...
go test -race -testserver -allocator -v ./...
go test -race -integration -allocator -testserver -v ./...


96 changes: 96 additions & 0 deletions allocator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package sftp

import (
"sync"
)

type allocator struct {
sync.Mutex
available [][]byte
// map key is the request order
used map[uint32][][]byte
}

func newAllocator() *allocator {
return &allocator{
// micro optimization: initialize available pages with an initial capacity
available: make([][]byte, 0, SftpServerWorkerCount*2),
used: make(map[uint32][][]byte),
}
}

// GetPage returns a previously allocated and unused []byte or create a new one.
// The slice have a fixed size = maxMsgLength, this value is suitable for both
// receiving new packets and reading the files to serve
func (a *allocator) GetPage(requestOrderID uint32) []byte {
a.Lock()
defer a.Unlock()

var result []byte

// get an available page and remove it from the available ones.
if len(a.available) > 0 {
truncLength := len(a.available) - 1
result = a.available[truncLength]

a.available[truncLength] = nil // clear out the internal pointer
a.available = a.available[:truncLength] // truncate the slice
}

// no preallocated slice found, just allocate a new one
if result == nil {
result = make([]byte, maxMsgLength)
}

// put result in used pages
a.used[requestOrderID] = append(a.used[requestOrderID], result)

return result
}

// ReleasePages marks unused all pages in use for the given requestID
func (a *allocator) ReleasePages(requestOrderID uint32) {
a.Lock()
defer a.Unlock()

if used := a.used[requestOrderID]; len(used) > 0 {
a.available = append(a.available, used...)
}
delete(a.used, requestOrderID)
}

// Free removes all the used and available pages.
// Call this method when the allocator is not needed anymore
func (a *allocator) Free() {
a.Lock()
defer a.Unlock()

a.available = nil
a.used = make(map[uint32][][]byte)
puellanivis marked this conversation as resolved.
Show resolved Hide resolved
}

func (a *allocator) countUsedPages() int {
a.Lock()
defer a.Unlock()

num := 0
for _, p := range a.used {
num += len(p)
}
return num
}

func (a *allocator) countAvailablePages() int {
a.Lock()
defer a.Unlock()

return len(a.available)
}

func (a *allocator) isRequestOrderIDUsed(requestOrderID uint32) bool {
a.Lock()
defer a.Unlock()

_, ok := a.used[requestOrderID]
return ok
}
135 changes: 135 additions & 0 deletions allocator_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
package sftp

import (
"strconv"
"sync/atomic"
"testing"

"github.com/stretchr/testify/assert"
)

func TestAllocator(t *testing.T) {
allocator := newAllocator()
// get a page for request order id 1
page := allocator.GetPage(1)
page[1] = uint8(1)
assert.Equal(t, maxMsgLength, len(page))
assert.Equal(t, 1, allocator.countUsedPages())
// get another page for request order id 1, we now have 2 used pages
page = allocator.GetPage(1)
page[0] = uint8(2)
assert.Equal(t, 2, allocator.countUsedPages())
// get another page for request order id 1, we now have 3 used pages
page = allocator.GetPage(1)
page[2] = uint8(3)
assert.Equal(t, 3, allocator.countUsedPages())
// release the page for request order id 1, we now have 3 available pages
allocator.ReleasePages(1)
assert.NotContains(t, allocator.used, 1)
assert.Equal(t, 3, allocator.countAvailablePages())
// get a page for request order id 2
// we get the latest released page, let's verify that by checking the previously written values
// so we are sure we are reusing a previously allocated page
page = allocator.GetPage(2)
assert.Equal(t, uint8(3), page[2])
assert.Equal(t, 2, allocator.countAvailablePages())
assert.Equal(t, 1, allocator.countUsedPages())
page = allocator.GetPage(2)
assert.Equal(t, uint8(2), page[0])
assert.Equal(t, 1, allocator.countAvailablePages())
assert.Equal(t, 2, allocator.countUsedPages())
page = allocator.GetPage(2)
assert.Equal(t, uint8(1), page[1])
// we now have 3 used pages for request order id 2 and no available pages
assert.Equal(t, 0, allocator.countAvailablePages())
assert.Equal(t, 3, allocator.countUsedPages())
assert.True(t, allocator.isRequestOrderIDUsed(2), "page with request order id 2 must be used")
assert.False(t, allocator.isRequestOrderIDUsed(1), "page with request order id 1 must be not used")
// release some request order id with no allocated pages, should have no effect
allocator.ReleasePages(1)
allocator.ReleasePages(3)
assert.Equal(t, 0, allocator.countAvailablePages())
assert.Equal(t, 3, allocator.countUsedPages())
assert.True(t, allocator.isRequestOrderIDUsed(2), "page with request order id 2 must be used")
assert.False(t, allocator.isRequestOrderIDUsed(1), "page with request order id 1 must be not used")
// now get some pages for another request order id
allocator.GetPage(3)
// we now must have 3 used pages for request order id 2 and 1 used page for request order id 3
assert.Equal(t, 0, allocator.countAvailablePages())
assert.Equal(t, 4, allocator.countUsedPages())
assert.True(t, allocator.isRequestOrderIDUsed(2), "page with request order id 2 must be used")
assert.True(t, allocator.isRequestOrderIDUsed(3), "page with request order id 3 must be used")
assert.False(t, allocator.isRequestOrderIDUsed(1), "page with request order id 1 must be not used")
// get another page for request order id 3
allocator.GetPage(3)
assert.Equal(t, 0, allocator.countAvailablePages())
assert.Equal(t, 5, allocator.countUsedPages())
assert.True(t, allocator.isRequestOrderIDUsed(2), "page with request order id 2 must be used")
assert.True(t, allocator.isRequestOrderIDUsed(3), "page with request order id 3 must be used")
assert.False(t, allocator.isRequestOrderIDUsed(1), "page with request order id 1 must be not used")
// now release the pages for request order id 3
allocator.ReleasePages(3)
assert.Equal(t, 2, allocator.countAvailablePages())
assert.Equal(t, 3, allocator.countUsedPages())
assert.True(t, allocator.isRequestOrderIDUsed(2), "page with request order id 2 must be used")
assert.False(t, allocator.isRequestOrderIDUsed(1), "page with request order id 1 must be not used")
assert.False(t, allocator.isRequestOrderIDUsed(3), "page with request order id 3 must be not used")
// again check we are reusing previously allocated pages.
// We have written nothing to the 2 last requested page so release them and get the third one
allocator.ReleasePages(2)
assert.Equal(t, 5, allocator.countAvailablePages())
assert.Equal(t, 0, allocator.countUsedPages())
assert.False(t, allocator.isRequestOrderIDUsed(2), "page with request order id 2 must be not used")
allocator.GetPage(4)
allocator.GetPage(4)
page = allocator.GetPage(4)
assert.Equal(t, uint8(3), page[2])
assert.Equal(t, 2, allocator.countAvailablePages())
assert.Equal(t, 3, allocator.countUsedPages())
assert.True(t, allocator.isRequestOrderIDUsed(4), "page with request order id 4 must be used")
// free the allocator
allocator.Free()
assert.Equal(t, 0, allocator.countAvailablePages())
assert.Equal(t, 0, allocator.countUsedPages())
}

func BenchmarkAllocatorSerial(b *testing.B) {
allocator := newAllocator()
for i := 0; i < b.N; i++ {
benchAllocator(allocator, uint32(i))
}
}

func BenchmarkAllocatorParallel(b *testing.B) {
var counter uint32
allocator := newAllocator()
for i := 1; i <= 8; i *= 2 {
b.Run(strconv.Itoa(i), func(b *testing.B) {
b.SetParallelism(i)
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
benchAllocator(allocator, atomic.AddUint32(&counter, 1))
}
})
})
}
}

func benchAllocator(allocator *allocator, requestOrderID uint32) {
// simulates the page requested in recvPacket
allocator.GetPage(requestOrderID)
// simulates the page requested in fileget for downloads
allocator.GetPage(requestOrderID)
// release the allocated pages
allocator.ReleasePages(requestOrderID)
}

// useful for debug
func printAllocatorContents(allocator *allocator) {
for o, u := range allocator.used {
debug("used order id: %v, values: %+v", o, u)
}
for _, v := range allocator.available {
debug("available, values: %+v", v)
}
}
2 changes: 1 addition & 1 deletion client.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ func (c *Client) nextID() uint32 {
}

func (c *Client) recvVersion() error {
typ, data, err := c.recvPacket()
typ, data, err := c.recvPacket(0)
if err != nil {
return err
}
Expand Down
10 changes: 7 additions & 3 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,17 @@ import (
type conn struct {
io.Reader
io.WriteCloser
// this is the same allocator used in packet manager
alloc *allocator
sync.Mutex // used to serialise writes to sendPacket
// sendPacketTest is needed to replicate packet issues in testing
sendPacketTest func(w io.Writer, m encoding.BinaryMarshaler) error
}

func (c *conn) recvPacket() (uint8, []byte, error) {
return recvPacket(c)
// the orderID is used in server mode if the allocator is enabled.
// For the client mode just pass 0
func (c *conn) recvPacket(orderID uint32) (uint8, []byte, error) {
return recvPacket(c, c.alloc, orderID)
}

func (c *conn) sendPacket(m encoding.BinaryMarshaler) error {
Expand Down Expand Up @@ -76,7 +80,7 @@ func (c *clientConn) recv() error {
c.conn.Close()
}()
for {
typ, data, err := c.recvPacket()
typ, data, err := c.recvPacket(0)
if err != nil {
return err
}
Expand Down
14 changes: 14 additions & 0 deletions packet-manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ type packetManager struct {
sender packetSender // connection object
working *sync.WaitGroup
packetCount uint32
// it is not nil if the allocator is enabled
alloc *allocator
}

type packetSender interface {
Expand All @@ -44,6 +46,14 @@ func (s *packetManager) newOrderID() uint32 {
return s.packetCount
}

// returns the next orderID without incrementing it.
// This is used before receiving a new packet, with the allocator enabled, to associate
// the slice allocated for the received packet with the orderID that will be used to mark
// the allocated slices for reuse once the request is served
func (s *packetManager) getNextOrderID() uint32 {
return s.packetCount + 1
}

type orderedRequest struct {
requestPacket
orderid uint32
Expand Down Expand Up @@ -174,6 +184,10 @@ func (s *packetManager) maybeSendPackets() {
if in.orderID() == out.orderID() {
debug("Sending packet: %v", out.id())
s.sender.sendPacket(out.(encoding.BinaryMarshaler))
if s.alloc != nil {
// mark for reuse the slices allocated for this request
s.alloc.ReleasePages(in.orderID())
}
// pop off heads
copy(s.incoming, s.incoming[1:]) // shift left
s.incoming[len(s.incoming)-1] = nil // clear last
Expand Down
Loading