Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: option to not read size of blocks for want-have requests #672

Merged
merged 10 commits into from
Sep 27, 2024
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ The following emojis are used to highlight certain changes:

### Added

* `boxo/bitswap/server`:
* A new `WithWantHaveReplaceSize(n)` option can be used with `bitswap.New`. It sets the maximum size of a block in bytes up to which the bitswap server will replace a WantHave with a WantBlock response. Setting this to 0 disables this WantHave replacement and means that block sizes are not read when processing WantHave requests.

### Changed

### Removed
Expand Down
8 changes: 8 additions & 0 deletions bitswap/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,14 @@
return Option{server.WithTaskComparator(comparator)}
}

// WithWantHaveReplaceSize sets the maximum size of a block in bytes up to
// which the bitswap server will replace a WantHave with a WantBlock response.
// Setting this to 0 disables this WantHave replacement and means that block
// sizes are not read when processing WantHave requests.
func WithWantHaveReplaceSize(size int) Option {
return Option{server.WithWantHaveReplaceSize(size)}

Check warning on line 79 in bitswap/options.go

View check run for this annotation

Codecov / codecov/patch

bitswap/options.go#L78-L79

Added lines #L78 - L79 were not covered by tests
}

func ProviderSearchDelay(newProvSearchDelay time.Duration) Option {
return Option{client.ProviderSearchDelay(newProvSearchDelay)}
}
Expand Down
36 changes: 36 additions & 0 deletions bitswap/server/internal/decision/blockstoremanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,42 @@
return res, nil
}

func (bsm *blockstoreManager) hasBlocks(ctx context.Context, ks []cid.Cid) (map[cid.Cid]struct{}, error) {
if len(ks) == 0 {
return nil, nil
}

Check warning on line 127 in bitswap/server/internal/decision/blockstoremanager.go

View check run for this annotation

Codecov / codecov/patch

bitswap/server/internal/decision/blockstoremanager.go#L126-L127

Added lines #L126 - L127 were not covered by tests
hasBlocks := make([]bool, len(ks))

var count atomic.Int32
err := bsm.jobPerKey(ctx, ks, func(i int, c cid.Cid) {
has, err := bsm.bs.Has(ctx, c)
if err != nil {
// Note: this isn't a fatal error. We shouldn't abort the request
log.Errorf("blockstore.GetSize(%s) error: %s", c, err)
return
}

Check warning on line 137 in bitswap/server/internal/decision/blockstoremanager.go

View check run for this annotation

Codecov / codecov/patch

bitswap/server/internal/decision/blockstoremanager.go#L134-L137

Added lines #L134 - L137 were not covered by tests
if has {
hasBlocks[i] = true
count.Add(1)
}
})
if err != nil {
return nil, err
}

Check warning on line 145 in bitswap/server/internal/decision/blockstoremanager.go

View check run for this annotation

Codecov / codecov/patch

bitswap/server/internal/decision/blockstoremanager.go#L144-L145

Added lines #L144 - L145 were not covered by tests
results := count.Load()
if results == 0 {
return nil, nil
}

res := make(map[cid.Cid]struct{}, results)
for i, ok := range hasBlocks {
if ok {
res[ks[i]] = struct{}{}
}
}
return res, nil
}

func (bsm *blockstoreManager) getBlocks(ctx context.Context, ks []cid.Cid) (map[cid.Cid]blocks.Block, error) {
if len(ks) == 0 {
return nil, nil
Expand Down
15 changes: 4 additions & 11 deletions bitswap/server/internal/decision/blockstoremanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,29 +98,22 @@ func TestBlockstoreManager(t *testing.T) {
cids = append(cids, b.Cid())
}

sizes, err := bsm.getBlockSizes(ctx, cids)
hasBlocks, err := bsm.hasBlocks(ctx, cids)
if err != nil {
t.Fatal(err)
}
if len(sizes) != len(blks)-1 {
if len(hasBlocks) != len(blks)-1 {
t.Fatal("Wrong response length")
}

for _, c := range cids {
expSize := len(exp[c].RawData())
size, ok := sizes[c]

// Only the last key should be missing
_, ok := hasBlocks[c]
if c.Equals(cids[len(cids)-1]) {
if ok {
t.Fatal("Non-existent block should not be in sizes map")
}
} else {
if !ok {
t.Fatal("Block should be in sizes map")
}
if size != expSize {
t.Fatal("Block has wrong size")
t.Fatal("Block should be in hasBlocks")
}
}
}
Expand Down
132 changes: 79 additions & 53 deletions bitswap/server/internal/decision/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,9 @@
// on their behalf.
queuedTagWeight = 10

// maxBlockSizeReplaceHasWithBlock is the maximum size of the block in
// bytes up to which we will replace a want-have with a want-block
maxBlockSizeReplaceHasWithBlock = 1024
// defaultWantHave ReplaceSize is the default maximum size of the block in
// bytes up to which we will replace a WantHave with a WantBlock response.
defaultWantHaveReplaceSize = 1024
)

// Envelope contains a message for a Peer.
Expand Down Expand Up @@ -202,9 +202,9 @@

targetMessageSize int

// maxBlockSizeReplaceHasWithBlock is the maximum size of the block in
// bytes up to which we will replace a want-have with a want-block
maxBlockSizeReplaceHasWithBlock int
// wantHaveReplaceSize is the maximum size of the block in bytes up to
// which to replace a WantHave with a WantBlock.
wantHaveReplaceSize int

sendDontHaves bool

Expand Down Expand Up @@ -343,6 +343,14 @@
}
}

