diff --git a/pkg/ccl/changefeedccl/changefeedbase/options.go b/pkg/ccl/changefeedccl/changefeedbase/options.go
index 9054068a6926..aa011b155f31 100644
--- a/pkg/ccl/changefeedccl/changefeedbase/options.go
+++ b/pkg/ccl/changefeedccl/changefeedbase/options.go
@@ -26,6 +26,7 @@ const (
 	OptResolvedTimestamps = `resolved`
 	OptUpdatedTimestamps  = `updated`
 	OptDiff               = `diff`
+	OptCompression        = `compression`
 
 	OptEnvelopeKeyOnly EnvelopeType = `key_only`
 	OptEnvelopeRow     EnvelopeType = `row`
@@ -62,4 +63,5 @@ var ChangefeedOptionExpectValues = map[string]sql.KVStringOptValidate{
 	OptResolvedTimestamps: sql.KVStringOptAny,
 	OptUpdatedTimestamps:  sql.KVStringOptRequireNoValue,
 	OptDiff:               sql.KVStringOptRequireNoValue,
+	OptCompression:        sql.KVStringOptRequireValue,
 }
diff --git a/pkg/ccl/changefeedccl/sink_cloudstorage.go b/pkg/ccl/changefeedccl/sink_cloudstorage.go
index 27f95abdafa0..6b21f69f1bd2 100644
--- a/pkg/ccl/changefeedccl/sink_cloudstorage.go
+++ b/pkg/ccl/changefeedccl/sink_cloudstorage.go
@@ -10,11 +10,13 @@ package changefeedccl
 
 import (
 	"bytes"
+	"compress/gzip"
 	"context"
 	"fmt"
 	"io"
 	"net/url"
 	"path/filepath"
+	"strings"
 	"sync/atomic"
 
 	"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
@@ -50,7 +52,19 @@ func cloudStorageFormatTime(ts hlc.Timestamp) string {
 
 type cloudStorageSinkFile struct {
 	cloudStorageSinkKey
-	buf bytes.Buffer
+	codec   io.WriteCloser
+	rawSize int
+	buf     bytes.Buffer
+}
+
+var _ io.Writer = &cloudStorageSinkFile{}
+
+func (f *cloudStorageSinkFile) Write(p []byte) (int, error) {
+	f.rawSize += len(p)
+	if f.codec != nil {
+		return f.codec.Write(p)
+	}
+	return f.buf.Write(p)
 }
 
 // cloudStorageSink writes changefeed output to files in a cloud storage bucket
@@ -259,6 +273,8 @@ type cloudStorageSink struct {
 	ext           string
 	recordDelimFn func(io.Writer) error
 
+	compression string
+
 	es cloud.ExternalStorage
 
 	// These are fields to track information needed to output files based on the naming
@@ -276,6 +292,8 @@ type cloudStorageSink struct {
 	prevFilename string
 }
 
+const sinkCompressionGzip = "gzip"
+
 var cloudStorageSinkIDAtomic int64
 
 func makeCloudStorageSink(
@@ -334,6 +352,15 @@ func makeCloudStorageSink(
 		return nil, errors.Errorf(`this sink requires the WITH %s option`, changefeedbase.OptKeyInValue)
 	}
 
+	if codec, ok := opts[changefeedbase.OptCompression]; ok && codec != "" {
+		if strings.EqualFold(codec, "gzip") {
+			s.compression = sinkCompressionGzip
+			s.ext = s.ext + ".gz"
+		} else {
+			return nil, errors.Errorf(`unsupported compression codec %q`, codec)
+		}
+	}
+
 	var err error
 	if s.es, err = makeExternalStorageFromURI(ctx, baseURI); err != nil {
 		return nil, err
@@ -352,6 +379,10 @@ func (s *cloudStorageSink) getOrCreateFile(
 		f := &cloudStorageSinkFile{
 			cloudStorageSinkKey: key,
 		}
+		switch s.compression {
+		case sinkCompressionGzip:
+			f.codec = gzip.NewWriter(&f.buf)
+		}
 		s.files.ReplaceOrInsert(f)
 		return f
 	}
@@ -367,10 +398,10 @@ func (s *cloudStorageSink) EmitRow(
 	file := s.getOrCreateFile(table.Name, table.Version)
 
 	// TODO(dan): Memory monitoring for this
-	if _, err := file.buf.Write(value); err != nil {
+	if _, err := file.Write(value); err != nil {
 		return err
 	}
-	if err := s.recordDelimFn(&file.buf); err != nil {
+	if err := s.recordDelimFn(file); err != nil {
 		return err
 	}
 
@@ -467,12 +498,20 @@ func (s *cloudStorageSink) Flush(ctx context.Context) error {
 
 // file should not be used after flushing.
 func (s *cloudStorageSink) flushFile(ctx context.Context, file *cloudStorageSinkFile) error {
-	if file.buf.Len() == 0 {
+	if file.rawSize == 0 {
 		// This method shouldn't be called with an empty file, but be defensive
 		// about not writing empty files anyway.
 		return nil
 	}
 
+	// If the file is written via compression codec, close the codec to ensure it
+	// has flushed to the underlying buffer.
+	if file.codec != nil {
+		if err := file.codec.Close(); err != nil {
+			return err
+		}
+	}
+
 	// We use this monotonically increasing fileID to ensure correct ordering
 	// among files emitted at the same timestamp during the same job session.
 	fileID := s.fileID
diff --git a/pkg/ccl/changefeedccl/sink_cloudstorage_test.go b/pkg/ccl/changefeedccl/sink_cloudstorage_test.go
index 372d923339e7..4de81ed955b6 100644
--- a/pkg/ccl/changefeedccl/sink_cloudstorage_test.go
+++ b/pkg/ccl/changefeedccl/sink_cloudstorage_test.go
@@ -9,6 +9,8 @@
 package changefeedccl
 
 import (
+	"bytes"
+	"compress/gzip"
 	"context"
 	"fmt"
 	"io/ioutil"
@@ -16,6 +18,7 @@ import (
 	"os"
 	"path/filepath"
 	"sort"
+	"strings"
 	"testing"
 
 	"github.com/cockroachdb/cockroach/pkg/blobs"
@@ -38,6 +41,19 @@ func TestCloudStorageSink(t *testing.T) {
 	dir, dirCleanupFn := testutils.TempDir(t)
 	defer dirCleanupFn()
 
+	gzipDecompress := func(t *testing.T, compressed []byte) []byte {
+		r, err := gzip.NewReader(bytes.NewReader(compressed))
+		if err != nil {
+			t.Fatal(err)
+		}
+		defer r.Close()
+		decompressed, err := ioutil.ReadAll(r)
+		if err != nil {
+			t.Fatal(err)
+		}
+		return decompressed
+	}
+
 	// slurpDir returns the contents of every file under root (relative to the
 	// temp dir created above), sorted by the name of the file.
 	slurpDir := func(t *testing.T, root string) []string {
@@ -53,6 +69,9 @@ func TestCloudStorageSink(t *testing.T) {
 			if err != nil {
 				return err
 			}
+			if strings.HasSuffix(path, ".gz") {
+				file = gzipDecompress(t, file)
+			}
 			files = append(files, string(file))
 			return nil
 		}
@@ -67,9 +86,10 @@ func TestCloudStorageSink(t *testing.T) {
 	settings := cluster.MakeTestingClusterSettings()
 	settings.ExternalIODir = dir
 	opts := map[string]string{
-		changefeedbase.OptFormat:     string(changefeedbase.OptFormatJSON),
-		changefeedbase.OptEnvelope:   string(changefeedbase.OptEnvelopeWrapped),
-		changefeedbase.OptKeyInValue: ``,
+		changefeedbase.OptFormat:      string(changefeedbase.OptFormatJSON),
+		changefeedbase.OptEnvelope:    string(changefeedbase.OptEnvelopeWrapped),
+		changefeedbase.OptKeyInValue:  ``,
+		changefeedbase.OptCompression: ``, //`gzip`,
 	}
 	ts := func(i int64) hlc.Timestamp { return hlc.Timestamp{WallTime: i} }
 	e, err := makeJSONEncoder(opts)
@@ -107,77 +127,88 @@ func TestCloudStorageSink(t *testing.T) {
 		require.Equal(t, `{"resolved":"5.0000000000"}`, string(resolvedFile))
 	})
 	t.Run(`single-node`, func(t *testing.T) {
-		t1 := &sqlbase.TableDescriptor{Name: `t1`}
-		t2 := &sqlbase.TableDescriptor{Name: `t2`}
-
-		testSpan := roachpb.Span{Key: []byte("a"), EndKey: []byte("b")}
-		sf := span.MakeFrontier(testSpan)
-		timestampOracle := &changeAggregatorLowerBoundOracle{sf: sf}
-		dir := `single-node`
-		s, err := makeCloudStorageSink(
-			ctx, `nodelocal:///`+dir, 1, unlimitedFileSize,
-			settings, opts, timestampOracle, externalStorageFromURI,
-		)
-		require.NoError(t, err)
-		s.(*cloudStorageSink).sinkID = 7 // Force a deterministic sinkID.
-
-		// Empty flush emits no files.
-		require.NoError(t, s.Flush(ctx))
-		require.Equal(t, []string(nil), slurpDir(t, dir))
-
-		// Emitting rows and flushing should write them out in one file per table. Note
-		// the ordering among these two files is non deterministic as either of them could
-		// be flushed first (and thus be assigned fileID 0).
-		require.NoError(t, s.EmitRow(ctx, t1, noKey, []byte(`v1`), ts(1)))
-		require.NoError(t, s.EmitRow(ctx, t1, noKey, []byte(`v2`), ts(1)))
-		require.NoError(t, s.EmitRow(ctx, t2, noKey, []byte(`w1`), ts(3)))
-		require.NoError(t, s.Flush(ctx))
-		expected := []string{
-			"v1\nv2\n",
-			"w1\n",
-		}
-		actual := slurpDir(t, dir)
-		sort.Strings(actual)
-		require.Equal(t, expected, actual)
-
-		// Flushing with no new emits writes nothing new.
-		require.NoError(t, s.Flush(ctx))
-		actual = slurpDir(t, dir)
-		sort.Strings(actual)
-		require.Equal(t, expected, actual)
-
-		// Without a flush, nothing new shows up.
-		require.NoError(t, s.EmitRow(ctx, t1, noKey, []byte(`v3`), ts(3)))
-		actual = slurpDir(t, dir)
-		sort.Strings(actual)
-		require.Equal(t, expected, actual)
-
-		// Note that since we haven't forwarded `testSpan` yet, all files initiated until
-		// this point must have the same `frontier` timestamp. Since fileID increases
-		// monotonically, the last file emitted should be ordered as such.
-		require.NoError(t, s.Flush(ctx))
-		require.Equal(t, []string{
-			"v3\n",
-		}, slurpDir(t, dir)[2:])
-
-		// Data from different versions of a table is put in different files, so that we
-		// can guarantee that all rows in any given file have the same schema.
-		// We also advance `testSpan` and `Flush` to make sure these new rows are read
-		// after the rows emitted above.
-		require.True(t, sf.Forward(testSpan, ts(4)))
-		require.NoError(t, s.Flush(ctx))
-		require.NoError(t, s.EmitRow(ctx, t1, noKey, []byte(`v4`), ts(4)))
-		t1.Version = 2
-		require.NoError(t, s.EmitRow(ctx, t1, noKey, []byte(`v5`), ts(5)))
-		require.NoError(t, s.Flush(ctx))
-		expected = []string{
-			"v4\n",
-			"v5\n",
+		before := opts[changefeedbase.OptCompression]
+		// Compression codecs include buffering that interferes with other tests,
+		// e.g. the bucketing test that configures very small flush sizes.
+		defer func() {
+			opts[changefeedbase.OptCompression] = before
+		}()
+		for _, compression := range []string{"", "gzip"} {
+			opts[changefeedbase.OptCompression] = compression
+			t.Run("compress="+compression, func(t *testing.T) {
+				t1 := &sqlbase.TableDescriptor{Name: `t1`}
+				t2 := &sqlbase.TableDescriptor{Name: `t2`}
+
+				testSpan := roachpb.Span{Key: []byte("a"), EndKey: []byte("b")}
+				sf := span.MakeFrontier(testSpan)
+				timestampOracle := &changeAggregatorLowerBoundOracle{sf: sf}
+				dir := `single-node` + compression
+				s, err := makeCloudStorageSink(
+					ctx, `nodelocal:///`+dir, 1, unlimitedFileSize,
+					settings, opts, timestampOracle, externalStorageFromURI,
+				)
+				require.NoError(t, err)
+				s.(*cloudStorageSink).sinkID = 7 // Force a deterministic sinkID.
+
+				// Empty flush emits no files.
+				require.NoError(t, s.Flush(ctx))
+				require.Equal(t, []string(nil), slurpDir(t, dir))
+
+				// Emitting rows and flushing should write them out in one file per table. Note
+				// the ordering among these two files is non deterministic as either of them could
+				// be flushed first (and thus be assigned fileID 0).
+				require.NoError(t, s.EmitRow(ctx, t1, noKey, []byte(`v1`), ts(1)))
+				require.NoError(t, s.EmitRow(ctx, t1, noKey, []byte(`v2`), ts(1)))
+				require.NoError(t, s.EmitRow(ctx, t2, noKey, []byte(`w1`), ts(3)))
+				require.NoError(t, s.Flush(ctx))
+				expected := []string{
+					"v1\nv2\n",
+					"w1\n",
+				}
+				actual := slurpDir(t, dir)
+				sort.Strings(actual)
+				require.Equal(t, expected, actual)
+
+				// Flushing with no new emits writes nothing new.
+				require.NoError(t, s.Flush(ctx))
+				actual = slurpDir(t, dir)
+				sort.Strings(actual)
+				require.Equal(t, expected, actual)
+
+				// Without a flush, nothing new shows up.
+				require.NoError(t, s.EmitRow(ctx, t1, noKey, []byte(`v3`), ts(3)))
+				actual = slurpDir(t, dir)
+				sort.Strings(actual)
+				require.Equal(t, expected, actual)
+
+				// Note that since we haven't forwarded `testSpan` yet, all files initiated until
+				// this point must have the same `frontier` timestamp. Since fileID increases
+				// monotonically, the last file emitted should be ordered as such.
+				require.NoError(t, s.Flush(ctx))
+				require.Equal(t, []string{
+					"v3\n",
+				}, slurpDir(t, dir)[2:])
+
+				// Data from different versions of a table is put in different files, so that we
+				// can guarantee that all rows in any given file have the same schema.
+				// We also advance `testSpan` and `Flush` to make sure these new rows are read
+				// after the rows emitted above.
+				require.True(t, sf.Forward(testSpan, ts(4)))
+				require.NoError(t, s.Flush(ctx))
+				require.NoError(t, s.EmitRow(ctx, t1, noKey, []byte(`v4`), ts(4)))
+				t1.Version = 2
+				require.NoError(t, s.EmitRow(ctx, t1, noKey, []byte(`v5`), ts(5)))
+				require.NoError(t, s.Flush(ctx))
+				expected = []string{
+					"v4\n",
+					"v5\n",
+				}
+				actual = slurpDir(t, dir)
+				actual = actual[len(actual)-2:]
+				sort.Strings(actual)
+				require.Equal(t, expected, actual)
+			})
 		}
-		actual = slurpDir(t, dir)
-		actual = actual[len(actual)-2:]
-		sort.Strings(actual)
-		require.Equal(t, expected, actual)
 	})
 	t.Run(`multi-node`, func(t *testing.T) {