Skip to content

Commit

Permalink
Merge pull request #24609 from andreimatei/cherrypick-read-spans
Browse files Browse the repository at this point in the history
cherry-pick 2.0: distsqlrun: generate TxnCoordMeta in processors that weren't doing it
  • Loading branch information
andreimatei authored Apr 9, 2018
2 parents bcd1aba + f31f553 commit 3e44219
Show file tree
Hide file tree
Showing 12 changed files with 239 additions and 35 deletions.
5 changes: 3 additions & 2 deletions pkg/ccl/importccl/csv.go
Original file line number Diff line number Diff line change
Expand Up @@ -1398,7 +1398,7 @@ func (cp *readCSVProcessor) Run(wg *sync.WaitGroup) {
return nil
})
if err := group.Wait(); err != nil {
distsqlrun.DrainAndClose(ctx, cp.output, err)
distsqlrun.DrainAndClose(ctx, cp.output, err, func(context.Context) {} /* pushTrailingMeta */)
return
}

Expand Down Expand Up @@ -1653,7 +1653,8 @@ func (sp *sstWriter) Run(wg *sync.WaitGroup) {
}
return nil
}()
distsqlrun.DrainAndClose(ctx, sp.output, err, sp.input)
distsqlrun.DrainAndClose(
ctx, sp.output, err, func(context.Context) {} /* pushTrailingMeta */, sp.input)
}

