Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release-22.2: server: retry ambiguous kv/sql operations on node startup #100463

Merged
merged 3 commits into from
Apr 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -591,6 +591,7 @@ ALL_TESTS = [
"//pkg/util/shuffle:shuffle_test",
"//pkg/util/slidingwindow:slidingwindow_test",
"//pkg/util/span:span_test",
"//pkg/util/startup:startup_test",
"//pkg/util/stop:stop_test",
"//pkg/util/stringarena:stringarena_test",
"//pkg/util/strutil:strutil_test",
Expand Down Expand Up @@ -2034,6 +2035,8 @@ GO_TARGETS = [
"//pkg/util/sort:sort",
"//pkg/util/span:span",
"//pkg/util/span:span_test",
"//pkg/util/startup:startup",
"//pkg/util/startup:startup_test",
"//pkg/util/stop:stop",
"//pkg/util/stop:stop_test",
"//pkg/util/stringarena:stringarena",
Expand Down Expand Up @@ -3005,6 +3008,7 @@ GET_X_DATA_TARGETS = [
"//pkg/util/slidingwindow:get_x_data",
"//pkg/util/sort:get_x_data",
"//pkg/util/span:get_x_data",
"//pkg/util/startup:get_x_data",
"//pkg/util/stop:get_x_data",
"//pkg/util/stringarena:get_x_data",
"//pkg/util/stringencoding:get_x_data",
Expand Down
7 changes: 7 additions & 0 deletions pkg/base/test_server_args.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,13 @@ type TestClusterArgs struct {
// A copy of an entry from this map will be copied to each individual server
// and potentially adjusted according to ReplicationMode.
ServerArgsPerNode map[int]TestServerArgs

// If ReusableListeners is true, then a restart keeps listeners untouched so
// that servers stay on the same ports. It is up to the test to set
// TestServerArgs.Listener to proxy listeners that survive
// net.Listener.Close() and can then be used again by the restarted server.
// See testutils.ListenerRegistry.
ReusableListeners bool
}

var (
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvclient/kvcoord/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ go_library(
"//pkg/util/quotapool",
"//pkg/util/retry",
"//pkg/util/shuffle",
"//pkg/util/startup",
"//pkg/util/stop",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
Expand Down
3 changes: 3 additions & 0 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/quotapool"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/startup"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
Expand Down Expand Up @@ -776,6 +777,8 @@ func unsetCanForwardReadTimestampFlag(ba *roachpb.BatchRequest) {
func (ds *DistSender) Send(
ctx context.Context, ba roachpb.BatchRequest,
) (*roachpb.BatchResponse, *roachpb.Error) {
startup.AssertStartupRetry(ctx)

ds.incrementBatchCounters(&ba)

// TODO(nvanbenschoten): This causes ba to escape to the heap. Either
Expand Down
2 changes: 2 additions & 0 deletions pkg/server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ go_library(
"//pkg/util/quotapool",
"//pkg/util/retry",
"//pkg/util/schedulerlatency",
"//pkg/util/startup",
"//pkg/util/stop",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
Expand Down Expand Up @@ -374,6 +375,7 @@ go_test(
"server_http_test.go",
"server_import_ts_test.go",
"server_internal_executor_factory_test.go",
"server_startup_test.go",
"server_systemlog_gc_test.go",
"server_test.go",
"settings_cache_test.go",
Expand Down
5 changes: 5 additions & 0 deletions pkg/server/import_ts.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/ts"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/startup"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/errors/oserror"
yaml "gopkg.in/yaml.v2"
Expand All @@ -38,6 +39,10 @@ import (
const maxBatchSize = 10000

func maybeImportTS(ctx context.Context, s *Server) (returnErr error) {
// We don't want to do startup retries as this is not meant to be run in
// production.
ctx = startup.WithoutChecks(ctx)

var deferError func(error)
{
var defErr error
Expand Down
56 changes: 31 additions & 25 deletions pkg/server/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log/logpb"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/pprofutil"
"github.com/cockroachdb/cockroach/pkg/util/startup"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
Expand Down Expand Up @@ -940,35 +941,40 @@ func (n *Node) startWriteNodeStatus(frequency time.Duration) error {
// Immediately record summaries once on server startup. The update loop below
// will only update the key if it exists, to avoid race conditions during
// node decommissioning, so we have to error out if we can't create it.
if err := n.writeNodeStatus(ctx, 0 /* alertTTL */, false /* mustExist */); err != nil {
if err := startup.RunIdempotentWithRetry(ctx,
n.stopper.ShouldQuiesce(),
"kv write node status", func(ctx context.Context) error {
return n.writeNodeStatus(ctx, 0 /* alertTTL */, false /* mustExist */)
}); err != nil {
return errors.Wrap(err, "error recording initial status summaries")
}
return n.stopper.RunAsyncTask(ctx, "write-node-status", func(ctx context.Context) {
// Write a status summary immediately; this helps the UI remain
// responsive when new nodes are added.
ticker := time.NewTicker(frequency)
defer ticker.Stop()
for {
select {
case <-ticker.C:
// Use an alertTTL of twice the ticker frequency. This makes sure that
// alerts don't disappear and reappear spuriously while at the same
// time ensuring that an alert doesn't linger for too long after having
// resolved.
//
// The status key must already exist, to avoid race conditions
// during decommissioning of this node. Decommissioning may be
// carried out by a different node, so this avoids resurrecting
// the status entry after the decommissioner has removed it.
// See Server.Decommission().
if err := n.writeNodeStatus(ctx, 2*frequency, true /* mustExist */); err != nil {
log.Warningf(ctx, "error recording status summaries: %s", err)
return n.stopper.RunAsyncTask(ctx, "write-node-status",
func(ctx context.Context) {
// Write a status summary immediately; this helps the UI remain
// responsive when new nodes are added.
ticker := time.NewTicker(frequency)
defer ticker.Stop()
for {
select {
case <-ticker.C:
// Use an alertTTL of twice the ticker frequency. This makes sure that
// alerts don't disappear and reappear spuriously while at the same
// time ensuring that an alert doesn't linger for too long after having
// resolved.
//
// The status key must already exist, to avoid race conditions
// during decommissioning of this node. Decommissioning may be
// carried out by a different node, so this avoids resurrecting
// the status entry after the decommissioner has removed it.
// See Server.Decommission().
if err := n.writeNodeStatus(ctx, 2*frequency, true /* mustExist */); err != nil {
log.Warningf(ctx, "error recording status summaries: %s", err)
}
case <-n.stopper.ShouldQuiesce():
return
}
case <-n.stopper.ShouldQuiesce():
return
}
}
})
})
}

// writeNodeStatus retrieves status summaries from the supplied
Expand Down
3 changes: 3 additions & 0 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/netutil"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/schedulerlatency"
"github.com/cockroachdb/cockroach/pkg/util/startup"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil/ptp"
Expand Down Expand Up @@ -1042,6 +1043,8 @@ func (s *Server) Start(ctx context.Context) error {
// should represent the general startup operation.
func (s *Server) PreStart(ctx context.Context) error {
ctx = s.AnnotateCtx(ctx)
done := startup.Begin(ctx)
defer done()

// Start the time sanity checker.
s.startTime = timeutil.Now()
Expand Down
11 changes: 8 additions & 3 deletions pkg/server/server_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/netutil"
"github.com/cockroachdb/cockroach/pkg/util/netutil/addr"
"github.com/cockroachdb/cockroach/pkg/util/startup"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
Expand Down Expand Up @@ -1347,9 +1348,13 @@ func (s *SQLServer) preStart(

var bootstrapVersion roachpb.Version
if s.execCfg.Codec.ForSystemTenant() {
if err := s.execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
return txn.GetProto(ctx, keys.BootstrapVersionKey, &bootstrapVersion)
}); err != nil {
if err := startup.RunIdempotentWithRetry(ctx,
s.stopper.ShouldQuiesce(),
"sql get cluster version", func(ctx context.Context) error {
return s.execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
return txn.GetProto(ctx, keys.BootstrapVersionKey, &bootstrapVersion)
})
}); err != nil {
return err
}
} else {
Expand Down
96 changes: 96 additions & 0 deletions pkg/server/server_startup_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package server_test

import (
"context"
"sync/atomic"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)

// TestStartupInjectedFailureSingleNode starts a one-node test cluster, stops
// its only server, and then restarts it while a store-level request filter
// randomly rejects batch requests with a replica unavailable error. The test
// requires the restart to complete without error.
func TestStartupInjectedFailureSingleNode(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	// Probability with which a single batch request is rejected while fault
	// injection is switched on.
	const failProb = 0.1

	ctx := context.Background()

	rng, seed := randutil.NewLockedTestRand()
	t.Log("TestStartupInjectedFailure random seed", seed)

	listeners := testutils.NewListenerRegistry()
	defer listeners.Close()
	engines := server.NewStickyInMemEnginesRegistry()
	defer engines.CloseAllStickyInMemEngines()

	// Fault injection is off for the initial start and is only switched on
	// around the restart below.
	var enableFaults atomic.Bool
	injectFault := func(_ context.Context, br roachpb.BatchRequest) *roachpb.Error {
		// The random source is consulted only while faults are enabled.
		if !enableFaults.Load() || rng.Float32() >= failProb {
			return nil
		}
		t.Log("injecting fault into range ", br.RangeID)
		cause := errors.New("injected error")
		desc := &roachpb.RangeDescriptor{RangeID: br.RangeID}
		replica := roachpb.ReplicaDescriptor{
			NodeID:  roachpb.NodeID(1),
			StoreID: roachpb.StoreID(1),
		}
		return roachpb.NewError(roachpb.NewReplicaUnavailableError(cause, desc, replica))
	}

	knobs := base.TestingKnobs{
		Server: &server.TestingKnobs{StickyEngineRegistry: engines},
		SpanConfig: &spanconfig.TestingKnobs{
			// Give the scratch range a proper zone config. Without it the
			// config is anybody's guess, and the test can fail if that range
			// is the one picked.
			ConfigureScratchRange: true,
		},
		Store: &kvserver.StoreTestingKnobs{TestingRequestFilter: injectFault},
	}
	args := base.TestClusterArgs{
		ServerArgs: base.TestServerArgs{
			StoreSpecs: []base.StoreSpec{
				{InMemory: true, StickyInMemoryEngineID: "1"},
			},
			Listener: listeners.GetOrFail(t, 0),
			Knobs:    knobs,
		},
		ReusableListeners: true,
	}

	tc := testcluster.NewTestCluster(t, 1, args)
	tc.Start(t)
	defer tc.Stopper().Stop(ctx)

	// Stop the server, turn faults on, reopen its listener and restart it.
	tc.StopServer(0)
	enableFaults.Store(true)
	listeners.ReopenOrFail(t, 0)
	require.NoError(t, tc.RestartServer(0), "failed to restart server")

	// Disable faults to make it easier for cluster to stop.
	enableFaults.Store(false)
}
2 changes: 2 additions & 0 deletions pkg/server/tenantsettingswatcher/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@ go_library(
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/sql/catalog",
"//pkg/sql/catalog/descpb",
"//pkg/sql/catalog/systemschema",
"//pkg/sql/rowenc",
"//pkg/sql/rowenc/valueside",
"//pkg/sql/sem/tree",
"//pkg/sql/types",
"//pkg/util/hlc",
"//pkg/util/log",
"//pkg/util/startup",
"//pkg/util/stop",
"//pkg/util/syncutil",
"@com_github_cockroachdb_errors//:errors",
Expand Down
14 changes: 13 additions & 1 deletion pkg/server/tenantsettingswatcher/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/startup"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/errors"
)
Expand Down Expand Up @@ -93,7 +95,17 @@ func (w *Watcher) Start(ctx context.Context, sysTableResolver catalog.SystemTabl
func (w *Watcher) startRangeFeed(
ctx context.Context, sysTableResolver catalog.SystemTableIDResolver,
) error {
tableID, err := sysTableResolver.LookupSystemTableID(ctx, systemschema.TenantSettingsTable.GetName())
// We need to retry unavailable replicas here. This is only meant to be called
// at server startup.
var tableID descpb.ID
err := startup.RunIdempotentWithRetry(ctx,
w.stopper.ShouldQuiesce(),
"tenant start setting rangefeed",
func(ctx context.Context) (err error) {
tableID, err = sysTableResolver.LookupSystemTableID(ctx,
systemschema.TenantSettingsTable.GetName())
return err
})
if err != nil {
return err
}
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,7 @@ go_library(
"//pkg/util/randutil",
"//pkg/util/retry",
"//pkg/util/ring",
"//pkg/util/startup",
"//pkg/util/stop",
"//pkg/util/syncutil",
"//pkg/util/syncutil/singleflight",
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/fsm"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/startup"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -790,6 +791,8 @@ func (ie *InternalExecutor) execInternal(
stmt string,
qargs ...interface{},
) (r *rowsIterator, retErr error) {
startup.AssertStartupRetry(ctx)

if err := ie.checkIfTxnIsConsistent(txn); err != nil {
return nil, err
}
Expand Down
1 change: 1 addition & 0 deletions pkg/startupmigrations/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ go_library(
"//pkg/util/log",
"//pkg/util/protoutil",
"//pkg/util/retry",
"//pkg/util/startup",
"//pkg/util/stop",
"//pkg/util/timeutil",
"@com_github_cockroachdb_errors//:errors",
Expand Down
Loading