// WithWantHaveReplaceSize sets the maximum size of a block in bytes up to
// which to replace a WantHave with a WantBlock response.
func WithWantHaveReplaceSize(size int) Option {
return func(e *Engine) {
e.wantHaveReplaceSize = size
}
}

// wrapTaskComparator wraps a TaskComparator so it can be used as a QueueTaskComparator
func wrapTaskComparator(tc TaskComparator) peertask.QueueTaskComparator {
return func(a, b *peertask.QueueTask) bool {
Expand All @@ -369,32 +377,14 @@
}

// NewEngine creates a new block sending engine for the given block store.
// maxOutstandingBytesPerPeer hints to the peer task queue not to give a peer more tasks if it has some maximum
// work already outstanding.
// maxOutstandingBytesPerPeer hints to the peer task queue not to give a peer
// more tasks if it has some maximum work already outstanding.
func NewEngine(
ctx context.Context,
bs bstore.Blockstore,
peerTagger PeerTagger,
self peer.ID,
opts ...Option,
) *Engine {
return newEngine(
ctx,
bs,
peerTagger,
self,
maxBlockSizeReplaceHasWithBlock,
opts...,
)
}

func newEngine(
ctx context.Context,
bs bstore.Blockstore,
peerTagger PeerTagger,
self peer.ID,
maxReplaceSize int,
opts ...Option,
) *Engine {
e := &Engine{
scoreLedger: NewDefaultScoreLedger(),
Expand All @@ -404,7 +394,7 @@
outbox: make(chan (<-chan *Envelope), outboxChanBuffer),
workSignal: make(chan struct{}, 1),
ticker: time.NewTicker(time.Millisecond * 100),
maxBlockSizeReplaceHasWithBlock: maxReplaceSize,
wantHaveReplaceSize: defaultWantHaveReplaceSize,
taskWorkerCount: defaults.BitswapEngineTaskWorkerCount,
sendDontHaves: true,
self: self,
Expand Down Expand Up @@ -445,6 +435,12 @@

e.peerRequestQueue = peertaskqueue.New(peerTaskQueueOpts...)

if e.wantHaveReplaceSize == 0 {
log.Info("Replace WantHave with WantBlock is disabled")
} else {
log.Infow("Replace WantHave with WantBlock is enabled", "maxSize", e.wantHaveReplaceSize)
}

return e
}

Expand Down Expand Up @@ -689,16 +685,38 @@
return true
}

noReplace := e.wantHaveReplaceSize == 0

// Get block sizes for unique CIDs.
wantKs := cid.NewSet()
wantKs := make([]cid.Cid, 0, len(wants))
var haveKs []cid.Cid
for _, entry := range wants {
wantKs.Add(entry.Cid)
if noReplace && entry.WantType == pb.Message_Wantlist_Have {
haveKs = append(haveKs, entry.Cid)
} else {
wantKs = append(wantKs, entry.Cid)
}
}
blockSizes, err := e.bsm.getBlockSizes(ctx, wantKs.Keys())
blockSizes, err := e.bsm.getBlockSizes(ctx, wantKs)
if err != nil {
log.Info("aborting message processing", err)
return false
}
if len(haveKs) != 0 {
hasBlocks, err := e.bsm.hasBlocks(ctx, haveKs)
if err != nil {
log.Info("aborting message processing", err)
return false
}

Check warning on line 710 in bitswap/server/internal/decision/engine.go

View check run for this annotation

Codecov / codecov/patch

bitswap/server/internal/decision/engine.go#L708-L710

Added lines #L708 - L710 were not covered by tests
if len(hasBlocks) != 0 {
if blockSizes == nil {
blockSizes = make(map[cid.Cid]int, len(hasBlocks))
}
for blkCid := range hasBlocks {
blockSizes[blkCid] = 0
}
}
}

e.lock.Lock()

Expand All @@ -707,20 +725,7 @@
}

var overflow []bsmsg.Entry
if len(wants) != 0 {
filteredWants := wants[:0] // shift inplace
for _, entry := range wants {
if !e.peerLedger.Wants(p, entry.Entry) {
// Cannot add entry because it would exceed size limit.
overflow = append(overflow, entry)
continue
}
filteredWants = append(filteredWants, entry)
}
// Clear truncated entries - early GC.
clear(wants[len(filteredWants):])
wants = filteredWants
}
wants, overflow = e.filterOverflow(p, wants, overflow)

