export/csv: prepend a unique id to file name of csv exports
Addresses #50580

When exporting CSV files, a failed and retried export reuses the same
file names. This can cause an error if overwrites are disallowed, but
more importantly, it can leave a directory containing files from
different export runs. If the previous export was not deleted, the
directory then holds inconsistent data.

This commit prepends a unique export ID to the file name. The ID is
the queryID from the sql.Statement struct, which offers the strongest
uniqueness guarantee of the options considered: it is 128 bits,
encoded as hex.

Other ID options considered were a random int, a timestamp, and the
transaction ID; each carries a small risk of collision.
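
For illustration, a minimal sketch of the naming scheme in Go. The
newExportID helper below is hypothetical and uses crypto/rand to stand
in for the statement's queryID; the actual commit reads the ID from
sql.Statement and formats the pattern in export.go:

    package main

    import (
    	"crypto/rand"
    	"encoding/hex"
    	"fmt"
    	"strings"
    )

    // newExportID returns a hex-encoded 128-bit identifier. It stands
    // in for the queryID that the commit reads from sql.Statement.
    func newExportID() string {
    	var b [16]byte // 128 bits
    	if _, err := rand.Read(b[:]); err != nil {
    		panic(err) // crypto/rand reads should not fail
    	}
    	return hex.EncodeToString(b[:])
    }

    func main() {
    	// Mirrors the committed pattern "export<ID>-%part%.csv"; the
    	// CSV writer later fills %part% with a node/chunk suffix.
    	pattern := fmt.Sprintf("export%s-%s", newExportID(), "%part%.csv")
    	fileName := strings.Replace(pattern, "%part%", "n1.0", 1)
    	fmt.Println(fileName) // e.g. export<32 hex chars>-n1.0.csv
    }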

Related tests were updated and are passing.

The commit also includes some formatting and renaming for clarity:
`exportNode.fileName` is now `exportNode.destination`, and `chunkSize`
is now `chunkRows`.

Release note (enterprise change): Exported CSV file names are now
prefixed with a long unique ID. This helps mitigate situations where
multiple export runs write to the same directory, producing mixed
data. The change does not prevent mixed data; rather, it makes files
from distinct runs identifiable, so that an operator can clean up.
shermanCRL committed Aug 10, 2020
1 parent fdf54eb commit 59dae5d
Showing 5 changed files with 105 additions and 56 deletions.
107 changes: 71 additions & 36 deletions pkg/ccl/importccl/exportcsv_test.go
@@ -9,12 +9,12 @@
package importccl_test

import (
"bytes"
"compress/gzip"
"context"
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
"strings"
"testing"
@@ -186,6 +186,18 @@ func TestExportJoin(t *testing.T) {
sqlDB.Exec(t, `EXPORT INTO CSV 'nodelocal://0/join' FROM SELECT * FROM t, t as u`)
}

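// readFileByGlob returns the contents of the single file matching
// pattern, failing the test unless exactly one file matches.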
func readFileByGlob(t *testing.T, pattern string) []byte {
paths, err := filepath.Glob(pattern)
require.NoError(t, err)

require.Equal(t, 1, len(paths))

result, err := ioutil.ReadFile(paths[0])
require.NoError(t, err)

return result
}

func TestExportOrder(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
@@ -196,30 +208,58 @@ func TestExportOrder(t *testing.T) {
defer srv.Stopper().Stop(context.Background())
sqlDB := sqlutils.MakeSQLRunner(db)

sqlDB.Exec(t, `create table foo (i int primary key, x int, y int, z int, index (y))`)
sqlDB.Exec(t, `insert into foo values (1, 12, 3, 14), (2, 22, 2, 24), (3, 32, 1, 34)`)
sqlDB.Exec(t, `CREATE TABLE foo (i INT PRIMARY KEY, x INT, y INT, z INT, INDEX (y))`)
sqlDB.Exec(t, `INSERT INTO foo VALUES (1, 12, 3, 14), (2, 22, 2, 24), (3, 32, 1, 34)`)

sqlDB.Exec(t, `EXPORT INTO CSV 'nodelocal://0/order' FROM SELECT * FROM foo ORDER BY y ASC LIMIT 2`)
content := readFileByGlob(t, filepath.Join(dir, "order", "export*-n1.0.csv"))

sqlDB.Exec(t, `EXPORT INTO CSV 'nodelocal://0/order' from select * from foo order by y asc limit 2`)
content, err := ioutil.ReadFile(filepath.Join(dir, "order", "n1.0.csv"))
if err != nil {
t.Fatal(err)
}
if expected, got := "3,32,1,34\n2,22,2,24\n", string(content); expected != got {
t.Fatalf("expected %q, got %q", expected, got)
}
}

func TestExportUniqueness(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
dir, cleanupDir := testutils.TempDir(t)
defer cleanupDir()

srv, db, _ := serverutils.StartServer(t, base.TestServerArgs{ExternalIODir: dir})
defer srv.Stopper().Stop(context.Background())
sqlDB := sqlutils.MakeSQLRunner(db)

sqlDB.Exec(t, `CREATE TABLE foo (i INT PRIMARY KEY, x INT, y INT, z INT, INDEX (y))`)
sqlDB.Exec(t, `INSERT INTO foo VALUES (1, 12, 3, 14), (2, 22, 2, 24), (3, 32, 1, 34)`)

const stmt = `EXPORT INTO CSV 'nodelocal://0/' WITH chunk_rows=$1 FROM SELECT * FROM foo`

sqlDB.Exec(t, stmt, 2)
dir1, err := ioutil.ReadDir(dir)
require.NoError(t, err)

sqlDB.Exec(t, stmt, 2)
dir2, err := ioutil.ReadDir(dir)
require.NoError(t, err)

require.Equal(t, 2*len(dir1), len(dir2), "second export did not double the number of files")
}
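
Because every file from a run shares one ID prefix, files from a stale
run can be identified mechanically. A rough sketch under the
"export<ID>-..." naming above; groupByExportID is a hypothetical
operator-side helper, not part of this commit:

    package main

    import (
    	"fmt"
    	"path/filepath"
    	"strings"
    )

    // groupByExportID buckets exported CSV files by their run ID,
    // keying "export<hex>-n1.0.csv" under "<hex>".
    func groupByExportID(dir string) (map[string][]string, error) {
    	paths, err := filepath.Glob(filepath.Join(dir, "export*-*.csv"))
    	if err != nil {
    		return nil, err
    	}
    	groups := make(map[string][]string)
    	for _, p := range paths {
    		rest := strings.TrimPrefix(filepath.Base(p), "export")
    		if i := strings.IndexByte(rest, '-'); i > 0 {
    			id := rest[:i]
    			groups[id] = append(groups[id], p)
    		}
    	}
    	return groups, nil
    }

    func main() {
    	groups, err := groupByExportID(".")
    	if err != nil {
    		panic(err)
    	}
    	for id, files := range groups {
    		fmt.Printf("run %s: %d file(s)\n", id, len(files))
    	}
    }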

func TestExportUserDefinedTypes(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
baseDir, cleanup := testutils.TempDir(t)
dir, cleanup := testutils.TempDir(t)
defer cleanup()

tc := testcluster.StartTestCluster(
t, 1, base.TestClusterArgs{ServerArgs: base.TestServerArgs{ExternalIODir: baseDir}})
t, 1, base.TestClusterArgs{ServerArgs: base.TestServerArgs{ExternalIODir: dir}})
defer tc.Stopper().Stop(ctx)

conn := tc.Conns[0]
sqlDB := sqlutils.MakeSQLRunner(conn)

// Set up some initial state for the tests.
sqlDB.Exec(t, `
SET experimental_enable_enums = true;
@@ -232,23 +272,27 @@ INSERT INTO greeting_table VALUES ('hello', 'hello'), ('hi', 'hi');
expected string
}{
{
stmt: "EXPORT INTO CSV 'nodelocal://0/test/' FROM (SELECT 'hello':::greeting, 'hi':::greeting)",
stmt: "EXPORT INTO CSV 'nodelocal://0/%s/' FROM (SELECT 'hello':::greeting, 'hi':::greeting)",
expected: "hello,hi\n",
},
{
stmt: "EXPORT INTO CSV 'nodelocal://0/test/' FROM TABLE greeting_table",
stmt: "EXPORT INTO CSV 'nodelocal://0/%s/' FROM TABLE greeting_table",
expected: "hello,hello\nhi,hi\n",
},
{
stmt: "EXPORT INTO CSV 'nodelocal://0/test/' FROM (SELECT x, y, enum_first(x) FROM greeting_table)",
stmt: "EXPORT INTO CSV 'nodelocal://0/%s/' FROM (SELECT x, y, enum_first(x) FROM greeting_table)",
expected: "hello,hello,hello\nhi,hi,hello\n",
},
}
for _, test := range tests {
sqlDB.Exec(t, test.stmt)
for i, test := range tests {
path := fmt.Sprintf("test%d", i)
stmt := fmt.Sprintf(test.stmt, path)

sqlDB.Exec(t, stmt)

// Read the dumped file.
contents, err := ioutil.ReadFile(filepath.Join(baseDir, "test", "n1.0.csv"))
require.NoError(t, err)
contents := readFileByGlob(t, filepath.Join(dir, path, "export*-n1.0.csv"))

require.Equal(t, test.expected, string(contents))
}
}
@@ -269,28 +313,19 @@ func TestExportOrderCompressed(t *testing.T) {
defer srv.Stopper().Stop(context.Background())
sqlDB := sqlutils.MakeSQLRunner(db)

sqlDB.Exec(t, `create table foo (i int primary key, x int, y int, z int, index (y))`)
sqlDB.Exec(t, `insert into foo values (1, 12, 3, 14), (2, 22, 2, 24), (3, 32, 1, 34)`)
sqlDB.Exec(t, `CREATE TABLE foo (i INT PRIMARY KEY, x INT, y INT, z INT, INDEX (y))`)
sqlDB.Exec(t, `INSERT INTO foo VALUES (1, 12, 3, 14), (2, 22, 2, 24), (3, 32, 1, 34)`)

sqlDB.Exec(t, `EXPORT INTO CSV 'nodelocal://0/order' with compression = gzip from select * from foo order by y asc limit 2`)
fi, err := os.Open(filepath.Join(dir, "order", "n1.0.csv.gz"))
defer close(fi)

if err != nil {
t.Fatal(err)
}
compressed := readFileByGlob(t, filepath.Join(dir, "order", "export*-n1.0.csv.gz"))

gzipReader, err := gzip.NewReader(fi)
gzipReader, err := gzip.NewReader(bytes.NewReader(compressed))
defer close(gzipReader)

if err != nil {
t.Fatal(err)
}
require.NoError(t, err)

content, err := ioutil.ReadAll(gzipReader)
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)

if expected, got := "3,32,1,34\n2,22,2,24\n", string(content); expected != got {
t.Fatalf("expected %q, got %q", expected, got)
@@ -300,18 +335,18 @@
func TestExportShow(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

dir, cleanupDir := testutils.TempDir(t)
defer cleanupDir()

srv, db, _ := serverutils.StartServer(t, base.TestServerArgs{ExternalIODir: dir})
defer srv.Stopper().Stop(context.Background())

sqlDB := sqlutils.MakeSQLRunner(db)

sqlDB.Exec(t, `EXPORT INTO CSV 'nodelocal://0/show' FROM SELECT * FROM [SHOW DATABASES] ORDER BY database_name`)
content, err := ioutil.ReadFile(filepath.Join(dir, "show", "n1.0.csv"))
if err != nil {
t.Fatal(err)
}
content := readFileByGlob(t, filepath.Join(dir, "show", "export*-n1.0.csv"))

if expected, got := "defaultdb\npostgres\nsystem\n", string(content); expected != got {
t.Fatalf("expected %q, got %q", expected, got)
}
9 changes: 6 additions & 3 deletions pkg/ccl/importccl/import_stmt_test.go
@@ -1868,12 +1868,15 @@ IMPORT TABLE import_with_db_privs (a INT8 PRIMARY KEY, b STRING) CSV DATA (%s)`,
func TestExportImportRoundTrip(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
baseDir, cleanup := testutils.TempDir(t)
defer cleanup()

tc := testcluster.StartTestCluster(
t, 1, base.TestClusterArgs{ServerArgs: base.TestServerArgs{ExternalIODir: baseDir}})
defer tc.Stopper().Stop(ctx)

conn := tc.Conns[0]
sqlDB := sqlutils.MakeSQLRunner(conn)

@@ -1887,19 +1890,19 @@ func TestExportImportRoundTrip(t *testing.T) {
// with a unique directory name per run.
{
stmts: `EXPORT INTO CSV 'nodelocal://0/%[1]s' FROM SELECT ARRAY['a', 'b', 'c'];
IMPORT TABLE t (x TEXT[]) CSV DATA ('nodelocal://0/%[1]s/n1.0.csv')`,
IMPORT TABLE t (x TEXT[]) CSV DATA ('nodelocal://0/%[1]s/export*-n1.0.csv')`,
tbl: "t",
expected: `SELECT ARRAY['a', 'b', 'c']`,
},
{
stmts: `EXPORT INTO CSV 'nodelocal://0/%[1]s' FROM SELECT ARRAY[b'abc', b'\141\142\143', b'\x61\x62\x63'];
IMPORT TABLE t (x BYTES[]) CSV DATA ('nodelocal://0/%[1]s/n1.0.csv')`,
IMPORT TABLE t (x BYTES[]) CSV DATA ('nodelocal://0/%[1]s/export*-n1.0.csv')`,
tbl: "t",
expected: `SELECT ARRAY[b'abc', b'\141\142\143', b'\x61\x62\x63']`,
},
{
stmts: `EXPORT INTO CSV 'nodelocal://0/%[1]s' FROM SELECT 'dog' COLLATE en;
IMPORT TABLE t (x STRING COLLATE en) CSV DATA ('nodelocal://0/%[1]s/n1.0.csv')`,
IMPORT TABLE t (x STRING COLLATE en) CSV DATA ('nodelocal://0/%[1]s/export*-n1.0.csv')`,
tbl: "t",
expected: `SELECT 'dog' COLLATE en`,
},
6 changes: 3 additions & 3 deletions pkg/sql/distsql_physical_planner.go
@@ -3443,10 +3443,10 @@ func (dsp *DistSQLPlanner) createPlanForExport(
}

core := execinfrapb.ProcessorCoreUnion{CSVWriter: &execinfrapb.CSVWriterSpec{
Destination: n.fileName,
NamePattern: exportFilePatternDefault,
Destination: n.destination,
NamePattern: n.fileNamePattern,
Options: n.csvOpts,
ChunkRows: int64(n.chunkSize),
ChunkRows: int64(n.chunkRows),
CompressionCodec: n.fileCompression,
}}

37 changes: 24 additions & 13 deletions pkg/sql/export.go
@@ -12,6 +12,7 @@ package sql

import (
"context"
"fmt"
"strconv"
"strings"

@@ -30,9 +31,14 @@ type exportNode struct {

source planNode

fileName string
// destination represents the destination URI for the export,
// typically a directory.
destination string
// fileNamePattern represents the file naming pattern for the
// export, typically to be appended to the destination URI.
fileNamePattern string
csvOpts roachpb.CSVOptions
chunkSize int
chunkRows int
fileCompression execinfrapb.FileCompression
}

@@ -55,20 +61,20 @@ func (e *exportNode) Close(ctx context.Context) {
const (
exportOptionDelimiter = "delimiter"
exportOptionNullAs = "nullas"
exportOptionChunkSize = "chunk_rows"
exportOptionChunkRows = "chunk_rows"
exportOptionFileName = "filename"
exportOptionCompression = "compression"
)

var exportOptionExpectValues = map[string]KVStringOptValidate{
exportOptionChunkSize: KVStringOptRequireValue,
exportOptionChunkRows: KVStringOptRequireValue,
exportOptionDelimiter: KVStringOptRequireValue,
exportOptionFileName: KVStringOptRequireValue,
exportOptionNullAs: KVStringOptRequireValue,
exportOptionCompression: KVStringOptRequireValue,
}

const exportChunkSizeDefault = 100000
const exportChunkRowsDefault = 100000
const exportFilePatternPart = "%part%"
const exportFilePatternDefault = exportFilePatternPart + ".csv"
const exportCompressionCodec = "gzip"
@@ -85,11 +91,12 @@ func (ef *execFactory) ConstructExport(
return nil, errors.Errorf("unsupported export format: %q", fileFormat)
}

fileNameDatum, err := fileName.Eval(ef.planner.EvalContext())
destinationDatum, err := fileName.Eval(ef.planner.EvalContext())
if err != nil {
return nil, err
}
fileNameStr, ok := fileNameDatum.(*tree.DString)

destination, ok := destinationDatum.(*tree.DString)
if !ok {
return nil, errors.Errorf("expected string value for the file location")
}
@@ -112,13 +119,13 @@
csvOpts.NullEncoding = &override
}

chunkSize := exportChunkSizeDefault
if override, ok := optVals[exportOptionChunkSize]; ok {
chunkSize, err = strconv.Atoi(override)
chunkRows := exportChunkRowsDefault
if override, ok := optVals[exportOptionChunkRows]; ok {
chunkRows, err = strconv.Atoi(override)
if err != nil {
return nil, pgerror.WithCandidateCode(err, pgcode.InvalidParameterValue)
}
if chunkSize < 1 {
if chunkRows < 1 {
return nil, pgerror.New(pgcode.InvalidParameterValue, "invalid csv chunk size")
}
}
@@ -135,11 +142,15 @@
}
}

exportID := ef.planner.stmt.queryID.String()
namePattern := fmt.Sprintf("export%s-%s", exportID, exportFilePatternDefault)

return &exportNode{
source: input.(planNode),
fileName: string(*fileNameStr),
destination: string(*destination),
fileNamePattern: namePattern,
csvOpts: csvOpts,
chunkSize: chunkSize,
chunkRows: chunkRows,
fileCompression: codec,
}, nil
}
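
The %part% placeholder in the pattern is later replaced by the CSV
writer processor with a node/chunk suffix. A minimal sketch of that
substitution, assuming the default pattern above; renderPartName is a
hypothetical helper, not the processor code:

    package main

    import (
    	"fmt"
    	"strings"
    )

    // renderPartName substitutes the %part% placeholder in a name
    // pattern such as "export<ID>-%part%.csv" with a concrete part
    // like "n1.0".
    func renderPartName(namePattern, part string) string {
    	return strings.Replace(namePattern, "%part%", part, 1)
    }

    func main() {
    	// With the default pattern, node 1's first chunk becomes
    	// "export<ID>-n1.0.csv", which is what the updated tests glob
    	// for. The short hex ID here is made up for illustration.
    	fmt.Println(renderPartName("export6b3f-%part%.csv", "n1.0"))
    }
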
2 changes: 1 addition & 1 deletion pkg/sql/walk.go
@@ -804,7 +804,7 @@ func (v *planVisitor) visitInternal(plan planNode, name string) {

case *exportNode:
if v.observer.attr != nil {
v.observer.attr(name, "destination", n.fileName)
v.observer.attr(name, "destination", n.destination)
}
n.source = v.visit(n.source)
}