type importResumer struct {
Expand Down
5 changes: 5 additions & 0 deletions pkg/internal/client/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1243,3 +1243,8 @@ func (txn *Txn) IsSerializablePushAndRefreshNotPossible() bool {
return txn.Proto().Isolation == enginepb.SERIALIZABLE &&
isTxnPushed && txn.mu.Proto.OrigTimestampWasObserved
}

// Type returns the transaction's type, i.e. the TxnType value (such as RootTxn
// or LeafTxn) stored in the txn's typ field.
func (txn *Txn) Type() TxnType {
return txn.typ
}
43 changes: 40 additions & 3 deletions pkg/sql/distsqlrun/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@ import (

opentracing "github.com/opentracing/opentracing-go"

"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
)

const rowChannelBufSize = 16
Expand Down Expand Up @@ -200,12 +202,37 @@ func getTraceData(ctx context.Context) []tracing.RecordedSpan {
return nil
}

// sendTraceData pushes the trace recording found in ctx, if any, to dst as a
// ProducerMetadata record. Nothing is pushed when there is no recording. The
// ConsumerStatus returned by dst is ignored.
//
// Note that each processor gets its own trace "recording group", so the data
// sent by different processors is distinct.
func sendTraceData(ctx context.Context, dst RowReceiver) {
	recording := getTraceData(ctx)
	if recording == nil {
		// Nothing was recorded; there is nothing to report.
		return
	}
	dst.Push(nil /* row */, &ProducerMetadata{TraceData: recording})
}

// sendTxnCoordMetaMaybe reads the txn metadata from a leaf transaction and
// pushes it to dst, so that it eventually makes its way to the root txn. The
// ConsumerStatus returned by dst is ignored.
//
// This is a no-op if txn is a root txn, or if the metadata carries a zero
// transaction ID.
//
// NOTE(andrei): As of 04/2018, the txn is shared by all processors scheduled on
// a node, so several processors may send the same TxnCoordMeta. The root
// TxnCoordSender doesn't care if it receives the same thing multiple times.
func sendTxnCoordMetaMaybe(txn *client.Txn, dst RowReceiver) {
	if txn.Type() == client.RootTxn {
		return
	}
	coordMeta := txn.GetTxnCoordMeta()
	if coordMeta.Txn.ID == (uuid.UUID{}) {
		// A zero ID means there is no transaction state to report.
		return
	}
	dst.Push(nil /* row */, &ProducerMetadata{TxnMeta: &coordMeta})
}

// DrainAndClose is a version of DrainAndForwardMetadata that drains multiple
// sources. These sources are assumed to be the only producers left for dst, so
// dst is closed once they're all exhausted (this is different from
Expand All @@ -215,10 +242,20 @@ func sendTraceData(ctx context.Context, dst RowReceiver) {
// metadata. This is intended to have been the error, if any, that caused the
// draining.
//
// pushTrailingMeta is called after draining the sources and before calling
// dst.ProducerDone(). It gives the caller the opportunity to push some trailing
// metadata (e.g. tracing information and txn updates, if applicable).
//
// srcs can be nil.
//
// All errors are forwarded to dst.
func DrainAndClose(ctx context.Context, dst RowReceiver, cause error, srcs ...RowSource) {
func DrainAndClose(
ctx context.Context,
dst RowReceiver,
cause error,
pushTrailingMeta func(context.Context),
srcs ...RowSource,
) {
if cause != nil {
// We ignore the returned ConsumerStatus and rely on the
// DrainAndForwardMetadata() calls below to close srcs in all cases.
Expand All @@ -236,7 +273,7 @@ func DrainAndClose(ctx context.Context, dst RowReceiver, cause error, srcs ...Ro
DrainAndForwardMetadata(ctx, srcs[0], dst)
wg.Wait()
}
sendTraceData(ctx, dst)
pushTrailingMeta(ctx)
dst.ProducerDone()
}

Expand Down Expand Up @@ -559,7 +596,7 @@ func (rb *RowBuffer) Push(row sqlbase.EncDatumRow, meta *ProducerMetadata) Consu
return status
}

// ProducerDone is part of the interface.
// ProducerDone is part of the RowReceiver interface.
func (rb *RowBuffer) ProducerDone() {
if rb.ProducerClosed {
panic("RowBuffer already closed")
Expand Down
12 changes: 8 additions & 4 deletions pkg/sql/distsqlrun/hashjoiner.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,10 @@ func (h *hashJoiner) Run(wg *sync.WaitGroup) {
defer wg.Done()
}

pushTrailingMeta := func(ctx context.Context) {
sendTraceData(ctx, h.out.output)
}

ctx := log.WithLogTag(h.flowCtx.Ctx, "HashJoiner", nil)
ctx, span := processorSpan(ctx, "hash joiner")
defer tracing.FinishSpan(span)
Expand Down Expand Up @@ -212,7 +216,7 @@ func (h *hashJoiner) Run(wg *sync.WaitGroup) {
// the consumer.
log.Infof(ctx, "buffer phase error %s", err)
}
DrainAndClose(ctx, h.out.output, err /* cause */, h.leftSource, h.rightSource)
DrainAndClose(ctx, h.out.output, err /* cause */, pushTrailingMeta, h.leftSource, h.rightSource)
return
}

Expand All @@ -229,7 +233,7 @@ func (h *hashJoiner) Run(wg *sync.WaitGroup) {
// the consumer.
log.Infof(ctx, "build phase error %s", err)
}
DrainAndClose(ctx, h.out.output, err /* cause */, h.leftSource, h.rightSource)
DrainAndClose(ctx, h.out.output, err /* cause */, pushTrailingMeta, h.leftSource, h.rightSource)
return
}
defer storedRows.Close(ctx)
Expand All @@ -241,7 +245,7 @@ func (h *hashJoiner) Run(wg *sync.WaitGroup) {
if row != nil {
if err := storedRows.AddRow(ctx, row); err != nil {
log.Infof(ctx, "unable to add row to disk %s", err)
DrainAndClose(ctx, h.out.output, err /* cause */, h.leftSource, h.rightSource)
DrainAndClose(ctx, h.out.output, err /* cause */, pushTrailingMeta, h.leftSource, h.rightSource)
return
}
}
Expand All @@ -262,7 +266,7 @@ func (h *hashJoiner) Run(wg *sync.WaitGroup) {
// point.
log.Infof(ctx, "probe phase error %s", err)
}
DrainAndClose(ctx, h.out.output, err /* cause */, srcToClose)
DrainAndClose(ctx, h.out.output, err /* cause */, pushTrailingMeta, srcToClose)
}
}

Expand Down
1 change: 1 addition & 0 deletions pkg/sql/distsqlrun/interleaved_reader_joiner.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,7 @@ func (irj *interleavedReaderJoiner) Run(wg *sync.WaitGroup) {

irj.sendMisplannedRangesMetadata(ctx)
sendTraceData(ctx, irj.out.output)
sendTxnCoordMetaMaybe(irj.flowCtx.txn, irj.out.output)
irj.out.Close()
}

Expand Down
104 changes: 100 additions & 4 deletions pkg/sql/distsqlrun/interleaved_reader_joiner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
)

// min and max are inclusive bounds on the root table's ID.
Expand Down Expand Up @@ -399,8 +400,9 @@ func TestInterleavedReaderJoiner(t *testing.T) {
Ctx: context.Background(),
EvalCtx: evalCtx,
Settings: s.ClusterSettings(),
txn: client.NewTxn(s.DB(), s.NodeID(), client.RootTxn),
nodeID: s.NodeID(),
// Run in a RootTxn so that there's no txn metadata produced.
txn: client.NewTxn(s.DB(), s.NodeID(), client.RootTxn),
nodeID: s.NodeID(),
}

out := &RowBuffer{}
Expand Down Expand Up @@ -527,8 +529,9 @@ func TestInterleavedReaderJoinerErrors(t *testing.T) {
flowCtx := FlowCtx{
EvalCtx: evalCtx,
Settings: s.ClusterSettings(),
txn: client.NewTxn(s.DB(), s.NodeID(), client.RootTxn),
nodeID: s.NodeID(),
// Run in a RootTxn so that there's no txn metadata produced.
txn: client.NewTxn(s.DB(), s.NodeID(), client.RootTxn),
nodeID: s.NodeID(),
}

out := &RowBuffer{}
Expand All @@ -543,3 +546,96 @@ func TestInterleavedReaderJoinerErrors(t *testing.T) {
})
}
}

