From bdcc422bb23cd5b715a51d164fe29b16c5a137a1 Mon Sep 17 00:00:00 2001 From: Ibrahim Jarif Date: Sat, 5 Sep 2020 01:13:52 +0530 Subject: [PATCH] release/v20.03 - Fix(Alpha): MASA: Make Alpha Shutdown Again (#6313) (#6403) * Fix(Alpha): MASA: Make Alpha Shutdown Again (#6313) This PR removes the usage of context.Background() in groups.go and ensures that all the various goroutines exit as intended. Changes: * Shutdown SubscribeForUpdates correctly. * Fix up all the closing conditions. * Consolidate updaters into one closer * Update Badger to master * fix(build): Update ResetAcl args for OSS build. * chore: Remove TODO comment. Co-authored-by: Daniel Mai (cherry picked from commit f1941b3c072971fb1112569879afc3dd80621f70) --- dgraph/cmd/alpha/run.go | 23 +++++-- dgraph/cmd/zero/run.go | 7 +- edgraph/access.go | 2 +- edgraph/access_ee.go | 30 +++++---- edgraph/server.go | 5 +- worker/groups.go | 143 +++++++++++++++++++++++----------------- worker/server_state.go | 29 +++++--- 7 files changed, 146 insertions(+), 93 deletions(-) diff --git a/dgraph/cmd/alpha/run.go b/dgraph/cmd/alpha/run.go index 48fe3f802e3..3304ddd93ae 100644 --- a/dgraph/cmd/alpha/run.go +++ b/dgraph/cmd/alpha/run.go @@ -711,14 +711,13 @@ func run() { } }() - // Setup external communication. - aclCloser := z.NewCloser(1) + updaters := z.NewCloser(2) go func() { worker.StartRaftNodes(worker.State.WALstore, bindall) // initialization of the admin account can only be done after raft nodes are running // and health check passes - edgraph.ResetAcl() - edgraph.RefreshAcls(aclCloser) + edgraph.ResetAcl(updaters) + edgraph.RefreshAcls(updaters) }() // Graphql subscribes to alpha to get schema updates. We need to close that before we @@ -727,11 +726,23 @@ func run() { setupServer(adminCloser) glog.Infoln("GRPC and HTTP stopped.") - aclCloser.SignalAndWait() + + // This might not close until group is given the signal to close. So, only signal here, + // wait for it after group is closed. + updaters.Signal() + worker.BlockingStop() + glog.Infoln("worker stopped.") + adminCloser.SignalAndWait() - glog.Info("Disposing server state.") + glog.Infoln("adminCloser closed.") + worker.State.Dispose() x.RemoveCidFile() + glog.Info("worker.State disposed.") + + updaters.Wait() + glog.Infoln("updaters closed.") + glog.Infoln("Server shutdown. Bye!") } diff --git a/dgraph/cmd/zero/run.go b/dgraph/cmd/zero/run.go index 52b81c3df5f..9df9a77cf57 100644 --- a/dgraph/cmd/zero/run.go +++ b/dgraph/cmd/zero/run.go @@ -376,5 +376,10 @@ func run() { glog.Infoln("Running Dgraph Zero...") st.zero.closer.Wait() - glog.Infoln("All done.") + glog.Infoln("Closer closed.") + + err = kv.Close() + glog.Infof("Badger closed with err: %v\n", err) + + glog.Infoln("All done. Goodbye!") } diff --git a/edgraph/access.go b/edgraph/access.go index 724e8819892..44b2f328d77 100644 --- a/edgraph/access.go +++ b/edgraph/access.go @@ -41,7 +41,7 @@ func (s *Server) Login(ctx context.Context, } // ResetAcl is an empty method since ACL is only supported in the enterprise version. 
-func ResetAcl() { +func ResetAcl(closer *z.Closer) { // do nothing } diff --git a/edgraph/access_ee.go b/edgraph/access_ee.go index 5f54621ba24..21d164ee23e 100644 --- a/edgraph/access_ee.go +++ b/edgraph/access_ee.go @@ -301,9 +301,6 @@ func RefreshAcls(closer *z.Closer) { return } - ticker := time.NewTicker(worker.Config.AclRefreshInterval) - defer ticker.Stop() - // retrieve the full data set of ACLs from the corresponding alpha server, and update the // aclCachePtr retrieveAcls := func() error { @@ -313,9 +310,7 @@ func RefreshAcls(closer *z.Closer) { ReadOnly: true, } - ctx := context.Background() - var err error - queryResp, err := (&Server{}).doQuery(ctx, &queryRequest, NoAuthorize) + queryResp, err := (&Server{}).doQuery(closer.Ctx(), &queryRequest, NoAuthorize) if err != nil { return errors.Errorf("unable to retrieve acls: %v", err) } @@ -329,14 +324,16 @@ func RefreshAcls(closer *z.Closer) { return nil } + ticker := time.NewTicker(worker.Config.AclRefreshInterval) + defer ticker.Stop() for { select { - case <-closer.HasBeenClosed(): - return case <-ticker.C: if err := retrieveAcls(); err != nil { - glog.Errorf("Error while retrieving acls:%v", err) + glog.Errorf("Error while retrieving acls: %v", err) } + case <-closer.HasBeenClosed(): + return } } } @@ -354,7 +351,12 @@ const queryAcls = ` ` // ResetAcl clears the aclCachePtr and upserts the Groot account. -func ResetAcl() { +func ResetAcl(closer *z.Closer) { + defer func() { + glog.Infof("ResetAcl closed") + closer.Done() + }() + if len(worker.Config.HmacSecret) == 0 { // The acl feature is not turned on. return @@ -421,8 +423,8 @@ func ResetAcl() { return nil } - for { - ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + for closer.Ctx().Err() == nil { + ctx, cancel := context.WithTimeout(closer.Ctx(), time.Minute) defer cancel() if err := upsertGuardians(ctx); err != nil { glog.Infof("Unable to upsert the guardian group. Error: %v", err) @@ -432,8 +434,8 @@ func ResetAcl() { break } - for { - ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + for closer.Ctx().Err() == nil { + ctx, cancel := context.WithTimeout(closer.Ctx(), time.Minute) defer cancel() if err := upsertGroot(ctx); err != nil { glog.Infof("Unable to upsert the groot account. Error: %v", err) diff --git a/edgraph/server.go b/edgraph/server.go index 7fb3803d432..24bd2b1924a 100644 --- a/edgraph/server.go +++ b/edgraph/server.go @@ -42,6 +42,7 @@ import ( "github.com/dgraph-io/dgo/v2" "github.com/dgraph-io/dgo/v2/protos/api" + "github.com/dgraph-io/dgraph/chunker" "github.com/dgraph-io/dgraph/conn" "github.com/dgraph-io/dgraph/dgraph/cmd/zero" @@ -178,7 +179,7 @@ func (s *Server) Alter(ctx context.Context, op *api.Operation) (*api.Payload, er _, err := query.ApplyMutations(ctx, m) // recreate the admin account after a drop all operation - ResetAcl() + ResetAcl(nil) return empty, err } @@ -191,7 +192,7 @@ func (s *Server) Alter(ctx context.Context, op *api.Operation) (*api.Payload, er _, err := query.ApplyMutations(ctx, m) // recreate the admin account after a drop data operation - ResetAcl() + ResetAcl(nil) return empty, err } diff --git a/worker/groups.go b/worker/groups.go index 9924ee20124..9070c030521 100644 --- a/worker/groups.go +++ b/worker/groups.go @@ -42,9 +42,6 @@ import ( type groupi struct { x.SafeMutex - // TODO: Is this context being used? 
- ctx context.Context - cancel context.CancelFunc state *pb.MembershipState Node *node gid uint32 @@ -73,8 +70,6 @@ func groups() *groupi { // This function triggers RAFT nodes to be created, and is the entrance to the RAFT // world from main.go. func StartRaftNodes(walStore *badger.DB, bindall bool) { - gr.ctx, gr.cancel = context.WithCancel(context.Background()) - if x.WorkerConfig.MyAddr == "" { x.WorkerConfig.MyAddr = fmt.Sprintf("localhost:%d", workerPort()) } else { @@ -123,7 +118,7 @@ func StartRaftNodes(walStore *badger.DB, bindall bool) { continue } zc := pb.NewZeroClient(pl.Get()) - connState, err = zc.Connect(gr.ctx, m) + connState, err = zc.Connect(gr.Ctx(), m) if err == nil || x.ShouldCrash(err) { break } @@ -164,6 +159,14 @@ func StartRaftNodes(walStore *badger.DB, bindall bool) { gr.proposeInitialTypes() } +func (g *groupi) Ctx() context.Context { + return g.closer.Ctx() +} + +func (g *groupi) IsClosed() bool { + return g.closer.Ctx().Err() != nil +} + func (g *groupi) informZeroAboutTablets() { // Before we start this Alpha, let's pick up all the predicates we have in our postings // directory, and ask Zero if we are allowed to serve it. Do this irrespective of whether @@ -202,7 +205,7 @@ func (g *groupi) proposeInitialTypes() { func (g *groupi) proposeInitialSchema() { initialSchema := schema.InitialSchema() - ctx := context.Background() + ctx := g.Ctx() for _, s := range initialSchema { if gid, err := g.BelongsToReadOnly(s.Predicate, 0); err != nil { glog.Errorf("Error getting tablet for predicate %s. Will force schema proposal.", @@ -227,7 +230,7 @@ func (g *groupi) upsertSchema(sch *pb.SchemaUpdate, typ *pb.TypeUpdate) { // Propose schema mutation. var m pb.Mutations // schema for a reserved predicate is not changed once set. - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + ctx, cancel := context.WithTimeout(g.Ctx(), 10*time.Second) ts, err := Timestamps(ctx, &pb.Num{Val: 1}) cancel() if err != nil { @@ -246,7 +249,7 @@ func (g *groupi) upsertSchema(sch *pb.SchemaUpdate, typ *pb.TypeUpdate) { // This would propose the schema mutation and make sure some node serves this predicate // and has the schema defined above. for { - _, err := MutateOverNetwork(gr.ctx, &m) + _, err := MutateOverNetwork(gr.Ctx(), &m) if err == nil { break } @@ -363,8 +366,7 @@ func (g *groupi) applyState(state *pb.MembershipState) { } } - if err := g.Node.ProposePeerRemoval( - context.Background(), member.GetId()); err != nil { + if err := g.Node.ProposePeerRemoval(g.Ctx(), member.GetId()); err != nil { glog.Errorf("Error while proposing node removal: %+v", err) } }() @@ -431,7 +433,7 @@ func (g *groupi) BelongsToReadOnly(key string, ts uint64) (uint32, error) { Predicate: key, ReadOnly: true, } - out, err := zc.ShouldServe(context.Background(), tablet) + out, err := zc.ShouldServe(g.Ctx(), tablet) if err != nil { glog.Errorf("Error while ShouldServe grpc call %v", err) return 0, err @@ -475,7 +477,7 @@ func (g *groupi) Tablet(key string) (*pb.Tablet, error) { zc := pb.NewZeroClient(pl.Get()) tablet = &pb.Tablet{GroupId: g.groupId(), Predicate: key} - out, err := zc.ShouldServe(context.Background(), tablet) + out, err := zc.ShouldServe(g.Ctx(), tablet) if err != nil { glog.Errorf("Error while ShouldServe grpc call %v", err) return nil, err @@ -629,7 +631,7 @@ func (g *groupi) connToZeroLeader() *conn.Pool { glog.V(1).Infof("No healthy Zero leader found. 
Trying to find a Zero leader...") getLeaderConn := func(zc pb.ZeroClient) *conn.Pool { - ctx, cancel := context.WithTimeout(g.ctx, 10*time.Second) + ctx, cancel := context.WithTimeout(g.Ctx(), 10*time.Second) defer cancel() connState, err := zc.Connect(ctx, &pb.Member{ClusterInfoOnly: true}) @@ -649,6 +651,10 @@ func (g *groupi) connToZeroLeader() *conn.Pool { delay := connBaseDelay maxHalfDelay := time.Second for i := 0; ; i++ { // Keep on retrying. See: https://github.com/dgraph-io/dgraph/issues/2289 + if g.IsClosed() { + return nil + } + time.Sleep(delay) if delay <= maxHalfDelay { delay *= 2 @@ -701,7 +707,7 @@ func (g *groupi) doSendMembership(tablets map[string]*pb.Tablet) error { return errNoConnection } c := pb.NewZeroClient(pl.Get()) - ctx, cancel := context.WithTimeout(g.ctx, 10*time.Second) + ctx, cancel := context.WithTimeout(g.Ctx(), 10*time.Second) defer cancel() reply, err := c.UpdateMembership(ctx, group) if err != nil { @@ -716,7 +722,10 @@ func (g *groupi) doSendMembership(tablets map[string]*pb.Tablet) error { // sendMembershipUpdates sends the membership update to Zero leader. If this Alpha is the leader, it // would also calculate the tablet sizes and send them to Zero. func (g *groupi) sendMembershipUpdates() { - defer g.closer.Done() // CLOSER:1 + defer func() { + glog.Infoln("Closing sendMembershipUpdates") + g.closer.Done() // CLOSER:1 + }() ticker := time.NewTicker(time.Second) defer ticker.Stop() @@ -760,7 +769,10 @@ func (g *groupi) sendMembershipUpdates() { // connection which tells Alpha about the state of the cluster, including the latest Zero leader. // All the other connections to Zero, are only made only to the leader. func (g *groupi) receiveMembershipUpdates() { - defer g.closer.Done() // CLOSER:1 + defer func() { + glog.Infoln("Closing receiveMembershipUpdates") + g.closer.Done() // CLOSER:1 + }() ticker := time.NewTicker(10 * time.Second) defer ticker.Stop() @@ -782,7 +794,7 @@ START: glog.Infof("Got address of a Zero leader: %s", pl.Addr) c := pb.NewZeroClient(pl.Get()) - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(g.Ctx()) stream, err := c.StreamMembership(ctx, &api.Payload{}) if err != nil { cancel() @@ -856,7 +868,10 @@ OUTER: // processOracleDeltaStream is used to process oracle delta stream from Zero. // Zero sends information about aborted/committed transactions and maxPending. func (g *groupi) processOracleDeltaStream() { - defer g.closer.Done() // CLOSER:1 + defer func() { + glog.Infoln("Closing processOracleDeltaStream") + g.closer.Done() // CLOSER:1 + }() ticker := time.NewTicker(time.Second) defer ticker.Stop() @@ -868,6 +883,9 @@ func (g *groupi) processOracleDeltaStream() { pl := g.connToZeroLeader() if pl == nil { glog.Warningln("Oracle delta stream: No Zero leader known.") + if g.IsClosed() { + return + } time.Sleep(time.Second) return } @@ -878,8 +896,9 @@ func (g *groupi) processOracleDeltaStream() { // batching. Once a batch is created, it gets proposed. Thus, we can reduce the number of // times proposals happen, which is a great optimization to have (and a common one in our // code base). - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(g.Ctx()) defer cancel() + c := pb.NewZeroClient(pl.Get()) stream, err := c.Oracle(ctx, &api.Payload{}) if err != nil { @@ -993,7 +1012,7 @@ func (g *groupi) processOracleDeltaStream() { for { // Block forever trying to propose this. 
Also this proposal should not be counted // towards num pending proposals and be proposed right away. - err := g.Node.proposeAndWait(context.Background(), &pb.Proposal{Delta: delta}) + err := g.Node.proposeAndWait(g.Ctx(), &pb.Proposal{Delta: delta}) if err == nil { break } @@ -1024,30 +1043,25 @@ func EnterpriseEnabled() bool { } g := groups() if g.state == nil { - return askZeroForEE() + return g.askZeroForEE() } g.RLock() defer g.RUnlock() return g.state.GetLicense().GetEnabled() } -func askZeroForEE() bool { +func (g *groupi) askZeroForEE() bool { var err error var connState *pb.ConnectionState - grp := &groupi{} - createConn := func() bool { - grp.ctx, grp.cancel = context.WithCancel(context.Background()) - defer grp.cancel() - - pl := grp.connToZeroLeader() + pl := g.connToZeroLeader() if pl == nil { return false } zc := pb.NewZeroClient(pl.Get()) - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + ctx, cancel := context.WithTimeout(g.Ctx(), 10*time.Second) defer cancel() connState, err = zc.Connect(ctx, &pb.Member{ClusterInfoOnly: true}) @@ -1063,7 +1077,7 @@ func askZeroForEE() bool { return false } - for { + for !g.IsClosed() { if createConn() { break } @@ -1077,39 +1091,48 @@ func SubscribeForUpdates(prefixes [][]byte, cb func(kvs *badgerpb.KVList), group closer *z.Closer) { defer closer.Done() - for { - select { - case <-closer.HasBeenClosed(): - return - default: + var prefix []byte + if len(prefixes) > 0 { + prefix = prefixes[0] + } + defer func() { + glog.Infof("SubscribeForUpdates closing for prefix: %q\n", prefix) + closer.Done() + }() - // Connect to any of the group 1 nodes. - members := groups().AnyTwoServers(group) - // There may be a lag while starting so keep retrying. - if len(members) == 0 { - continue - } - pool := conn.GetPools().Connect(members[0]) - client := pb.NewWorkerClient(pool.Get()) + listen := func() error { + // Connect to any of the group 1 nodes. + members := groups().AnyTwoServers(group) + // There may be a lag while starting so keep retrying. + if len(members) == 0 { + return fmt.Errorf("Unable to find any servers for group: %d", group) + } + pool := conn.GetPools().Connect(members[0]) + client := pb.NewWorkerClient(pool.Get()) - // Get Subscriber stream. - stream, err := client.Subscribe(context.Background(), - &pb.SubscriptionRequest{Prefixes: prefixes}) + // Get Subscriber stream. + stream, err := client.Subscribe(closer.Ctx(), &pb.SubscriptionRequest{Prefixes: prefixes}) + if err != nil { + return errors.Wrapf(err, "error from client.subscribe") + } + for { + // Listen for updates. + kvs, err := stream.Recv() if err != nil { - glog.Errorf("Error from alpha client subscribe: %v", err) - time.Sleep(100 * time.Millisecond) - continue - } - receiver: - for { - // Listen for updates. - kvs, err := stream.Recv() - if err != nil { - glog.Errorf("Error from worker subscribe stream: %v", err) - break receiver - } - cb(kvs) + return errors.Wrapf(err, "while receiving from stream") } + cb(kvs) + } + } + + for { + if err := listen(); err != nil { + glog.Errorf("Error during SubscribeForUpdates for prefix %q: %v. 
closer err: %v\n", + prefix, err, closer.Ctx().Err()) } + if closer.Ctx().Err() != nil { + return + } + time.Sleep(time.Second) } } diff --git a/worker/server_state.go b/worker/server_state.go index 663bccc688d..8ab55b0a24f 100644 --- a/worker/server_state.go +++ b/worker/server_state.go @@ -212,20 +212,28 @@ func (s *ServerState) fillTimestampRequests() { maxDelay = time.Second ) + defer func() { + glog.Infoln("Exiting fillTimestampRequests") + }() + var reqs []tsReq for { // Reset variables. reqs = reqs[:0] delay := initDelay - req := <-s.needTs - slurpLoop: - for { - reqs = append(reqs, req) - select { - case req = <-s.needTs: - default: - break slurpLoop + select { + case <-s.gcCloser.HasBeenClosed(): + return + case req := <-s.needTs: + slurpLoop: + for { + reqs = append(reqs, req) + select { + case req = <-s.needTs: + default: + break slurpLoop + } } } @@ -241,7 +249,10 @@ func (s *ServerState) fillTimestampRequests() { // Execute the request with infinite retries. retry: - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + if s.gcCloser.Ctx().Err() != nil { + return + } + ctx, cancel := context.WithTimeout(s.gcCloser.Ctx(), 10*time.Second) ts, err := Timestamps(ctx, num) cancel() if err != nil {
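
Below the patch, a minimal self-contained sketch of the shutdown ordering the commit message and the run() hunk describe: both ACL updaters register against a single closer, shutdown signals that closer first, stops the worker and admin server, and only waits for the updaters at the very end. This is an illustration, not Dgraph code: the closer type here is a hand-rolled stand-in for z.Closer (so the sketch compiles on its own), and refreshACLs, resetACL and upsertGroot are hypothetical simplifications of RefreshAcls, ResetAcl and the guardian/groot upserts.

// shutdown_sketch.go: a minimal, self-contained sketch of the shutdown
// ordering this patch introduces in run(): signal the ACL updaters first,
// stop the worker and admin server, dispose state, and only then wait for
// the updaters to exit.
package main

import (
	"context"
	"log"
	"sync"
	"time"
)

// closer is a hand-rolled stand-in for z.Closer: a cancellable context plus a
// WaitGroup for the goroutines registered against it.
type closer struct {
	ctx    context.Context
	cancel context.CancelFunc
	wg     sync.WaitGroup
	once   sync.Once
}

func newCloser(running int) *closer {
	ctx, cancel := context.WithCancel(context.Background())
	c := &closer{ctx: ctx, cancel: cancel}
	c.wg.Add(running)
	return c
}

func (c *closer) Ctx() context.Context           { return c.ctx }
func (c *closer) HasBeenClosed() <-chan struct{} { return c.ctx.Done() }
func (c *closer) Signal()                        { c.once.Do(c.cancel) }
func (c *closer) Done()                          { c.wg.Done() }
func (c *closer) Wait()                          { c.wg.Wait() }

// refreshACLs mimics RefreshAcls: poll on a ticker, exit when signalled.
func refreshACLs(c *closer) {
	defer c.Done()
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			// A real refresh would query through c.Ctx(), so in-flight RPCs
			// are cancelled as soon as the closer is signalled.
		case <-c.HasBeenClosed():
			return
		}
	}
}

// resetACL mimics ResetAcl: retry the upsert until it succeeds or the
// closer's context is cancelled, instead of looping forever on
// context.Background() as the old code did.
func resetACL(c *closer) {
	defer c.Done()
	for c.Ctx().Err() == nil {
		ctx, cancel := context.WithTimeout(c.Ctx(), time.Second)
		err := upsertGroot(ctx)
		cancel()
		if err == nil {
			return
		}
	}
}

// upsertGroot is a placeholder for the real guardian/groot upsert; it only
// demonstrates respecting the cancellable context.
func upsertGroot(ctx context.Context) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-time.After(10 * time.Millisecond):
		return nil
	}
}

func main() {
	updaters := newCloser(2) // one slot per updater, matching z.NewCloser(2) in run()
	go resetACL(updaters)
	go refreshACLs(updaters)

	time.Sleep(300 * time.Millisecond) // stand-in for serving traffic

	updaters.Signal() // signal only; the updaters may still be mid-request
	// In run(), worker.BlockingStop(), adminCloser.SignalAndWait() and
	// worker.State.Dispose() happen between the Signal and the Wait.
	updaters.Wait() // wait last, once everything the updaters depend on has stopped
	log.Println("updaters closed; shutdown complete")
}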