Skip to content

Commit

Permalink
roachpb: clone Txn object in BatchResponse_Header.combine
Browse files Browse the repository at this point in the history
Fixes cockroachdb#34241.

This missing clone is what allows the request transaction to enter the
response transaction and create the data race that was causing the panic.

This commit adds the necessary clone and removes the corresponding
assertion.

Release note: None
  • Loading branch information
nvanbenschoten committed Mar 14, 2019
1 parent 8ece976 commit be197a2
Show file tree
Hide file tree
Showing 4 changed files with 9 additions and 75 deletions.
3 changes: 1 addition & 2 deletions pkg/cmd/roachtest/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,7 @@ func registerKV(r *registry) {
// Configs with large nodes.
{nodes: 3, cpus: 96, readPercent: 0},
{nodes: 3, cpus: 96, readPercent: 95},
// Skipped: https://github.com/cockroachdb/cockroach/issues/34241.
// {nodes: 4, cpus: 96, readPercent: 50, batchSize: 64},
{nodes: 4, cpus: 96, readPercent: 50, batchSize: 64},

// Configs with encryption.
{nodes: 1, cpus: 8, readPercent: 0, encryption: true},
Expand Down
42 changes: 1 addition & 41 deletions pkg/kv/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,14 @@ package kv

import (
"context"
"encoding/hex"
"fmt"
"sort"
"strings"
"time"

"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/rpc/nodedialer"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/opentracing/opentracing-go"
Expand Down Expand Up @@ -156,38 +152,6 @@ func (gt *grpcTransport) maybeResurrectRetryablesLocked() bool {
return len(resurrect) > 0
}

func withMarshalingDebugging(ctx context.Context, ba roachpb.BatchRequest, f func()) {
nPre := ba.Size()
defer func() {
// TODO(ajwerner): re-enable the pre-emptive panic case below when the sizes
// do not match. The case is being disabled temporarily to reduce the
// rate of panics in the upcoming release. A more holistic fix which
// eliminates the shallow copies of transactions is coming soon.
nPost := ba.Size()
if r := recover(); r != nil /* || nPre != nPost */ {
var buf strings.Builder
_, _ = fmt.Fprintf(&buf, "batch size %d -> %d bytes\n", nPre, nPost)
func() {
defer func() {
if rInner := recover(); rInner != nil {
_, _ = fmt.Fprintln(&buf, "panic while re-marshaling:", rInner)
}
}()
data, mErr := protoutil.Marshal(&ba)
if mErr != nil {
_, _ = fmt.Fprintln(&buf, "while re-marshaling:", mErr)
} else {
_, _ = fmt.Fprintln(&buf, "re-marshaled protobuf:")
_, _ = fmt.Fprintln(&buf, hex.Dump(data))
}
}()
_, _ = fmt.Fprintln(&buf, "original panic: ", r)
panic(buf.String())
}
}()
f()
}

// SendNext invokes the specified RPC on the supplied client when the
// client is ready. On success, the reply is sent on the channel;
// otherwise an error is sent.
Expand All @@ -201,11 +165,7 @@ func (gt *grpcTransport) SendNext(
}

ba.Replica = client.replica
var reply *roachpb.BatchResponse

withMarshalingDebugging(ctx, ba, func() {
reply, err = gt.sendBatch(ctx, client.replica.NodeID, iface, ba)
})
reply, err := gt.sendBatch(ctx, client.replica.NodeID, iface, ba)

// NotLeaseHolderErrors can be retried.
var retryable bool
Expand Down
28 changes: 0 additions & 28 deletions pkg/kv/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
opentracing "github.com/opentracing/opentracing-go"
"github.com/stretchr/testify/assert"
"google.golang.org/grpc"
)

Expand Down Expand Up @@ -173,30 +172,3 @@ func (m *mockInternalClient) RangeFeed(
) (roachpb.Internal_RangeFeedClient, error) {
return nil, fmt.Errorf("unsupported RangeFeed call")
}

func TestWithMarshalingDebugging(t *testing.T) {
defer leaktest.AfterTest(t)()
t.Skip(fmt.Sprintf("Skipped until #34241 is resolved"))

ctx := context.Background()

exp := `batch size 23 -> 28 bytes
re-marshaled protobuf:
00000000 0a 0a 0a 00 12 06 08 00 10 00 18 00 12 0e 3a 0c |..............:.|
00000010 0a 0a 1a 03 66 6f 6f 22 03 62 61 72 |....foo".bar|
original panic: <nil>
`

assert.PanicsWithValue(t, exp, func() {
var ba roachpb.BatchRequest
ba.Add(&roachpb.ScanRequest{
RequestHeader: roachpb.RequestHeader{
Key: []byte("foo"),
},
})
withMarshalingDebugging(ctx, ba, func() {
ba.Requests[0].GetInner().(*roachpb.ScanRequest).EndKey = roachpb.Key("bar")
})
})
}
11 changes: 7 additions & 4 deletions pkg/roachpb/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,10 +383,13 @@ func (h *BatchResponse_Header) combine(o BatchResponse_Header) error {
)
}
h.Timestamp.Forward(o.Timestamp)
if h.Txn == nil {
h.Txn = o.Txn
} else {
h.Txn.Update(o.Txn)
if txn := o.Txn; txn != nil {
if h.Txn == nil {
txnClone := txn.Clone()
h.Txn = &txnClone
} else {
h.Txn.Update(txn)
}
}
h.Now.Forward(o.Now)
h.CollectedSpans = append(h.CollectedSpans, o.CollectedSpans...)
Expand Down

0 comments on commit be197a2

Please sign in to comment.