diff --git a/pkg/kv/batch.go b/pkg/kv/batch.go index a41e1a77e656..1154961438a1 100644 --- a/pkg/kv/batch.go +++ b/pkg/kv/batch.go @@ -110,12 +110,11 @@ func truncate(ba roachpb.BatchRequest, rs roachpb.RSpan) (roachpb.BatchRequest, if newSpan.EqualValue(oldHeader.Span()) { truncBA.Requests = append(truncBA.Requests, ba.Requests[pos]) } else { - var union roachpb.RequestUnion oldHeader.SetSpan(newSpan) shallowCopy := inner.ShallowCopy() shallowCopy.SetHeader(oldHeader) - union.MustSetInner(shallowCopy) - truncBA.Requests = append(truncBA.Requests, union) + truncBA.Requests = append(truncBA.Requests, roachpb.RequestUnion{}) + truncBA.Requests[len(truncBA.Requests)-1].MustSetInner(shallowCopy) } positions = append(positions, pos) } diff --git a/pkg/kv/dist_sender.go b/pkg/kv/dist_sender.go index 16a0003d805d..bdad90dc3c9e 100644 --- a/pkg/kv/dist_sender.go +++ b/pkg/kv/dist_sender.go @@ -664,6 +664,9 @@ func (ds *DistSender) Send( tracing.AnnotateTrace() + // TODO(nvanbenschoten): This causes ba to escape to the heap. Either + // commit to passing BatchRequests by reference or return an updated + // value from this method instead. if pErr := ds.initAndVerifyBatch(ctx, &ba); pErr != nil { return nil, pErr } @@ -1295,7 +1298,8 @@ func fillSkippedResponses( hdr.ResumeSpan.Key = origSpan.Key } else if roachpb.RKey(origSpan.Key).Less(nextKey) { // Some keys have yet to be processed. - hdr.ResumeSpan = &origSpan + hdr.ResumeSpan = new(roachpb.Span) + *hdr.ResumeSpan = origSpan if nextKey.Less(roachpb.RKey(origSpan.EndKey)) { // The original span has been partially processed. hdr.ResumeSpan.EndKey = nextKey.AsRawKey() @@ -1315,7 +1319,8 @@ func fillSkippedResponses( // latter case. if nextKey.Less(roachpb.RKey(origSpan.EndKey)) { // Some keys have yet to be processed. - hdr.ResumeSpan = &origSpan + hdr.ResumeSpan = new(roachpb.Span) + *hdr.ResumeSpan = origSpan if roachpb.RKey(origSpan.Key).Less(nextKey) { // The original span has been partially processed. hdr.ResumeSpan.Key = nextKey.AsRawKey() diff --git a/pkg/kv/range_cache.go b/pkg/kv/range_cache.go index fcaddd342cae..fd6c01b1300c 100644 --- a/pkg/kv/range_cache.go +++ b/pkg/kv/range_cache.go @@ -19,6 +19,7 @@ import ( "context" "fmt" "strconv" + "strings" "sync" "github.com/biogo/store/llrb" @@ -162,7 +163,7 @@ func (rdc *RangeDescriptorCache) String() string { } func (rdc *RangeDescriptorCache) stringLocked() string { - var buf bytes.Buffer + var buf strings.Builder rdc.rangeCache.cache.Do(func(k, v interface{}) bool { fmt.Fprintf(&buf, "key=%s desc=%+v\n", roachpb.Key(k.(rangeCacheKey)), v) return false diff --git a/pkg/kv/transport.go b/pkg/kv/transport.go index 9b095859bfb1..827fe9b7ccf7 100644 --- a/pkg/kv/transport.go +++ b/pkg/kv/transport.go @@ -17,10 +17,7 @@ package kv import ( "context" - "encoding/hex" - "fmt" "sort" - "strings" "time" "github.com/cockroachdb/cockroach/pkg/internal/client" @@ -28,7 +25,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/rpc" "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer" "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/opentracing/opentracing-go" @@ -156,34 +152,6 @@ func (gt *grpcTransport) maybeResurrectRetryablesLocked() bool { return len(resurrect) > 0 } -func withMarshalingDebugging(ctx context.Context, ba roachpb.BatchRequest, f func()) { - nPre := ba.Size() - defer func() { - nPost := ba.Size() - if r := recover(); r != nil || nPre != nPost { - var buf strings.Builder - _, _ = fmt.Fprintf(&buf, "batch size %d -> %d bytes\n", nPre, nPost) - func() { - defer func() { - if rInner := recover(); rInner != nil { - _, _ = fmt.Fprintln(&buf, "panic while re-marshaling:", rInner) - } - }() - data, mErr := protoutil.Marshal(&ba) - if mErr != nil { - _, _ = fmt.Fprintln(&buf, "while re-marshaling:", mErr) - } else { - _, _ = fmt.Fprintln(&buf, "re-marshaled protobuf:") - _, _ = fmt.Fprintln(&buf, hex.Dump(data)) - } - }() - _, _ = fmt.Fprintln(&buf, "original panic: ", r) - panic(buf.String()) - } - }() - f() -} - // SendNext invokes the specified RPC on the supplied client when the // client is ready. On success, the reply is sent on the channel; // otherwise an error is sent. @@ -197,11 +165,7 @@ func (gt *grpcTransport) SendNext( } ba.Replica = client.replica - var reply *roachpb.BatchResponse - - withMarshalingDebugging(ctx, ba, func() { - reply, err = gt.sendBatch(ctx, client.replica.NodeID, iface, ba) - }) + reply, err := gt.sendBatch(ctx, client.replica.NodeID, iface, ba) // NotLeaseHolderErrors can be retried. var retryable bool diff --git a/pkg/kv/transport_test.go b/pkg/kv/transport_test.go index 259efafa5e15..b9c4f8bd23e1 100644 --- a/pkg/kv/transport_test.go +++ b/pkg/kv/transport_test.go @@ -26,7 +26,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/tracing" opentracing "github.com/opentracing/opentracing-go" - "github.com/stretchr/testify/assert" "google.golang.org/grpc" ) @@ -173,30 +172,3 @@ func (m *mockInternalClient) RangeFeed( ) (roachpb.Internal_RangeFeedClient, error) { return nil, fmt.Errorf("unsupported RangeFeed call") } - -func TestWithMarshalingDebugging(t *testing.T) { - defer leaktest.AfterTest(t)() - t.Skip(fmt.Sprintf("Skipped until #34241 is resolved")) - - ctx := context.Background() - - exp := `batch size 23 -> 28 bytes -re-marshaled protobuf: -00000000 0a 0a 0a 00 12 06 08 00 10 00 18 00 12 0e 3a 0c |..............:.| -00000010 0a 0a 1a 03 66 6f 6f 22 03 62 61 72 |....foo".bar| - -original panic: -` - - assert.PanicsWithValue(t, exp, func() { - var ba roachpb.BatchRequest - ba.Add(&roachpb.ScanRequest{ - RequestHeader: roachpb.RequestHeader{ - Key: []byte("foo"), - }, - }) - withMarshalingDebugging(ctx, ba, func() { - ba.Requests[0].GetInner().(*roachpb.ScanRequest).EndKey = roachpb.Key("bar") - }) - }) -} diff --git a/pkg/kv/txn_coord_sender.go b/pkg/kv/txn_coord_sender.go index 4ef786c7e392..2bb6d5d431e6 100644 --- a/pkg/kv/txn_coord_sender.go +++ b/pkg/kv/txn_coord_sender.go @@ -716,11 +716,11 @@ func (tc *TxnCoordSender) commitReadOnlyTxnLocked( ctx context.Context, ba roachpb.BatchRequest, ) *roachpb.Error { deadline := ba.Requests[0].GetEndTransaction().Deadline - txn := tc.mu.txn - if deadline != nil && !txn.Timestamp.Less(*deadline) { - pErr := generateTxnDeadlineExceededErr(&txn, *deadline) + if deadline != nil && !tc.mu.txn.Timestamp.Less(*deadline) { + txn := tc.mu.txn.Clone() + pErr := generateTxnDeadlineExceededErr(txn, *deadline) // We need to bump the epoch and transform this retriable error. - ba.Txn = &txn + ba.Txn = txn return tc.updateStateLocked(ctx, ba, nil /* br */, pErr) } tc.mu.txnState = txnFinalized