Skip to content

Commit

Permalink
Do not block Store.Start while waiting for the first gossip.
Browse files Browse the repository at this point in the history
In tests related to removed replicas, the first gossip run will block
while attempting to acquire the lease until the range GC queue can
determine that the range has been removed.
  • Loading branch information
bdarnell committed May 8, 2015
1 parent 67ad8d8 commit d13eba7
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 9 deletions.
1 change: 1 addition & 0 deletions storage/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ func (m *multiTestContext) addStore(t *testing.T) {
if err := store.Start(stopper); err != nil {
t.Fatal(err)
}
store.WaitForInit()
m.stores = append(m.stores, store)
if len(m.senders) == idx {
m.senders = append(m.senders, kv.NewLocalSender())
Expand Down
1 change: 1 addition & 0 deletions storage/range_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ func (tc *testContext) Start(t testing.TB) {
if err := tc.store.Start(tc.stopper); err != nil {
t.Fatal(err)
}
tc.store.WaitForInit()
}

initConfigs(tc.engine, t)
Expand Down
31 changes: 22 additions & 9 deletions storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,7 @@ type Store struct {
stopper *util.Stopper
startedAt int64
nodeDesc *proto.NodeDescriptor
initComplete sync.WaitGroup // Signaled by async init tasks

mu sync.RWMutex // Protects variables below...
ranges map[int64]*Range // Map of ranges by Raft ID
Expand Down Expand Up @@ -493,20 +494,27 @@ func (s *Store) Start(stopper *util.Stopper) error {
return nil
}

// WaitForInit blocks until every asynchronous initialization task that
// registered itself on s.initComplete has signaled completion. The
// gossip workers launched by startGossip each register before starting
// and signal once their first gossip attempt has returned (with or
// without an error), so when this method returns the initial first-range
// and config gossip passes have run. That first pass may itself block
// until the range GC queue has completed its scan, so this call can take
// a while in some cases. Intended for use in tests only.
func (s *Store) WaitForInit() {
	pending := &s.initComplete
	pending.Wait()
}

// startGossip runs an infinite loop in a goroutine which regularly checks
// whether the store has a first range or config replica and asks those ranges
// to gossip accordingly.
func (s *Store) startGossip() error {
// Go through one iteration synchronously before returning. This makes sure
// that everything is gossiped when the store finishes starting.
if err := s.maybeGossipConfigs(); err != nil {
return err
}
if err := s.maybeGossipFirstRange(); err != nil {
return err
}
// Periodic updates run in a goroutine.
// Periodic updates run in a goroutine and signal a WaitGroup upon completion
// of their first iteration.
s.initComplete.Add(1)
s.stopper.RunWorker(func() {
// Run the first time without waiting for the Ticker and signal the WaitGroup.
if err := s.maybeGossipFirstRange(); err != nil {
log.Warningf("error gossiping first range data: %s", err)
}
s.initComplete.Done()
ticker := time.NewTicker(clusterIDGossipInterval)
for {
select {
Expand All @@ -520,7 +528,12 @@ func (s *Store) startGossip() error {
}
})

s.initComplete.Add(1)
s.stopper.RunWorker(func() {
if err := s.maybeGossipConfigs(); err != nil {
log.Warningf("error gossiping configs: %s", err)
}
s.initComplete.Done()
ticker := time.NewTicker(configGossipInterval)
for {
select {
Expand Down
1 change: 1 addition & 0 deletions storage/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ func createTestStore(t *testing.T) (*Store, *hlc.ManualClock, *util.Stopper) {
if err := store.Start(stopper); err != nil {
t.Fatal(err)
}
store.WaitForInit()
return store, manual, stopper
}

Expand Down

0 comments on commit d13eba7

Please sign in to comment.