changefeedccl: add support for gzip compression
This adds support for requesting that data files written to cloud storage sinks
be compressed using gzip.

Release note (enterprise change): CDC to cloud-storage sinks now supports optional gzip compression.
dt committed Feb 24, 2020
1 parent 8820c13 commit 014d9dc
Showing 3 changed files with 149 additions and 77 deletions.
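
For orientation before the diffs: the change is driven by a new `compression` changefeed option, registered in options.go below with a validator that requires a value; at the SQL level this presumably surfaces as `WITH compression = 'gzip'`. Inside the sink, the setting arrives through the options map. The following is a minimal, hedged sketch of that hand-off using the constants this commit touches; the wrapper program is illustrative only and assumes the cockroach module is available.

```go
package main

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
)

func main() {
	// Options as a changefeed hands them to the cloud storage sink; the
	// `compression` entry is the knob added by this commit, and the other
	// entries mirror the fixture in sink_cloudstorage_test.go below.
	opts := map[string]string{
		changefeedbase.OptFormat:      string(changefeedbase.OptFormatJSON),
		changefeedbase.OptEnvelope:    string(changefeedbase.OptEnvelopeWrapped),
		changefeedbase.OptKeyInValue:  ``,
		changefeedbase.OptCompression: `gzip`, // emitted data files gain a ".gz" suffix
	}
	fmt.Println(opts)
}
```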
2 changes: 2 additions & 0 deletions pkg/ccl/changefeedccl/changefeedbase/options.go
@@ -26,6 +26,7 @@ const (
OptResolvedTimestamps = `resolved`
OptUpdatedTimestamps = `updated`
OptDiff = `diff`
OptCompression = `compression`

OptEnvelopeKeyOnly EnvelopeType = `key_only`
OptEnvelopeRow EnvelopeType = `row`
@@ -62,4 +63,5 @@ var ChangefeedOptionExpectValues = map[string]sql.KVStringOptValidate{
OptResolvedTimestamps: sql.KVStringOptAny,
OptUpdatedTimestamps: sql.KVStringOptRequireNoValue,
OptDiff: sql.KVStringOptRequireNoValue,
OptCompression: sql.KVStringOptRequireValue,
}
47 changes: 43 additions & 4 deletions pkg/ccl/changefeedccl/sink_cloudstorage.go
@@ -10,11 +10,13 @@ package changefeedccl

import (
"bytes"
"compress/gzip"
"context"
"fmt"
"io"
"net/url"
"path/filepath"
"strings"
"sync/atomic"

"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
@@ -50,7 +52,19 @@ func cloudStorageFormatTime(ts hlc.Timestamp) string {

type cloudStorageSinkFile struct {
cloudStorageSinkKey
buf bytes.Buffer
codec io.WriteCloser
rawSize int
buf bytes.Buffer
}

var _ io.Writer = &cloudStorageSinkFile{}

func (f *cloudStorageSinkFile) Write(p []byte) (int, error) {
f.rawSize += len(p)
if f.codec != nil {
return f.codec.Write(p)
}
return f.buf.Write(p)
}

// cloudStorageSink writes changefeed output to files in a cloud storage bucket
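
The `cloudStorageSinkFile.Write` wrapper added above routes bytes through the gzip codec when one is configured, while `rawSize` keeps counting uncompressed bytes (used, for example, in `flushFile`'s empty-file check). A standalone sketch of the same pattern using only the standard library; the names here are illustrative, not taken from the sink:

```go
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
	"log"
)

// sinkFile mirrors the shape of cloudStorageSinkFile: writes go through an
// optional codec into buf, while rawSize counts bytes before compression.
type sinkFile struct {
	codec   io.WriteCloser
	rawSize int
	buf     bytes.Buffer
}

func (f *sinkFile) Write(p []byte) (int, error) {
	f.rawSize += len(p)
	if f.codec != nil {
		return f.codec.Write(p)
	}
	return f.buf.Write(p)
}

func main() {
	f := &sinkFile{}
	// Enable compression, as the sink does when the option is "gzip".
	f.codec = gzip.NewWriter(&f.buf)

	if _, err := io.WriteString(f, "v1\nv2\n"); err != nil {
		log.Fatal(err)
	}
	// The codec buffers internally; it must be closed before buf holds a
	// complete gzip stream (the real sink does this in flushFile).
	if err := f.codec.Close(); err != nil {
		log.Fatal(err)
	}
	fmt.Printf("raw=%d bytes, compressed=%d bytes\n", f.rawSize, f.buf.Len())
}
```

When no codec is configured, writes fall straight into the buffer, which is exactly the pre-existing uncompressed path.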
@@ -259,6 +273,8 @@ type cloudStorageSink struct {
ext string
recordDelimFn func(io.Writer) error

compression string

es cloud.ExternalStorage

// These are fields to track information needed to output files based on the naming
@@ -276,6 +292,8 @@ type cloudStorageSink struct {
prevFilename string
}

const sinkCompressionGzip = "gzip"

var cloudStorageSinkIDAtomic int64

func makeCloudStorageSink(
@@ -334,6 +352,15 @@ func makeCloudStorageSink(
return nil, errors.Errorf(`this sink requires the WITH %s option`, changefeedbase.OptKeyInValue)
}

if codec, ok := opts[changefeedbase.OptCompression]; ok && codec != "" {
if strings.EqualFold(codec, "gzip") {
s.compression = sinkCompressionGzip
s.ext = s.ext + ".gz"
} else {
return nil, errors.Errorf(`unsupported compression codec %q`, codec)
}
}

var err error
if s.es, err = makeExternalStorageFromURI(ctx, baseURI); err != nil {
return nil, err
@@ -352,6 +379,10 @@ func (s *cloudStorageSink) getOrCreateFile(
f := &cloudStorageSinkFile{
cloudStorageSinkKey: key,
}
switch s.compression {
case sinkCompressionGzip:
f.codec = gzip.NewWriter(&f.buf)
}
s.files.ReplaceOrInsert(f)
return f
}
@@ -367,10 +398,10 @@ func (s *cloudStorageSink) EmitRow(
file := s.getOrCreateFile(table.Name, table.Version)

// TODO(dan): Memory monitoring for this
if _, err := file.buf.Write(value); err != nil {
if _, err := file.Write(value); err != nil {
return err
}
if err := s.recordDelimFn(&file.buf); err != nil {
if err := s.recordDelimFn(file); err != nil {
return err
}

@@ -467,12 +498,20 @@ func (s *cloudStorageSink) Flush(ctx context.Context) error {

// file should not be used after flushing.
func (s *cloudStorageSink) flushFile(ctx context.Context, file *cloudStorageSinkFile) error {
if file.buf.Len() == 0 {
if file.rawSize == 0 {
// This method shouldn't be called with an empty file, but be defensive
// about not writing empty files anyway.
return nil
}

// If the file was written via a compression codec, close the codec to ensure
// it has flushed to the underlying buffer.
if file.codec != nil {
if err := file.codec.Close(); err != nil {
return err
}
}

// We use this monotonically increasing fileID to ensure correct ordering
// among files emitted at the same timestamp during the same job session.
fileID := s.fileID
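
A gzip.Writer only emits its final deflate block and footer on Close, so uploading the buffer without closing the codec would produce a truncated stream that readers reject. Below is a small standard-library-only demonstration of the behavior the Close call above guards against; the names and scaffolding are illustrative:

```go
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
	"io/ioutil"
	"log"
)

// tryDecompress reports whether b is a complete, readable gzip stream.
func tryDecompress(b []byte) error {
	r, err := gzip.NewReader(bytes.NewReader(b))
	if err != nil {
		return err
	}
	defer r.Close()
	_, err = ioutil.ReadAll(r)
	return err
}

func main() {
	var buf bytes.Buffer
	w := gzip.NewWriter(&buf)
	if _, err := io.WriteString(w, "v1\nv2\n"); err != nil {
		log.Fatal(err)
	}

	// Before Close, the writer has not flushed its final blocks or footer, so
	// the buffer is not yet a valid stream (expect an unexpected-EOF error).
	fmt.Println("before Close:", tryDecompress(buf.Bytes()))

	if err := w.Close(); err != nil {
		log.Fatal(err)
	}
	// After Close, the buffer decompresses cleanly, which is why flushFile
	// closes the codec before handing the buffer to cloud storage.
	fmt.Println("after Close:", tryDecompress(buf.Bytes()))
}
```

This is also why the test changes below decompress files with a ".gz" suffix before comparing their contents.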
177 changes: 104 additions & 73 deletions pkg/ccl/changefeedccl/sink_cloudstorage_test.go
@@ -9,13 +9,16 @@
package changefeedccl

import (
"bytes"
"compress/gzip"
"context"
"fmt"
"io/ioutil"
"math"
"os"
"path/filepath"
"sort"
"strings"
"testing"

"github.com/cockroachdb/cockroach/pkg/blobs"
@@ -38,6 +41,19 @@ func TestCloudStorageSink(t *testing.T) {
dir, dirCleanupFn := testutils.TempDir(t)
defer dirCleanupFn()

gzipDecompress := func(t *testing.T, compressed []byte) []byte {
r, err := gzip.NewReader(bytes.NewReader(compressed))
if err != nil {
t.Fatal(err)
}
defer r.Close()
decompressed, err := ioutil.ReadAll(r)
if err != nil {
t.Fatal(err)
}
return decompressed
}

// slurpDir returns the contents of every file under root (relative to the
// temp dir created above), sorted by the name of the file.
slurpDir := func(t *testing.T, root string) []string {
@@ -53,6 +69,9 @@ func TestCloudStorageSink(t *testing.T) {
if err != nil {
return err
}
if strings.HasSuffix(path, ".gz") {
file = gzipDecompress(t, file)
}
files = append(files, string(file))
return nil
}
@@ -67,9 +86,10 @@ func TestCloudStorageSink(t *testing.T) {
settings := cluster.MakeTestingClusterSettings()
settings.ExternalIODir = dir
opts := map[string]string{
changefeedbase.OptFormat: string(changefeedbase.OptFormatJSON),
changefeedbase.OptEnvelope: string(changefeedbase.OptEnvelopeWrapped),
changefeedbase.OptKeyInValue: ``,
changefeedbase.OptFormat: string(changefeedbase.OptFormatJSON),
changefeedbase.OptEnvelope: string(changefeedbase.OptEnvelopeWrapped),
changefeedbase.OptKeyInValue: ``,
changefeedbase.OptCompression: ``, //`gzip`,
}
ts := func(i int64) hlc.Timestamp { return hlc.Timestamp{WallTime: i} }
e, err := makeJSONEncoder(opts)
@@ -107,77 +127,88 @@ func TestCloudStorageSink(t *testing.T) {
require.Equal(t, `{"resolved":"5.0000000000"}`, string(resolvedFile))
})
t.Run(`single-node`, func(t *testing.T) {
t1 := &sqlbase.TableDescriptor{Name: `t1`}
t2 := &sqlbase.TableDescriptor{Name: `t2`}

testSpan := roachpb.Span{Key: []byte("a"), EndKey: []byte("b")}
sf := span.MakeFrontier(testSpan)
timestampOracle := &changeAggregatorLowerBoundOracle{sf: sf}
dir := `single-node`
s, err := makeCloudStorageSink(
ctx, `nodelocal:///`+dir, 1, unlimitedFileSize,
settings, opts, timestampOracle, externalStorageFromURI,
)
require.NoError(t, err)
s.(*cloudStorageSink).sinkID = 7 // Force a deterministic sinkID.

// Empty flush emits no files.
require.NoError(t, s.Flush(ctx))
require.Equal(t, []string(nil), slurpDir(t, dir))

// Emitting rows and flushing should write them out in one file per table. Note
// the ordering among these two files is non-deterministic as either of them could
// be flushed first (and thus be assigned fileID 0).
require.NoError(t, s.EmitRow(ctx, t1, noKey, []byte(`v1`), ts(1)))
require.NoError(t, s.EmitRow(ctx, t1, noKey, []byte(`v2`), ts(1)))
require.NoError(t, s.EmitRow(ctx, t2, noKey, []byte(`w1`), ts(3)))
require.NoError(t, s.Flush(ctx))
expected := []string{
"v1\nv2\n",
"w1\n",
}
actual := slurpDir(t, dir)
sort.Strings(actual)
require.Equal(t, expected, actual)

// Flushing with no new emits writes nothing new.
require.NoError(t, s.Flush(ctx))
actual = slurpDir(t, dir)
sort.Strings(actual)
require.Equal(t, expected, actual)

// Without a flush, nothing new shows up.
require.NoError(t, s.EmitRow(ctx, t1, noKey, []byte(`v3`), ts(3)))
actual = slurpDir(t, dir)
sort.Strings(actual)
require.Equal(t, expected, actual)

// Note that since we haven't forwarded `testSpan` yet, all files initiated until
// this point must have the same `frontier` timestamp. Since fileID increases
// monotonically, the last file emitted should be ordered as such.
require.NoError(t, s.Flush(ctx))
require.Equal(t, []string{
"v3\n",
}, slurpDir(t, dir)[2:])

// Data from different versions of a table is put in different files, so that we
// can guarantee that all rows in any given file have the same schema.
// We also advance `testSpan` and `Flush` to make sure these new rows are read
// after the rows emitted above.
require.True(t, sf.Forward(testSpan, ts(4)))
require.NoError(t, s.Flush(ctx))
require.NoError(t, s.EmitRow(ctx, t1, noKey, []byte(`v4`), ts(4)))
t1.Version = 2
require.NoError(t, s.EmitRow(ctx, t1, noKey, []byte(`v5`), ts(5)))
require.NoError(t, s.Flush(ctx))
expected = []string{
"v4\n",
"v5\n",
before := opts[changefeedbase.OptCompression]
// Compression codecs include buffering that interferes with other tests,
// e.g. the bucketing test that configures very small flush sizes.
defer func() {
opts[changefeedbase.OptCompression] = before
}()
for _, compression := range []string{"", "gzip"} {
opts[changefeedbase.OptCompression] = compression
t.Run("compress="+compression, func(t *testing.T) {
t1 := &sqlbase.TableDescriptor{Name: `t1`}
t2 := &sqlbase.TableDescriptor{Name: `t2`}

testSpan := roachpb.Span{Key: []byte("a"), EndKey: []byte("b")}
sf := span.MakeFrontier(testSpan)
timestampOracle := &changeAggregatorLowerBoundOracle{sf: sf}
dir := `single-node` + compression
s, err := makeCloudStorageSink(
ctx, `nodelocal:///`+dir, 1, unlimitedFileSize,
settings, opts, timestampOracle, externalStorageFromURI,
)
require.NoError(t, err)
s.(*cloudStorageSink).sinkID = 7 // Force a deterministic sinkID.

// Empty flush emits no files.
require.NoError(t, s.Flush(ctx))
require.Equal(t, []string(nil), slurpDir(t, dir))

// Emitting rows and flushing should write them out in one file per table. Note
// the ordering among these two files is non-deterministic as either of them could
// be flushed first (and thus be assigned fileID 0).
require.NoError(t, s.EmitRow(ctx, t1, noKey, []byte(`v1`), ts(1)))
require.NoError(t, s.EmitRow(ctx, t1, noKey, []byte(`v2`), ts(1)))
require.NoError(t, s.EmitRow(ctx, t2, noKey, []byte(`w1`), ts(3)))
require.NoError(t, s.Flush(ctx))
expected := []string{
"v1\nv2\n",
"w1\n",
}
actual := slurpDir(t, dir)
sort.Strings(actual)
require.Equal(t, expected, actual)

// Flushing with no new emits writes nothing new.
require.NoError(t, s.Flush(ctx))
actual = slurpDir(t, dir)
sort.Strings(actual)
require.Equal(t, expected, actual)

// Without a flush, nothing new shows up.
require.NoError(t, s.EmitRow(ctx, t1, noKey, []byte(`v3`), ts(3)))
actual = slurpDir(t, dir)
sort.Strings(actual)
require.Equal(t, expected, actual)

// Note that since we haven't forwarded `testSpan` yet, all files initiated until
// this point must have the same `frontier` timestamp. Since fileID increases
// monotonically, the last file emitted should be ordered as such.
require.NoError(t, s.Flush(ctx))
require.Equal(t, []string{
"v3\n",
}, slurpDir(t, dir)[2:])

// Data from different versions of a table is put in different files, so that we
// can guarantee that all rows in any given file have the same schema.
// We also advance `testSpan` and `Flush` to make sure these new rows are read
// after the rows emitted above.
require.True(t, sf.Forward(testSpan, ts(4)))
require.NoError(t, s.Flush(ctx))
require.NoError(t, s.EmitRow(ctx, t1, noKey, []byte(`v4`), ts(4)))
t1.Version = 2
require.NoError(t, s.EmitRow(ctx, t1, noKey, []byte(`v5`), ts(5)))
require.NoError(t, s.Flush(ctx))
expected = []string{
"v4\n",
"v5\n",
}
actual = slurpDir(t, dir)
actual = actual[len(actual)-2:]
sort.Strings(actual)
require.Equal(t, expected, actual)
})
}
actual = slurpDir(t, dir)
actual = actual[len(actual)-2:]
sort.Strings(actual)
require.Equal(t, expected, actual)
})

t.Run(`multi-node`, func(t *testing.T) {
