Merge pull request #63633 from miretskiy/backport20.2-63406
release-20.2: changefeedccl: Use memory monitor for kafka and cloud sinks.
miretskiy authored Apr 15, 2021
2 parents 4bdc2bb + 8451004 commit 45fa9ef
Showing 6 changed files with 368 additions and 92 deletions.
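For orientation before the per-file diffs: this commit threads a mon.BoundAccount from the changefeed processors into the sinks, so that bytes buffered for Kafka or cloud storage are charged against a per-changefeed budget instead of growing unchecked. Below is a sketch of the lifecycle, assembled only from calls that appear in this diff; the surrounding context and msgBytes are illustrative, not part of the commit.

	// Sketch: the mon lifecycle this commit wires up.
	kvFeedMemMon := mon.NewMonitorInheritWithLimit("kvFeed", limit, pool)
	kvFeedMemMon.Start(ctx, pool, mon.BoundAccount{}) // no pre-reserved budget
	acc := kvFeedMemMon.MakeBoundAccount()            // handed to getSink

	// Inside a sink: reserve before buffering, release when the bytes leave.
	if err := acc.Grow(ctx, msgBytes); err != nil {
		return err // budget exceeded: fail instead of buffering unboundedly
	}
	acc.Shrink(ctx, msgBytes) // on ack or flush
	acc.Close(ctx)            // on sink Close

Here pool is a parent *mon.BytesMonitor (the diff uses flowCtx.Cfg.BackfillerMonitor) and limit comes from the changefeedbase.PerChangefeedMemLimit cluster setting.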
52 changes: 29 additions & 23 deletions pkg/ccl/changefeedccl/changefeed_processors.go
@@ -153,19 +153,37 @@ func (ca *changeAggregator) Start(ctx context.Context) context.Context {
spans, sf := ca.setupSpans()
timestampOracle := &changeAggregatorLowerBoundOracle{sf: sf, initialInclusiveLowerBound: ca.spec.Feed.StatementTime}
nodeID, err := ca.flowCtx.EvalCtx.NodeID.OptionalNodeIDErr(48274)

if err != nil {
ca.MoveToDraining(err)
return ctx
}

if ca.sink, err = getSink(
var knobs TestingKnobs
if cfKnobs, ok := ca.flowCtx.TestingKnobs().Changefeed.(*TestingKnobs); ok {
knobs = *cfKnobs
}

// TODO(yevgeniy): Introduce separate changefeed monitor that's a parent
// for all changefeeds to control memory allocated to all changefeeds.
pool := ca.flowCtx.Cfg.BackfillerMonitor
if knobs.MemMonitor != nil {
pool = knobs.MemMonitor
}
limit := changefeedbase.PerChangefeedMemLimit.Get(&ca.flowCtx.Cfg.Settings.SV)
kvFeedMemMon := mon.NewMonitorInheritWithLimit("kvFeed", limit, pool)
kvFeedMemMon.Start(ctx, pool, mon.BoundAccount{})
ca.kvFeedMemMon = kvFeedMemMon

// TODO(yevgeniy): getSink is getting to be quite a kitchen sink -- refactor.
ca.sink, err = getSink(
ctx, ca.spec.Feed.SinkURI, nodeID, ca.spec.Feed.Opts, ca.spec.Feed.Targets,
ca.flowCtx.Cfg.Settings, timestampOracle, ca.flowCtx.Cfg.ExternalStorageFromURI, ca.spec.User,
); err != nil {
err = MarkRetryableError(err)
// Early abort in the case that there is an error creating the sink.
kvFeedMemMon.MakeBoundAccount(),
)

if err != nil {
ca.MoveToDraining(err)
ca.cancel()
return ctx
}

@@ -182,22 +200,6 @@ func (ca *changeAggregator) Start(ctx context.Context) context.Context {
ca.sink = makeMetricsSink(metrics, ca.sink)
ca.sink = &errorWrapperSink{wrapped: ca.sink}

var knobs TestingKnobs
if cfKnobs, ok := ca.flowCtx.TestingKnobs().Changefeed.(*TestingKnobs); ok {
knobs = *cfKnobs
}

// TODO(yevgeniy): Introduce separate changefeed monitor that's a parent
// for all changefeeds to control memory allocated to all changefeeds.
pool := ca.flowCtx.Cfg.BackfillerMonitor
if knobs.MemMonitor != nil {
pool = knobs.MemMonitor
}
limit := changefeedbase.PerChangefeedMemLimit.Get(&ca.flowCtx.Cfg.Settings.SV)
kvFeedMemMon := mon.NewMonitorInheritWithLimit("kvFeed", limit, pool)
kvFeedMemMon.Start(ctx, pool, mon.BoundAccount{})
ca.kvFeedMemMon = kvFeedMemMon

buf := kvfeed.MakeChanBuffer()
leaseMgr := ca.flowCtx.Cfg.LeaseManager.(*lease.Manager)
_, withDiff := ca.spec.Feed.Opts[changefeedbase.OptDiff]
@@ -550,10 +552,14 @@ func (cf *changeFrontier) Start(ctx context.Context) context.Context {
// Pass a nil oracle because this sink is only used to emit resolved timestamps
// but the oracle is only used when emitting row updates.
var nilOracle timestampLowerBoundOracle
if cf.sink, err = getSink(
// TODO(yevgeniy): Evaluate if we should introduce changefeed specific monitor.
mm := cf.flowCtx.Cfg.BackfillerMonitor
cf.sink, err = getSink(
ctx, cf.spec.Feed.SinkURI, nodeID, cf.spec.Feed.Opts, cf.spec.Feed.Targets,
cf.flowCtx.Cfg.Settings, nilOracle, cf.flowCtx.Cfg.ExternalStorageFromURI, cf.spec.User,
); err != nil {
mm.MakeBoundAccount(),
)
if err != nil {
err = MarkRetryableError(err)
cf.MoveToDraining(err)
return ctx
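The flattened hunks above interleave removed and added lines, which makes the control flow hard to read. Condensed, the new sequence in changeAggregator.Start is roughly as follows (a reading of the diff, not a verbatim excerpt):

	pool := ca.flowCtx.Cfg.BackfillerMonitor
	if knobs.MemMonitor != nil {
		pool = knobs.MemMonitor // tests can substitute an observable monitor
	}
	limit := changefeedbase.PerChangefeedMemLimit.Get(&ca.flowCtx.Cfg.Settings.SV)
	kvFeedMemMon := mon.NewMonitorInheritWithLimit("kvFeed", limit, pool)
	kvFeedMemMon.Start(ctx, pool, mon.BoundAccount{})
	ca.kvFeedMemMon = kvFeedMemMon

	ca.sink, err = getSink(
		ctx, ca.spec.Feed.SinkURI, nodeID, ca.spec.Feed.Opts, ca.spec.Feed.Targets,
		ca.flowCtx.Cfg.Settings, timestampOracle, ca.flowCtx.Cfg.ExternalStorageFromURI,
		ca.spec.User, kvFeedMemMon.MakeBoundAccount(),
	)
	if err != nil {
		ca.MoveToDraining(err)
		ca.cancel()
		return ctx
	}

changeFrontier.Start, by contrast, draws its bound account straight from BackfillerMonitor: it only emits resolved timestamps, so no dedicated monitor is created there (see the TODO in the last hunk above).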
4 changes: 3 additions & 1 deletion pkg/ccl/changefeedccl/changefeed_stmt.go
@@ -45,6 +45,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage/cloudimpl"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
@@ -289,7 +290,8 @@ func changefeedPlanHook(
var nilOracle timestampLowerBoundOracle
canarySink, err := getSink(
ctx, details.SinkURI, nodeID, details.Opts, details.Targets,
settings, nilOracle, p.ExecCfg().DistSQLSrv.ExternalStorageFromURI, p.User(),
settings, nilOracle, p.ExecCfg().DistSQLSrv.ExternalStorageFromURI,
p.User(), mon.BoundAccount{},
)
if err != nil {
return MaybeStripRetryableErrorMarker(err)
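This is the one getSink caller with no monitor in scope: the canary sink only validates the sink URI during CREATE CHANGEFEED planning, so it passes a zero mon.BoundAccount{}. That is presumably safe because the canary is closed before any rows are emitted, meaning the account is never grown. The call shape, condensed from the hunk above:

	canarySink, err := getSink(
		ctx, details.SinkURI, nodeID, details.Opts, details.Targets,
		settings, nilOracle, p.ExecCfg().DistSQLSrv.ExternalStorageFromURI,
		p.User(), mon.BoundAccount{},
	)
	if err != nil {
		return MaybeStripRetryableErrorMarker(err)
	}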
73 changes: 57 additions & 16 deletions pkg/ccl/changefeedccl/sink.go
@@ -41,6 +41,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
@@ -75,6 +76,7 @@ func getSink(
timestampOracle timestampLowerBoundOracle,
makeExternalStorageFromURI cloud.ExternalStorageFromURIFactory,
user string,
acc mon.BoundAccount,
) (Sink, error) {
u, err := url.Parse(sinkURI)
if err != nil {
@@ -178,7 +180,7 @@
}

makeSink = func() (Sink, error) {
return makeKafkaSink(cfg, u.Host, targets, opts)
return makeKafkaSink(ctx, cfg, u.Host, targets, opts, acc)
}
case isCloudStorageSink(u):
fileSizeParam := q.Get(changefeedbase.SinkParamFileSize)
@@ -197,7 +199,7 @@
makeSink = func() (Sink, error) {
return makeCloudStorageSink(
ctx, u.String(), nodeID, fileSize, settings,
opts, timestampOracle, makeExternalStorageFromURI, user,
opts, timestampOracle, makeExternalStorageFromURI, user, acc,
)
}
case u.Scheme == changefeedbase.SinkSchemeExperimentalSQL:
@@ -310,6 +312,7 @@ type kafkaSinkConfig struct {
// kafkaSink emits to Kafka asynchronously. It is not concurrency-safe; all
// calls to Emit and Flush should be from the same goroutine.
type kafkaSink struct {
ctx context.Context
cfg kafkaSinkConfig
client sarama.Client
producer sarama.AsyncProducer
@@ -324,6 +327,7 @@ type kafkaSink struct {
// Only synchronized between the client goroutine and the worker goroutine.
mu struct {
syncutil.Mutex
mem mon.BoundAccount
inflight int64
flushErr error
flushCh chan struct{}
@@ -419,16 +423,22 @@ func getSaramaConfig(opts map[string]string) (config *saramaConfig, err error) {
}

func makeKafkaSink(
ctx context.Context,
cfg kafkaSinkConfig,
bootstrapServers string,
targets jobspb.ChangefeedTargets,
opts map[string]string,
acc mon.BoundAccount,
) (Sink, error) {
sink := &kafkaSink{cfg: cfg}
sink := &kafkaSink{
ctx: ctx,
cfg: cfg,
}
sink.topics = make(map[string]struct{})
for _, t := range targets {
sink.topics[cfg.kafkaTopicPrefix+SQLNameToKafkaName(t.StatementTimeName)] = struct{}{}
}
sink.mu.mem = acc

config := sarama.NewConfig()
config.ClientID = `CockroachDB`
@@ -510,9 +520,14 @@ func (s *kafkaSink) start() {

// Close implements the Sink interface.
func (s *kafkaSink) Close() error {
defer func() {
s.mu.Lock()
s.mu.mem.Close(s.ctx)
s.mu.Unlock()
}()

close(s.stopWorkerCh)
s.worker.Wait()

// If we're shutting down, we don't care what happens to the outstanding
// messages, so ignore this error.
_ = s.producer.Close()
@@ -626,42 +641,68 @@ func (s *kafkaSink) Flush(ctx context.Context) error {
}
}

func (s *kafkaSink) emitMessage(ctx context.Context, msg *sarama.ProducerMessage) error {
func kafkaMessageBytes(m *sarama.ProducerMessage) (s int64) {
if m.Key != nil {
s += int64(m.Key.Length())
}
if m.Value != nil {
s += int64(m.Value.Length())
}
return
}

func (s *kafkaSink) startInflightMessage(ctx context.Context, msg *sarama.ProducerMessage) error {
s.mu.Lock()
defer s.mu.Unlock()
if err := s.mu.mem.Grow(ctx, kafkaMessageBytes(msg)); err != nil {
return err
}

s.mu.inflight++
inflight := s.mu.inflight
s.mu.Unlock()

if log.V(2) {
log.Infof(ctx, "emitting %d inflight records to kafka", s.mu.inflight)
}
return nil
}

func (s *kafkaSink) emitMessage(ctx context.Context, msg *sarama.ProducerMessage) error {
if err := s.startInflightMessage(ctx, msg); err != nil {
return err
}

select {
case <-ctx.Done():
return ctx.Err()
case s.producer.Input() <- msg:
}

if log.V(2) {
log.Infof(ctx, "emitted %d inflight records to kafka", inflight)
}
return nil
}

func (s *kafkaSink) workerLoop() {
defer s.worker.Done()

for {
var ackMsg *sarama.ProducerMessage
var ackError error

select {
case <-s.stopWorkerCh:
return
case <-s.producer.Successes():
case m := <-s.producer.Successes():
ackMsg = m
case err := <-s.producer.Errors():
s.mu.Lock()
if s.mu.flushErr == nil {
s.mu.flushErr = err
}
s.mu.Unlock()
ackMsg, ackError = err.Msg, err.Err
}

s.mu.Lock()
s.mu.inflight--
s.mu.mem.Shrink(s.ctx, kafkaMessageBytes(ackMsg))
if s.mu.flushErr == nil && ackError != nil {
s.mu.flushErr = ackError
}

if s.mu.inflight == 0 && s.mu.flushCh != nil {
s.mu.flushCh <- struct{}{}
s.mu.flushCh = nil
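The Kafka accounting protocol is spread across several flattened hunks above: Grow under the mutex before handing a message to the producer, Shrink in the worker goroutine on every ack (success or error), Close when the sink closes. The following self-contained, runnable model shows the same Grow-on-emit / Shrink-on-ack pairing; a channel stands in for sarama's AsyncProducer and a counter with a limit stands in for mon.BoundAccount. All names are illustrative, not from the commit.

	package main

	import (
		"errors"
		"fmt"
		"sync"
	)

	// modelSink is a toy stand-in for kafkaSink.
	type modelSink struct {
		input chan []byte // producer.Input()
		acks  chan []byte // producer.Successes() and producer.Errors()
		wg    sync.WaitGroup

		mu struct {
			sync.Mutex
			used, limit int64 // BoundAccount stand-in
			inflight    int64
		}
	}

	// emit mirrors emitMessage/startInflightMessage: reserve memory up front
	// so a slow broker surfaces as an error rather than unbounded buffering.
	func (s *modelSink) emit(msg []byte) error {
		s.mu.Lock()
		if s.mu.used+int64(len(msg)) > s.mu.limit {
			s.mu.Unlock()
			return errors.New("memory budget exceeded") // real code: mem.Grow error
		}
		s.mu.used += int64(len(msg)) // mem.Grow
		s.mu.inflight++
		s.mu.Unlock()

		s.input <- msg
		return nil
	}

	// workerLoop mirrors kafkaSink.workerLoop: every ack, success or failure,
	// releases exactly the bytes its message reserved.
	func (s *modelSink) workerLoop() {
		defer s.wg.Done()
		for msg := range s.acks {
			s.mu.Lock()
			s.mu.inflight--
			s.mu.used -= int64(len(msg)) // mem.Shrink
			s.mu.Unlock()
		}
	}

	func main() {
		s := &modelSink{input: make(chan []byte), acks: make(chan []byte)}
		s.mu.limit = 1 << 20

		// Fake broker: ack everything produced, then signal completion.
		go func() {
			for m := range s.input {
				s.acks <- m
			}
			close(s.acks)
		}()
		s.wg.Add(1)
		go s.workerLoop()

		for i := 0; i < 3; i++ {
			if err := s.emit([]byte(fmt.Sprintf(`{"k":%d}`, i))); err != nil {
				panic(err)
			}
		}
		close(s.input)
		s.wg.Wait()
		fmt.Println("outstanding bytes:", s.mu.used) // 0
	}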
32 changes: 23 additions & 9 deletions pkg/ccl/changefeedccl/sink_cloudstorage.go
@@ -27,6 +27,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage/cloud"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/errors"
"github.com/google/btree"
)
@@ -271,8 +272,8 @@ type cloudStorageSink struct {
settings *cluster.Settings
partitionFormat string

ext string
recordDelimFn func(io.Writer) error
ext string
rowDelimiter []byte

compression string

@@ -291,6 +292,9 @@
dataFileTs string
dataFilePartition string
prevFilename string

// Memory used by this sink
mem mon.BoundAccount
}

const sinkCompressionGzip = "gzip"
@@ -307,6 +311,7 @@ func makeCloudStorageSink(
timestampOracle timestampLowerBoundOracle,
makeExternalStorageFromURI cloud.ExternalStorageFromURIFactory,
user string,
acc mon.BoundAccount,
) (Sink, error) {
// Date partitioning is pretty standard, so no override for now, but we could
// plumb one down if someone needs it.
@@ -323,6 +328,7 @@
timestampOracle: timestampOracle,
// TODO(dan,ajwerner): Use the jobs framework's session ID once that's available.
jobSessionID: generateChangefeedSessionID(),
mem: acc,
}
if timestampOracle != nil {
s.dataFileTs = cloudStorageFormatTime(timestampOracle.inclusiveLowerBoundTS())
@@ -334,10 +340,7 @@
// TODO(dan): It seems like these should be on the encoder, but that
// would require a bit of refactoring.
s.ext = `.ndjson`
s.recordDelimFn = func(w io.Writer) error {
_, err := w.Write([]byte{'\n'})
return err
}
s.rowDelimiter = []byte{'\n'}
default:
return nil, errors.Errorf(`this sink is incompatible with %s=%s`,
changefeedbase.OptFormat, opts[changefeedbase.OptFormat])
@@ -399,11 +402,17 @@ func (s *cloudStorageSink) EmitRow(

file := s.getOrCreateFile(table.GetName(), table.GetVersion())

// TODO(dan): Memory monitoring for this
oldCap := file.buf.Cap()
if _, err := file.Write(value); err != nil {
return err
}
if err := s.recordDelimFn(file); err != nil {
if _, err := file.Write(s.rowDelimiter); err != nil {
return err
}

// Grow buffered memory. It's okay that we do it after the fact
// (and if not, we're in a deeper problem and probably OOMed by now).
if err := s.mem.Grow(ctx, int64(file.buf.Cap()-oldCap)); err != nil {
return err
}

@@ -529,12 +538,17 @@ func (s *cloudStorageSink) flushFile(ctx context.Context, file *cloudStorageSink
"precedes a file emitted before: %s", filename, s.prevFilename)
}
s.prevFilename = filename
return s.es.WriteFile(ctx, filepath.Join(s.dataFilePartition, filename), bytes.NewReader(file.buf.Bytes()))
if err := s.es.WriteFile(ctx, filepath.Join(s.dataFilePartition, filename), bytes.NewReader(file.buf.Bytes())); err != nil {
return err
}
s.mem.Shrink(ctx, int64(file.buf.Cap()))
return nil
}

// Close implements the Sink interface.
func (s *cloudStorageSink) Close() error {
s.files = nil
s.mem.Close(context.Background())
return s.es.Close()
}

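Note that the cloud storage sink accounts for buffer capacity rather than bytes written: bytes.Buffer grows its backing array in chunks, and capacity is what actually occupies memory, which is also why the commit's comment says growing after the fact is acceptable. A runnable toy of the Grow-by-capacity-delta / Shrink-on-flush pattern from the hunks above; toyAccount stands in for mon.BoundAccount and is illustrative only.

	package main

	import (
		"bytes"
		"fmt"
	)

	// toyAccount stands in for mon.BoundAccount; the real Grow returns an
	// error once the monitor's limit is exceeded.
	type toyAccount struct{ used int64 }

	func (a *toyAccount) Grow(n int64) error { a.used += n; return nil }
	func (a *toyAccount) Shrink(n int64)     { a.used -= n }

	func main() {
		var acc toyAccount
		var buf bytes.Buffer // one buffered "file" in the sink

		for _, row := range []string{`{"k": 1}`, `{"k": 2}`} {
			// Capacity delta, not row length: the delta is known only after
			// the write, so the account is grown after the fact.
			oldCap := buf.Cap()
			buf.WriteString(row)
			buf.WriteByte('\n') // the sink's rowDelimiter
			if err := acc.Grow(int64(buf.Cap() - oldCap)); err != nil {
				panic(err) // real code returns the error to fail the changefeed
			}
		}

		// flushFile: once the file is written out, release its full capacity.
		acc.Shrink(int64(buf.Cap()))
		fmt.Println("outstanding bytes:", acc.used) // 0
	}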
(The remaining two changed files are not rendered in this view.)
