diff --git a/etcdserver/server.go b/etcdserver/server.go
index 002e91e1be3..097a413d4c1 100644
--- a/etcdserver/server.go
+++ b/etcdserver/server.go
@@ -235,6 +235,8 @@ type EtcdServer struct {
 
 	msgSnapC chan raftpb.Message
 
+	// wgMu blocks concurrent waitgroup mutation while server stopping
+	wgMu sync.RWMutex
 	// wg is used to wait for the go routines that depends on the server state
 	// to exit when stopping the server.
 	wg sync.WaitGroup
@@ -644,7 +646,9 @@ func (s *EtcdServer) run() {
 	}
 
 	defer func() {
+		s.wgMu.Lock() // block concurrent waitgroup adds in goAttach while stopping
 		close(s.stopping)
+		s.wgMu.Unlock()
 
 		sched.Stop()
 
@@ -1609,6 +1613,16 @@ func (s *EtcdServer) setCommittedIndex(v uint64) {
 // goAttach creates a goroutine on a given function and tracks it using
 // the etcdserver waitgroup.
 func (s *EtcdServer) goAttach(f func()) {
+	s.wgMu.RLock() // this blocks with ongoing close(s.stopping)
+	defer s.wgMu.RUnlock()
+	select {
+	case <-s.stopping:
+		plog.Warning("server has stopped (skipping goAttach)")
+		return
+	default:
+	}
+
+	// now safe to add since waitgroup wait has not started yet
 	s.wg.Add(1)
 	go func() {
 		defer s.wg.Done()
diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go
index 1ccbf31d3a5..22d59d77fba 100644
--- a/etcdserver/server_test.go
+++ b/etcdserver/server_test.go
@@ -939,8 +939,8 @@ func TestTriggerSnap(t *testing.T) {
 		srv.Do(context.Background(), pb.Request{Method: "PUT"})
 	}
 
-	srv.Stop()
 	<-donec
+	srv.Stop()
 }
 
 // TestConcurrentApplyAndSnapshotV3 will send out snapshots concurrently with