Skip to content

Commit

Permalink
changefeedccl: Release allocation when skipping events
Browse files Browse the repository at this point in the history
The changefeed (or, to be precise, the KV feed) may skip
some events when a "scan boundary" is reached.
A scan boundary is the timestamp at which a certain event occurs --
usually a schema change.  However, a scan boundary may also be
established when the `end_time` option is set.

The KV feed ignores events whose MVCC timestamp is
greater than or equal to that of the scan boundary event.

Unfortunately, due to a long-standing bug, the memory
allocation associated with an event was not released
when the KV feed decided to skip that event.

Because of this, the allocated memory was "leaked" and never reclaimed.
If enough additional events arrive, the leaked allocations may
consume the entire memory budget, making it impossible
for any further events to be added.

This bug impacts any changefeed running with the `end_time`
option set.  It might also impact changefeeds that observe
a normal schema change event, though this situation is highly unlikely
(the same transaction that performs the schema change would have to
modify a sufficient number of rows in the table to fill
up all of the memory budget).

Fixes #108040

Release note (enterprise change): Fix a potential "deadlock" when
running a changefeed with the `end_time` option set.
  • Loading branch information
Yevgeniy Miretskiy committed Aug 2, 2023
1 parent aebcd92 commit 7bdb71d
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 3 deletions.
67 changes: 67 additions & 0 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7841,6 +7841,25 @@ func (s *memoryHoggingSink) Close() error {
return nil
}

// countEmittedRowsSink is a test sink that keeps a running count of the rows
// handed to its EmitRow method. It embeds memoryHoggingSink, so every method
// not defined on countEmittedRowsSink itself is promoted from the embedded sink.
type countEmittedRowsSink struct {
memoryHoggingSink
// numRows is the number of EmitRow calls observed so far.
numRows int64 // Accessed atomically; not using atomic.Int64 to make backports possible.
}

// EmitRow releases the memory allocation attached to the row and atomically
// increments numRows. The topic, key, value and timestamps are ignored, and
// the method always returns nil.
func (s *countEmittedRowsSink) EmitRow(
ctx context.Context,
topic TopicDescriptor,
key, value []byte,
updated, mvcc hlc.Timestamp,
alloc kvevent.Alloc,
) error {
// Release right away: this sink does not hold on to the row.
alloc.Release(ctx)
// Atomic increment; the test reads the counter with atomic.LoadInt64.
atomic.AddInt64(&s.numRows, 1)
return nil
}

// Compile-time assertion that *countEmittedRowsSink satisfies the Sink interface.
var _ Sink = (*countEmittedRowsSink)(nil)

