diff --git a/stats/csv/collector.go b/stats/csv/collector.go index 4669d959012..d438c322a39 100644 --- a/stats/csv/collector.go +++ b/stats/csv/collector.go @@ -22,12 +22,13 @@ package csv import ( "bytes" + "compress/gzip" "context" "encoding/csv" "fmt" - "io" "os" "sort" + "strings" "sync" "time" @@ -40,7 +41,7 @@ import ( // Collector saving output to csv implements the lib.Collector interface type Collector struct { - outfile io.WriteCloser + closeFn func() error fname string resTags []string ignoredTags []string @@ -55,13 +56,6 @@ type Collector struct { // Verify that Collector implements lib.Collector var _ lib.Collector = &Collector{} -// Similar to ioutil.NopCloser, but for writers -type nopCloser struct { - io.Writer -} - -func (nopCloser) Close() error { return nil } - // New Creates new instance of CSV collector func New(fs afero.Fs, tags stats.TagSet, config Config) (*Collector, error) { resTags := []string{} @@ -80,32 +74,46 @@ func New(fs afero.Fs, tags stats.TagSet, config Config) (*Collector, error) { fname := config.FileName.String if fname == "" || fname == "-" { - logfile := nopCloser{os.Stdout} + stdoutWriter := csv.NewWriter(os.Stdout) return &Collector{ - outfile: logfile, fname: "-", resTags: resTags, ignoredTags: ignoredTags, - csvWriter: csv.NewWriter(logfile), + csvWriter: stdoutWriter, row: make([]string, 3+len(resTags)+1), saveInterval: saveInterval, + closeFn: func() error { return nil }, }, nil } - logfile, err := fs.Create(fname) + logFile, err := fs.Create(fname) if err != nil { return nil, err } - return &Collector{ - outfile: logfile, + c := Collector{ fname: fname, resTags: resTags, ignoredTags: ignoredTags, - csvWriter: csv.NewWriter(logfile), row: make([]string, 3+len(resTags)+1), saveInterval: saveInterval, - }, nil + } + + if strings.HasSuffix(fname, ".gz") { + outfile := gzip.NewWriter(logFile) + csvWriter := csv.NewWriter(outfile) + c.csvWriter = csvWriter + c.closeFn = func() error { + _ = outfile.Close() + return logFile.Close() + } + } else { + csvWriter := csv.NewWriter(logFile) + c.csvWriter = csvWriter + c.closeFn = logFile.Close + } + + return &c, nil } // Init writes column names to csv file @@ -125,16 +133,19 @@ func (c *Collector) SetRunStatus(status lib.RunStatus) {} // Run just blocks until the context is done func (c *Collector) Run(ctx context.Context) { ticker := time.NewTicker(c.saveInterval) + defer func() { + err := c.closeFn() + if err != nil { + logrus.WithField("filename", c.fname).Errorf("CSV: Error closing the file: %v", err) + } + }() + for { select { case <-ticker.C: - c.WriteToFile() + c.writeToFile() case <-ctx.Done(): - c.WriteToFile() - err := c.outfile.Close() - if err != nil { - logrus.WithField("filename", c.fname).Error("CSV: Error closing the file") - } + c.writeToFile() return } } @@ -149,8 +160,8 @@ func (c *Collector) Collect(scs []stats.SampleContainer) { } } -// WriteToFile Writes samples to the csv file -func (c *Collector) WriteToFile() { +// writeToFile Writes samples to the csv file +func (c *Collector) writeToFile() { c.bufferLock.Lock() samples := c.buffer c.buffer = nil diff --git a/stats/csv/collector_test.go b/stats/csv/collector_test.go index 5520d0f89a6..f1d45ca19d1 100644 --- a/stats/csv/collector_test.go +++ b/stats/csv/collector_test.go @@ -21,8 +21,10 @@ package csv import ( + "compress/gzip" "context" "fmt" + "io/ioutil" "sort" "sync" "testing" @@ -239,62 +241,128 @@ func TestRun(t *testing.T) { wg.Wait() } +func readUnCompressedFile(fileName string, fs afero.Fs) string { + csvbytes, err := afero.ReadFile(fs, fileName) + if err != nil { + return err.Error() + } + + return fmt.Sprintf("%s", csvbytes) +} + +func readCompressedFile(fileName string, fs afero.Fs) string { + file, err := fs.Open(fileName) + if err != nil { + return err.Error() + } + + gzf, err := gzip.NewReader(file) + if err != nil { + return err.Error() + } + + csvbytes, err := ioutil.ReadAll(gzf) + if err != nil { + return err.Error() + } + + return fmt.Sprintf("%s", csvbytes) +} + func TestRunCollect(t *testing.T) { - testSamples := []stats.SampleContainer{ - stats.Sample{ - Time: time.Unix(1562324643, 0), - Metric: stats.New("my_metric", stats.Gauge), - Value: 1, - Tags: stats.NewSampleTags(map[string]string{ - "tag1": "val1", - "tag2": "val2", - "tag3": "val3", - }), + testData := []struct { + samples []stats.SampleContainer + fileName string + fileReaderFunc func(fileName string, fs afero.Fs) string + outputContent string + }{ + { + samples: []stats.SampleContainer{ + stats.Sample{ + Time: time.Unix(1562324643, 0), + Metric: stats.New("my_metric", stats.Gauge), + Value: 1, + Tags: stats.NewSampleTags(map[string]string{ + "tag1": "val1", + "tag2": "val2", + "tag3": "val3", + }), + }, + stats.Sample{ + Time: time.Unix(1562324644, 0), + Metric: stats.New("my_metric", stats.Gauge), + Value: 1, + Tags: stats.NewSampleTags(map[string]string{ + "tag1": "val1", + "tag2": "val2", + "tag3": "val3", + "tag4": "val4", + }), + }, + }, + fileName: "test", + fileReaderFunc: readUnCompressedFile, + outputContent: "metric_name,timestamp,metric_value,tag1,tag3,extra_tags\n" + "my_metric,1562324643,1.000000,val1,val3,\n" + "my_metric,1562324644,1.000000,val1,val3,tag4=val4\n", }, - stats.Sample{ - Time: time.Unix(1562324644, 0), - Metric: stats.New("my_metric", stats.Gauge), - Value: 1, - Tags: stats.NewSampleTags(map[string]string{ - "tag1": "val1", - "tag2": "val2", - "tag3": "val3", - "tag4": "val4", - }), + { + samples: []stats.SampleContainer{ + stats.Sample{ + Time: time.Unix(1562324643, 0), + Metric: stats.New("my_metric", stats.Gauge), + Value: 1, + Tags: stats.NewSampleTags(map[string]string{ + "tag1": "val1", + "tag2": "val2", + "tag3": "val3", + }), + }, + stats.Sample{ + Time: time.Unix(1562324644, 0), + Metric: stats.New("my_metric", stats.Gauge), + Value: 1, + Tags: stats.NewSampleTags(map[string]string{ + "tag1": "val1", + "tag2": "val2", + "tag3": "val3", + "tag4": "val4", + }), + }, + }, + fileName: "test.gz", + fileReaderFunc: readCompressedFile, + outputContent: "metric_name,timestamp,metric_value,tag1,tag3,extra_tags\n" + "my_metric,1562324643,1.000000,val1,val3,\n" + "my_metric,1562324644,1.000000,val1,val3,tag4=val4\n", }, } - mem := afero.NewMemMapFs() - collector, err := New( - mem, - stats.TagSet{"tag1": true, "tag2": false, "tag3": true}, - Config{FileName: null.StringFrom("path"), SaveInterval: types.NewNullDuration(time.Duration(1), true)}, - ) - assert.NoError(t, err) - assert.NotNil(t, collector) + for _, data := range testData { + mem := afero.NewMemMapFs() + collector, err := New( + mem, + stats.TagSet{"tag1": true, "tag2": false, "tag3": true}, + Config{FileName: null.StringFrom(data.fileName), SaveInterval: types.NewNullDuration(time.Duration(1), true)}, + ) + assert.NoError(t, err) + assert.NotNil(t, collector) - ctx, cancel := context.WithCancel(context.Background()) - var wg sync.WaitGroup - wg.Add(1) - go func() { - collector.Run(ctx) - wg.Done() - }() - err = collector.Init() - assert.NoError(t, err) - collector.Collect(testSamples) - time.Sleep(1 * time.Second) - cancel() - wg.Wait() - csvbytes, _ := afero.ReadFile(mem, "path") - csvstr := fmt.Sprintf("%s", csvbytes) - assert.Equal(t, - "metric_name,timestamp,metric_value,tag1,tag3,extra_tags\n"+ - "my_metric,1562324643,1.000000,val1,val3,\n"+ - "my_metric,1562324644,1.000000,val1,val3,tag4=val4\n", - csvstr) -} + ctx, cancel := context.WithCancel(context.Background()) + + var wg sync.WaitGroup + wg.Add(1) + go func() { + collector.Run(ctx) + wg.Done() + }() + err = collector.Init() + assert.NoError(t, err) + collector.Collect(data.samples) + time.Sleep(1 * time.Second) + cancel() + wg.Wait() + + assert.Equal(t, data.outputContent, data.fileReaderFunc(data.fileName, mem)) + } +} func TestNew(t *testing.T) { configs := []struct { cfg Config @@ -308,6 +376,14 @@ func TestNew(t *testing.T) { "tag3": true, }, }, + { + cfg: Config{FileName: null.StringFrom("name.csv.gz"), SaveInterval: types.NewNullDuration(time.Duration(1), true)}, + tags: stats.TagSet{ + "tag1": true, + "tag2": false, + "tag3": true, + }, + }, { cfg: Config{FileName: null.StringFrom("-"), SaveInterval: types.NewNullDuration(time.Duration(1), true)}, tags: stats.TagSet{ @@ -326,6 +402,7 @@ func TestNew(t *testing.T) { fname string resTags []string ignoredTags []string + closeFn func() error }{ { fname: "name", @@ -336,6 +413,15 @@ func TestNew(t *testing.T) { "tag2", }, }, + { + fname: "name.csv.gz", + resTags: []string{ + "tag1", "tag3", + }, + ignoredTags: []string{ + "tag2", + }, + }, { fname: "-", resTags: []string{ @@ -365,6 +451,7 @@ func TestNew(t *testing.T) { sort.Strings(expected.ignoredTags) sort.Strings(collector.ignoredTags) assert.Equal(t, expected.ignoredTags, collector.ignoredTags) + assert.NoError(t, collector.closeFn()) }) } }