Skip to content

Commit

Permalink
Merge #87472
Browse files Browse the repository at this point in the history
87472: util,changefeedccl: streaminger csv writes r=HonoreDB a=HonoreDB

The changefeed csv encoder was reading an entire row into memory before encoding it into CSV. There's no particular need to do so, and CSV is now being used at scale, so this PR modifies the CSV writer API slightly to allow for writing fields directly from an iterator.

Release justification: Performance tweak to new functionality.

Release note: None

Co-authored-by: Aaron Zinger <[email protected]>
  • Loading branch information
craig[bot] and HonoreDB committed Sep 12, 2022
2 parents 3b95d11 + 212bf4a commit 7bcbc70
Show file tree
Hide file tree
Showing 6 changed files with 225 additions and 90 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ go_test(
"avro_test.go",
"bench_test.go",
"changefeed_test.go",
"csv_test.go",
"encoder_test.go",
"event_processing_test.go",
"helpers_test.go",
Expand Down
19 changes: 19 additions & 0 deletions pkg/ccl/changefeedccl/cdcevent/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,25 @@ func TestingMakeEventRow(
}
}

// TestingMakeEventRowFromDatums initializes a Row that will return the provided datums when
// ForEachColumn is called. If anything else needs to be hydrated, use TestingMakeEventRow
// instead.
func TestingMakeEventRowFromDatums(datums tree.Datums) Row {
var desc EventDescriptor
var encRow rowenc.EncDatumRow
var alloc tree.DatumAlloc
for i, d := range datums {
desc.cols = append(desc.cols, ResultColumn{ord: i})
desc.valueCols = append(desc.valueCols, i)
encRow = append(encRow, rowenc.DatumToEncDatum(d.ResolvedType(), d))
}
return Row{
EventDescriptor: &desc,
datums: encRow,
alloc: &alloc,
}
}

