Skip to content

Commit

Permalink
Merge #37559
Browse files Browse the repository at this point in the history
37559: kv: remove withMarshalingDebugging function r=nvanbenschoten a=nvanbenschoten

This debugging function was removed when we fixed #34241 and added back
in when #35803 appeared because it was clear that we hadn't fully fixed
the issue. It's been about 2 months since #35762 merged and we haven't
seen any issues since, so this can now be removed.

I don't think we meant to keep this in for the 19.1 release. We should
backport this commit.

Co-authored-by: Nathan VanBenschoten <[email protected]>
  • Loading branch information
craig[bot] and nvanbenschoten committed May 17, 2019
2 parents 7b26514 + a3deaf2 commit 4b34198
Show file tree
Hide file tree
Showing 6 changed files with 16 additions and 75 deletions.
5 changes: 2 additions & 3 deletions pkg/kv/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,11 @@ func truncate(ba roachpb.BatchRequest, rs roachpb.RSpan) (roachpb.BatchRequest,
if newSpan.EqualValue(oldHeader.Span()) {
truncBA.Requests = append(truncBA.Requests, ba.Requests[pos])
} else {
var union roachpb.RequestUnion
oldHeader.SetSpan(newSpan)
shallowCopy := inner.ShallowCopy()
shallowCopy.SetHeader(oldHeader)
union.MustSetInner(shallowCopy)
truncBA.Requests = append(truncBA.Requests, union)
truncBA.Requests = append(truncBA.Requests, roachpb.RequestUnion{})
truncBA.Requests[len(truncBA.Requests)-1].MustSetInner(shallowCopy)
}
positions = append(positions, pos)
}
Expand Down
9 changes: 7 additions & 2 deletions pkg/kv/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -664,6 +664,9 @@ func (ds *DistSender) Send(

tracing.AnnotateTrace()

// TODO(nvanbenschoten): This causes ba to escape to the heap. Either
// commit to passing BatchRequests by reference or return an updated
// value from this method instead.
if pErr := ds.initAndVerifyBatch(ctx, &ba); pErr != nil {
return nil, pErr
}
Expand Down Expand Up @@ -1295,7 +1298,8 @@ func fillSkippedResponses(
hdr.ResumeSpan.Key = origSpan.Key
} else if roachpb.RKey(origSpan.Key).Less(nextKey) {
// Some keys have yet to be processed.
hdr.ResumeSpan = &origSpan
hdr.ResumeSpan = new(roachpb.Span)
*hdr.ResumeSpan = origSpan
if nextKey.Less(roachpb.RKey(origSpan.EndKey)) {
// The original span has been partially processed.
hdr.ResumeSpan.EndKey = nextKey.AsRawKey()
Expand All @@ -1315,7 +1319,8 @@ func fillSkippedResponses(
// latter case.
if nextKey.Less(roachpb.RKey(origSpan.EndKey)) {
// Some keys have yet to be processed.
hdr.ResumeSpan = &origSpan
hdr.ResumeSpan = new(roachpb.Span)
*hdr.ResumeSpan = origSpan
if roachpb.RKey(origSpan.Key).Less(nextKey) {
// The original span has been partially processed.
hdr.ResumeSpan.Key = nextKey.AsRawKey()
Expand Down
3 changes: 2 additions & 1 deletion pkg/kv/range_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"context"
"fmt"
"strconv"
"strings"
"sync"

"github.com/biogo/store/llrb"
Expand Down Expand Up @@ -162,7 +163,7 @@ func (rdc *RangeDescriptorCache) String() string {
}

func (rdc *RangeDescriptorCache) stringLocked() string {
var buf bytes.Buffer
var buf strings.Builder
rdc.rangeCache.cache.Do(func(k, v interface{}) bool {
fmt.Fprintf(&buf, "key=%s desc=%+v\n", roachpb.Key(k.(rangeCacheKey)), v)
return false
Expand Down
38 changes: 1 addition & 37 deletions pkg/kv/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,14 @@ package kv

import (
"context"
"encoding/hex"
"fmt"
"sort"
"strings"
"time"

"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/rpc/nodedialer"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/opentracing/opentracing-go"
Expand Down Expand Up @@ -156,34 +152,6 @@ func (gt *grpcTransport) maybeResurrectRetryablesLocked() bool {
return len(resurrect) > 0
}

func withMarshalingDebugging(ctx context.Context, ba roachpb.BatchRequest, f func()) {
nPre := ba.Size()
defer func() {
nPost := ba.Size()
if r := recover(); r != nil || nPre != nPost {
var buf strings.Builder
_, _ = fmt.Fprintf(&buf, "batch size %d -> %d bytes\n", nPre, nPost)
func() {
defer func() {
if rInner := recover(); rInner != nil {
_, _ = fmt.Fprintln(&buf, "panic while re-marshaling:", rInner)
}
}()
data, mErr := protoutil.Marshal(&ba)
if mErr != nil {
_, _ = fmt.Fprintln(&buf, "while re-marshaling:", mErr)
} else {
_, _ = fmt.Fprintln(&buf, "re-marshaled protobuf:")
_, _ = fmt.Fprintln(&buf, hex.Dump(data))
}
}()
_, _ = fmt.Fprintln(&buf, "original panic: ", r)
panic(buf.String())
}
}()
f()
}

// SendNext invokes the specified RPC on the supplied client when the
// client is ready. On success, the reply is sent on the channel;
// otherwise an error is sent.
Expand All @@ -197,11 +165,7 @@ func (gt *grpcTransport) SendNext(
}

ba.Replica = client.replica
var reply *roachpb.BatchResponse

withMarshalingDebugging(ctx, ba, func() {
reply, err = gt.sendBatch(ctx, client.replica.NodeID, iface, ba)
})
reply, err := gt.sendBatch(ctx, client.replica.NodeID, iface, ba)

// NotLeaseHolderErrors can be retried.
var retryable bool
Expand Down
28 changes: 0 additions & 28 deletions pkg/kv/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
opentracing "github.com/opentracing/opentracing-go"
"github.com/stretchr/testify/assert"
"google.golang.org/grpc"
)

Expand Down Expand Up @@ -173,30 +172,3 @@ func (m *mockInternalClient) RangeFeed(
) (roachpb.Internal_RangeFeedClient, error) {
return nil, fmt.Errorf("unsupported RangeFeed call")
}

func TestWithMarshalingDebugging(t *testing.T) {
defer leaktest.AfterTest(t)()
t.Skip(fmt.Sprintf("Skipped until #34241 is resolved"))

ctx := context.Background()

exp := `batch size 23 -> 28 bytes
re-marshaled protobuf:
00000000 0a 0a 0a 00 12 06 08 00 10 00 18 00 12 0e 3a 0c |..............:.|
00000010 0a 0a 1a 03 66 6f 6f 22 03 62 61 72 |....foo".bar|
original panic: <nil>
`

assert.PanicsWithValue(t, exp, func() {
var ba roachpb.BatchRequest
ba.Add(&roachpb.ScanRequest{
RequestHeader: roachpb.RequestHeader{
Key: []byte("foo"),
},
})
withMarshalingDebugging(ctx, ba, func() {
ba.Requests[0].GetInner().(*roachpb.ScanRequest).EndKey = roachpb.Key("bar")
})
})
}
8 changes: 4 additions & 4 deletions pkg/kv/txn_coord_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -716,11 +716,11 @@ func (tc *TxnCoordSender) commitReadOnlyTxnLocked(
ctx context.Context, ba roachpb.BatchRequest,
) *roachpb.Error {
deadline := ba.Requests[0].GetEndTransaction().Deadline
txn := tc.mu.txn
if deadline != nil && !txn.Timestamp.Less(*deadline) {
pErr := generateTxnDeadlineExceededErr(&txn, *deadline)
if deadline != nil && !tc.mu.txn.Timestamp.Less(*deadline) {
txn := tc.mu.txn.Clone()
pErr := generateTxnDeadlineExceededErr(txn, *deadline)
// We need to bump the epoch and transform this retriable error.
ba.Txn = &txn
ba.Txn = txn
return tc.updateStateLocked(ctx, ba, nil /* br */, pErr)
}
tc.mu.txnState = txnFinalized
Expand Down

0 comments on commit 4b34198

Please sign in to comment.