From 1d4870895da6b5bf771e2193d1cf870814bec300 Mon Sep 17 00:00:00 2001
From: Andrei Matei
Date: Fri, 14 Aug 2020 18:17:00 -0400
Subject: [PATCH] kvserver: synchronize some rare operations with shutdown

This patch turns some infrequent operations that might use the storage
engine into stopper tasks, thus synchronizing them with server
shutdown. In #51544 we've seen one of these operations cause a crash
by executing after Pebble had been shut down.

Release note: None
---
 pkg/kv/kvserver/stores_server.go | 102 ++++++++++++++++---------------
 1 file changed, 54 insertions(+), 48 deletions(-)

diff --git a/pkg/kv/kvserver/stores_server.go b/pkg/kv/kvserver/stores_server.go
index d01807135ef8..f2e39d5a6b18 100644
--- a/pkg/kv/kvserver/stores_server.go
+++ b/pkg/kv/kvserver/stores_server.go
@@ -33,12 +33,16 @@ func MakeServer(descriptor *roachpb.NodeDescriptor, stores *Stores) Server {
 	return Server{stores}
 }
 
-func (is Server) execStoreCommand(h StoreRequestHeader, f func(*Store) error) error {
+func (is Server) execStoreCommand(
+	ctx context.Context, taskName string, h StoreRequestHeader, f func(*Store) error,
+) error {
 	store, err := is.stores.GetStore(h.StoreID)
 	if err != nil {
 		return err
 	}
-	return f(store)
+	return store.stopper.RunTaskWithErr(ctx, taskName, func(ctx context.Context) error {
+		return f(store)
+	})
 }
 
 // CollectChecksum implements PerReplicaServer.
@@ -46,7 +50,7 @@ func (is Server) CollectChecksum(
 	ctx context.Context, req *CollectChecksumRequest,
 ) (*CollectChecksumResponse, error) {
 	resp := &CollectChecksumResponse{}
-	err := is.execStoreCommand(req.StoreRequestHeader,
+	err := is.execStoreCommand(ctx, "collect-checksum", req.StoreRequestHeader,
 		func(s *Store) error {
 			r, err := s.GetReplica(req.RangeID)
 			if err != nil {
@@ -84,40 +88,41 @@ func (is Server) WaitForApplication(
 	ctx context.Context, req *WaitForApplicationRequest,
 ) (*WaitForApplicationResponse, error) {
 	resp := &WaitForApplicationResponse{}
-	err := is.execStoreCommand(req.StoreRequestHeader, func(s *Store) error {
-		// TODO(benesch): Once Replica changefeeds land, see if we can implement
-		// this request handler without polling.
-		retryOpts := retry.Options{InitialBackoff: 10 * time.Millisecond}
-		for r := retry.StartWithCtx(ctx, retryOpts); r.Next(); {
-			// Long-lived references to replicas are frowned upon, so re-fetch the
-			// replica on every turn of the loop.
-			repl, err := s.GetReplica(req.RangeID)
-			if err != nil {
-				return err
+	err := is.execStoreCommand(ctx, "wait-for-application", req.StoreRequestHeader,
+		func(s *Store) error {
+			// TODO(benesch): Once Replica changefeeds land, see if we can implement
+			// this request handler without polling.
+			retryOpts := retry.Options{InitialBackoff: 10 * time.Millisecond}
+			for r := retry.StartWithCtx(ctx, retryOpts); r.Next(); {
+				// Long-lived references to replicas are frowned upon, so re-fetch the
+				// replica on every turn of the loop.
+				repl, err := s.GetReplica(req.RangeID)
+				if err != nil {
+					return err
+				}
+				repl.mu.RLock()
+				leaseAppliedIndex := repl.mu.state.LeaseAppliedIndex
+				repl.mu.RUnlock()
+				if leaseAppliedIndex >= req.LeaseIndex {
+					// For performance reasons, we don't sync to disk when
+					// applying raft commands. This means that if a node restarts
+					// after applying but before the next sync, its
+					// LeaseAppliedIndex could temporarily regress (until it
+					// reapplies its latest raft log entries).
+					//
+					// Merging relies on the monotonicity of the log applied
+					// index, so before returning ensure that rocksdb has synced
+					// everything up to this point to disk.
+					//
+					// https://github.com/cockroachdb/cockroach/issues/33120
+					return storage.WriteSyncNoop(ctx, s.engine)
+				}
 			}
-			repl.mu.RLock()
-			leaseAppliedIndex := repl.mu.state.LeaseAppliedIndex
-			repl.mu.RUnlock()
-			if leaseAppliedIndex >= req.LeaseIndex {
-				// For performance reasons, we don't sync to disk when
-				// applying raft commands. This means that if a node restarts
-				// after applying but before the next sync, its
-				// LeaseAppliedIndex could temporarily regress (until it
-				// reapplies its latest raft log entries).
-				//
-				// Merging relies on the monotonicity of the log applied
-				// index, so before returning ensure that rocksdb has synced
-				// everything up to this point to disk.
-				//
-				// https://github.com/cockroachdb/cockroach/issues/33120
-				return storage.WriteSyncNoop(ctx, s.engine)
+			if ctx.Err() == nil {
+				log.Fatal(ctx, "infinite retry loop exited but context has no error")
 			}
-		}
-		if ctx.Err() == nil {
-			log.Fatal(ctx, "infinite retry loop exited but context has no error")
-		}
-		return ctx.Err()
-	})
+			return ctx.Err()
+		})
 	return resp, err
 }
 
@@ -129,19 +134,20 @@ func (is Server) WaitForReplicaInit(
 	ctx context.Context, req *WaitForReplicaInitRequest,
 ) (*WaitForReplicaInitResponse, error) {
 	resp := &WaitForReplicaInitResponse{}
-	err := is.execStoreCommand(req.StoreRequestHeader, func(s *Store) error {
-		retryOpts := retry.Options{InitialBackoff: 10 * time.Millisecond}
-		for r := retry.StartWithCtx(ctx, retryOpts); r.Next(); {
-			// Long-lived references to replicas are frowned upon, so re-fetch the
-			// replica on every turn of the loop.
-			if repl, err := s.GetReplica(req.RangeID); err == nil && repl.IsInitialized() {
-				return nil
+	err := is.execStoreCommand(ctx, "wait-for-rep-init", req.StoreRequestHeader,
+		func(s *Store) error {
+			retryOpts := retry.Options{InitialBackoff: 10 * time.Millisecond}
+			for r := retry.StartWithCtx(ctx, retryOpts); r.Next(); {
+				// Long-lived references to replicas are frowned upon, so re-fetch the
+				// replica on every turn of the loop.
+				if repl, err := s.GetReplica(req.RangeID); err == nil && repl.IsInitialized() {
+					return nil
+				}
 			}
-		}
-		if ctx.Err() == nil {
-			log.Fatal(ctx, "infinite retry loop exited but context has no error")
-		}
-		return ctx.Err()
-	})
+			if ctx.Err() == nil {
+				log.Fatal(ctx, "infinite retry loop exited but context has no error")
+			}
+			return ctx.Err()
+		})
 	return resp, err
 }
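
The fix relies on the task-tracking behavior of pkg/util/stop.Stopper: a
task admitted through RunTaskWithErr is tracked until it finishes, Stop()
waits for tracked tasks before tearing down resources such as the Pebble
engine, and tasks submitted after shutdown has begun are refused rather
than run. Below is a minimal standalone sketch of that behavior; the task
name and printouts are illustrative, and it assumes the util/stop API as
of this patch.

package main

import (
	"context"
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/util/stop"
)

func main() {
	ctx := context.Background()
	stopper := stop.NewStopper()

	// While the stopper is open, the task is admitted, tracked, and run
	// synchronously; Stop() waits for tracked tasks to finish before
	// releasing resources (e.g. closing the storage engine).
	err := stopper.RunTaskWithErr(ctx, "demo-task", func(ctx context.Context) error {
		fmt.Println("task ran while the server was up")
		return nil
	})
	fmt.Println("before Stop:", err) // <nil>

	stopper.Stop(ctx)

	// After Stop(), new tasks are refused with an error instead of racing
	// against a closed engine, which is the crash mode from #51544.
	err = stopper.RunTaskWithErr(ctx, "demo-task", func(ctx context.Context) error {
		return nil // never runs
	})
	fmt.Println("after Stop:", err) // refused, e.g. stop.ErrUnavailable
}

With execStoreCommand routed through RunTaskWithErr, a request arriving at
a shutting-down node now gets an error back instead of the handler
touching a closed engine.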