importccl: support option to compress output files using gzip #45978

Merged · 1 commit · Apr 2, 2020
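The new WITH compression = gzip option is exercised end to end in the test below; in EXPORT syntax it looks like this (table and destination are the test's own):

    EXPORT INTO CSV 'nodelocal://0/order'
    WITH compression = gzip
    FROM SELECT * FROM foo ORDER BY y ASC LIMIT 2;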
pkg/ccl/importccl/exportcsv.go (126 changes: 110 additions & 16 deletions)

@@ -10,6 +10,7 @@ package importccl

import (
"bytes"
"compress/gzip"
"context"
"fmt"
"strings"
@@ -31,6 +32,99 @@ import (
const exportFilePatternPart = "%part%"
const exportFilePatternDefault = exportFilePatternPart + ".csv"

// csvExporter bundles the CSV writer, an optional gzip compressor, and
// the destination buffer, encapsulating the internals so that consumers
// can export without being aware of compression. When compression is
// enabled, records flow csv.Writer -> gzip.Writer -> bytes.Buffer;
// otherwise the CSV writer writes into the buffer directly.
type csvExporter struct {
compressor *gzip.Writer
buf *bytes.Buffer
csvWriter *csv.Writer
}

// Write appends a record to the CSV file.
func (c *csvExporter) Write(record []string) error {
return c.csvWriter.Write(record)
}

// Close closes the compressor writer, which
// appends the archive footer.
func (c *csvExporter) Close() error {
if c.compressor != nil {
return c.compressor.Close()
}
return nil
}

// Flush flushes the csv writer first, so that buffered records reach
// the compressor, and then flushes the compressor if it is initialized.
func (c *csvExporter) Flush() error {
c.csvWriter.Flush()
if c.compressor != nil {
return c.compressor.Flush()
}
return nil
}

// ResetBuffer resets the buffer and compressor state.
func (c *csvExporter) ResetBuffer() {
c.buf.Reset()
if c.compressor != nil {
// Brings the compressor back to its initial state.
c.compressor.Reset(c.buf)
}
}

// Bytes returns the buffered bytes, compressed if a codec is configured.
func (c *csvExporter) Bytes() []byte {
return c.buf.Bytes()
}

// Len returns the number of bytes currently in the buffer.
func (c *csvExporter) Len() int {
return c.buf.Len()
}

// FileName generates the name of an output file by substituting the
// part token into the spec's name pattern (or the default pattern) and
// appending ".gz" when compression is enabled.
func (c *csvExporter) FileName(spec execinfrapb.CSVWriterSpec, part string) string {
pattern := exportFilePatternDefault
if spec.NamePattern != "" {
pattern = spec.NamePattern
}

fileName := strings.Replace(pattern, exportFilePatternPart, part, -1)
// TODO: add suffix based on compressor type
if c.compressor != nil {
fileName += ".gz"
}
return fileName
}
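// For example, with the default pattern "%part%.csv" and gzip enabled,
// part "n1.0" yields "n1.0.csv.gz" (the name the test below expects);
// without compression it stays "n1.0.csv".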

// newCSVExporter constructs a csvExporter for the given spec, wrapping
// the CSV writer in a gzip.Writer when the gzip codec is requested.
func newCSVExporter(sp execinfrapb.CSVWriterSpec) *csvExporter {
buf := bytes.NewBuffer([]byte{})
var exporter *csvExporter
switch sp.CompressionCodec {
case execinfrapb.FileCompression_Gzip:
{
writer := gzip.NewWriter(buf)
exporter = &csvExporter{
compressor: writer,
buf: buf,
csvWriter: csv.NewWriter(writer),
}
}
default:
{
exporter = &csvExporter{
buf: buf,
csvWriter: csv.NewWriter(buf),
}
}
}
if sp.Options.Comma != 0 {
exporter.csvWriter.Comma = sp.Options.Comma
}
return exporter
}
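// A minimal sketch (not part of the diff) of the lifecycle driven by the
// processor below, assuming a spec with the gzip codec set. Close is what
// emits the gzip footer, so Bytes and Len describe a complete chunk only
// after Close:
//
//	writer := newCSVExporter(spec)
//	writer.ResetBuffer()                        // fresh buffer and compressor state
//	_ = writer.Write([]string{"1", "12", "3"})  // buffered by csv.Writer
//	_ = writer.Flush()                          // csv.Writer -> gzip.Writer -> buffer
//	_ = writer.Close()                          // append the gzip footer
//	payload := writer.Bytes()                   // complete .gz chunk of writer.Len() bytes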

func newCSVWriterProcessor(
flowCtx *execinfra.FlowCtx,
processorID int32,
@@ -85,22 +179,14 @@ func (sp *csvWriter) Run(ctx context.Context) {
defer tracing.FinishSpan(span)

err := func() error {
pattern := exportFilePatternDefault
if sp.spec.NamePattern != "" {
pattern = sp.spec.NamePattern
}

typs := sp.input.OutputTypes()
sp.input.Start(ctx)
input := execinfra.MakeNoMetadataRowSource(sp.input, sp.output)

alloc := &sqlbase.DatumAlloc{}

var buf bytes.Buffer
writer := csv.NewWriter(&buf)
if sp.spec.Options.Comma != 0 {
writer.Comma = sp.spec.Options.Comma
}
writer := newCSVExporter(sp.spec)

nullsAs := ""
if sp.spec.Options.NullEncoding != nil {
nullsAs = *sp.spec.Options.NullEncoding
@@ -114,7 +200,7 @@ func (sp *csvWriter) Run(ctx context.Context) {
done := false
for {
var rows int64
buf.Reset()
writer.ResetBuffer()
for {
if sp.spec.ChunkRows > 0 && rows >= sp.spec.ChunkRows {
break
@@ -148,7 +234,9 @@ func (sp *csvWriter) Run(ctx context.Context) {
if rows < 1 {
break
}
writer.Flush()
if err := writer.Flush(); err != nil {
return errors.New(fmt.Sprintf("failed to flush csv writer, error %s", err))
}

conf, err := cloud.ExternalStorageConfFromURI(sp.spec.Destination)
if err != nil {
@@ -160,12 +248,18 @@ }
}
defer es.Close()

size := buf.Len()

part := fmt.Sprintf("n%d.%d", sp.flowCtx.EvalCtx.NodeID, chunk)
chunk++
filename := strings.Replace(pattern, exportFilePatternPart, part, -1)
if err := es.WriteFile(ctx, filename, bytes.NewReader(buf.Bytes())); err != nil {
filename := writer.FileName(sp.spec, part)
// Close the writer to ensure the buffer and any compression footer
// have been flushed.
if err := writer.Close(); err != nil {
return fmt.Errorf("failed to close exporting writer: %s", err)
}

size := writer.Len()

if err := es.WriteFile(ctx, filename, bytes.NewReader(writer.Bytes())); err != nil {
return err
}
res := sqlbase.EncDatumRow{
pkg/ccl/importccl/exportcsv_test.go (46 changes: 46 additions & 0 deletions)

@@ -9,9 +9,12 @@
package importccl_test

import (
"compress/gzip"
"context"
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
"strings"
"testing"
@@ -200,6 +203,49 @@ func TestExportOrder(t *testing.T) {
}
}

func TestExportOrderCompressed(t *testing.T) {
defer leaktest.AfterTest(t)()
dir, cleanupDir := testutils.TempDir(t)
defer cleanupDir()

var close = func(c io.Closer) {
if err := c.Close(); err != nil {
t.Fatalf("failed to close stream, got error %s", err)
}
}

srv, db, _ := serverutils.StartServer(t, base.TestServerArgs{ExternalIODir: dir})
defer srv.Stopper().Stop(context.Background())
sqlDB := sqlutils.MakeSQLRunner(db)

sqlDB.Exec(t, `create table foo (i int primary key, x int, y int, z int, index (y))`)
sqlDB.Exec(t, `insert into foo values (1, 12, 3, 14), (2, 22, 2, 24), (3, 32, 1, 34)`)

sqlDB.Exec(t, `EXPORT INTO CSV 'nodelocal://0/order' with compression = gzip from select * from foo order by y asc limit 2`)
fi, err := os.Open(filepath.Join(dir, "order", "n1.0.csv.gz"))
if err != nil {
t.Fatal(err)
}
defer close(fi)

gzipReader, err := gzip.NewReader(fi)
if err != nil {
t.Fatal(err)
}
defer close(gzipReader)

content, err := ioutil.ReadAll(gzipReader)
if err != nil {
t.Fatal(err)
}

if expected, got := "3,32,1,34\n2,22,2,24\n", string(content); expected != got {
t.Fatalf("expected %q, got %q", expected, got)
}
}

func TestExportShow(t *testing.T) {
defer leaktest.AfterTest(t)()
dir, cleanupDir := testutils.TempDir(t)
pkg/sql/distsql_physical_planner.go (9 changes: 5 additions & 4 deletions)

@@ -3227,10 +3227,11 @@ func (dsp *DistSQLPlanner) createPlanForExport(
}

core := execinfrapb.ProcessorCoreUnion{CSVWriter: &execinfrapb.CSVWriterSpec{
Destination: n.fileName,
NamePattern: exportFilePatternDefault,
Options: n.csvOpts,
ChunkRows: int64(n.chunkSize),
Destination: n.fileName,
NamePattern: exportFilePatternDefault,
Options: n.csvOpts,
ChunkRows: int64(n.chunkSize),
CompressionCodec: n.fileCompression,
}}

resTypes := make([]types.T, len(sqlbase.ExportColumns))
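The planner change above is what carries the user's choice into the distributed flow: the codec is resolved once at plan time and shipped in the spec to every node that runs a CSVWriter processor, so all chunks of one EXPORT use the same codec. A minimal sketch of the resulting spec (field values other than CompressionCodec are illustrative, not taken from the diff):

    spec := execinfrapb.CSVWriterSpec{
        Destination:      "nodelocal://0/export",
        NamePattern:      exportFilePatternDefault,
        ChunkRows:        100000,
        CompressionCodec: execinfrapb.FileCompression_Gzip,
    }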