diff --git a/pkg/kv/kvserver/raft_transport.go b/pkg/kv/kvserver/raft_transport.go index 1d22b5124413..566a834715e0 100644 --- a/pkg/kv/kvserver/raft_transport.go +++ b/pkg/kv/kvserver/raft_transport.go @@ -13,6 +13,7 @@ package kvserver import ( "context" "net" + "runtime/pprof" "time" "unsafe" @@ -373,10 +374,7 @@ func (t *RaftTransport) Stop(storeID roachpb.StoreID) { // lost and a new instance of processQueue will be started by the next message // to be sent. func (t *RaftTransport) processQueue( - nodeID roachpb.NodeID, - ch chan *kvserverpb.RaftMessageRequest, - stream MultiRaft_RaftMessageBatchClient, - class rpc.ConnectionClass, + ch chan *kvserverpb.RaftMessageRequest, stream MultiRaft_RaftMessageBatchClient, ) error { errCh := make(chan error, 1) @@ -566,11 +564,14 @@ func (t *RaftTransport) startProcessNewQueue( return } - if err := t.processQueue(toNodeID, ch, stream, class); err != nil { + if err := t.processQueue(ch, stream); err != nil { log.Warningf(ctx, "while processing outgoing Raft queue to node %d: %s:", toNodeID, err) } } - err := t.stopper.RunAsyncTask(ctx, "storage.RaftTransport: sending messages", worker) + err := t.stopper.RunAsyncTask(ctx, "storage.RaftTransport: sending/receiving messages", + func(ctx context.Context) { + pprof.Do(ctx, pprof.Labels("remote_node_id", toNodeID.String()), worker) + }) if err != nil { t.queues[class].Delete(int64(toNodeID)) return false