From 81b90e52ea90c5bc36de85f04d69b5a2a956ba3d Mon Sep 17 00:00:00 2001
From: Raphael 'kena' Poss
Date: Mon, 12 Oct 2020 17:07:46 +0200
Subject: [PATCH] server,kvserver: wait for liveness record refresh on other
 nodes at the end of drain

Prior to this patch, it was possible for a node to shut down
gracefully "too quickly", before the other nodes got a chance to see
that the node had gone away. In particular:

- while the node was pushing leases away, the other nodes with
  replicas on shared ranges could push them back (store rebalance /
  allocator), because they did not yet have a copy of the updated
  node descriptor marked "draining".

- after the node had moved its leases away and stopped, range caches
  on other nodes could continue to try to use replicas on the drained
  node.

To alleviate both issues, this commit makes the draining server wait
until the expiry deadline on its own liveness record. This prevents
other nodes from considering the draining node as a candidate and
"pushing back" leases to it during the drain.

An additional wait of 5 seconds is added at the very end, after all
leases have transferred, so that if another node still finds itself
wanting to address a replica on the now-drained node, it gets a
chance to receive a NotLeaseHolderError and a redirect to the new
leaseholder. This is expected to be the most effective change in this
commit.

Additionally, this commit adds a 2-second wait after the liveness
expiry and before starting to transfer leases away. This provides
confidence, during the lease transfer, that the other nodes know the
draining node is in fact draining and will not consider it as a
transfer target. This is an optimization.

Release note: None
---
 pkg/cli/start.go                     |   1 +
 pkg/kv/kvserver/liveness/liveness.go |  32 ++++
 pkg/server/drain.go                  | 218 +++++++++++++++++++++------
 pkg/server/drain_test.go             | 159 +++++++++++++++++++
 pkg/server/server.go                 |   5 +
 5 files changed, 371 insertions(+), 44 deletions(-)

diff --git a/pkg/cli/start.go b/pkg/cli/start.go
index 880537db2efc..cbb08552b731 100644
--- a/pkg/cli/start.go
+++ b/pkg/cli/start.go
@@ -796,6 +796,7 @@ If problems persist, please see %s.`
     time.Sleep(200 * time.Millisecond)
   }
 
+  log.Infof(drainCtx, "stopping all tasks")
   stopper.Stop(drainCtx)
 }()

diff --git a/pkg/kv/kvserver/liveness/liveness.go b/pkg/kv/kvserver/liveness/liveness.go
index 35f14c5b3796..3f5d1e909396 100644
--- a/pkg/kv/kvserver/liveness/liveness.go
+++ b/pkg/kv/kvserver/liveness/liveness.go
@@ -344,6 +344,23 @@ func (nl *NodeLiveness) SetDraining(
   return errors.New("failed to drain self")
 }
 
+// TimeUntilLivenessExpiry returns the duration remaining between the
+// current time and the expiration deadline on this node's liveness
+// record.
+func (nl *NodeLiveness) TimeUntilLivenessExpiry(ctx context.Context) (time.Duration, error) {
+  myID := nl.gossip.NodeID.Get()
+  liveness, ok := nl.GetLiveness(myID)
+  if !ok {
+    // Our liveness record does not exist yet? This is surprising,
+    // but it does mean we have nothing to do here.
+    return 0, nil
+  }
+
+  // Compute the remaining time until the record expires.
+  expiryTime := timeutil.Unix(0, liveness.Expiration.WallTime)
+  return expiryTime.Sub(nl.clock.PhysicalTime()), nil
+}
+
 // SetMembershipStatus changes the liveness record to reflect the target
 // membership status. It does so idempotently, and may retry internally until it
 // observes its target state durably persisted. It returns whether it was able
@@ -1473,3 +1490,18 @@ func (nl *NodeLiveness) TestingSetDecommissioningInternal(
 ) (changeCommitted bool, err error) {
   return nl.setMembershipStatusInternal(ctx, oldLivenessRec, targetStatus)
 }
+
+// GetLiveNodeCount returns the number of live nodes as known to
+// liveness, including nodes that are possibly decommission{ing,ed}.
+func (nl *NodeLiveness) GetLiveNodeCount() int {
+  now := nl.clock.Now().GoTime()
+  nl.mu.RLock()
+  defer nl.mu.RUnlock()
+  var count int
+  for _, l := range nl.mu.nodes {
+    if l.IsLive(now) {
+      count++
+    }
+  }
+  return count
+}
diff --git a/pkg/server/drain.go b/pkg/server/drain.go
index a21e9c3202a6..a70d47645144 100644
--- a/pkg/server/drain.go
+++ b/pkg/server/drain.go
@@ -12,7 +12,6 @@ package server
 import (
   "context"
-  "strings"
   "time"
 
   "github.com/cockroachdb/cockroach/pkg/server/serverpb"
@@ -122,12 +121,9 @@ func (s *adminServer) Drain(req *serverpb.DrainRequest, stream serverpb.Admin_Dr
 // directly. Use the Drain() RPC instead with a suitably crafted
 // DrainRequest.
 //
-// On failure, the system may be in a partially drained
-// state; the client should either continue calling Drain() or shut
-// down the server.
-//
-// The reporter function, if non-nil, is called for each
-// packet of load shed away from the server during the drain.
+// On failure, the system may be in a partially drained state; the
+// client should either continue calling Drain() until the remaining
+// count comes down to zero, or shut down the server.
 //
 // TODO(knz): This method is currently exported for use by the
 // shutdown code in cli/start.go; however, this is a mis-design. The
@@ -135,46 +131,140 @@ func (s *adminServer) Drain(req *serverpb.DrainRequest, stream serverpb.Admin_Dr
 func (s *Server) Drain(
   ctx context.Context,
 ) (remaining uint64, info redact.RedactableString, err error) {
-  reports := make(map[redact.SafeString]int)
-  var mu syncutil.Mutex
-  reporter := func(howMany int, what redact.SafeString) {
-    if howMany > 0 {
-      mu.Lock()
-      reports[what] += howMany
-      mu.Unlock()
-    }
-  }
+  progress := newDrainProgress()
+
+  // Regardless of the return path, report on the progress in logs before
+  // the function exits.
   defer func() {
-    // Detail the counts based on the collected reports.
-    var descBuf strings.Builder
-    comma := redact.SafeString("")
-    for what, howMany := range reports {
-      remaining += uint64(howMany)
-      redact.Fprintf(&descBuf, "%s%s: %d", comma, what, howMany)
-      comma = ", "
-    }
-    info = redact.RedactableString(descBuf.String())
-    log.Ops.Infof(ctx, "drain remaining: %d", remaining)
+    remaining, info = progress.getProgress()
+    log.Ops.Infof(ctx, "drain progress: %d", remaining)
     if info != "" {
       log.Ops.Infof(ctx, "drain details: %s", info)
     }
   }()
 
-  if err := s.doDrain(ctx, reporter); err != nil {
-    return 0, "", err
+  // First drain all clients and SQL leases.
+  log.Infof(ctx, "draining clients")
+  if err := s.drainClients(ctx, progress.report); err != nil {
+    return remaining, info, err
   }
 
-  return
-}
+  // Mark the node liveness record as draining. This starts telling
+  // range caches on other nodes that this node is going away.
+  log.Infof(ctx, "draining liveness")
+  if err := s.nodeLiveness.SetDraining(ctx, true /* drain */, progress.report); err != nil {
+    return remaining, info, err
+  }
 
-func (s *Server) doDrain(ctx context.Context, reporter func(int, redact.SafeString)) error {
-  // First drain all clients and SQL leases.
-  if err := s.drainClients(ctx, reporter); err != nil {
-    return err
+  // GetLiveNodeCount includes nodes which are members of the
+  // cluster and whose liveness record has not expired. This
+  // includes the current (draining) node, because marking the
+  // liveness record as draining does not prevent it from
+  // heartbeating.
+  liveNodes := s.nodeLiveness.GetLiveNodeCount()
+
+  if progress.hasProgress() {
+    // Wait until some confidence exists that the other nodes have
+    // acknowledged the draining state: this waits until the expiry
+    // of the liveness record, which is at least as late as when
+    // other nodes are forced to refresh their range leases.
+    //
+    // The problem that this wait solves is the following. When other
+    // nodes already have replicas for the same ranges as this node,
+    // these other nodes may attempt to transfer leases to this node
+    // for routine rebalancing as long as they see this node as live
+    // (via its liveness). This wait ensures they are forced to reload
+    // the liveness record and see the node is draining, and stop using
+    // it as a target for rebalancing.
+    //
+    // We wait here only the first time Drain() is called, when the
+    // liveness record has been toggled from non-draining to
+    // draining.
+    //
+    // The reason why hasProgress() is synonymous with "Drain() is
+    // being called for the first time" here is because only during
+    // the first iteration is there work performed in drainClients()
+    // and nodeLiveness.SetDraining() above. At the second and later
+    // iterations, these first two steps do no work.
+
+    toWait, err := s.nodeLiveness.TimeUntilLivenessExpiry(ctx)
+    if err != nil {
+      return remaining, info, err
+    }
+
+    if toWait > 0 {
+      log.Infof(ctx, "waiting %s for the liveness record to expire", toWait)
+      time.Sleep(toWait)
+    } else {
+      log.VInfof(ctx, 1, "no liveness record on this node, no expiry to wait on")
+    }
+
+    if liveNodes > 1 {
+      // If we believe there are other nodes, we also wait 2 extra
+      // seconds past the expiration to give these nodes time to
+      // reload their copy of this node's liveness record, prior to
+      // us transferring leases below.
+      //
+      // This wait is not necessary for correctness; it is merely an
+      // optimization: it reduces the probability that another node
+      // hasn't seen the expiration yet and tries to transfer a
+      // lease back to this draining node during the lease drain
+      // below.
+      //
+      // We also only use the optimization if we have some
+      // confidence that there are other ready nodes in the cluster;
+      // for a single-node cluster, this wait is clearly a waste of
+      // time and would be a source of annoyance to the user.
+      const extraLivenessDrainStatePropagationDelay = 2 * time.Second
+
+      log.Infof(ctx, "waiting %s to let draining state propagate throughout cluster", extraLivenessDrainStatePropagationDelay)
+      time.Sleep(extraLivenessDrainStatePropagationDelay)
+    }
+  }
+
+  // Transfer the range leases away.
+  // This may take a while; that's OK.
+  //
+  // Note that we are careful to only start shedding range leases away
+  // after we have some confidence that the other nodes have picked up
+  // the draining bit (by waiting for the expiration delay,
+  // above). This is because otherwise, this node would be a candidate
+  // for transferring the leases back immediately, concurrently
+  // negating the progress made by this SetDraining() call.
+ log.Infof(ctx, "transferring leases") + if err := s.node.SetDraining(true /* drain */, progress.report); err != nil { + return remaining, info, err } - // Finally, mark the node as draining in liveness and drain the - // range leases. - return s.drainNode(ctx, reporter) + + if !progress.hasProgress() && liveNodes > 1 { + s.TestingLeaseTransferDoneAfterDrain.Set(true) + // If there is no more work to do, the process will then proceed to + // shut down. + // + // Just before doing so however, if we believe there are other + // nodes, then wait a little bit more. + // + // The problem that this second wait helps solve is when other + // nodes do not have replicas for the ranges whose leases were + // just transferred away, but may have oudated information about + // them in their range desc/lease caches. These other nodes may be + // serving application traffic, and we want to give them a chance + // to encounter a NotLeaseHolderError and refresh their cache + // after the last lease has been transferred. + // + // Like above, this is an optimization: if this was not + // occurring, the other nodes would simply time out on a request + // and start inquiring other replicas to discover the new + // leaseholder. We also avoid the optimization if the ready node + // count is just 1, to prevent UX annoyances. + const extraIdleStateForReceivingStrayMisroutedLeaseholderRequestsDelay = 5 * time.Second + log.Infof(ctx, + "waiting %s so that final requests to this node from rest of cluster can be redirected", + extraIdleStateForReceivingStrayMisroutedLeaseholderRequestsDelay) + time.Sleep(extraIdleStateForReceivingStrayMisroutedLeaseholderRequestsDelay) + } + + return remaining, info, nil } // isDraining returns true if either clients are being drained @@ -204,15 +294,55 @@ func (s *Server) drainClients(ctx context.Context, reporter func(int, redact.Saf // given sessions a chance to finish ongoing work. s.sqlServer.leaseMgr.SetDraining(true /* drain */, reporter) - // Done. This executes the defers set above to drain SQL leases. return nil } -// drainNode initiates the draining mode for the node, which -// starts draining range leases. -func (s *Server) drainNode(ctx context.Context, reporter func(int, redact.SafeString)) error { - if err := s.nodeLiveness.SetDraining(ctx, true /* drain */, reporter); err != nil { - return err +// drainProgress stores the result of calls to the reporter function +// passed to the various draining function. +// +// This is made safe for concurrent use. +type drainProgress struct { + // We use a mutex to ensure that the reporter function can be called + // concurrently and safely from multiple goroutines. + syncutil.Mutex + reports map[redact.SafeString]int +} + +func newDrainProgress() *drainProgress { + return &drainProgress{ + reports: make(map[redact.SafeString]int), + } +} + +// report some drain work to the drainProgress tracker. +// This is safe for use by multiple goroutines concurrently. +func (p *drainProgress) report(howMany int, what redact.SafeString) { + if howMany > 0 { + p.Lock() + defer p.Unlock() + p.reports[what] += howMany + } +} + +// hasProgress returns true iff some progress was reported via +// the report() method already. +func (p *drainProgress) hasProgress() bool { + p.Lock() + defer p.Unlock() + return len(p.reports) > 0 +} + +// getProgress retrieves a description and a count of the work +// performed so far. +// The caller guarantees that no concurrent calls to the report() +// method are occurring. 
+func (p *drainProgress) getProgress() (remaining uint64, details redact.RedactableString) {
+  var descBuf redact.StringBuilder
+  comma := redact.SafeString("")
+  for what, howMany := range p.reports {
+    remaining += uint64(howMany)
+    descBuf.Printf("%s%s: %d", comma, what, howMany)
+    comma = ", "
   }
-  return s.node.SetDraining(true /* drain */, reporter)
+  return remaining, descBuf.RedactableString()
 }
diff --git a/pkg/server/drain_test.go b/pkg/server/drain_test.go
index 9288c9b66485..7d7eca330e36 100644
--- a/pkg/server/drain_test.go
+++ b/pkg/server/drain_test.go
@@ -14,15 +14,21 @@ import (
   "context"
   "io"
   "testing"
+  "time"
 
   "github.com/cockroachdb/cockroach/pkg/base"
+  "github.com/cockroachdb/cockroach/pkg/roachpb"
+  "github.com/cockroachdb/cockroach/pkg/server"
   "github.com/cockroachdb/cockroach/pkg/server/serverpb"
+  "github.com/cockroachdb/cockroach/pkg/sql"
   "github.com/cockroachdb/cockroach/pkg/testutils"
   "github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
   "github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
   "github.com/cockroachdb/cockroach/pkg/util/grpcutil"
   "github.com/cockroachdb/cockroach/pkg/util/leaktest"
   "github.com/cockroachdb/cockroach/pkg/util/log"
+  "github.com/cockroachdb/cockroach/pkg/util/syncutil"
+  "github.com/cockroachdb/cockroach/pkg/util/timeutil"
   "github.com/cockroachdb/errors"
   "github.com/kr/pretty"
   "google.golang.org/grpc"
@@ -204,3 +210,156 @@ func getAdminClientForServer(
   client := serverpb.NewAdminClient(conn)
   return client, func() { _ = conn.Close() }, nil
 }
+
+// TestRangeCacheUpdateWithNLEAfterDrain checks that, in a cluster
+// with 3 storage nodes and one observer node, an observer node that
+// does not witness the lease transfers during a drain properly
+// experiences a "fast" NLE at the end of the drain.
+func TestRangeCacheUpdateWithNLEAfterDrain(t *testing.T) {
+  defer leaktest.AfterTest(t)()
+  defer log.Scope(t).Close(t)
+
+  ctx := context.Background()
+  tc := testcluster.StartTestCluster(t, 4, base.TestClusterArgs{
+    ServerArgsPerNode: map[int]base.TestServerArgs{
+      0 /* n1 */ : {Locality: roachpb.Locality{Tiers: []roachpb.Tier{{Key: "storage", Value: "true"}, {Key: "node", Value: "draining"}}}},
+      1 /* n2 */ : {Locality: roachpb.Locality{Tiers: []roachpb.Tier{{Key: "storage", Value: "true"}, {Key: "node", Value: "target"}}}},
+      2 /* n3 */ : {Locality: roachpb.Locality{Tiers: []roachpb.Tier{{Key: "storage", Value: "true"}, {Key: "node", Value: "spare"}}}},
+      3 /* n4 */ : {Locality: roachpb.Locality{Tiers: []roachpb.Tier{{Key: "storage", Value: "false"}, {Key: "node", Value: "observer"}}}},
+    },
+  })
+  defer tc.Stopper().Stop(ctx)
+
+  t.Logf("setting up data...")
+  s0 := tc.Server(0).(*server.TestServer)
+  if err := s0.RunLocalSQL(ctx, func(ctx context.Context, ie *sql.InternalExecutor) error {
+    for _, stmt := range []string{
+      // Adjust the zone configs so that data spreads over the storage
+      // nodes, excluding the observer node.
+      `ALTER RANGE meta CONFIGURE ZONE USING constraints = '[+storage=true]'`,
+      `ALTER RANGE system CONFIGURE ZONE USING constraints = '[+storage=true]'`,
+      `ALTER DATABASE system CONFIGURE ZONE USING constraints = '[+storage=true]'`,
+      `ALTER RANGE liveness CONFIGURE ZONE USING constraints = '[+storage=true]'`,
+      `ALTER RANGE default CONFIGURE ZONE USING constraints = '[+storage=true]'`,
+      // Create a table and make its lease live on the draining node.
+      `CREATE TABLE defaultdb.public.t(x INT PRIMARY KEY)`,
+      `ALTER TABLE defaultdb.public.t CONFIGURE ZONE USING num_replicas = 3, constraints = '[+storage=true]', lease_preferences = '[[+node=draining]]'`,
+      `INSERT INTO defaultdb.public.t(x) SELECT generate_series(1,10000)`,
+    } {
+      if _, err := ie.Exec(ctx, "set-zone", nil, stmt); err != nil {
+        return errors.Wrap(err, stmt)
+      }
+    }
+    return nil
+  }); err != nil {
+    t.Fatal(err)
+  }
+
+  t.Logf("waiting for up-replication")
+  // Wait for the newly created table to spread over all nodes.
+  testutils.SucceedsSoon(t, func() error {
+    return s0.RunLocalSQL(ctx, func(ctx context.Context, ie *sql.InternalExecutor) error {
+      _, err := ie.Exec(ctx, "wait-replication", nil, `
+SELECT -- wait for up-replication.
+       IF(array_length(replicas, 1) != 3,
+          crdb_internal.force_error('UU000', 'not ready: ' || array_length(replicas, 1)::string || ' replicas'),
+
+          -- once up-replication is reached, ensure that we got the replicas where we wanted.
+          IF(replicas != '{1,2,3}'::INT[] OR lease_holder != 1,
+             crdb_internal.force_error('UU000', 'zone config not applied properly: ' || replicas::string || ' / lease at n' || lease_holder::int),
+             0))
+  FROM [SHOW RANGES FROM TABLE defaultdb.public.t]`)
+      if err != nil && !testutils.IsError(err, "not ready") {
+        t.Fatal(err)
+      }
+      return err
+    })
+  })
+
+  t.Logf("populating cache on observer node")
+  // Now query the newly created table from the observer node. This
+  // populates the range cache on that node.
+  sObserver := tc.Server(3).(*server.TestServer)
+  if err := sObserver.RunLocalSQL(ctx, func(ctx context.Context, ie *sql.InternalExecutor) error {
+    _, err := ie.Exec(ctx, "populate-cache", nil, `TABLE defaultdb.public.t`)
+    return err
+  }); err != nil {
+    t.Fatal(err)
+  }
+
+  t.Logf("unlocking range")
+  // Now remove the lease preference from the first node, so the
+  // lease can move freely to the target node.
+  if err := s0.RunLocalSQL(ctx, func(ctx context.Context, ie *sql.InternalExecutor) error {
+    _, err := ie.Exec(ctx, "move-range", nil,
+      `ALTER TABLE defaultdb.public.t CONFIGURE ZONE USING constraints = '[+storage=true]', lease_preferences = '[]'`)
+    return err
+  }); err != nil {
+    t.Fatal(err)
+  }
+
+  // Now drain the first node asynchronously.
+  var nodeStopped syncutil.AtomicBool
+  tc.Stopper().RunAsyncTask(ctx, "drain-first-node", func(ctx context.Context) {
+    t.Logf("starting async drain on first node")
+    for {
+      remaining, _, err := s0.Drain(ctx)
+      if err != nil {
+        t.Logf("graceful drain failed: %v", err)
+        break
+      }
+      if remaining == 0 {
+        break
+      }
+    }
+    t.Logf("async drain complete; node stopping")
+    tc.StopServer(0)
+    nodeStopped.Set(true)
+  })
+
+  t.Logf("waiting for transfer to complete")
+  // Wait for the lease transfer phase during the drain to complete,
+  // but before the final wait on the node.
+  testutils.SucceedsSoon(t, func() error {
+    if !s0.TestingLeaseTransferDoneAfterDrain.Get() {
+      return errors.New("leases not transferred yet")
+    }
+    return nil
+  })
+
+  t.Logf("asserting that the lease has arrived in the right place")
+  // As a sanity check, assert that the lease has landed on the target node.
+  // In particular we don't want it on the observer node, because it
+  // would give us a false negative on the test result.
+  sTarget := tc.Server(1).(*server.TestServer)
+  if err := sTarget.RunLocalSQL(ctx, func(ctx context.Context, ie *sql.InternalExecutor) error {
+    _, err := ie.Exec(ctx, "wait-lease", nil, `
+SELECT IF(lease_holder = 1,
+          crdb_internal.force_error('UU000', 'not moved to target node'),
+          0)
+  FROM [SHOW RANGES FROM TABLE defaultdb.public.t]`)
+    if err != nil && !testutils.IsError(err, "not moved to target node") {
+      t.Fatal(err)
+    }
+    return err
+  }); err != nil {
+    t.Fatal(err)
+  }
+
+  // If the node has already stopped at this point, it's too late for
+  // the test. The query below is bound to get bogus results.
+  if nodeStopped.Get() {
+    t.Error("TEST DESIGN ERROR: node already stopped; too late for checking")
+  }
+
+  t.Logf("checking query speed on observer node")
+  qStart := timeutil.Now()
+  if err := sObserver.RunLocalSQL(ctx, func(ctx context.Context, ie *sql.InternalExecutor) error {
+    _, err := ie.Exec(ctx, "reread-table", nil, `TABLE defaultdb.public.t`)
+    return err
+  }); err != nil {
+    t.Fatal(err)
+  }
+  qEnd := timeutil.Now()
+  if qDur := qEnd.Sub(qStart); qDur > 20*time.Millisecond {
+    t.Error("query took too long, maybe the NLE redirect did not refresh the cache")
+  }
+}
diff --git a/pkg/server/server.go b/pkg/server/server.go
index 8b5615dcf586..76e248b0bc0a 100644
--- a/pkg/server/server.go
+++ b/pkg/server/server.go
@@ -80,6 +80,7 @@ import (
   "github.com/cockroachdb/cockroach/pkg/util/protoutil"
   "github.com/cockroachdb/cockroach/pkg/util/retry"
   "github.com/cockroachdb/cockroach/pkg/util/stop"
+  "github.com/cockroachdb/cockroach/pkg/util/syncutil"
   "github.com/cockroachdb/cockroach/pkg/util/timeutil"
   "github.com/cockroachdb/cockroach/pkg/util/tracing"
   "github.com/cockroachdb/cockroach/pkg/util/uuid"
@@ -163,6 +164,10 @@ type Server struct {
   // The following fields are populated at start time, i.e. in `(*Server).Start`.
   startTime time.Time
+
+  // TestingLeaseTransferDoneAfterDrain is set to true after there are
+  // no more leases to be transferred away during a drain, but before
+  // the final wait that precedes server shutdown.
+  TestingLeaseTransferDoneAfterDrain syncutil.AtomicBool
 }
 
 // externalStorageBuilder is a wrapper around the ExternalStorage factory
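
Usage note (illustrative, not part of the patch): the updated doc comment on (*Server).Drain says a caller should keep invoking Drain() until the remaining count reaches zero, and the new test above does exactly that with a bare loop. A minimal standalone sketch of that calling pattern follows; the helper name drainUntilIdle, the attempt cap, and the package name are assumptions made for illustration only.

package drainexample // hypothetical location; real callers live in pkg/cli and in drain_test.go above

import (
  "context"

  "github.com/cockroachdb/cockroach/pkg/server"
  "github.com/cockroachdb/cockroach/pkg/util/log"
  "github.com/cockroachdb/errors"
)

// drainUntilIdle calls srv.Drain in a loop until the server reports no
// remaining work. maxAttempts is an illustrative guard against a drain
// that never converges.
func drainUntilIdle(ctx context.Context, srv *server.Server, maxAttempts int) error {
  for i := 0; i < maxAttempts; i++ {
    remaining, details, err := srv.Drain(ctx)
    if err != nil {
      return err
    }
    if remaining == 0 {
      // All clients, SQL leases and range leases have been shed; the
      // caller can now stop the stopper and exit the process.
      return nil
    }
    log.Infof(ctx, "drain attempt %d: %d items remaining (%s)", i+1, remaining, details)
  }
  return errors.Newf("drain did not complete after %d attempts", maxAttempts)
}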
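
Usage note (illustrative, not part of the patch): drainProgress is written so that the individual drain phases can report work concurrently, with the summary read once they are all done. A hedged sketch of that usage follows; it assumes the code sits in pkg/server next to drain.go, since the type and its methods are unexported, and the reported label is made up for the example.

package server

import (
  "context"
  "sync"

  "github.com/cockroachdb/cockroach/pkg/util/log"
)

// exampleDrainProgressUsage shows several goroutines reporting drain
// work through the same drainProgress tracker, then summarizing the
// totals once all reporters have finished (which satisfies the
// getProgress requirement that no concurrent report() calls occur).
func exampleDrainProgressUsage(ctx context.Context) {
  progress := newDrainProgress()

  var wg sync.WaitGroup
  for i := 0; i < 3; i++ {
    wg.Add(1)
    go func() {
      defer wg.Done()
      // Each phase reports how many items of a given kind it shed.
      progress.report(10, "example work items")
    }()
  }
  wg.Wait()

  remaining, details := progress.getProgress()
  log.Infof(ctx, "drained so far: %d (%s)", remaining, details)
}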