Skip to content

Commit

Permalink
Merge #109402
Browse files Browse the repository at this point in the history
109402: colrpc, flowinfra, kvcoord, server: wrap sends and recvs in tracing r=yuzefovich a=michae2

**colrpc, flowinfra: wrap send and recv in tracing**

This commit adds trace events immediately before DistSQL sends and immediately after DistSQL receives so that we can better determine where the latency is during performance incidents.

Informs: #108790

Epic: None

Release note: None

---

**kvcoord, server: wrap BatchRequest send and recv in tracing**

Add trace events immediately before and after kvclient/kvserver BatchRequest sends and receives so that we can better determine where the latency is during performance incidents.

Informs: #108790

Epic: None

Release note: None

Co-authored-by: Michael Erickson <[email protected]>
  • Loading branch information
craig[bot] and michae2 committed Sep 8, 2023
2 parents f5dde7a + a4aaa15 commit 4ef8a89
Show file tree
Hide file tree
Showing 7 changed files with 22 additions and 11 deletions.
2 changes: 2 additions & 0 deletions pkg/kv/kvclient/kvcoord/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,9 @@ func (gt *grpcTransport) sendBatch(
if rpc.IsLocal(iface) {
gt.opts.metrics.LocalSentCount.Inc(1)
}
log.VEvent(ctx, 2, "sending batch request")
reply, err := iface.Batch(ctx, ba)
log.VEvent(ctx, 2, "received batch response")
// If we queried a remote node, perform extra validation.
if reply != nil && !rpc.IsLocal(iface) {
if err == nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/server/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -1318,6 +1318,7 @@ func (n *Node) batchInternal(
}()
if log.HasSpanOrEvent(ctx) {
log.Eventf(ctx, "node received request: %s", args.Summary())
defer log.Event(ctx, "node sending response")
}

tStart := timeutil.Now()
Expand Down
12 changes: 9 additions & 3 deletions pkg/sql/colflow/colrpc/inbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,12 +347,14 @@ func (i *Inbox) Next() coldata.Batch {
// Regardless of the cause we want to propagate such an error as
// expected one in all cases so that the caller could decide on how
// to handle it.
log.VEventf(i.Ctx, 2, "Inbox communication error: %v", err)
err = pgerror.Wrap(err, pgcode.InternalConnectionFailure, "inbox communication error")
i.errCh <- err
ungracefulStreamTermination = true
colexecerror.ExpectedError(err)
}
if len(m.Data.Metadata) != 0 {
log.VEvent(i.Ctx, 2, "Inbox received metadata")
// If an error was encountered, it needs to be propagated
// immediately. All other metadata will simply be buffered and
// returned in DrainMeta.
Expand Down Expand Up @@ -385,6 +387,7 @@ func (i *Inbox) Next() coldata.Batch {
// Continue until we get the next batch or EOF.
continue
}
log.VEvent(i.Ctx, 2, "Inbox received batch")
if len(m.Data.RawBytes) == 0 {
// Protect against Deserialization panics by skipping empty messages.
continue
Expand Down Expand Up @@ -477,11 +480,14 @@ func (i *Inbox) DrainMeta() []execinfrapb.ProducerMetadata {
if err == io.EOF {
break
}
if log.V(1) {
log.Warningf(i.Ctx, "Inbox Recv connection error while draining metadata: %+v", err)
}
log.VEventf(i.Ctx, 1, "Inbox communication error while draining metadata: %v", err)
return allMeta
}
if len(msg.Data.Metadata) == 0 {
log.VEvent(i.Ctx, 2, "Inbox received batch while draining metadata, ignoring")
continue
}
log.VEvent(i.Ctx, 2, "Inbox received metadata while draining metadata")
for _, remoteMeta := range msg.Data.Metadata {
meta, ok := execinfrapb.RemoteProducerMetaToLocalMeta(i.Ctx, remoteMeta)
if !ok {
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/colflow/colrpc/outbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,7 @@ func (o *Outbox) sendBatches(
// o.scratch.msg can be reused as soon as Send returns since it returns as
// soon as the message is written to the control buffer. The message is
// marshaled (bytes are copied) before writing.
log.VEvent(ctx, 2, "Outbox sending batch")
if err := stream.Send(o.scratch.msg); err != nil {
flowinfra.HandleStreamErr(ctx, "Send (batches)", err, flowCtxCancel, outboxCtxCancel)
return
Expand Down Expand Up @@ -360,6 +361,7 @@ func (o *Outbox) sendMetadata(ctx context.Context, stream flowStreamClient, errT
if len(msg.Data.Metadata) == 0 {
return nil
}
log.VEvent(ctx, 2, "Outbox sending metadata")
return stream.Send(msg)
}

Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/distsql/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -713,11 +713,13 @@ func (ds *ServerImpl) flowStreamInt(
if err == io.EOF {
return errors.AssertionFailedf("missing header message")
}
log.VEventf(ctx, 2, "FlowStream (server) error while receiving header: %v", err)
return err
}
if msg.Header == nil {
return errors.AssertionFailedf("no header in first message")
}
log.VEvent(ctx, 2, "FlowStream (server) received header")
flowID := msg.Header.FlowID
streamID := msg.Header.StreamID
if log.V(1) {
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/flowinfra/inbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ func processInboundStreamHelper(
if err != nil {
if err != io.EOF {
// Communication error.
log.VEventf(ctx, 2, "Inbox communication error: %v", err)
err = pgerror.Wrap(err, pgcode.InternalConnectionFailure, "inbox communication error")
sendErrToConsumer(err)
errChan <- err
Expand All @@ -152,6 +153,7 @@ func processInboundStreamHelper(
return
}

log.VEvent(ctx, 2, "Inbox received message")
if res := processProducerMessage(
ctx, f, stream, dst, &sd, &draining, msg,
); res.err != nil || res.consumerClosed {
Expand Down
12 changes: 4 additions & 8 deletions pkg/sql/flowinfra/outbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,9 +174,7 @@ func (m *Outbox) flush(ctx context.Context) error {
}
msg := m.encoder.FormMessage(ctx)

if log.V(3) {
log.Infof(ctx, "flushing outbox")
}
log.VEvent(ctx, 2, "Outbox flushing")
sendErr := m.stream.Send(msg)
if m.statsCollectionEnabled {
m.streamStats.NetTx.BytesSent.Add(int64(msg.Size()))
Expand All @@ -191,11 +189,9 @@ func (m *Outbox) flush(ctx context.Context) error {
HandleStreamErr(ctx, "flushing", sendErr, m.flowCtxCancel, m.outboxCtxCancel)
// Make sure the stream is not used any more.
m.stream = nil
if log.V(1) {
log.Errorf(ctx, "outbox flush error: %s", sendErr)
}
} else if log.V(3) {
log.Infof(ctx, "outbox flushed")
log.VErrEventf(ctx, 1, "Outbox flush error: %s", sendErr)
} else {
log.VEvent(ctx, 2, "Outbox flushed")
}
if sendErr != nil {
return sendErr
Expand Down

0 comments on commit 4ef8a89

Please sign in to comment.