35688: sql: specify hash or merge join in EXPLAIN r=RaduBerinde a=RaduBerinde

Replace `join` with `hash-join` or `merge-join` in `EXPLAIN` output.

Fixes #35683.

Release note (sql change): EXPLAIN tree now uses `hash-join` or
`merge-join` instead of `join`.

35719: roachpb: prevent data race on Transaction r=nvanbenschoten a=nvanbenschoten

Fixes #34241.

This PR starts off with a series of small cleanups related to ownership of `roachpb.Transaction` objects and the use of deep and shallow clones. This makes up the first 5 commits.

Next, the 6th commit removes redundant calls to `Transaction.UpdateObservedTimestamp`, leaving the method with just a single caller that is easy to reason about.

The 7th commit is what actually fixes the referenced issue. It adds the proto clone that was missing from `BatchResponse_Header.combine`, whose absence allowed a `BatchRequest` to reference the same `Transaction` object as a `BatchResponse`. I confirmed a number of times that this stops the assertion from firing, so the commit also re-enables the skipped roachtest and removes the assertion.
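
Concretely, the fix makes `combine` deep-copy the incoming transaction before the merged header takes ownership of it, instead of aliasing another header's object. A condensed sketch of the new logic (it mirrors the `pkg/roachpb/api.go` hunk further down; the other header fields and error handling are elided):

```go
// Condensed sketch of the new combine logic: o.Txn is cloned before h stores
// it, so the merged header never shares a Transaction pointer with the
// response it was combined from.
func (h *BatchResponse_Header) combine(o BatchResponse_Header) error {
	if txn := o.Txn; txn != nil {
		if h.Txn == nil {
			txnClone := txn.Clone() // deep copy; h now owns its own Transaction
			h.Txn = &txnClone
		} else {
			h.Txn.Update(txn) // merge into the copy h already owns
		}
	}
	// ... Timestamp, Now, CollectedSpans, etc. are combined as before ...
	return nil
}
```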

~The final two commits are the two that we might consider waiting on and not including in this release, but they're also the most exciting. By making `Transaction.ObservedTimestamps` immutable (which it almost already was), we can prohibit all interior mutability of references within `Transaction`, give it value semantics, and eliminate the distinction between "shallow" and "deep" object cloning. This reduces the cost of a clone to a single straightforward allocation and makes working with the object easier to think about.~

EDIT: these two commits were removed from this PR.

Past this point, I think the only other change we might want to make here is making a clone of `ba.Txn` in `internalClientAdapter` and then declaring that the `Batch` handler goroutine has exclusive ownership of its `ba.Txn`. This would let us drop a few conservative clones that would no longer be necessary, like [here](https://github.com/cockroachdb/cockroach/blob/57e825a7940495b67e0cc7213a5fabc24e12be0e/pkg/storage/store.go#L2827) and [here](https://github.com/cockroachdb/cockroach/blob/57e825a7940495b67e0cc7213a5fabc24e12be0e/pkg/storage/replica.go#L1309). I did not make this change here.
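
For reference, such a change would look roughly like the sketch below. This is hypothetical, it is not part of this PR, and the adapter's exact `Batch` signature is assumed rather than taken from this diff:

```go
// Hypothetical sketch only (NOT in this PR): give the server-side goroutine
// its own copy of the transaction at the internalClientAdapter boundary, so
// it can mutate ba.Txn without racing the client goroutine.
func (a internalClientAdapter) Batch(
	ctx context.Context, ba *roachpb.BatchRequest, _ ...grpc.CallOption,
) (*roachpb.BatchResponse, error) {
	if ba.Txn != nil {
		txnClone := ba.Txn.Clone() // Clone returns a Transaction value
		ba.Txn = &txnClone
	}
	return a.InternalServer.Batch(ctx, ba)
}
```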

35736: opt: catch all pgerror.Error in optbuilder r=RaduBerinde a=RaduBerinde

We now catch all `pgerror.Error`s in optbuilder, which means that we no
longer need to wrap them in `buildError`. The wrapper is still used when
external calls (e.g. for name resolution) return a generic error.

The main motivation is that optbuilder calls into the factory, which can
panic internally. We will want to switch those panics to assertion errors
as well, but the two live in different packages, so the existing approach
would have required a shared, exported wrapper.
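
For context, the catch-all is the usual recover-and-filter idiom at the top of the builder. A rough sketch (names simplified, not the exact optbuilder code):

```go
// Rough sketch of the pattern described above: panics carrying a
// *pgerror.Error become returned errors, while anything else (a real bug)
// keeps propagating.
func (b *Builder) Build() (err error) {
	defer func() {
		if r := recover(); r != nil {
			if pgErr, ok := r.(*pgerror.Error); ok {
				err = pgErr
				return
			}
			panic(r) // unknown panic: do not swallow it
		}
	}()
	b.buildStmt() // may panic with a *pgerror.Error deep inside the builder
	return nil
}
```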

Release note: None

Co-authored-by: Radu Berinde <[email protected]>
Co-authored-by: Nathan VanBenschoten <[email protected]>
3 people committed Mar 14, 2019
4 parents 1f179aa + 1832862 + 05a93a6 + c576561 commit 06ed80d
Showing 60 changed files with 1,246 additions and 1,330 deletions.
3 changes: 1 addition & 2 deletions pkg/cmd/roachtest/kv.go
@@ -100,8 +100,7 @@ func registerKV(r *registry) {
// Configs with large nodes.
{nodes: 3, cpus: 96, readPercent: 0},
{nodes: 3, cpus: 96, readPercent: 95},
// Skipped: https://github.com/cockroachdb/cockroach/issues/34241.
// {nodes: 4, cpus: 96, readPercent: 50, batchSize: 64},
{nodes: 4, cpus: 96, readPercent: 50, batchSize: 64},

// Configs with encryption.
{nodes: 1, cpus: 8, readPercent: 0, encryption: true},
1 change: 0 additions & 1 deletion pkg/internal/client/txn_test.go
@@ -385,7 +385,6 @@ func TestSetPriority(t *testing.T) {
}

br := &roachpb.BatchResponse{}
br.Txn = &roachpb.Transaction{}
br.Txn.Update(ba.Txn) // copy
return br, nil
}), clock)
4 changes: 3 additions & 1 deletion pkg/kv/dist_sender.go
@@ -967,7 +967,9 @@ func (ds *DistSender) divideAndSendBatchToRanges(
// Update the transaction from the response. Note that this wouldn't happen
// on the asynchronous path, but if we have newer information it's good to
// use it.
ba.UpdateTxn(resp.reply.Txn)
if !lastRange {
ba.UpdateTxn(resp.reply.Txn)
}

mightStopEarly := ba.MaxSpanRequestKeys > 0 || stopAtRangeBoundary
// Check whether we've received enough responses to exit query loop.
42 changes: 1 addition & 41 deletions pkg/kv/transport.go
@@ -17,18 +17,14 @@ package kv

import (
"context"
"encoding/hex"
"fmt"
"sort"
"strings"
"time"

"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/rpc/nodedialer"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/opentracing/opentracing-go"
@@ -156,38 +152,6 @@ func (gt *grpcTransport) maybeResurrectRetryablesLocked() bool {
return len(resurrect) > 0
}

func withMarshalingDebugging(ctx context.Context, ba roachpb.BatchRequest, f func()) {
nPre := ba.Size()
defer func() {
// TODO(ajwerner): re-enable the pre-emptive panic case below when the sizes
// do not match. The case is being disabled temporarily to reduce the
// rate of panics in the upcoming release. A more holistic fix which
// eliminates the shallow copies of transactions is coming soon.
nPost := ba.Size()
if r := recover(); r != nil /* || nPre != nPost */ {
var buf strings.Builder
_, _ = fmt.Fprintf(&buf, "batch size %d -> %d bytes\n", nPre, nPost)
func() {
defer func() {
if rInner := recover(); rInner != nil {
_, _ = fmt.Fprintln(&buf, "panic while re-marshaling:", rInner)
}
}()
data, mErr := protoutil.Marshal(&ba)
if mErr != nil {
_, _ = fmt.Fprintln(&buf, "while re-marshaling:", mErr)
} else {
_, _ = fmt.Fprintln(&buf, "re-marshaled protobuf:")
_, _ = fmt.Fprintln(&buf, hex.Dump(data))
}
}()
_, _ = fmt.Fprintln(&buf, "original panic: ", r)
panic(buf.String())
}
}()
f()
}

// SendNext invokes the specified RPC on the supplied client when the
// client is ready. On success, the reply is sent on the channel;
// otherwise an error is sent.
@@ -201,11 +165,7 @@ func (gt *grpcTransport) SendNext(
}

ba.Replica = client.replica
var reply *roachpb.BatchResponse

withMarshalingDebugging(ctx, ba, func() {
reply, err = gt.sendBatch(ctx, client.replica.NodeID, iface, ba)
})
reply, err := gt.sendBatch(ctx, client.replica.NodeID, iface, ba)

// NotLeaseHolderErrors can be retried.
var retryable bool
28 changes: 0 additions & 28 deletions pkg/kv/transport_test.go
@@ -26,7 +26,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
opentracing "github.com/opentracing/opentracing-go"
"github.com/stretchr/testify/assert"
"google.golang.org/grpc"
)

@@ -173,30 +172,3 @@ func (m *mockInternalClient) RangeFeed(
) (roachpb.Internal_RangeFeedClient, error) {
return nil, fmt.Errorf("unsupported RangeFeed call")
}

func TestWithMarshalingDebugging(t *testing.T) {
defer leaktest.AfterTest(t)()
t.Skip(fmt.Sprintf("Skipped until #34241 is resolved"))

ctx := context.Background()

exp := `batch size 23 -> 28 bytes
re-marshaled protobuf:
00000000 0a 0a 0a 00 12 06 08 00 10 00 18 00 12 0e 3a 0c |..............:.|
00000010 0a 0a 1a 03 66 6f 6f 22 03 62 61 72 |....foo".bar|
original panic: <nil>
`

assert.PanicsWithValue(t, exp, func() {
var ba roachpb.BatchRequest
ba.Add(&roachpb.ScanRequest{
RequestHeader: roachpb.RequestHeader{
Key: []byte("foo"),
},
})
withMarshalingDebugging(ctx, ba, func() {
ba.Requests[0].GetInner().(*roachpb.ScanRequest).EndKey = roachpb.Key("bar")
})
})
}
12 changes: 4 additions & 8 deletions pkg/kv/txn_coord_sender.go
@@ -905,8 +905,7 @@ func (tc *TxnCoordSender) UpdateStateOnRemoteRetryableErr(
// handleRetryableErrLocked().
if err.Transaction.ID == txnID {
// This is where we get a new epoch.
cp := err.Transaction.Clone()
tc.mu.txn.Update(&cp)
tc.mu.txn.Update(&err.Transaction)
}
return roachpb.NewError(err)
}
@@ -1036,8 +1035,7 @@ func (tc *TxnCoordSender) updateStateLocked(
tc.mu.storedErr = pErr

// Cleanup.
cp := pErr.GetTxn().Clone()
tc.mu.txn.Update(&cp)
tc.mu.txn.Update(pErr.GetTxn())
tc.cleanupTxnLocked(ctx)
return pErr
}
@@ -1056,8 +1054,7 @@ func (tc *TxnCoordSender) updateStateLocked(
// handleRetryableErrLocked().
if err.Transaction.ID == ba.Txn.ID {
// This is where we get a new epoch.
cp := err.Transaction.Clone()
tc.mu.txn.Update(&cp)
tc.mu.txn.Update(&err.Transaction)
}
return roachpb.NewError(err)
}
@@ -1069,8 +1066,7 @@ func (tc *TxnCoordSender) updateStateLocked(
PrevError: pErr.String(),
})
// Cleanup.
cp := errTxn.Clone()
tc.mu.txn.Update(&cp)
tc.mu.txn.Update(errTxn)
tc.cleanupTxnLocked(ctx)
}
return pErr
11 changes: 7 additions & 4 deletions pkg/roachpb/api.go
@@ -383,10 +383,13 @@ func (h *BatchResponse_Header) combine(o BatchResponse_Header) error {
)
}
h.Timestamp.Forward(o.Timestamp)
if h.Txn == nil {
h.Txn = o.Txn
} else {
h.Txn.Update(o.Txn)
if txn := o.Txn; txn != nil {
if h.Txn == nil {
txnClone := txn.Clone()
h.Txn = &txnClone
} else {
h.Txn.Update(txn)
}
}
h.Now.Forward(o.Now)
h.CollectedSpans = append(h.CollectedSpans, o.CollectedSpans...)
11 changes: 5 additions & 6 deletions pkg/roachpb/batch.go
@@ -72,17 +72,17 @@ func (ba *BatchRequest) SetActiveTimestamp(nowFn func() hlc.Timestamp) error {
// UpdateTxn updates the batch transaction from the supplied one in
// a copy-on-write fashion, i.e. without mutating an existing
// Transaction struct.
func (ba *BatchRequest) UpdateTxn(otherTxn *Transaction) {
if otherTxn == nil {
func (ba *BatchRequest) UpdateTxn(o *Transaction) {
if o == nil {
return
}
otherTxn.AssertInitialized(context.TODO())
o.AssertInitialized(context.TODO())
if ba.Txn == nil {
ba.Txn = otherTxn
ba.Txn = o
return
}
clonedTxn := ba.Txn.Clone()
clonedTxn.Update(otherTxn)
clonedTxn.Update(o)
ba.Txn = &clonedTxn
}

@@ -424,7 +424,6 @@ func (br *BatchResponse) Combine(otherBatch *BatchResponse, positions []int) err
return errors.Errorf("can not combine %T and %T", valLeft, valRight)
}
}
br.Txn.Update(otherBatch.Txn)
return nil
}

5 changes: 2 additions & 3 deletions pkg/roachpb/errors.go
@@ -151,7 +151,6 @@ func (e *Error) GoError() error {

// SetTxn sets the txn and resets the error message. txn is cloned before being
// stored in the Error.
// TODO(kaneda): Unexpose this method and make callers use NewErrorWithTxn.
func (e *Error) SetTxn(txn *Transaction) {
e.UnexposedTxn = txn
if txn != nil {
@@ -172,9 +171,9 @@ func (e *Error) GetTxn() *Transaction {
return e.UnexposedTxn
}

// UpdateTxn updates the txn.
// UpdateTxn updates the error transaction.
func (e *Error) UpdateTxn(o *Transaction) {
if e == nil {
if o == nil {
return
}
if e.UnexposedTxn == nil {
16 changes: 8 additions & 8 deletions pkg/sql/logictest/testdata/planner_test/aggregate
@@ -80,7 +80,7 @@ group · ·
│ scalar · · ·
└── render · · ("((k, v, w, s) AS k, v, w, s)") ·
│ render 0 ((a.k, a.v, a.w, a.s) AS k, v, w, s) · ·
└── join · · (k, v, w, s, k[omitted], v[omitted], w[omitted], s[omitted]) ·
└── hash-join · · (k, v, w, s, k[omitted], v[omitted], w[omitted], s[omitted]) ·
│ type cross · ·
├── scan · · (k, v, w, s) k!=NULL; key(k)
│ table kv@primary · ·
@@ -103,7 +103,7 @@ group · ·
│ render 2 a.w
│ render 3 a.s
│ render 4 b.k
└── join · ·
└── hash-join · ·
│ type cross
├── scan · ·
│ table kv@primary
@@ -127,7 +127,7 @@ group · ·
│ render 3 a.w
│ render 4 a.s
│ render 5 b.k
└── join · ·
└── hash-join · ·
│ type cross
├── scan · ·
│ table kv@primary
@@ -151,7 +151,7 @@ group · ·
│ render 2 a.w
│ render 3 a.s
│ render 4 b.k
└── join · ·
└── hash-join · ·
│ type cross
├── scan · ·
│ table kv@primary
@@ -174,7 +174,7 @@ group · ·
│ render 1 a.v
│ render 2 a.w
│ render 3 a.s
└── join · ·
└── hash-join · ·
│ type cross
├── scan · ·
│ table kv@primary
@@ -626,7 +626,7 @@ group · ·
│ render 2 test.public.kv.w
│ render 3 test.public.kv.s
│ render 4 test.public.abc.d
└── join · ·
└── hash-join · ·
│ type inner
│ pred test.public.kv.k >= test.public.abc.d
├── scan · ·
@@ -738,7 +738,7 @@ render · ·
│ render 1 test.public.ab.a
│ render 2 test.public.ab.b
│ render 3 test.public.xy.y
└── join · ·
└── hash-join · ·
│ type cross
├── scan · ·
│ table ab@primary
@@ -853,7 +853,7 @@ group · · (z, max)
query TTTTT
EXPLAIN (VERBOSE) SELECT * FROM (SELECT x, max(y) FROM group_ord GROUP BY x) JOIN (SELECT z, min(y) FROM group_ord@foo GROUP BY z) ON x = z
----
join · · (x, max, z, min) x=z; x!=NULL
merge-join · · (x, max, z, min) x=z; x!=NULL
│ type inner · ·
│ equality (x) = (z) · ·
│ mergeJoinOrder +"(x=z)" · ·
52 changes: 26 additions & 26 deletions pkg/sql/logictest/testdata/planner_test/distsql_interleaved_join
@@ -254,20 +254,20 @@ EXPLAIN SELECT * FROM grandchild2 JOIN parent1 USING(pid1) WHERE
OR pid1 >= 19 AND pid1 <= 21
OR pid1 >= 31 AND pid1 <= 33
----
render · ·
└── join · ·
│ type inner
│ equality (pid1) = (pid1)
│ mergeJoinOrder +"(pid1=pid1)"
├── scan · ·
│ table grandchild2@primary
│ spans /11/#/56/1-/13/#/56/2 /19/#/56/1-/21/#/56/2 /31/#/56/1-/33/#/56/2
│ filter ((pid1 <= 13) OR ((pid1 >= 19) AND (pid1 <= 21))) OR (pid1 >= 31)
└── scan · ·
· table parent1@primary
· spans /11-/13/# /19-/21/# /31-/33/#
· parallel ·
· filter ((pid1 <= 13) OR ((pid1 >= 19) AND (pid1 <= 21))) OR (pid1 >= 31)
render · ·
└── merge-join · ·
│ type inner
│ equality (pid1) = (pid1)
│ mergeJoinOrder +"(pid1=pid1)"
├── scan · ·
│ table grandchild2@primary
│ spans /11/#/56/1-/13/#/56/2 /19/#/56/1-/21/#/56/2 /31/#/56/1-/33/#/56/2
│ filter ((pid1 <= 13) OR ((pid1 >= 19) AND (pid1 <= 21))) OR (pid1 >= 31)
└── scan · ·
· table parent1@primary
· spans /11-/13/# /19-/21/# /31-/33/#
· parallel ·
· filter ((pid1 <= 13) OR ((pid1 >= 19) AND (pid1 <= 21))) OR (pid1 >= 31)

# Join on multiple interleaved columns with an overarching ancestor (parent1).
# Note there are 5 nodes because the filter cid2 >= 12 AND cid2 <= 14
@@ -297,17 +297,17 @@ EXPLAIN
OR child2.cid2 >= 12 AND child2.cid2 <= 14
OR gcid2 >= 49 AND gcid2 <= 51
----
join · ·
│ type inner
│ equality (pid1, cid2, cid3) = (pid1, cid2, cid3)
│ mergeJoinOrder +"(pid1=pid1)",+"(cid2=cid2)",+"(cid3=cid3)"
├── scan · ·
│ table child2@primary
│ spans ALL
└── scan · ·
· table grandchild2@primary
· spans ALL
· filter (((pid1 >= 5) AND (pid1 <= 7)) OR ((cid2 >= 12) AND (cid2 <= 14))) OR ((gcid2 >= 49) AND (gcid2 <= 51))
merge-join · ·
│ type inner
│ equality (pid1, cid2, cid3) = (pid1, cid2, cid3)
│ mergeJoinOrder +"(pid1=pid1)",+"(cid2=cid2)",+"(cid3=cid3)"
├── scan · ·
│ table child2@primary
│ spans ALL
└── scan · ·
· table grandchild2@primary
· spans ALL
· filter (((pid1 >= 5) AND (pid1 <= 7)) OR ((cid2 >= 12) AND (cid2 <= 14))) OR ((gcid2 >= 49) AND (gcid2 <= 51))

# Aggregation over parent and child keys.
query T
@@ -449,7 +449,7 @@ query TTT
EXPLAIN SELECT * FROM parent1 JOIN (SELECT * FROM child1 ORDER BY cid1) USING (pid1)
----
render · ·
└── join · ·
└── hash-join · ·
│ type inner
│ equality (pid1) = (pid1)
├── scan · ·