if len(overflow) != 0 {
log.Infow("handling wantlist overflow", "local", e.self, "from", p, "wantlistSize", len(wants), "overflowSize", len(overflow))
Expand Down Expand Up @@ -764,7 +769,7 @@
sendDontHave(entry)
}

// For each want-have / want-block
// For each want-block
for _, entry := range wants {
c := entry.Cid
blockSize, found := blockSizes[c]
Expand All @@ -776,7 +781,10 @@
continue
}
// The block was found, add it to the queue
isWantBlock := e.sendAsBlock(entry.WantType, blockSize)

// Check if this is a want-block or a have-block that can be converted
// to a want-block.
isWantBlock := blockSize != 0 && e.sendAsBlock(entry.WantType, blockSize)

log.Debugw("Bitswap engine: block found", "local", e.self, "from", p, "cid", c, "isWantBlock", isWantBlock)

Expand Down Expand Up @@ -810,6 +818,25 @@
return false
}

func (e *Engine) filterOverflow(p peer.ID, wants, overflow []bsmsg.Entry) ([]bsmsg.Entry, []bsmsg.Entry) {
if len(wants) == 0 {
return wants, overflow
}

filteredWants := wants[:0] // shift inplace
for _, entry := range wants {
if !e.peerLedger.Wants(p, entry.Entry) {
// Cannot add entry because it would exceed size limit.
overflow = append(overflow, entry)
continue
}
filteredWants = append(filteredWants, entry)
}
// Clear truncated entries - early GC.
clear(wants[len(filteredWants):])
return filteredWants, overflow
}

// handleOverflow processes incoming wants that could not be addded to the peer
// ledger without exceeding the peer want limit. These are handled by trying to
// make room by canceling existing wants for which there is no block. If this
Expand Down Expand Up @@ -913,17 +940,17 @@
continue
}

if e.peerBlockRequestFilter != nil && !e.peerBlockRequestFilter(p, c) {
denials = append(denials, et)
continue
}

if et.WantType == pb.Message_Wantlist_Have {
log.Debugw("Bitswap engine <- want-have", "local", e.self, "from", p, "cid", c)
} else {
log.Debugw("Bitswap engine <- want-block", "local", e.self, "from", p, "cid", c)
}

if e.peerBlockRequestFilter != nil && !e.peerBlockRequestFilter(p, c) {
denials = append(denials, et)
continue
}

// Do not take more wants that can be handled.
if len(wants) < int(e.maxQueuedWantlistEntriesPerPeer) {
wants = append(wants, et)
Expand Down Expand Up @@ -1057,8 +1084,7 @@
// If the want is a want-have, and it's below a certain size, send the full
// block (instead of sending a HAVE)
func (e *Engine) sendAsBlock(wantType pb.Message_Wantlist_WantType, blockSize int) bool {
isWantBlock := wantType == pb.Message_Wantlist_Block
return isWantBlock || blockSize <= e.maxBlockSizeReplaceHasWithBlock
return wantType == pb.Message_Wantlist_Block || blockSize <= e.wantHaveReplaceSize
}

func (e *Engine) numBytesSentTo(p peer.ID) uint64 {
Expand Down
12 changes: 3 additions & 9 deletions bitswap/server/internal/decision/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,17 +188,11 @@ func newEngineForTesting(
bs blockstore.Blockstore,
peerTagger PeerTagger,
self peer.ID,
maxReplaceSize int,
wantHaveReplaceSize int,
opts ...Option,
) *Engine {
return newEngine(
ctx,
bs,
peerTagger,
self,
maxReplaceSize,
opts...,
)
opts = append(opts, WithWantHaveReplaceSize(wantHaveReplaceSize))
return NewEngine(ctx, bs, peerTagger, self, opts...)
}

func TestOutboxClosedWhenEngineClosed(t *testing.T) {
Expand Down
13 changes: 13 additions & 0 deletions bitswap/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,19 @@
}
}

// WithWantHaveReplaceSize sets the maximum size of a block in bytes up to
// which to replace a WantHave with a WantBlock. Setting to 0 disables this
// WantHave replacement and means that block sizes are not read when processing
// WantHave requests.
func WithWantHaveReplaceSize(size int) Option {
if size < 0 {
size = 0
}
return func(bs *Server) {
bs.engineOptions = append(bs.engineOptions, decision.WithWantHaveReplaceSize(size))
}

Check warning on line 264 in bitswap/server/server.go

View check run for this annotation

Codecov / codecov/patch

bitswap/server/server.go#L258-L264

Added lines #L258 - L264 were not covered by tests
}

// WantlistForPeer returns the currently understood list of blocks requested by a
// given peer.
func (bs *Server) WantlistForPeer(p peer.ID) []cid.Cid {
Expand Down