From 05a93a697eb884d3fd04ded8a550eecfe302376a Mon Sep 17 00:00:00 2001
From: Nathan VanBenschoten
Date: Wed, 13 Mar 2019 21:48:03 -0400
Subject: [PATCH] roachpb: clone Txn object in BatchResponse_Header.combine

Fixes #34241.

This missing clone is what allows the request transaction to enter the
response transaction and create the data race that was causing the panic.

This commit adds the necessary clone and removes the corresponding
assertion.

Release note: None
---
 pkg/cmd/roachtest/kv.go  |  3 +--
 pkg/kv/transport.go      | 42 +---------------------------------------
 pkg/kv/transport_test.go | 28 ---------------------------
 pkg/roachpb/api.go       | 11 +++++++----
 4 files changed, 9 insertions(+), 75 deletions(-)

diff --git a/pkg/cmd/roachtest/kv.go b/pkg/cmd/roachtest/kv.go
index dc91acfbd198..5d99c7b2b9eb 100644
--- a/pkg/cmd/roachtest/kv.go
+++ b/pkg/cmd/roachtest/kv.go
@@ -100,8 +100,7 @@ func registerKV(r *registry) {
 		// Configs with large nodes.
 		{nodes: 3, cpus: 96, readPercent: 0},
 		{nodes: 3, cpus: 96, readPercent: 95},
-		// Skipped: https://github.com/cockroachdb/cockroach/issues/34241.
-		// {nodes: 4, cpus: 96, readPercent: 50, batchSize: 64},
+		{nodes: 4, cpus: 96, readPercent: 50, batchSize: 64},
 
 		// Configs with encryption.
 		{nodes: 1, cpus: 8, readPercent: 0, encryption: true},
diff --git a/pkg/kv/transport.go b/pkg/kv/transport.go
index 0db536ebfe8e..827fe9b7ccf7 100644
--- a/pkg/kv/transport.go
+++ b/pkg/kv/transport.go
@@ -17,10 +17,7 @@ package kv
 
 import (
 	"context"
-	"encoding/hex"
-	"fmt"
 	"sort"
-	"strings"
 	"time"
 
 	"github.com/cockroachdb/cockroach/pkg/internal/client"
@@ -28,7 +25,6 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/rpc"
 	"github.com/cockroachdb/cockroach/pkg/rpc/nodedialer"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
-	"github.com/cockroachdb/cockroach/pkg/util/protoutil"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
 	"github.com/cockroachdb/cockroach/pkg/util/tracing"
 	"github.com/opentracing/opentracing-go"
@@ -156,38 +152,6 @@ func (gt *grpcTransport) maybeResurrectRetryablesLocked() bool {
 	return len(resurrect) > 0
 }
 
-func withMarshalingDebugging(ctx context.Context, ba roachpb.BatchRequest, f func()) {
-	nPre := ba.Size()
-	defer func() {
-		// TODO(ajwerner): re-enable the pre-emptive panic case below when the sizes
-		// do not match. The case is being disabled temporarily to reduce the
-		// rate of panics in the upcoming release. A more holistic fix which
-		// eliminates the shallow copies of transactions is coming soon.
-		nPost := ba.Size()
-		if r := recover(); r != nil /* || nPre != nPost */ {
-			var buf strings.Builder
-			_, _ = fmt.Fprintf(&buf, "batch size %d -> %d bytes\n", nPre, nPost)
-			func() {
-				defer func() {
-					if rInner := recover(); rInner != nil {
-						_, _ = fmt.Fprintln(&buf, "panic while re-marshaling:", rInner)
-					}
-				}()
-				data, mErr := protoutil.Marshal(&ba)
-				if mErr != nil {
-					_, _ = fmt.Fprintln(&buf, "while re-marshaling:", mErr)
-				} else {
-					_, _ = fmt.Fprintln(&buf, "re-marshaled protobuf:")
-					_, _ = fmt.Fprintln(&buf, hex.Dump(data))
-				}
-			}()
-			_, _ = fmt.Fprintln(&buf, "original panic: ", r)
-			panic(buf.String())
-		}
-	}()
-	f()
-}
-
 // SendNext invokes the specified RPC on the supplied client when the
 // client is ready. On success, the reply is sent on the channel;
 // otherwise an error is sent.
@@ -201,11 +165,7 @@ func (gt *grpcTransport) SendNext(
 	}
 
 	ba.Replica = client.replica
-	var reply *roachpb.BatchResponse
-
-	withMarshalingDebugging(ctx, ba, func() {
-		reply, err = gt.sendBatch(ctx, client.replica.NodeID, iface, ba)
-	})
+	reply, err := gt.sendBatch(ctx, client.replica.NodeID, iface, ba)
 
 	// NotLeaseHolderErrors can be retried.
 	var retryable bool
diff --git a/pkg/kv/transport_test.go b/pkg/kv/transport_test.go
index 259efafa5e15..b9c4f8bd23e1 100644
--- a/pkg/kv/transport_test.go
+++ b/pkg/kv/transport_test.go
@@ -26,7 +26,6 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/tracing"
 	opentracing "github.com/opentracing/opentracing-go"
-	"github.com/stretchr/testify/assert"
 	"google.golang.org/grpc"
 )
 
@@ -173,30 +172,3 @@ func (m *mockInternalClient) RangeFeed(
 ) (roachpb.Internal_RangeFeedClient, error) {
 	return nil, fmt.Errorf("unsupported RangeFeed call")
 }
-
-func TestWithMarshalingDebugging(t *testing.T) {
-	defer leaktest.AfterTest(t)()
-	t.Skip(fmt.Sprintf("Skipped until #34241 is resolved"))
-
-	ctx := context.Background()
-
-	exp := `batch size 23 -> 28 bytes
-re-marshaled protobuf:
-00000000  0a 0a 0a 00 12 06 08 00  10 00 18 00 12 0e 3a 0c  |..............:.|
-00000010  0a 0a 1a 03 66 6f 6f 22  03 62 61 72              |....foo".bar|
-
-original panic: 
-`
-
-	assert.PanicsWithValue(t, exp, func() {
-		var ba roachpb.BatchRequest
-		ba.Add(&roachpb.ScanRequest{
-			RequestHeader: roachpb.RequestHeader{
-				Key: []byte("foo"),
-			},
-		})
-		withMarshalingDebugging(ctx, ba, func() {
-			ba.Requests[0].GetInner().(*roachpb.ScanRequest).EndKey = roachpb.Key("bar")
-		})
-	})
-}
diff --git a/pkg/roachpb/api.go b/pkg/roachpb/api.go
index 4175e7ecd93a..1d0a6a133110 100644
--- a/pkg/roachpb/api.go
+++ b/pkg/roachpb/api.go
@@ -383,10 +383,13 @@ func (h *BatchResponse_Header) combine(o BatchResponse_Header) error {
 		)
 	}
 	h.Timestamp.Forward(o.Timestamp)
-	if h.Txn == nil {
-		h.Txn = o.Txn
-	} else {
-		h.Txn.Update(o.Txn)
+	if txn := o.Txn; txn != nil {
+		if h.Txn == nil {
+			txnClone := txn.Clone()
+			h.Txn = &txnClone
+		} else {
+			h.Txn.Update(txn)
+		}
 	}
 	h.Now.Forward(o.Now)
 	h.CollectedSpans = append(h.CollectedSpans, o.CollectedSpans...)