-
Notifications
You must be signed in to change notification settings - Fork 1
/
worker.go
110 lines (85 loc) · 2.01 KB
/
worker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
package main
import (
"bufio"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"runtime"
"strings"
"sync"
"time"
)
// startWorker checks the list files found in dir against argset and prints
// the matches. It wires up the pipeline
//
//	inputWorker -> in -> compareWorker (one per CPU) -> out -> outputWorker
//
// and blocks until outputWorker has handled a release entry.
//
// The compare workers are started before inputWorker runs. inputWorker is
// called synchronously, so with no consumer attached it would block forever
// as soon as dir holds more files than the in channel can buffer.
func startWorker(dir string, argset argumentSet) {
	in := make(chan string, 2000)
	out := make(chan listEntry, 2000)

	// This first count is only released by the second Done that outputWorker
	// issues on release, so Wait cannot return before the output stage ends.
	var workerWG sync.WaitGroup
	workerWG.Add(1)

	for i, n := 0, runtime.NumCPU(); i < n; i++ {
		go compareWorker(in, out, argset)
	}

	// Runs to completion here; the workers above drain in while it fills.
	// NOTE: results are not drained until outputWorker starts below, so the
	// workers stall once out is full.
	inputWorker(dir, in, &workerWG)

	// outputWorker's release check only looks at the queue lengths. Give the
	// compare workers time to publish results for paths they already took
	// off in before that check starts running.
	time.Sleep(2 * time.Second)

	go outputWorker(in, out, &workerWG)
	workerWG.Wait()

	// inputWorker has returned, so nothing sends on in any more.
	close(in)
	// out is intentionally left open: other goroutines may still hold it as
	// senders, and a send on a closed channel panics. Nothing ranges over
	// out, so closing it would not signal anything useful.
}
// inputWorker lists dir and sends the joined path of every non-directory
// entry on in. It registers itself on workerWG on entry and signals Done
// when it returns. The result of reading the directory is passed to evalErr
// together with dir.
func inputWorker(dir string, in chan string, workerWG *sync.WaitGroup) {
	workerWG.Add(1)

	entries, readErr := ioutil.ReadDir(dir)
	evalErr(readErr, dir)
	defer workerWG.Done()

	for _, entry := range entries {
		// Only plain entries with a usable name are queued.
		if entry.IsDir() || entry.Name() == "" {
			continue
		}
		fullPath := filepath.Join(dir, entry.Name())
		in <- fullPath
	}
}
// compareWorker consumes file paths from in until the channel is closed. Each
// path ending in "ipset" or "netset" is opened and handed to parseFile, and
// the resulting listEntry is sent on out. All other paths are skipped.
//
// Ranging over in lets the worker stop once the channel is closed. A bare
// receive would keep yielding "" after the close and try to open that empty
// path again and again.
func compareWorker(in chan string, out chan listEntry, argset argumentSet) {
	for path := range in {
		// Check the suffix first so unrelated files are never opened.
		if !strings.HasSuffix(path, "ipset") && !strings.HasSuffix(path, "netset") {
			continue
		}

		file, err := os.Open(path)
		evalErr(err, path)
		if err != nil {
			// Only reached if evalErr returns on error: there is no open
			// file to parse or close.
			continue
		}

		out <- parseFile(bufio.NewScanner(file), argset, file)

		// path is the name the file was opened with, i.e. what file.Name()
		// reports.
		evalErr(file.Close(), path)
	}
}
// outputWorker prints every matching entry received on out and returns once
// it receives an entry flagged as release. It starts releaseWorker on the
// same channels, which is what produces that release entry.
//
// Entries in the "geolocation" category are printed with their country; all
// other entries are printed with their category.
func outputWorker(in chan string, out chan listEntry, workerWG *sync.WaitGroup) {
	workerWG.Add(1)
	go releaseWorker(in, out)

	for {
		entry := <-out

		if entry.match {
			detail := " | Category: " + entry.category
			if entry.category == "geolocation" {
				detail = " | Country: " + entry.country
			}
			fmt.Println("IP:" + entry.ip + " | List: " + entry.list + detail +
				" | URL: " + entry.url)
		}

		if !entry.release {
			continue
		}

		// First Done balances the Add at the top of this function.
		workerWG.Done()
		// Second Done releases the count the caller added up front to block
		// while the workers are executing.
		workerWG.Done()
		return
	}
}
// releaseWorker polls in and out every two seconds and, the first time both
// are empty, sends a single listEntry with release set on out. It then
// returns.
//
// One release entry is enough, because outputWorker returns on the first one
// it receives. Returning here keeps the goroutine from outliving the
// pipeline and from sending on out after the caller has closed it, which
// would panic.
//
// NOTE: the check is a heuristic. Channel lengths say nothing about a path
// that a compare worker has already received but not yet reported on.
func releaseWorker(in chan string, out chan listEntry) {
	for len(out) != 0 || len(in) != 0 {
		time.Sleep(2 * time.Second)
	}
	out <- listEntry{release: true}
}