// Copyright 2022 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package tenant

import (
	"context"
	"io"
	"sync"
	"time"

	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/util/grpcutil"
	"github.com/cockroachdb/cockroach/pkg/util/log"
	"github.com/cockroachdb/cockroach/pkg/util/stop"
	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// DirectoryCache is the external interface for the tenant directory cache.
//
// See directoryCache for more information.
type DirectoryCache interface {
	// LookupTenantPods returns an IP address of one of the given tenant's SQL
	// processes based on the tenantID and clusterName fields. This should
	// block until the process associated with the IP is ready.
	//
	// If no matching pods are found (e.g. cluster name mismatch, or tenant was
	// deleted), this will return a GRPC NotFound error.
	LookupTenantPods(ctx context.Context, tenantID roachpb.TenantID, clusterName string) ([]*Pod, error)

	// TryLookupTenantPods returns the IP addresses for all available SQL
	// processes for the given tenant. It returns a GRPC NotFound error if the
	// tenant does not exist.
	//
	// Unlike LookupTenantPods, which blocks until there is an associated
	// process, TryLookupTenantPods will just return an empty set if no
	// processes are available for the tenant.
	TryLookupTenantPods(ctx context.Context, tenantID roachpb.TenantID) ([]*Pod, error)

	// ReportFailure is used to indicate to the directory cache that attempts
	// to connect to a particular SQL tenant pod have failed.
	ReportFailure(ctx context.Context, tenantID roachpb.TenantID, addr string) error
}
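
// selectRunningPod is a hedged usage sketch, not part of the original file:
// it shows how a caller might combine LookupTenantPods with a simple pod
// selection policy. The "first RUNNING pod" policy is illustrative only; the
// real sqlproxy applies its own balancing logic.
func selectRunningPod(
	ctx context.Context, dc DirectoryCache, tenantID roachpb.TenantID, clusterName string,
) (*Pod, error) {
	// LookupTenantPods blocks until at least one pod is available (resuming
	// the tenant if needed), or returns a GRPC NotFound error.
	pods, err := dc.LookupTenantPods(ctx, tenantID, clusterName)
	if err != nil {
		return nil, err
	}
	for _, pod := range pods {
		if pod.State == RUNNING {
			return pod, nil
		}
	}
	return nil, status.Errorf(
		codes.NotFound, "no RUNNING pods for tenant %d", tenantID.ToUint64())
}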

// dirOptions control the behavior of directoryCache.
type dirOptions struct {
	deterministic bool
	refreshDelay  time.Duration
	podWatcher    chan *Pod
}

// DirOption defines an option that can be passed to directoryCache in order
// to control its behavior.
type DirOption func(opts *dirOptions)

// RefreshDelay specifies the minimum amount of time that must elapse between
// attempts to refresh pods for a given tenant after ReportFailure is called.
// This delay has the effect of throttling calls to the directory server, in
// order to avoid overloading it.
//
// RefreshDelay defaults to 100ms. Use -1 to never throttle.
func RefreshDelay(delay time.Duration) func(opts *dirOptions) {
	return func(opts *dirOptions) {
		opts.refreshDelay = delay
	}
}

// PodWatcher provides a callback channel to which tenant pod change
// notifications will be sent by the directory. Notifications will be sent
// when a tenant pod is created, modified, or destroyed.
//
// NOTE: The caller is responsible for handling the notifications by receiving
// from the channel; if it does not, it may block the background pod watcher
// goroutine.
func PodWatcher(podWatcher chan *Pod) func(opts *dirOptions) {
	return func(opts *dirOptions) {
		opts.podWatcher = podWatcher
	}
}
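
// drainPodWatcher is a hedged sketch, not part of the original file: it shows
// one way a caller could service the PodWatcher channel so that the
// background watcher goroutine is never blocked on a send. Stop it by
// canceling ctx.
func drainPodWatcher(ctx context.Context, podWatcher <-chan *Pod) {
	for {
		select {
		case pod := <-podWatcher:
			// React to the notification (here we only log it).
			log.Infof(ctx, "pod %s for tenant %d is now %s", pod.Addr, pod.TenantID, pod.State)
		case <-ctx.Done():
			return
		}
	}
}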

// directoryCache tracks the network locations of SQL tenant processes. It is
// used by the sqlproxy to route incoming traffic to the correct backend
// process. Process information is populated and kept relatively up-to-date
// using a streaming watcher. However, since watchers deliver slightly stale
// information, the directory will also make direct server calls to fetch the
// latest information about a process that is not yet in the cache, or when a
// process is suspected to have failed. When a new tenant is created, or is
// resumed from suspension, this capability allows the directory to
// immediately return the IP address for the new process.
//
// All methods in the directory are thread-safe. Methods are intended to be
// called concurrently by many threads at once, and so locking is carefully
// designed to minimize contention. While a lock shared across tenants is used
// to synchronize access to shared in-memory data structures, each tenant also
// has its own locks that are used to synchronize per-tenant operations such
// as making directory server calls to fetch updated tenant information.
type directoryCache struct {
	// client is the directory client instance used to make directory server
	// calls.
	client DirectoryClient

	// stopper is used for graceful shutdown of the pod watcher.
	stopper *stop.Stopper

	// options control how the environment operates.
	options dirOptions

	// mut synchronizes access to the in-memory tenant entry caches. Take care
	// to never hold this lock during directory server calls - it should only
	// be used while adding and removing tenant entries to/from the caches.
	mut struct {
		syncutil.Mutex

		// tenants is a cache of tenant entries. Each entry tracks available
		// IP addresses for SQL processes for a given tenant. Entries may not
		// be fully initialized.
		tenants map[roachpb.TenantID]*tenantEntry
	}
}

var _ DirectoryCache = &directoryCache{}

// NewDirectoryCache constructs a new directoryCache instance that tracks SQL
// tenant processes managed by a given directory server. The given context is
// used for tracing pod watcher activity.
//
// NOTE: stopper.Stop must be called on the directory when it is no longer
// needed.
func NewDirectoryCache(
	ctx context.Context, stopper *stop.Stopper, client DirectoryClient, opts ...DirOption,
) (DirectoryCache, error) {
	dir := &directoryCache{client: client, stopper: stopper}

	dir.mut.tenants = make(map[roachpb.TenantID]*tenantEntry)
	for _, opt := range opts {
		opt(&dir.options)
	}
	if dir.options.refreshDelay == 0 {
		// Default to a delay of 100ms between refresh attempts for a given
		// tenant.
		dir.options.refreshDelay = 100 * time.Millisecond
	}

	// Start the pod watcher on a background goroutine.
	if err := dir.watchPods(ctx, stopper); err != nil {
		return nil, err
	}

	return dir, nil
}
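
// A minimal construction sketch. The setup names are assumptions, not part of
// this file: "conn" stands in for a gRPC connection to the directory server,
// and NewDirectoryClient for the generated client constructor.
//
//	stopper := stop.NewStopper()
//	defer stopper.Stop(ctx)
//	client := NewDirectoryClient(conn)
//	dc, err := NewDirectoryCache(ctx, stopper, client, RefreshDelay(250*time.Millisecond))
//	if err != nil {
//		return err
//	}
//	pods, err := dc.LookupTenantPods(ctx, roachpb.MakeTenantID(10), "my-cluster")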

// LookupTenantPods returns a list of SQL pods in the RUNNING and DRAINING
// states for the given tenant. If the tenant was just created or is
// suspended, such that there are no available RUNNING processes, then
// LookupTenantPods will trigger resumption of a new instance (or a conversion
// of a DRAINING pod to a RUNNING one) and block until that happens.
//
// If clusterName is non-empty, then a GRPC NotFound error is returned if no
// pods match the cluster name. This can be used to ensure that the incoming
// SQL connection "knows" some additional information about the tenant, such
// as the name of the cluster, before being allowed to connect. Similarly, if
// the tenant does not exist (e.g. because it was deleted), LookupTenantPods
// returns a GRPC NotFound error.
//
// WARNING: Callers should never attempt to modify values returned by this
// method, or else there may be a race. Other instances may be reading from
// the same slice.
//
// LookupTenantPods implements the DirectoryCache interface.
func (d *directoryCache) LookupTenantPods(
	ctx context.Context, tenantID roachpb.TenantID, clusterName string,
) ([]*Pod, error) {
	// Ensure that a directory entry has been created for this tenant.
	entry, err := d.getEntry(ctx, tenantID, true /* allowCreate */)
	if err != nil {
		return nil, err
	}

	// Check if the cluster name matches. This can be skipped if clusterName
	// is empty, or the ClusterName returned by the directory server is empty.
	if clusterName != "" && entry.ClusterName != "" && clusterName != entry.ClusterName {
		// Return a GRPC NotFound error.
		log.Errorf(ctx, "cluster name %s doesn't match expected %s", clusterName, entry.ClusterName)
		return nil, status.Errorf(codes.NotFound,
			"cluster name %s doesn't match expected %s", clusterName, entry.ClusterName)
	}

	ctx, _ = d.stopper.WithCancelOnQuiesce(ctx)
	tenantPods := entry.GetPods()

	// Trigger resumption if there are no RUNNING pods.
	runningPods := make([]*Pod, 0, len(tenantPods))
	for _, pod := range tenantPods {
		if pod.State == RUNNING {
			runningPods = append(runningPods, pod)
		}
	}
	if len(runningPods) == 0 {
		// There are no known pod IP addresses, so fetch pod information from
		// the directory server. Resume the tenant if it is suspended; that
		// will always result in at least one pod IP address (or an error).
		var err error
		if tenantPods, err = entry.EnsureTenantPod(ctx, d.client, d.options.deterministic); err != nil {
			if status.Code(err) == codes.NotFound {
				d.deleteEntry(entry)
			}
			return nil, err
		}
	}
	return tenantPods, nil
}

// TryLookupTenantPods returns a list of SQL pods in the RUNNING and DRAINING
// states for the given tenant. It returns a GRPC NotFound error if the tenant
// does not exist (e.g. it has not yet been created) or if it has not yet been
// fetched into the directory's cache (TryLookupTenantPods will never attempt
// to fetch it). If no processes are available for the tenant,
// TryLookupTenantPods will return the empty set (unlike LookupTenantPods).
//
// WARNING: Callers should never attempt to modify values returned by this
// method, or else there may be a race. Other instances may be reading from
// the same slice.
//
// TryLookupTenantPods implements the DirectoryCache interface.
func (d *directoryCache) TryLookupTenantPods(
	ctx context.Context, tenantID roachpb.TenantID,
) ([]*Pod, error) {
	// Ensure that a directory entry has been created for this tenant.
	entry, err := d.getEntry(ctx, tenantID, false /* allowCreate */)
	if err != nil {
		return nil, err
	}
	if entry == nil {
		return nil, status.Errorf(
			codes.NotFound, "tenant %d not in directory cache", tenantID.ToUint64())
	}
	return entry.GetPods(), nil
}

// ReportFailure should be called when attempts to connect to a particular SQL
// tenant pod have failed. Since this could be due to a failed process,
// ReportFailure will attempt to refresh the cache with the latest information
// about available tenant processes.
//
// TODO(andyk): In the future, the ip parameter will be used to mark a
// particular pod as "unhealthy" so that it's less likely to be chosen.
// However, today there can be at most one pod for a given tenant, so it
// must always be chosen. Keep the parameter as a placeholder for the future.
//
// TODO(jaylim-crl): To implement the TODO above, one strawman idea is to add
// a healthy/unhealthy field (or failureCount) to *tenant.Pod. ReportFailure
// sets that field to unhealthy, and we'll have another ReportSuccess API that
// will reset that field to healthy once we have sufficient connection counts.
// When routing a connection to a SQL pod, the balancer could then use that
// field when calculating likelihoods.
//
// ReportFailure implements the DirectoryCache interface.
func (d *directoryCache) ReportFailure(
	ctx context.Context, tenantID roachpb.TenantID, addr string,
) error {
	entry, err := d.getEntry(ctx, tenantID, false /* allowCreate */)
	if err != nil {
		return err
	} else if entry == nil {
		// If no tenant is in the cache, no-op.
		return nil
	}

	// Refresh the entry in case there is a new pod IP address.
	return entry.RefreshPods(ctx, d.client)
}
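
// connectWithRetry is a hedged sketch, not part of the original file: it
// shows the intended interplay between LookupTenantPods and ReportFailure.
// The dial function is supplied by the caller (an assumption for this
// example). On a dial failure, the pod's address is reported so that the
// cache refreshes its entry (throttled by RefreshDelay) before the next
// lookup.
func connectWithRetry(
	ctx context.Context,
	dc DirectoryCache,
	tenantID roachpb.TenantID,
	clusterName string,
	dial func(addr string) error,
) error {
	for i := 0; i < 3; i++ {
		pods, err := dc.LookupTenantPods(ctx, tenantID, clusterName)
		if err != nil {
			return err
		}
		// LookupTenantPods blocks until at least one pod exists, so pods is
		// non-empty here; this sketch naively takes the first one.
		addr := pods[0].Addr
		if err = dial(addr); err == nil {
			return nil
		}
		if err = dc.ReportFailure(ctx, tenantID, addr); err != nil {
			return err
		}
	}
	return status.Errorf(
		codes.Unavailable, "could not connect to any pod for tenant %d", tenantID.ToUint64())
}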

// getEntry returns a directory entry for the given tenant. If the directory
// does not contain such an entry, then getEntry will create one if
// allowCreate is true. Otherwise, it returns nil. If an entry is returned,
// then getEntry ensures that it is fully initialized with tenant metadata.
// Obtaining this metadata requires making a separate directory server call;
// getEntry will block until that's complete.
func (d *directoryCache) getEntry(
	ctx context.Context, tenantID roachpb.TenantID, allowCreate bool,
) (*tenantEntry, error) {
	entry := func() *tenantEntry {
		// Acquire the directory lock just long enough to check the tenants
		// map for the given tenant ID. Don't complete initialization while
		// holding this lock, since that requires directory server calls.
		d.mut.Lock()
		defer d.mut.Unlock()

		entry, ok := d.mut.tenants[tenantID]
		if ok {
			// Entry exists, so return it.
			return entry
		}

		if !allowCreate {
			// No entry, but not allowed to create one, so done.
			return nil
		}

		// Create the tenant entry and enter it into the tenants map.
		log.Infof(ctx, "creating directory entry for tenant %d", tenantID)
		entry = &tenantEntry{TenantID: tenantID, RefreshDelay: d.options.refreshDelay}
		d.mut.tenants[tenantID] = entry
		return entry
	}()

	if entry == nil {
		return nil, nil
	}

	// Initialize the entry now if not yet done.
	err := entry.Initialize(ctx, d.client)
	if err != nil {
		// Remove the entry from the tenants map, since initialization failed.
		if d.deleteEntry(entry) {
			log.Infof(ctx, "error initializing tenant %d: %v", tenantID, err)
		}
		return nil, err
	}

	return entry, nil
}

// deleteEntry removes the given directory entry for the given tenant, if it
// exists. It returns true if an entry was actually deleted.
func (d *directoryCache) deleteEntry(entry *tenantEntry) bool {
	d.mut.Lock()
	defer d.mut.Unlock()

	// Threads can race to add/remove entries, so ensure that the right entry
	// is removed.
	existing, ok := d.mut.tenants[entry.TenantID]
	if ok && entry == existing {
		delete(d.mut.tenants, entry.TenantID)
		return true
	}

	return false
}

// watchPods establishes a watcher that looks for changes to tenant pods.
// Whenever tenant pods start or terminate, the watcher will get a
// notification and update the directory to reflect that change.
func (d *directoryCache) watchPods(ctx context.Context, stopper *stop.Stopper) error {
	req := WatchPodsRequest{}

	// The loop that processes the event stream runs in a separate goroutine.
	// Before we return, however, we want a guarantee that this goroutine has
	// started processing events. This wait group gives us that guarantee.
	// Without it, the following would be possible:
	//
	// 1. call watchPods
	// 2. call LookupTenantPods
	// 3. wait forever to receive notification about the tenant that just
	//    started.
	//
	// The notification may never arrive because the watchPods goroutine can
	// start listening after the server has already started the tenant and
	// sent its notifications.
	var waitInit sync.WaitGroup
	waitInit.Add(1)

	err := stopper.RunAsyncTask(ctx, "watch-pods-client", func(ctx context.Context) {
		var client Directory_WatchPodsClient
		var err error
		firstRun := true
		ctx, _ = stopper.WithCancelOnQuiesce(ctx)

		watchPodsErr := log.Every(10 * time.Second)
		recvErr := log.Every(10 * time.Second)

		for ctx.Err() == nil {
			if client == nil {
				client, err = d.client.WatchPods(ctx, &req)
				if firstRun {
					waitInit.Done()
					firstRun = false
				}
				if err != nil {
					if watchPodsErr.ShouldLog() {
						log.Errorf(ctx, "err creating new watch pod client: %s", err)
					}
					sleepContext(ctx, time.Second)
					continue
				}
				log.Info(ctx, "established watch on pods")
			}

			// Read the next watcher event.
			resp, err := client.Recv()
			if err != nil {
				if recvErr.ShouldLog() {
					log.Errorf(ctx, "err receiving stream events: %s", err)
				}
				// If the stream ends, immediately try to establish a new one.
				// Otherwise, wait for a second to avoid slamming the server.
				if err != io.EOF {
					time.Sleep(time.Second)
				}
				client = nil
				continue
			}

			// If the caller is watching pods, send to its channel now.
			if d.options.podWatcher != nil {
				select {
				case d.options.podWatcher <- resp.Pod:
				case <-ctx.Done():
					return
				}
			}

			// Update the directory entry for the tenant with the latest
			// information about this pod.
			d.updateTenantEntry(ctx, resp.Pod)
		}
	})
	if err != nil {
		return err
	}

	// Block until the initial pod watcher client stream is constructed.
	waitInit.Wait()
	return err
}

// updateTenantEntry keeps tenant directory entries up-to-date by handling pod
// watcher events. When a pod is created, destroyed, or modified, it updates
// the tenant's entry to reflect that change.
func (d *directoryCache) updateTenantEntry(ctx context.Context, pod *Pod) {
	if pod.Addr == "" {
		// Nothing needs to be done if there is no IP address specified.
		return
	}

	// Ensure that a directory entry exists for this tenant.
	entry, err := d.getEntry(ctx, roachpb.MakeTenantID(pod.TenantID), true /* allowCreate */)
	if err != nil {
		if !grpcutil.IsContextCanceled(err) {
			// This should only happen in case of a deleted tenant or a
			// transient error during fetch of tenant metadata (i.e. very
			// rarely).
			log.Errorf(ctx, "ignoring error getting entry for tenant %d: %v", pod.TenantID, err)
		}
		return
	}

	switch pod.State {
	case RUNNING, DRAINING:
		// Add entries of RUNNING and DRAINING pods if they are not already
		// present.
		if entry.AddPod(pod) {
			log.Infof(ctx, "added IP address %s with load %.3f for tenant %d", pod.Addr, pod.Load, pod.TenantID)
		} else {
			log.Infof(ctx, "updated IP address %s with load %.3f for tenant %d", pod.Addr, pod.Load, pod.TenantID)
		}

	case UNKNOWN:
		// Update entries of UNKNOWN pods only if they are already present.
		if entry.UpdatePod(pod) {
			log.Infof(ctx, "updated IP address %s with load %.3f for tenant %d", pod.Addr, pod.Load, pod.TenantID)
		}

	default:
		// Remove addresses of DELETING pods.
		if entry.RemovePodByAddr(pod.Addr) {
			log.Infof(ctx, "deleted IP address %s for tenant %d", pod.Addr, pod.TenantID)
		}
	}
}

// sleepContext sleeps for the given duration or until the given context is
// canceled, whichever comes first.
func sleepContext(ctx context.Context, delay time.Duration) {
	select {
	case <-ctx.Done():
	case <-time.After(delay):
	}
}