Commit
wip/importccl support option to compress output files using gzip
Signed-off-by: Artem Barger <[email protected]>

Release note (sql change): support option to compress output files using
gzip
C0rWin committed Mar 11, 2020
1 parent eac401f commit 713ca1d
Showing 5 changed files with 285 additions and 104 deletions.
105 changes: 91 additions & 14 deletions pkg/ccl/importccl/exportcsv.go
@@ -10,8 +10,10 @@ package importccl

import (
"bytes"
"compress/gzip"
"context"
"fmt"
"io"
"strings"

"github.com/cockroachdb/cockroach/pkg/ccl/utilccl"
@@ -31,6 +33,88 @@ import (
const exportFilePatternPart = "%part%"
const exportFilePatternDefault = exportFilePatternPart + ".csv"

// csvExporter wraps a compression writer and a csv.Writer, encapsulating
// the internals so that consumers can export records without having to
// know whether compression is enabled.
type csvExporter struct {
compressor io.WriteCloser
buf *bytes.Buffer
csvWriter *csv.Writer
}

// Write writes a single CSV record via the underlying csv.Writer.
func (c *csvExporter) Write(record []string) error {
return c.csvWriter.Write(record)
}

// Close closes the compression stream, if any, flushing the remaining
// compressed data into the output buffer.
func (c *csvExporter) Close() error {
if c.compressor != nil {
return c.compressor.Close()
}
return nil
}

// Flush writes any buffered CSV records to the underlying writer.
func (c *csvExporter) Flush() {
c.csvWriter.Flush()
}

// ResetBuffer discards the accumulated output so the buffer can be reused
// for the next chunk.
func (c *csvExporter) ResetBuffer() {
c.buf.Reset()
}

// Bytes returns the contents of the output buffer.
func (c *csvExporter) Bytes() []byte {
return c.buf.Bytes()
}

// Len returns the number of bytes currently held in the output buffer.
func (c *csvExporter) Len() int {
return c.buf.Len()
}

// FileName derives the output file name from the spec's name pattern,
// substituting the part token and appending a .gz suffix when compression
// is enabled.
func (c *csvExporter) FileName(spec execinfrapb.CSVWriterSpec, part string) string {
pattern := exportFilePatternDefault
if spec.NamePattern != "" {
pattern = spec.NamePattern
}

fileName := strings.Replace(pattern, exportFilePatternPart, part, -1)
// TODO: add suffix based on compressor type
if c.compressor != nil {
fileName += ".gz"
}
return fileName
}

// newCSVExporter constructs a csvExporter for the given spec, wrapping the
// output buffer in a gzip writer when the gzip codec is requested.
func newCSVExporter(sp execinfrapb.CSVWriterSpec) *csvExporter {
buf := bytes.NewBuffer([]byte{})
var exporter *csvExporter
switch sp.CompressionCodec {
case execinfrapb.CSVWriterSpec_gzip:
{
writer := gzip.NewWriter(buf)
exporter = &csvExporter{
compressor: writer,
buf: buf,
csvWriter: csv.NewWriter(writer),
}
}
default:
{
exporter = &csvExporter{
buf: buf,
csvWriter: csv.NewWriter(buf),
}
}
}
if sp.Options.Comma != 0 {
exporter.csvWriter.Comma = sp.Options.Comma
}
return exporter
}

func newCSVWriterProcessor(
flowCtx *execinfra.FlowCtx,
processorID int32,
@@ -85,22 +169,15 @@ func (sp *csvWriter) Run(ctx context.Context) {
defer tracing.FinishSpan(span)

err := func() error {
pattern := exportFilePatternDefault
if sp.spec.NamePattern != "" {
pattern = sp.spec.NamePattern
}

typs := sp.input.OutputTypes()
sp.input.Start(ctx)
input := execinfra.MakeNoMetadataRowSource(sp.input, sp.output)

alloc := &sqlbase.DatumAlloc{}

var buf bytes.Buffer
writer := csv.NewWriter(&buf)
if sp.spec.Options.Comma != 0 {
writer.Comma = sp.spec.Options.Comma
}
writer := newCSVExporter(sp.spec)
defer writer.Close()

nullsAs := ""
if sp.spec.Options.NullEncoding != nil {
nullsAs = *sp.spec.Options.NullEncoding
@@ -114,7 +191,7 @@ func (sp *csvWriter) Run(ctx context.Context) {
done := false
for {
var rows int64
buf.Reset()
writer.ResetBuffer()
for {
if sp.spec.ChunkRows > 0 && rows >= sp.spec.ChunkRows {
break
@@ -160,12 +237,12 @@ func (sp *csvWriter) Run(ctx context.Context) {
}
defer es.Close()

size := buf.Len()
size := writer.Len()

part := fmt.Sprintf("n%d.%d", sp.flowCtx.EvalCtx.NodeID, chunk)
chunk++
filename := strings.Replace(pattern, exportFilePatternPart, part, -1)
if err := es.WriteFile(ctx, filename, bytes.NewReader(buf.Bytes())); err != nil {
filename := writer.FileName(sp.spec, part)
if err := es.WriteFile(ctx, filename, bytes.NewReader(writer.Bytes())); err != nil {
return err
}
res := sqlbase.EncDatumRow{
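For reference, the exporter added in exportcsv.go above layers the CSV writer on top of a gzip writer that writes into an in-memory buffer. A minimal standalone sketch of that layering, using only the standard library (the commit itself uses CockroachDB's csv package and spec types, so the names here are illustrative); note that the gzip writer must be closed, or at least flushed, before the buffer holds a decodable stream:

```go
package main

import (
	"bytes"
	"compress/gzip"
	"encoding/csv"
	"fmt"
)

func main() {
	var buf bytes.Buffer

	// Chain the writers the same way the gzip codec path does:
	// csv.Writer -> gzip.Writer -> bytes.Buffer.
	gz := gzip.NewWriter(&buf)
	w := csv.NewWriter(gz)

	if err := w.Write([]string{"id", "name"}); err != nil {
		panic(err)
	}
	if err := w.Write([]string{"1", "carl"}); err != nil {
		panic(err)
	}

	// Flush the CSV writer into the gzip writer, then close the gzip
	// writer so the buffer contains a complete gzip stream.
	w.Flush()
	if err := w.Error(); err != nil {
		panic(err)
	}
	if err := gz.Close(); err != nil {
		panic(err)
	}

	fmt.Printf("compressed chunk: %d bytes\n", buf.Len())
}
```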
16 changes: 12 additions & 4 deletions pkg/sql/distsql_physical_planner.go
@@ -3219,11 +3219,19 @@ func (dsp *DistSQLPlanner) createPlanForExport(
return PhysicalPlan{}, err
}

// Read the requested compression codec: default to no compression,
// otherwise look the name up in the supported set of codec values.
compressionCodec := execinfrapb.CSVWriterSpec_none
if codec, exist := execinfrapb.CSVWriterSpec_Compression_value[n.codecName]; exist {
compressionCodec = execinfrapb.CSVWriterSpec_Compression(codec)
}

core := execinfrapb.ProcessorCoreUnion{CSVWriter: &execinfrapb.CSVWriterSpec{
Destination: n.fileName,
NamePattern: exportFilePatternDefault,
Options: n.csvOpts,
ChunkRows: int64(n.chunkSize),
Destination: n.fileName,
NamePattern: exportFilePatternDefault,
Options: n.csvOpts,
ChunkRows: int64(n.chunkSize),
CompressionCodec: compressionCodec,
}}

resTypes := make([]types.T, len(sqlbase.ExportColumns))
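The codec lookup above relies on the protoc-generated name-to-value map for the CSVWriterSpec_Compression enum and falls back to no compression for unrecognized names. A sketch of the same pattern with a stand-in enum and map in place of the generated execinfrapb types:

```go
package main

import "fmt"

// Compression stands in for the generated CSVWriterSpec_Compression enum.
type Compression int32

const (
	CompressionNone Compression = iota
	CompressionGzip
)

// compressionValue stands in for the protoc-generated
// CSVWriterSpec_Compression_value map (enum name -> numeric value).
var compressionValue = map[string]int32{
	"none": int32(CompressionNone),
	"gzip": int32(CompressionGzip),
}

// lookupCodec mirrors the planner logic: default to no compression,
// otherwise use the value found in the supported-codec map.
func lookupCodec(name string) Compression {
	codec := CompressionNone
	if v, ok := compressionValue[name]; ok {
		codec = Compression(v)
	}
	return codec
}

func main() {
	fmt.Println(lookupCodec("gzip")) // 1
	fmt.Println(lookupCodec("zstd")) // 0 (unknown names fall back to none)
}
```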


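As a quick way to sanity-check output produced by the gzip code path, a chunk can be read back with the standard library by undoing the layering in reverse order. A hedged sketch, assuming the default name pattern produced a file such as n1.0.csv.gz (the file name is hypothetical):

```go
package main

import (
	"compress/gzip"
	"encoding/csv"
	"fmt"
	"os"
)

func main() {
	// Hypothetical export chunk written by the gzip codec path.
	f, err := os.Open("n1.0.csv.gz")
	if err != nil {
		panic(err)
	}
	defer f.Close()

	// Undo the layering used on the write side: gzip first, then CSV.
	gz, err := gzip.NewReader(f)
	if err != nil {
		panic(err)
	}
	defer gz.Close()

	rows, err := csv.NewReader(gz).ReadAll()
	if err != nil {
		panic(err)
	}
	fmt.Printf("read %d rows from compressed export\n", len(rows))
}
```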