Skip to content

Commit

Permalink
fix: panic send on closed channel in reader pkg (#220)
Browse files Browse the repository at this point in the history
* fix: panic send on closed channel in reader pkg
issues/219

* add log

fix #219 
fix #95
  • Loading branch information
veezhang authored Dec 1, 2022
1 parent 2429eec commit 7e2bd76
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 0 deletions.
2 changes: 2 additions & 0 deletions pkg/client/clientmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ func NewNebulaClientMgr(settings *config.NebulaClientSettings, statsCh chan<- ba
}

func (m *NebulaClientMgr) Close() {
m.runnerLogger.Infof("Client manager closing")
m.pool.Close()
m.runnerLogger.Infof("Client manager closed")
}

func (m *NebulaClientMgr) GetRequestChans() []chan base.ClientRequest {
Expand Down
21 changes: 21 additions & 0 deletions pkg/cmd/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package cmd
import (
"errors"
"fmt"
"sync"

"github.com/vesoft-inc/nebula-importer/pkg/base"
"github.com/vesoft-inc/nebula-importer/pkg/client"
Expand Down Expand Up @@ -57,6 +58,7 @@ func (r *Runner) Run(yaml *config.YAMLConfig) {

freaders := make([]*reader.FileReader, len(yaml.Files))

var wgReaders sync.WaitGroup
for i, file := range yaml.Files {
errCh, err := errHandler.Init(file, clientMgr.GetNumConnections(), *yaml.RemoveTempFiles, runnerLogger)
if err != nil {
Expand All @@ -70,7 +72,13 @@ func (r *Runner) Run(yaml *config.YAMLConfig) {
statsMgr.StatsCh <- base.NewFileDoneStats(*file.Path)
continue
} else {
runnerLogger.Infof("Start to read %s", *file.Path)
wgReaders.Add(1)
go func(fr *reader.FileReader, filename string) {
defer func() {
runnerLogger.Infof("Finish to read %s", filename)
wgReaders.Done()
}()
numReadFailed, err := fr.Read()
statsMgr.Stats.NumReadFailed += numReadFailed
if err != nil {
Expand All @@ -85,7 +93,20 @@ func (r *Runner) Run(yaml *config.YAMLConfig) {
r.Readers = freaders
r.stataMgr = statsMgr

runnerLogger.Infof("Waiting for stats manager done")
<-statsMgr.DoneCh
runnerLogger.Infof("Waiting for all readers exit")
for _, r := range freaders {
if r != nil {
r.Stop()
}
}
// fix issues/219
// The number of times `statsMgr.StatsCh <- base.NewFileDoneStats(filename)` has reached the number of readers,
// then <-statsMgr.DoneCh return, but not all readers have exited.
// So, it's need to wait for it exit.
wgReaders.Wait()
runnerLogger.Infof("All readers exited")

r.stataMgr.CountFileBytes(r.Readers)
r.Readers = nil
Expand Down
3 changes: 3 additions & 0 deletions pkg/stats/statsmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,12 @@ func NewStatsMgr(files []*config.File, runnerLogger *logger.RunnerLogger) *Stats
}

func (s *StatsMgr) Close() {
s.runnerLogger.Infof("Stats manager closing")
close(s.StatsCh)
close(s.DoneCh)
close(s.OutputStatsCh)
s.Done = true
s.runnerLogger.Infof("Stats manager closed")
}

func (s *StatsMgr) updateStat(stat base.Stats) {
Expand Down Expand Up @@ -129,6 +131,7 @@ func (s *StatsMgr) startWorker(numReadingFiles int) {
case base.FILEDONE:
s.print(fmt.Sprintf("Done(%s)", stat.Filename), now)
numReadingFiles--
s.runnerLogger.Infof("Remaining read files %d", numReadingFiles)
if numReadingFiles == 0 {
s.DoneCh <- true
}
Expand Down

0 comments on commit 7e2bd76

Please sign in to comment.