diff --git a/cmd/run.go b/cmd/run.go index 8b1c0ac..cf3c9da 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -116,15 +116,12 @@ type message struct { } func generateInBackground(name string, config *digest.Config, wg *sync.WaitGroup, channel chan<- message) { - digest, sourceMap, err := digest.Create(config) - if err != nil { - panic(err) - } + digest := digest.Create(config) if debug { log.Println("Generated Digest for " + name) } - channel <- message{digestMap: digest, sourceMap: sourceMap} + channel <- message{digestMap: digest} close(channel) wg.Done() } diff --git a/pkg/digest/compare.go b/pkg/digest/compare.go index 0c08e1e..390b446 100644 --- a/pkg/digest/compare.go +++ b/pkg/digest/compare.go @@ -2,7 +2,6 @@ package digest import ( "encoding/csv" - "io" "runtime" "strings" "sync" @@ -55,7 +54,7 @@ type diffMessage struct { // Diff will differentiate between two given configs func Diff(baseConfig, deltaConfig *Config) Difference { maxProcs := runtime.NumCPU() - base, _, _ := Create(baseConfig) + base := Create(baseConfig) additions := make([]string, 0, len(base)) modifications := make([]string, 0, len(base)) @@ -79,26 +78,15 @@ func Diff(baseConfig, deltaConfig *Config) Difference { func readAndCompare(base map[uint64]uint64, config *Config, msgChannel chan<- []diffMessage) { reader := csv.NewReader(config.Reader) - eofReached := false var wg sync.WaitGroup - for !eofReached { - lines := make([][]string, bufferSize) - - lineCount := 0 - for ; lineCount < bufferSize; lineCount++ { - line, err := reader.Read() - lines[lineCount] = line - if err != nil { - if err == io.EOF { - eofReached = true - break - } - return - } - } - + for { + lines, eofReached := getNextNLines(reader) wg.Add(1) - go compareDigestForNLines(base, lines[:lineCount], config, msgChannel, &wg) + go compareDigestForNLines(base, lines, config, msgChannel, &wg) + + if eofReached { + break + } } wg.Wait() close(msgChannel) diff --git a/pkg/digest/digest.go b/pkg/digest/digest.go index 73802f1..8075a7a 100644 --- a/pkg/digest/digest.go +++ b/pkg/digest/digest.go @@ -51,7 +51,7 @@ const bufferSize = 512 // Create can create a Digest using the Configurations passed. // It returns the digest as a map[uint64]uint64. // It can also keep track of the Source line. -func Create(config *Config) (map[uint64]uint64, map[uint64]string, error) { +func Create(config *Config) map[uint64]uint64 { maxProcs := runtime.NumCPU() reader := csv.NewReader(config.Reader) @@ -67,30 +67,19 @@ func Create(config *Config) (map[uint64]uint64, map[uint64]string, error) { } } - return output, nil, nil + return output } func readAndProcess(config *Config, reader *csv.Reader, digestChannel chan<- []Digest) { - eofReached := false var wg sync.WaitGroup - for !eofReached { - lines := make([][]string, bufferSize) - - lineCount := 0 - for ; lineCount < bufferSize; lineCount++ { - line, err := reader.Read() - lines[lineCount] = line - if err != nil { - if err == io.EOF { - eofReached = true - break - } - return - } - } - + for { + lines, eofReached := getNextNLines(reader) wg.Add(1) - go createDigestForNLines(lines[:lineCount], config, digestChannel, &wg) + go createDigestForNLines(lines, config, digestChannel, &wg) + + if eofReached { + break + } } wg.Wait() close(digestChannel) diff --git a/pkg/digest/digest_test.go b/pkg/digest/digest_test.go index 73ece81..b456bfa 100644 --- a/pkg/digest/digest_test.go +++ b/pkg/digest/digest_test.go @@ -1,10 +1,10 @@ -package digest +package digest_test import ( - "os" "strings" "testing" + "github.com/aswinkarthik93/csvdiff/pkg/digest" "github.com/cespare/xxhash" "github.com/stretchr/testify/assert" ) @@ -14,9 +14,9 @@ func TestCreateDigest(t *testing.T) { firstKey := xxhash.Sum64String("1") firstLineDigest := xxhash.Sum64String(firstLine) - expectedDigest := Digest{Key: firstKey, Value: firstLineDigest} + expectedDigest := digest.Digest{Key: firstKey, Value: firstLineDigest} - actualDigest := CreateDigest(strings.Split(firstLine, Separator), []int{0}, []int{}) + actualDigest := digest.CreateDigest(strings.Split(firstLine, digest.Separator), []int{0}, []int{}) assert.Equal(t, expectedDigest, actualDigest) } @@ -25,35 +25,32 @@ func TestDigestForFile(t *testing.T) { firstLine := "1,first-line,some-columne,friday" firstKey := xxhash.Sum64String("1") firstDigest := xxhash.Sum64String(firstLine) + fridayDigest := xxhash.Sum64String("friday") secondLine := "2,second-line,nobody-needs-this,saturday" secondKey := xxhash.Sum64String("2") secondDigest := xxhash.Sum64String(secondLine) + saturdayDigest := xxhash.Sum64String("saturday") - testConfig := &Config{ + testConfig := &digest.Config{ Reader: strings.NewReader(firstLine + "\n" + secondLine), Key: []int{0}, } - actualDigest, _, err := Create(testConfig) + actualDigest := digest.Create(testConfig) expectedDigest := map[uint64]uint64{firstKey: firstDigest, secondKey: secondDigest} - assert.Nil(t, err, "error at DigestForFile") assert.Equal(t, expectedDigest, actualDigest) -} - -func TestCreatePerformance(t *testing.T) { - file, err := os.Open("../../benchmark/majestic_million.csv") - defer file.Close() - assert.NoError(t, err) - config := &Config{ - Reader: file, - Key: []int{}, + testConfig = &digest.Config{ + Reader: strings.NewReader(firstLine + "\n" + secondLine), + Key: []int{0}, + Value: []int{3}, } - result, _, _ := Create(config) + actualDigest = digest.Create(testConfig) + expectedDigest = map[uint64]uint64{firstKey: fridayDigest, secondKey: saturdayDigest} - assert.Equal(t, 998390, len(result)) + assert.Equal(t, expectedDigest, actualDigest) } diff --git a/pkg/digest/positions.go b/pkg/digest/positions.go index e69b2cf..03a8581 100644 --- a/pkg/digest/positions.go +++ b/pkg/digest/positions.go @@ -9,23 +9,12 @@ type Positions []int // their respective positions and concatenates // them using Separator as a string. func (p Positions) MapToValue(csv []string) string { - if p.Length() == 0 { + if len(p) == 0 { return strings.Join(csv, Separator) } - output := make([]string, p.Length()) - for i, pos := range p.Items() { + output := make([]string, len(p)) + for i, pos := range p { output[i] = csv[pos] } return strings.Join(output, Separator) } - -// Length returns the size of the Positions array. -func (p Positions) Length() int { - return len([]int(p)) -} - -// Items returns the elements of the Positions array -// as an array of int -func (p Positions) Items() []int { - return []int(p) -} diff --git a/pkg/digest/positions_test.go b/pkg/digest/positions_test.go index 90b0c2f..2749d5e 100644 --- a/pkg/digest/positions_test.go +++ b/pkg/digest/positions_test.go @@ -1,14 +1,15 @@ -package digest +package digest_test import ( "strings" "testing" + "github.com/aswinkarthik93/csvdiff/pkg/digest" "github.com/stretchr/testify/assert" ) func TestPositionsMapValues(t *testing.T) { - positions := Positions([]int{0, 3}) + positions := digest.Positions([]int{0, 3}) csv := []string{"zero", "one", "two", "three"} actual := positions.MapToValue(csv) @@ -18,24 +19,11 @@ func TestPositionsMapValues(t *testing.T) { } func TestPositionsMapValuesReturnsCompleteStringCsvIfEmpty(t *testing.T) { - positions := Positions([]int{}) + positions := digest.Positions([]int{}) csv := []string{"zero", "one", "two", "three"} actual := positions.MapToValue(csv) - expected := strings.Join(csv, Separator) + expected := strings.Join(csv, digest.Separator) assert.Equal(t, expected, actual) } - -func TestPositionsLength(t *testing.T) { - positions := Positions([]int{0, 3}) - - assert.Equal(t, 2, positions.Length()) -} - -func TestPositionsItems(t *testing.T) { - items := []int{0, 3} - positions := Positions(items) - - assert.Equal(t, items, positions.Items()) -} diff --git a/pkg/digest/utils.go b/pkg/digest/utils.go new file mode 100644 index 0000000..52048f3 --- /dev/null +++ b/pkg/digest/utils.go @@ -0,0 +1,26 @@ +package digest + +import ( + "encoding/csv" + "io" +) + +func getNextNLines(reader *csv.Reader) ([][]string, bool) { + lines := make([][]string, bufferSize) + + lineCount := 0 + eofReached := false + for ; lineCount < bufferSize; lineCount++ { + line, err := reader.Read() + lines[lineCount] = line + if err != nil { + if err == io.EOF { + eofReached = true + break + } + panic(err) + } + } + + return lines[:lineCount], eofReached +}