// TestInterleavedReaderJoinerTrailingMetadata runs an interleavedReaderJoiner
// over two empty interleaved tables, inside a snowball trace and a leaf txn,
// and checks that the processor closes its output and emits both tracing and
// txn metadata without pushing any rows.
func TestInterleavedReaderJoinerTrailingMetadata(t *testing.T) {
	defer leaktest.AfterTest(t)()

	s, sqlDB, kvDB := serverutils.StartServer(t, base.TestServerArgs{})
	defer s.Stopper().Stop(context.TODO())

	// Both tables are created empty, so the row generator is never expected to
	// produce anything.
	noRows := func(row int) []tree.Datum { return nil }
	sqlutils.CreateTable(t, sqlDB, "parent", "id INT PRIMARY KEY", 0 /* numRows */, noRows)
	sqlutils.CreateTableInterleaved(
		t, sqlDB, "child",
		"pid INT, id INT, PRIMARY KEY (pid, id)",
		"parent (pid)",
		0, /* numRows */
		noRows,
	)

	parentDesc := sqlbase.GetTableDescriptor(kvDB, sqlutils.TestDB, "parent")
	childDesc := sqlbase.GetTableDescriptor(kvDB, sqlutils.TestDB, "child")

	evalCtx := tree.MakeTestingEvalContext(s.ClusterSettings())
	defer evalCtx.Stop(context.Background())

	// Run the flow in a snowball trace so that we can test for tracing info.
	tracer := tracing.NewTracer()
	ctx, sp, err := tracing.StartSnowballTrace(context.Background(), tracer, "test flow ctx")
	if err != nil {
		t.Fatal(err)
	}
	defer sp.Finish()

	flowCtx := FlowCtx{
		Ctx:      ctx,
		EvalCtx:  evalCtx,
		Settings: s.ClusterSettings(),
		// Run in a LeafTxn so that txn metadata is produced.
		txn:    client.NewTxn(s.DB(), s.NodeID(), client.LeafTxn),
		nodeID: s.NodeID(),
	}

	spec := InterleavedReaderJoinerSpec{
		Type: sqlbase.InnerJoin,
		Tables: []InterleavedReaderJoinerSpec_Table{
			{
				Desc:     *parentDesc,
				Ordering: Ordering{Columns: []Ordering_Column{{ColIdx: 0, Direction: Ordering_Column_ASC}}},
				Spans:    []TableReaderSpan{{Span: parentDesc.PrimaryIndexSpan()}},
			},
			{
				Desc:     *childDesc,
				Ordering: Ordering{Columns: []Ordering_Column{{ColIdx: 0, Direction: Ordering_Column_ASC}}},
				Spans:    []TableReaderSpan{{Span: childDesc.PrimaryIndexSpan()}},
			},
		},
	}

	out := &RowBuffer{}
	irj, err := newInterleavedReaderJoiner(&flowCtx, &spec, &PostProcessSpec{}, out)
	if err != nil {
		t.Fatal(err)
	}
	irj.Run(nil)
	if !out.ProducerClosed {
		t.Fatalf("output RowReceiver not closed")
	}

	// Walk everything that was pushed to the buffer: only metadata is expected,
	// and it must include both a trace recording and txn metadata.
	var sawTrace, sawTxnMeta bool
	for row, meta := out.Next(); row != nil || meta != nil; row, meta = out.Next() {
		if row != nil {
			t.Fatalf("row was pushed unexpectedly: %s", row.String(oneIntCol))
		}
		sawTrace = sawTrace || meta.TraceData != nil
		sawTxnMeta = sawTxnMeta || meta.TxnMeta != nil
	}
	if !sawTrace {
		t.Fatal("missing tracing trailing metadata")
	}
	if !sawTxnMeta {
		t.Fatal("missing txn trailing metadata")
	}
}
26 changes: 18 additions & 8 deletions pkg/sql/distsqlrun/joinreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,11 @@ func (jr *joinReader) generateKey(
return sqlbase.MakeKeyFromEncDatums(types, keyRow, &jr.desc, index, primaryKeyPrefix, alloc)
}