// TestingGetFamilyIDFromKey returns family ID encoded in the specified roachpb.Key.
// Exposed for testing.
func TestingGetFamilyIDFromKey(
Expand Down
76 changes: 76 additions & 0 deletions pkg/ccl/changefeedccl/csv_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Copyright 2022 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package changefeedccl

import (
"context"
"fmt"
"testing"

"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/stretchr/testify/require"
)

func BenchmarkCSVEncodeWideRow(b *testing.B) {
defer leaktest.AfterTest(b)()
defer log.Scope(b).Close(b)

// 1024 columns of three runes each.
benchmarkEncodeCSV(b, 1024, 1024*3, 1024)
}

func BenchmarkCSVEncodeWideRowASCII(b *testing.B) {
defer leaktest.AfterTest(b)()
defer log.Scope(b).Close(b)

// 1024 columns of three runes each, all ASCII codepoints.
benchmarkEncodeCSV(b, 1024, 1024*3, 127)
}

func BenchmarkCSVEncodeWideColumnsASCII(b *testing.B) {
defer leaktest.AfterTest(b)()
defer log.Scope(b).Close(b)

// 3 columns of 1024 runes each, all ASCII codepoints.
benchmarkEncodeCSV(b, 3, 1024*3, 127)
}

func BenchmarkCSVEncodeWideColumns(b *testing.B) {
defer leaktest.AfterTest(b)()
defer log.Scope(b).Close(b)

// 3 columns of 1024 runes each.
benchmarkEncodeCSV(b, 3, 1024*3, 1024)
}

func benchmarkEncodeCSV(b *testing.B, numCols int, numChars int, maxCodepoint int) {
encoder := newCSVEncoder(changefeedbase.EncodingOptions{Format: changefeedbase.OptFormatCSV})
ctx := context.Background()
vals := make([]string, numCols)
for i := 0; i < numChars; i++ {
vals[i%numCols] += fmt.Sprintf("%c", i%(maxCodepoint+1))
}
datums := tree.Datums{}
for _, str := range vals {
datums = append(datums, tree.NewDString(str))
}
row := cdcevent.TestingMakeEventRowFromDatums(datums)

b.ReportAllocs()
b.ResetTimer()

for i := 0; i < b.N; i++ {
_, err := encoder.EncodeValue(ctx, eventContext{}, row, cdcevent.Row{})
require.NoError(b, err)
}
}
21 changes: 11 additions & 10 deletions pkg/ccl/changefeedccl/encoder_csv.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,19 @@ import (
)

type csvEncoder struct {
csvRow []string
buf *bytes.Buffer
writer *csv.Writer
buf *bytes.Buffer
formatter *tree.FmtCtx
writer *csv.Writer
}

var _ Encoder = &csvEncoder{}

func newCSVEncoder(opts changefeedbase.EncodingOptions) *csvEncoder {
newBuf := bytes.NewBuffer([]byte{})
newEncoder := &csvEncoder{
buf: newBuf,
writer: csv.NewWriter(newBuf),
buf: newBuf,
formatter: tree.NewFmtCtx(tree.FmtSimple),
writer: csv.NewWriter(newBuf),
}
newEncoder.writer.SkipNewline = true
return newEncoder
Expand All @@ -50,16 +51,16 @@ func (e *csvEncoder) EncodeValue(
if updatedRow.IsDeleted() {
return nil, errors.Errorf(`cannot encode deleted rows into CSV format`)
}
e.csvRow = e.csvRow[:0]
e.buf.Reset()
if err := updatedRow.ForEachColumn().Datum(func(d tree.Datum, col cdcevent.ResultColumn) error {
e.csvRow = append(e.csvRow, tree.AsString(d))
return nil
e.formatter.Reset()
e.formatter.FormatNode(d)
return e.writer.WriteField(&e.formatter.Buffer)
}); err != nil {
return nil, err
}

e.buf.Reset()
if err := e.writer.Write(e.csvRow); err != nil {
if err := e.writer.FinishRecord(); err != nil {
return nil, err
}
e.writer.Flush()
Expand Down
193 changes: 114 additions & 79 deletions pkg/util/encoding/csv/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,9 @@ package csv

import (
"bufio"
"bytes"
"io"
"strings"
"unicode"
"unicode/utf8"
)

// A Writer writes records to a CSV encoded file.
Expand All @@ -32,19 +31,25 @@ import (
//
// If UseCRLF is true, the Writer ends each record with \r\n instead of \n.
type Writer struct {
Comma rune // Field delimiter (set to ',' by NewWriter)
Escape rune
UseCRLF bool // True to use \r\n as the line terminator
SkipNewline bool // True to skip \n as the line terminator
w *bufio.Writer
Comma rune // Field delimiter (set to ',' by NewWriter)
Escape rune
UseCRLF bool // True to use \r\n as the line terminator
SkipNewline bool // True to skip \n as the line terminator
w *bufio.Writer
scratch *bytes.Buffer
i int
midRow bool
currentRecordNeedsQuotes bool
maybeTerminatorString bool
}

// NewWriter returns a new Writer that writes to w.
func NewWriter(w io.Writer) *Writer {
return &Writer{
Comma: ',',
Escape: '"',
w: bufio.NewWriter(w),
Comma: ',',
Escape: '"',
w: bufio.NewWriter(w),
scratch: new(bytes.Buffer),
}
}

Expand All @@ -55,64 +60,118 @@ func (w *Writer) Write(record []string) error {
return errInvalidDelim
}

for n, field := range record {
if n > 0 {
if _, err := w.w.WriteRune(w.Comma); err != nil {
return err
}
for _, field := range record {
if err := w.WriteField(bytes.NewBufferString(field)); err != nil {
return err
}
}
return w.FinishRecord()
}

// If we don't have to have a quoted field then just
// write out the field and continue to the next field.
if !w.fieldNeedsQuotes(field) {
if _, err := w.w.WriteString(field); err != nil {
return err
}
continue
// FinishRecord writes the newline at the end of a record.
// Only call FinishRecord in conjunction with WriteField,
// not Write.
func (w *Writer) FinishRecord() (err error) {
if !w.SkipNewline {
if w.UseCRLF {
_, err = w.w.WriteString("\r\n")
} else {
err = w.w.WriteByte('\n')
}
if err := w.w.WriteByte('"'); err != nil {
}
w.midRow = false
return err
}

// WriteField writes an individual field.
func (w *Writer) WriteField(field *bytes.Buffer) (e error) {
if w.midRow {
if _, err := w.w.WriteRune(w.Comma); err != nil {
return err
}

for _, r1 := range field {
var err error
switch r1 {
case '"':
_, err = w.w.WriteString(string(w.Escape) + `"`)
case w.Escape:
_, err = w.w.WriteString(string(w.Escape) + string(w.Escape))
case '\r':
if !w.UseCRLF {
err = w.w.WriteByte('\r')
}
case '\n':
if w.UseCRLF {
_, err = w.w.WriteString("\r\n")
} else {
err = w.w.WriteByte('\n')
}
default:
_, err = w.w.WriteRune(r1)
}
w.midRow = true
w.i = 0
w.currentRecordNeedsQuotes = false
w.scratch.Reset()
w.maybeTerminatorString = true
// Iterate through the input rune by rune, escaping where needed,
// modifying linebreaks as configured by w.UseCRLF, and tracking
// whether the string as a whole needs to be enclosed in quotes.
// We write to a scratch buffer instead of directly to w since we
// don't know yet if the first byte needs to be '"'.
var r rune
for ; e == nil; w.i++ {
r, _, e = field.ReadRune()
if e != nil {
break
}
// Check if the string exactly equals the Postgres terminator string \.
w.maybeTerminatorString = w.maybeTerminatorString && ((w.i == 0 && r == '\\') || (w.i == 1 && r == '.'))
switch r {
case '"', w.Escape:
w.currentRecordNeedsQuotes = true
_, e = w.scratch.WriteRune(w.Escape)
if e == nil {
_, e = w.scratch.WriteRune(r)
}
case w.Comma:
w.currentRecordNeedsQuotes = true
_, e = w.scratch.WriteRune(r)
case '\r':
// TODO: This is copying how the previous implementation behaved,
// even though it looks wrong: if we're omitting the return, why
// do we still need to quote the field?
w.currentRecordNeedsQuotes = true
if !w.UseCRLF {
e = w.scratch.WriteByte('\r')
}
if err != nil {
return err
case '\n':
w.currentRecordNeedsQuotes = true
if w.UseCRLF {
_, e = w.scratch.WriteString("\r\n")
} else {
e = w.scratch.WriteByte('\n')
}
default:
if w.i == 0 {
w.currentRecordNeedsQuotes = unicode.IsSpace(r)
}
_, e = w.scratch.WriteRune(r)
}
}

if err := w.w.WriteByte('"'); err != nil {
return err
}
if e != io.EOF {
return e
}
var err error

if !w.SkipNewline {
if w.UseCRLF {
_, err = w.w.WriteString("\r\n")
} else {
err = w.w.WriteByte('\n')
w.maybeTerminatorString = w.maybeTerminatorString && w.i == 2
w.currentRecordNeedsQuotes = w.currentRecordNeedsQuotes || w.maybeTerminatorString

// By now we know whether or not the entire field needs to be quoted.
// Fields with a Comma, fields with a quote or newline, and
// fields which start with a space must be enclosed in quotes.
// We used to quote empty strings, but we do not anymore (as of Go 1.4).
// The two representations should be equivalent, but Postgres distinguishes
// quoted vs non-quoted empty string during database imports, and it has
// an option to force the quoted behavior for non-quoted CSV but it has
// no option to force the non-quoted behavior for quoted CSV, making
// CSV with quoted empty strings strictly less useful.
// Not quoting the empty string also makes this package match the behavior
// of Microsoft Excel and Google Drive.
// For Postgres, quote the data terminating string `\.`.
if w.currentRecordNeedsQuotes {
e = w.w.WriteByte('"')
if e != nil {
return e
}
}
return err
_, e = w.scratch.WriteTo(w.w)
if w.currentRecordNeedsQuotes {
e = w.w.WriteByte('"')
}

return e
}

// Flush writes any buffered data to the underlying io.Writer.
Expand All @@ -137,27 +196,3 @@ func (w *Writer) WriteAll(records [][]string) error {
}
return w.w.Flush()
}

// fieldNeedsQuotes reports whether our field must be enclosed in quotes.
// Fields with a Comma, fields with a quote or newline, and
// fields which start with a space must be enclosed in quotes.
// We used to quote empty strings, but we do not anymore (as of Go 1.4).
// The two representations should be equivalent, but Postgres distinguishes
// quoted vs non-quoted empty string during database imports, and it has
// an option to force the quoted behavior for non-quoted CSV but it has
// no option to force the non-quoted behavior for quoted CSV, making
// CSV with quoted empty strings strictly less useful.
// Not quoting the empty string also makes this package match the behavior
// of Microsoft Excel and Google Drive.
// For Postgres, quote the data terminating string `\.`.
func (w *Writer) fieldNeedsQuotes(field string) bool {
if field == "" {
return false
}
if field == `\.` || strings.ContainsRune(field, w.Comma) || strings.ContainsAny(field, "\"\r\n") {
return true
}

r1, _ := utf8.DecodeRuneInString(field)
return unicode.IsSpace(r1)
}
Loading

0 comments on commit 7bcbc70

Please sign in to comment.