Skip to content

Commit

Permalink
rule: Fix panic on send on closed channel (#225)
Browse files Browse the repository at this point in the history
Closes #224
  • Loading branch information
hypnoglow authored and aeneasr committed Jul 23, 2019
1 parent 8f4a518 commit 2112ab6
Showing 1 changed file with 22 additions and 17 deletions.
39 changes: 22 additions & 17 deletions rule/fetcher_default.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ type FetcherDefault struct {
watching []url.URL

lock sync.Mutex
wg sync.WaitGroup
}

func NewFetcherDefault(
Expand Down Expand Up @@ -119,9 +120,7 @@ func (f *FetcherDefault) configUpdate(ctx context.Context, watcher *fsnotify.Wat
}
}
for _, source := range replace {
go func(s url.URL) {
events <- event{et: eventFileChanged, path: s, source: "config_update"}
}(source)
f.enqueueEvent(events, event{et: eventFileChanged, path: source, source: "config_update"})
}
return nil
}
Expand Down Expand Up @@ -162,8 +161,13 @@ func (f *FetcherDefault) Watch(ctx context.Context) error {
defer watcher.Close()

events := make(chan event)
defer close(events)
return f.watch(ctx, watcher, events)
err = f.watch(ctx, watcher, events)

// Close the channel only when all child goroutines exit
f.wg.Wait()
close(events)

return err
}

func (f *FetcherDefault) watch(ctx context.Context, watcher *fsnotify.Watcher, events chan event) error {
Expand All @@ -176,16 +180,12 @@ func (f *FetcherDefault) watch(ctx context.Context, watcher *fsnotify.Watcher, e
return nil
}

go func() {
events <- event{et: eventRepositoryConfigChange, source: "viper_watcher"}
}()
f.enqueueEvent(events, event{et: eventRepositoryConfigChange, source: "viper_watcher"})

return nil
})

go func() {
events <- event{et: eventRepositoryConfigChange, source: "entrypoint"}
}()
f.enqueueEvent(events, event{et: eventRepositoryConfigChange, source: "entrypoint"})

for {
select {
Expand All @@ -199,9 +199,7 @@ func (f *FetcherDefault) watch(ctx context.Context, watcher *fsnotify.Watcher, e
f.r.Logger().
Debugf("Detected that a access rule repository file has been removed, reloading config.")
// If a file was removed it's likely that the config changed as well - reload!
go func() {
events <- event{et: eventRepositoryConfigChange, source: "fsnotify_remove"}
}()
f.enqueueEvent(events, event{et: eventRepositoryConfigChange, source: "fsnotify_remove"})
continue
}

Expand All @@ -216,9 +214,7 @@ func (f *FetcherDefault) watch(ctx context.Context, watcher *fsnotify.Watcher, e
WithField("op", e.Op.String()).
Debugf("Detected access rule repository file change.")

go func() {
events <- event{et: eventFileChanged, path: *source, source: "fsnotify_update"}
}()
f.enqueueEvent(events, event{et: eventFileChanged, path: *source, source: "fsnotify_update"})
case e, ok := <-events:
if !ok {
// channel was closed
Expand Down Expand Up @@ -255,6 +251,15 @@ func (f *FetcherDefault) watch(ctx context.Context, watcher *fsnotify.Watcher, e
}
}

func (f *FetcherDefault) enqueueEvent(events chan event, evt event) {
f.wg.Add(1)
go func() {
defer f.wg.Done()

events <- evt
}()
}

func (f *FetcherDefault) fetch(source url.URL) ([]Rule, error) {
switch source.Scheme {
case "http":
Expand Down

0 comments on commit 2112ab6

Please sign in to comment.