distsqlrun: generate TxnCoordMeta in processors that weren't doing it
Every processor that uses the flow's txn needs to send metadata about
its reads to the root client.Txn/TxnCoordSender; the most critical piece
of that metadata is the set of read spans. Previously only the TableReader
was doing this, so this commit makes the other txn-using processors do it
too.

Fixes #24385

Release note: None
andreimatei committed Apr 9, 2018
1 parent 4cb8a1a commit f31f553
Showing 12 changed files with 239 additions and 35 deletions.
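Before the per-file diffs, here is the shape of the change: every processor that reads through a leaf txn now pushes two kinds of trailing metadata before closing its output, namely its trace recording and the leaf txn's TxnCoordMeta, which carries the read spans back to the root TxnCoordSender. A minimal sketch of the pattern, as if written inside package distsqlrun; `someProcessor` and its fields are hypothetical, while the helpers are the ones introduced or threaded through in this commit:

```go
// Sketch only: someProcessor and its fields are hypothetical; the
// helpers (processorSpan, DrainAndClose, sendTraceData,
// sendTxnCoordMetaMaybe) are the real ones from this package.
func (p *someProcessor) Run(wg *sync.WaitGroup) {
	if wg != nil {
		defer wg.Done()
	}
	ctx, span := processorSpan(p.flowCtx.Ctx, "some processor")
	defer tracing.FinishSpan(span)

	if err := p.mainLoop(ctx); err != nil {
		// On error, drain the inputs, forward the error, push the
		// trailing metadata, and close the output.
		DrainAndClose(ctx, p.out.output, err /* cause */, p.pushTrailingMeta, p.input)
		return
	}
	// On success, push the trailing metadata and close the output directly.
	p.pushTrailingMeta(ctx)
	p.out.Close()
}

// pushTrailingMeta sends the trace recording and, if the flow runs on a
// leaf txn, the TxnCoordMeta with the accumulated read spans.
func (p *someProcessor) pushTrailingMeta(ctx context.Context) {
	sendTraceData(ctx, p.out.output)
	sendTxnCoordMetaMaybe(p.flowCtx.txn, p.out.output)
}
```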
5 changes: 3 additions & 2 deletions pkg/ccl/importccl/csv.go
@@ -1398,7 +1398,7 @@ func (cp *readCSVProcessor) Run(wg *sync.WaitGroup) {
return nil
})
if err := group.Wait(); err != nil {
distsqlrun.DrainAndClose(ctx, cp.output, err)
distsqlrun.DrainAndClose(ctx, cp.output, err, func(context.Context) {} /* pushTrailingMeta */)
return
}

@@ -1653,7 +1653,8 @@ func (sp *sstWriter) Run(wg *sync.WaitGroup) {
}
return nil
}()
distsqlrun.DrainAndClose(ctx, sp.output, err, sp.input)
distsqlrun.DrainAndClose(
ctx, sp.output, err, func(context.Context) {} /* pushTrailingMeta */, sp.input)
}

type importResumer struct {
5 changes: 5 additions & 0 deletions pkg/internal/client/txn.go
@@ -1243,3 +1243,8 @@ func (txn *Txn) IsSerializablePushAndRefreshNotPossible() bool {
return txn.Proto().Isolation == enginepb.SERIALIZABLE &&
isTxnPushed && txn.mu.Proto.OrigTimestampWasObserved
}

// Type returns the transaction's type.
func (txn *Txn) Type() TxnType {
return txn.typ
}
43 changes: 40 additions & 3 deletions pkg/sql/distsqlrun/base.go
@@ -22,12 +22,14 @@ import (

opentracing "github.com/opentracing/opentracing-go"

"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
)

const rowChannelBufSize = 16
@@ -200,12 +202,37 @@ func getTraceData(ctx context.Context) []tracing.RecordedSpan {
return nil
}

// sendTraceData collects the tracing information from the ctx and pushes it to
// dst. The ConsumerStatus returned by dst is ignored.
//
// Note that the tracing data is distinct between different processors, since
// each one gets its own trace "recording group".
func sendTraceData(ctx context.Context, dst RowReceiver) {
if rec := getTraceData(ctx); rec != nil {
dst.Push(nil /* row */, &ProducerMetadata{TraceData: rec})
}
}

// sendTxnCoordMetaMaybe reads the txn metadata from a leaf transaction and
// sends it to dst, so that it eventually makes it to the root txn. The
// ConsumerStatus returned by dst is ignored.
//
// If the txn is a root txn, this is a no-op.
//
// NOTE(andrei): As of 04/2018, the txn is shared by all processors scheduled on
// a node, and so it's possible for multiple processors to send the same
// TxnCoordMeta. The root TxnCoordSender doesn't care if it receives the same
// thing multiple times.
func sendTxnCoordMetaMaybe(txn *client.Txn, dst RowReceiver) {
if txn.Type() == client.RootTxn {
return
}
txnMeta := txn.GetTxnCoordMeta()
if txnMeta.Txn.ID != (uuid.UUID{}) {
dst.Push(nil /* row */, &ProducerMetadata{TxnMeta: &txnMeta})
}
}

// DrainAndClose is a version of DrainAndForwardMetadata that drains multiple
// sources. These sources are assumed to be the only producers left for dst, so
// dst is closed once they're all exhausted (this is different from
@@ -215,10 +242,20 @@ func sendTraceData(ctx context.Context, dst RowReceiver) {
// metadata. This is intended to have been the error, if any, that caused the
// draining.
//
// pushTrailingMeta is called after draining the sources and before calling
// dst.ProducerDone(). It gives the caller the opportunity to push some trailing
// metadata (e.g. tracing information and txn updates, if applicable).
//
// srcs can be nil.
//
// All errors are forwarded to the producer.
func DrainAndClose(ctx context.Context, dst RowReceiver, cause error, srcs ...RowSource) {
func DrainAndClose(
ctx context.Context,
dst RowReceiver,
cause error,
pushTrailingMeta func(context.Context),
srcs ...RowSource,
) {
if cause != nil {
// We ignore the returned ConsumerStatus and rely on the
// DrainAndForwardMetadata() calls below to close srcs in all cases.
@@ -236,7 +273,7 @@ func DrainAndClose(ctx context.Context, dst RowReceiver, cause error, srcs ...RowSource) {
DrainAndForwardMetadata(ctx, srcs[0], dst)
wg.Wait()
}
sendTraceData(ctx, dst)
pushTrailingMeta(ctx)
dst.ProducerDone()
}

@@ -559,7 +596,7 @@ func (rb *RowBuffer) Push(row sqlbase.EncDatumRow, meta *ProducerMetadata) ConsumerStatus {
return status
}

// ProducerDone is part of the interface.
// ProducerDone is part of the RowSource interface.
func (rb *RowBuffer) ProducerDone() {
if rb.ProducerClosed {
panic("RowBuffer already closed")
12 changes: 8 additions & 4 deletions pkg/sql/distsqlrun/hashjoiner.go
@@ -153,6 +153,10 @@ func (h *hashJoiner) Run(wg *sync.WaitGroup) {
defer wg.Done()
}

pushTrailingMeta := func(ctx context.Context) {
sendTraceData(ctx, h.out.output)
}

ctx := log.WithLogTag(h.flowCtx.Ctx, "HashJoiner", nil)
ctx, span := processorSpan(ctx, "hash joiner")
defer tracing.FinishSpan(span)
@@ -212,7 +216,7 @@ func (h *hashJoiner) Run(wg *sync.WaitGroup) {
// the consumer.
log.Infof(ctx, "buffer phase error %s", err)
}
DrainAndClose(ctx, h.out.output, err /* cause */, h.leftSource, h.rightSource)
DrainAndClose(ctx, h.out.output, err /* cause */, pushTrailingMeta, h.leftSource, h.rightSource)
return
}

@@ -229,7 +233,7 @@ func (h *hashJoiner) Run(wg *sync.WaitGroup) {
// the consumer.
log.Infof(ctx, "build phase error %s", err)
}
DrainAndClose(ctx, h.out.output, err /* cause */, h.leftSource, h.rightSource)
DrainAndClose(ctx, h.out.output, err /* cause */, pushTrailingMeta, h.leftSource, h.rightSource)
return
}
defer storedRows.Close(ctx)
@@ -241,7 +245,7 @@ func (h *hashJoiner) Run(wg *sync.WaitGroup) {
if row != nil {
if err := storedRows.AddRow(ctx, row); err != nil {
log.Infof(ctx, "unable to add row to disk %s", err)
DrainAndClose(ctx, h.out.output, err /* cause */, h.leftSource, h.rightSource)
DrainAndClose(ctx, h.out.output, err /* cause */, pushTrailingMeta, h.leftSource, h.rightSource)
return
}
}
@@ -262,7 +266,7 @@ func (h *hashJoiner) Run(wg *sync.WaitGroup) {
// point.
log.Infof(ctx, "probe phase error %s", err)
}
DrainAndClose(ctx, h.out.output, err /* cause */, srcToClose)
DrainAndClose(ctx, h.out.output, err /* cause */, pushTrailingMeta, srcToClose)
}
}

1 change: 1 addition & 0 deletions pkg/sql/distsqlrun/interleaved_reader_joiner.go
@@ -397,6 +397,7 @@ func (irj *interleavedReaderJoiner) Run(wg *sync.WaitGroup) {

irj.sendMisplannedRangesMetadata(ctx)
sendTraceData(ctx, irj.out.output)
sendTxnCoordMetaMaybe(irj.flowCtx.txn, irj.out.output)
irj.out.Close()
}

104 changes: 100 additions & 4 deletions pkg/sql/distsqlrun/interleaved_reader_joiner_test.go
@@ -29,6 +29,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
)

// min and max are inclusive bounds on the root table's ID.
@@ -399,8 +400,9 @@ func TestInterleavedReaderJoiner(t *testing.T) {
Ctx: context.Background(),
EvalCtx: evalCtx,
Settings: s.ClusterSettings(),
txn: client.NewTxn(s.DB(), s.NodeID(), client.RootTxn),
nodeID: s.NodeID(),
// Run in a RootTxn so that there's no txn metadata produced.
txn: client.NewTxn(s.DB(), s.NodeID(), client.RootTxn),
nodeID: s.NodeID(),
}

out := &RowBuffer{}
@@ -527,8 +529,9 @@ func TestInterleavedReaderJoinerErrors(t *testing.T) {
flowCtx := FlowCtx{
EvalCtx: evalCtx,
Settings: s.ClusterSettings(),
txn: client.NewTxn(s.DB(), s.NodeID(), client.RootTxn),
nodeID: s.NodeID(),
// Run in a RootTxn so that there's no txn metadata produced.
txn: client.NewTxn(s.DB(), s.NodeID(), client.RootTxn),
nodeID: s.NodeID(),
}

out := &RowBuffer{}
@@ -543,3 +546,96 @@ func TestInterleavedReaderJoinerErrors(t *testing.T) {
})
}
}

func TestInterleavedReaderJoinerTrailingMetadata(t *testing.T) {
defer leaktest.AfterTest(t)()

s, sqlDB, kvDB := serverutils.StartServer(t, base.TestServerArgs{})
defer s.Stopper().Stop(context.TODO())

sqlutils.CreateTable(t, sqlDB, "parent",
"id INT PRIMARY KEY",
0, // numRows
func(row int) []tree.Datum { return nil },
)

sqlutils.CreateTableInterleaved(t, sqlDB, "child",
"pid INT, id INT, PRIMARY KEY (pid, id)",
"parent (pid)",
0,
func(row int) []tree.Datum { return nil },
)

pd := sqlbase.GetTableDescriptor(kvDB, sqlutils.TestDB, "parent")
cd := sqlbase.GetTableDescriptor(kvDB, sqlutils.TestDB, "child")

evalCtx := tree.MakeTestingEvalContext(s.ClusterSettings())
defer evalCtx.Stop(context.Background())

// Run the flow in a snowball trace so that we can test for tracing info.
tracer := tracing.NewTracer()
ctx, sp, err := tracing.StartSnowballTrace(context.Background(), tracer, "test flow ctx")
if err != nil {
t.Fatal(err)
}
defer sp.Finish()

flowCtx := FlowCtx{
Ctx: ctx,
EvalCtx: evalCtx,
Settings: s.ClusterSettings(),
// Run in a LeafTxn so that txn metadata is produced.
txn: client.NewTxn(s.DB(), s.NodeID(), client.LeafTxn),
nodeID: s.NodeID(),
}

innerJoinSpec := InterleavedReaderJoinerSpec{
Tables: []InterleavedReaderJoinerSpec_Table{
{
Desc: *pd,
Ordering: Ordering{Columns: []Ordering_Column{{ColIdx: 0, Direction: Ordering_Column_ASC}}},
Spans: []TableReaderSpan{{Span: pd.PrimaryIndexSpan()}},
},
{
Desc: *cd,
Ordering: Ordering{Columns: []Ordering_Column{{ColIdx: 0, Direction: Ordering_Column_ASC}}},
Spans: []TableReaderSpan{{Span: cd.PrimaryIndexSpan()}},
},
},
Type: sqlbase.InnerJoin,
}

out := &RowBuffer{}
irj, err := newInterleavedReaderJoiner(&flowCtx, &innerJoinSpec, &PostProcessSpec{}, out)
if err != nil {
t.Fatal(err)
}
irj.Run(nil)
if !out.ProducerClosed {
t.Fatalf("output RowReceiver not closed")
}

// Check for trailing metadata.
var traceSeen, txnMetaSeen bool
for {
row, meta := out.Next()
if row != nil {
t.Fatalf("row was pushed unexpectedly: %s", row.String(oneIntCol))
}
if meta == nil {
break
}
if meta.TraceData != nil {
traceSeen = true
}
if meta.TxnMeta != nil {
txnMetaSeen = true
}
}
if !traceSeen {
t.Fatal("missing tracing trailing metadata")
}
if !txnMetaSeen {
t.Fatal("missing txn trailing metadata")
}
}
26 changes: 18 additions & 8 deletions pkg/sql/distsqlrun/joinreader.go
@@ -234,6 +234,11 @@ func (jr *joinReader) generateKey(
return sqlbase.MakeKeyFromEncDatums(types, keyRow, &jr.desc, index, primaryKeyPrefix, alloc)
}

func (jr *joinReader) pushTrailingMeta(ctx context.Context) {
sendTraceData(ctx, jr.out.output)
sendTxnCoordMetaMaybe(jr.flowCtx.txn, jr.out.output)
}

// mainLoop runs the join reader's main loop and returns any error.
//
// If no error is returned, the input has been drained and the output has been
@@ -267,7 +272,7 @@ func (jr *joinReader) mainLoop(ctx context.Context) error {
if meta.Err != nil {
return meta.Err
}
if !emitHelper(ctx, &jr.out, nil /* row */, meta, jr.input) {
if !emitHelper(ctx, &jr.out, nil /* row */, meta, jr.pushTrailingMeta, jr.input) {
return nil
}
continue
@@ -312,7 +317,7 @@ func (jr *joinReader) mainLoop(ctx context.Context) error {

if len(spans) != joinReaderBatchSize {
// This was the last batch.
sendTraceData(ctx, jr.out.output)
jr.pushTrailingMeta(ctx)
jr.out.Close()
return nil
}
@@ -325,9 +330,12 @@ func (jr *joinReader) isLookupJoin() bool {
return len(jr.lookupCols) > 0
}

// Index lookup iterates through all matches of the given `spans`. A `row`
// which corresponds to the given span (of size 1) can be provided and
// then the lookup will emit the concatenation of the rows from both tables.
// Index lookup iterates through all matches of the given spans and emits the
// corresponding row.
//
// Returns false if more rows need to be produced, true otherwise. If true is
// returned, both the inputs and the output have been drained and closed, except
// if an error is returned.
func (jr *joinReader) indexLookup(
ctx context.Context,
txn *client.Txn,
@@ -372,7 +380,7 @@ func (jr *joinReader) indexLookup(

if !jr.isLookupJoin() {
// Emit the row; stop if no more rows are needed.
if !emitHelper(ctx, &jr.out, indexRow, nil /* meta */, jr.input) {
if !emitHelper(ctx, &jr.out, indexRow, nil /* meta */, jr.pushTrailingMeta, jr.input) {
return true, nil
}
} else {
@@ -386,7 +394,9 @@ func (jr *joinReader) indexLookup(
}

// Emit the row; stop if no more rows are needed.
if !emitHelper(ctx, &jr.out, jr.renderedRow, nil /* meta */, jr.input) {
if !emitHelper(
ctx, &jr.out, jr.renderedRow, nil /* meta */, jr.pushTrailingMeta, jr.input,
) {
return true, nil
}
}
@@ -409,6 +419,6 @@ func (jr *joinReader) Run(wg *sync.WaitGroup) {

err := jr.mainLoop(ctx)
if err != nil {
DrainAndClose(ctx, jr.out.output, err /* cause */, jr.input)
DrainAndClose(ctx, jr.out.output, err /* cause */, jr.pushTrailingMeta, jr.input)
}
}
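emitHelper picked up the same callback: as the call sites above show, it now takes pushTrailingMeta between the metadata argument and the input sources, so that when the consumer stops accepting rows the helper can drain the inputs, push the trailing metadata, and close the output itself. A sketch of the calling convention, inferred from the diff rather than quoted from the helper's definition:

```go
// Push one row. A false return means emitHelper has already shut the
// output down (inputs drained, trailing metadata pushed, output closed),
// so the caller just unwinds.
if !emitHelper(ctx, &jr.out, row, nil /* meta */, jr.pushTrailingMeta, jr.input) {
	return nil
}
```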