forked from cloudfoundry/bosh-io-web
-
Notifications
You must be signed in to change notification settings - Fork 0
/
periodic_watcher.go
132 lines (106 loc) · 3.36 KB
/
periodic_watcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
package watcher
import (
"time"
bosherr "github.com/cloudfoundry/bosh-agent/errors"
boshlog "github.com/cloudfoundry/bosh-agent/logger"
bhfetcher "github.com/cppforlife/bosh-hub/release/fetcher"
bhimpsrepo "github.com/cppforlife/bosh-hub/release/importsrepo"
bhrelsrepo "github.com/cppforlife/bosh-hub/release/releasesrepo"
bhwatchersrepo "github.com/cppforlife/bosh-hub/release/watchersrepo"
)
// PeriodicWatcher repeatedly walks all watcher records and queues release
// versions for import until it is told to stop.
type PeriodicWatcher struct {
	// p is the delay Watch waits before each pass over the releases.
	p time.Duration

	// stopCh makes Watch return once a receive on it succeeds.
	stopCh <-chan struct{}

	// releasesRepo is used to look up release version records and to check
	// whether a release version is already present.
	releasesRepo bhrelsrepo.ReleasesRepository

	// watchersRepo supplies the list of watcher records to examine.
	watchersRepo bhwatchersrepo.WatchersRepository

	// importsRepo receives (release source, version) pairs that need importing.
	importsRepo bhimpsrepo.ImportsRepository

	// fetcher retrieves a release directory for a given release source.
	fetcher bhfetcher.Fetcher

	// logTag is passed as the tag argument on every logger call.
	logTag string

	logger boshlog.Logger
}
// NewPeriodicWatcher builds a PeriodicWatcher from its collaborators.
//
// p is the delay between passes, stopCh is the channel whose signal ends
// Watch, and the remaining arguments are stored as-is. The log tag is always
// "PeriodicWatcher".
func NewPeriodicWatcher(
	p time.Duration,
	stopCh <-chan struct{},
	releasesRepo bhrelsrepo.ReleasesRepository,
	watchersRepo bhwatchersrepo.WatchersRepository,
	importsRepo bhimpsrepo.ImportsRepository,
	fetcher bhfetcher.Fetcher,
	logger boshlog.Logger,
) PeriodicWatcher {
	var watcher PeriodicWatcher

	// Scheduling and lifecycle.
	watcher.p = p
	watcher.stopCh = stopCh

	// Repositories and fetcher.
	watcher.releasesRepo = releasesRepo
	watcher.watchersRepo = watchersRepo
	watcher.importsRepo = importsRepo
	watcher.fetcher = fetcher

	// Logging.
	watcher.logTag = "PeriodicWatcher"
	watcher.logger = logger

	return watcher
}
// Watch blocks, looking at all releases once every w.p, until a receive on
// w.stopCh succeeds. The period is measured from the end of one pass to the
// start of the next. Failures of an individual pass are logged and do not end
// the loop. Watch always returns nil.
func (w PeriodicWatcher) Watch() error {
	w.logger.Info(w.logTag, "Starting watching releases every %s", w.p)

	// A single reusable timer replaces a fresh time.After per iteration, and
	// the deferred Stop releases the pending timer as soon as Watch returns.
	timer := time.NewTimer(w.p)
	defer timer.Stop()

	for {
		select {
		case <-timer.C:
			if err := w.lookAtReleases(); err != nil {
				w.logger.Error(w.logTag, "Failed to look at releases: %s", err)
			}
			// timer.C was drained by the receive above, so Reset is safe here;
			// re-arming after the pass keeps a full period between passes.
			timer.Reset(w.p)

		case <-w.stopCh:
			w.logger.Info(w.logTag, "Stopped looking at releases")
			return nil
		}
	}
}
// lookAtReleases performs one pass: it lists every watcher record and
// examines each one in turn.
//
// A failure to list the records is returned wrapped. A failure on an
// individual record is only logged, so the remaining records are still
// examined and the pass returns nil.
func (w PeriodicWatcher) lookAtReleases() error {
	w.logger.Debug(w.logTag, "Looking at releases")

	recs, listErr := w.watchersRepo.ListAll()
	if listErr != nil {
		return bosherr.WrapError(listErr, "Listing all watcher records")
	}

	for _, rec := range recs {
		if lookErr := w.lookAtRelease(rec); lookErr != nil {
			w.logger.Error(w.logTag, "Failed to look at release source '%s': %s", rec.RelSource, lookErr)
		}
	}

	return nil
}
// lookAtRelease fetches the release source named by watcherRec and queues
// every release version found in it that still needs importing.
//
// For each release manifest in the fetched directory, the version is skipped
// when it is lower than the watcher's minimum version or when the releases
// repository already contains it; otherwise it is pushed onto the imports
// repository. The fetched release directory is closed when the function
// returns.
//
// The first error from fetching, listing manifests, or any repository call is
// returned wrapped, and the remaining manifests are not examined.
func (w PeriodicWatcher) lookAtRelease(watcherRec bhwatchersrepo.WatcherRec) error {
	w.logger.Debug(w.logTag, "Looking at release '%v'", watcherRec)

	releaseDir, err := w.fetcher.Fetch(watcherRec.RelSource)
	if err != nil {
		return bosherr.WrapError(err, "Fetching release")
	}

	defer releaseDir.Close()

	watcherMinVersion := watcherRec.MinVersion()

	pathToManifests, err := releaseDir.ReleaseManifests()
	if err != nil {
		return bosherr.WrapError(err, "Finding release manifests")
	}

	for _, manifest := range pathToManifests {
		relVerRec, err := w.releasesRepo.Find(watcherRec.RelSource, manifest.Release.Version)
		if err != nil {
			// relVerRec comes from the call that just failed, so report the
			// version that was being looked up instead of the returned record.
			return bosherr.WrapError(err, "Finding release version '%v'", manifest.Release.Version)
		}

		if relVerRec.Version().IsLt(watcherMinVersion) {
			w.logger.Debug(w.logTag,
				"Skipping release version '%v' because it is less than watcher minimum version", relVerRec)
			continue
		}

		found, err := w.releasesRepo.Contains(relVerRec)
		if err != nil {
			return bosherr.WrapError(err, "Finding release version '%v'", relVerRec)
		}

		if found {
			w.logger.Debug(w.logTag,
				"Skipping release version '%v' because it is was already imported", relVerRec)
			continue
		}

		err = w.importsRepo.Push(watcherRec.RelSource, manifest.Release.Version)
		if err != nil {
			return bosherr.WrapError(err, "Adding release version '%v' to import queue", relVerRec)
		}
	}

	return nil
}