diff --git a/pkg/ccl/importccl/exportcsv_test.go b/pkg/ccl/importccl/exportcsv_test.go
index 7d07ef9d4520..c3aff90159d0 100644
--- a/pkg/ccl/importccl/exportcsv_test.go
+++ b/pkg/ccl/importccl/exportcsv_test.go
@@ -9,12 +9,12 @@ package importccl_test

 import (
+	"bytes"
 	"compress/gzip"
 	"context"
 	"fmt"
 	"io"
 	"io/ioutil"
-	"os"
 	"path/filepath"
 	"strings"
 	"testing"
@@ -186,6 +186,18 @@ func TestExportJoin(t *testing.T) {
 	sqlDB.Exec(t, `EXPORT INTO CSV 'nodelocal://0/join' FROM SELECT * FROM t, t as u`)
 }

+func readFileByGlob(t *testing.T, pattern string) []byte {
+	paths, err := filepath.Glob(pattern)
+	require.NoError(t, err)
+
+	require.Equal(t, 1, len(paths))
+
+	result, err := ioutil.ReadFile(paths[0])
+	require.NoError(t, err)
+
+	return result
+}
+
 func TestExportOrder(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
@@ -196,30 +208,58 @@ func TestExportOrder(t *testing.T) {
 	defer srv.Stopper().Stop(context.Background())
 	sqlDB := sqlutils.MakeSQLRunner(db)

-	sqlDB.Exec(t, `create table foo (i int primary key, x int, y int, z int, index (y))`)
-	sqlDB.Exec(t, `insert into foo values (1, 12, 3, 14), (2, 22, 2, 24), (3, 32, 1, 34)`)
+	sqlDB.Exec(t, `CREATE TABLE foo (i INT PRIMARY KEY, x INT, y INT, z INT, INDEX (y))`)
+	sqlDB.Exec(t, `INSERT INTO foo VALUES (1, 12, 3, 14), (2, 22, 2, 24), (3, 32, 1, 34)`)
+
+	sqlDB.Exec(t, `EXPORT INTO CSV 'nodelocal://0/order' FROM SELECT * FROM foo ORDER BY y ASC LIMIT 2`)
+	content := readFileByGlob(t, filepath.Join(dir, "order", "export*-n1.0.csv"))

-	sqlDB.Exec(t, `EXPORT INTO CSV 'nodelocal://0/order' from select * from foo order by y asc limit 2`)
-	content, err := ioutil.ReadFile(filepath.Join(dir, "order", "n1.0.csv"))
-	if err != nil {
-		t.Fatal(err)
-	}
 	if expected, got := "3,32,1,34\n2,22,2,24\n", string(content); expected != got {
 		t.Fatalf("expected %q, got %q", expected, got)
 	}
 }

+func TestExportUniqueness(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+	dir, cleanupDir := testutils.TempDir(t)
+	defer cleanupDir()
+
+	srv, db, _ := serverutils.StartServer(t, base.TestServerArgs{ExternalIODir: dir})
+	defer srv.Stopper().Stop(context.Background())
+	sqlDB := sqlutils.MakeSQLRunner(db)
+
+	sqlDB.Exec(t, `CREATE TABLE foo (i INT PRIMARY KEY, x INT, y INT, z INT, INDEX (y))`)
+	sqlDB.Exec(t, `INSERT INTO foo VALUES (1, 12, 3, 14), (2, 22, 2, 24), (3, 32, 1, 34)`)
+
+	const stmt = `EXPORT INTO CSV 'nodelocal://0/' WITH chunk_rows=$1 FROM SELECT * FROM foo`
+
+	sqlDB.Exec(t, stmt, 2)
+	dir1, err := ioutil.ReadDir(dir)
+	require.NoError(t, err)
+
+	sqlDB.Exec(t, stmt, 2)
+	dir2, err := ioutil.ReadDir(dir)
+	require.NoError(t, err)
+
+	require.Equal(t, 2*len(dir1), len(dir2), "second export did not double the number of files")
+}
+
 func TestExportUserDefinedTypes(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
+
 	ctx := context.Background()
-	baseDir, cleanup := testutils.TempDir(t)
+	dir, cleanup := testutils.TempDir(t)
 	defer cleanup()
+
 	tc := testcluster.StartTestCluster(
-		t, 1, base.TestClusterArgs{ServerArgs: base.TestServerArgs{ExternalIODir: baseDir}})
+		t, 1, base.TestClusterArgs{ServerArgs: base.TestServerArgs{ExternalIODir: dir}})
 	defer tc.Stopper().Stop(ctx)
+
 	conn := tc.Conns[0]
 	sqlDB := sqlutils.MakeSQLRunner(conn)
+	// Set up some initial state for the tests.
 	sqlDB.Exec(t, `
SET experimental_enable_enums = true;
@@ -232,23 +272,27 @@ INSERT INTO greeting_table VALUES ('hello', 'hello'), ('hi', 'hi');
 		expected string
 	}{
 		{
-			stmt:     "EXPORT INTO CSV 'nodelocal://0/test/' FROM (SELECT 'hello':::greeting, 'hi':::greeting)",
+			stmt:     "EXPORT INTO CSV 'nodelocal://0/%s/' FROM (SELECT 'hello':::greeting, 'hi':::greeting)",
 			expected: "hello,hi\n",
 		},
 		{
-			stmt:     "EXPORT INTO CSV 'nodelocal://0/test/' FROM TABLE greeting_table",
+			stmt:     "EXPORT INTO CSV 'nodelocal://0/%s/' FROM TABLE greeting_table",
 			expected: "hello,hello\nhi,hi\n",
 		},
 		{
-			stmt:     "EXPORT INTO CSV 'nodelocal://0/test/' FROM (SELECT x, y, enum_first(x) FROM greeting_table)",
+			stmt:     "EXPORT INTO CSV 'nodelocal://0/%s/' FROM (SELECT x, y, enum_first(x) FROM greeting_table)",
 			expected: "hello,hello,hello\nhi,hi,hello\n",
 		},
 	}
-	for _, test := range tests {
-		sqlDB.Exec(t, test.stmt)
+	for i, test := range tests {
+		path := fmt.Sprintf("test%d", i)
+		stmt := fmt.Sprintf(test.stmt, path)
+
+		sqlDB.Exec(t, stmt)
+
 		// Read the dumped file.
-		contents, err := ioutil.ReadFile(filepath.Join(baseDir, "test", "n1.0.csv"))
-		require.NoError(t, err)
+		contents := readFileByGlob(t, filepath.Join(dir, path, "export*-n1.0.csv"))
+
 		require.Equal(t, test.expected, string(contents))
 	}
 }
@@ -269,28 +313,19 @@ func TestExportOrderCompressed(t *testing.T) {
 	defer srv.Stopper().Stop(context.Background())
 	sqlDB := sqlutils.MakeSQLRunner(db)

-	sqlDB.Exec(t, `create table foo (i int primary key, x int, y int, z int, index (y))`)
-	sqlDB.Exec(t, `insert into foo values (1, 12, 3, 14), (2, 22, 2, 24), (3, 32, 1, 34)`)
+	sqlDB.Exec(t, `CREATE TABLE foo (i INT PRIMARY KEY, x INT, y INT, z INT, INDEX (y))`)
+	sqlDB.Exec(t, `INSERT INTO foo VALUES (1, 12, 3, 14), (2, 22, 2, 24), (3, 32, 1, 34)`)

 	sqlDB.Exec(t, `EXPORT INTO CSV 'nodelocal://0/order' with compression = gzip from select * from foo order by y asc limit 2`)

-	fi, err := os.Open(filepath.Join(dir, "order", "n1.0.csv.gz"))
-	defer close(fi)
-
-	if err != nil {
-		t.Fatal(err)
-	}
+	compressed := readFileByGlob(t, filepath.Join(dir, "order", "export*-n1.0.csv.gz"))

-	gzipReader, err := gzip.NewReader(fi)
-	defer close(gzipReader)
-
-	if err != nil {
-		t.Fatal(err)
-	}
+	gzipReader, err := gzip.NewReader(bytes.NewReader(compressed))
+	// Check the NewReader error before deferring the close, so that a nil
+	// reader is never closed.
+	require.NoError(t, err)
+	defer close(gzipReader)

 	content, err := ioutil.ReadAll(gzipReader)
-	if err != nil {
-		t.Fatal(err)
-	}
+	require.NoError(t, err)

 	if expected, got := "3,32,1,34\n2,22,2,24\n", string(content); expected != got {
 		t.Fatalf("expected %q, got %q", expected, got)
@@ -300,18 +335,18 @@ func TestExportOrderCompressed(t *testing.T) {
 func TestExportShow(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
+
 	dir, cleanupDir := testutils.TempDir(t)
 	defer cleanupDir()

 	srv, db, _ := serverutils.StartServer(t, base.TestServerArgs{ExternalIODir: dir})
 	defer srv.Stopper().Stop(context.Background())
+
 	sqlDB := sqlutils.MakeSQLRunner(db)

 	sqlDB.Exec(t, `EXPORT INTO CSV 'nodelocal://0/show' FROM SELECT * FROM [SHOW DATABASES] ORDER BY database_name`)
-	content, err := ioutil.ReadFile(filepath.Join(dir, "show", "n1.0.csv"))
-	if err != nil {
-		t.Fatal(err)
-	}
+	content := readFileByGlob(t, filepath.Join(dir, "show", "export*-n1.0.csv"))
+
 	if expected, got := "defaultdb\npostgres\nsystem\n", string(content); expected != got {
 		t.Fatalf("expected %q, got %q", expected, got)
 	}
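Because the exported file name now embeds the statement's query ID, the tests above stop hard-coding n1.0.csv and instead glob for export*-n1.0.csv via the new readFileByGlob helper. Below is a minimal, self-contained sketch of that lookup pattern; the file name and contents are illustrative only, since real names come from the query ID:

package main

import (
	"fmt"
	"io/ioutil"
	"os"
	"path/filepath"
)

func main() {
	dir, err := ioutil.TempDir("", "glob-demo")
	if err != nil {
		panic(err)
	}
	defer os.RemoveAll(dir)

	// Hypothetical export file name: "export" + <query ID> + "-n1.0.csv".
	name := filepath.Join(dir, "export16112dd8635ab9e20000000000000001-n1.0.csv")
	if err := ioutil.WriteFile(name, []byte("3,32,1,34\n"), 0644); err != nil {
		panic(err)
	}

	// The same pattern the tests use: any query ID, fixed node/chunk suffix.
	paths, err := filepath.Glob(filepath.Join(dir, "export*-n1.0.csv"))
	if err != nil {
		panic(err)
	}
	fmt.Println(len(paths), paths) // 1 [.../export16112...-n1.0.csv]
}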
diff --git a/pkg/ccl/importccl/import_stmt_test.go b/pkg/ccl/importccl/import_stmt_test.go
index f0fa69d3cdee..5175d16bbb96 100644
--- a/pkg/ccl/importccl/import_stmt_test.go
+++ b/pkg/ccl/importccl/import_stmt_test.go
@@ -1868,12 +1868,15 @@ IMPORT TABLE import_with_db_privs (a INT8 PRIMARY KEY, b STRING) CSV DATA (%s)`,
 func TestExportImportRoundTrip(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
+
 	ctx := context.Background()
 	baseDir, cleanup := testutils.TempDir(t)
 	defer cleanup()
+
 	tc := testcluster.StartTestCluster(
 		t, 1, base.TestClusterArgs{ServerArgs: base.TestServerArgs{ExternalIODir: baseDir}})
 	defer tc.Stopper().Stop(ctx)
+
 	conn := tc.Conns[0]
 	sqlDB := sqlutils.MakeSQLRunner(conn)

@@ -1887,19 +1890,19 @@ func TestExportImportRoundTrip(t *testing.T) {
 		// with a unique directory name per run.
 		{
 			stmts: `EXPORT INTO CSV 'nodelocal://0/%[1]s' FROM SELECT ARRAY['a', 'b', 'c'];
-			IMPORT TABLE t (x TEXT[]) CSV DATA ('nodelocal://0/%[1]s/n1.0.csv')`,
+			IMPORT TABLE t (x TEXT[]) CSV DATA ('nodelocal://0/%[1]s/export*-n1.0.csv')`,
 			tbl:      "t",
 			expected: `SELECT ARRAY['a', 'b', 'c']`,
 		},
 		{
 			stmts: `EXPORT INTO CSV 'nodelocal://0/%[1]s' FROM SELECT ARRAY[b'abc', b'\141\142\143', b'\x61\x62\x63'];
-			IMPORT TABLE t (x BYTES[]) CSV DATA ('nodelocal://0/%[1]s/n1.0.csv')`,
+			IMPORT TABLE t (x BYTES[]) CSV DATA ('nodelocal://0/%[1]s/export*-n1.0.csv')`,
 			tbl:      "t",
 			expected: `SELECT ARRAY[b'abc', b'\141\142\143', b'\x61\x62\x63']`,
 		},
 		{
 			stmts: `EXPORT INTO CSV 'nodelocal://0/%[1]s' FROM SELECT 'dog' COLLATE en;
-			IMPORT TABLE t (x STRING COLLATE en) CSV DATA ('nodelocal://0/%[1]s/n1.0.csv')`,
+			IMPORT TABLE t (x STRING COLLATE en) CSV DATA ('nodelocal://0/%[1]s/export*-n1.0.csv')`,
 			tbl:      "t",
 			expected: `SELECT 'dog' COLLATE en`,
 		},
diff --git a/pkg/sql/distsql_physical_planner.go b/pkg/sql/distsql_physical_planner.go
index 76dd0a2e4459..05d3a4f6fb84 100644
--- a/pkg/sql/distsql_physical_planner.go
+++ b/pkg/sql/distsql_physical_planner.go
@@ -3443,10 +3443,10 @@ func (dsp *DistSQLPlanner) createPlanForExport(
 	}

 	core := execinfrapb.ProcessorCoreUnion{CSVWriter: &execinfrapb.CSVWriterSpec{
-		Destination:      n.fileName,
-		NamePattern:      exportFilePatternDefault,
+		Destination:      n.destination,
+		NamePattern:      n.fileNamePattern,
 		Options:          n.csvOpts,
-		ChunkRows:        int64(n.chunkSize),
+		ChunkRows:        int64(n.chunkRows),
 		CompressionCodec: n.fileCompression,
 	}}
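The planner change above threads the per-statement name pattern into CSVWriterSpec.NamePattern instead of the shared exportFilePatternDefault. How the %part% placeholder gets expanded is up to the CSV writer processor, which this diff does not touch; the sketch below only illustrates the substitution implied by the test globs (export*-n1.0.csv, i.e. node 1, chunk 0), and expandPattern is a hypothetical helper, not code from the patch:

package main

import (
	"fmt"
	"strings"
)

// expandPattern substitutes the "%part%" placeholder in a name pattern with a
// node/chunk identifier such as "n1.0". Hypothetical helper for illustration
// only; the real substitution happens inside the CSV writer processor.
func expandPattern(pattern string, nodeID, chunk int) string {
	part := fmt.Sprintf("n%d.%d", nodeID, chunk)
	return strings.Replace(pattern, "%part%", part, -1)
}

func main() {
	// With the patch, NamePattern looks like "export<queryID>-%part%.csv".
	pattern := "export16112dd8635ab9e20000000000000001-%part%.csv"
	fmt.Println(expandPattern(pattern, 1, 0))
	// Output: export16112dd8635ab9e20000000000000001-n1.0.csv
}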
diff --git a/pkg/sql/export.go b/pkg/sql/export.go
index bfde4172fd44..418f7e9629de 100644
--- a/pkg/sql/export.go
+++ b/pkg/sql/export.go
@@ -12,6 +12,7 @@ package sql

 import (
 	"context"
+	"fmt"
 	"strconv"
 	"strings"

@@ -30,9 +31,14 @@ type exportNode struct {
 	source planNode

-	fileName        string
+	// destination represents the destination URI for the export,
+	// typically a directory.
+	destination string
+	// fileNamePattern represents the file naming pattern for the
+	// export, typically to be appended to the destination URI.
+	fileNamePattern string
 	csvOpts         roachpb.CSVOptions
-	chunkSize       int
+	chunkRows       int
 	fileCompression execinfrapb.FileCompression
 }

@@ -55,20 +61,20 @@ func (e *exportNode) Close(ctx context.Context) {
 const (
 	exportOptionDelimiter   = "delimiter"
 	exportOptionNullAs      = "nullas"
-	exportOptionChunkSize   = "chunk_rows"
+	exportOptionChunkRows   = "chunk_rows"
 	exportOptionFileName    = "filename"
 	exportOptionCompression = "compression"
 )

 var exportOptionExpectValues = map[string]KVStringOptValidate{
-	exportOptionChunkSize:   KVStringOptRequireValue,
+	exportOptionChunkRows:   KVStringOptRequireValue,
 	exportOptionDelimiter:   KVStringOptRequireValue,
 	exportOptionFileName:    KVStringOptRequireValue,
 	exportOptionNullAs:      KVStringOptRequireValue,
 	exportOptionCompression: KVStringOptRequireValue,
 }

-const exportChunkSizeDefault = 100000
+const exportChunkRowsDefault = 100000
 const exportFilePatternPart = "%part%"
 const exportFilePatternDefault = exportFilePatternPart + ".csv"
 const exportCompressionCodec = "gzip"

@@ -85,11 +91,12 @@ func (ef *execFactory) ConstructExport(
 		return nil, errors.Errorf("unsupported export format: %q", fileFormat)
 	}

-	fileNameDatum, err := fileName.Eval(ef.planner.EvalContext())
+	destinationDatum, err := fileName.Eval(ef.planner.EvalContext())
 	if err != nil {
 		return nil, err
 	}
-	fileNameStr, ok := fileNameDatum.(*tree.DString)
+
+	destination, ok := destinationDatum.(*tree.DString)
 	if !ok {
 		return nil, errors.Errorf("expected string value for the file location")
 	}
@@ -112,13 +119,13 @@ func (ef *execFactory) ConstructExport(
 		csvOpts.NullEncoding = &override
 	}

-	chunkSize := exportChunkSizeDefault
-	if override, ok := optVals[exportOptionChunkSize]; ok {
-		chunkSize, err = strconv.Atoi(override)
+	chunkRows := exportChunkRowsDefault
+	if override, ok := optVals[exportOptionChunkRows]; ok {
+		chunkRows, err = strconv.Atoi(override)
 		if err != nil {
 			return nil, pgerror.WithCandidateCode(err, pgcode.InvalidParameterValue)
 		}
-		if chunkSize < 1 {
+		if chunkRows < 1 {
 			return nil, pgerror.New(pgcode.InvalidParameterValue, "invalid csv chunk size")
 		}
 	}
@@ -135,11 +142,15 @@ func (ef *execFactory) ConstructExport(
 		}
 	}

+	exportID := ef.planner.stmt.queryID.String()
+	namePattern := fmt.Sprintf("export%s-%s", exportID, exportFilePatternDefault)
+
 	return &exportNode{
 		source:          input.(planNode),
-		fileName:        string(*fileNameStr),
+		destination:     string(*destination),
+		fileNamePattern: namePattern,
 		csvOpts:         csvOpts,
-		chunkSize:       chunkSize,
+		chunkRows:       chunkRows,
 		fileCompression: codec,
 	}, nil
 }
diff --git a/pkg/sql/walk.go b/pkg/sql/walk.go
index 4ad7d7247c24..b1eafbc9d304 100644
--- a/pkg/sql/walk.go
+++ b/pkg/sql/walk.go
@@ -804,7 +804,7 @@ func (v *planVisitor) visitInternal(plan planNode, name string) {

 	case *exportNode:
 		if v.observer.attr != nil {
-			v.observer.attr(name, "destination", n.fileName)
+			v.observer.attr(name, "destination", n.destination)
 		}
 		n.source = v.visit(n.source)
 	}
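Taken together, ConstructExport above builds the name pattern by prefixing exportFilePatternDefault with "export<queryID>-", so two executions of the same EXPORT statement can never write to the same file (which is what TestExportUniqueness asserts). A minimal sketch of that composition follows; the constants mirror pkg/sql/export.go, while the query ID value is made up, since in the patch it comes from ef.planner.stmt.queryID.String():

package main

import "fmt"

// Constants mirror pkg/sql/export.go.
const exportFilePatternPart = "%part%"
const exportFilePatternDefault = exportFilePatternPart + ".csv"

func main() {
	// Made-up query ID; the patch uses ef.planner.stmt.queryID.String(),
	// which is unique per statement execution.
	exportID := "16112dd8635ab9e20000000000000001"
	namePattern := fmt.Sprintf("export%s-%s", exportID, exportFilePatternDefault)
	fmt.Println(namePattern)
	// Output: export16112dd8635ab9e20000000000000001-%part%.csv
}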