Skip to content

Commit

Permalink
Introduce parallelism while processing a single file
Browse files Browse the repository at this point in the history
  • Loading branch information
aswinkarthik committed Apr 29, 2018
1 parent 2f97dc6 commit 967ee52
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 27 deletions.
88 changes: 61 additions & 27 deletions pkg/digest/digest.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ package digest
import (
"encoding/csv"
"io"
"strings"
"runtime"
"sync"

"github.com/cespare/xxhash"
)
Expand All @@ -16,63 +17,96 @@ const Separator = ","
type Digest struct {
Key uint64
Value uint64
Row string
}

// CreateDigest creates a Digest for each line of csv.
// There will be one Digest per line
func CreateDigest(csv []string, pKey Positions, pRow Positions) Digest {
row := strings.Join(csv, Separator)
key := xxhash.Sum64String(pKey.MapToValue(csv))
digest := xxhash.Sum64String(pRow.MapToValue(csv))

return Digest{Key: key, Value: digest, Row: row}
return Digest{Key: key, Value: digest}

}

// Config represents configurations that can be passed
// to create a Digest.
type Config struct {
KeyPositions []int
Key Positions
Value Positions
Reader io.Reader
Writer io.Writer
SourceMap bool
Key Positions
Value Positions
Reader io.Reader
}

// NewConfig creates an instance of Config struct.
func NewConfig(r io.Reader, createSourceMap bool, primaryKey Positions, valueColumns Positions) *Config {
return &Config{
Reader: r,
SourceMap: createSourceMap,
Key: primaryKey,
Value: valueColumns,
Reader: r,
Key: primaryKey,
Value: valueColumns,
}
}

const bufferSize = 512

// Create can create a Digest using the Configurations passed.
// It returns the digest as a map[uint64]uint64.
// It can also keep track of the Source line.
func Create(config *Config) (map[uint64]uint64, map[uint64]string, error) {
maxProcs := runtime.NumCPU()
reader := csv.NewReader(config.Reader)

output := make(map[uint64]uint64)
sourceMap := make(map[uint64]string)
for {
line, err := reader.Read()
if err != nil {
if err == io.EOF {
break
}
return nil, nil, err

digestChannel := make(chan []Digest, bufferSize*maxProcs)

go readAndProcess(config, reader, digestChannel)

for digests := range digestChannel {
for _, digest := range digests {
output[digest.Key] = digest.Value
}
digest := CreateDigest(line, config.Key, config.Value)
output[digest.Key] = digest.Value
if config.SourceMap {
sourceMap[digest.Key] = digest.Row
}

return output, nil, nil
}

func readAndProcess(config *Config, reader *csv.Reader, digestChannel chan<- []Digest) {
eofReached := false
var wg sync.WaitGroup
for !eofReached {
lines := make([][]string, bufferSize)

lineCount := 0
for ; lineCount < bufferSize; lineCount++ {
line, err := reader.Read()
lines[lineCount] = line
if err != nil {
if err == io.EOF {
eofReached = true
break
}
return
}
}

wg.Add(1)
go createDigestForNLines(lines[:lineCount], config, digestChannel, &wg)
}
wg.Wait()
close(digestChannel)
}

func createDigestForNLines(lines [][]string,
config *Config,
digestChannel chan<- []Digest,
wg *sync.WaitGroup,
) {
output := make([]Digest, len(lines))

for i, line := range lines {
output[i] = CreateDigest(line, config.Key, config.Value)
}

return output, sourceMap, nil
digestChannel <- output
wg.Done()
}
18 changes: 18 additions & 0 deletions pkg/digest/digest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package digest

import (
"bytes"
"os"
"strings"
"testing"

Expand Down Expand Up @@ -64,3 +65,20 @@ func TestDigestForFile(t *testing.T) {
assert.Equal(t, expectedDigest, actualDigest)
assert.Equal(t, map[uint64]string{}, sourceMap)
}

func TestCreatePerformance(t *testing.T) {
file, err := os.Open("../../benchmark/majestic_million.csv")
defer file.Close()
assert.NoError(t, err)

config := &Config{
Reader: file,
KeyPositions: []int{0},
Key: []int{},
SourceMap: false,
}

result, _, _ := Create(config)

assert.Equal(t, 998390, len(result))
}

0 comments on commit 967ee52

Please sign in to comment.