From 3e8a3a2001623e1efc883b5557c5d4ab37090b3b Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Tue, 28 May 2024 16:04:50 -0700 Subject: [PATCH] sql: add some logging on SQL / KV boundary This commit audits all main entry points from SQL to KV and adds simple log messages (hidden behind verbosity level 2) right before performing the KV operation. We do so both on the write and read paths. This could be helpful when investigating latency spikes with gaps in the traces (although it's still likely to be insufficient in practice - i.e. Go execution traces are likely to be needed, yet this commit could make things a bit clearer). Release note: None --- pkg/kv/kvclient/kvstreamer/streamer.go | 5 +++++ pkg/sql/delete_range.go | 2 ++ pkg/sql/insert_fast_path.go | 3 +++ pkg/sql/row/fetcher.go | 1 + pkg/sql/row/kv_batch_fetcher.go | 1 + pkg/sql/row/kv_fetcher.go | 2 ++ pkg/sql/tablewriter.go | 3 +++ 7 files changed, 17 insertions(+) diff --git a/pkg/kv/kvclient/kvstreamer/streamer.go b/pkg/kv/kvclient/kvstreamer/streamer.go index 4372770e0b88..945b5e7a0a12 100644 --- a/pkg/kv/kvclient/kvstreamer/streamer.go +++ b/pkg/kv/kvclient/kvstreamer/streamer.go @@ -1382,6 +1382,11 @@ func (w *workerCoordinator) performRequestAsync( // unnecessary blocking (due to sequential evaluation of sub-batches // by the DistSender). For the initial implementation it doesn't // seem important though. + + // Note that we don't add a separate log.VEventf here before calling + // Send since we create a separate tracing span for each async + // request which is sufficient to highlight where the handoff from + // SQL occurred. br, pErr := w.txn.Send(ctx, ba) if pErr != nil { // TODO(yuzefovich): if err is diff --git a/pkg/sql/delete_range.go b/pkg/sql/delete_range.go index e599bbb13d06..b0036475f2ad 100644 --- a/pkg/sql/delete_range.go +++ b/pkg/sql/delete_range.go @@ -122,6 +122,7 @@ func (d *deleteRangeNode) startExec(params runParams) error { b.Header.MaxSpanRequestKeys = row.TableTruncateChunkSize b.Header.LockTimeout = params.SessionData().LockTimeout d.deleteSpans(params, b, spans) + log.VEventf(ctx, 2, "fast delete: processing %d spans", len(spans)) if err := params.p.txn.Run(ctx, b); err != nil { return row.ConvertBatchError(ctx, d.desc, b) } @@ -143,6 +144,7 @@ func (d *deleteRangeNode) startExec(params runParams) error { b := params.p.txn.NewBatch() b.Header.LockTimeout = params.SessionData().LockTimeout d.deleteSpans(params, b, spans) + log.VEventf(ctx, 2, "fast delete: processing %d spans and committing", len(spans)) if err := params.p.txn.CommitInBatch(ctx, b); err != nil { return row.ConvertBatchError(ctx, d.desc, b) } diff --git a/pkg/sql/insert_fast_path.go b/pkg/sql/insert_fast_path.go index 4300a06768d9..234c03272656 100644 --- a/pkg/sql/insert_fast_path.go +++ b/pkg/sql/insert_fast_path.go @@ -304,6 +304,7 @@ func (n *insertFastPathNode) runUniqChecks(params runParams) error { // Run the uniqueness checks batch. ba := n.run.uniqBatch.ShallowCopy() + log.VEventf(params.ctx, 2, "uniqueness check: sending a batch with %d requests", len(ba.Requests)) br, err := params.p.txn.Send(params.ctx, ba) if err != nil { return err.GoError() @@ -331,6 +332,7 @@ func (n *insertFastPathNode) runFKChecks(params runParams) error { // Run the FK checks batch. ba := n.run.fkBatch.ShallowCopy() + log.VEventf(params.ctx, 2, "fk check: sending a batch with %d requests", len(ba.Requests)) br, err := params.p.txn.Send(params.ctx, ba) if err != nil { return err.GoError() @@ -362,6 +364,7 @@ func (n *insertFastPathNode) runFKUniqChecks(params runParams) error { // Run the combined uniqueness and FK checks batch. ba := n.run.uniqBatch.ShallowCopy() + log.VEventf(params.ctx, 2, "fk / uniqueness check: sending a batch with %d requests", len(ba.Requests)) br, err := params.p.txn.Send(params.ctx, ba) if err != nil { return err.GoError() diff --git a/pkg/sql/row/fetcher.go b/pkg/sql/row/fetcher.go index 36e28aec3412..f471f4a3e2ff 100644 --- a/pkg/sql/row/fetcher.go +++ b/pkg/sql/row/fetcher.go @@ -646,6 +646,7 @@ func (rf *Fetcher) StartInconsistentScan( } } + log.VEventf(ctx, 2, "inconsistent scan: sending a batch with %d requests", len(ba.Requests)) res, err := txn.Send(ctx, ba) if err != nil { return nil, err.GoError() diff --git a/pkg/sql/row/kv_batch_fetcher.go b/pkg/sql/row/kv_batch_fetcher.go index 43a98b7666c0..b761ba6e2e8c 100644 --- a/pkg/sql/row/kv_batch_fetcher.go +++ b/pkg/sql/row/kv_batch_fetcher.go @@ -277,6 +277,7 @@ func makeTxnKVFetcherDefaultSendFunc(txn *kv.Txn, batchRequestsIssued *int64) se ctx context.Context, ba *kvpb.BatchRequest, ) (*kvpb.BatchResponse, error) { + log.VEventf(ctx, 2, "kv fetcher: sending a batch with %d requests", len(ba.Requests)) res, err := txn.Send(ctx, ba) if err != nil { return nil, err.GoError() diff --git a/pkg/sql/row/kv_fetcher.go b/pkg/sql/row/kv_fetcher.go index 695dea198b9a..aab3c3a6cc27 100644 --- a/pkg/sql/row/kv_fetcher.go +++ b/pkg/sql/row/kv_fetcher.go @@ -27,6 +27,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/mon" "github.com/cockroachdb/cockroach/pkg/util/stop" ) @@ -70,6 +71,7 @@ func newTxnKVFetcher( } else { negotiated := false sendFn = func(ctx context.Context, ba *kvpb.BatchRequest) (br *kvpb.BatchResponse, _ error) { + log.VEventf(ctx, 2, "kv fetcher (bounded staleness): sending a batch with %d requests", len(ba.Requests)) ba.RoutingPolicy = kvpb.RoutingPolicy_NEAREST var pErr *kvpb.Error // Only use NegotiateAndSend if we have not yet negotiated a timestamp. diff --git a/pkg/sql/tablewriter.go b/pkg/sql/tablewriter.go index 40bb221185e9..016e1203f06d 100644 --- a/pkg/sql/tablewriter.go +++ b/pkg/sql/tablewriter.go @@ -192,6 +192,7 @@ func (tb *tableWriterBase) setRowsWrittenLimit(sd *sessiondata.SessionData) { // flushAndStartNewBatch shares the common flushAndStartNewBatch() code between // tableWriters. func (tb *tableWriterBase) flushAndStartNewBatch(ctx context.Context) error { + log.VEventf(ctx, 2, "writing batch with %d requests", len(tb.b.Requests())) if err := tb.txn.Run(ctx, tb.b); err != nil { return row.ConvertBatchError(ctx, tb.desc, tb.b) } @@ -221,11 +222,13 @@ func (tb *tableWriterBase) finalize(ctx context.Context) (err error) { // before committing. !tb.txn.DeadlineLikelySufficient() { log.Event(ctx, "autocommit enabled") + log.VEventf(ctx, 2, "writing batch with %d requests and committing", len(tb.b.Requests())) // An auto-txn can commit the transaction with the batch. This is an // optimization to avoid an extra round-trip to the transaction // coordinator. err = tb.txn.CommitInBatch(ctx, tb.b) } else { + log.VEventf(ctx, 2, "writing batch with %d requests", len(tb.b.Requests())) err = tb.txn.Run(ctx, tb.b) } tb.lastBatchSize = tb.currentBatchSize