-
Notifications
You must be signed in to change notification settings - Fork 0
/
importer.go
106 lines (91 loc) · 2.65 KB
/
importer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
package main
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"github.com/kneu-messenger-pigeon/events"
"github.com/segmentio/kafka-go"
"io"
"time"
)
const LessonQuery = `SELECT ID, NUM_PREDM, DATEZAN, NUM_VARZAN, HALF,
(case FSTATUS when 0 then 1 else 0 end) as isDeleted
FROM T_PRJURN WHERE REGDATE BETWEEN ? AND ? ORDER BY ID DESC`
const LessonTypesQuery = `SELECT ID, SHIRTNAME, LONGNAME FROM T_VARZAN`
const AdditionalDateRangeInDays = 2
type ImporterInterface interface {
execute(startDatetime time.Time, endDatetime time.Time, year int) error
importLessonTypes() ([]events.LessonType, error)
}
type LessonsImporter struct {
out io.Writer
db *sql.DB
writer events.WriterInterface
writeThreshold int
}
func (importer *LessonsImporter) execute(startDatetime time.Time, endDatetime time.Time, year int) (err error) {
if err = importer.db.Ping(); err != nil {
return
}
startDatetime = time.Date(
startDatetime.Year(), startDatetime.Month(), startDatetime.Day()-AdditionalDateRangeInDays,
0, 0, 0, 0, startDatetime.Location(),
)
startedAt := time.Now()
fmt.Fprintf(importer.out, "Start import lessons: \n")
rows, err := importer.db.Query(
LessonQuery,
startDatetime.Format(dateFormat),
endDatetime.Format(dateFormat),
)
if err != nil {
return
}
defer rows.Close()
var messages []kafka.Message
var nextErr error
writeMessages := func(threshold int) bool {
if len(messages) != 0 && len(messages) >= threshold {
nextErr = importer.writer.WriteMessages(context.Background(), messages...)
messages = []kafka.Message{}
fmt.Fprintf(importer.out, ".")
if err == nil && nextErr != nil {
err = nextErr
}
}
return err == nil
}
var event events.LessonEvent
i := 0
for rows.Next() && writeMessages(importer.writeThreshold) {
i++
err = rows.Scan(&event.Id, &event.DisciplineId, &event.Date, &event.TypeId, &event.Semester, &event.IsDeleted)
if err == nil {
event.Year = year
payload, _ := json.Marshal(event)
messages = append(messages, kafka.Message{
Key: []byte(events.LessonEventName),
Value: payload,
})
}
}
writeMessages(0)
fmt.Fprintf(
importer.out, " finished.\n Send %d lessons. Error: %v. Done in %d seconds \n",
i, err, int(time.Now().Sub(startedAt).Seconds()),
)
return
}
func (importer *LessonsImporter) importLessonTypes() (list []events.LessonType, err error) {
rows, err := importer.db.Query(LessonTypesQuery)
if rows != nil {
defer rows.Close()
}
var lessonType events.LessonType
for err == nil && rows.Next() {
err = rows.Scan(&lessonType.Id, &lessonType.ShortName, &lessonType.LongName)
list = append(list, lessonType)
}
return
}