-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmountpoint_selector.go
185 lines (155 loc) · 4.02 KB
/
mountpoint_selector.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
package ntrip_client
import (
"errors"
"io"
"log/slog"
"sync"
"sync/atomic"
"github.com/bezineb5/ntrip-client/input"
"github.com/chewxy/math32"
)
// SIGNIFICANT_CHANGE is an untyped numeric constant equal to 0.01.
//
// NOTE(review): presumably intended as the default significantChange
// threshold for NewRegistrySelector (a per-coordinate delta in the same unit
// as the latitude/longitude passed to SetLocation) — it is not referenced in
// the visible code, so confirm against callers.
const SIGNIFICANT_CHANGE = 0.01
// Selector is a closable source of byte-slice data whose origin can be
// steered by reporting a position.
type Selector interface {
	// SetLocation reports a latitude/longitude pair to the selector.
	SetLocation(lat, lng float32) error

	// Invalidate asks the selector to discard whatever it has cached about
	// its current choice.
	Invalidate() error

	// Stream returns a receive-only channel on which data is delivered.
	Stream() (<-chan []byte, error)

	// Close releases the selector (see io.Closer).
	io.Closer
}
// registrySelector is the Selector implementation in this file. It asks a
// Registry for the nearest stations and forwards the data of the chosen
// mountpoint to the channel returned by Stream.
type registrySelector struct {
	// Set once by NewRegistrySelector.
	registry          Registry
	significantChange float32 // per-coordinate threshold checked by SetLocation

	// mu is held by Stream and Close for their whole duration.
	mu sync.Mutex

	// Mutable.

	// invalidated forces the next SetLocation to re-evaluate the mountpoint
	// even when the position has not moved beyond significantChange.
	invalidated atomic.Bool

	// Last position passed to SetLocation and the mountpoint chosen for it.
	refLat, refLng float32
	refMountpoint  string

	// refRtcm is the input the streaming goroutine starts with.
	refRtcm input.RtcmInput

	// newMountpoints hands a freshly selected input to the streaming
	// goroutine; stop tells that goroutine to exit.
	newMountpoints chan input.RtcmInput
	stop           chan struct{}

	// consecutiveFailures is incremented each time the active mountpoint's
	// channel closes and reset to zero when a new mountpoint is received.
	consecutiveFailures int
}
func NewRegistrySelector(registry Registry, significantChange float32) Selector {
return ®istrySelector{
registry: registry,
significantChange: significantChange,
refLat: 0,
refLng: 0,
refMountpoint: "",
refRtcm: nil,
newMountpoints: make(chan input.RtcmInput, 1),
consecutiveFailures: 0,
}
}
// Stream starts the forwarding goroutine and returns the channel on which the
// data read from the selected mountpoint is delivered.
//
// If a previous call to Stream is still active, its goroutine is stopped first
// (its channel is closed) before the new one is started. The returned channel
// is closed when the goroutine stops, i.e. after Close or a later Stream call.
// The returned error is always nil.
//
// The goroutine:
//   - begins with s.refRtcm, when it is set;
//   - switches to every input received on s.newMountpoints, closing the
//     previous one;
//   - asks the current input for a new stream whenever its channel closes;
//   - closes the current input when it stops.
func (s *registrySelector) Stream() (<-chan []byte, error) {
	// We need to lock if there are multiple Stream, Close or SetLocation
	// calls simultaneously.
	s.mu.Lock()
	defer s.mu.Unlock()

	// First release the current continuous reading if there is one. The send
	// is synchronous on purpose: once it returns, the previous goroutine no
	// longer reads from s.newMountpoints.
	if s.stop != nil {
		s.stop <- struct{}{}
	}
	s.stop = make(chan struct{})

	ch := make(chan []byte, 4)
	go func(sc <-chan struct{}, newMountpoints <-chan input.RtcmInput) {
		defer close(ch)

		currentRtcm := s.refRtcm
		// Release the active input when the goroutine stops; nothing else
		// keeps a reference to an input received through newMountpoints.
		defer func() {
			if currentRtcm != nil {
				currentRtcm.Close()
			}
		}()

		// A nil channel blocks forever in select, which disables the data
		// case while no mountpoint is being read.
		var currentMountpoint <-chan []byte
		var err error
		if currentRtcm != nil {
			if currentMountpoint, err = currentRtcm.Stream(); err != nil {
				slog.Error("Error in streaming mountpoint", slog.Any("error", err))
			}
		}

		for {
			select {
			case <-sc:
				// Stop streaming
				return

			case mp := <-newMountpoints:
				// Listen to a new mountpoint
				if currentRtcm != nil {
					currentRtcm.Close()
				}
				currentRtcm = mp
				s.consecutiveFailures = 0
				currentMountpoint = nil
				if mp != nil {
					if currentMountpoint, err = mp.Stream(); err != nil {
						slog.Error("Error in streaming new mountpoint", slog.Any("error", err))
					}
				}

			case data, ok := <-currentMountpoint:
				if ok {
					// RTCM data received from mountpoint. Keep listening for
					// the stop signal while delivering: if the consumer has
					// stopped reading and ch is full, an unconditional send
					// would block forever and so would Close/Stream, which
					// wait for this goroutine while holding s.mu.
					select {
					case ch <- data:
					case <-sc:
						return
					}
					continue
				}

				// The mountpoint stopped its channel. Try reconnecting.
				currentMountpoint = nil
				s.consecutiveFailures++
				if currentRtcm != nil {
					if currentMountpoint, err = currentRtcm.Stream(); err != nil {
						slog.Error("Error in streaming current mountpoint", slog.Any("error", err))
					}
				}
			}
		}
	}(s.stop, s.newMountpoints)

	return ch, nil
}
// SetLocation records a new position and, when needed, switches the stream to
// the mountpoint of the nearest station.
//
// The call is a no-op returning nil when the selector has not been invalidated
// and neither coordinate moved by more than s.significantChange since the last
// recorded position. Otherwise the registry is queried; if the nearest
// station's mountpoint differs from the current one, a new client is created
// and queued on s.newMountpoints for the streaming goroutine.
//
// It returns the error of the registry lookup, or an error when the registry
// reports no station. In both cases the selector is marked invalidated again,
// so the next call retries the lookup even if the position has not changed.
//
// The final send blocks while a previously queued mountpoint has not yet been
// picked up by the goroutine started by Stream.
func (s *registrySelector) SetLocation(lat float32, lng float32) error {
	if !s.invalidated.Load() && math32.Abs(lat-s.refLat) <= s.significantChange &&
		math32.Abs(lng-s.refLng) <= s.significantChange {
		// No significant change
		return nil
	}

	// Clear the flag before the lookup so that an Invalidate issued while the
	// lookup is in progress is not lost.
	s.invalidated.Store(false)
	s.refLat = lat
	s.refLng = lng

	// Determine the new mountpoint to use
	nearests, err := s.registry.NearestStations(lat, lng)
	if err != nil {
		// The position has been recorded but no mountpoint was chosen for it:
		// re-arm the flag, otherwise an unchanged position would be treated as
		// already handled and the lookup would never be retried.
		s.invalidated.Store(true)
		return err
	}
	if len(nearests) == 0 {
		s.invalidated.Store(true)
		return errors.New("no station found nearby")
	}

	nearestMP := nearests[0].mountpoint
	if nearestMP == s.refMountpoint {
		// No change
		return nil
	}

	// Update the mountpoint without breaking the stream
	slog.Info("Switching to new mountpoint", slog.String("mountpoint", nearestMP), slog.Float64("distance", float64(nearests[0].distanceInM)))
	client := input.NewNtripV2MountPointClient(nearestMP)
	s.refMountpoint = nearestMP
	s.newMountpoints <- client
	return nil
}
// Close signals the goroutine started by Stream, if there is one, to stop.
// It holds s.mu for its whole duration, so it is serialised with Stream and
// with other Close calls. The send on the stop channel is unbuffered: Close
// returns only once the goroutine has received the signal. It always returns
// nil; calling it when nothing is streaming does nothing.
func (s *registrySelector) Close() error {
	s.mu.Lock()
	defer s.mu.Unlock()

	stopCh := s.stop
	if stopCh == nil {
		// Nothing is streaming.
		return nil
	}

	stopCh <- struct{}{}
	s.stop = nil
	return nil
}
// Invalidate marks the current mountpoint choice as stale and, when a position
// has already been recorded, immediately re-evaluates it through SetLocation,
// whose error it returns.
//
// The position (0, 0) is the "nothing recorded yet" sentinel: in that case
// only the flag is set, and the next SetLocation call performs the lookup.
func (s *registrySelector) Invalidate() error {
	s.invalidated.Store(true)

	// Only skip when BOTH coordinates are zero. A position lying exactly on
	// the equator or on the prime meridian has a single zero coordinate and
	// must still be re-evaluated.
	if s.refLat == 0 && s.refLng == 0 {
		return nil
	}
	return s.SetLocation(s.refLat, s.refLng)
}