Skip to content

Commit

Permalink
Gather graphsync metrics on provider side as well
Browse files Browse the repository at this point in the history
  • Loading branch information
magik6k committed Oct 19, 2021
1 parent 32a855b commit d2e9d21
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 38 deletions.
79 changes: 41 additions & 38 deletions node/modules/graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,14 @@ import (
"context"
"time"

"go.opencensus.io/stats"
"go.uber.org/fx"

"github.com/ipfs/go-graphsync"
graphsyncimpl "github.com/ipfs/go-graphsync/impl"
gsnet "github.com/ipfs/go-graphsync/network"
"github.com/ipfs/go-graphsync/storeutil"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
"go.opencensus.io/stats"
"go.uber.org/fx"

"github.com/filecoin-project/lotus/metrics"
"github.com/filecoin-project/lotus/node/config"
Expand Down Expand Up @@ -56,43 +55,47 @@ func Graphsync(parallelTransfersForStorage uint64, parallelTransfersForRetrieval
}
})

stopStats := make(chan struct{})
lc.Append(fx.Hook{
OnStart: func(context.Context) error {
go func() {
t := time.NewTicker(10 * time.Second)
for {
select {
case <-t.C:
graphsyncStats(mctx, lc, gs)

return gs, nil
}
}

func graphsyncStats(mctx helpers.MetricsCtx, lc fx.Lifecycle, gs dtypes.Graphsync) {
stopStats := make(chan struct{})
lc.Append(fx.Hook{
OnStart: func(context.Context) error {
go func() {
t := time.NewTicker(10 * time.Second)
for {
select {
case <-t.C:

st := gs.Stats()
stats.Record(mctx, metrics.GraphsyncReceivingPeersCount.M(int64(st.OutgoingRequests.TotalPeers)))
stats.Record(mctx, metrics.GraphsyncReceivingActiveCount.M(int64(st.OutgoingRequests.Active)))
stats.Record(mctx, metrics.GraphsyncReceivingCountCount.M(int64(st.OutgoingRequests.Pending)))
stats.Record(mctx, metrics.GraphsyncReceivingTotalMemoryAllocated.M(int64(st.IncomingResponses.TotalAllocatedAllPeers)))
stats.Record(mctx, metrics.GraphsyncReceivingTotalPendingAllocations.M(int64(st.IncomingResponses.TotalPendingAllocations)))
stats.Record(mctx, metrics.GraphsyncReceivingPeersPending.M(int64(st.IncomingResponses.NumPeersWithPendingAllocations)))
stats.Record(mctx, metrics.GraphsyncSendingPeersCount.M(int64(st.IncomingRequests.TotalPeers)))
stats.Record(mctx, metrics.GraphsyncSendingActiveCount.M(int64(st.IncomingRequests.Active)))
stats.Record(mctx, metrics.GraphsyncSendingCountCount.M(int64(st.IncomingRequests.Pending)))
stats.Record(mctx, metrics.GraphsyncSendingTotalMemoryAllocated.M(int64(st.OutgoingResponses.TotalAllocatedAllPeers)))
stats.Record(mctx, metrics.GraphsyncSendingTotalPendingAllocations.M(int64(st.OutgoingResponses.TotalPendingAllocations)))
stats.Record(mctx, metrics.GraphsyncSendingPeersPending.M(int64(st.OutgoingResponses.NumPeersWithPendingAllocations)))
st := gs.Stats()
stats.Record(mctx, metrics.GraphsyncReceivingPeersCount.M(int64(st.OutgoingRequests.TotalPeers)))
stats.Record(mctx, metrics.GraphsyncReceivingActiveCount.M(int64(st.OutgoingRequests.Active)))
stats.Record(mctx, metrics.GraphsyncReceivingCountCount.M(int64(st.OutgoingRequests.Pending)))
stats.Record(mctx, metrics.GraphsyncReceivingTotalMemoryAllocated.M(int64(st.IncomingResponses.TotalAllocatedAllPeers)))
stats.Record(mctx, metrics.GraphsyncReceivingTotalPendingAllocations.M(int64(st.IncomingResponses.TotalPendingAllocations)))
stats.Record(mctx, metrics.GraphsyncReceivingPeersPending.M(int64(st.IncomingResponses.NumPeersWithPendingAllocations)))
stats.Record(mctx, metrics.GraphsyncSendingPeersCount.M(int64(st.IncomingRequests.TotalPeers)))
stats.Record(mctx, metrics.GraphsyncSendingActiveCount.M(int64(st.IncomingRequests.Active)))
stats.Record(mctx, metrics.GraphsyncSendingCountCount.M(int64(st.IncomingRequests.Pending)))
stats.Record(mctx, metrics.GraphsyncSendingTotalMemoryAllocated.M(int64(st.OutgoingResponses.TotalAllocatedAllPeers)))
stats.Record(mctx, metrics.GraphsyncSendingTotalPendingAllocations.M(int64(st.OutgoingResponses.TotalPendingAllocations)))
stats.Record(mctx, metrics.GraphsyncSendingPeersPending.M(int64(st.OutgoingResponses.NumPeersWithPendingAllocations)))

case <-stopStats:
return
}
case <-stopStats:
return
}
}()
}
}()

return nil
},
OnStop: func(ctx context.Context) error {
close(stopStats)
return nil
},
})

return gs, nil
}
return nil
},
OnStop: func(ctx context.Context) error {
close(stopStats)
return nil
},
})
}
2 changes: 2 additions & 0 deletions node/modules/storageminer.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,8 @@ func StagingGraphsync(parallelTransfersForStorage uint64, parallelTransfersForRe
graphsyncimpl.MaxLinksPerIncomingRequests(config.MaxTraversalLinks),
graphsyncimpl.MaxLinksPerOutgoingRequests(config.MaxTraversalLinks))

graphsyncStats(mctx, lc, gs)

return gs
}
}
Expand Down

0 comments on commit d2e9d21

Please sign in to comment.