-
Notifications
You must be signed in to change notification settings - Fork 3.8k
/
Copy pathentry.go
309 lines (263 loc) · 9.82 KB
/
entry.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
// Copyright 2021 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt
package tenant
import (
"context"
"fmt"
"time"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"go.uber.org/atomic"
)
// tenantEntry is an entry in the tenant directory that records information
// about a single tenant, including its ID, cluster name, and the IP addresses
// for available pods.
type tenantEntry struct {
// These fields can be read by callers without synchronization, since
// they're written once during initialization, and are immutable thereafter.
// TenantID is the identifier for this tenant which is unique within a CRDB
// cluster.
TenantID roachpb.TenantID
// Full name of the tenant's cluster i.e. dim-dog.
ClusterName string
// RefreshDelay is the minimum amount of time that must elapse between
// attempts to refresh pods for this tenant after ReportFailure is
// called.
RefreshDelay time.Duration
// initialized is set to true once Initialized has been successfully called
// (i.e. with no resulting error). Access is synchronized via atomics.
initialized atomic.Bool
// pods synchronizes access to information about the tenant's SQL pods.
// These fields can be updated over time, so a lock must be obtained before
// accessing them.
pods struct {
syncutil.Mutex
// addrs is the set of IP:port addresses of the tenant's currently
// RUNNING pods.
addrs []string
}
// calls synchronizes calls to the Directory service for this tenant (e.g.
// calls to GetTenant or ListPods). Synchronization is needed to ensure that
// only one thread at a time is calling on behalf of a tenant, and that
// calls are rate limited to prevent storms.
calls struct {
syncutil.Mutex
// lastRefresh is the last time the list of pods for the tenant have been
// fetched from the server. It's used to rate limit refreshes.
lastRefresh time.Time
// initError is set to any error that occurs in Initialized (or nil if no
// error occurred).
initError error
}
}
// Initialize fetches metadata about a tenant, such as its cluster name, and
// stores that in the entry. After this is called once, all future calls return
// the same result (and do nothing).
func (e *tenantEntry) Initialize(ctx context.Context, client DirectoryClient) error {
// If Initialize has already been successfully called, nothing to do.
if e.initialized.Load() {
return nil
}
// Synchronize multiple threads trying to initialize. Only the first thread
// does the initialization.
e.calls.Lock()
defer e.calls.Unlock()
// If Initialize has already been called, return any error that occurred.
// This also handles the case where multiple thread were waiting on the
// above calls lock; all but the first will stop here.
if e.initialized.Load() || e.calls.initError != nil {
return e.calls.initError
}
tenantResp, err := client.GetTenant(ctx, &GetTenantRequest{TenantID: e.TenantID.ToUint64()})
if err != nil {
e.calls.initError = err
return err
}
e.ClusterName = tenantResp.ClusterName
e.initialized.Store(true)
return nil
}
// RefreshPods makes a synchronous directory server call to fetch the latest
// information about the tenant's available pods, such as their IP addresses.
func (e *tenantEntry) RefreshPods(ctx context.Context, client DirectoryClient) error {
if !e.initialized.Load() {
return errors.AssertionFailedf("entry for tenant %d is not initialized", e.TenantID)
}
// Lock so that only one thread at a time will refresh, since there's no
// point in multiple threads doing it within a short span of time - it's
// likely nothing has changed.
e.calls.Lock()
defer e.calls.Unlock()
// If refreshed recently, no-op.
if !e.canRefreshLocked() {
return nil
}
log.Infof(ctx, "refreshing tenant %d pods", e.TenantID)
_, err := e.fetchPodsLocked(ctx, client)
return err
}
// ChoosePodAddr returns the IP address of one of this tenant's available pods.
// If a tenant has multiple pods, then ChoosePodAddr returns the IP address of
// one of those pods. If the tenant is suspended and no pods are available, then
// ChoosePodAddr will trigger resumption of the tenant and return the IP address
// of the new pod. Note that resuming a tenant requires directory server calls,
// so ChoosePodAddr can block for some time, until the resumption process is
// complete. However, if errorIfNoPods is true, then ChoosePodAddr returns an
// error if there are no pods available rather than blocking.
//
// TODO(andyk): Use better load-balancing algorithm once tenants can have more
// than one pod.
func (e *tenantEntry) ChoosePodAddr(
ctx context.Context, client DirectoryClient, errorIfNoPods bool,
) (string, error) {
if !e.initialized.Load() {
return "", errors.AssertionFailedf("entry for tenant %d is not initialized", e.TenantID)
}
addrs := e.getPodAddrs()
if len(addrs) == 0 {
// There are no known pod IP addresses, so fetch pod information
// from the directory server. Resume the tenant if it is suspended; that
// will always result in at least one pod IP address (or an error).
var err error
if addrs, err = e.ensureTenantPod(ctx, client, errorIfNoPods); err != nil {
return "", err
}
}
return addrs[0], nil
}
// AddPodAddr inserts the given IP address into the tenant's list of pod IPs. If
// it is already present, then AddPodAddr returns false.
func (e *tenantEntry) AddPodAddr(addr string) bool {
e.pods.Lock()
defer e.pods.Unlock()
for _, existing := range e.pods.addrs {
if existing == addr {
return false
}
}
e.pods.addrs = append(e.pods.addrs, addr)
return true
}
// RemovePodAddr removes the given IP address from the tenant's list of pod
// addresses. If it was not present, RemovePodAddr returns false.
func (e *tenantEntry) RemovePodAddr(addr string) bool {
e.pods.Lock()
defer e.pods.Unlock()
for i, existing := range e.pods.addrs {
if existing == addr {
copy(e.pods.addrs[i:], e.pods.addrs[i+1:])
e.pods.addrs = e.pods.addrs[:len(e.pods.addrs)-1]
return true
}
}
return false
}
// getPodAddrs gets the current list of pod IP addresses within scope of lock
// and returns them.
func (e *tenantEntry) getPodAddrs() []string {
e.pods.Lock()
defer e.pods.Unlock()
return e.pods.addrs
}
// ensureTenantPod ensures that at least one SQL process exists for this tenant,
// and is ready for connection attempts to its IP address. If errorIfNoPods is
// true, then ensureTenantPod returns an error if there are no pods available
// rather than blocking.
func (e *tenantEntry) ensureTenantPod(
ctx context.Context, client DirectoryClient, errorIfNoPods bool,
) (addrs []string, err error) {
const retryDelay = 100 * time.Millisecond
e.calls.Lock()
defer e.calls.Unlock()
// If an IP address is already available, nothing more to do. Check this
// immediately after obtaining the lock so that only the first thread does
// the work to get information about the tenant.
addrs = e.getPodAddrs()
if len(addrs) != 0 {
return addrs, nil
}
for {
// Check for context cancellation or timeout.
if err = ctx.Err(); err != nil {
return nil, err
}
// Try to resume the tenant if not yet resumed.
_, err = client.EnsurePod(ctx, &EnsurePodRequest{e.TenantID.ToUint64()})
if err != nil {
return nil, err
}
// Get pod information for the newly resumed tenant. Except in rare
// race conditions, this is expected to immediately find an IP address,
// since the above call started a tenant process that already has an IP
// address.
addrs, err = e.fetchPodsLocked(ctx, client)
if err != nil {
return nil, err
}
if len(addrs) != 0 {
log.Infof(ctx, "resumed tenant %d", e.TenantID)
break
}
// In rare case where no IP address is ready, wait for a bit before
// retrying.
if errorIfNoPods {
return nil, fmt.Errorf("no pods available for tenant %s", e.TenantID)
}
sleepContext(ctx, retryDelay)
}
return addrs, nil
}
// fetchPodsLocked makes a synchronous directory server call to get the latest
// information about the tenant's available pods, such as their IP addresses.
//
// NOTE: Caller must lock the "calls" mutex before calling fetchPodsLocked.
func (e *tenantEntry) fetchPodsLocked(
ctx context.Context, client DirectoryClient,
) (addrs []string, err error) {
// List the pods for the given tenant.
// TODO(andyk): This races with the pod watcher, which may receive updates
// that are newer than what ListPods returns. This could be fixed by adding
// version values to the pods in order to detect races.
list, err := client.ListPods(ctx, &ListPodsRequest{e.TenantID.ToUint64()})
if err != nil {
return nil, err
}
// Get updated list of RUNNING pod IP addresses and save it to the entry.
addrs = make([]string, 0, len(list.Pods))
for i := range list.Pods {
pod := list.Pods[i]
if pod.State == RUNNING {
addrs = append(addrs, pod.Addr)
}
}
// Need to lock in case another thread is reading the IP addresses (e.g. in
// ChoosePodAddr).
e.pods.Lock()
defer e.pods.Unlock()
e.pods.addrs = addrs
if len(addrs) != 0 {
log.Infof(ctx, "fetched IP addresses: %v", addrs)
}
return addrs, nil
}
// canRefreshLocked returns true if it's been at least X milliseconds since the
// last time the tenant pod information was refreshed. This has the effect of
// rate limiting RefreshPods calls.
//
// NOTE: Caller must lock the "calls" mutex before calling canRefreshLocked.
func (e *tenantEntry) canRefreshLocked() bool {
now := timeutil.Now()
if now.Sub(e.calls.lastRefresh) < e.RefreshDelay {
return false
}
e.calls.lastRefresh = now
return true
}