Skip to content

Commit

Permalink
blockservice & exchange & bitswap: add non variadic NotifyNewBlock
Browse files Browse the repository at this point in the history
Variadicts in go are just syntactic sugar around passing a slice, that
means all go memory reachability rules apply, this force the compiler to
heap allocate the variadic slice for virtual call, because the
implementation is allowed to leak the slice (and go's interprocedural
optimisations do not cover virtuals).

Passing a block without variadic will pass the itab either on the stack
or decomposed through registers. Skipping having to allocate a slice.
  • Loading branch information
Jorropo authored and hacdias committed Apr 5, 2023
1 parent 085ed9d commit 311cd1f
Show file tree
Hide file tree
Showing 8 changed files with 50 additions and 7 deletions.
7 changes: 7 additions & 0 deletions bitswap/bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,13 @@ func New(ctx context.Context, net network.BitSwapNetwork, bstore blockstore.Bloc
return bs
}

func (bs *Bitswap) NotifyNewBlock(ctx context.Context, blk blocks.Block) error {
return multierr.Combine(
bs.Client.NotifyNewBlock(ctx, blk),
bs.Server.NotifyNewBlock(ctx, blk),
)
}

func (bs *Bitswap) NotifyNewBlocks(ctx context.Context, blks ...blocks.Block) error {
return multierr.Combine(
bs.Client.NotifyNewBlocks(ctx, blks...),
Expand Down
10 changes: 10 additions & 0 deletions bitswap/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,16 @@ func (bs *Client) GetBlocks(ctx context.Context, keys []cid.Cid) (<-chan blocks.
return session.GetBlocks(ctx, keys)
}

// NotifyNewBlock announces the existence of blocks to this bitswap service.
// Bitswap itself doesn't store new blocks. It's the caller responsibility to ensure
// that those blocks are available in the blockstore before calling this function.
func (bs *Client) NotifyNewBlock(ctx context.Context, blk blocks.Block) error {
// Call to the variadic to avoid code duplication.
// This is actually fine to do because no calls is virtual the compiler is able
// to see that the slice does not leak and the slice is stack allocated.
return bs.NotifyNewBlocks(ctx, blk)
}

// NotifyNewBlocks announces the existence of blocks to this bitswap service.
// Bitswap itself doesn't store new blocks. It's the caller responsibility to ensure
// that those blocks are available in the blockstore before calling this function.
Expand Down
11 changes: 11 additions & 0 deletions bitswap/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,17 @@ func (bs *Server) Stat() (Stat, error) {
return s, nil
}

// NotifyNewBlock announces the existence of block to this bitswap service. The
// service will potentially notify its peers.
// Bitswap itself doesn't store new blocks. It's the caller responsibility to ensure
// that those blocks are available in the blockstore before calling this function.
func (bs *Server) NotifyNewBlock(ctx context.Context, blk blocks.Block) error {
// Call to the variadic to avoid code duplication.
// This is actually fine to do because no calls is virtual the compiler is able
// to see that the slice does not leak and the slice is stack allocated.
return bs.NotifyNewBlocks(ctx, blk)
}

// NotifyNewBlocks announces the existence of blocks to this bitswap service. The
// service will potentially notify its peers.
// Bitswap itself doesn't store new blocks. It's the caller responsibility to ensure
Expand Down
12 changes: 5 additions & 7 deletions blockservice/blockservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,8 @@ func (s *blockService) AddBlock(ctx context.Context, o blocks.Block) error {
logger.Debugf("BlockService.BlockAdded %s", c)

if s.exchange != nil {
if err := s.exchange.NotifyNewBlocks(ctx, o); err != nil {
logger.Errorf("NotifyNewBlocks: %s", err.Error())
if err := s.exchange.NotifyNewBlock(ctx, o); err != nil {
logger.Errorf("NotifyNewBlock: %s", err.Error())
}
}

Expand Down Expand Up @@ -254,7 +254,7 @@ func getBlock(ctx context.Context, c cid.Cid, bs blockstore.Blockstore, fget fun
if err != nil {
return nil, err
}
err = f.NotifyNewBlocks(ctx, blk)
err = f.NotifyNewBlock(ctx, blk)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -334,7 +334,6 @@ func getBlocks(ctx context.Context, ks []cid.Cid, bs blockstore.Blockstore, fget
return
}

var cache [1]blocks.Block // preallocate once for all iterations
for {
var b blocks.Block
select {
Expand All @@ -355,13 +354,11 @@ func getBlocks(ctx context.Context, ks []cid.Cid, bs blockstore.Blockstore, fget
}

// inform the exchange that the blocks are available
cache[0] = b
err = f.NotifyNewBlocks(ctx, cache[:]...)
err = f.NotifyNewBlock(ctx, b)
if err != nil {
logger.Errorf("could not tell the exchange about new blocks: %s", err)
return
}
cache[0] = nil // early gc

select {
case out <- b:
Expand Down Expand Up @@ -391,6 +388,7 @@ func (s *blockService) Close() error {
}

type notifier interface {
NotifyNewBlock(context.Context, blocks.Block) error
NotifyNewBlocks(context.Context, ...blocks.Block) error
}

Expand Down
5 changes: 5 additions & 0 deletions blockservice/blockservice_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,11 @@ type notifyCountingExchange struct {
notifyCount int
}

func (n *notifyCountingExchange) NotifyNewBlock(ctx context.Context, blocks blocks.Block) error {
n.notifyCount++
return n.Interface.NotifyNewBlock(ctx, blocks)
}

func (n *notifyCountingExchange) NotifyNewBlocks(ctx context.Context, blocks ...blocks.Block) error {
n.notifyCount += len(blocks)
return n.Interface.NotifyNewBlocks(ctx, blocks...)
Expand Down
4 changes: 4 additions & 0 deletions examples/gateway/proxy/blockstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ func (e *proxyExchange) GetBlocks(ctx context.Context, cids []cid.Cid) (<-chan b
return ch, nil
}

func (e *proxyExchange) NotifyNewBlock(ctx context.Context, block blocks.Block) error {
return e.NotifyNewBlocks(ctx, block)
}

func (e *proxyExchange) NotifyNewBlocks(ctx context.Context, blocks ...blocks.Block) error {
// Note: while not required this function could be used optimistically to prevent fetching
// of data that the client has retrieved already even though a Get call is in progress.
Expand Down
2 changes: 2 additions & 0 deletions exchange/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
type Interface interface { // type Exchanger interface
Fetcher

// NotifyNewBlock tells the exchange that a new block is available and can be served.
NotifyNewBlock(ctx context.Context, blocks blocks.Block) error
// NotifyNewBlocks tells the exchange that new blocks are available and can be served.
NotifyNewBlocks(ctx context.Context, blocks ...blocks.Block) error

Expand Down
6 changes: 6 additions & 0 deletions exchange/offline/offline.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ func (e *offlineExchange) GetBlock(ctx context.Context, k cid.Cid) (blocks.Block
return blk, err
}

// NotifyNewBlock tells the exchange that a new block is available and can be served.
func (e *offlineExchange) NotifyNewBlock(ctx context.Context, block blocks.Block) error {
// as an offline exchange we have nothing to do
return nil
}

// NotifyNewBlocks tells the exchange that new blocks are available and can be served.
func (e *offlineExchange) NotifyNewBlocks(ctx context.Context, blocks ...blocks.Block) error {
// as an offline exchange we have nothing to do
Expand Down

0 comments on commit 311cd1f

Please sign in to comment.