From 5e8cf02d4dcf932b5fc40cf65ee7dc2c1ce49553 Mon Sep 17 00:00:00 2001 From: Ibrahim Jarif Date: Sat, 5 Sep 2020 03:00:15 +0530 Subject: [PATCH] release/v1.2 - Fix(Alpha): MASA: Make Alpha Shutdown Again (#6313) (#6406) * Fix(Alpha): MASA: Make Alpha Shutdown Again (#6313) This PR removes the usage of context.Background() in groups.go and ensures that all the various goroutines exit as intended. Changes: * Shutdown SubscribeForUpdates correctly. * Fix up all the closing conditions. * Consolidate updaters into one closer * Update Badger to master * fix(build): Update ResetAcl args for OSS build. * chore: Remove TODO comment. Co-authored-by: Daniel Mai (cherry picked from commit f1941b3c072971fb1112569879afc3dd80621f70) --- dgraph/cmd/alpha/run.go | 21 +++++--- dgraph/cmd/zero/run.go | 7 ++- edgraph/access.go | 2 +- edgraph/access_ee.go | 35 +++++++------ edgraph/server.go | 4 +- go.sum | 9 ---- graphql/admin/admin.go | 10 ++-- worker/groups.go | 113 +++++++++++++++++++++++++--------------- worker/server_state.go | 29 +++++++---- 9 files changed, 142 insertions(+), 88 deletions(-) diff --git a/dgraph/cmd/alpha/run.go b/dgraph/cmd/alpha/run.go index ca389591160..f57766d9eb4 100644 --- a/dgraph/cmd/alpha/run.go +++ b/dgraph/cmd/alpha/run.go @@ -707,23 +707,32 @@ func run() { } }() - // Setup external communication. - aclCloser := z.NewCloser(1) + updaters := z.NewCloser(2) go func() { worker.StartRaftNodes(worker.State.WALstore, bindall) atomic.AddUint32(&initDone, 1) // initialization of the admin account can only be done after raft nodes are running // and health check passes - edgraph.ResetAcl() - edgraph.RefreshAcls(aclCloser) + edgraph.ResetAcl(updaters) + edgraph.RefreshAcls(updaters) }() setupServer() glog.Infoln("GRPC and HTTP stopped.") - aclCloser.SignalAndWait() + + // This might not close until group is given the signal to close. So, only signal here, + // wait for it after group is closed. + updaters.Signal() + worker.BlockingStop() - glog.Info("Disposing server state.") + glog.Infoln("worker stopped.") + worker.State.Dispose() + glog.Info("worker.State disposed.") + + updaters.Wait() + glog.Infoln("updaters closed.") + glog.Infoln("Server shutdown. Bye!") } diff --git a/dgraph/cmd/zero/run.go b/dgraph/cmd/zero/run.go index c887e96a221..bbf0841bd6c 100644 --- a/dgraph/cmd/zero/run.go +++ b/dgraph/cmd/zero/run.go @@ -347,5 +347,10 @@ func run() { glog.Infoln("Running Dgraph Zero...") st.zero.closer.Wait() - glog.Infoln("All done.") + glog.Infoln("Closer closed.") + + err = kv.Close() + glog.Infof("Badger closed with err: %v\n", err) + + glog.Infoln("All done. Goodbye!") } diff --git a/edgraph/access.go b/edgraph/access.go index 098941621e3..5d67ca131b0 100644 --- a/edgraph/access.go +++ b/edgraph/access.go @@ -42,7 +42,7 @@ func (s *Server) Login(ctx context.Context, } // ResetAcl is an empty method since ACL is only supported in the enterprise version. -func ResetAcl() { +func ResetAcl(closer *z.Closer) { // do nothing } diff --git a/edgraph/access_ee.go b/edgraph/access_ee.go index 2841c329f26..85183c25e97 100644 --- a/edgraph/access_ee.go +++ b/edgraph/access_ee.go @@ -297,15 +297,15 @@ func authorizeUser(ctx context.Context, userid string, password string) ( // RefreshAcls queries for the ACL triples and refreshes the ACLs accordingly. 
func RefreshAcls(closer *z.Closer) { - defer closer.Done() + defer func() { + glog.Infoln("RefreshAcls closed") + closer.Done() + }() if len(worker.Config.HmacSecret) == 0 { // the acl feature is not turned on return } - ticker := time.NewTicker(worker.Config.AclRefreshInterval) - defer ticker.Stop() - // retrieve the full data set of ACLs from the corresponding alpha server, and update the // aclCachePtr retrieveAcls := func() error { @@ -315,9 +315,7 @@ func RefreshAcls(closer *z.Closer) { ReadOnly: true, } - ctx := context.Background() - var err error - queryResp, err := (&Server{}).doQuery(ctx, &queryRequest, NoAuthorize) + queryResp, err := (&Server{}).doQuery(closer.Ctx(), &queryRequest, NoAuthorize) if err != nil { return errors.Errorf("unable to retrieve acls: %v", err) } @@ -331,14 +329,16 @@ func RefreshAcls(closer *z.Closer) { return nil } + ticker := time.NewTicker(worker.Config.AclRefreshInterval) + defer ticker.Stop() for { select { - case <-closer.HasBeenClosed(): - return case <-ticker.C: if err := retrieveAcls(); err != nil { - glog.Errorf("Error while retrieving acls:%v", err) + glog.Errorf("Error while retrieving acls: %v", err) } + case <-closer.HasBeenClosed(): + return } } } @@ -353,7 +353,12 @@ const queryAcls = ` ` // ResetAcl clears the aclCachePtr and upserts the Groot account. -func ResetAcl() { +func ResetAcl(closer *z.Closer) { + defer func() { + glog.Infof("ResetAcl closed") + closer.Done() + }() + if len(worker.Config.HmacSecret) == 0 { // The acl feature is not turned on. return @@ -420,8 +425,8 @@ func ResetAcl() { return nil } - for { - ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + for closer.Ctx().Err() == nil { + ctx, cancel := context.WithTimeout(closer.Ctx(), time.Minute) defer cancel() if err := upsertGuardians(ctx); err != nil { glog.Infof("Unable to upsert the guardian group. Error: %v", err) @@ -431,8 +436,8 @@ func ResetAcl() { break } - for { - ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + for closer.Ctx().Err() == nil { + ctx, cancel := context.WithTimeout(closer.Ctx(), time.Minute) defer cancel() if err := upsertGroot(ctx); err != nil { glog.Infof("Unable to upsert the groot account. 
Error: %v", err) diff --git a/edgraph/server.go b/edgraph/server.go index 0be364fa451..7d7e139fbb7 100644 --- a/edgraph/server.go +++ b/edgraph/server.go @@ -130,7 +130,7 @@ func (s *Server) Alter(ctx context.Context, op *api.Operation) (*api.Payload, er _, err := query.ApplyMutations(ctx, m) // recreate the admin account after a drop all operation - ResetAcl() + ResetAcl(nil) return empty, err } @@ -143,7 +143,7 @@ func (s *Server) Alter(ctx context.Context, op *api.Operation) (*api.Payload, er _, err := query.ApplyMutations(ctx, m) // recreate the admin account after a drop data operation - ResetAcl() + ResetAcl(nil) return empty, err } diff --git a/go.sum b/go.sum index e086768e12f..5366ae15d82 100644 --- a/go.sum +++ b/go.sum @@ -6,7 +6,6 @@ contrib.go.opencensus.io/exporter/prometheus v0.1.0 h1:SByaIoWwNgMdPSgl5sMqM2KDE contrib.go.opencensus.io/exporter/prometheus v0.1.0/go.mod h1:cGFniUXGZlKRjzOyuZJ6mgB+PgBcCIa79kEKR8YCW+A= github.com/99designs/gqlgen v0.10.1 h1:1BgB6XKGTHq7uH4G1/PYyKe2Kz7/vw3AlvMZlD3TEEY= github.com/99designs/gqlgen v0.10.1/go.mod h1:IviubpnyI4gbBcj8IcxSSc/Q/+af5riwCmJmwF0uaPE= -github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8= github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/DATA-DOG/go-sqlmock v1.3.2 h1:2L2f5t3kKnCLxnClDD/PrDfExFFa1wjESgxHG/B1ibo= @@ -63,17 +62,10 @@ github.com/d4l3k/messagediff v1.2.1/go.mod h1:Oozbb1TVXFac9FtSIxHBMnBCq2qeH/2KkE github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/dgraph-io/badger v1.6.1 h1:w9pSFNSdq/JPM1N12Fz/F/bzo993Is1W+Q7HjPzi7yg= -github.com/dgraph-io/badger v1.6.1/go.mod h1:FRmFw3uxvcpa8zG3Rxs0th+hCLIuaQg8HlNV5bjgnuU= github.com/dgraph-io/badger/v2 v2.2007.2-0.20200827131741-d5a25b83fbf4 h1:DUDFTVgqZysKplH39/ya0aI4+zGm91L9QttXgITT2YE= github.com/dgraph-io/badger/v2 v2.2007.2-0.20200827131741-d5a25b83fbf4/go.mod h1:26P/7fbL4kUZVEVKLAKXkBXKOydDmM2p1e+NhhnBCAE= -github.com/dgraph-io/dgo v1.0.0 h1:DRuI66G+j0XWDOXly4v5PSk2dGkbIopAZIirRjq7lzI= github.com/dgraph-io/dgo/v2 v2.1.1-0.20191127085444-c7a02678e8a6 h1:5leDFqGys055YO3TbghBhk/QdRPEwyLPdgsSJfiR20I= github.com/dgraph-io/dgo/v2 v2.1.1-0.20191127085444-c7a02678e8a6/go.mod h1:LJCkLxm5fUMcU+yb8gHFjHt7ChgNuz3YnQQ6MQkmscI= -github.com/dgraph-io/dgo/v200 v200.0.0-20200825025457-a38d5eaacbf8 h1:twtbiz+2PsuJEZWP+WGYdJEtD/NW1d7T5m3EN9JSBXI= -github.com/dgraph-io/dgo/v200 v200.0.0-20200825025457-a38d5eaacbf8/go.mod h1:Co+FwJrnndSrPORO8Gdn20dR7FPTfmXr0W/su0Ve/Ig= -github.com/dgraph-io/ristretto v0.0.2/go.mod h1:KPxhHT9ZxKefz+PCeOGsrHpl1qZ7i70dGTu2u+Ahh6E= -github.com/dgraph-io/ristretto v0.0.3-0.20200630154024-f66de99634de h1:t0UHb5vdojIDUqktM6+xJAfScFBsVpXZmqC9dsgJmeA= github.com/dgraph-io/ristretto v0.0.3-0.20200630154024-f66de99634de/go.mod h1:KPxhHT9ZxKefz+PCeOGsrHpl1qZ7i70dGTu2u+Ahh6E= github.com/dgraph-io/ristretto v0.0.4-0.20200904131139-4dec2770af66 h1:ectpJv2tGhTudyk0JhqE/53o/ObH30u5yt/yThsAn3I= github.com/dgraph-io/ristretto v0.0.4-0.20200904131139-4dec2770af66/go.mod h1:KPxhHT9ZxKefz+PCeOGsrHpl1qZ7i70dGTu2u+Ahh6E= @@ -159,7 +151,6 @@ github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxv github.com/kr/logfmt 
v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= -github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pty v1.0.0/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= diff --git a/graphql/admin/admin.go b/graphql/admin/admin.go index 239174768ea..eeabe3d34f9 100644 --- a/graphql/admin/admin.go +++ b/graphql/admin/admin.go @@ -35,6 +35,7 @@ import ( "github.com/dgraph-io/dgraph/protos/pb" "github.com/dgraph-io/dgraph/worker" "github.com/dgraph-io/dgraph/x" + "github.com/dgraph-io/ristretto/z" ) const ( @@ -130,7 +131,7 @@ type adminServer struct { // NewServers initializes the GraphQL servers. It sets up an empty server for the // main /graphql endpoint and an admin server. The result is mainServer, adminServer. -func NewServers(withIntrospection bool) (web.IServeGraphQL, web.IServeGraphQL) { +func NewServers(withIntrospection bool, closer *z.Closer) (web.IServeGraphQL, web.IServeGraphQL) { gqlSchema, err := schema.FromString("") if err != nil { @@ -146,7 +147,7 @@ func NewServers(withIntrospection bool) (web.IServeGraphQL, web.IServeGraphQL) { Urw: resolve.NewUpdateRewriter, Drw: resolve.NewDeleteRewriter(), } - adminResolvers := newAdminResolver(mainServer, fns, withIntrospection) + adminResolvers := newAdminResolver(mainServer, fns, withIntrospection, closer) adminServer := web.NewServer(adminResolvers) return mainServer, adminServer @@ -156,7 +157,8 @@ func NewServers(withIntrospection bool) (web.IServeGraphQL, web.IServeGraphQL) { func newAdminResolver( gqlServer web.IServeGraphQL, fns *resolve.ResolverFns, - withIntrospection bool) *resolve.RequestResolver { + withIntrospection bool, + closer *z.Closer) *resolve.RequestResolver { adminSchema, err := schema.FromString(graphqlAdminSchema) if err != nil { @@ -230,7 +232,7 @@ func newAdminResolver( server.schema = newSchema server.status = healthy server.resetSchema(gqlSchema) - }, 1) + }, 1, closer) go server.initServer() diff --git a/worker/groups.go b/worker/groups.go index 2b28b36c038..239ecd08aa6 100644 --- a/worker/groups.go +++ b/worker/groups.go @@ -43,9 +43,6 @@ import ( type groupi struct { x.SafeMutex - // TODO: Is this context being used? - ctx context.Context - cancel context.CancelFunc state *pb.MembershipState Node *node gid uint32 @@ -74,8 +71,6 @@ func groups() *groupi { // This function triggers RAFT nodes to be created, and is the entrance to the RAFT // world from main.go. 
func StartRaftNodes(walStore *badger.DB, bindall bool) { - gr.ctx, gr.cancel = context.WithCancel(context.Background()) - if x.WorkerConfig.MyAddr == "" { x.WorkerConfig.MyAddr = fmt.Sprintf("localhost:%d", workerPort()) } else { @@ -124,7 +119,7 @@ func StartRaftNodes(walStore *badger.DB, bindall bool) { continue } zc := pb.NewZeroClient(pl.Get()) - connState, err = zc.Connect(gr.ctx, m) + connState, err = zc.Connect(gr.Ctx(), m) if err == nil || x.ShouldCrash(err) { break } @@ -164,6 +159,14 @@ func StartRaftNodes(walStore *badger.DB, bindall bool) { gr.proposeInitialSchema() } +func (g *groupi) Ctx() context.Context { + return g.closer.Ctx() +} + +func (g *groupi) IsClosed() bool { + return g.closer.Ctx().Err() != nil +} + func (g *groupi) informZeroAboutTablets() { // Before we start this Alpha, let's pick up all the predicates we have in our postings // directory, and ask Zero if we are allowed to serve it. Do this irrespective of whether @@ -216,7 +219,7 @@ func (g *groupi) upsertSchema(sch *pb.SchemaUpdate) { // Propose schema mutation. var m pb.Mutations // schema for a reserved predicate is not changed once set. - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + ctx, cancel := context.WithTimeout(g.Ctx(), 10*time.Second) ts, err := Timestamps(ctx, &pb.Num{Val: 1}) cancel() if err != nil { @@ -230,7 +233,7 @@ func (g *groupi) upsertSchema(sch *pb.SchemaUpdate) { // This would propose the schema mutation and make sure some node serves this predicate // and has the schema defined above. for { - _, err := MutateOverNetwork(gr.ctx, &m) + _, err := MutateOverNetwork(gr.Ctx(), &m) if err == nil { break } @@ -347,8 +350,7 @@ func (g *groupi) applyState(state *pb.MembershipState) { } } - if err := g.Node.ProposePeerRemoval( - context.Background(), member.GetId()); err != nil { + if err := g.Node.ProposePeerRemoval(g.Ctx(), member.GetId()); err != nil { glog.Errorf("Error while proposing node removal: %+v", err) } }() @@ -415,7 +417,7 @@ func (g *groupi) BelongsToReadOnly(key string, ts uint64) (uint32, error) { Predicate: key, ReadOnly: true, } - out, err := zc.ShouldServe(context.Background(), tablet) + out, err := zc.ShouldServe(g.Ctx(), tablet) if err != nil { glog.Errorf("Error while ShouldServe grpc call %v", err) return 0, err @@ -459,7 +461,7 @@ func (g *groupi) Tablet(key string) (*pb.Tablet, error) { zc := pb.NewZeroClient(pl.Get()) tablet = &pb.Tablet{GroupId: g.groupId(), Predicate: key} - out, err := zc.ShouldServe(context.Background(), tablet) + out, err := zc.ShouldServe(g.Ctx(), tablet) if err != nil { glog.Errorf("Error while ShouldServe grpc call %v", err) return nil, err @@ -613,7 +615,7 @@ func (g *groupi) connToZeroLeader() *conn.Pool { glog.V(1).Infof("No healthy Zero leader found. Trying to find a Zero leader...") getLeaderConn := func(zc pb.ZeroClient) *conn.Pool { - ctx, cancel := context.WithTimeout(g.ctx, 10*time.Second) + ctx, cancel := context.WithTimeout(g.Ctx(), 10*time.Second) defer cancel() connState, err := zc.Connect(ctx, &pb.Member{ClusterInfoOnly: true}) @@ -633,6 +635,10 @@ func (g *groupi) connToZeroLeader() *conn.Pool { delay := connBaseDelay maxHalfDelay := time.Second for i := 0; ; i++ { // Keep on retrying. 
See: https://github.com/dgraph-io/dgraph/issues/2289 + if g.IsClosed() { + return nil + } + time.Sleep(delay) if delay <= maxHalfDelay { delay *= 2 @@ -685,7 +691,7 @@ func (g *groupi) doSendMembership(tablets map[string]*pb.Tablet) error { return errNoConnection } c := pb.NewZeroClient(pl.Get()) - ctx, cancel := context.WithTimeout(g.ctx, 10*time.Second) + ctx, cancel := context.WithTimeout(g.Ctx(), 10*time.Second) defer cancel() reply, err := c.UpdateMembership(ctx, group) if err != nil { @@ -700,7 +706,10 @@ func (g *groupi) doSendMembership(tablets map[string]*pb.Tablet) error { // sendMembershipUpdates sends the membership update to Zero leader. If this Alpha is the leader, it // would also calculate the tablet sizes and send them to Zero. func (g *groupi) sendMembershipUpdates() { - defer g.closer.Done() // CLOSER:1 + defer func() { + glog.Infoln("Closing sendMembershipUpdates") + g.closer.Done() // CLOSER:1 + }() ticker := time.NewTicker(time.Second) defer ticker.Stop() @@ -744,7 +753,10 @@ func (g *groupi) sendMembershipUpdates() { // connection which tells Alpha about the state of the cluster, including the latest Zero leader. // All the other connections to Zero, are only made only to the leader. func (g *groupi) receiveMembershipUpdates() { - defer g.closer.Done() // CLOSER:1 + defer func() { + glog.Infoln("Closing receiveMembershipUpdates") + g.closer.Done() // CLOSER:1 + }() ticker := time.NewTicker(10 * time.Second) defer ticker.Stop() @@ -766,7 +778,7 @@ START: glog.Infof("Got address of a Zero leader: %s", pl.Addr) c := pb.NewZeroClient(pl.Get()) - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(g.Ctx()) stream, err := c.StreamMembership(ctx, &api.Payload{}) if err != nil { cancel() @@ -839,7 +851,10 @@ OUTER: // processOracleDeltaStream is used to process oracle delta stream from Zero. // Zero sends information about aborted/committed transactions and maxPending. func (g *groupi) processOracleDeltaStream() { - defer g.closer.Done() // CLOSER:1 + defer func() { + glog.Infoln("Closing processOracleDeltaStream") + g.closer.Done() // CLOSER:1 + }() ticker := time.NewTicker(time.Second) defer ticker.Stop() @@ -851,6 +866,9 @@ func (g *groupi) processOracleDeltaStream() { pl := g.connToZeroLeader() if pl == nil { glog.Warningln("Oracle delta stream: No Zero leader known.") + if g.IsClosed() { + return + } time.Sleep(time.Second) return } @@ -861,8 +879,9 @@ func (g *groupi) processOracleDeltaStream() { // batching. Once a batch is created, it gets proposed. Thus, we can reduce the number of // times proposals happen, which is a great optimization to have (and a common one in our // code base). - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(g.Ctx()) defer cancel() + c := pb.NewZeroClient(pl.Get()) stream, err := c.Oracle(ctx, &api.Payload{}) if err != nil { @@ -976,7 +995,7 @@ func (g *groupi) processOracleDeltaStream() { for { // Block forever trying to propose this. Also this proposal should not be counted // towards num pending proposals and be proposed right away. 
- err := g.Node.proposeAndWait(context.Background(), &pb.Proposal{Delta: delta}) + err := g.Node.proposeAndWait(g.Ctx(), &pb.Proposal{Delta: delta}) if err == nil { break } @@ -1007,30 +1026,25 @@ func EnterpriseEnabled() bool { } g := groups() if g.state == nil { - return askZeroForEE() + return g.askZeroForEE() } g.RLock() defer g.RUnlock() return g.state.GetLicense().GetEnabled() } -func askZeroForEE() bool { +func (g *groupi) askZeroForEE() bool { var err error var connState *pb.ConnectionState - grp := &groupi{} - createConn := func() bool { - grp.ctx, grp.cancel = context.WithCancel(context.Background()) - defer grp.cancel() - - pl := grp.connToZeroLeader() + pl := g.connToZeroLeader() if pl == nil { return false } zc := pb.NewZeroClient(pl.Get()) - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + ctx, cancel := context.WithTimeout(g.Ctx(), 10*time.Second) defer cancel() connState, err = zc.Connect(ctx, &pb.Member{ClusterInfoOnly: true}) @@ -1046,7 +1060,7 @@ func askZeroForEE() bool { return false } - for { + for !g.IsClosed() { if createConn() { break } @@ -1056,34 +1070,51 @@ func askZeroForEE() bool { } // SubscribeForUpdates will listen for updates for the given group. -func SubscribeForUpdates(prefixes [][]byte, cb func(kvs *badgerpb.KVList), group uint32) { - for { +func SubscribeForUpdates(prefixes [][]byte, cb func(kvs *badgerpb.KVList), + group uint32, closer *z.Closer) { + + var prefix []byte + if len(prefixes) > 0 { + prefix = prefixes[0] + } + defer func() { + glog.Infof("SubscribeForUpdates closing for prefix: %q\n", prefix) + closer.Done() + }() + + listen := func() error { // Connect to any of the group 1 nodes. members := groups().AnyTwoServers(group) // There may be a lag while starting so keep retrying. if len(members) == 0 { - continue + return fmt.Errorf("Unable to find any servers for group: %d", group) } pool := conn.GetPools().Connect(members[0]) client := pb.NewWorkerClient(pool.Get()) // Get Subscriber stream. - stream, err := client.Subscribe(context.Background(), &pb.SubscriptionRequest{ - Prefixes: prefixes, - }) + stream, err := client.Subscribe(closer.Ctx(), &pb.SubscriptionRequest{Prefixes: prefixes}) if err != nil { - glog.Errorf("Error from alpha client subscribe: %v", err) - continue + return errors.Wrapf(err, "error from client.subscribe") } - receiver: for { // Listen for updates. kvs, err := stream.Recv() if err != nil { - glog.Errorf("Error from worker subscribe stream: %v", err) - break receiver + return errors.Wrapf(err, "while receiving from stream") } cb(kvs) } } + + for { + if err := listen(); err != nil { + glog.Errorf("Error during SubscribeForUpdates for prefix %q: %v. closer err: %v\n", + prefix, err, closer.Ctx().Err()) + } + if closer.Ctx().Err() != nil { + return + } + time.Sleep(time.Second) + } } diff --git a/worker/server_state.go b/worker/server_state.go index 792c46546ef..9694ff916ed 100644 --- a/worker/server_state.go +++ b/worker/server_state.go @@ -212,20 +212,28 @@ func (s *ServerState) fillTimestampRequests() { maxDelay = time.Second ) + defer func() { + glog.Infoln("Exiting fillTimestampRequests") + }() + var reqs []tsReq for { // Reset variables. 
reqs = reqs[:0] delay := initDelay - req := <-s.needTs - slurpLoop: - for { - reqs = append(reqs, req) - select { - case req = <-s.needTs: - default: - break slurpLoop + select { + case <-s.gcCloser.HasBeenClosed(): + return + case req := <-s.needTs: + slurpLoop: + for { + reqs = append(reqs, req) + select { + case req = <-s.needTs: + default: + break slurpLoop + } } } @@ -241,7 +249,10 @@ func (s *ServerState) fillTimestampRequests() { // Execute the request with infinite retries. retry: - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + if s.gcCloser.Ctx().Err() != nil { + return + } + ctx, cancel := context.WithTimeout(s.gcCloser.Ctx(), 10*time.Second) ts, err := Timestamps(ctx, num) cancel() if err != nil {
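
Note on the pattern (illustrative sketch, not part of the patch): the core of this change is replacing context.Background() with a shared z.Closer so that background goroutines observe shutdown and exit. The sketch below uses only the z.Closer calls that already appear in the diff (NewCloser, Ctx, HasBeenClosed, Done, Signal, Wait); the refresher/doWork helpers and the timings are hypothetical stand-ins for loops like RefreshAcls, not code from this PR.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/dgraph-io/ristretto/z"
)

// refresher is a stand-in for a background loop such as RefreshAcls: it ticks
// until the shared closer is signalled, then marks itself done.
func refresher(closer *z.Closer) {
	defer closer.Done()

	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			// Pass closer.Ctx() (not context.Background()) to downstream calls
			// so in-flight work is cancelled once shutdown is signalled.
			doWork(closer.Ctx())
		case <-closer.HasBeenClosed():
			return
		}
	}
}

// doWork is a placeholder for a query or RPC that honours ctx cancellation.
func doWork(ctx context.Context) {
	select {
	case <-ctx.Done():
	case <-time.After(10 * time.Millisecond):
	}
}

func main() {
	// One closer shared by two updater goroutines, mirroring the
	// `updaters := z.NewCloser(2)` consolidation in dgraph/cmd/alpha/run.go.
	updaters := z.NewCloser(2)
	go refresher(updaters)
	go refresher(updaters)

	time.Sleep(300 * time.Millisecond)

	// Signal first, finish other teardown, then wait -- the same split the
	// run.go hunk uses instead of a single SignalAndWait up front.
	updaters.Signal()
	// ... stop workers / dispose server state here ...
	updaters.Wait()
	fmt.Println("updaters closed")
}

The Signal/Wait split mirrors the comment in the run.go hunk: the updater loops may only be able to finish once the group machinery has been told to close, so the shutdown path signals them early, tears down the worker and its state, and waits for the updaters last.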