-
Notifications
You must be signed in to change notification settings - Fork 0
/
eventloop.go
62 lines (52 loc) · 1.6 KB
/
eventloop.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
package main
import (
"context"
"encoding/json"
"fmt"
"github.com/kneu-messenger-pigeon/events"
"github.com/segmentio/kafka-go"
"io"
"os/signal"
"syscall"
)
type EventLoop struct {
out io.Writer
metaEventbus MetaEventbusInterface
reader events.ReaderInterface
importer ImporterInterface
}
func (eventLoop EventLoop) execute() (err error) {
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
defer stop()
var event events.SecondaryDbLessonProcessedEvent
var m kafka.Message
for err == nil {
m, err = eventLoop.reader.FetchMessage(ctx)
if err == nil && string(m.Key) == events.SecondaryDbLessonProcessedEventName {
_ = json.Unmarshal(m.Value, &event)
fmt.Fprintf(
eventLoop.out, "Receive %s %s - %s\n", string(m.Key),
event.PreviousSecondaryDatabaseDatetime.Format(dateFormat),
event.CurrentSecondaryDatabaseDatetime.Format(dateFormat),
)
err = eventLoop.importer.execute(
event.PreviousSecondaryDatabaseDatetime, event.CurrentSecondaryDatabaseDatetime,
event.Year,
)
fmt.Fprintf(
eventLoop.out, "Finish processing %s %s - %s. Error: %v \n", string(m.Key),
event.PreviousSecondaryDatabaseDatetime.Format(dateFormat),
event.CurrentSecondaryDatabaseDatetime.Format(dateFormat),
err,
)
if err == nil {
err = eventLoop.metaEventbus.sendSecondaryDbScoreProcessedEvent(event)
}
}
fmt.Fprintf(eventLoop.out, "Message %s processed - error: %v \n", string(m.Key), err)
if err == nil {
err = eventLoop.reader.CommitMessages(context.Background(), m)
}
}
return
}