Skip to content

Commit

Permalink
release/v20.03 - Fix(Alpha): MASA: Make Alpha Shutdown Again (#6313) (#…
Browse files Browse the repository at this point in the history
…6403)

* Fix(Alpha): MASA: Make Alpha Shutdown Again (#6313)

This PR removes the usage of context.Background() in groups.go and ensures that all the various goroutines exit as intended.

Changes:
* Shutdown SubscribeForUpdates correctly.
* Fix up all the closing conditions.
* Consolidate updaters into one closer
* Update Badger to master
* fix(build): Update ResetAcl args for OSS build.
* chore: Remove TODO comment.

Co-authored-by: Daniel Mai <[email protected]>
(cherry picked from commit f1941b3)
  • Loading branch information
Ibrahim Jarif authored Sep 4, 2020
1 parent 69f8cd7 commit bdcc422
Show file tree
Hide file tree
Showing 7 changed files with 146 additions and 93 deletions.
23 changes: 17 additions & 6 deletions dgraph/cmd/alpha/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -711,14 +711,13 @@ func run() {
}
}()

// Setup external communication.
aclCloser := z.NewCloser(1)
updaters := z.NewCloser(2)
go func() {
worker.StartRaftNodes(worker.State.WALstore, bindall)
// initialization of the admin account can only be done after raft nodes are running
// and health check passes
edgraph.ResetAcl()
edgraph.RefreshAcls(aclCloser)
edgraph.ResetAcl(updaters)
edgraph.RefreshAcls(updaters)
}()

// Graphql subscribes to alpha to get schema updates. We need to close that before we
Expand All @@ -727,11 +726,23 @@ func run() {

setupServer(adminCloser)
glog.Infoln("GRPC and HTTP stopped.")
aclCloser.SignalAndWait()

// This might not close until group is given the signal to close. So, only signal here,
// wait for it after group is closed.
updaters.Signal()

worker.BlockingStop()
glog.Infoln("worker stopped.")

adminCloser.SignalAndWait()
glog.Info("Disposing server state.")
glog.Infoln("adminCloser closed.")

worker.State.Dispose()
x.RemoveCidFile()
glog.Info("worker.State disposed.")

updaters.Wait()
glog.Infoln("updaters closed.")

glog.Infoln("Server shutdown. Bye!")
}
7 changes: 6 additions & 1 deletion dgraph/cmd/zero/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,5 +376,10 @@ func run() {

glog.Infoln("Running Dgraph Zero...")
st.zero.closer.Wait()
glog.Infoln("All done.")
glog.Infoln("Closer closed.")

err = kv.Close()
glog.Infof("Badger closed with err: %v\n", err)

glog.Infoln("All done. Goodbye!")
}
2 changes: 1 addition & 1 deletion edgraph/access.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (s *Server) Login(ctx context.Context,
}

// ResetAcl is an empty method since ACL is only supported in the enterprise version.
func ResetAcl() {
func ResetAcl(closer *z.Closer) {
// do nothing
}

Expand Down
30 changes: 16 additions & 14 deletions edgraph/access_ee.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,9 +301,6 @@ func RefreshAcls(closer *z.Closer) {
return
}

ticker := time.NewTicker(worker.Config.AclRefreshInterval)
defer ticker.Stop()

// retrieve the full data set of ACLs from the corresponding alpha server, and update the
// aclCachePtr
retrieveAcls := func() error {
Expand All @@ -313,9 +310,7 @@ func RefreshAcls(closer *z.Closer) {
ReadOnly: true,
}

ctx := context.Background()
var err error
queryResp, err := (&Server{}).doQuery(ctx, &queryRequest, NoAuthorize)
queryResp, err := (&Server{}).doQuery(closer.Ctx(), &queryRequest, NoAuthorize)
if err != nil {
return errors.Errorf("unable to retrieve acls: %v", err)
}
Expand All @@ -329,14 +324,16 @@ func RefreshAcls(closer *z.Closer) {
return nil
}

ticker := time.NewTicker(worker.Config.AclRefreshInterval)
defer ticker.Stop()
for {
select {
case <-closer.HasBeenClosed():
return
case <-ticker.C:
if err := retrieveAcls(); err != nil {
glog.Errorf("Error while retrieving acls:%v", err)
glog.Errorf("Error while retrieving acls: %v", err)
}
case <-closer.HasBeenClosed():
return
}
}
}
Expand All @@ -354,7 +351,12 @@ const queryAcls = `
`

// ResetAcl clears the aclCachePtr and upserts the Groot account.
func ResetAcl() {
func ResetAcl(closer *z.Closer) {
defer func() {
glog.Infof("ResetAcl closed")
closer.Done()
}()

if len(worker.Config.HmacSecret) == 0 {
// The acl feature is not turned on.
return
Expand Down Expand Up @@ -421,8 +423,8 @@ func ResetAcl() {
return nil
}

for {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
for closer.Ctx().Err() == nil {
ctx, cancel := context.WithTimeout(closer.Ctx(), time.Minute)
defer cancel()
if err := upsertGuardians(ctx); err != nil {
glog.Infof("Unable to upsert the guardian group. Error: %v", err)
Expand All @@ -432,8 +434,8 @@ func ResetAcl() {
break
}

for {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
for closer.Ctx().Err() == nil {
ctx, cancel := context.WithTimeout(closer.Ctx(), time.Minute)
defer cancel()
if err := upsertGroot(ctx); err != nil {
glog.Infof("Unable to upsert the groot account. Error: %v", err)
Expand Down
5 changes: 3 additions & 2 deletions edgraph/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (

"github.com/dgraph-io/dgo/v2"
"github.com/dgraph-io/dgo/v2/protos/api"

"github.com/dgraph-io/dgraph/chunker"
"github.com/dgraph-io/dgraph/conn"
"github.com/dgraph-io/dgraph/dgraph/cmd/zero"
Expand Down Expand Up @@ -178,7 +179,7 @@ func (s *Server) Alter(ctx context.Context, op *api.Operation) (*api.Payload, er
_, err := query.ApplyMutations(ctx, m)

// recreate the admin account after a drop all operation
ResetAcl()
ResetAcl(nil)
return empty, err
}

Expand All @@ -191,7 +192,7 @@ func (s *Server) Alter(ctx context.Context, op *api.Operation) (*api.Payload, er
_, err := query.ApplyMutations(ctx, m)

// recreate the admin account after a drop data operation
ResetAcl()
ResetAcl(nil)
return empty, err
}

Expand Down
Loading

0 comments on commit bdcc422

Please sign in to comment.