From f31f553ee40677598653c4a35de82de8b06b5607 Mon Sep 17 00:00:00 2001 From: Andrei Matei Date: Mon, 2 Apr 2018 11:21:10 -0400 Subject: [PATCH] distsqlrun: generate TxnCoordMeta in processors that weren't doing it Every processor that uses the flow's txn needs to send metadata about its reads to the root client.Txn/TxnCoordSender. The most critical thing in that metadata is the read spans. Only the TableReader was doing it, but more processors need to. Fixes #24385 Release note: None --- pkg/ccl/importccl/csv.go | 5 +- pkg/internal/client/txn.go | 5 + pkg/sql/distsqlrun/base.go | 43 +++++++- pkg/sql/distsqlrun/hashjoiner.go | 12 +- .../distsqlrun/interleaved_reader_joiner.go | 1 + .../interleaved_reader_joiner_test.go | 104 +++++++++++++++++- pkg/sql/distsqlrun/joinreader.go | 26 +++-- pkg/sql/distsqlrun/joinreader_test.go | 38 ++++++- pkg/sql/distsqlrun/processors.go | 7 +- pkg/sql/distsqlrun/sample_aggregator.go | 10 +- pkg/sql/distsqlrun/sampler.go | 14 ++- pkg/sql/distsqlrun/tablereader.go | 9 +- 12 files changed, 239 insertions(+), 35 deletions(-) diff --git a/pkg/ccl/importccl/csv.go b/pkg/ccl/importccl/csv.go index 48d0560b0943..6a55039d23a3 100644 --- a/pkg/ccl/importccl/csv.go +++ b/pkg/ccl/importccl/csv.go @@ -1398,7 +1398,7 @@ func (cp *readCSVProcessor) Run(wg *sync.WaitGroup) { return nil }) if err := group.Wait(); err != nil { - distsqlrun.DrainAndClose(ctx, cp.output, err) + distsqlrun.DrainAndClose(ctx, cp.output, err, func(context.Context) {} /* pushTrailingMeta */) return } @@ -1653,7 +1653,8 @@ func (sp *sstWriter) Run(wg *sync.WaitGroup) { } return nil }() - distsqlrun.DrainAndClose(ctx, sp.output, err, sp.input) + distsqlrun.DrainAndClose( + ctx, sp.output, err, func(context.Context) {} /* pushTrailingMeta */, sp.input) } type importResumer struct { diff --git a/pkg/internal/client/txn.go b/pkg/internal/client/txn.go index 1e59732d5851..fb353c3d57a2 100644 --- a/pkg/internal/client/txn.go +++ b/pkg/internal/client/txn.go @@ -1243,3 +1243,8 @@ func (txn *Txn) IsSerializablePushAndRefreshNotPossible() bool { return txn.Proto().Isolation == enginepb.SERIALIZABLE && isTxnPushed && txn.mu.Proto.OrigTimestampWasObserved } + +// Type returns the transaction's type. +func (txn *Txn) Type() TxnType { + return txn.typ +} diff --git a/pkg/sql/distsqlrun/base.go b/pkg/sql/distsqlrun/base.go index cf3519afdf7f..cd4895099c7e 100644 --- a/pkg/sql/distsqlrun/base.go +++ b/pkg/sql/distsqlrun/base.go @@ -22,12 +22,14 @@ import ( opentracing "github.com/opentracing/opentracing-go" + "github.com/cockroachdb/cockroach/pkg/internal/client" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" + "github.com/cockroachdb/cockroach/pkg/util/uuid" ) const rowChannelBufSize = 16 @@ -200,12 +202,37 @@ func getTraceData(ctx context.Context) []tracing.RecordedSpan { return nil } +// sendTraceData collects the tracing information from the ctx and pushes it to +// dst. The ConsumerStatus returned by dst is ignored. +// +// Note that the tracing data is distinct between different processors, since +// each one gets its own trace "recording group". 
 func sendTraceData(ctx context.Context, dst RowReceiver) {
 	if rec := getTraceData(ctx); rec != nil {
 		dst.Push(nil /* row */, &ProducerMetadata{TraceData: rec})
 	}
 }
 
+// sendTxnCoordMetaMaybe reads the txn metadata from a leaf transaction and
+// sends it to dst, so that it eventually makes it to the root txn. The
+// ConsumerStatus returned by dst is ignored.
+//
+// If the txn is a root txn, this is a no-op.
+//
+// NOTE(andrei): As of 04/2018, the txn is shared by all processors scheduled on
+// a node, and so it's possible for multiple processors to send the same
+// TxnCoordMeta. The root TxnCoordSender doesn't care if it receives the same
+// thing multiple times.
+func sendTxnCoordMetaMaybe(txn *client.Txn, dst RowReceiver) {
+	if txn.Type() == client.RootTxn {
+		return
+	}
+	txnMeta := txn.GetTxnCoordMeta()
+	if txnMeta.Txn.ID != (uuid.UUID{}) {
+		dst.Push(nil /* row */, &ProducerMetadata{TxnMeta: &txnMeta})
+	}
+}
+
 // DrainAndClose is a version of DrainAndForwardMetadata that drains multiple
 // sources. These sources are assumed to be the only producers left for dst, so
 // dst is closed once they're all exhausted (this is different from
@@ -215,10 +242,20 @@ func sendTraceData(ctx context.Context, dst RowReceiver) {
 // metadata. This is intended to have been the error, if any, that caused the
 // draining.
 //
+// pushTrailingMeta is called after draining the sources and before calling
+// dst.ProducerDone(). It gives the caller the opportunity to push some trailing
+// metadata (e.g. tracing information and txn updates, if applicable).
+//
 // srcs can be nil.
 //
 // All errors are forwarded to the consumer.
-func DrainAndClose(ctx context.Context, dst RowReceiver, cause error, srcs ...RowSource) {
+func DrainAndClose(
+	ctx context.Context,
+	dst RowReceiver,
+	cause error,
+	pushTrailingMeta func(context.Context),
+	srcs ...RowSource,
+) {
 	if cause != nil {
 		// We ignore the returned ConsumerStatus and rely on the
 		// DrainAndForwardMetadata() calls below to close srcs in all cases.
@@ -236,7 +273,7 @@ func DrainAndClose(ctx context.Context, dst RowReceiver, cause error, srcs ...Ro
 		DrainAndForwardMetadata(ctx, srcs[0], dst)
 		wg.Wait()
 	}
-	sendTraceData(ctx, dst)
+	pushTrailingMeta(ctx)
 	dst.ProducerDone()
 }
 
@@ -559,7 +596,7 @@ func (rb *RowBuffer) Push(row sqlbase.EncDatumRow, meta *ProducerMetadata) Consu
 	return status
 }
 
-// ProducerDone is part of the interface.
+// ProducerDone is part of the RowReceiver interface.
 func (rb *RowBuffer) ProducerDone() {
 	if rb.ProducerClosed {
 		panic("RowBuffer already closed")
diff --git a/pkg/sql/distsqlrun/hashjoiner.go b/pkg/sql/distsqlrun/hashjoiner.go
index 95c4999b273c..7110a8bc767d 100644
--- a/pkg/sql/distsqlrun/hashjoiner.go
+++ b/pkg/sql/distsqlrun/hashjoiner.go
@@ -153,6 +153,10 @@ func (h *hashJoiner) Run(wg *sync.WaitGroup) {
 		defer wg.Done()
 	}
 
+	pushTrailingMeta := func(ctx context.Context) {
+		sendTraceData(ctx, h.out.output)
+	}
+
 	ctx := log.WithLogTag(h.flowCtx.Ctx, "HashJoiner", nil)
 	ctx, span := processorSpan(ctx, "hash joiner")
 	defer tracing.FinishSpan(span)
@@ -212,7 +216,7 @@ func (h *hashJoiner) Run(wg *sync.WaitGroup) {
 			// the consumer.
 			log.Infof(ctx, "buffer phase error %s", err)
 		}
-		DrainAndClose(ctx, h.out.output, err /* cause */, h.leftSource, h.rightSource)
+		DrainAndClose(ctx, h.out.output, err /* cause */, pushTrailingMeta, h.leftSource, h.rightSource)
 		return
 	}
 
@@ -229,7 +233,7 @@ func (h *hashJoiner) Run(wg *sync.WaitGroup) {
 			// the consumer.
log.Infof(ctx, "build phase error %s", err) } - DrainAndClose(ctx, h.out.output, err /* cause */, h.leftSource, h.rightSource) + DrainAndClose(ctx, h.out.output, err /* cause */, pushTrailingMeta, h.leftSource, h.rightSource) return } defer storedRows.Close(ctx) @@ -241,7 +245,7 @@ func (h *hashJoiner) Run(wg *sync.WaitGroup) { if row != nil { if err := storedRows.AddRow(ctx, row); err != nil { log.Infof(ctx, "unable to add row to disk %s", err) - DrainAndClose(ctx, h.out.output, err /* cause */, h.leftSource, h.rightSource) + DrainAndClose(ctx, h.out.output, err /* cause */, pushTrailingMeta, h.leftSource, h.rightSource) return } } @@ -262,7 +266,7 @@ func (h *hashJoiner) Run(wg *sync.WaitGroup) { // point. log.Infof(ctx, "probe phase error %s", err) } - DrainAndClose(ctx, h.out.output, err /* cause */, srcToClose) + DrainAndClose(ctx, h.out.output, err /* cause */, pushTrailingMeta, srcToClose) } } diff --git a/pkg/sql/distsqlrun/interleaved_reader_joiner.go b/pkg/sql/distsqlrun/interleaved_reader_joiner.go index ad704af071af..2a8aec81fbca 100644 --- a/pkg/sql/distsqlrun/interleaved_reader_joiner.go +++ b/pkg/sql/distsqlrun/interleaved_reader_joiner.go @@ -397,6 +397,7 @@ func (irj *interleavedReaderJoiner) Run(wg *sync.WaitGroup) { irj.sendMisplannedRangesMetadata(ctx) sendTraceData(ctx, irj.out.output) + sendTxnCoordMetaMaybe(irj.flowCtx.txn, irj.out.output) irj.out.Close() } diff --git a/pkg/sql/distsqlrun/interleaved_reader_joiner_test.go b/pkg/sql/distsqlrun/interleaved_reader_joiner_test.go index 75a950566427..c4889d30d4db 100644 --- a/pkg/sql/distsqlrun/interleaved_reader_joiner_test.go +++ b/pkg/sql/distsqlrun/interleaved_reader_joiner_test.go @@ -29,6 +29,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/util/encoding" "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/tracing" ) // min and max are inclusive bounds on the root table's ID. @@ -399,8 +400,9 @@ func TestInterleavedReaderJoiner(t *testing.T) { Ctx: context.Background(), EvalCtx: evalCtx, Settings: s.ClusterSettings(), - txn: client.NewTxn(s.DB(), s.NodeID(), client.RootTxn), - nodeID: s.NodeID(), + // Run in a RootTxn so that there's no txn metadata produced. + txn: client.NewTxn(s.DB(), s.NodeID(), client.RootTxn), + nodeID: s.NodeID(), } out := &RowBuffer{} @@ -527,8 +529,9 @@ func TestInterleavedReaderJoinerErrors(t *testing.T) { flowCtx := FlowCtx{ EvalCtx: evalCtx, Settings: s.ClusterSettings(), - txn: client.NewTxn(s.DB(), s.NodeID(), client.RootTxn), - nodeID: s.NodeID(), + // Run in a RootTxn so that there's no txn metadata produced. 
+ txn: client.NewTxn(s.DB(), s.NodeID(), client.RootTxn), + nodeID: s.NodeID(), } out := &RowBuffer{} @@ -543,3 +546,96 @@ func TestInterleavedReaderJoinerErrors(t *testing.T) { }) } } + +func TestInterleavedReaderJoinerTrailingMetadata(t *testing.T) { + defer leaktest.AfterTest(t)() + + s, sqlDB, kvDB := serverutils.StartServer(t, base.TestServerArgs{}) + defer s.Stopper().Stop(context.TODO()) + + sqlutils.CreateTable(t, sqlDB, "parent", + "id INT PRIMARY KEY", + 0, // numRows + func(row int) []tree.Datum { return nil }, + ) + + sqlutils.CreateTableInterleaved(t, sqlDB, "child", + "pid INT, id INT, PRIMARY KEY (pid, id)", + "parent (pid)", + 0, + func(row int) []tree.Datum { return nil }, + ) + + pd := sqlbase.GetTableDescriptor(kvDB, sqlutils.TestDB, "parent") + cd := sqlbase.GetTableDescriptor(kvDB, sqlutils.TestDB, "child") + + evalCtx := tree.MakeTestingEvalContext(s.ClusterSettings()) + defer evalCtx.Stop(context.Background()) + + // Run the flow in a snowball trace so that we can test for tracing info. + tracer := tracing.NewTracer() + ctx, sp, err := tracing.StartSnowballTrace(context.Background(), tracer, "test flow ctx") + if err != nil { + t.Fatal(err) + } + defer sp.Finish() + + flowCtx := FlowCtx{ + Ctx: ctx, + EvalCtx: evalCtx, + Settings: s.ClusterSettings(), + // Run in a LeafTxn so that txn metadata is produced. + txn: client.NewTxn(s.DB(), s.NodeID(), client.LeafTxn), + nodeID: s.NodeID(), + } + + innerJoinSpec := InterleavedReaderJoinerSpec{ + Tables: []InterleavedReaderJoinerSpec_Table{ + { + Desc: *pd, + Ordering: Ordering{Columns: []Ordering_Column{{ColIdx: 0, Direction: Ordering_Column_ASC}}}, + Spans: []TableReaderSpan{{Span: pd.PrimaryIndexSpan()}}, + }, + { + Desc: *cd, + Ordering: Ordering{Columns: []Ordering_Column{{ColIdx: 0, Direction: Ordering_Column_ASC}}}, + Spans: []TableReaderSpan{{Span: cd.PrimaryIndexSpan()}}, + }, + }, + Type: sqlbase.InnerJoin, + } + + out := &RowBuffer{} + irj, err := newInterleavedReaderJoiner(&flowCtx, &innerJoinSpec, &PostProcessSpec{}, out) + if err != nil { + t.Fatal(err) + } + irj.Run(nil) + if !out.ProducerClosed { + t.Fatalf("output RowReceiver not closed") + } + + // Check for trailing metadata. + var traceSeen, txnMetaSeen bool + for { + row, meta := out.Next() + if row != nil { + t.Fatalf("row was pushed unexpectedly: %s", row.String(oneIntCol)) + } + if meta == nil { + break + } + if meta.TraceData != nil { + traceSeen = true + } + if meta.TxnMeta != nil { + txnMetaSeen = true + } + } + if !traceSeen { + t.Fatal("missing tracing trailing metadata") + } + if !txnMetaSeen { + t.Fatal("missing txn trailing metadata") + } +} diff --git a/pkg/sql/distsqlrun/joinreader.go b/pkg/sql/distsqlrun/joinreader.go index b69d13ce6fc2..faa90534f7d6 100644 --- a/pkg/sql/distsqlrun/joinreader.go +++ b/pkg/sql/distsqlrun/joinreader.go @@ -234,6 +234,11 @@ func (jr *joinReader) generateKey( return sqlbase.MakeKeyFromEncDatums(types, keyRow, &jr.desc, index, primaryKeyPrefix, alloc) } +func (jr *joinReader) pushTrailingMeta(ctx context.Context) { + sendTraceData(ctx, jr.out.output) + sendTxnCoordMetaMaybe(jr.flowCtx.txn, jr.out.output) +} + // mainLoop runs the mainLoop and returns any error. 
 //
 // If no error is returned, the input has been drained and the output has been
@@ -267,7 +272,7 @@ func (jr *joinReader) mainLoop(ctx context.Context) error {
 			if meta.Err != nil {
 				return meta.Err
 			}
-			if !emitHelper(ctx, &jr.out, nil /* row */, meta, jr.input) {
+			if !emitHelper(ctx, &jr.out, nil /* row */, meta, jr.pushTrailingMeta, jr.input) {
 				return nil
 			}
 			continue
@@ -312,7 +317,7 @@ func (jr *joinReader) mainLoop(ctx context.Context) error {
 
 		if len(spans) != joinReaderBatchSize {
 			// This was the last batch.
-			sendTraceData(ctx, jr.out.output)
+			jr.pushTrailingMeta(ctx)
 			jr.out.Close()
 			return nil
 		}
@@ -325,9 +330,12 @@ func (jr *joinReader) isLookupJoin() bool {
 	return len(jr.lookupCols) > 0
 }
 
-// Index lookup iterates through all matches of the given `spans`. A `row`
-// which corresponds to the given span (of size 1) can be provided and
-// then the lookup will emit the concatenation of the rows from both tables.
+// indexLookup iterates through all matches of the given spans and emits the
+// corresponding row.
+//
+// Returns false if more rows need to be produced, true otherwise. If true is
+// returned, both the inputs and the output have been drained and closed, unless
+// an error is returned.
 func (jr *joinReader) indexLookup(
 	ctx context.Context,
 	txn *client.Txn,
@@ -372,7 +380,7 @@ func (jr *joinReader) indexLookup(
 
 		if !jr.isLookupJoin() {
 			// Emit the row; stop if no more rows are needed.
-			if !emitHelper(ctx, &jr.out, indexRow, nil /* meta */, jr.input) {
+			if !emitHelper(ctx, &jr.out, indexRow, nil /* meta */, jr.pushTrailingMeta, jr.input) {
 				return true, nil
 			}
 		} else {
@@ -386,7 +394,9 @@ func (jr *joinReader) indexLookup(
 			}
 
 			// Emit the row; stop if no more rows are needed.
-			if !emitHelper(ctx, &jr.out, jr.renderedRow, nil /* meta */, jr.input) {
+			if !emitHelper(
+				ctx, &jr.out, jr.renderedRow, nil /* meta */, jr.pushTrailingMeta, jr.input,
+			) {
 				return true, nil
 			}
 		}
@@ -409,6 +419,6 @@ func (jr *joinReader) Run(wg *sync.WaitGroup) {
 
 	err := jr.mainLoop(ctx)
 	if err != nil {
-		DrainAndClose(ctx, jr.out.output, err /* cause */, jr.input)
+		DrainAndClose(ctx, jr.out.output, err /* cause */, jr.pushTrailingMeta, jr.input)
 	}
 }
diff --git a/pkg/sql/distsqlrun/joinreader_test.go b/pkg/sql/distsqlrun/joinreader_test.go
index 926a7eaa5f36..70613f1f280b 100644
--- a/pkg/sql/distsqlrun/joinreader_test.go
+++ b/pkg/sql/distsqlrun/joinreader_test.go
@@ -27,6 +27,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
 	"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
+	"github.com/cockroachdb/cockroach/pkg/util/tracing"
 )
 
 func TestJoinReader(t *testing.T) {
@@ -252,11 +253,20 @@ func TestJoinReaderDrain(t *testing.T) {
 
 	evalCtx := tree.MakeTestingEvalContext(s.ClusterSettings())
 	defer evalCtx.Stop(context.Background())
+
+	// Run the flow in a snowball trace so that we can test for tracing info.
+ tracer := tracing.NewTracer() + ctx, sp, err := tracing.StartSnowballTrace(context.Background(), tracer, "test flow ctx") + if err != nil { + t.Fatal(err) + } + defer sp.Finish() + flowCtx := FlowCtx{ - Ctx: context.Background(), + Ctx: ctx, EvalCtx: evalCtx, Settings: s.ClusterSettings(), - txn: client.NewTxn(s.DB(), s.NodeID(), client.RootTxn), + txn: client.NewTxn(s.DB(), s.NodeID(), client.LeafTxn), } encRow := make(sqlbase.EncDatumRow, 1) @@ -300,5 +310,29 @@ func TestJoinReaderDrain(t *testing.T) { if meta.Err != expectedMetaErr { t.Fatalf("unexpected error in metadata: %v", meta.Err) } + + // Check for trailing metadata. + var traceSeen, txnMetaSeen bool + for { + row, meta = out.Next() + if row != nil { + t.Fatalf("row was pushed unexpectedly: %s", row.String(oneIntCol)) + } + if meta == nil { + break + } + if meta.TraceData != nil { + traceSeen = true + } + if meta.TxnMeta != nil { + txnMetaSeen = true + } + } + if !traceSeen { + t.Fatal("missing tracing trailing metadata") + } + if !txnMetaSeen { + t.Fatal("missing txn trailing metadata") + } }) } diff --git a/pkg/sql/distsqlrun/processors.go b/pkg/sql/distsqlrun/processors.go index 92ca89cbc962..d6ff82e4aaa6 100644 --- a/pkg/sql/distsqlrun/processors.go +++ b/pkg/sql/distsqlrun/processors.go @@ -194,6 +194,10 @@ func (h *ProcOutputHelper) neededColumns() (colIdxs util.FastIntSet) { // // inputs are optional. // +// pushTrailingMeta is called after draining the sources and before calling +// dst.ProducerDone(). It gives the caller the opportunity to push some trailing +// metadata (e.g. tracing information and txn updates, if applicable). +// // Returns true if more rows are needed, false otherwise. If false is returned // both the inputs and the output have been properly closed. func emitHelper( @@ -201,6 +205,7 @@ func emitHelper( output *ProcOutputHelper, row sqlbase.EncDatumRow, meta *ProducerMetadata, + pushTrailingMeta func(context.Context), inputs ...RowSource, ) bool { if output.output == nil { @@ -230,7 +235,7 @@ func emitHelper( return true case DrainRequested: log.VEventf(ctx, 1, "no more rows required. drain requested.") - DrainAndClose(ctx, output.output, nil /* cause */, inputs...) + DrainAndClose(ctx, output.output, nil /* cause */, pushTrailingMeta, inputs...) return false case ConsumerClosed: log.VEventf(ctx, 1, "no more rows required. Consumer shut down.") diff --git a/pkg/sql/distsqlrun/sample_aggregator.go b/pkg/sql/distsqlrun/sample_aggregator.go index 349dcb7ce35b..1185c7f30955 100644 --- a/pkg/sql/distsqlrun/sample_aggregator.go +++ b/pkg/sql/distsqlrun/sample_aggregator.go @@ -106,6 +106,10 @@ func newSampleAggregator( return s, nil } +func (s *sampleAggregator) pushTrailingMeta(ctx context.Context) { + sendTraceData(ctx, s.out.output) +} + // Run is part of the Processor interface. 
func (s *sampleAggregator) Run(wg *sync.WaitGroup) { if wg != nil { @@ -116,9 +120,9 @@ func (s *sampleAggregator) Run(wg *sync.WaitGroup) { earlyExit, err := s.mainLoop(ctx) if err != nil { - DrainAndClose(ctx, s.out.output, err, s.input) + DrainAndClose(ctx, s.out.output, err, s.pushTrailingMeta, s.input) } else if !earlyExit { - sendTraceData(ctx, s.out.output) + s.pushTrailingMeta(ctx) s.input.ConsumerClosed() s.out.Close() } @@ -130,7 +134,7 @@ func (s *sampleAggregator) mainLoop(ctx context.Context) (earlyExit bool, _ erro for { row, meta := s.input.Next() if meta != nil { - if !emitHelper(ctx, &s.out, nil /* row */, meta, s.input) { + if !emitHelper(ctx, &s.out, nil /* row */, meta, s.pushTrailingMeta, s.input) { // No cleanup required; emitHelper() took care of it. return true, nil } diff --git a/pkg/sql/distsqlrun/sampler.go b/pkg/sql/distsqlrun/sampler.go index 413a789afdc5..2f6567faffff 100644 --- a/pkg/sql/distsqlrun/sampler.go +++ b/pkg/sql/distsqlrun/sampler.go @@ -125,6 +125,10 @@ func newSamplerProcessor( return s, nil } +func (s *samplerProcessor) pushTrailingMeta(ctx context.Context) { + sendTraceData(ctx, s.out.output) +} + // Run is part of the Processor interface. func (s *samplerProcessor) Run(wg *sync.WaitGroup) { if wg != nil { @@ -135,9 +139,9 @@ func (s *samplerProcessor) Run(wg *sync.WaitGroup) { earlyExit, err := s.mainLoop(ctx) if err != nil { - DrainAndClose(ctx, s.out.output, err, s.input) + DrainAndClose(ctx, s.out.output, err, s.pushTrailingMeta, s.input) } else if !earlyExit { - sendTraceData(ctx, s.out.output) + s.pushTrailingMeta(ctx) s.input.ConsumerClosed() s.out.Close() } @@ -150,7 +154,7 @@ func (s *samplerProcessor) mainLoop(ctx context.Context) (earlyExit bool, _ erro for { row, meta := s.input.Next() if meta != nil { - if !emitHelper(ctx, &s.out, nil /* row */, meta, s.input) { + if !emitHelper(ctx, &s.out, nil /* row */, meta, s.pushTrailingMeta, s.input) { // No cleanup required; emitHelper() took care of it. 
return true, nil } @@ -193,7 +197,7 @@ func (s *samplerProcessor) mainLoop(ctx context.Context) (earlyExit bool, _ erro for _, sample := range s.sr.Get() { copy(outRow, sample.Row) outRow[s.rankCol] = sqlbase.EncDatum{Datum: tree.NewDInt(tree.DInt(sample.Rank))} - if !emitHelper(ctx, &s.out, outRow, nil /* meta */, s.input) { + if !emitHelper(ctx, &s.out, outRow, nil /* meta */, s.pushTrailingMeta, s.input) { return true, nil } } @@ -214,7 +218,7 @@ func (s *samplerProcessor) mainLoop(ctx context.Context) (earlyExit bool, _ erro return false, err } outRow[s.sketchCol] = sqlbase.EncDatum{Datum: tree.NewDBytes(tree.DBytes(data))} - if !emitHelper(ctx, &s.out, outRow, nil /* meta */, s.input) { + if !emitHelper(ctx, &s.out, outRow, nil /* meta */, s.pushTrailingMeta, s.input) { return true, nil } } diff --git a/pkg/sql/distsqlrun/tablereader.go b/pkg/sql/distsqlrun/tablereader.go index ab9613d78c43..839dff94397e 100644 --- a/pkg/sql/distsqlrun/tablereader.go +++ b/pkg/sql/distsqlrun/tablereader.go @@ -19,6 +19,7 @@ import ( "github.com/pkg/errors" + "github.com/cockroachdb/cockroach/pkg/internal/client" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/scrub" "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" @@ -158,9 +159,11 @@ func (tr *tableReader) producerMeta(err error) *ProducerMetadata { if traceData != nil { tr.trailingMetadata = append(tr.trailingMetadata, ProducerMetadata{TraceData: traceData}) } - txnMeta := tr.flowCtx.txn.GetTxnCoordMeta() - if txnMeta.Txn.ID != (uuid.UUID{}) { - tr.trailingMetadata = append(tr.trailingMetadata, ProducerMetadata{TxnMeta: &txnMeta}) + if tr.flowCtx.txn.Type() == client.LeafTxn { + txnMeta := tr.flowCtx.txn.GetTxnCoordMeta() + if txnMeta.Txn.ID != (uuid.UUID{}) { + tr.trailingMetadata = append(tr.trailingMetadata, ProducerMetadata{TxnMeta: &txnMeta}) + } } tr.close() }
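
For reference, the trailing-metadata pattern this patch settles on is sketched
below. exampleProcessor, its input/out fields, and its mainLoop are hypothetical
stand-ins; sendTraceData, sendTxnCoordMetaMaybe, processorSpan, and the new
DrainAndClose signature are the ones from the hunks above. This is an
illustrative sketch, not compilable outside the distsqlrun package.

func (p *exampleProcessor) pushTrailingMeta(ctx context.Context) {
	// Forward the trace recording collected for this processor.
	sendTraceData(ctx, p.out.output)
	// For a LeafTxn, forward the TxnCoordMeta (most importantly the read
	// spans) so it reaches the root TxnCoordSender; for a RootTxn this is
	// a no-op.
	sendTxnCoordMetaMaybe(p.flowCtx.txn, p.out.output)
}

func (p *exampleProcessor) Run(wg *sync.WaitGroup) {
	if wg != nil {
		defer wg.Done()
	}
	ctx, span := processorSpan(p.flowCtx.Ctx, "example")
	defer tracing.FinishSpan(span)

	if err := p.mainLoop(ctx); err != nil {
		// The error path drains the input, forwards the error, and pushes
		// the trailing metadata before closing the output.
		DrainAndClose(ctx, p.out.output, err /* cause */, p.pushTrailingMeta, p.input)
		return
	}
	// The success path pushes the same trailing metadata before closing.
	p.pushTrailingMeta(ctx)
	p.input.ConsumerClosed()
	p.out.Close()
}

The design point is the one from the commit message: a processor reading via a
leaf txn accumulates read spans that exist only on its node, and unless they
are shipped back to the root client.Txn/TxnCoordSender as trailing metadata,
the root cannot account for those reads.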