From 5fea281c82d40e9949386fafba93abd37ffb88c4 Mon Sep 17 00:00:00 2001
From: Celia La
Date: Fri, 23 Jul 2021 12:10:17 -0400
Subject: [PATCH 1/5] cluster-ui: bump package.json version

In #67722, we updated peerDependencies, but forgot to bump the version
in package.json. This fast-follow PR bumps the version for #67722.

Release note: None

Co-authored-by: Lauren Barker
---
 pkg/ui/cluster-ui/package.json | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pkg/ui/cluster-ui/package.json b/pkg/ui/cluster-ui/package.json
index cb66a98c034a..1cd02a58e02c 100644
--- a/pkg/ui/cluster-ui/package.json
+++ b/pkg/ui/cluster-ui/package.json
@@ -1,6 +1,6 @@
 {
   "name": "@cockroachlabs/cluster-ui",
-  "version": "21.2.0-prerelease-2",
+  "version": "21.2.0-prerelease-3",
   "description": "Cluster UI is a library of large features shared between CockroachDB and CockroachCloud",
   "repository": {
     "type": "git",

From f8ead3c43e97798ab4f8e7b1b1243080116f84c4 Mon Sep 17 00:00:00 2001
From: Paul Bardea
Date: Mon, 26 Jul 2021 22:19:06 -0400
Subject: [PATCH 2/5] roachtest: create perf dir for bulk roachtests

When the bulk op roachtests were updated to avoid racing when writing
their stats files, the creation of the perf directory itself was
removed. This adds it back.

There was some consideration to update PutString to create the
filepath.Dir of its destination but that refactor was left for a
potential follow up since it applies to other tests as well.

Release note: None
---
 pkg/cmd/roachtest/tests/backup.go  | 3 +++
 pkg/cmd/roachtest/tests/import.go  | 6 ++++++
 pkg/cmd/roachtest/tests/restore.go | 3 +++
 3 files changed, 12 insertions(+)

diff --git a/pkg/cmd/roachtest/tests/backup.go b/pkg/cmd/roachtest/tests/backup.go
index 285492e77b85..7b3ae9eac156 100644
--- a/pkg/cmd/roachtest/tests/backup.go
+++ b/pkg/cmd/roachtest/tests/backup.go
@@ -197,6 +197,9 @@ func registerBackup(r registry.Registry) {
 			// Upload the perf artifacts to any one of the nodes so that the test
 			// runner copies it into an appropriate directory path.
 			dest := filepath.Join(t.PerfArtifactsDir(), "stats.json")
+			if err := c.RunE(ctx, c.Node(1), "mkdir -p "+filepath.Dir(dest)); err != nil {
+				log.Errorf(ctx, "failed to create perf dir: %+v", err)
+			}
 			if err := c.PutString(ctx, perfBuf.String(), dest, 755, c.Node(1)); err != nil {
 				log.Errorf(ctx, "failed to upload perf artifacts to node: %s", err.Error())
 			}
diff --git a/pkg/cmd/roachtest/tests/import.go b/pkg/cmd/roachtest/tests/import.go
index 03733b284351..520e2d17caac 100644
--- a/pkg/cmd/roachtest/tests/import.go
+++ b/pkg/cmd/roachtest/tests/import.go
@@ -123,6 +123,9 @@ func registerImportTPCC(r registry.Registry) {
 			// Upload the perf artifacts to any one of the nodes so that the test
 			// runner copies it into an appropriate directory path.
 			dest := filepath.Join(t.PerfArtifactsDir(), "stats.json")
+			if err := c.RunE(ctx, c.Node(1), "mkdir -p "+filepath.Dir(dest)); err != nil {
+				log.Errorf(ctx, "failed to create perf dir: %+v", err)
+			}
 			if err := c.PutString(ctx, perfBuf.String(), dest, 755, c.Node(1)); err != nil {
 				log.Errorf(ctx, "failed to upload perf artifacts to node: %s", err.Error())
 			}
@@ -260,6 +263,9 @@ func registerImportTPCH(r registry.Registry) {
 			// Upload the perf artifacts to any one of the nodes so that the test
 			// runner copies it into an appropriate directory path.
 			dest := filepath.Join(t.PerfArtifactsDir(), "stats.json")
+			if err := c.RunE(ctx, c.Node(1), "mkdir -p "+filepath.Dir(dest)); err != nil {
+				log.Errorf(ctx, "failed to create perf dir: %+v", err)
+			}
 			if err := c.PutString(ctx, perfBuf.String(), dest, 755, c.Node(1)); err != nil {
 				log.Errorf(ctx, "failed to upload perf artifacts to node: %s", err.Error())
 			}
diff --git a/pkg/cmd/roachtest/tests/restore.go b/pkg/cmd/roachtest/tests/restore.go
index c6e791be0254..a507bfca7de2 100644
--- a/pkg/cmd/roachtest/tests/restore.go
+++ b/pkg/cmd/roachtest/tests/restore.go
@@ -438,6 +438,9 @@ func registerRestore(r registry.Registry) {
 			// Upload the perf artifacts to any one of the nodes so that the test
 			// runner copies it into an appropriate directory path.
 			dest := filepath.Join(t.PerfArtifactsDir(), "stats.json")
+			if err := c.RunE(ctx, c.Node(1), "mkdir -p "+filepath.Dir(dest)); err != nil {
+				log.Errorf(ctx, "failed to create perf dir: %+v", err)
+			}
 			if err := c.PutString(ctx, perfBuf.String(), dest, 755, c.Node(1)); err != nil {
 				log.Errorf(ctx, "failed to upload perf artifacts to node: %s", err.Error())
 			}

From 09de874b54f43c0d83675785cb16880e1d862f8e Mon Sep 17 00:00:00 2001
From: Paul Bardea
Date: Tue, 27 Jul 2021 11:50:57 -0400
Subject: [PATCH 3/5] roachtest: fix perf stats file mode

Release note: None
---
 pkg/cmd/roachtest/tests/backup.go  | 2 +-
 pkg/cmd/roachtest/tests/import.go  | 4 ++--
 pkg/cmd/roachtest/tests/restore.go | 2 +-
 3 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/pkg/cmd/roachtest/tests/backup.go b/pkg/cmd/roachtest/tests/backup.go
index 7b3ae9eac156..96a97a47b360 100644
--- a/pkg/cmd/roachtest/tests/backup.go
+++ b/pkg/cmd/roachtest/tests/backup.go
@@ -200,7 +200,7 @@ func registerBackup(r registry.Registry) {
 			if err := c.RunE(ctx, c.Node(1), "mkdir -p "+filepath.Dir(dest)); err != nil {
 				log.Errorf(ctx, "failed to create perf dir: %+v", err)
 			}
-			if err := c.PutString(ctx, perfBuf.String(), dest, 755, c.Node(1)); err != nil {
+			if err := c.PutString(ctx, perfBuf.String(), dest, 0755, c.Node(1)); err != nil {
 				log.Errorf(ctx, "failed to upload perf artifacts to node: %s", err.Error())
 			}
 			return nil
diff --git a/pkg/cmd/roachtest/tests/import.go b/pkg/cmd/roachtest/tests/import.go
index 520e2d17caac..9899236e9949 100644
--- a/pkg/cmd/roachtest/tests/import.go
+++ b/pkg/cmd/roachtest/tests/import.go
@@ -126,7 +126,7 @@ func registerImportTPCC(r registry.Registry) {
 			if err := c.RunE(ctx, c.Node(1), "mkdir -p "+filepath.Dir(dest)); err != nil {
 				log.Errorf(ctx, "failed to create perf dir: %+v", err)
 			}
-			if err := c.PutString(ctx, perfBuf.String(), dest, 755, c.Node(1)); err != nil {
+			if err := c.PutString(ctx, perfBuf.String(), dest, 0755, c.Node(1)); err != nil {
 				log.Errorf(ctx, "failed to upload perf artifacts to node: %s", err.Error())
 			}
 			return nil
@@ -266,7 +266,7 @@ func registerImportTPCH(r registry.Registry) {
 			if err := c.RunE(ctx, c.Node(1), "mkdir -p "+filepath.Dir(dest)); err != nil {
 				log.Errorf(ctx, "failed to create perf dir: %+v", err)
 			}
-			if err := c.PutString(ctx, perfBuf.String(), dest, 755, c.Node(1)); err != nil {
+			if err := c.PutString(ctx, perfBuf.String(), dest, 0755, c.Node(1)); err != nil {
 				log.Errorf(ctx, "failed to upload perf artifacts to node: %s", err.Error())
 			}
 			return nil
diff --git a/pkg/cmd/roachtest/tests/restore.go b/pkg/cmd/roachtest/tests/restore.go
index a507bfca7de2..9bc47e8c1e4b 100644
--- a/pkg/cmd/roachtest/tests/restore.go
+++ b/pkg/cmd/roachtest/tests/restore.go
@@ -441,7 +441,7 @@ func registerRestore(r registry.Registry) {
 			if err := c.RunE(ctx, c.Node(1), "mkdir -p "+filepath.Dir(dest)); err != nil {
 				log.Errorf(ctx, "failed to create perf dir: %+v", err)
 			}
-			if err := c.PutString(ctx, perfBuf.String(), dest, 755, c.Node(1)); err != nil {
+			if err := c.PutString(ctx, perfBuf.String(), dest, 0755, c.Node(1)); err != nil {
 				log.Errorf(ctx, "failed to upload perf artifacts to node: %s", err.Error())
 			}
 			return nil

From cf62288996fbc7b9d8af5bb39e94bc0ab20d5afa Mon Sep 17 00:00:00 2001
From: Aditya Maru
Date: Tue, 27 Jul 2021 13:17:43 -0400
Subject: [PATCH 4/5] roachtest: bump import/tpch/nodes=8 timeout to 10h

Previously, the roachtest had a timeout of 8h. The test usually runs in
~7 hours but occasionally tips over the configured timeout. While we
investigate the slowness of this import as tracked in
https://github.com/cockroachdb/cockroach/issues/68117, we are bumping
the timeout to 10h.

Release note: None
---
 pkg/cmd/roachtest/tests/import.go | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/pkg/cmd/roachtest/tests/import.go b/pkg/cmd/roachtest/tests/import.go
index 03733b284351..af213f9f63c9 100644
--- a/pkg/cmd/roachtest/tests/import.go
+++ b/pkg/cmd/roachtest/tests/import.go
@@ -172,8 +172,10 @@ func registerImportTPCH(r registry.Registry) {
 		// is required to confirm this. Until then, the 4 and 32 node configurations
 		// are removed (4 is too slow and 32 is pretty expensive) while 8-node is
 		// given a 50% longer timeout (which running by hand suggests should be OK).
-		// (10/30/19) The timeout was increased again to 8 hours.
-		{8, 8 * time.Hour},
+		// (07/27/21) The timeout was increased again to 10 hours. The test runs in
+		// ~7 hours which causes it to occasionally exceed the previous timeout of 8
+		// hours.
+		{8, 10 * time.Hour},
 	} {
 		item := item
 		r.Add(registry.TestSpec{

From 306f8edada31e3386e0414eb8a69436b67fd11d8 Mon Sep 17 00:00:00 2001
From: Yahor Yuzefovich
Date: Tue, 27 Jul 2021 10:21:43 -0700
Subject: [PATCH 5/5] rowexec: ask for at least 8MiB in the join reader memory limit

The join reader doesn't know how to spill to disk, so previously in
some cases (namely, when the `distsql_workmem` session variable is low)
the queries would error out. Now this is temporarily fixed by requiring
the memory limit to be at least 8MiB (to accommodate 4MiB scratch input
rows). This shouldn't really matter in the production setting but makes
the `tpchvec/disk` roachtest happy.

Release note: None
---
 pkg/sql/execinfra/processorsbase.go |  14 --
 pkg/sql/execinfra/server_config.go  |   8 -
 pkg/sql/rowexec/BUILD.bazel         |   1 -
 pkg/sql/rowexec/joinreader.go       |  15 +-
 pkg/sql/rowexec/joinreader_test.go  | 247 +++++++++++++---------------
 5 files changed, 124 insertions(+), 161 deletions(-)

diff --git a/pkg/sql/execinfra/processorsbase.go b/pkg/sql/execinfra/processorsbase.go
index 65a7a4383462..0466cf3228f1 100644
--- a/pkg/sql/execinfra/processorsbase.go
+++ b/pkg/sql/execinfra/processorsbase.go
@@ -949,20 +949,6 @@ func NewLimitedMonitor(
 	return limitedMon
 }
 
-// NewLimitedMonitorNoDiskSpill is a utility function used by processors to
-// create a new limited memory monitor with the given name and start it. The
-// returned monitor must be closed. The limit is determined by
-// SessionData.WorkMemLimit (stored inside of the flowCtx) but overridden to
-// ServerConfig.TestingKnobs.MemoryLimitBytes if that knob is set.
-// ServerConfig.TestingKnobs.ForceDiskSpill is ignored by this function.
-func NewLimitedMonitorNoDiskSpill(
-	ctx context.Context, parent *mon.BytesMonitor, flowCtx *FlowCtx, name string,
-) *mon.BytesMonitor {
-	limitedMon := mon.NewMonitorInheritWithLimit(name, GetWorkMemLimitNoDiskSpill(flowCtx), parent)
-	limitedMon.Start(ctx, parent, mon.BoundAccount{})
-	return limitedMon
-}
-
 // NewLimitedMonitorNoFlowCtx is the same as NewLimitedMonitor and should be
 // used when the caller doesn't have an access to *FlowCtx.
 func NewLimitedMonitorNoFlowCtx(
diff --git a/pkg/sql/execinfra/server_config.go b/pkg/sql/execinfra/server_config.go
index d9390b53bf5d..381556bcabed 100644
--- a/pkg/sql/execinfra/server_config.go
+++ b/pkg/sql/execinfra/server_config.go
@@ -268,14 +268,6 @@ func GetWorkMemLimit(flowCtx *FlowCtx) int64 {
 	if flowCtx.Cfg.TestingKnobs.ForceDiskSpill {
 		return 1
 	}
-	return GetWorkMemLimitNoDiskSpill(flowCtx)
-}
-
-// GetWorkMemLimitNoDiskSpill returns the number of bytes determining the amount
-// of RAM available to a single processor or operator. This function should be
-// used instead of GetWorkMemLimit if the processor cannot spill to disk,
-// since ServerConfig.TestingKnobs.ForceDiskSpill is ignored by this function.
-func GetWorkMemLimitNoDiskSpill(flowCtx *FlowCtx) int64 {
 	if flowCtx.Cfg.TestingKnobs.MemoryLimitBytes != 0 {
 		return flowCtx.Cfg.TestingKnobs.MemoryLimitBytes
 	}
diff --git a/pkg/sql/rowexec/BUILD.bazel b/pkg/sql/rowexec/BUILD.bazel
index b64b122c916b..cfdd5927f004 100644
--- a/pkg/sql/rowexec/BUILD.bazel
+++ b/pkg/sql/rowexec/BUILD.bazel
@@ -158,7 +158,6 @@ go_test(
         "//pkg/sql/rowcontainer",
         "//pkg/sql/rowenc",
         "//pkg/sql/sem/tree",
-        "//pkg/sql/sqlerrors",
         "//pkg/sql/sqlutil",
         "//pkg/sql/stats",
         "//pkg/sql/types",
diff --git a/pkg/sql/rowexec/joinreader.go b/pkg/sql/rowexec/joinreader.go
index 85b1749587c9..e7265db5b8ff 100644
--- a/pkg/sql/rowexec/joinreader.go
+++ b/pkg/sql/rowexec/joinreader.go
@@ -373,10 +373,21 @@ func newJoinReader(
 		}
 	}
 
+	// We will create a memory monitor with at least 8MiB of memory limit since
+	// the join reader doesn't know how to spill to disk. It is most likely that
+	// if the target limit is below 8MiB, then we're in a test scenario and we
+	// don't want to error out.
+	const minMemoryLimit = 8 << 20
+	memoryLimit := execinfra.GetWorkMemLimit(flowCtx)
+	if memoryLimit < minMemoryLimit {
+		memoryLimit = minMemoryLimit
+	}
+
 	// Initialize memory monitors and bound account for data structures in the joinReader.
-	jr.MemMonitor = execinfra.NewLimitedMonitorNoDiskSpill(
-		flowCtx.EvalCtx.Ctx(), flowCtx.EvalCtx.Mon, flowCtx, "joinreader-mem",
+	jr.MemMonitor = mon.NewMonitorInheritWithLimit(
+		"joinreader-mem" /* name */, memoryLimit, flowCtx.EvalCtx.Mon,
 	)
+	jr.MemMonitor.Start(flowCtx.EvalCtx.Ctx(), flowCtx.EvalCtx.Mon, mon.BoundAccount{})
 	jr.memAcc = jr.MemMonitor.MakeBoundAccount()
 
 	if err := jr.initJoinReaderStrategy(flowCtx, columnTypes, len(columnIDs), rightCols, readerType); err != nil {
diff --git a/pkg/sql/rowexec/joinreader_test.go b/pkg/sql/rowexec/joinreader_test.go
index 6183250de3bd..e30d8274be75 100644
--- a/pkg/sql/rowexec/joinreader_test.go
+++ b/pkg/sql/rowexec/joinreader_test.go
@@ -33,7 +33,6 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/sql/randgen"
 	"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
 	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
-	"github.com/cockroachdb/cockroach/pkg/sql/sqlerrors"
 	"github.com/cockroachdb/cockroach/pkg/sql/types"
 	"github.com/cockroachdb/cockroach/pkg/storage"
 	"github.com/cockroachdb/cockroach/pkg/testutils"
@@ -851,155 +850,131 @@ func TestJoinReader(t *testing.T) {
 		// paired joins, so do both.
 		for _, smallBatch := range []bool{true, false} {
 			for _, outputContinuation := range []bool{false, true} {
-				for _, lowMemory := range []bool{false, true} {
-					if outputContinuation && c.secondJoinInPairedJoin {
-						// outputContinuation is for the first join in paired-joins, so
-						// can't do that when this test case is for the second join in
-						// paired-joins.
-						continue
+				if outputContinuation && c.secondJoinInPairedJoin {
+					// outputContinuation is for the first join in paired-joins, so
+					// can't do that when this test case is for the second join in
+					// paired-joins.
+					continue
+				}
+				if outputContinuation && !reqOrdering {
+					// The first join in paired-joins must preserve ordering.
+					continue
+				}
+				if outputContinuation && len(c.expectedWithContinuation) == 0 {
+					continue
+				}
+				t.Run(fmt.Sprintf("%d/reqOrdering=%t/%s/smallBatch=%t/cont=%t",
+					i, reqOrdering, c.description, smallBatch, outputContinuation), func(t *testing.T) {
+					evalCtx := tree.MakeTestingEvalContext(st)
+					defer evalCtx.Stop(ctx)
+					flowCtx := execinfra.FlowCtx{
+						EvalCtx: &evalCtx,
+						Cfg: &execinfra.ServerConfig{
+							Settings: st,
+							TempStorage: tempEngine,
+						},
+						Txn: kv.NewTxn(ctx, s.DB(), s.NodeID()),
+						DiskMonitor: diskMonitor,
 					}
-					if outputContinuation && !reqOrdering {
-						// The first join in paired-joins must preserve ordering.
-						continue
+					encRows := make(rowenc.EncDatumRows, len(c.input))
+					for rowIdx, row := range c.input {
+						encRow := make(rowenc.EncDatumRow, len(row))
+						for i, d := range row {
+							encRow[i] = rowenc.DatumToEncDatum(c.inputTypes[i], d)
+						}
+						encRows[rowIdx] = encRow
 					}
-					if outputContinuation && len(c.expectedWithContinuation) == 0 {
-						continue
+					in := distsqlutils.NewRowBuffer(c.inputTypes, encRows, distsqlutils.RowBufferArgs{})
+
+					out := &distsqlutils.RowBuffer{}
+					post := c.post
+					if outputContinuation {
+						post.OutputColumns = append(post.OutputColumns, c.outputColumnForContinuation)
 					}
-					if smallBatch && lowMemory {
-						continue
+					jr, err := newJoinReader(
+						&flowCtx,
+						0, /* processorID */
+						&execinfrapb.JoinReaderSpec{
+							Table: *td.TableDesc(),
+							IndexIdx: c.indexIdx,
+							LookupColumns: c.lookupCols,
+							LookupExpr: execinfrapb.Expression{Expr: c.lookupExpr},
+							RemoteLookupExpr: execinfrapb.Expression{Expr: c.remoteLookupExpr},
+							OnExpr: execinfrapb.Expression{Expr: c.onExpr},
+							Type: c.joinType,
+							MaintainOrdering: reqOrdering,
+							LeftJoinWithPairedJoiner: c.secondJoinInPairedJoin,
+							OutputGroupContinuationForLeftRow: outputContinuation,
+						},
+						in,
+						&post,
+						out,
+						lookupJoinReaderType,
+					)
+					if err != nil {
+						t.Fatal(err)
 					}
-					t.Run(fmt.Sprintf("%d/reqOrdering=%t/%s/smallBatch=%t/cont=%t/lowMem=%t",
-						i, reqOrdering, c.description, smallBatch, outputContinuation, lowMemory), func(t *testing.T) {
-						evalCtx := tree.MakeTestingEvalContext(st)
-						defer evalCtx.Stop(ctx)
-						flowCtx := execinfra.FlowCtx{
-							EvalCtx: &evalCtx,
-							Cfg: &execinfra.ServerConfig{
-								Settings: st,
-								TempStorage: tempEngine,
-							},
-							Txn: kv.NewTxn(ctx, s.DB(), s.NodeID()),
-							DiskMonitor: diskMonitor,
-						}
-						encRows := make(rowenc.EncDatumRows, len(c.input))
-						for rowIdx, row := range c.input {
-							encRow := make(rowenc.EncDatumRow, len(row))
-							for i, d := range row {
-								encRow[i] = rowenc.DatumToEncDatum(c.inputTypes[i], d)
-							}
-							encRows[rowIdx] = encRow
-						}
-						in := distsqlutils.NewRowBuffer(c.inputTypes, encRows, distsqlutils.RowBufferArgs{})
-
-						if lowMemory {
-							flowCtx.Cfg.TestingKnobs.MemoryLimitBytes = int64(encRows[0].Size() * 2)
-						}
-						out := &distsqlutils.RowBuffer{}
-						post := c.post
-						if outputContinuation {
-							post.OutputColumns = append(post.OutputColumns, c.outputColumnForContinuation)
-						}
-						jr, err := newJoinReader(
-							&flowCtx,
-							0, /* processorID */
-							&execinfrapb.JoinReaderSpec{
-								Table: *td.TableDesc(),
-								IndexIdx: c.indexIdx,
-								LookupColumns: c.lookupCols,
-								LookupExpr: execinfrapb.Expression{Expr: c.lookupExpr},
-								RemoteLookupExpr: execinfrapb.Expression{Expr: c.remoteLookupExpr},
-								OnExpr: execinfrapb.Expression{Expr: c.onExpr},
-								Type: c.joinType,
-								MaintainOrdering: reqOrdering,
-								LeftJoinWithPairedJoiner: c.secondJoinInPairedJoin,
-								OutputGroupContinuationForLeftRow: outputContinuation,
-							},
-							in,
-							&post,
-							out,
-							lookupJoinReaderType,
-						)
-						if err != nil {
-							t.Fatal(err)
-						}
-
-						if smallBatch {
-							// Set a lower batch size to force multiple batches.
-							jr.(*joinReader).SetBatchSizeBytes(int64(encRows[0].Size() * 2))
-						}
-						// Else, use the default.
+
+					if smallBatch {
+						// Set a lower batch size to force multiple batches.
+						jr.(*joinReader).SetBatchSizeBytes(int64(encRows[0].Size() * 2))
+					}
+					// Else, use the default.
 
-						jr.Run(ctx)
+					jr.Run(ctx)
 
-						if !in.Done {
-							t.Fatal("joinReader didn't consume all the rows")
-						}
-						if !out.ProducerClosed() {
-							t.Fatalf("output RowReceiver not closed")
-						}
+					if !in.Done {
+						t.Fatal("joinReader didn't consume all the rows")
+					}
+					if !out.ProducerClosed() {
+						t.Fatalf("output RowReceiver not closed")
+					}
 
-						var res rowenc.EncDatumRows
-						var gotOutOfMemoryError bool
-						for {
-							row, meta := out.Next()
-							if meta != nil {
-								if lowMemory && meta.Err != nil {
-									if !sqlerrors.IsOutOfMemoryError(meta.Err) {
-										t.Fatalf("unexpected metadata %+v", meta)
-									}
-									gotOutOfMemoryError = true
-								} else if meta.Metrics == nil {
-									t.Fatalf("unexpected metadata %+v", meta)
-								}
-							}
-							if row == nil {
-								break
-							}
-							res = append(res, row)
-						}
-
-						if lowMemory {
-							if gotOutOfMemoryError {
-								return
-							}
-							t.Fatal("expected out of memory error but it did not occur")
-						}
+					var res rowenc.EncDatumRows
+					for {
+						row, meta := out.Next()
+						if meta != nil && meta.Metrics == nil {
+							t.Fatalf("unexpected metadata %+v", meta)
+						}
+						if row == nil {
+							break
+						}
+						res = append(res, row)
+					}
 
-						// processOutputRows is a helper function that takes a stringified
-						// EncDatumRows output (e.g. [[1 2] [3 1]]) and returns a slice of
-						// stringified rows without brackets (e.g. []string{"1 2", "3 1"}).
-						processOutputRows := func(output string) []string {
-							// Comma-separate the rows.
-							output = strings.ReplaceAll(output, "] [", ",")
-							// Remove leading and trailing bracket.
-							output = strings.Trim(output, "[]")
-							// Split on the commas that were introduced and return that.
-							return strings.Split(output, ",")
-						}
+					// processOutputRows is a helper function that takes a stringified
+					// EncDatumRows output (e.g. [[1 2] [3 1]]) and returns a slice of
+					// stringified rows without brackets (e.g. []string{"1 2", "3 1"}).
+					processOutputRows := func(output string) []string {
+						// Comma-separate the rows.
+						output = strings.ReplaceAll(output, "] [", ",")
+						// Remove leading and trailing bracket.
+						output = strings.Trim(output, "[]")
+						// Split on the commas that were introduced and return that.
+						return strings.Split(output, ",")
+					}
 
-						outputTypes := c.outputTypes
-						if outputContinuation {
-							outputTypes = append(outputTypes, types.Bool)
-						}
-						result := processOutputRows(res.String(outputTypes))
-						var expected []string
-						if outputContinuation {
-							expected = processOutputRows(c.expectedWithContinuation)
-						} else {
-							expected = processOutputRows(c.expected)
-						}
+					outputTypes := c.outputTypes
+					if outputContinuation {
+						outputTypes = append(outputTypes, types.Bool)
+					}
+					result := processOutputRows(res.String(outputTypes))
+					var expected []string
+					if outputContinuation {
+						expected = processOutputRows(c.expectedWithContinuation)
+					} else {
+						expected = processOutputRows(c.expected)
+					}
 
-						if !reqOrdering {
-							// An ordering was not required, so sort both the result and
-							// expected slice to reuse equality comparison.
-							sort.Strings(result)
-							sort.Strings(expected)
-						}
+					if !reqOrdering {
+						// An ordering was not required, so sort both the result and
+						// expected slice to reuse equality comparison.
+						sort.Strings(result)
+						sort.Strings(expected)
+					}
 
-						require.Equal(t, expected, result)
-					})
-				}
+					require.Equal(t, expected, result)
+				})
 			}
 		}
 	}