From 2e9d3ed75c4d275aa91ebb2db87dad7e8e44db3d Mon Sep 17 00:00:00 2001 From: frrist Date: Mon, 19 Nov 2018 10:38:26 -0800 Subject: [PATCH] add tracing to routing - partially addresses https://github.com/ipfs/go-ipfs/issues/5783 License: MIT Signed-off-by: frrist --- routing.go | 80 ++++++++++++++++++++++++++++-------------------------- 1 file changed, 41 insertions(+), 39 deletions(-) diff --git a/routing.go b/routing.go index c777d2d53..bf556812e 100644 --- a/routing.go +++ b/routing.go @@ -10,7 +10,6 @@ import ( cid "github.com/ipfs/go-cid" u "github.com/ipfs/go-ipfs-util" - logging "github.com/ipfs/go-log" pb "github.com/libp2p/go-libp2p-kad-dht/pb" kb "github.com/libp2p/go-libp2p-kbucket" inet "github.com/libp2p/go-libp2p-net" @@ -36,14 +35,9 @@ var asyncQueryBuffer = 10 // PutValue adds value corresponding to given Key. // This is the top level "Store" operation of the DHT func (dht *IpfsDHT) PutValue(ctx context.Context, key string, value []byte, opts ...ropts.Option) (err error) { - eip := log.EventBegin(ctx, "PutValue") - defer func() { - eip.Append(loggableKey(key)) - if err != nil { - eip.SetError(err) - } - eip.Done() - }() + ctx = log.Start(ctx, "PutValue") + log.SetTag(ctx, "key", key) + defer func() { log.FinishWithErr(ctx, err) }() log.Debugf("PutValue %s", key) // don't even allow local users to put bad values. @@ -111,14 +105,9 @@ type RecvdVal struct { // GetValue searches for the value corresponding to given Key. func (dht *IpfsDHT) GetValue(ctx context.Context, key string, opts ...ropts.Option) (_ []byte, err error) { - eip := log.EventBegin(ctx, "GetValue") - defer func() { - eip.Append(loggableKey(key)) - if err != nil { - eip.SetError(err) - } - eip.Done() - }() + ctx = log.Start(ctx, "GetValue") + log.SetTag(ctx, "key", key) + defer func() { log.FinishWithErr(ctx, err) }() // apply defaultQuorum if relevant var cfg ropts.Options @@ -165,8 +154,13 @@ func (dht *IpfsDHT) SearchValue(ctx context.Context, key string, opts ...ropts.O } out := make(chan []byte) + ctx = log.Start(ctx, "SearchValue") + log.SetTag(ctx, "key", key) go func() { - defer close(out) + defer func() { + close(out) + log.Finish(ctx) + }() maxVals := responsesNeeded if maxVals < 0 { @@ -251,14 +245,17 @@ func (dht *IpfsDHT) SearchValue(ctx context.Context, key string, opts ...ropts.O // GetValues gets nvals values corresponding to the given key. func (dht *IpfsDHT) GetValues(ctx context.Context, key string, nvals int) (_ []RecvdVal, err error) { - eip := log.EventBegin(ctx, "GetValues") - - eip.Append(loggableKey(key)) - defer eip.Done() + ctx = log.Start(ctx, "GetValues") + log.SetTags(ctx, map[string]interface{}{ + "key": key, + "nvals": nvals, + }) + defer func() { + log.FinishWithErr(ctx, err) + }() valCh, err := dht.getValues(ctx, key, nvals) if err != nil { - eip.SetError(err) return nil, err } @@ -396,13 +393,12 @@ func (dht *IpfsDHT) getValues(ctx context.Context, key string, nvals int) (<-cha // Provide makes this node announce that it can provide a value for the given key func (dht *IpfsDHT) Provide(ctx context.Context, key cid.Cid, brdcst bool) (err error) { - eip := log.EventBegin(ctx, "Provide", key, logging.LoggableMap{"broadcast": brdcst}) - defer func() { - if err != nil { - eip.SetError(err) - } - eip.Done() - }() + ctx = log.Start(ctx, "Provide") + log.SetTags(ctx, map[string]interface{}{ + "key": key.String(), + "broadcast": brdcst, + }) + defer func() { log.FinishWithErr(ctx, err) }() // add self locally dht.providers.AddProvider(ctx, key, dht.self) @@ -472,8 +468,15 @@ func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key cid.Cid, count i } func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key cid.Cid, count int, peerOut chan pstore.PeerInfo) { - defer log.EventBegin(ctx, "findProvidersAsync", key).Done() - defer close(peerOut) + ctx = log.Start(ctx, "findProvidersAsyncRoutine") + log.SetTags(ctx, map[string]interface{}{ + "key": key.String(), + "count": count, + }) + defer func() { + close(peerOut) + log.Finish(ctx) + }() ps := pset.NewLimited(count) provs := dht.providers.GetProviders(ctx, key) @@ -568,13 +571,9 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key cid.Cid, // FindPeer searches for a peer with given ID. func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (_ pstore.PeerInfo, err error) { - eip := log.EventBegin(ctx, "FindPeer", id) - defer func() { - if err != nil { - eip.SetError(err) - } - eip.Done() - }() + ctx = log.Start(ctx, "FindPeer") + log.SetTag(ctx, "id", id.String()) + defer func() { log.FinishWithErr(ctx, err) }() // Check if were already connected to them if pi := dht.FindLocal(id); pi.ID != "" { @@ -698,6 +697,8 @@ func (dht *IpfsDHT) FindPeersConnectedToPeer(ctx context.Context, id peer.ID) (< // run it! run it asynchronously to gen peers as results are found. // this does no error checking + ctx = log.Start(ctx, "FindPeersConnectedToPeer") + log.SetTag(ctx, "id", id.String()) go func() { if _, err := query.Run(ctx, peers); err != nil { log.Debug(err) @@ -705,6 +706,7 @@ func (dht *IpfsDHT) FindPeersConnectedToPeer(ctx context.Context, id peer.ID) (< // close the peerchan channel when done. close(peerchan) + log.Finish(ctx) }() return peerchan, nil