Skip to content

Commit

Permalink
sql: unit test, benchmark and logictests for COPY
Browse files Browse the repository at this point in the history
As a new/half supported feature COPY FROM STDIN support is under tested.
Add tests and a simple benchmark to pave the way for enhancements and
optimizations.

Assists: #81731

Release note: None

Release justification: non-production code change
  • Loading branch information
cucaroach committed Aug 16, 2022
1 parent 33b343d commit b2156f8
Show file tree
Hide file tree
Showing 19 changed files with 1,037 additions and 129 deletions.
13 changes: 10 additions & 3 deletions pkg/cmd/roachtest/tests/copyfrom.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,12 +97,19 @@ func runTest(ctx context.Context, t test.Test, c cluster.Cluster, pg string) {
_, err = fmt.Sscan(det.Stdout, &copy, &rows)
require.NoError(t, err)
rate := int(float64(rows) / dur.Seconds())
t.L().Printf("results: %d rows/s", rate)

det, err = c.RunWithDetailsSingleNode(ctx, t.L(), c.Node(1), "wc -c /tmp/lineitem-table.csv")
require.NoError(t, err)
var bytes float64
_, err = fmt.Sscan(det.Stdout, &bytes)
require.NoError(t, err)
dataRate := bytes / 1024 / 1024 / dur.Seconds()
t.L().Printf("results: %d rows/s, %f mb/s", rate, dataRate)
// Write the copy rate into the stats.json file to be used by roachperf.
c.Run(ctx, c.Node(1), "mkdir", t.PerfArtifactsDir())
cmd := fmt.Sprintf(
`echo '{ "copy_rate": %d }' > %s/stats.json`,
rate, t.PerfArtifactsDir(),
`echo '{ "copy_row_rate": %d, "copy_data_rate": %f}' > %s/stats.json`,
rate, dataRate, t.PerfArtifactsDir(),
)
c.Run(ctx, c.Node(1), cmd)
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ go_library(
"control_schedules.go",
"copy.go",
"copy_file_upload.go",
"copyshim.go",
"crdb_internal.go",
"create_database.go",
"create_extension.go",
Expand Down Expand Up @@ -497,6 +498,7 @@ go_library(
"@com_github_cockroachdb_redact//:redact",
"@com_github_gogo_protobuf//proto",
"@com_github_gogo_protobuf//types",
"@com_github_jackc_pgproto3_v2//:pgproto3",
"@com_github_lib_pq//:pq",
"@com_github_lib_pq//oid",
"@com_github_prometheus_client_model//go",
Expand Down Expand Up @@ -532,6 +534,7 @@ go_test(
"conn_executor_test.go",
"conn_io_test.go",
"constraint_test.go",
"copy_from_test.go",
"copy_in_test.go",
"copy_test.go",
"crdb_internal_test.go",
Expand Down
5 changes: 3 additions & 2 deletions pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2335,7 +2335,7 @@ func isCopyToExternalStorage(cmd CopyIn) bool {
// We handle the CopyFrom statement by creating a copyMachine and handing it
// control over the connection until the copying is done. The contract is that,
// when this is called, the pgwire.conn is not reading from the network
// connection any more until this returns. The copyMachine will to the reading
// connection any more until this returns. The copyMachine will do the reading
// and writing up to the CommandComplete message.
func (ex *connExecutor) execCopyIn(
ctx context.Context, cmd CopyIn,
Expand Down Expand Up @@ -2415,8 +2415,9 @@ func (ex *connExecutor) execCopyIn(
if isCopyToExternalStorage(cmd) {
cm, err = newFileUploadMachine(ctx, cmd.Conn, cmd.Stmt, txnOpt, ex.server.cfg)
} else {
p := planner{execCfg: ex.server.cfg, alloc: &tree.DatumAlloc{}}
cm, err = newCopyMachine(
ctx, cmd.Conn, cmd.Stmt, txnOpt, ex.server.cfg,
ctx, cmd.Conn, cmd.Stmt, &p, txnOpt,
// execInsertPlan
func(ctx context.Context, p *planner, res RestrictedCommandResult) error {
_, err := ex.execWithDistSQLEngine(ctx, p, tree.RowsAffected, res, DistributionTypeNone, nil /* progressAtomic */)
Expand Down
40 changes: 4 additions & 36 deletions pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -1024,8 +1024,8 @@ func (ex *connExecutor) rollbackSQLTransaction(
// dispatchToExecutionEngine executes the statement, writes the result to res
// and returns an event for the connection's state machine.
//
// If an error is returned, the connection needs to stop processing queries.
// Query execution errors are written to res; they are not returned; it is
// If an error is returned, the connection needs to stop processing queries.`
// Query execution errors are written to res; they are not returned; it is`
// expected that the caller will inspect res and react to query errors by
// producing an appropriate state machine event.
func (ex *connExecutor) dispatchToExecutionEngine(
Expand Down Expand Up @@ -1525,40 +1525,8 @@ func (ex *connExecutor) execWithDistSQLEngine(
return &factoryEvalCtx
}
}

if len(planner.curPlan.subqueryPlans) != 0 {
// Create a separate memory account for the results of the subqueries.
// Note that we intentionally defer the closure of the account until we
// return from this method (after the main query is executed).
subqueryResultMemAcc := planner.EvalContext().Mon.MakeBoundAccount()
defer subqueryResultMemAcc.Close(ctx)
if !ex.server.cfg.DistSQLPlanner.PlanAndRunSubqueries(
ctx, planner, evalCtxFactory, planner.curPlan.subqueryPlans, recv, &subqueryResultMemAcc,
// Skip the diagram generation since on this "main" query path we
// can get it via the statement bundle.
true, /* skipDistSQLDiagramGeneration */
) {
return *recv.stats, recv.commErr
}
}
recv.discardRows = planner.instrumentation.ShouldDiscardRows()
// We pass in whether or not we wanted to distribute this plan, which tells
// the planner whether or not to plan remote table readers.
cleanup := ex.server.cfg.DistSQLPlanner.PlanAndRun(
ctx, evalCtx, planCtx, planner.txn, planner.curPlan.main, recv,
)
// Note that we're not cleaning up right away because postqueries might
// need to have access to the main query tree.
defer cleanup()
if recv.commErr != nil || res.Err() != nil {
return *recv.stats, recv.commErr
}

ex.server.cfg.DistSQLPlanner.PlanAndRunCascadesAndChecks(
ctx, planner, evalCtxFactory, &planner.curPlan.planComponents, recv,
)

return *recv.stats, recv.commErr
err := ex.server.cfg.DistSQLPlanner.PlanAndRunAll(ctx, evalCtx, planCtx, planner, recv, evalCtxFactory)
return *recv.stats, err
}

// beginTransactionTimestampsAndReadMode computes the timestamps and
Expand Down
10 changes: 5 additions & 5 deletions pkg/sql/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ type copyMachine struct {

// p is the planner used to plan inserts. preparePlanner() needs to be called
// before preparing each new statement.
p planner
p *planner

// parsingEvalCtx is an EvalContext used for the very limited needs to strings
// parsing. Is it not correctly initialized with timestamps, transactions and
Expand All @@ -115,8 +115,8 @@ func newCopyMachine(
ctx context.Context,
conn pgwirebase.Conn,
n *tree.CopyFrom,
p *planner,
txnOpt copyTxnOpt,
execCfg *ExecutorConfig,
execInsertPlan func(ctx context.Context, p *planner, res RestrictedCommandResult) error,
) (_ *copyMachine, retErr error) {
c := &copyMachine{
Expand All @@ -129,7 +129,7 @@ func newCopyMachine(
txnOpt: txnOpt,
csvExpectHeader: n.Options.Header,
// The planner will be prepared before use.
p: planner{execCfg: execCfg, alloc: &tree.DatumAlloc{}},
p: p,
execInsertPlan: execInsertPlan,
}

Expand Down Expand Up @@ -213,7 +213,7 @@ func newCopyMachine(
}

flags := tree.ObjectLookupFlagsWithRequiredTableKind(tree.ResolveRequireTableDesc)
_, tableDesc, err := resolver.ResolveExistingTableObject(ctx, &c.p, &n.Table, flags)
_, tableDesc, err := resolver.ResolveExistingTableObject(ctx, c.p, &n.Table, flags)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -755,7 +755,7 @@ func (c *copyMachine) insertRows(ctx context.Context) (retErr error) {
}

var res streamingCommandResult
err := c.execInsertPlan(ctx, &c.p, &res)
err := c.execInsertPlan(ctx, c.p, &res)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/copy_file_upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func newFileUploadMachine(
c := &copyMachine{
conn: conn,
// The planner will be prepared before use.
p: planner{execCfg: execCfg, alloc: &tree.DatumAlloc{}},
p: &planner{execCfg: execCfg, alloc: &tree.DatumAlloc{}},
}
f = &fileUploadMachine{
c: c,
Expand Down
164 changes: 164 additions & 0 deletions pkg/sql/copy_from_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package sql_test

import (
"context"
"fmt"
"runtime/pprof"
"sync"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/stretchr/testify/require"
)

var lineitemSchema string = `CREATE DATABASE c; CREATE TABLE c.lineitem (
l_orderkey INT8 NOT NULL,
l_partkey INT8 NOT NULL,
l_suppkey INT8 NOT NULL,
l_linenumber INT8 NOT NULL,
l_quantity DECIMAL(15,2) NOT NULL,
l_extendedprice DECIMAL(15,2) NOT NULL,
l_discount DECIMAL(15,2) NOT NULL,
l_tax DECIMAL(15,2) NOT NULL,
l_returnflag CHAR(1) NOT NULL,
l_linestatus CHAR(1) NOT NULL,
l_shipdate DATE NOT NULL,
l_commitdate DATE NOT NULL,
l_receiptdate DATE NOT NULL,
l_shipinstruct CHAR(25) NOT NULL,
l_shipmode CHAR(10) NOT NULL,
l_comment VARCHAR(44) NOT NULL,
PRIMARY KEY (l_orderkey, l_linenumber),
INDEX l_ok (l_orderkey ASC),
INDEX l_pk (l_partkey ASC),
INDEX l_sk (l_suppkey ASC),
INDEX l_sd (l_shipdate ASC),
INDEX l_cd (l_commitdate ASC),
INDEX l_rd (l_receiptdate ASC),
INDEX l_pk_sk (l_partkey ASC, l_suppkey ASC),
INDEX l_sk_pk (l_suppkey ASC, l_partkey ASC)
)`

const csvData = `%d|155190|7706|1|17|21168.23|0.04|0.02|N|O|1996-03-13|1996-02-12|1996-03-22|DELIVER IN PERSON|TRUCK|egular courts above the
`

// TestCopyFrom is a simple test to verify RunCopyFrom works for benchmarking
// purposes.
func TestCopyFrom(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()

s, conn, _ := serverutils.StartServer(t, base.TestServerArgs{
Settings: cluster.MakeTestingClusterSettings(),
})
defer s.Stopper().Stop(ctx)

r := sqlutils.MakeSQLRunner(conn)
r.Exec(t, lineitemSchema)
rows := []string{fmt.Sprintf(csvData, 1), fmt.Sprintf(csvData, 2)}
numrows, err := sql.RunCopyFrom(ctx, s, "c", nil, "COPY lineitem FROM STDIN WITH CSV DELIMITER '|';", rows)
require.Equal(t, 2, numrows)
require.NoError(t, err)

partKey := 0
r.QueryRow(t, "SELECT l_partkey FROM c.lineitem WHERE l_orderkey = 1").Scan(&partKey)
require.Equal(t, 155190, partKey)
}

// BenchmarkCopy measures copy performance against a TestServer.
func BenchmarkCopyFrom(b *testing.B) {
defer leaktest.AfterTest(b)()
defer log.Scope(b).Close(b)
ctx := context.Background()

s, conn, _ := serverutils.StartServer(b, base.TestServerArgs{
Settings: cluster.MakeTestingClusterSettings(),
})
defer s.Stopper().Stop(ctx)

r := sqlutils.MakeSQLRunner(conn)
r.Exec(b, lineitemSchema)

// send data in 5 batches of 10k rows
ROWS := 50000
datalen := 0
var rows []string
for i := 0; i < ROWS; i++ {
row := fmt.Sprintf(csvData, i)
rows = append(rows, row)
datalen += len(row)
}
start := timeutil.Now()
pprof.Do(ctx, pprof.Labels("run", "copy"), func(ctx context.Context) {
rows, err := sql.RunCopyFrom(ctx, s, "c", nil, "COPY lineitem FROM STDIN WITH CSV DELIMITER '|';", rows)
require.NoError(b, err)
require.Equal(b, ROWS, rows)
})
duration := timeutil.Since(start)
b.ReportMetric(float64(datalen)/(1024*1024)/duration.Seconds(), "mb/s")
b.ReportMetric(float64(ROWS)/duration.Seconds(), "rows/s")
}

func BenchmarkParallelCopyFrom(b *testing.B) {
defer leaktest.AfterTest(b)()
defer log.Scope(b).Close(b)
ctx := context.Background()

s, conn, _ := serverutils.StartServer(b, base.TestServerArgs{
Settings: cluster.MakeTestingClusterSettings(),
})
defer s.Stopper().Stop(ctx)

r := sqlutils.MakeSQLRunner(conn)
r.Exec(b, lineitemSchema)
ROWS := 50000
datalen := 0
THREADS := 10
var allrows [][]string

chunk := ROWS / THREADS
for j := 0; j < THREADS; j++ {
var rows []string
for i := 0; i < chunk; i++ {
row := fmt.Sprintf(csvData, i+j*chunk)
rows = append(rows, row)
datalen += len(row)
}
allrows = append(allrows, rows)
}

start := timeutil.Now()
var wg sync.WaitGroup

for j := 0; j < THREADS; j++ {
wg.Add(1)
go func(j int) {
defer wg.Done()
count, err := sql.RunCopyFrom(ctx, s, "c", nil, "COPY lineitem FROM STDIN WITH CSV DELIMITER '|';", allrows[j])
require.NoError(b, err)
require.Equal(b, chunk, count)
}(j)
}
wg.Wait()
duration := timeutil.Since(start)
b.ReportMetric(float64(datalen)/(1024*1024)/duration.Seconds(), "mb/s")
b.ReportMetric(float64(ROWS)/duration.Seconds(), "rows/s")
}
Loading

0 comments on commit b2156f8

Please sign in to comment.