Merge #63891
63891: changefeedccl: Correctly account for memory when closing gzip files. r=miretskiy a=miretskiy

Correctly account for used memory when closing a gzip-compressed cloud storage
file.

Fixes #63888 

Release Notes: Bug fix; correctly account for used memory when closing
compressed files.

Co-authored-by: Yevgeniy Miretskiy <[email protected]>
craig[bot] and Yevgeniy Miretskiy committed Apr 19, 2021
2 parents 1c10372 + 76ad31b commit eaedf38
Showing 2 changed files with 49 additions and 3 deletions.
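For context, the pattern the commit adopts can be illustrated with a minimal, self-contained sketch. This is not the CockroachDB code: memAccount, flushFile, and main below are hypothetical stand-ins for the sink's memory monitor account and flush path. The point is that the amount to release is captured before the gzip writer is closed, because Close may append the gzip trailer and grow the buffer beyond what was ever reserved.

package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
)

// memAccount is a toy stand-in for the sink's memory monitor account:
// it only tracks how many bytes have been reserved so far.
type memAccount struct{ used int64 }

func (m *memAccount) Grow(n int64)   { m.used += n }
func (m *memAccount) Shrink(n int64) { m.used -= n }

// flushFile mirrors the shape of the fix: the amount to release is captured
// *before* the codec is closed, so any bytes the gzip trailer adds to the
// buffer are never (incorrectly) released from the account.
func flushFile(mem *memAccount, buf *bytes.Buffer, codec *gzip.Writer) error {
	defer func(delta int) {
		mem.Shrink(int64(delta))
	}(buf.Cap()) // evaluated now, before Close below can grow the buffer

	if codec != nil {
		if err := codec.Close(); err != nil {
			return err
		}
	}
	// ... a real sink would upload buf.Bytes() to cloud storage here ...
	return nil
}

func main() {
	var buf bytes.Buffer
	codec := gzip.NewWriter(&buf)
	mem := &memAccount{}

	if _, err := codec.Write([]byte("some changefeed row data")); err != nil {
		panic(err)
	}
	mem.Grow(int64(buf.Cap())) // reserve what the buffer currently holds

	if err := flushFile(mem, &buf, codec); err != nil {
		panic(err)
	}
	fmt.Println("bytes still accounted:", mem.used) // 0
}

Because the release happens in a defer, the account is returned to its pre-flush state even if closing the codec or uploading the file fails.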
12 changes: 9 additions & 3 deletions pkg/ccl/changefeedccl/sink_cloudstorage.go
@@ -512,8 +512,15 @@ func (s *cloudStorageSink) flushFile(ctx context.Context, file *cloudStorageSink
return nil
}

// If the file is written via compression codec, close the codec to ensure it
// has flushed to the underlying buffer.
// Release memory allocated for this file. Note that closing the codec
// below may write more data to our buffer (and that may cause the buffer
// to grow due to reallocation). We don't account for that additional memory
// because a) we don't know whether the buffer will be resized (nor by how much), and
// b) if we were out of memory, we would have OOMed when trying to close the codec anyway.
defer func(delta int) {
s.mem.Shrink(ctx, int64(delta))
}(file.buf.Cap())

if file.codec != nil {
if err := file.codec.Close(); err != nil {
return err
@@ -538,7 +538,6 @@ func (s *cloudStorageSink) flushFile(ctx context.Context, file *cloudStorageSink
if err := s.es.WriteFile(ctx, filepath.Join(s.dataFilePartition, filename), bytes.NewReader(file.buf.Bytes())); err != nil {
return err
}
s.mem.Shrink(ctx, int64(file.buf.Cap()))
return nil
}
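A note on why the deferred call above snapshots the right value: in Go, the arguments of a deferred function are evaluated when the defer statement executes, not when the deferred function later runs. The toy program below (unrelated to this repository) demonstrates that property, which is what lets file.buf.Cap() be captured before file.codec.Close() can grow the buffer.

package main

import "fmt"

func main() {
	n := 1
	// The argument is evaluated here, so the deferred function
	// sees 1 even though n changes afterwards.
	defer func(snapshot int) {
		fmt.Println("snapshot:", snapshot) // prints 1
	}(n)

	n = 100 // analogous to codec.Close() growing the buffer later
	fmt.Println("current:", n) // prints 100
}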

40 changes: 40 additions & 0 deletions pkg/ccl/changefeedccl/sink_cloudstorage_test.go
@@ -35,6 +35,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/span"
"github.com/stretchr/testify/require"
)
@@ -575,4 +576,43 @@ func TestCloudStorageSink(t *testing.T) {
}
require.Regexp(t, "memory budget exceeded", err)
})
t.Run(`memory-accounting`, func(t *testing.T) {
before := opts[changefeedbase.OptCompression]
// Compression codecs include buffering that interferes with other tests,
// e.g. the bucketing test that configures very small flush sizes.
defer func() {
opts[changefeedbase.OptCompression] = before
}()

// A bit of a magic constant: we use bytes.Buffer internally, which
// initially allocates a "small" buffer (64 bytes). We target a file size
// just below that value, but write a larger amount of data (thus hopefully
// causing multiple bytes.Buffer reallocations, plus a file flush).
const targetFileSize = 63

rnd, _ := randutil.NewPseudoRand()
for _, compression := range []string{"", "gzip"} {
opts[changefeedbase.OptCompression] = compression
t.Run("compress="+compression, func(t *testing.T) {
t1 := makeTopic(`t1`)
testSpan := roachpb.Span{Key: []byte("a"), EndKey: []byte("b")}
sf := span.MakeFrontier(testSpan)
timestampOracle := &changeAggregatorLowerBoundOracle{sf: sf}
sinkDir := `memory-accounting`
s, err := makeCloudStorageSink(
ctx, `nodelocal://0/`+sinkDir, 1, targetFileSize,
settings, opts, timestampOracle, externalStorageFromURI, user, memAcc,
)
require.NoError(t, err)
defer func() { require.NoError(t, s.Close()) }()
s.(*cloudStorageSink).sinkID = 7 // Force a deterministic sinkID.

data := randutil.RandBytes(rnd, 1+rnd.Intn(targetFileSize))
require.NoError(t, s.EmitRow(ctx, t1, noKey, data, ts(0)))

require.NoError(t, s.Flush(ctx))
})
}
})
}
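The targetFileSize = 63 constant in the test leans on an implementation detail of the standard library: an empty bytes.Buffer allocates a 64-byte backing array on its first small write and reallocates as writes outgrow the current capacity. A short sketch of that behavior (observed on current Go releases; the 64-byte figure is not a documented guarantee):

package main

import (
	"bytes"
	"fmt"
)

func main() {
	var buf bytes.Buffer
	fmt.Println(buf.Cap()) // 0: nothing allocated yet

	buf.WriteByte('x')
	fmt.Println(buf.Cap()) // typically 64: the initial "small" allocation

	buf.Write(make([]byte, 200))
	fmt.Println(buf.Cap()) // larger: the buffer reallocated to hold 201 bytes
}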
