kvserver: synchronize some rare operations with shutdown
This patch runs some infrequent operations that might use the storage
engine as stopper tasks, thus synchronizing them with server shutdown.
In cockroachdb#51544 we saw one of these operations cause a crash by
executing after Pebble had already been shut down.

Release note: None
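For context, here is a minimal, self-contained sketch of the task/quiesce pattern the patch relies on. It is not CockroachDB's actual stop.Stopper API; the stopper type and runTaskWithErr below are illustrative stand-ins meant to show why wrapping engine-touching work in a task keeps it from racing with engine shutdown: new tasks are refused once shutdown begins, and shutdown waits for in-flight tasks to drain before the engine is closed.

// Minimal sketch of the task/quiesce pattern; not CockroachDB's stop.Stopper.
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// stopper refuses new tasks once quiescing and lets shutdown wait for
// in-flight tasks to drain before the caller closes shared resources.
type stopper struct {
	mu        sync.Mutex
	quiescing bool
	tasks     sync.WaitGroup
}

var errQuiescing = errors.New("server is shutting down")

// runTaskWithErr runs f only if the stopper is still accepting work.
func (s *stopper) runTaskWithErr(ctx context.Context, f func(context.Context) error) error {
	s.mu.Lock()
	if s.quiescing {
		s.mu.Unlock()
		return errQuiescing
	}
	s.tasks.Add(1)
	s.mu.Unlock()
	defer s.tasks.Done()
	return f(ctx)
}

// stop marks the stopper as quiescing and waits for running tasks to finish.
func (s *stopper) stop() {
	s.mu.Lock()
	s.quiescing = true
	s.mu.Unlock()
	s.tasks.Wait()
}

func main() {
	ctx := context.Background()
	s := &stopper{}

	// Before shutdown, the task runs and may safely touch the storage engine.
	_ = s.runTaskWithErr(ctx, func(ctx context.Context) error {
		fmt.Println("checksum collected")
		return nil
	})

	// After stop(), the engine can be closed; a late request is rejected
	// instead of racing with the closed engine.
	s.stop()
	err := s.runTaskWithErr(ctx, func(ctx context.Context) error {
		fmt.Println("this would have used a closed engine")
		return nil
	})
	fmt.Println(err) // server is shutting down
}

In the real patch, execStoreCommand achieves the same effect by routing every handler's work through store.stopper.RunTaskWithErr, as the diff below shows.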
andreimatei committed Aug 14, 2020
1 parent 63ff574 commit 1d48708
Showing 1 changed file with 54 additions and 48 deletions.
pkg/kv/kvserver/stores_server.go (102 changes: 54 additions & 48 deletions)
@@ -33,20 +33,24 @@ func MakeServer(descriptor *roachpb.NodeDescriptor, stores *Stores) Server {
 	return Server{stores}
 }
 
-func (is Server) execStoreCommand(h StoreRequestHeader, f func(*Store) error) error {
+func (is Server) execStoreCommand(
+	ctx context.Context, taskName string, h StoreRequestHeader, f func(*Store) error,
+) error {
 	store, err := is.stores.GetStore(h.StoreID)
 	if err != nil {
 		return err
 	}
-	return f(store)
+	return store.stopper.RunTaskWithErr(ctx, taskName, func(ctx context.Context) error {
+		return f(store)
+	})
 }
 
 // CollectChecksum implements PerReplicaServer.
 func (is Server) CollectChecksum(
 	ctx context.Context, req *CollectChecksumRequest,
 ) (*CollectChecksumResponse, error) {
 	resp := &CollectChecksumResponse{}
-	err := is.execStoreCommand(req.StoreRequestHeader,
+	err := is.execStoreCommand(ctx, "collect-checksum", req.StoreRequestHeader,
 		func(s *Store) error {
 			r, err := s.GetReplica(req.RangeID)
 			if err != nil {
@@ -84,40 +88,41 @@ func (is Server) WaitForApplication(
 	ctx context.Context, req *WaitForApplicationRequest,
 ) (*WaitForApplicationResponse, error) {
 	resp := &WaitForApplicationResponse{}
-	err := is.execStoreCommand(req.StoreRequestHeader, func(s *Store) error {
-		// TODO(benesch): Once Replica changefeeds land, see if we can implement
-		// this request handler without polling.
-		retryOpts := retry.Options{InitialBackoff: 10 * time.Millisecond}
-		for r := retry.StartWithCtx(ctx, retryOpts); r.Next(); {
-			// Long-lived references to replicas are frowned upon, so re-fetch the
-			// replica on every turn of the loop.
-			repl, err := s.GetReplica(req.RangeID)
-			if err != nil {
-				return err
-			}
-			repl.mu.RLock()
-			leaseAppliedIndex := repl.mu.state.LeaseAppliedIndex
-			repl.mu.RUnlock()
-			if leaseAppliedIndex >= req.LeaseIndex {
-				// For performance reasons, we don't sync to disk when
-				// applying raft commands. This means that if a node restarts
-				// after applying but before the next sync, its
-				// LeaseAppliedIndex could temporarily regress (until it
-				// reapplies its latest raft log entries).
-				//
-				// Merging relies on the monotonicity of the log applied
-				// index, so before returning ensure that rocksdb has synced
-				// everything up to this point to disk.
-				//
-				// https://github.com/cockroachdb/cockroach/issues/33120
-				return storage.WriteSyncNoop(ctx, s.engine)
-			}
-		}
-		if ctx.Err() == nil {
-			log.Fatal(ctx, "infinite retry loop exited but context has no error")
-		}
-		return ctx.Err()
-	})
+	err := is.execStoreCommand(ctx, "wait-for-application", req.StoreRequestHeader,
+		func(s *Store) error {
+			// TODO(benesch): Once Replica changefeeds land, see if we can implement
+			// this request handler without polling.
+			retryOpts := retry.Options{InitialBackoff: 10 * time.Millisecond}
+			for r := retry.StartWithCtx(ctx, retryOpts); r.Next(); {
+				// Long-lived references to replicas are frowned upon, so re-fetch the
+				// replica on every turn of the loop.
+				repl, err := s.GetReplica(req.RangeID)
+				if err != nil {
+					return err
+				}
+				repl.mu.RLock()
+				leaseAppliedIndex := repl.mu.state.LeaseAppliedIndex
+				repl.mu.RUnlock()
+				if leaseAppliedIndex >= req.LeaseIndex {
+					// For performance reasons, we don't sync to disk when
+					// applying raft commands. This means that if a node restarts
+					// after applying but before the next sync, its
+					// LeaseAppliedIndex could temporarily regress (until it
+					// reapplies its latest raft log entries).
+					//
+					// Merging relies on the monotonicity of the log applied
+					// index, so before returning ensure that rocksdb has synced
+					// everything up to this point to disk.
+					//
+					// https://github.com/cockroachdb/cockroach/issues/33120
+					return storage.WriteSyncNoop(ctx, s.engine)
+				}
+			}
+			if ctx.Err() == nil {
+				log.Fatal(ctx, "infinite retry loop exited but context has no error")
+			}
+			return ctx.Err()
+		})
 	return resp, err
 }

@@ -129,19 +134,20 @@ func (is Server) WaitForReplicaInit(
 	ctx context.Context, req *WaitForReplicaInitRequest,
 ) (*WaitForReplicaInitResponse, error) {
 	resp := &WaitForReplicaInitResponse{}
-	err := is.execStoreCommand(req.StoreRequestHeader, func(s *Store) error {
-		retryOpts := retry.Options{InitialBackoff: 10 * time.Millisecond}
-		for r := retry.StartWithCtx(ctx, retryOpts); r.Next(); {
-			// Long-lived references to replicas are frowned upon, so re-fetch the
-			// replica on every turn of the loop.
-			if repl, err := s.GetReplica(req.RangeID); err == nil && repl.IsInitialized() {
-				return nil
-			}
-		}
-		if ctx.Err() == nil {
-			log.Fatal(ctx, "infinite retry loop exited but context has no error")
-		}
-		return ctx.Err()
-	})
+	err := is.execStoreCommand(ctx, "wait-for-rep-init", req.StoreRequestHeader,
+		func(s *Store) error {
+			retryOpts := retry.Options{InitialBackoff: 10 * time.Millisecond}
+			for r := retry.StartWithCtx(ctx, retryOpts); r.Next(); {
+				// Long-lived references to replicas are frowned upon, so re-fetch the
+				// replica on every turn of the loop.
+				if repl, err := s.GetReplica(req.RangeID); err == nil && repl.IsInitialized() {
+					return nil
+				}
+			}
+			if ctx.Err() == nil {
+				log.Fatal(ctx, "infinite retry loop exited but context has no error")
+			}
+			return ctx.Err()
+		})
 	return resp, err
 }
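Both wait handlers above share a second idiom worth calling out: poll with backoff until a condition holds, and rely on the loop exiting only via context cancellation, which is why the log.Fatal guard asserts that ctx.Err() is non-nil afterwards. Below is a stripped-down sketch of that idiom using plain timer-based waiting in place of CockroachDB's retry.StartWithCtx helper; waitFor and its parameters are illustrative, not part of this patch.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// waitFor polls cond with a fixed backoff until it reports true or the
// context is canceled. Like the handlers above, the loop never terminates
// on its own; only context cancellation or the condition ends it.
func waitFor(ctx context.Context, backoff time.Duration, cond func() bool) error {
	for {
		if err := ctx.Err(); err != nil {
			return err // deadline exceeded or canceled by the caller
		}
		if cond() {
			return nil
		}
		select {
		case <-time.After(backoff):
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	start := time.Now()
	err := waitFor(ctx, 10*time.Millisecond, func() bool {
		// Stand-in for "replica is initialized" / "lease index has applied".
		return time.Since(start) > 20*time.Millisecond
	})
	fmt.Println(err) // nil: the condition became true before the timeout

	err = waitFor(ctx, 10*time.Millisecond, func() bool { return false })
	fmt.Println(errors.Is(err, context.DeadlineExceeded)) // true
}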
