importccl: support option to compress output files using gzip
This commit extends EXPORT functionality by enabling compression of the
exported stream, as suggested in cockroachdb#45579. Currently only gzip is
supported, and the export clause to use compression looks as follows:

```
export into csv 's3://export.csv' with compression = gzip from select * from foo;
```
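(With the default name pattern, each exported chunk's file additionally gets a `.gz` suffix — the test below expects `n1.0.csv.gz` for the first chunk written by node 1; see `FileName` in the diff.)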

Signed-off-by: Artem Barger <[email protected]>

Release note (sql change): support option to compress output files using
gzip

Release justification: none
C0rWin committed Mar 20, 2020
1 parent 8eff499 commit 4b78863
Showing 6 changed files with 337 additions and 112 deletions.
121 changes: 106 additions & 15 deletions pkg/ccl/importccl/exportcsv.go
@@ -10,6 +10,7 @@ package importccl

import (
"bytes"
"compress/gzip"
"context"
"fmt"
"strings"
@@ -31,6 +32,98 @@ import (
const exportFilePatternPart = "%part%"
const exportFilePatternDefault = exportFilePatternPart + ".csv"

// csvExporter bundles an optional compression writer with the
// csv writer, encapsulating the internals so that consumers can
// export without caring about compression
type csvExporter struct {
compressor *gzip.Writer
buf *bytes.Buffer
csvWriter *csv.Writer
}

// Write appends a record to the csv file
func (c *csvExporter) Write(record []string) error {
return c.csvWriter.Write(record)
}

// Close closes the compressor writer, which
// appends the archive footer
func (c *csvExporter) Close() error {
if c.compressor != nil {
return c.compressor.Close()
}
return nil
}

// Flush flushes the csv writer and, if initialized,
// the compressor writer
func (c *csvExporter) Flush() {
c.csvWriter.Flush()
if c.compressor != nil {
c.compressor.Flush()
}
}

// ResetBuffer resets the buffer and compressor state.
func (c *csvExporter) ResetBuffer() {
c.buf.Reset()
if c.compressor != nil {
// Brings compressor to its initial state
c.compressor.Reset(c.buf)
}
}

// Bytes returns the buffered (possibly compressed) content
func (c *csvExporter) Bytes() []byte {
return c.buf.Bytes()
}

// Len returns the length of the buffered content
func (c *csvExporter) Len() int {
return c.buf.Len()
}

func (c *csvExporter) FileName(spec execinfrapb.CSVWriterSpec, part string) string {
pattern := exportFilePatternDefault
if spec.NamePattern != "" {
pattern = spec.NamePattern
}

fileName := strings.Replace(pattern, exportFilePatternPart, part, -1)
// TODO: add suffix based on compressor type
if c.compressor != nil {
fileName += ".gz"
}
return fileName
}

func newCSVExporter(sp execinfrapb.CSVWriterSpec) *csvExporter {
buf := bytes.NewBuffer([]byte{})
var exporter *csvExporter
switch sp.CompressionCodec {
case execinfrapb.FileCompression_Gzip:
{
writer := gzip.NewWriter(buf)
exporter = &csvExporter{
compressor: writer,
buf: buf,
csvWriter: csv.NewWriter(writer),
}
}
default:
{
exporter = &csvExporter{
buf: buf,
csvWriter: csv.NewWriter(buf),
}
}
}
if sp.Options.Comma != 0 {
exporter.csvWriter.Comma = sp.Options.Comma
}
return exporter
}

func newCSVWriterProcessor(
flowCtx *execinfra.FlowCtx,
processorID int32,
@@ -85,22 +178,14 @@ func (sp *csvWriter) Run(ctx context.Context) {
defer tracing.FinishSpan(span)

err := func() error {
-pattern := exportFilePatternDefault
-if sp.spec.NamePattern != "" {
-pattern = sp.spec.NamePattern
-}

typs := sp.input.OutputTypes()
sp.input.Start(ctx)
input := execinfra.MakeNoMetadataRowSource(sp.input, sp.output)

alloc := &sqlbase.DatumAlloc{}

-var buf bytes.Buffer
-writer := csv.NewWriter(&buf)
-if sp.spec.Options.Comma != 0 {
-writer.Comma = sp.spec.Options.Comma
-}
+writer := newCSVExporter(sp.spec)

nullsAs := ""
if sp.spec.Options.NullEncoding != nil {
nullsAs = *sp.spec.Options.NullEncoding
@@ -114,7 +199,7 @@ func (sp *csvWriter) Run(ctx context.Context) {
done := false
for {
var rows int64
-buf.Reset()
+writer.ResetBuffer()
for {
if sp.spec.ChunkRows > 0 && rows >= sp.spec.ChunkRows {
break
@@ -160,12 +245,18 @@ }
}
defer es.Close()

-size := buf.Len()
-
part := fmt.Sprintf("n%d.%d", sp.flowCtx.EvalCtx.NodeID, chunk)
chunk++
-filename := strings.Replace(pattern, exportFilePatternPart, part, -1)
-if err := es.WriteFile(ctx, filename, bytes.NewReader(buf.Bytes())); err != nil {
+filename := writer.FileName(sp.spec, part)
+// Close the writer to ensure the buffer and any compression footer are flushed.
+if err := writer.Close(); err != nil {
+return errors.Wrap(err, "failed to close exporting writer")
+}
+
+size := writer.Len()
+
+if err := es.WriteFile(ctx, filename, bytes.NewReader(writer.Bytes())); err != nil {
return err
}
res := sqlbase.EncDatumRow{
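Aside, not part of the commit: the writer layering that csvExporter encapsulates is csv.Writer → gzip.Writer → bytes.Buffer. A minimal standalone sketch of that pattern — using the standard library's encoding/csv in place of cockroach's csv fork — shows why the code above must Close, and not merely Flush, the compressor before uploading the buffer: Close is what emits the gzip footer that readers need.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"encoding/csv"
	"fmt"
)

func main() {
	// Layer the writers the way csvExporter does: csv -> gzip -> buffer.
	buf := bytes.NewBuffer(nil)
	zw := gzip.NewWriter(buf)
	cw := csv.NewWriter(zw)

	if err := cw.Write([]string{"1", "12", "3", "14"}); err != nil {
		panic(err)
	}
	cw.Flush() // push the csv rows into the gzip writer
	if err := cw.Error(); err != nil {
		panic(err)
	}
	// Close (not just Flush) the gzip writer: this writes the footer
	// (CRC-32 and length), without which readers reject the stream.
	if err := zw.Close(); err != nil {
		panic(err)
	}

	// Read the compressed buffer back to prove it round-trips.
	zr, err := gzip.NewReader(bytes.NewReader(buf.Bytes()))
	if err != nil {
		panic(err)
	}
	defer zr.Close()
	rows, err := csv.NewReader(zr).ReadAll()
	if err != nil {
		panic(err)
	}
	fmt.Println(rows) // [[1 12 3 14]]
}
```

This is also why ResetBuffer above resets both layers between chunks: after buf.Reset(), the gzip writer must be re-initialized with compressor.Reset(buf) before it can be reused.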
37 changes: 37 additions & 0 deletions pkg/ccl/importccl/exportcsv_test.go
@@ -9,9 +9,11 @@
package importccl_test

import (
"compress/gzip"
"context"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"strings"
"testing"
@@ -200,6 +202,41 @@ func TestExportOrder(t *testing.T) {
}
}

func TestExportOrderCompressed(t *testing.T) {
defer leaktest.AfterTest(t)()
dir, cleanupDir := testutils.TempDir(t)
defer cleanupDir()

srv, db, _ := serverutils.StartServer(t, base.TestServerArgs{ExternalIODir: dir})
defer srv.Stopper().Stop(context.Background())
sqlDB := sqlutils.MakeSQLRunner(db)

sqlDB.Exec(t, `create table foo (i int primary key, x int, y int, z int, index (y))`)
sqlDB.Exec(t, `insert into foo values (1, 12, 3, 14), (2, 22, 2, 24), (3, 32, 1, 34)`)

sqlDB.Exec(t, `EXPORT INTO CSV 'nodelocal://0/order' with compression = gzip from select * from foo order by y asc limit 2`)
fi, err := os.Open(filepath.Join(dir, "order", "n1.0.csv.gz"))
if err != nil {
t.Fatal(err)
}
defer fi.Close()

gzipReader, err := gzip.NewReader(fi)
if err != nil {
t.Fatal(err)
}
defer gzipReader.Close()

content, err := ioutil.ReadAll(gzipReader)
if err != nil {
t.Fatal(err)
}

if expected, got := "3,32,1,34\n2,22,2,24\n", string(content); expected != got {
t.Fatalf("expected %q, got %q", expected, got)
}
}

func TestExportShow(t *testing.T) {
defer leaktest.AfterTest(t)()
dir, cleanupDir := testutils.TempDir(t)
9 changes: 5 additions & 4 deletions pkg/sql/distsql_physical_planner.go
@@ -3227,10 +3227,11 @@ func (dsp *DistSQLPlanner) createPlanForExport(
}

core := execinfrapb.ProcessorCoreUnion{CSVWriter: &execinfrapb.CSVWriterSpec{
Destination: n.fileName,
NamePattern: exportFilePatternDefault,
Options: n.csvOpts,
ChunkRows: int64(n.chunkSize),
CompressionCodec: n.fileCompression,
}}

resTypes := make([]types.T, len(sqlbase.ExportColumns))
Expand Down
