// Copyright 2015 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package ts

import (
	"context"
	"math"

	"github.com/cockroachdb/cockroach/pkg/kv"
	"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/ts/catalog"
	"github.com/cockroachdb/cockroach/pkg/ts/tspb"
	"github.com/cockroachdb/cockroach/pkg/util/log"
	"github.com/cockroachdb/cockroach/pkg/util/mon"
	"github.com/cockroachdb/cockroach/pkg/util/quotapool"
	"github.com/cockroachdb/cockroach/pkg/util/stop"
	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
	gwruntime "github.com/grpc-ecosystem/grpc-gateway/runtime"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

const (
	// URLPrefix is the prefix for all time series endpoints hosted by the
	// server.
	URLPrefix = "/ts/"
	// queryWorkerMax is the default maximum number of worker goroutines that
	// the time series server can use to service incoming queries.
	queryWorkerMax = 8
	// queryMemoryMax is a soft limit for the amount of total memory used by
	// time series queries. This is not currently enforced, but is used for
	// monitoring purposes.
	queryMemoryMax = int64(64 * 1024 * 1024) // 64MiB
	// dumpBatchSize is the number of keys processed in each batch by the dump
	// command.
	dumpBatchSize = 100
)
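
// A quick worked example of the defaults above: with queryMemoryMax at
// 64 MiB and queryWorkerMax at 8, each query worker in Query below is
// handed a budget of 64 MiB / 8 = 8 MiB
// (BudgetBytes = queryMemoryMax / queryWorkerMax).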

// ClusterNodeCountFn is a function that returns the number of nodes active on
// the cluster.
type ClusterNodeCountFn func() int64

// ServerConfig provides a means for tests to override settings in the time
// series server.
type ServerConfig struct {
	// The maximum number of query workers used by the server. If this
	// value is zero, a default non-zero value is used instead.
	QueryWorkerMax int
	// The maximum amount of memory that should be used for processing queries
	// across all workers. If this value is zero, a default non-zero value is
	// used instead.
	QueryMemoryMax int64
}

// Server handles incoming external requests related to time series data.
//
// The server attempts to constrain the total amount of memory it uses for
// processing incoming queries. This is accomplished with a multi-pronged
// strategy:
// + The server has a worker memory limit, which is a quota for the amount of
//   memory that can be used across all currently executing queries.
// + The server also has a pre-set limit on the number of parallel workers that
//   can be executing at one time. Each worker is given an even share of the
//   server's total memory limit, which it should not exceed.
// + Each worker breaks its task into chunks which it will process sequentially;
//   the size of each chunk is calculated to avoid exceeding the memory limit.
//
// In addition to this strategy, the server uses a memory monitor to track the
// amount of memory actually being used by worker tasks. This is intended to
// verify that the calculations of the individual workers are correct.
//
// A second memory monitor is used to track the space used by the results of
// query workers, which are longer lived; an incoming request may utilize
// several workers, but the results of each worker cannot be released until
// they have been returned to the requestor. Result memory is not currently
// limited, as in practical usage it is dwarfed by the memory needed by
// workers to generate the results.
type Server struct {
	log.AmbientContext
	db               *DB
	stopper          *stop.Stopper
	nodeCountFn      ClusterNodeCountFn
	queryMemoryMax   int64
	queryWorkerMax   int
	workerMemMonitor *mon.BytesMonitor
	resultMemMonitor *mon.BytesMonitor
	workerSem        *quotapool.IntPool
}

// MakeServer instantiates a new Server which services requests with data from
// the supplied DB.
func MakeServer(
	ambient log.AmbientContext,
	db *DB,
	nodeCountFn ClusterNodeCountFn,
	cfg ServerConfig,
	stopper *stop.Stopper,
) Server {
	ambient.AddLogTag("ts-srv", nil)

	// Override default values from configuration.
	queryWorkerMax := queryWorkerMax
	if cfg.QueryWorkerMax != 0 {
		queryWorkerMax = cfg.QueryWorkerMax
	}
	queryMemoryMax := queryMemoryMax
	if cfg.QueryMemoryMax != 0 {
		queryMemoryMax = cfg.QueryMemoryMax
	}
	workerSem := quotapool.NewIntPool("ts.Server worker", uint64(queryWorkerMax))
	stopper.AddCloser(workerSem.Closer("stopper"))
	return Server{
		AmbientContext: ambient,
		db:             db,
		stopper:        stopper,
		nodeCountFn:    nodeCountFn,
		workerMemMonitor: mon.NewUnlimitedMonitor(
			context.Background(),
			"timeseries-workers",
			mon.MemoryResource,
			nil,
			nil,
			// Begin logging messages if we exceed our planned memory usage by
			// more than double.
			queryMemoryMax*2,
			db.st,
		),
		resultMemMonitor: mon.NewUnlimitedMonitor(
			context.Background(),
			"timeseries-results",
			mon.MemoryResource,
			nil,
			nil,
			math.MaxInt64,
			db.st,
		),
		queryMemoryMax: queryMemoryMax,
		queryWorkerMax: queryWorkerMax,
		workerSem:      workerSem,
	}
}
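
// exampleMakeServer is an illustrative sketch, not part of the original file:
// it shows how a caller might override both ServerConfig knobs when
// constructing a Server. The specific values (16 workers, 128 MiB) and the
// fixed three-node count function are arbitrary assumptions for the example.
func exampleMakeServer(ambient log.AmbientContext, db *DB, stopper *stop.Stopper) Server {
	cfg := ServerConfig{
		QueryWorkerMax: 16,                // double the default of 8
		QueryMemoryMax: 128 * 1024 * 1024, // 128 MiB shared across all workers
	}
	// A constant node count stands in for a real liveness-backed implementation.
	nodeCount := ClusterNodeCountFn(func() int64 { return 3 })
	return MakeServer(ambient, db, nodeCount, cfg, stopper)
}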

// RegisterService registers the gRPC service.
func (s *Server) RegisterService(g *grpc.Server) {
	tspb.RegisterTimeSeriesServer(g, s)
}

// RegisterGateway starts the gateway (i.e. reverse proxy) that proxies HTTP
// requests to the appropriate gRPC endpoints.
func (s *Server) RegisterGateway(
	ctx context.Context, mux *gwruntime.ServeMux, conn *grpc.ClientConn,
) error {
	return tspb.RegisterTimeSeriesHandler(ctx, mux, conn)
}
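
// exampleRegister is an illustrative sketch, not part of the original file:
// it shows the two registration steps a node would perform at startup, first
// the gRPC service itself, then the HTTP gateway that proxies requests under
// URLPrefix to the gRPC endpoints.
func exampleRegister(
	ctx context.Context, s *Server, g *grpc.Server, mux *gwruntime.ServeMux, conn *grpc.ClientConn,
) error {
	s.RegisterService(g)
	return s.RegisterGateway(ctx, mux, conn)
}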

// Query is an endpoint that returns data for one or more metrics over a
// specific time span.
func (s *Server) Query(
	ctx context.Context, request *tspb.TimeSeriesQueryRequest,
) (*tspb.TimeSeriesQueryResponse, error) {
	ctx = s.AnnotateCtx(ctx)
	if len(request.Queries) == 0 {
		return nil, status.Errorf(codes.InvalidArgument, "Queries cannot be empty")
	}

	// If not set, sampleNanos should default to ten second resolution.
	sampleNanos := request.SampleNanos
	if sampleNanos == 0 {
		sampleNanos = Resolution10s.SampleDuration()
	}

	// For the interpolation limit, use the time limit until stores are considered
	// dead. This is a conservatively long span, but gives us a good indication of
	// when a gap likely indicates an outage (and thus missing values should not
	// be interpolated).
	interpolationLimit := kvserver.TimeUntilStoreDead.Get(&s.db.st.SV).Nanoseconds()

	// Get the estimated number of nodes on the cluster, used to compute more
	// accurate memory usage estimates. Set a minimum of 1 in order to avoid
	// divide-by-zero panics.
	estimatedClusterNodeCount := s.nodeCountFn()
	if estimatedClusterNodeCount == 0 {
		estimatedClusterNodeCount = 1
	}

	response := tspb.TimeSeriesQueryResponse{
		Results: make([]tspb.TimeSeriesQueryResponse_Result, len(request.Queries)),
	}

	// Defer cancellation of context passed to worker tasks; if main task
	// returns early, worker tasks should be torn down quickly.
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	// Channel which workers use to report their result, which is either an
	// error or nil (when successful).
	workerOutput := make(chan error)

	// Create a separate memory management context for each query, allowing them
	// to be run in parallel.
	memContexts := make([]QueryMemoryContext, len(request.Queries))
	defer func() {
		for idx := range memContexts {
			memContexts[idx].Close(ctx)
		}
	}()

	timespan := QueryTimespan{
		StartNanos:          request.StartNanos,
		EndNanos:            request.EndNanos,
		SampleDurationNanos: sampleNanos,
		NowNanos:            timeutil.Now().UnixNano(),
	}

	// Start a task which is itself responsible for starting per-query worker
	// tasks. This is needed because RunLimitedAsyncTask can block; in the
	// case where a single request has more queries than the semaphore limit,
	// a deadlock would occur because queries cannot complete until they have
	// written their result to the "output" channel, which is processed later
	// in the main function.
	if err := s.stopper.RunAsyncTask(ctx, "ts.Server: queries", func(ctx context.Context) {
		for queryIdx, query := range request.Queries {
			queryIdx := queryIdx
			query := query

			if err := s.stopper.RunLimitedAsyncTask(
				ctx,
				"ts.Server: query",
				s.workerSem,
				true, /* wait */
				func(ctx context.Context) {
					// Estimated source count is either the count of requested sources
					// *or* the estimated cluster node count if no sources are specified.
					var estimatedSourceCount int64
					if len(query.Sources) > 0 {
						estimatedSourceCount = int64(len(query.Sources))
					} else {
						estimatedSourceCount = estimatedClusterNodeCount
					}

					// Create a memory account for the results of this query.
					memContexts[queryIdx] = MakeQueryMemoryContext(
						s.workerMemMonitor,
						s.resultMemMonitor,
						QueryMemoryOptions{
							BudgetBytes:             s.queryMemoryMax / int64(s.queryWorkerMax),
							EstimatedSources:        estimatedSourceCount,
							InterpolationLimitNanos: interpolationLimit,
						},
					)

					datapoints, sources, err := s.db.Query(
						ctx,
						query,
						Resolution10s,
						timespan,
						memContexts[queryIdx],
					)
					if err == nil {
						response.Results[queryIdx] = tspb.TimeSeriesQueryResponse_Result{
							Query:      query,
							Datapoints: datapoints,
						}
						response.Results[queryIdx].Sources = sources
					}
					select {
					case workerOutput <- err:
					case <-ctx.Done():
					}
				},
			); err != nil {
				// Stopper has been closed and is draining. Return an error and
				// exit the worker-spawning loop.
				select {
				case workerOutput <- err:
				case <-ctx.Done():
				}
				return
			}
		}
	}); err != nil {
		return nil, err
	}

	for range request.Queries {
		select {
		case err := <-workerOutput:
			if err != nil {
				// Return the first error encountered. This will cancel the
				// worker context and cause all other in-progress workers to
				// exit.
				return nil, err
			}
		case <-ctx.Done():
			return nil, ctx.Err()
		}
	}
	return &response, nil
}
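
// exampleQueryRequest is an illustrative sketch, not part of the original
// file: it builds the minimal request shape that Query accepts, covering a
// one-hour window for a single metric. SampleNanos is left zero so the
// server falls back to the default 10-second resolution, and Sources is left
// empty so the estimated cluster node count is used for memory sizing. The
// metric name is a hypothetical example.
func exampleQueryRequest(nowNanos int64) *tspb.TimeSeriesQueryRequest {
	const hourNanos = int64(3600) * 1000 * 1000 * 1000
	return &tspb.TimeSeriesQueryRequest{
		StartNanos: nowNanos - hourNanos,
		EndNanos:   nowNanos,
		Queries: []tspb.Query{{
			Name: "cr.node.sql.query.count", // hypothetical metric name
		}},
	}
}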

// Dump returns a stream of raw timeseries data that has been stored on the
// server. Only data from the 10-second resolution is returned; rollup data is
// not currently returned. Data is returned in the order it is read from disk,
// and will thus not be totally organized by series.
//
// TODO(tbg): needs testing that restricting to individual timeseries works
// and that the date range restrictions are respected. Should be easy enough to
// set up a KV store and write some keys into it (`MakeDataKey`) to do so without
// setting up a `*Server`.
func (s *Server) Dump(req *tspb.DumpRequest, stream tspb.TimeSeries_DumpServer) error {
	d := defaultDumper{stream}.Dump
	return dumpImpl(stream.Context(), s.db.db, req, d)
}

// DumpRaw is like Dump, but streams the stored KV pairs verbatim rather than
// decoding them into time series datapoints.
func (s *Server) DumpRaw(req *tspb.DumpRequest, stream tspb.TimeSeries_DumpRawServer) error {
	d := rawDumper{stream}.Dump
	return dumpImpl(stream.Context(), s.db.db, req, d)
}

func dumpImpl(
	ctx context.Context, db *kv.DB, req *tspb.DumpRequest, d func(*roachpb.KeyValue) error,
) error {
	names := req.Names
	if len(names) == 0 {
		names = catalog.AllMetricsNames()
	}
	resolutions := req.Resolutions
	if len(resolutions) == 0 {
		resolutions = []tspb.TimeSeriesResolution{tspb.TimeSeriesResolution_RESOLUTION_10S}
	}
	for _, seriesName := range names {
		for _, res := range resolutions {
			if err := dumpTimeseriesAllSources(
				ctx,
				db,
				seriesName,
				ResolutionFromProto(res),
				req.StartNanos,
				req.EndNanos,
				d,
			); err != nil {
				return err
			}
		}
	}
	return nil
}
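
// exampleDumpRequest is an illustrative sketch, not part of the original
// file: it shows the request shape dumpImpl consumes. Leaving Names or
// Resolutions empty falls back to all known metrics and the 10s resolution
// respectively, and a zero EndNanos means "no upper bound" (see
// dumpTimeseriesAllSources below). The metric name is a hypothetical example.
func exampleDumpRequest() *tspb.DumpRequest {
	return &tspb.DumpRequest{
		Names:       []string{"cr.node.sys.uptime"},
		Resolutions: []tspb.TimeSeriesResolution{tspb.TimeSeriesResolution_RESOLUTION_10S},
		StartNanos:  0,
		EndNanos:    0,
	}
}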

type defaultDumper struct {
	stream tspb.TimeSeries_DumpServer
}

func (dd defaultDumper) Dump(kv *roachpb.KeyValue) error {
	name, source, _, _, err := DecodeDataKey(kv.Key)
	if err != nil {
		return err
	}
	var idata roachpb.InternalTimeSeriesData
	if err := kv.Value.GetProto(&idata); err != nil {
		return err
	}

	tsdata := &tspb.TimeSeriesData{
		Name:       name,
		Source:     source,
		Datapoints: make([]tspb.TimeSeriesDatapoint, idata.SampleCount()),
	}
	for i := 0; i < idata.SampleCount(); i++ {
		if idata.IsColumnar() {
			tsdata.Datapoints[i].TimestampNanos = idata.TimestampForOffset(idata.Offset[i])
			tsdata.Datapoints[i].Value = idata.Last[i]
		} else {
			tsdata.Datapoints[i].TimestampNanos = idata.TimestampForOffset(idata.Samples[i].Offset)
			tsdata.Datapoints[i].Value = idata.Samples[i].Sum
		}
	}
	return dd.stream.Send(tsdata)
}

type rawDumper struct {
	stream tspb.TimeSeries_DumpRawServer
}

func (rd rawDumper) Dump(kv *roachpb.KeyValue) error {
	return rd.stream.Send(kv)
}

func dumpTimeseriesAllSources(
	ctx context.Context,
	db *kv.DB,
	seriesName string,
	diskResolution Resolution,
	startNanos, endNanos int64,
	dump func(*roachpb.KeyValue) error,
) error {
	if endNanos == 0 {
		endNanos = math.MaxInt64
	}
	// Extend the end of the scanned span by (almost) one slab duration so the
	// slab containing endNanos is included, clamping to avoid int64 overflow.
	if delta := diskResolution.SlabDuration() - 1; endNanos > math.MaxInt64-delta {
		endNanos = math.MaxInt64
	} else {
		endNanos += delta
	}

	span := &roachpb.Span{
		Key: MakeDataKey(
			seriesName, "" /* source */, diskResolution, startNanos,
		),
		EndKey: MakeDataKey(
			seriesName, "" /* source */, diskResolution, endNanos,
		),
	}

	for span != nil {
		b := &kv.Batch{}
		scan := roachpb.NewScan(span.Key, span.EndKey, false /* forUpdate */)
		b.AddRawRequest(scan)
		b.Header.MaxSpanRequestKeys = dumpBatchSize
		err := db.Run(ctx, b)
		if err != nil {
			return err
		}
		resp := b.RawResponse().Responses[0].GetScan()
		span = resp.ResumeSpan
		for i := range resp.Rows {
			if err := dump(&resp.Rows[i]); err != nil {
				return err
			}
		}
	}
	return nil
}
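
// exampleCountingDump is an illustrative sketch, not part of the original
// file: a dump callback compatible with dumpImpl that counts KV pairs
// instead of streaming them, the kind of hook the testing TODO on Dump
// above could use.
func exampleCountingDump(counter *int) func(*roachpb.KeyValue) error {
	return func(kv *roachpb.KeyValue) error {
		(*counter)++
		return nil
	}
}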