forked from sakishum/grsync
-
Notifications
You must be signed in to change notification settings - Fork 0
/
task.go
150 lines (121 loc) · 3.1 KB
/
task.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
package grsync
import (
"bufio"
"io"
"math"
"strconv"
"strings"
"sync"
)
// Task is high-level API under rsync
type Task struct {
rsync *Rsync
state *State
log *Log
}
// State contains information about rsync process
type State struct {
Remain int `json:"remain"`
Total int `json:"total"`
Speed string `json:"speed"`
Progress float64 `json:"progress"`
}
// Log contains raw stderr and stdout outputs
type Log struct {
Stderr string `json:"stderr"`
Stdout string `json:"stdout"`
}
// State returns inforation about rsync processing task
func (t Task) State() State {
return *t.state
}
// Log return structure which contains raw stderr and stdout outputs
func (t Task) Log() Log {
return Log{
Stderr: t.log.Stderr,
Stdout: t.log.Stdout,
}
}
// Run starts rsync process with options
func (t *Task) Run() error {
stderr, err := t.rsync.StderrPipe()
if err != nil {
return err
}
defer stderr.Close()
stdout, err := t.rsync.StdoutPipe()
if err != nil {
return err
}
defer stdout.Close()
var wg sync.WaitGroup
go processStdout(&wg, t, stdout)
go processStderr(&wg, t, stderr)
wg.Add(2)
err = t.rsync.Run()
wg.Wait()
return err
}
// NewTask returns new rsync task
func NewTask(source, destination string, rsyncOptions RsyncOptions) *Task {
// Force set required options
rsyncOptions.HumanReadable = true
rsyncOptions.Partial = true
rsyncOptions.Progress = true
rsyncOptions.Archive = true
return &Task{
rsync: NewRsync(source, destination, rsyncOptions),
state: &State{},
log: &Log{},
}
}
func processStdout(wg *sync.WaitGroup, task *Task, stdout io.Reader) {
const maxPercents = float64(100)
const minDivider = 1
defer wg.Done()
progressMatcher := newMatcher(`\(.+-chk=(\d+.\d+)`)
speedMatcher := newMatcher(`(\d+\.\d+.{2}\/s)`)
// Extract data from strings:
// 999,999 99% 999.99kB/s 0:00:59 (xfr#9, to-chk=999/9999)
scanner := bufio.NewScanner(stdout)
for scanner.Scan() {
logStr := scanner.Text()
if progressMatcher.Match(logStr) {
task.state.Remain, task.state.Total = getTaskProgress(progressMatcher.Extract(logStr))
copiedCount := float64(task.state.Total - task.state.Remain)
task.state.Progress = copiedCount / math.Max(float64(task.state.Total), float64(minDivider)) * maxPercents
}
if speedMatcher.Match(logStr) {
task.state.Speed = getTaskSpeed(speedMatcher.ExtractAllStringSubmatch(logStr, 2))
}
task.log.Stdout += logStr + "\n"
}
}
func processStderr(wg *sync.WaitGroup, task *Task, stderr io.Reader) {
defer wg.Done()
scanner := bufio.NewScanner(stderr)
for scanner.Scan() {
task.log.Stderr += scanner.Text() + "\n"
}
}
func getTaskProgress(remTotalString string) (int, int) {
const remTotalSeparator = "/"
const numbersCount = 2
const (
indexRem = iota
indexTotal
)
info := strings.Split(remTotalString, remTotalSeparator)
if len(info) < numbersCount {
return 0, 0
}
remain, _ := strconv.Atoi(info[indexRem])
total, _ := strconv.Atoi(info[indexTotal])
return remain, total
}
func getTaskSpeed(data [][]string) string {
if len(data) < 2 || len(data[1]) < 2 {
return ""
}
return data[1][1]
}