Skip to content

Commit

Permalink
Add gzip support to the CSV output format
Browse files Browse the repository at this point in the history
Refactor:

- Add closeFn for CSV collector
- Made WriteToFile unexported (renamed to writeToFile)

Signed-off-by: thejas <[email protected]>
  • Loading branch information
thejasbabu authored and imiric committed Aug 6, 2020
1 parent 8f4797d commit b83dbbd
Show file tree
Hide file tree
Showing 2 changed files with 172 additions and 74 deletions.
61 changes: 36 additions & 25 deletions stats/csv/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,13 @@ package csv

import (
"bytes"
"compress/gzip"
"context"
"encoding/csv"
"fmt"
"io"
"os"
"sort"
"strings"
"sync"
"time"

Expand All @@ -40,7 +41,7 @@ import (

// Collector saving output to csv implements the lib.Collector interface
type Collector struct {
outfile io.WriteCloser
closeFn func() error
fname string
resTags []string
ignoredTags []string
Expand All @@ -55,13 +56,6 @@ type Collector struct {
// Verify that Collector implements lib.Collector
var _ lib.Collector = &Collector{}

// Similar to ioutil.NopCloser, but for writers
type nopCloser struct {
io.Writer
}

func (nopCloser) Close() error { return nil }

// New Creates new instance of CSV collector
func New(fs afero.Fs, tags stats.TagSet, config Config) (*Collector, error) {
resTags := []string{}
Expand All @@ -80,32 +74,46 @@ func New(fs afero.Fs, tags stats.TagSet, config Config) (*Collector, error) {
fname := config.FileName.String

if fname == "" || fname == "-" {
logfile := nopCloser{os.Stdout}
stdoutWriter := csv.NewWriter(os.Stdout)
return &Collector{
outfile: logfile,
fname: "-",
resTags: resTags,
ignoredTags: ignoredTags,
csvWriter: csv.NewWriter(logfile),
csvWriter: stdoutWriter,
row: make([]string, 3+len(resTags)+1),
saveInterval: saveInterval,
closeFn: func() error { return nil },
}, nil
}

logfile, err := fs.Create(fname)
logFile, err := fs.Create(fname)
if err != nil {
return nil, err
}

return &Collector{
outfile: logfile,
c := Collector{
fname: fname,
resTags: resTags,
ignoredTags: ignoredTags,
csvWriter: csv.NewWriter(logfile),
row: make([]string, 3+len(resTags)+1),
saveInterval: saveInterval,
}, nil
}

if strings.HasSuffix(fname, ".gz") {
outfile := gzip.NewWriter(logFile)
csvWriter := csv.NewWriter(outfile)
c.csvWriter = csvWriter
c.closeFn = func() error {
_ = outfile.Close()
return logFile.Close()
}
} else {
csvWriter := csv.NewWriter(logFile)
c.csvWriter = csvWriter
c.closeFn = logFile.Close
}

return &c, nil
}

// Init writes column names to csv file
Expand All @@ -125,16 +133,19 @@ func (c *Collector) SetRunStatus(status lib.RunStatus) {}
// Run just blocks until the context is done
func (c *Collector) Run(ctx context.Context) {
ticker := time.NewTicker(c.saveInterval)
defer func() {
err := c.closeFn()
if err != nil {
logrus.WithField("filename", c.fname).Errorf("CSV: Error closing the file: %v", err)
}
}()

for {
select {
case <-ticker.C:
c.WriteToFile()
c.writeToFile()
case <-ctx.Done():
c.WriteToFile()
err := c.outfile.Close()
if err != nil {
logrus.WithField("filename", c.fname).Error("CSV: Error closing the file")
}
c.writeToFile()
return
}
}
Expand All @@ -149,8 +160,8 @@ func (c *Collector) Collect(scs []stats.SampleContainer) {
}
}

// WriteToFile Writes samples to the csv file
func (c *Collector) WriteToFile() {
// writeToFile Writes samples to the csv file
func (c *Collector) writeToFile() {
c.bufferLock.Lock()
samples := c.buffer
c.buffer = nil
Expand Down
185 changes: 136 additions & 49 deletions stats/csv/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
package csv

import (
"compress/gzip"
"context"
"fmt"
"io/ioutil"
"sort"
"sync"
"testing"
Expand Down Expand Up @@ -239,62 +241,128 @@ func TestRun(t *testing.T) {
wg.Wait()
}

// readUnCompressedFile reads the plain (uncompressed) file fileName from fs
// and returns its contents as a string. On failure it returns the error
// message instead, so the caller's equality assertion fails with a useful
// diff rather than panicking.
func readUnCompressedFile(fileName string, fs afero.Fs) string {
	csvbytes, err := afero.ReadFile(fs, fileName)
	if err != nil {
		return err.Error()
	}

	// string(b) is the idiomatic form of fmt.Sprintf("%s", b) for a []byte
	// (staticcheck S1025) and avoids the fmt round trip.
	return string(csvbytes)
}

// readCompressedFile opens the gzip-compressed file fileName from fs,
// decompresses it, and returns its contents as a string. On failure it
// returns the error message instead, so the caller's equality assertion
// fails with a useful diff rather than panicking.
func readCompressedFile(fileName string, fs afero.Fs) string {
	file, err := fs.Open(fileName)
	if err != nil {
		return err.Error()
	}
	// The original leaked the file handle; close it when done. Errors on
	// close are irrelevant for a read-only test helper.
	defer func() { _ = file.Close() }()

	gzf, err := gzip.NewReader(file)
	if err != nil {
		return err.Error()
	}
	// Closing the gzip reader verifies nothing here (read errors already
	// surface from ReadAll), but releases its internal state promptly.
	defer func() { _ = gzf.Close() }()

	csvbytes, err := ioutil.ReadAll(gzf)
	if err != nil {
		return err.Error()
	}

	// string(b) is the idiomatic form of fmt.Sprintf("%s", b) for a []byte.
	return string(csvbytes)
}

func TestRunCollect(t *testing.T) {
testSamples := []stats.SampleContainer{
stats.Sample{
Time: time.Unix(1562324643, 0),
Metric: stats.New("my_metric", stats.Gauge),
Value: 1,
Tags: stats.NewSampleTags(map[string]string{
"tag1": "val1",
"tag2": "val2",
"tag3": "val3",
}),
testData := []struct {
samples []stats.SampleContainer
fileName string
fileReaderFunc func(fileName string, fs afero.Fs) string
outputContent string
}{
{
samples: []stats.SampleContainer{
stats.Sample{
Time: time.Unix(1562324643, 0),
Metric: stats.New("my_metric", stats.Gauge),
Value: 1,
Tags: stats.NewSampleTags(map[string]string{
"tag1": "val1",
"tag2": "val2",
"tag3": "val3",
}),
},
stats.Sample{
Time: time.Unix(1562324644, 0),
Metric: stats.New("my_metric", stats.Gauge),
Value: 1,
Tags: stats.NewSampleTags(map[string]string{
"tag1": "val1",
"tag2": "val2",
"tag3": "val3",
"tag4": "val4",
}),
},
},
fileName: "test",
fileReaderFunc: readUnCompressedFile,
outputContent: "metric_name,timestamp,metric_value,tag1,tag3,extra_tags\n" + "my_metric,1562324643,1.000000,val1,val3,\n" + "my_metric,1562324644,1.000000,val1,val3,tag4=val4\n",
},
stats.Sample{
Time: time.Unix(1562324644, 0),
Metric: stats.New("my_metric", stats.Gauge),
Value: 1,
Tags: stats.NewSampleTags(map[string]string{
"tag1": "val1",
"tag2": "val2",
"tag3": "val3",
"tag4": "val4",
}),
{
samples: []stats.SampleContainer{
stats.Sample{
Time: time.Unix(1562324643, 0),
Metric: stats.New("my_metric", stats.Gauge),
Value: 1,
Tags: stats.NewSampleTags(map[string]string{
"tag1": "val1",
"tag2": "val2",
"tag3": "val3",
}),
},
stats.Sample{
Time: time.Unix(1562324644, 0),
Metric: stats.New("my_metric", stats.Gauge),
Value: 1,
Tags: stats.NewSampleTags(map[string]string{
"tag1": "val1",
"tag2": "val2",
"tag3": "val3",
"tag4": "val4",
}),
},
},
fileName: "test.gz",
fileReaderFunc: readCompressedFile,
outputContent: "metric_name,timestamp,metric_value,tag1,tag3,extra_tags\n" + "my_metric,1562324643,1.000000,val1,val3,\n" + "my_metric,1562324644,1.000000,val1,val3,tag4=val4\n",
},
}

mem := afero.NewMemMapFs()
collector, err := New(
mem,
stats.TagSet{"tag1": true, "tag2": false, "tag3": true},
Config{FileName: null.StringFrom("path"), SaveInterval: types.NewNullDuration(time.Duration(1), true)},
)
assert.NoError(t, err)
assert.NotNil(t, collector)
for _, data := range testData {
mem := afero.NewMemMapFs()
collector, err := New(
mem,
stats.TagSet{"tag1": true, "tag2": false, "tag3": true},
Config{FileName: null.StringFrom(data.fileName), SaveInterval: types.NewNullDuration(time.Duration(1), true)},
)
assert.NoError(t, err)
assert.NotNil(t, collector)

ctx, cancel := context.WithCancel(context.Background())
var wg sync.WaitGroup
wg.Add(1)
go func() {
collector.Run(ctx)
wg.Done()
}()
err = collector.Init()
assert.NoError(t, err)
collector.Collect(testSamples)
time.Sleep(1 * time.Second)
cancel()
wg.Wait()
csvbytes, _ := afero.ReadFile(mem, "path")
csvstr := fmt.Sprintf("%s", csvbytes)
assert.Equal(t,
"metric_name,timestamp,metric_value,tag1,tag3,extra_tags\n"+
"my_metric,1562324643,1.000000,val1,val3,\n"+
"my_metric,1562324644,1.000000,val1,val3,tag4=val4\n",
csvstr)
}
ctx, cancel := context.WithCancel(context.Background())

var wg sync.WaitGroup
wg.Add(1)
go func() {
collector.Run(ctx)
wg.Done()
}()
err = collector.Init()

assert.NoError(t, err)
collector.Collect(data.samples)
time.Sleep(1 * time.Second)
cancel()
wg.Wait()

assert.Equal(t, data.outputContent, data.fileReaderFunc(data.fileName, mem))
}
}
func TestNew(t *testing.T) {
configs := []struct {
cfg Config
Expand All @@ -308,6 +376,14 @@ func TestNew(t *testing.T) {
"tag3": true,
},
},
{
cfg: Config{FileName: null.StringFrom("name.csv.gz"), SaveInterval: types.NewNullDuration(time.Duration(1), true)},
tags: stats.TagSet{
"tag1": true,
"tag2": false,
"tag3": true,
},
},
{
cfg: Config{FileName: null.StringFrom("-"), SaveInterval: types.NewNullDuration(time.Duration(1), true)},
tags: stats.TagSet{
Expand All @@ -326,6 +402,7 @@ func TestNew(t *testing.T) {
fname string
resTags []string
ignoredTags []string
closeFn func() error
}{
{
fname: "name",
Expand All @@ -336,6 +413,15 @@ func TestNew(t *testing.T) {
"tag2",
},
},
{
fname: "name.csv.gz",
resTags: []string{
"tag1", "tag3",
},
ignoredTags: []string{
"tag2",
},
},
{
fname: "-",
resTags: []string{
Expand Down Expand Up @@ -365,6 +451,7 @@ func TestNew(t *testing.T) {
sort.Strings(expected.ignoredTags)
sort.Strings(collector.ignoredTags)
assert.Equal(t, expected.ignoredTags, collector.ignoredTags)
assert.NoError(t, collector.closeFn())
})
}
}
Expand Down

0 comments on commit b83dbbd

Please sign in to comment.