Skip to content

Commit

Permalink
feat: option to not read size of blocks for want-have requests (ipfs#672
Browse files Browse the repository at this point in the history
)

* Do not fetch size for WantHave blocks
* Option to set replaceHasWithBlockMaxSize
* Log if replace logic enabled/disabled
* docs: WithWantHaveReplaceSize

---------

Co-authored-by: Marcin Rataj <[email protected]>
  • Loading branch information
2 people authored and wenyue committed Oct 17, 2024
1 parent facb2ab commit 35e3375
Show file tree
Hide file tree
Showing 8 changed files with 163 additions and 74 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ The following emojis are used to highlight certain changes:

### Added

* `boxo/bitswap/server`:
* A new [`WithWantHaveReplaceSize(n)`](https://pkg.go.dev/github.com/ipfs/boxo/bitswap/server/#WithWantHaveReplaceSize) option can be used with `bitswap.New` to fine-tune cost-vs-performance. It sets the maximum size of a block in bytes up to which the bitswap server will replace a WantHave with a WantBlock response. Setting this to 0 disables this WantHave replacement and means that block sizes are not read when processing WantHave requests. [#672](https://github.com/ipfs/boxo/pull/672)
- `routing/http`: added support for address and protocol filtering to the delegated routing server ([IPIP-484](https://github.com/ipfs/specs/pull/484)) [#671](https://github.com/ipfs/boxo/pull/671)

### Changed
Expand Down
3 changes: 3 additions & 0 deletions bitswap/internal/defaults/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,7 @@ const (
// RebroadcastDelay is the default delay to trigger broadcast of
// random CIDs in the wantlist.
RebroadcastDelay = time.Minute

// DefaultWantHaveReplaceSize controls the implicit behavior of WithWantHaveReplaceSize.
DefaultWantHaveReplaceSize = 1024
)
7 changes: 7 additions & 0 deletions bitswap/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,13 @@ func WithTaskComparator(comparator server.TaskComparator) Option {
return Option{server.WithTaskComparator(comparator)}
}

// WithWantHaveReplaceSize sets the maximum size of a block in bytes up to
// which the bitswap server will replace a WantHave with a WantBlock response.
// See [server.WithWantHaveReplaceSize] for details.
func WithWantHaveReplaceSize(size int) Option {
return Option{server.WithWantHaveReplaceSize(size)}
}

func ProviderSearchDelay(newProvSearchDelay time.Duration) Option {
return Option{client.ProviderSearchDelay(newProvSearchDelay)}
}
Expand Down
36 changes: 36 additions & 0 deletions bitswap/server/internal/decision/blockstoremanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,42 @@ func (bsm *blockstoreManager) getBlockSizes(ctx context.Context, ks []cid.Cid) (
return res, nil
}

func (bsm *blockstoreManager) hasBlocks(ctx context.Context, ks []cid.Cid) (map[cid.Cid]struct{}, error) {
if len(ks) == 0 {
return nil, nil
}
hasBlocks := make([]bool, len(ks))

var count atomic.Int32
err := bsm.jobPerKey(ctx, ks, func(i int, c cid.Cid) {
has, err := bsm.bs.Has(ctx, c)
if err != nil {
// Note: this isn't a fatal error. We shouldn't abort the request
log.Errorf("blockstore.Has(%c) error: %s", c, err)
return
}
if has {
hasBlocks[i] = true
count.Add(1)
}
})
if err != nil {
return nil, err
}
results := count.Load()
if results == 0 {
return nil, nil
}

res := make(map[cid.Cid]struct{}, results)
for i, ok := range hasBlocks {
if ok {
res[ks[i]] = struct{}{}
}
}
return res, nil
}

func (bsm *blockstoreManager) getBlocks(ctx context.Context, ks []cid.Cid) (map[cid.Cid]blocks.Block, error) {
if len(ks) == 0 {
return nil, nil
Expand Down
15 changes: 4 additions & 11 deletions bitswap/server/internal/decision/blockstoremanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,29 +98,22 @@ func TestBlockstoreManager(t *testing.T) {
cids = append(cids, b.Cid())
}

sizes, err := bsm.getBlockSizes(ctx, cids)
hasBlocks, err := bsm.hasBlocks(ctx, cids)
if err != nil {
t.Fatal(err)
}
if len(sizes) != len(blks)-1 {
if len(hasBlocks) != len(blks)-1 {
t.Fatal("Wrong response length")
}

for _, c := range cids {
expSize := len(exp[c].RawData())
size, ok := sizes[c]

// Only the last key should be missing
_, ok := hasBlocks[c]
if c.Equals(cids[len(cids)-1]) {
if ok {
t.Fatal("Non-existent block should not be in sizes map")
}
} else {
if !ok {
t.Fatal("Block should be in sizes map")
}
if size != expSize {
t.Fatal("Block has wrong size")
t.Fatal("Block should be in hasBlocks")
}
}
}
Expand Down
130 changes: 76 additions & 54 deletions bitswap/server/internal/decision/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,6 @@ const (
// queuedTagWeight is the default weight for peers that have work queued
// on their behalf.
queuedTagWeight = 10

// maxBlockSizeReplaceHasWithBlock is the maximum size of the block in
// bytes up to which we will replace a want-have with a want-block
maxBlockSizeReplaceHasWithBlock = 1024
)

// Envelope contains a message for a Peer.
Expand Down Expand Up @@ -202,9 +198,9 @@ type Engine struct {

targetMessageSize int

// maxBlockSizeReplaceHasWithBlock is the maximum size of the block in
// bytes up to which we will replace a want-have with a want-block
maxBlockSizeReplaceHasWithBlock int
// wantHaveReplaceSize is the maximum size of the block in bytes up to
// which to replace a WantHave with a WantBlock.
wantHaveReplaceSize int

sendDontHaves bool

Expand Down Expand Up @@ -343,6 +339,14 @@ func WithSetSendDontHave(send bool) Option {
}
}

// WithWantHaveReplaceSize sets the maximum size of a block in bytes up to
// which to replace a WantHave with a WantBlock response.
func WithWantHaveReplaceSize(size int) Option {
return func(e *Engine) {
e.wantHaveReplaceSize = size
}
}

// wrapTaskComparator wraps a TaskComparator so it can be used as a QueueTaskComparator
func wrapTaskComparator(tc TaskComparator) peertask.QueueTaskComparator {
return func(a, b *peertask.QueueTask) bool {
Expand All @@ -369,32 +373,14 @@ func wrapTaskComparator(tc TaskComparator) peertask.QueueTaskComparator {
}

// NewEngine creates a new block sending engine for the given block store.
// maxOutstandingBytesPerPeer hints to the peer task queue not to give a peer more tasks if it has some maximum
// work already outstanding.
// maxOutstandingBytesPerPeer hints to the peer task queue not to give a peer
// more tasks if it has some maximum work already outstanding.
func NewEngine(
ctx context.Context,
bs bstore.Blockstore,
peerTagger PeerTagger,
self peer.ID,
opts ...Option,
) *Engine {
return newEngine(
ctx,
bs,
peerTagger,
self,
maxBlockSizeReplaceHasWithBlock,
opts...,
)
}

func newEngine(
ctx context.Context,
bs bstore.Blockstore,
peerTagger PeerTagger,
self peer.ID,
maxReplaceSize int,
opts ...Option,
) *Engine {
e := &Engine{
scoreLedger: NewDefaultScoreLedger(),
Expand All @@ -404,7 +390,7 @@ func newEngine(
outbox: make(chan (<-chan *Envelope), outboxChanBuffer),
workSignal: make(chan struct{}, 1),
ticker: time.NewTicker(time.Millisecond * 100),
maxBlockSizeReplaceHasWithBlock: maxReplaceSize,
wantHaveReplaceSize: defaults.DefaultWantHaveReplaceSize,
taskWorkerCount: defaults.BitswapEngineTaskWorkerCount,
sendDontHaves: true,
self: self,
Expand Down Expand Up @@ -445,6 +431,12 @@ func newEngine(

e.peerRequestQueue = peertaskqueue.New(peerTaskQueueOpts...)

if e.wantHaveReplaceSize == 0 {
log.Info("Replace WantHave with WantBlock is disabled")
} else {
log.Infow("Replace WantHave with WantBlock is enabled", "maxSize", e.wantHaveReplaceSize)
}

return e
}

Expand Down Expand Up @@ -689,16 +681,38 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap
return true
}

noReplace := e.wantHaveReplaceSize == 0

// Get block sizes for unique CIDs.
wantKs := cid.NewSet()
wantKs := make([]cid.Cid, 0, len(wants))
var haveKs []cid.Cid
for _, entry := range wants {
wantKs.Add(entry.Cid)
if noReplace && entry.WantType == pb.Message_Wantlist_Have {
haveKs = append(haveKs, entry.Cid)
} else {
wantKs = append(wantKs, entry.Cid)
}
}
blockSizes, err := e.bsm.getBlockSizes(ctx, wantKs.Keys())
blockSizes, err := e.bsm.getBlockSizes(ctx, wantKs)
if err != nil {
log.Info("aborting message processing", err)
return false
}
if len(haveKs) != 0 {
hasBlocks, err := e.bsm.hasBlocks(ctx, haveKs)
if err != nil {
log.Info("aborting message processing", err)
return false
}
if len(hasBlocks) != 0 {
if blockSizes == nil {
blockSizes = make(map[cid.Cid]int, len(hasBlocks))
}
for blkCid := range hasBlocks {
blockSizes[blkCid] = 0
}
}
}

e.lock.Lock()

Expand All @@ -707,20 +721,7 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap
}

var overflow []bsmsg.Entry
if len(wants) != 0 {
filteredWants := wants[:0] // shift inplace
for _, entry := range wants {
if !e.peerLedger.Wants(p, entry.Entry) {
// Cannot add entry because it would exceed size limit.
overflow = append(overflow, entry)
continue
}
filteredWants = append(filteredWants, entry)
}
// Clear truncated entries - early GC.
clear(wants[len(filteredWants):])
wants = filteredWants
}
wants, overflow = e.filterOverflow(p, wants, overflow)

if len(overflow) != 0 {
log.Infow("handling wantlist overflow", "local", e.self, "from", p, "wantlistSize", len(wants), "overflowSize", len(overflow))
Expand Down Expand Up @@ -764,7 +765,7 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap
sendDontHave(entry)
}

// For each want-have / want-block
// For each want-block
for _, entry := range wants {
c := entry.Cid
blockSize, found := blockSizes[c]
Expand All @@ -776,7 +777,10 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap
continue
}
// The block was found, add it to the queue
isWantBlock := e.sendAsBlock(entry.WantType, blockSize)

// Check if this is a want-block or a have-block that can be converted
// to a want-block.
isWantBlock := blockSize != 0 && e.sendAsBlock(entry.WantType, blockSize)

log.Debugw("Bitswap engine: block found", "local", e.self, "from", p, "cid", c, "isWantBlock", isWantBlock)

Expand Down Expand Up @@ -810,6 +814,25 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap
return false
}

func (e *Engine) filterOverflow(p peer.ID, wants, overflow []bsmsg.Entry) ([]bsmsg.Entry, []bsmsg.Entry) {
if len(wants) == 0 {
return wants, overflow
}

filteredWants := wants[:0] // shift inplace
for _, entry := range wants {
if !e.peerLedger.Wants(p, entry.Entry) {
// Cannot add entry because it would exceed size limit.
overflow = append(overflow, entry)
continue
}
filteredWants = append(filteredWants, entry)
}
// Clear truncated entries - early GC.
clear(wants[len(filteredWants):])
return filteredWants, overflow
}

// handleOverflow processes incoming wants that could not be addded to the peer
// ledger without exceeding the peer want limit. These are handled by trying to
// make room by canceling existing wants for which there is no block. If this
Expand Down Expand Up @@ -913,17 +936,17 @@ func (e *Engine) splitWantsCancelsDenials(p peer.ID, m bsmsg.BitSwapMessage) ([]
continue
}

if e.peerBlockRequestFilter != nil && !e.peerBlockRequestFilter(p, c) {
denials = append(denials, et)
continue
}

if et.WantType == pb.Message_Wantlist_Have {
log.Debugw("Bitswap engine <- want-have", "local", e.self, "from", p, "cid", c)
} else {
log.Debugw("Bitswap engine <- want-block", "local", e.self, "from", p, "cid", c)
}

if e.peerBlockRequestFilter != nil && !e.peerBlockRequestFilter(p, c) {
denials = append(denials, et)
continue
}

// Do not take more wants that can be handled.
if len(wants) < int(e.maxQueuedWantlistEntriesPerPeer) {
wants = append(wants, et)
Expand Down Expand Up @@ -1057,8 +1080,7 @@ func (e *Engine) PeerDisconnected(p peer.ID) {
// If the want is a want-have, and it's below a certain size, send the full
// block (instead of sending a HAVE)
func (e *Engine) sendAsBlock(wantType pb.Message_Wantlist_WantType, blockSize int) bool {
isWantBlock := wantType == pb.Message_Wantlist_Block
return isWantBlock || blockSize <= e.maxBlockSizeReplaceHasWithBlock
return wantType == pb.Message_Wantlist_Block || blockSize <= e.wantHaveReplaceSize
}

func (e *Engine) numBytesSentTo(p peer.ID) uint64 {
Expand Down
12 changes: 3 additions & 9 deletions bitswap/server/internal/decision/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,17 +188,11 @@ func newEngineForTesting(
bs blockstore.Blockstore,
peerTagger PeerTagger,
self peer.ID,
maxReplaceSize int,
wantHaveReplaceSize int,
opts ...Option,
) *Engine {
return newEngine(
ctx,
bs,
peerTagger,
self,
maxReplaceSize,
opts...,
)
opts = append(opts, WithWantHaveReplaceSize(wantHaveReplaceSize))
return NewEngine(ctx, bs, peerTagger, self, opts...)
}

func TestOutboxClosedWhenEngineClosed(t *testing.T) {
Expand Down
Loading

0 comments on commit 35e3375

Please sign in to comment.