diff --git a/pkg/kv/kvclient/kvstreamer/streamer.go b/pkg/kv/kvclient/kvstreamer/streamer.go index 4372770e0b88..945b5e7a0a12 100644 --- a/pkg/kv/kvclient/kvstreamer/streamer.go +++ b/pkg/kv/kvclient/kvstreamer/streamer.go @@ -1382,6 +1382,11 @@ func (w *workerCoordinator) performRequestAsync( // unnecessary blocking (due to sequential evaluation of sub-batches // by the DistSender). For the initial implementation it doesn't // seem important though. + + // Note that we don't add a separate log.VEventf here before calling + // Send since we create a separate tracing span for each async + // request which is sufficient to highlight where the handoff from + // SQL occurred. br, pErr := w.txn.Send(ctx, ba) if pErr != nil { // TODO(yuzefovich): if err is diff --git a/pkg/sql/delete_range.go b/pkg/sql/delete_range.go index e599bbb13d06..b0036475f2ad 100644 --- a/pkg/sql/delete_range.go +++ b/pkg/sql/delete_range.go @@ -122,6 +122,7 @@ func (d *deleteRangeNode) startExec(params runParams) error { b.Header.MaxSpanRequestKeys = row.TableTruncateChunkSize b.Header.LockTimeout = params.SessionData().LockTimeout d.deleteSpans(params, b, spans) + log.VEventf(ctx, 2, "fast delete: processing %d spans", len(spans)) if err := params.p.txn.Run(ctx, b); err != nil { return row.ConvertBatchError(ctx, d.desc, b) } @@ -143,6 +144,7 @@ func (d *deleteRangeNode) startExec(params runParams) error { b := params.p.txn.NewBatch() b.Header.LockTimeout = params.SessionData().LockTimeout d.deleteSpans(params, b, spans) + log.VEventf(ctx, 2, "fast delete: processing %d spans and committing", len(spans)) if err := params.p.txn.CommitInBatch(ctx, b); err != nil { return row.ConvertBatchError(ctx, d.desc, b) } diff --git a/pkg/sql/insert_fast_path.go b/pkg/sql/insert_fast_path.go index 4300a06768d9..234c03272656 100644 --- a/pkg/sql/insert_fast_path.go +++ b/pkg/sql/insert_fast_path.go @@ -304,6 +304,7 @@ func (n *insertFastPathNode) runUniqChecks(params runParams) error { // Run the uniqueness checks batch. ba := n.run.uniqBatch.ShallowCopy() + log.VEventf(params.ctx, 2, "uniqueness check: sending a batch with %d requests", len(ba.Requests)) br, err := params.p.txn.Send(params.ctx, ba) if err != nil { return err.GoError() @@ -331,6 +332,7 @@ func (n *insertFastPathNode) runFKChecks(params runParams) error { // Run the FK checks batch. ba := n.run.fkBatch.ShallowCopy() + log.VEventf(params.ctx, 2, "fk check: sending a batch with %d requests", len(ba.Requests)) br, err := params.p.txn.Send(params.ctx, ba) if err != nil { return err.GoError() @@ -362,6 +364,7 @@ func (n *insertFastPathNode) runFKUniqChecks(params runParams) error { // Run the combined uniqueness and FK checks batch. ba := n.run.uniqBatch.ShallowCopy() + log.VEventf(params.ctx, 2, "fk / uniqueness check: sending a batch with %d requests", len(ba.Requests)) br, err := params.p.txn.Send(params.ctx, ba) if err != nil { return err.GoError() diff --git a/pkg/sql/row/fetcher.go b/pkg/sql/row/fetcher.go index 36e28aec3412..f471f4a3e2ff 100644 --- a/pkg/sql/row/fetcher.go +++ b/pkg/sql/row/fetcher.go @@ -646,6 +646,7 @@ func (rf *Fetcher) StartInconsistentScan( } } + log.VEventf(ctx, 2, "inconsistent scan: sending a batch with %d requests", len(ba.Requests)) res, err := txn.Send(ctx, ba) if err != nil { return nil, err.GoError() diff --git a/pkg/sql/row/kv_batch_fetcher.go b/pkg/sql/row/kv_batch_fetcher.go index 43a98b7666c0..b761ba6e2e8c 100644 --- a/pkg/sql/row/kv_batch_fetcher.go +++ b/pkg/sql/row/kv_batch_fetcher.go @@ -277,6 +277,7 @@ func makeTxnKVFetcherDefaultSendFunc(txn *kv.Txn, batchRequestsIssued *int64) se ctx context.Context, ba *kvpb.BatchRequest, ) (*kvpb.BatchResponse, error) { + log.VEventf(ctx, 2, "kv fetcher: sending a batch with %d requests", len(ba.Requests)) res, err := txn.Send(ctx, ba) if err != nil { return nil, err.GoError() diff --git a/pkg/sql/row/kv_fetcher.go b/pkg/sql/row/kv_fetcher.go index 695dea198b9a..aab3c3a6cc27 100644 --- a/pkg/sql/row/kv_fetcher.go +++ b/pkg/sql/row/kv_fetcher.go @@ -27,6 +27,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/mon" "github.com/cockroachdb/cockroach/pkg/util/stop" ) @@ -70,6 +71,7 @@ func newTxnKVFetcher( } else { negotiated := false sendFn = func(ctx context.Context, ba *kvpb.BatchRequest) (br *kvpb.BatchResponse, _ error) { + log.VEventf(ctx, 2, "kv fetcher (bounded staleness): sending a batch with %d requests", len(ba.Requests)) ba.RoutingPolicy = kvpb.RoutingPolicy_NEAREST var pErr *kvpb.Error // Only use NegotiateAndSend if we have not yet negotiated a timestamp. diff --git a/pkg/sql/tablewriter.go b/pkg/sql/tablewriter.go index 40bb221185e9..016e1203f06d 100644 --- a/pkg/sql/tablewriter.go +++ b/pkg/sql/tablewriter.go @@ -192,6 +192,7 @@ func (tb *tableWriterBase) setRowsWrittenLimit(sd *sessiondata.SessionData) { // flushAndStartNewBatch shares the common flushAndStartNewBatch() code between // tableWriters. func (tb *tableWriterBase) flushAndStartNewBatch(ctx context.Context) error { + log.VEventf(ctx, 2, "writing batch with %d requests", len(tb.b.Requests())) if err := tb.txn.Run(ctx, tb.b); err != nil { return row.ConvertBatchError(ctx, tb.desc, tb.b) } @@ -221,11 +222,13 @@ func (tb *tableWriterBase) finalize(ctx context.Context) (err error) { // before committing. !tb.txn.DeadlineLikelySufficient() { log.Event(ctx, "autocommit enabled") + log.VEventf(ctx, 2, "writing batch with %d requests and committing", len(tb.b.Requests())) // An auto-txn can commit the transaction with the batch. This is an // optimization to avoid an extra round-trip to the transaction // coordinator. err = tb.txn.CommitInBatch(ctx, tb.b) } else { + log.VEventf(ctx, 2, "writing batch with %d requests", len(tb.b.Requests())) err = tb.txn.Run(ctx, tb.b) } tb.lastBatchSize = tb.currentBatchSize