Skip to content

Commit

Permalink
Merge pull request #204 from ipfs/feat/memory_pressure_incoming_messages
Browse files Browse the repository at this point in the history
Back pressure incoming responses
  • Loading branch information
aarshkshah1992 authored Aug 23, 2021
2 parents e1842d1 + 13e0ad8 commit 2532859
Show file tree
Hide file tree
Showing 20 changed files with 1,531 additions and 377 deletions.
24 changes: 22 additions & 2 deletions benchmarks/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ func BenchmarkRoundtripSuccess(b *testing.B) {
b.Run("test-p2p-stress-1-1GB-memory-pressure", func(b *testing.B) {
p2pStrestTest(ctx, b, 1, allFilesUniformSize(1*(1<<30), 1<<20, 1024, true), tdm, []graphsync.Option{graphsync.MaxMemoryResponder(1 << 27)}, true)
})
b.Run("test-p2p-stress-1-1GB-memory-pressure-missing-blocks", func(b *testing.B) {
p2pStrestTest(ctx, b, 1, allFilesMissingTopLevelBlock(1*(1<<30), 1<<20, 1024, true), tdm, []graphsync.Option{graphsync.MaxMemoryResponder(1 << 27)}, true)
})
b.Run("test-p2p-stress-1-1GB-memory-pressure-no-raw-nodes", func(b *testing.B) {
p2pStrestTest(ctx, b, 1, allFilesUniformSize(1*(1<<30), 1<<20, 1024, false), tdm, []graphsync.Option{graphsync.MaxMemoryResponder(1 << 27)}, true)
})
Expand Down Expand Up @@ -173,10 +176,10 @@ func p2pStrestTest(ctx context.Context, b *testing.B, numfiles int, df distFunc,
wg.Add(1)
go func(j int) {
defer wg.Done()
for _ = range responseChan {
for range responseChan {
}
for err := range errChan {
b.Fatalf("received error on request: %s", err.Error())
b.Logf("Error during network traversal: %s", err.Error())
}
}(j)
}
Expand Down Expand Up @@ -302,6 +305,23 @@ func allFilesUniformSize(size uint64, unixfsChunkSize uint64, unixfsLinksPerLeve
}
}

func allFilesMissingTopLevelBlock(size uint64, unixfsChunkSize uint64, unixfsLinksPerLevel int, useRawNodes bool) distFunc {
return func(ctx context.Context, b *testing.B, provs []testinstance.Instance) []cid.Cid {
cids := make([]cid.Cid, 0, len(provs))
for _, prov := range provs {
c := loadRandomUnixFxFile(ctx, b, prov.BlockStore, size, unixfsChunkSize, unixfsLinksPerLevel, useRawNodes)
ds := merkledag.NewDAGService(blockservice.New(prov.BlockStore, offline.Exchange(prov.BlockStore)))
lnks, err := ds.GetLinks(ctx, c)
require.NoError(b, err)
randLink := lnks[rand.Intn(len(lnks))]
err = ds.Remove(ctx, randLink.Cid)
require.NoError(b, err)
cids = append(cids, c)
}
return cids
}
}

type tempDirMaker struct {
tdm string
tempDirSeq int32
Expand Down
50 changes: 36 additions & 14 deletions impl/graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,12 @@ type GraphSync struct {
}

type graphsyncConfigOptions struct {
totalMaxMemory uint64
maxMemoryPerPeer uint64
maxInProgressRequests uint64
registerDefaultValidator bool
totalMaxMemoryResponder uint64
maxMemoryPerPeerResponder uint64
totalMaxMemoryRequestor uint64
maxMemoryPerPeerRequestor uint64
maxInProgressRequests uint64
registerDefaultValidator bool
}

// Option defines the functional option type that can be used to configure
Expand All @@ -85,15 +87,31 @@ func RejectAllRequestsByDefault() Option {
// may consume queueing up messages for a response in total
func MaxMemoryResponder(totalMaxMemory uint64) Option {
return func(gs *graphsyncConfigOptions) {
gs.totalMaxMemory = totalMaxMemory
gs.totalMaxMemoryResponder = totalMaxMemory
}
}

// MaxMemoryPerPeerResponder defines the maximum amount of memory a peer
// may consume queueing up messages for a response
func MaxMemoryPerPeerResponder(maxMemoryPerPeer uint64) Option {
return func(gs *graphsyncConfigOptions) {
gs.maxMemoryPerPeer = maxMemoryPerPeer
gs.maxMemoryPerPeerResponder = maxMemoryPerPeer
}
}

// MaxMemoryRequestor defines the maximum amount of memory the responder
// may consume queueing up messages for a response in total
func MaxMemoryRequestor(totalMaxMemory uint64) Option {
return func(gs *graphsyncConfigOptions) {
gs.totalMaxMemoryRequestor = totalMaxMemory
}
}

// MaxMemoryPerPeerRequestor defines the maximum amount of memory a peer
// may consume queueing up messages for a response
func MaxMemoryPerPeerRequestor(maxMemoryPerPeer uint64) Option {
return func(gs *graphsyncConfigOptions) {
gs.maxMemoryPerPeerRequestor = maxMemoryPerPeer
}
}

Expand All @@ -112,10 +130,12 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
ctx, cancel := context.WithCancel(parent)

gsConfig := &graphsyncConfigOptions{
totalMaxMemory: defaultTotalMaxMemory,
maxMemoryPerPeer: defaultMaxMemoryPerPeer,
maxInProgressRequests: defaultMaxInProgressRequests,
registerDefaultValidator: true,
totalMaxMemoryResponder: defaultTotalMaxMemory,
maxMemoryPerPeerResponder: defaultMaxMemoryPerPeer,
totalMaxMemoryRequestor: defaultTotalMaxMemory,
maxMemoryPerPeerRequestor: defaultMaxMemoryPerPeer,
maxInProgressRequests: defaultMaxInProgressRequests,
registerDefaultValidator: true,
}
for _, option := range options {
option(gsConfig)
Expand All @@ -136,12 +156,14 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
if gsConfig.registerDefaultValidator {
incomingRequestHooks.Register(selectorvalidator.SelectorValidator(maxRecursionDepth))
}
allocator := allocator.NewAllocator(gsConfig.totalMaxMemory, gsConfig.maxMemoryPerPeer)
responseAllocator := allocator.NewAllocator(gsConfig.totalMaxMemoryResponder, gsConfig.maxMemoryPerPeerResponder)
createMessageQueue := func(ctx context.Context, p peer.ID) peermanager.PeerQueue {
return messagequeue.New(ctx, p, network, allocator)
return messagequeue.New(ctx, p, network, responseAllocator)
}
peerManager := peermanager.NewMessageManager(ctx, createMessageQueue)
asyncLoader := asyncloader.New(ctx, loader, storer)
requestAllocator := allocator.NewAllocator(gsConfig.totalMaxMemoryRequestor, gsConfig.maxMemoryPerPeerRequestor)

asyncLoader := asyncloader.New(ctx, loader, storer, requestAllocator)
requestManager := requestmanager.New(ctx, asyncLoader, outgoingRequestHooks, incomingResponseHooks, incomingBlockHooks, networkErrorListeners)
responseAssembler := responseassembler.New(ctx, peerManager)
peerTaskQueue := peertaskqueue.New()
Expand Down Expand Up @@ -171,7 +193,7 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
persistenceOptions: persistenceOptions,
ctx: ctx,
cancel: cancel,
allocator: allocator,
allocator: responseAllocator,
}

asyncLoader.Startup()
Expand Down
62 changes: 37 additions & 25 deletions requestmanager/asyncloader/asyncloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

blocks "github.com/ipfs/go-block-format"
"github.com/ipld/go-ipld-prime"
peer "github.com/libp2p/go-libp2p-peer"

"github.com/ipfs/go-graphsync"
"github.com/ipfs/go-graphsync/metadata"
Expand All @@ -25,6 +26,12 @@ type alternateQueue struct {
loadAttemptQueue *loadattemptqueue.LoadAttemptQueue
}

// Allocator indicates a mechanism for tracking memory used by a given peer
type Allocator interface {
AllocateBlockMemory(p peer.ID, amount uint64) <-chan error
ReleaseBlockMemory(p peer.ID, amount uint64) error
}

// AsyncLoader manages loading links asynchronously in as new responses
// come in from the network
type AsyncLoader struct {
Expand All @@ -40,12 +47,13 @@ type AsyncLoader struct {
alternateQueues map[string]alternateQueue
responseCache *responsecache.ResponseCache
loadAttemptQueue *loadattemptqueue.LoadAttemptQueue
allocator Allocator
}

// New initializes a new link loading manager for asynchronous loads from the given context
// and local store loading and storing function
func New(ctx context.Context, loader ipld.Loader, storer ipld.Storer) *AsyncLoader {
responseCache, loadAttemptQueue := setupAttemptQueue(loader, storer)
func New(ctx context.Context, loader ipld.Loader, storer ipld.Storer, allocator Allocator) *AsyncLoader {
responseCache, loadAttemptQueue := setupAttemptQueue(loader, storer, allocator)
ctx, cancel := context.WithCancel(ctx)
return &AsyncLoader{
ctx: ctx,
Expand All @@ -59,6 +67,7 @@ func New(ctx context.Context, loader ipld.Loader, storer ipld.Storer) *AsyncLoad
alternateQueues: make(map[string]alternateQueue),
responseCache: responseCache,
loadAttemptQueue: loadAttemptQueue,
allocator: allocator,
}
}

Expand Down Expand Up @@ -103,8 +112,16 @@ func (al *AsyncLoader) StartRequest(requestID graphsync.RequestID, persistenceOp

// ProcessResponse injests new responses and completes asynchronous loads as
// neccesary
func (al *AsyncLoader) ProcessResponse(responses map[graphsync.RequestID]metadata.Metadata,
func (al *AsyncLoader) ProcessResponse(p peer.ID, responses map[graphsync.RequestID]metadata.Metadata,
blks []blocks.Block) {
totalMemoryAllocated := uint64(0)
for _, blk := range blks {
totalMemoryAllocated += uint64(len(blk.RawData()))
}
select {
case <-al.allocator.AllocateBlockMemory(p, totalMemoryAllocated):
case <-al.ctx.Done():
}
select {
case <-al.ctx.Done():
case al.incomingMessages <- &newResponsesAvailableMessage{responses, blks}:
Expand All @@ -113,10 +130,10 @@ func (al *AsyncLoader) ProcessResponse(responses map[graphsync.RequestID]metadat

// AsyncLoad asynchronously loads the given link for the given request ID. It returns a channel for data and a channel
// for errors -- only one message will be sent over either.
func (al *AsyncLoader) AsyncLoad(requestID graphsync.RequestID, link ipld.Link) <-chan types.AsyncLoadResult {
func (al *AsyncLoader) AsyncLoad(p peer.ID, requestID graphsync.RequestID, link ipld.Link) <-chan types.AsyncLoadResult {
resultChan := make(chan types.AsyncLoadResult, 1)
response := make(chan error, 1)
lr := loadattemptqueue.NewLoadRequest(requestID, link, resultChan)
lr := loadattemptqueue.NewLoadRequest(p, requestID, link, resultChan)
_ = al.sendSyncMessage(&loadRequestMessage{response, requestID, lr}, response)
return resultChan
}
Expand Down Expand Up @@ -258,7 +275,7 @@ func (rpom *registerPersistenceOptionMessage) register(al *AsyncLoader) error {
if existing {
return errors.New("already registerd a persistence option with this name")
}
responseCache, loadAttemptQueue := setupAttemptQueue(rpom.loader, rpom.storer)
responseCache, loadAttemptQueue := setupAttemptQueue(rpom.loader, rpom.storer, al.allocator)
al.alternateQueues[rpom.name] = alternateQueue{responseCache, loadAttemptQueue}
return nil
}
Expand Down Expand Up @@ -347,32 +364,27 @@ func (crm *cleanupRequestMessage) handle(al *AsyncLoader) {
al.responseCache.FinishRequest(crm.requestID)
}

func setupAttemptQueue(loader ipld.Loader, storer ipld.Storer) (*responsecache.ResponseCache, *loadattemptqueue.LoadAttemptQueue) {
func setupAttemptQueue(loader ipld.Loader, storer ipld.Storer, allocator Allocator) (*responsecache.ResponseCache, *loadattemptqueue.LoadAttemptQueue) {

unverifiedBlockStore := unverifiedblockstore.New(storer)
responseCache := responsecache.New(unverifiedBlockStore)
loadAttemptQueue := loadattemptqueue.New(func(requestID graphsync.RequestID, link ipld.Link) types.AsyncLoadResult {
loadAttemptQueue := loadattemptqueue.New(func(p peer.ID, requestID graphsync.RequestID, link ipld.Link) types.AsyncLoadResult {
// load from response cache
data, err := responseCache.AttemptLoad(requestID, link)
if data == nil && err == nil {
// fall back to local store
stream, loadErr := loader(link, ipld.LinkContext{})
if stream != nil && loadErr == nil {
localData, loadErr := ioutil.ReadAll(stream)
if loadErr == nil && localData != nil {
return types.AsyncLoadResult{
Data: localData,
Err: nil,
Local: true,
}
}
}
if err != nil {
return types.AsyncLoadResult{Err: err, Local: false}
}
return types.AsyncLoadResult{
Data: data,
Err: err,
Local: false,
if data != nil {
allocator.ReleaseBlockMemory(p, uint64(len(data)))
return types.AsyncLoadResult{Data: data, Local: false}
}
// fall back to local store
if stream, err := loader(link, ipld.LinkContext{}); stream != nil && err == nil {
if localData, err := ioutil.ReadAll(stream); err == nil && localData != nil {
return types.AsyncLoadResult{Data: localData, Local: true}
}
}
return types.AsyncLoadResult{Local: false}
})

return responseCache, loadAttemptQueue
Expand Down
Loading

0 comments on commit 2532859

Please sign in to comment.