// pushTrailingMeta pushes the joinReader's trailing metadata to its output:
// first the tracing data recorded in ctx, then the txn metadata of the flow's
// txn (when sendTxnCoordMetaMaybe decides there is any to send).
func (jr *joinReader) pushTrailingMeta(ctx context.Context) {
	dst := jr.out.output
	sendTraceData(ctx, dst)
	sendTxnCoordMetaMaybe(jr.flowCtx.txn, dst)
}

// mainLoop runs the joinReader's main loop and returns any error.
//
// If no error is returned, the input has been drained and the output has been
Expand Down Expand Up @@ -267,7 +272,7 @@ func (jr *joinReader) mainLoop(ctx context.Context) error {
if meta.Err != nil {
return meta.Err
}
if !emitHelper(ctx, &jr.out, nil /* row */, meta, jr.input) {
if !emitHelper(ctx, &jr.out, nil /* row */, meta, jr.pushTrailingMeta, jr.input) {
return nil
}
continue
Expand Down Expand Up @@ -312,7 +317,7 @@ func (jr *joinReader) mainLoop(ctx context.Context) error {

if len(spans) != joinReaderBatchSize {
// This was the last batch.
sendTraceData(ctx, jr.out.output)
jr.pushTrailingMeta(ctx)
jr.out.Close()
return nil
}
Expand All @@ -325,9 +330,12 @@ func (jr *joinReader) isLookupJoin() bool {
return len(jr.lookupCols) > 0
}

// Index lookup iterates through all matches of the given `spans`. A `row`
// which corresponds to the given span (of size 1) can be provided and
// then the lookup will emit the concatenation of the rows from both tables.
// indexLookup iterates through all matches of the given spans and emits the
// corresponding rows.
//
// Returns false if more rows need to be produced, true otherwise. If true is
// returned, both the inputs and the output have been drained and closed, except
// if an error is returned.
func (jr *joinReader) indexLookup(
ctx context.Context,
txn *client.Txn,
Expand Down Expand Up @@ -372,7 +380,7 @@ func (jr *joinReader) indexLookup(

if !jr.isLookupJoin() {
// Emit the row; stop if no more rows are needed.
if !emitHelper(ctx, &jr.out, indexRow, nil /* meta */, jr.input) {
if !emitHelper(ctx, &jr.out, indexRow, nil /* meta */, jr.pushTrailingMeta, jr.input) {
return true, nil
}
} else {
Expand All @@ -386,7 +394,9 @@ func (jr *joinReader) indexLookup(
}

// Emit the row; stop if no more rows are needed.
if !emitHelper(ctx, &jr.out, jr.renderedRow, nil /* meta */, jr.input) {
if !emitHelper(
ctx, &jr.out, jr.renderedRow, nil /* meta */, jr.pushTrailingMeta, jr.input,
) {
return true, nil
}
}
Expand All @@ -409,6 +419,6 @@ func (jr *joinReader) Run(wg *sync.WaitGroup) {

err := jr.mainLoop(ctx)
if err != nil {
DrainAndClose(ctx, jr.out.output, err /* cause */, jr.input)
DrainAndClose(ctx, jr.out.output, err /* cause */, jr.pushTrailingMeta, jr.input)
}
}
Loading

0 comments on commit 3e44219

Please sign in to comment.