106195: storage: ensure no filesystem use after close in test builds r=jbowens a=jbowens

In test builds, panic if any of the VFS filesystem facilities are used after the Engine is closed.
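
A minimal sketch of the pattern, with hypothetical names (the real change wraps Pebble's `vfs.FS` inside the Engine and gates the check on a test-build flag):

```go
package storage

import (
	"fmt"
	"os"
	"sync/atomic"
)

// guardedFS is a hypothetical wrapper illustrating the idea: once the owning
// Engine is closed, any further filesystem call panics, but only when
// assertions are enabled (test builds). It wraps plain os calls here to keep
// the sketch self-contained; the real code wraps Pebble's vfs.FS.
type guardedFS struct {
	closed       atomic.Bool // set when the owning Engine is closed
	assertionsOn bool        // true only in test builds
}

func (fs *guardedFS) assertOpen(op, name string) {
	if fs.assertionsOn && fs.closed.Load() {
		panic(fmt.Sprintf("%s(%q) called after engine close", op, name))
	}
}

// Open forwards to the underlying filesystem after checking the guard.
func (fs *guardedFS) Open(name string) (*os.File, error) {
	fs.assertOpen("Open", name)
	return os.Open(name)
}

// Close marks the engine as closed; later filesystem use will panic in test
// builds instead of silently operating on a closed engine's files.
func (fs *guardedFS) Close() {
	fs.closed.Store(true)
}
```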

Epic: none
Informs #106119.
Resolves #110293.
Release note: None

110576: kvserver: add kill switch for scheduled rangefeed processor r=erikgrinaker a=aliher1911

This commit adds the env var `COCKROACH_RANGEFEED_DISABLE_SCHEDULER`, which prevents creation of scheduler-based rangefeed processors and forces replicas to always create legacy processors.
This kill switch allows nodes to be recovered if the scheduler-based processor fails to process requests.
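
A hedged sketch of the gating (simplified; the actual helper, `shouldUseRangefeedScheduler`, appears in the `replica_rangefeed.go` hunk below and uses `envutil.EnvOrDefaultBool`):

```go
package kvserver

import "os"

// rangefeedSchedulerDisabled is a simplified stand-in for reading
// COCKROACH_RANGEFEED_DISABLE_SCHEDULER with a default of false.
var rangefeedSchedulerDisabled = os.Getenv("COCKROACH_RANGEFEED_DISABLE_SCHEDULER") == "true"

// useScheduler combines the kv.rangefeed.scheduler.enabled cluster setting
// with the kill switch: setting the env var forces legacy processors even if
// the cluster setting is on.
func useScheduler(settingEnabled bool) bool {
	return settingEnabled && !rangefeedSchedulerDisabled
}
```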

Epic: none
Fixes: #110559

Release note: None

110625: sql: fix cleanup of not exhausted pausable portals in some cases r=yuzefovich a=yuzefovich

**execinfra: relax RowSource.ConsumerClosed contract**

This commit adjusts the RowSource.ConsumerClosed contract so that it's
possible for this method to be called multiple times. Previously, the vast
majority of implementations already supported that (and some actually
assumed it), but a couple would panic or return an error if called multiple
times - those have been relaxed.

This is needed for the follow-up commit, which will introduce an
unconditional call to this method on the "head" processor of the flow
(needed for the pausable portals execution model).
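
A minimal sketch of what the relaxed contract implies for an implementation (hypothetical type; real processors live under `pkg/sql/execinfra` and `pkg/sql/rowexec`): the close path becomes idempotent, so a repeated call is a no-op rather than a panic.

```go
package rowexec

import "sync/atomic"

// closableSource is a hypothetical RowSource-like processor illustrating the
// relaxed contract: ConsumerClosed may be called more than once.
type closableSource struct {
	closed atomic.Bool
}

// ConsumerClosed releases resources exactly once; repeated calls are no-ops.
// Under the old contract, a second call could panic or report an error.
func (s *closableSource) ConsumerClosed() {
	if !s.closed.CompareAndSwap(false, true) {
		return // already closed by an earlier call
	}
	s.releaseResources()
}

func (s *closableSource) releaseResources() {
	// Close memory accounts, disk spillers, etc. (elided in this sketch).
}
```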

Release note: None

**sql: fix cleanup of not exhausted pausable portals in some cases**

Previously, if we executed a flow (either vectorized or row-based) that
contained a row-by-row processor in the "pausable portal" model and didn't
fully exhaust that flow (meaning that it could still produce more rows),
closing the portal could result in an error because that processor might
not have been closed properly. Vectorized operators aren't affected by
this, but row-by-row processors effectively require the `ConsumerClosed`
method to be called on them, and that might not happen. In particular, it
was assumed that this method would get called by the `execinfra.Run` loop,
but for the pausable portal model we needed to adjust the loop so that it
could be exited (and then re-entered) when switching to another portal;
thus, we lost the guarantee that `ConsumerClosed` is always called.

This commit fixes this problem by teaching both vectorized and row-based
flows to always call `ConsumerClosed` on their "head" processor on
`Cleanup` (which is called when closing the portal). Now, `Cleanup`
calls `ConsumerClosed` unconditionally (when the flow _might_ be running
in the pausable portal mode), and this is safe given the interface
adjustment in the previous commit.
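
Roughly how the flow-level guarantee works, as a sketch under trimmed-down interfaces (the concrete code is `FlowBase.ConsumerClosedOnHeadProc` and the `Cleanup` changes in this diff):

```go
package flowinfra

// Processor and RowSource are trimmed stand-ins for the execinfra interfaces,
// included only to keep the sketch self-contained.
type Processor interface{ Run() }

type RowSource interface {
	Processor
	ConsumerClosed()
}

type flow struct {
	processors []Processor
}

// consumerClosedOnHeadProc closes the "head" processor if there is exactly
// one and it is a RowSource; otherwise it is a no-op.
func (f *flow) consumerClosedOnHeadProc() {
	if len(f.processors) != 1 {
		return
	}
	if rs, ok := f.processors[0].(RowSource); ok {
		// Safe to call unconditionally now that ConsumerClosed is idempotent.
		rs.ConsumerClosed()
	}
}

// Cleanup is invoked when the portal is closed; it always closes the head
// processor, even if execinfra.Run exited early between portal switches.
func (f *flow) Cleanup() {
	f.consumerClosedOnHeadProc()
	// ... remaining flow cleanup elided ...
}
```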

Fixes: #110486

Release note (bug fix): Previously, when evaluating some queries (most
likely with a lookup join) via the "multiple active portals" execution
mode (a preview feature that can be enabled via the
`multiple_active_portals_enabled` session variable), CockroachDB could
encounter an internal error like `unexpected 40960 leftover bytes` if
that portal wasn't fully consumed. This is now fixed.

Co-authored-by: Jackson Owens <[email protected]>
Co-authored-by: Oleg Afanasyev <[email protected]>
Co-authored-by: Yahor Yuzefovich <[email protected]>
4 people committed Sep 14, 2023
4 parents 36b61a8 + 9da340c + 9962d75 + 568cf9b commit d05cdeb
Showing 15 changed files with 403 additions and 33 deletions.
@@ -64,7 +64,7 @@ type generativeSplitAndScatterProcessor struct {
chunkEntrySplitAndScatterers []splitAndScatterer

// cancelScatterAndWaitForWorker cancels the scatter goroutine and waits for
// it to finish.
// it to finish. It can be called multiple times.
cancelScatterAndWaitForWorker func()

doneScatterCh chan entryNode
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/split_and_scatter_processor.go
@@ -195,7 +195,7 @@ type splitAndScatterProcessor struct {

scatterer splitAndScatterer
// cancelScatterAndWaitForWorker cancels the scatter goroutine and waits for
// it to finish.
// it to finish. It can be called multiple times.
cancelScatterAndWaitForWorker func()

doneScatterCh chan entryNode
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/BUILD.bazel
@@ -378,6 +378,7 @@ go_test(
"//pkg/kv",
"//pkg/kv/kvclient",
"//pkg/kv/kvclient/kvcoord",
"//pkg/kv/kvclient/rangefeed",
"//pkg/kv/kvclient/rangefeed/rangefeedcache",
"//pkg/kv/kvpb",
"//pkg/kv/kvserver/abortspan",
@@ -417,6 +418,7 @@ go_test(
"//pkg/kv/kvserver/raftentry",
"//pkg/kv/kvserver/raftlog",
"//pkg/kv/kvserver/raftutil",
"//pkg/kv/kvserver/rangefeed",
"//pkg/kv/kvserver/rditer",
"//pkg/kv/kvserver/readsummary/rspb",
"//pkg/kv/kvserver/replicastats",
8 changes: 5 additions & 3 deletions pkg/kv/kvserver/consistency_queue_test.go
@@ -337,6 +337,7 @@ func TestCheckConsistencyInconsistent(t *testing.T) {

// Write some arbitrary data only to store on n2. Inconsistent key "e"!
s2 := tc.GetFirstStoreFromServer(t, 1)
s2AuxDir := s2.TODOEngine().GetAuxiliaryDir()
var val roachpb.Value
val.SetInt(42)
// Put an inconsistent key "e" to s2, and have s1 and s3 still agree.
@@ -411,9 +412,10 @@ func TestCheckConsistencyInconsistent(t *testing.T) {
assert.Equal(t, hashes[0], hashes[2]) // s1 and s3 agree
assert.NotEqual(t, hashes[0], hashes[1]) // s2 diverged

// A death rattle should have been written on s2.
eng := s2.TODOEngine()
f, err := eng.Open(base.PreventedStartupFile(eng.GetAuxiliaryDir()))
// A death rattle should have been written on s2. Note that the VFSes are
// zero-indexed whereas store IDs are one-indexed.
fs := stickyVFSRegistry.Get("1")
f, err := fs.Open(base.PreventedStartupFile(s2AuxDir))
require.NoError(t, err)
b, err := io.ReadAll(f)
require.NoError(t, err)
18 changes: 17 additions & 1 deletion pkg/kv/kvserver/replica_rangefeed.go
@@ -89,6 +89,11 @@ var RangeFeedUseScheduler = settings.RegisterBoolSetting(
false,
)

// RangefeedSchedulerDisabled is a kill switch for scheduler-based rangefeed
// processors. To be removed in 24.1 once the new processor becomes the default.
var RangefeedSchedulerDisabled = envutil.EnvOrDefaultBool("COCKROACH_RANGEFEED_DISABLE_SCHEDULER",
false)

func init() {
// Inject into kvserverbase to allow usage from kvcoord.
kvserverbase.RangeFeedRefreshInterval = RangeFeedRefreshInterval
@@ -407,7 +412,7 @@ func (r *Replica) registerWithRangefeedRaftMuLocked(
feedBudget := r.store.GetStoreConfig().RangefeedBudgetFactory.CreateBudget(r.startKey)

var sched *rangefeed.Scheduler
if RangeFeedUseScheduler.Get(&r.ClusterSettings().SV) {
if shouldUseRangefeedScheduler(&r.ClusterSettings().SV) {
sched = r.store.getRangefeedScheduler()
}

@@ -843,3 +848,14 @@ func (r *Replica) ensureClosedTimestampStarted(ctx context.Context) *kvpb.Error
}
return nil
}

func shouldUseRangefeedScheduler(sv *settings.Values) bool {
return RangeFeedUseScheduler.Get(sv) && !RangefeedSchedulerDisabled
}

// TestGetReplicaRangefeedProcessor exposes the rangefeed processor for test
// introspection. Note that while retrieving the processor is thread-safe,
// invoking processor methods should be done with care to avoid breaking invariants.
func TestGetReplicaRangefeedProcessor(r *Replica) rangefeed.Processor {
return r.getRangefeedProcessor()
}
57 changes: 57 additions & 0 deletions pkg/kv/kvserver/replica_rangefeed_test.go
@@ -21,11 +21,13 @@ import (
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
clientrf "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/rangefeed"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
@@ -501,6 +503,61 @@ func waitErrorFuture(f *future.ErrorFuture) error {
return resultErr
}

func TestScheduledProcessorKillSwitch(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

kvserver.RangefeedSchedulerDisabled = true
defer func() { kvserver.RangefeedSchedulerDisabled = false }()

ctx := context.Background()
ts, err := serverutils.NewServer(base.TestServerArgs{
DefaultTestTenant: base.TestIsSpecificToStorageLayerAndNeedsASystemTenant,
})
require.NoError(t, err, "failed to start test server")
require.NoError(t, ts.Start(ctx), "start server")
defer ts.Stopper().Stop(ctx)

db := ts.SystemLayer().SQLConn(t, "")
_, err = db.Exec("set cluster setting kv.rangefeed.enabled = t")
require.NoError(t, err, "can't enable rangefeeds")
_, err = db.Exec("set cluster setting kv.rangefeed.scheduler.enabled = t")
require.NoError(t, err, "can't enable rangefeed scheduler")

sr, err := ts.ScratchRange()
require.NoError(t, err, "can't create scratch range")
f := ts.RangeFeedFactory().(*clientrf.Factory)
rf, err := f.RangeFeed(ctx, "test-feed", []roachpb.Span{{Key: sr, EndKey: sr.PrefixEnd()}},
hlc.Timestamp{},
func(ctx context.Context, value *kvpb.RangeFeedValue) {},
)
require.NoError(t, err, "failed to start rangefeed")
defer rf.Close()

rd, err := ts.LookupRange(sr)
require.NoError(t, err, "failed to get descriptor for scratch range")

stores := ts.GetStores().(*kvserver.Stores)
_ = stores.VisitStores(func(s *kvserver.Store) error {
repl, err := s.GetReplica(rd.RangeID)
require.NoError(t, err, "failed to find scratch range replica in store")
var proc rangefeed.Processor
// Note that we can't rely on a checkpoint or event because the client
// rangefeed call can return and emit the first checkpoint and data before
// the processor is actually attached to the replica.
testutils.SucceedsSoon(t, func() error {
proc = kvserver.TestGetReplicaRangefeedProcessor(repl)
if proc == nil {
return errors.New("scratch range must have processor")
}
return nil
})
require.IsType(t, (*rangefeed.LegacyProcessor)(nil), proc,
"kill switch didn't prevent scheduled processor creation")
return nil
})
}

func TestReplicaRangefeedErrors(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
7 changes: 7 additions & 0 deletions pkg/sql/colflow/vectorized_flow.go
@@ -281,6 +281,10 @@ func (f *vectorizedFlow) Setup(
// Resume is part of the Flow interface.
func (f *vectorizedFlow) Resume(recv execinfra.RowReceiver) {
if f.batchFlowCoordinator != nil {
// Resume is expected to be called only for pausable portals, for which
// we must be using limitedCommandResult which currently doesn't
// implement the execinfra.BatchReceiver interface, so we shouldn't have
// a batch flow coordinator here.
recv.Push(
nil, /* row */
&execinfrapb.ProducerMetadata{
@@ -384,6 +388,9 @@ func (f *vectorizedFlow) Cleanup(ctx context.Context) {
// as well as closes all the closers.
f.creator.cleanup(ctx)

// Ensure that the "head" processor is always closed.
f.ConsumerClosedOnHeadProc()

f.tempStorage.Lock()
created := f.tempStorage.path != ""
f.tempStorage.Unlock()
16 changes: 2 additions & 14 deletions pkg/sql/execinfra/base.go
@@ -23,11 +23,9 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/log/logcrash"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
)

// RowChannelBufSize is the default buffer size of a RowChannel.
@@ -167,7 +165,7 @@ type RowSource interface {
ConsumerDone()

// ConsumerClosed informs the source that the consumer is done and will not
// make any more calls to Next(). Must only be called once on a given
// make any more calls to Next(). Must be called at least once on a given
// RowSource.
//
// Like ConsumerDone(), if the consumer of the source stops consuming rows
@@ -423,16 +421,6 @@ func (rb *rowSourceBase) consumerDone() {
uint32(NeedMoreRows), uint32(DrainRequested))
}

// consumerClosed helps processors implement RowSource.ConsumerClosed. The name
// is only used for debug messages.
func (rb *rowSourceBase) consumerClosed(name string) {
status := ConsumerStatus(atomic.LoadUint32((*uint32)(&rb.ConsumerStatus)))
if status == ConsumerClosed {
logcrash.ReportOrPanic(context.Background(), nil, "%s already closed", redact.Safe(name))
}
atomic.StoreUint32((*uint32)(&rb.ConsumerStatus), uint32(ConsumerClosed))
}

// RowChannel is a thin layer over a RowChannelMsg channel, which can be used to
// transfer rows between goroutines.
type RowChannel struct {
@@ -533,7 +521,7 @@ func (rc *RowChannel) ConsumerDone() {

// ConsumerClosed is part of the RowSource interface.
func (rc *RowChannel) ConsumerClosed() {
rc.consumerClosed("RowChannel")
atomic.StoreUint32((*uint32)(&rc.ConsumerStatus), uint32(ConsumerClosed))
numSenders := atomic.LoadInt32(&rc.numSenders)
// Drain (at most) numSenders messages in case senders are blocked trying to
// emit a row.
24 changes: 24 additions & 0 deletions pkg/sql/flowinfra/flow.go
@@ -655,6 +655,30 @@ func (f *FlowBase) GetOnCleanupFns() (startCleanup, endCleanup func()) {
return onCleanupStart, onCleanupEnd
}

// ConsumerClosedOnHeadProc calls the ConsumerClosed method on the "head"
// processor of this flow to make sure that all resources are released. This is
// needed for the pausable portal execution model where execinfra.Run might
// never call ConsumerClosed on the source (i.e. the "head" processor).
//
// ConsumerClosed is only called if:
// - there is exactly 1 processor in the flow that runs in its own goroutine
// (which is always the case for the pausable portal model at this time)
// - that single processor implements the execinfra.RowSource interface (those
// processors that don't implement it shouldn't be running through the
// pausable portal model).
//
// Otherwise, this method is a no-op.
func (f *FlowBase) ConsumerClosedOnHeadProc() {
if len(f.processors) != 1 {
return
}
rs, ok := f.processors[0].(execinfra.RowSource)
if !ok {
return
}
rs.ConsumerClosed()
}

// Cleanup is part of the Flow interface.
// NOTE: this implements only the shared cleanup logic between row-based and
// vectorized flows.
99 changes: 99 additions & 0 deletions pkg/sql/pgwire/testdata/pgtest/multiple_active_portals
@@ -1306,3 +1306,102 @@ ReadyForQuery
{"Type":"ReadyForQuery","TxStatus":"I"}

subtest end

subtest incomplete_processor_cleanup

send
Query {"String": "DEALLOCATE ALL;"}
----

until
ReadyForQuery
----
{"Type":"CommandComplete","CommandTag":"DEALLOCATE ALL"}
{"Type":"ReadyForQuery","TxStatus":"I"}

send
Query {"String": "CREATE TABLE t110486_1 (k1 PRIMARY KEY) AS VALUES (1), (2), (3);"}
Query {"String": "CREATE TABLE t110486_2 (k2 PRIMARY KEY) AS VALUES (1), (2), (3);"}
----

until
ReadyForQuery
ReadyForQuery
----
{"Type":"CommandComplete","CommandTag":"CREATE TABLE AS"}
{"Type":"ReadyForQuery","TxStatus":"I"}
{"Type":"CommandComplete","CommandTag":"CREATE TABLE AS"}
{"Type":"ReadyForQuery","TxStatus":"I"}

# Run the query that uses a row-by-row processor (the join reader) which
# previously wasn't cleaned up properly. This first execution doesn't trigger
# the #110486 bug because it is effectively run in an EXPLAIN ANALYZE flavor
# due to txn stats sampling (we prefer to run it explicitly rather than
# disable txn stats sampling).
send
Parse {"Name": "q1", "Query": "SELECT * FROM t110486_1 INNER LOOKUP JOIN t110486_2 ON k1 = k2;"}
Bind {"DestinationPortal": "p1", "PreparedStatement": "q1"}
Execute {"Portal": "p1", "MaxRows": 1}
Sync
----

until
ReadyForQuery
----
{"Type":"ParseComplete"}
{"Type":"BindComplete"}
{"Type":"DataRow","Values":[{"text":"1"},{"text":"1"}]}
{"Type":"PortalSuspended"}
{"Type":"ReadyForQuery","TxStatus":"I"}

# Now run the query again (not in EXPLAIN ANALYZE flavor)
send
Bind {"DestinationPortal": "p2", "PreparedStatement": "q1"}
Execute {"Portal": "p2", "MaxRows": 1}
Sync
----

until
ReadyForQuery
----
{"Type":"BindComplete"}
{"Type":"DataRow","Values":[{"text":"1"},{"text":"1"}]}
{"Type":"PortalSuspended"}
{"Type":"ReadyForQuery","TxStatus":"I"}

# Do the same again using the row-based engine directly.
send crdb_only
Query {"String": "SET vectorize = off;"}
----

until crdb_only ignore=NoticeResponse
ReadyForQuery
----
{"Type":"CommandComplete","CommandTag":"SET"}
{"Type":"ReadyForQuery","TxStatus":"I"}

send
Bind {"DestinationPortal": "p3", "PreparedStatement": "q1"}
Execute {"Portal": "p3", "MaxRows": 1}
Sync
----

until
ReadyForQuery
----
{"Type":"BindComplete"}
{"Type":"DataRow","Values":[{"text":"1"},{"text":"1"}]}
{"Type":"PortalSuspended"}
{"Type":"ReadyForQuery","TxStatus":"I"}

send crdb_only
Query {"String": "RESET vectorize;"}
----

until crdb_only ignore=NoticeResponse
ReadyForQuery
----
{"Type":"CommandComplete","CommandTag":"RESET"}
{"Type":"ReadyForQuery","TxStatus":"I"}

subtest end
2 changes: 2 additions & 0 deletions pkg/sql/rowflow/row_based_flow.go
@@ -456,6 +456,8 @@ func (f *rowBasedFlow) Cleanup(ctx context.Context) {
startCleanup, endCleanup := f.FlowBase.GetOnCleanupFns()
startCleanup()
defer endCleanup()
// Ensure that the "head" processor is always closed.
f.ConsumerClosedOnHeadProc()
f.FlowBase.Cleanup(ctx)
f.Release()
}
1 change: 1 addition & 0 deletions pkg/storage/engine_test.go
@@ -1238,6 +1238,7 @@ func TestEngineFS(t *testing.T) {
"8c: f.close",
"8d: f = open /bar",
"8e: f.read 3 == ghe",
"8f: f.close",
"9a: create-dir /dir1",
"9b: create /dir1/bar",
"9c: list-dir /dir1 == bar",