func TestChangefeedFlushesSinkToReleaseMemory(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down Expand Up @@ -7891,6 +7910,54 @@ func TestChangefeedFlushesSinkToReleaseMemory(t *testing.T) {
require.Greater(t, sink.numFlushes(), 0)
}

// TestKVFeedDoesNotLeakMemoryWhenSkippingEvents checks that the KV feed gives
// back the memory allocation of every event it drops after reaching end_time
// (or another scan boundary). The changefeed runs with a tiny memory budget
// and an end_time equal to the timestamp of a bulk insert: the job can only
// succeed if the skipped events release their memory.
func TestKVFeedDoesNotLeakMemoryWhenSkippingEvents(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	srv, shutdown := makeServer(t)
	defer shutdown()

	runner := sqlutils.MakeSQLRunner(srv.DB)
	distSQLKnobs := srv.TestingKnobs.DistSQL.(*execinfra.TestingKnobs)
	feedKnobs := distSQLKnobs.Changefeed.(*TestingKnobs)

	// Run with a deliberately small memory budget.
	feedKnobs.MemMonitor = startMonitorWithBudget(4096)

	// Swap in a sink whose only job is to count the rows it is asked to emit.
	rowCounter := &countEmittedRowsSink{}
	feedKnobs.WrapSink = func(Sink, jobspb.JobID) Sink {
		return rowCounter
	}

	runner.Exec(t, `CREATE TABLE foo(key INT PRIMARY KEY DEFAULT unique_rowid(), val INT)`)

	// The changefeed cursor is taken before any rows are written.
	cursor := srv.Server.Clock().Now().AsOfSystemTime()

	// Write 123 rows in a single statement; roughly 26 rows are enough to
	// fill the tiny budget. The statement timestamp is captured so it can
	// serve as the changefeed's end_time.
	var insertTS string
	runner.QueryRow(t,
		`INSERT INTO foo (val) SELECT * FROM generate_series(1, 123) RETURNING cluster_logical_timestamp();`,
	).Scan(&insertTS)
	endTime := parseTimeToHLC(t, insertTS).AsOfSystemTime()

	// With end_time equal to the insert timestamp, the KV feed is expected to
	// skip every one of the inserted rows.
	var feedJobID jobspb.JobID
	runner.QueryRow(t, `CREATE CHANGEFEED FOR foo INTO 'null:' WITH cursor = $1, end_time = $2`,
		cursor, endTime).Scan(&feedJobID)

	// The job reaches the succeeded state only if the skipped events released
	// their allocations; otherwise this wait times out.
	waitForJobStatus(runner, t, feedJobID, jobs.StatusSucceeded)

	// Every row should have been filtered out by end_time, so the sink must
	// not have seen any of them.
	emitted := atomic.LoadInt64(&rowCounter.numRows)
	require.EqualValues(t, 0, emitted)
}

func TestChangefeedMultiPodTenantPlanning(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down
21 changes: 18 additions & 3 deletions pkg/ccl/changefeedccl/kvfeed/kv_feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -580,6 +580,7 @@ func copyFromSourceToDestUntilTableEvent(
) error {
var (
scanBoundary errBoundaryReached
endTimeIsSet = !endTime.IsEmpty()

// checkForScanBoundary takes in a new event's timestamp (event generated
// from rangefeed), and asks "Is some type of 'boundary' reached
Expand All @@ -591,7 +592,11 @@ func copyFromSourceToDestUntilTableEvent(
// If the scanBoundary is not nil, it either means that there is a table
// event boundary set or a boundary for the end time. If the boundary is
// for the end time, we should keep looking for table events.
_, isEndTimeBoundary := scanBoundary.(*errEndTimeReached)
isEndTimeBoundary := false
if endTimeIsSet {
_, isEndTimeBoundary = scanBoundary.(*errEndTimeReached)
}

if scanBoundary != nil && !isEndTimeBoundary {
return nil
}
Expand All @@ -606,7 +611,7 @@ func copyFromSourceToDestUntilTableEvent(
// precedence to table events.
if len(nextEvents) > 0 {
scanBoundary = &errTableEventReached{nextEvents[0]}
} else if !endTime.IsEmpty() && scanBoundary == nil {
} else if endTimeIsSet && scanBoundary == nil {
scanBoundary = &errEndTimeReached{
endTime: endTime,
}
Expand Down Expand Up @@ -687,6 +692,17 @@ func copyFromSourceToDestUntilTableEvent(
if err != nil {
return err
}

if skipEntry || scanBoundaryReached {
// We will skip this entry or outright terminate kvfeed (if boundary reached).
// Regardless of the reason, we must release this event memory allocation
// since other ranges might not have reached scan boundary yet.
// Failure to release this event allocation may prevent other events from being
// enqueued in the blocking buffer due to memory limit.
a := e.DetachAlloc()
a.Release(ctx)
}

if scanBoundaryReached {
// All component rangefeeds are now at the boundary.
// Break out of the ctxgroup by returning the sentinel error.
Expand All @@ -698,7 +714,6 @@ func copyFromSourceToDestUntilTableEvent(
return addEntry(e)
}
)

for {
e, err := source.Get(ctx)
if err != nil {
Expand Down

0 comments on commit 7bdb71d

Please sign in to comment.