Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove entries from wantlists when their related requests are cancelled #3182

Merged
merged 2 commits into from
Sep 4, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 43 additions & 7 deletions exchange/bitswap/bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message"
bsnet "github.com/ipfs/go-ipfs/exchange/bitswap/network"
notifications "github.com/ipfs/go-ipfs/exchange/bitswap/notifications"
wantlist "github.com/ipfs/go-ipfs/exchange/bitswap/wantlist"
flags "github.com/ipfs/go-ipfs/flags"
"github.com/ipfs/go-ipfs/thirdparty/delay"
loggables "github.com/ipfs/go-ipfs/thirdparty/loggables"
Expand Down Expand Up @@ -88,7 +87,7 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork,
notifications: notif,
engine: decision.NewEngine(ctx, bstore), // TODO close the engine with Close() method
network: network,
findKeys: make(chan *wantlist.Entry, sizeBatchRequestChan),
findKeys: make(chan *blockRequest, sizeBatchRequestChan),
process: px,
newBlocks: make(chan blocks.Block, HasBlockBufferSize),
provideKeys: make(chan key.Key, provideKeysBufferSize),
Expand Down Expand Up @@ -131,7 +130,7 @@ type Bitswap struct {
notifications notifications.PubSub

// send keys to a worker to find and connect to providers for them
findKeys chan *wantlist.Entry
findKeys chan *blockRequest

engine *decision.Engine

Expand All @@ -148,8 +147,8 @@ type Bitswap struct {
}

type blockRequest struct {
key key.Key
ctx context.Context
Key key.Key
Ctx context.Context
}

// GetBlock attempts to retrieve a particular block from peers within the
Expand Down Expand Up @@ -235,13 +234,50 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []key.Key) (<-chan blocks
// NB: Optimization. Assumes that providers of key[0] are likely to
// be able to provide for all keys. This currently holds true in most
// every situation. Later, this assumption may not hold as true.
req := &wantlist.Entry{
req := &blockRequest{
Key: keys[0],
Ctx: ctx,
}

remaining := make(map[key.Key]struct{})
for _, k := range keys {
remaining[k] = struct{}{}
}

out := make(chan blocks.Block)
go func() {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
defer close(out)
defer func() {
var toCancel []key.Key
for k, _ := range remaining {
toCancel = append(toCancel, k)
}
bs.CancelWants(toCancel)
}()
for {
select {
case blk, ok := <-promise:
if !ok {
return
}

delete(remaining, blk.Key())
select {
case out <- blk:
case <-ctx.Done():
return
}
case <-ctx.Done():
return
}
}
}()

select {
case bs.findKeys <- req:
return promise, nil
return out, nil
case <-ctx.Done():
return nil, ctx.Err()
}
Expand Down
78 changes: 73 additions & 5 deletions exchange/bitswap/bitswap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,6 @@ func TestDoubleGet(t *testing.T) {
blocks := bg.Blocks(1)

ctx1, cancel1 := context.WithCancel(context.Background())

blkch1, err := instances[1].Exchange.GetBlocks(ctx1, []key.Key{blocks[0].Key()})
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -362,11 +361,15 @@ func TestDoubleGet(t *testing.T) {
t.Fatal(err)
}

blk, ok := <-blkch2
if !ok {
t.Fatal("expected to get the block here")
select {
case blk, ok := <-blkch2:
if !ok {
t.Fatal("expected to get the block here")
}
t.Log(blk)
case <-time.After(time.Second * 5):
t.Fatal("timed out waiting on block")
}
t.Log(blk)

for _, inst := range instances {
err := inst.Exchange.Close()
Expand All @@ -375,3 +378,68 @@ func TestDoubleGet(t *testing.T) {
}
}
}

func TestWantlistCleanup(t *testing.T) {
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
sg := NewTestSessionGenerator(net)
defer sg.Close()
bg := blocksutil.NewBlockGenerator()

instances := sg.Instances(1)[0]
bswap := instances.Exchange
blocks := bg.Blocks(20)

var keys []key.Key
for _, b := range blocks {
keys = append(keys, b.Key())
}

ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*50)
defer cancel()
_, err := bswap.GetBlock(ctx, keys[0])
if err != context.DeadlineExceeded {
t.Fatal("shouldnt have fetched any blocks")
}

time.Sleep(time.Millisecond * 50)

if len(bswap.GetWantlist()) > 0 {
t.Fatal("should not have anyting in wantlist")
}

ctx, cancel = context.WithTimeout(context.Background(), time.Millisecond*50)
defer cancel()
_, err = bswap.GetBlocks(ctx, keys[:10])
if err != nil {
t.Fatal(err)
}

<-ctx.Done()
time.Sleep(time.Millisecond * 50)

if len(bswap.GetWantlist()) > 0 {
t.Fatal("should not have anyting in wantlist")
}

_, err = bswap.GetBlocks(context.Background(), keys[:1])
if err != nil {
t.Fatal(err)
}

ctx, cancel = context.WithCancel(context.Background())
_, err = bswap.GetBlocks(ctx, keys[10:])
if err != nil {
t.Fatal(err)
}

time.Sleep(time.Millisecond * 50)
if len(bswap.GetWantlist()) != 11 {
t.Fatal("should have 11 keys in wantlist")
}

cancel()
time.Sleep(time.Millisecond * 50)
if !(len(bswap.GetWantlist()) == 1 && bswap.GetWantlist()[0] == keys[0]) {
t.Fatal("should only have keys[0] in wantlist")
}
}
2 changes: 1 addition & 1 deletion exchange/bitswap/decision/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,6 @@ func BenchmarkTaskQueuePush(b *testing.B) {
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
q.Push(wantlist.Entry{Key: key.Key(i), Priority: math.MaxInt32}, peers[i%len(peers)])
q.Push(&wantlist.Entry{Key: key.Key(i), Priority: math.MaxInt32}, peers[i%len(peers)])
}
}
4 changes: 2 additions & 2 deletions exchange/bitswap/decision/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func NewEngine(ctx context.Context, bs bstore.Blockstore) *Engine {
return e
}

func (e *Engine) WantlistForPeer(p peer.ID) (out []wl.Entry) {
func (e *Engine) WantlistForPeer(p peer.ID) (out []*wl.Entry) {
e.lock.Lock()
partner, ok := e.ledgerMap[p]
if ok {
Expand Down Expand Up @@ -218,7 +218,7 @@ func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error {

for _, entry := range m.Wantlist() {
if entry.Cancel {
log.Debugf("cancel %s", entry.Key)
log.Debugf("%s cancel %s", p, entry.Key)
l.CancelWant(entry.Key)
e.peerRequestQueue.Remove(entry.Key, p)
} else {
Expand Down
2 changes: 1 addition & 1 deletion exchange/bitswap/decision/ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (l *ledger) CancelWant(k key.Key) {
l.wantList.Remove(k)
}

func (l *ledger) WantListContains(k key.Key) (wl.Entry, bool) {
func (l *ledger) WantListContains(k key.Key) (*wl.Entry, bool) {
return l.wantList.Contains(k)
}

Expand Down
6 changes: 3 additions & 3 deletions exchange/bitswap/decision/peer_request_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
type peerRequestQueue interface {
// Pop returns the next peerRequestTask. Returns nil if the peerRequestQueue is empty.
Pop() *peerRequestTask
Push(entry wantlist.Entry, to peer.ID)
Push(entry *wantlist.Entry, to peer.ID)
Remove(k key.Key, p peer.ID)

// NB: cannot expose simply expose taskQueue.Len because trashed elements
Expand Down Expand Up @@ -45,7 +45,7 @@ type prq struct {
}

// Push currently adds a new peerRequestTask to the end of the list
func (tl *prq) Push(entry wantlist.Entry, to peer.ID) {
func (tl *prq) Push(entry *wantlist.Entry, to peer.ID) {
tl.lock.Lock()
defer tl.lock.Unlock()
partner, ok := tl.partners[to]
Expand Down Expand Up @@ -166,7 +166,7 @@ func (tl *prq) thawRound() {
}

type peerRequestTask struct {
Entry wantlist.Entry
Entry *wantlist.Entry
Target peer.ID

// A callback to signal that this task has been completed
Expand Down
10 changes: 5 additions & 5 deletions exchange/bitswap/decision/peer_request_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestPushPop(t *testing.T) {
for _, index := range rand.Perm(len(alphabet)) { // add blocks for all letters
letter := alphabet[index]
t.Log(partner.String())
prq.Push(wantlist.Entry{Key: key.Key(letter), Priority: math.MaxInt32 - index}, partner)
prq.Push(&wantlist.Entry{Key: key.Key(letter), Priority: math.MaxInt32 - index}, partner)
}
for _, consonant := range consonants {
prq.Remove(key.Key(consonant), partner)
Expand Down Expand Up @@ -78,10 +78,10 @@ func TestPeerRepeats(t *testing.T) {
// Have each push some blocks

for i := 0; i < 5; i++ {
prq.Push(wantlist.Entry{Key: key.Key(i)}, a)
prq.Push(wantlist.Entry{Key: key.Key(i)}, b)
prq.Push(wantlist.Entry{Key: key.Key(i)}, c)
prq.Push(wantlist.Entry{Key: key.Key(i)}, d)
prq.Push(&wantlist.Entry{Key: key.Key(i)}, a)
prq.Push(&wantlist.Entry{Key: key.Key(i)}, b)
prq.Push(&wantlist.Entry{Key: key.Key(i)}, c)
prq.Push(&wantlist.Entry{Key: key.Key(i)}, d)
}

// now, pop off four entries, there should be one from each
Expand Down
4 changes: 2 additions & 2 deletions exchange/bitswap/message/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func newMsg(full bool) *impl {
}

type Entry struct {
wantlist.Entry
*wantlist.Entry
Cancel bool
}

Expand Down Expand Up @@ -120,7 +120,7 @@ func (m *impl) addEntry(k key.Key, priority int, cancel bool) {
e.Cancel = cancel
} else {
m.wantlist[k] = Entry{
Entry: wantlist.Entry{
Entry: &wantlist.Entry{
Key: k,
Priority: priority,
},
Expand Down
Loading