execute.go
package lgrep

import (
	"context"
	"sync"
	"time"

	log "github.com/Sirupsen/logrus"
	"github.com/juju/errors"
	"gopkg.in/olivere/elastic.v3"
)

const (
	// MaxSearchSize is the largest number of results that may be
	// requested with a regular search; anything larger must be fetched
	// with a scroll. This matches Elasticsearch's default
	// index.max_result_window of 10,000 documents.
	MaxSearchSize = 10000
	// scrollChunk is the number of documents fetched per scroll page.
	scrollChunk = 100
	// scrollKeepalive is how long the server keeps the scroll cursor
	// alive between page fetches.
	scrollKeepalive = "30s"
)
// SearchStream manages both the execution of a search and the
// consumption of its results.
type SearchStream struct {
	// Results is a channel of results that are read from the server.
	Results chan Result
	// Errors is a channel of errors that are encountered.
	Errors chan error
	// control holds internal variables that are used to control the
	// stream workers.
	control struct {
		*sync.WaitGroup
		sync.Mutex
		stopped bool
		quit    chan struct{}
	}
}
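
// A minimal consumption sketch (hypothetical; assume stream is a
// *SearchStream produced by a search in this package):
//
//	for result := range stream.Results {
//		fmt.Println(result)
//	}
//	if err := <-stream.Errors; err != nil {
//		log.Error(err)
//	}
//	stream.Wait()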
// Wait blocks until the stream's workers have cleaned up; it should
// be called after the stream has been read in its entirety.
func (s *SearchStream) Wait() {
	s.control.Lock()
	defer s.control.Unlock()
	s.control.Wait()
}
// Quit instructs the stream to shut down early and cleanly, blocking
// until that happens or a short timeout elapses. It is safe to call
// more than once.
func (s *SearchStream) Quit() {
	log.Debug("Sending stream quit signal")
	s.control.Lock()
	defer s.control.Unlock()
	if s.control.stopped {
		return
	}
	s.control.quit <- struct{}{}
	// Give the workers up to one second to wind down.
	timeout := time.NewTimer(time.Second * 1)
	defer timeout.Stop()
	stopped := make(chan struct{}, 1)
	go func() { s.control.Wait(); stopped <- struct{}{} }()
	select {
	case <-timeout.C:
	case <-stopped:
	}
	s.control.stopped = true
}
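
// An early-shutdown sketch (hypothetical): take only the first result
// and cancel the remainder of the search.
//
//	if first, ok := <-stream.Results; ok {
//		handle(first) // handle is an illustrative callback
//	}
//	stream.Quit() // signals the worker and waits briefly for cleanup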
// All reads the entire stream into memory and returns the results
// that were read; it exits immediately on the first error that is
// encountered.
func (s *SearchStream) All() (results []Result, err error) {
	resultFn := func(r Result) error {
		results = append(results, r)
		return nil
	}
	// Exit immediately on error!
	errFn := func(err error) error { return err }
	return results, s.Each(resultFn, errFn)
}
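
// Usage sketch for All (stream is hypothetical):
//
//	results, err := stream.All()
//	if err != nil {
//		log.Error(err)
//	}
//	log.Infof("fetched %d results", len(results))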
// Each calls resultFn with every result and errFn with every error
// that is read from the stream. If either callback returns a non-nil
// error the stream is shut down early and that error is returned.
// Each returns once the stream has been exhausted.
func (s *SearchStream) Each(resultFn func(Result) error, errFn func(error) error) (err error) {
	errs := s.Errors
stream:
	for {
		select {
		case streamErr, ok := <-errs:
			if !ok {
				// The errors channel was closed; stop selecting on it.
				errs = nil
				continue
			}
			err = errFn(streamErr)
			if err == nil {
				continue
			}
			log.Debug("Error encountered, stopping any ongoing search")
			s.Quit()
			break stream
		case result, ok := <-s.Results:
			if !ok {
				log.Debug("Stream results dried up, breaking out.")
				break stream
			}
			err = resultFn(result)
			if err != nil {
				log.Debug("An error occurred with upstream handler, breaking out")
				s.Quit()
				break stream
			}
		}
	}
	log.Debug("Exiting stream loop, waiting for stream to clean up")
	s.Wait()
	return err
}
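
// A callback sketch for Each: count results and treat stream errors
// as non-fatal (both closures are illustrative only):
//
//	var count int
//	resultFn := func(r Result) error {
//		count++
//		return nil // a non-nil return would stop the stream early
//	}
//	errFn := func(err error) error {
//		log.Warn(err)
//		return nil // swallow the error and keep consuming
//	}
//	if err := stream.Each(resultFn, errFn); err != nil {
//		log.Error(err)
//	}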
// execute runs the search, dispatching to a scrolled search when the
// requested size is too large to satisfy with a single query.
func (l LGrep) execute(search *elastic.SearchService, query elastic.Query, spec SearchOptions) (stream *SearchStream, err error) {
	stream = &SearchStream{
		Results: make(chan Result, scrollChunk),
		Errors:  make(chan error, 1),
	}
	if spec.QueryDebug {
		log.SetLevel(log.DebugLevel)
	}
	stream.control.quit = make(chan struct{}, 1)
	stream.control.WaitGroup = &sync.WaitGroup{}

	if spec.Size > MaxSearchSize {
		log.Debugf("searching with scroll for large size (%d)", spec.Size)
		if spec.Index == "" && len(spec.Indices) == 0 {
			return nil, errors.New("An index pattern must be given for large requests")
		}
		source, err := query.Source()
		if err != nil {
			return nil, err
		}
		scroll := l.Scroll()
		scroll.KeepAlive(scrollKeepalive)
		spec.configureScroll(scroll)
		// Reset to the chunk size, otherwise the entire result will
		// (attempt to) be pulled in a single request.
		scroll.Size(scrollChunk)
		if queryMap, ok := source.(map[string]interface{}); ok {
			log.Debugf("QueryMap provided, merging with specifications")
			qm := QueryMap(queryMap)
			spec.configureQueryMap(qm)
			qm["size"] = scrollChunk
			log.Debugf("QueryMap result: %#v", qm)
			scroll.Body(qm)
		} else {
			// TODO: Verify any other query type and pass it into the query for the user.
			log.Errorf("cannot execute scroll with provided query, unhandled")
			return nil, errors.New("cannot execute scroll with provided query, unhandled")
		}
		// Register the worker before starting it so that Wait cannot
		// return before the goroutine has had a chance to run.
		stream.control.Add(1)
		go l.executeScroll(scroll, query, spec, stream)
	} else {
		log.Debugf("searching with regular query for small size (%d)", spec.Size)
		stream.control.Add(1)
		go l.executeSearcher(search, query, spec, stream)
	}
	return stream, nil
}
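
// For illustration, the scroll body assembled above for a simple
// query-string search would look roughly like the following JSON (the
// exact shape depends on query.Source() and the SearchOptions):
//
//	{
//	  "query": {"query_string": {"query": "type:error"}},
//	  "size": 100
//	}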
func (l LGrep) executeScroll(scroll *elastic.ScrollService, query elastic.Query, spec SearchOptions, stream *SearchStream) {
	defer stream.control.Done()
	var (
		resultCount  int
		nextScrollID string
	)
	defer close(stream.Results)
	defer close(stream.Errors)
	// Cancel any in-flight request when the worker exits.
	ctx, cancelReq := context.WithCancel(context.TODO())
	defer cancelReq()

scrollLoop:
	for {
		if nextScrollID != "" {
			log.Debugf("Fetching next page using scrollID %s", nextScrollID[:10])
			scroll.ScrollId(nextScrollID)
			if resultCount >= spec.Size {
				break scrollLoop
			}
		} else {
			log.Debug("Fetching first page of scroll")
		}
		results, err := scroll.DoC(ctx)
		if err != nil {
			log.Debugf("An error was returned during scroll after %d results.", resultCount)
			if err != elastic.EOS {
				stream.Errors <- errors.Annotate(err, "Server responded with error while scrolling.")
			}
			break scrollLoop
		}
		if results.ScrollId != "" && results.ScrollId != nextScrollID {
			nextScrollID = results.ScrollId
		}
		for _, hit := range results.Hits.Hits {
			result, err := extractResult(hit, spec)
			if err != nil {
				stream.Errors <- err
				continue
			}
			select {
			case <-stream.control.quit:
				cancelReq()
				log.Debug("Stream instructed to quit")
				break scrollLoop
			case stream.Results <- result:
				resultCount++
			}
			if resultCount == spec.Size {
				log.Debug("Scroll streamed the required amount of results, begin shutdown")
				break scrollLoop
			}
		}
	}
	// Release the server-side scroll cursor.
	l.ClearScroll(nextScrollID).Do()
}
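
// The scroll exchange performed above, sketched at the HTTP level
// (illustrative only; olivere/elastic handles the actual wire format):
//
//	POST /<index>/_search?scroll=30s   -> first page + scroll_id
//	POST /_search/scroll {scroll_id}   -> next page + scroll_id
//	... repeat until EOS or spec.Size results ...
//	DELETE /_search/scroll {scroll_id} -> frees the cursor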
func (l LGrep) executeSearcher(service Searcher, query elastic.Query, spec SearchOptions, stream *SearchStream) {
	defer stream.control.Done()
	defer close(stream.Results)
	defer close(stream.Errors)

	result, err := service.Do()
	if err != nil {
		stream.Errors <- err
		return
	}
	for i := range result.Hits.Hits {
		doc, err := extractResult(result.Hits.Hits[i], spec)
		if err != nil {
			stream.Errors <- err
			continue
		}
		// Send the result unless the stream has been told to quit;
		// selecting on the send avoids blocking on a departed consumer.
		select {
		case <-stream.control.quit:
			return
		case stream.Results <- doc:
		}
	}
}