Skip to content

Commit

Permalink
add tracing to routing
Browse files Browse the repository at this point in the history
- partially addresses ipfs/kubo#5783

License: MIT
Signed-off-by: frrist <[email protected]>
  • Loading branch information
frrist committed Nov 19, 2018
1 parent d70e927 commit 2e9d3ed
Showing 1 changed file with 41 additions and 39 deletions.
80 changes: 41 additions & 39 deletions routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (

cid "github.com/ipfs/go-cid"
u "github.com/ipfs/go-ipfs-util"
logging "github.com/ipfs/go-log"
pb "github.com/libp2p/go-libp2p-kad-dht/pb"
kb "github.com/libp2p/go-libp2p-kbucket"
inet "github.com/libp2p/go-libp2p-net"
Expand All @@ -36,14 +35,9 @@ var asyncQueryBuffer = 10
// PutValue adds value corresponding to given Key.
// This is the top level "Store" operation of the DHT
func (dht *IpfsDHT) PutValue(ctx context.Context, key string, value []byte, opts ...ropts.Option) (err error) {
eip := log.EventBegin(ctx, "PutValue")
defer func() {
eip.Append(loggableKey(key))
if err != nil {
eip.SetError(err)
}
eip.Done()
}()
ctx = log.Start(ctx, "PutValue")
log.SetTag(ctx, "key", key)
defer func() { log.FinishWithErr(ctx, err) }()
log.Debugf("PutValue %s", key)

// don't even allow local users to put bad values.
Expand Down Expand Up @@ -111,14 +105,9 @@ type RecvdVal struct {

// GetValue searches for the value corresponding to given Key.
func (dht *IpfsDHT) GetValue(ctx context.Context, key string, opts ...ropts.Option) (_ []byte, err error) {
eip := log.EventBegin(ctx, "GetValue")
defer func() {
eip.Append(loggableKey(key))
if err != nil {
eip.SetError(err)
}
eip.Done()
}()
ctx = log.Start(ctx, "GetValue")
log.SetTag(ctx, "key", key)
defer func() { log.FinishWithErr(ctx, err) }()

// apply defaultQuorum if relevant
var cfg ropts.Options
Expand Down Expand Up @@ -165,8 +154,13 @@ func (dht *IpfsDHT) SearchValue(ctx context.Context, key string, opts ...ropts.O
}

out := make(chan []byte)
ctx = log.Start(ctx, "SearchValue")
log.SetTag(ctx, "key", key)
go func() {
defer close(out)
defer func() {
close(out)
log.Finish(ctx)
}()

maxVals := responsesNeeded
if maxVals < 0 {
Expand Down Expand Up @@ -251,14 +245,17 @@ func (dht *IpfsDHT) SearchValue(ctx context.Context, key string, opts ...ropts.O

// GetValues gets nvals values corresponding to the given key.
func (dht *IpfsDHT) GetValues(ctx context.Context, key string, nvals int) (_ []RecvdVal, err error) {
eip := log.EventBegin(ctx, "GetValues")

eip.Append(loggableKey(key))
defer eip.Done()
ctx = log.Start(ctx, "GetValues")
log.SetTags(ctx, map[string]interface{}{
"key": key,
"nvals": nvals,
})
defer func() {
log.FinishWithErr(ctx, err)
}()

valCh, err := dht.getValues(ctx, key, nvals)
if err != nil {
eip.SetError(err)
return nil, err
}

Expand Down Expand Up @@ -396,13 +393,12 @@ func (dht *IpfsDHT) getValues(ctx context.Context, key string, nvals int) (<-cha

// Provide makes this node announce that it can provide a value for the given key
func (dht *IpfsDHT) Provide(ctx context.Context, key cid.Cid, brdcst bool) (err error) {
eip := log.EventBegin(ctx, "Provide", key, logging.LoggableMap{"broadcast": brdcst})
defer func() {
if err != nil {
eip.SetError(err)
}
eip.Done()
}()
ctx = log.Start(ctx, "Provide")
log.SetTags(ctx, map[string]interface{}{
"key": key.String(),
"broadcast": brdcst,
})
defer func() { log.FinishWithErr(ctx, err) }()

// add self locally
dht.providers.AddProvider(ctx, key, dht.self)
Expand Down Expand Up @@ -472,8 +468,15 @@ func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key cid.Cid, count i
}

func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key cid.Cid, count int, peerOut chan pstore.PeerInfo) {
defer log.EventBegin(ctx, "findProvidersAsync", key).Done()
defer close(peerOut)
ctx = log.Start(ctx, "findProvidersAsyncRoutine")
log.SetTags(ctx, map[string]interface{}{
"key": key.String(),
"count": count,
})
defer func() {
close(peerOut)
log.Finish(ctx)
}()

ps := pset.NewLimited(count)
provs := dht.providers.GetProviders(ctx, key)
Expand Down Expand Up @@ -568,13 +571,9 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key cid.Cid,

// FindPeer searches for a peer with given ID.
func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (_ pstore.PeerInfo, err error) {
eip := log.EventBegin(ctx, "FindPeer", id)
defer func() {
if err != nil {
eip.SetError(err)
}
eip.Done()
}()
ctx = log.Start(ctx, "FindPeer")
log.SetTag(ctx, "id", id.String())
defer func() { log.FinishWithErr(ctx, err) }()

// Check if were already connected to them
if pi := dht.FindLocal(id); pi.ID != "" {
Expand Down Expand Up @@ -698,13 +697,16 @@ func (dht *IpfsDHT) FindPeersConnectedToPeer(ctx context.Context, id peer.ID) (<

// run it! run it asynchronously to gen peers as results are found.
// this does no error checking
ctx = log.Start(ctx, "FindPeersConnectedToPeer")
log.SetTag(ctx, "id", id.String())
go func() {
if _, err := query.Run(ctx, peers); err != nil {
log.Debug(err)
}

// close the peerchan channel when done.
close(peerchan)
log.Finish(ctx)
}()

return peerchan, nil
Expand Down

0 comments on commit 2e9d3ed

Please sign in to comment.