-
Notifications
You must be signed in to change notification settings - Fork 2k
/
operator.go
473 lines (380 loc) · 12.9 KB
/
operator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
package api
import (
"encoding/json"
"errors"
"io"
"net/http"
"strconv"
"strings"
"time"
)
// Operator can be used to perform low-level operator tasks for Nomad.
// Handles are created by Client.Operator.
type Operator struct {
// c is the API client through which every operator request is issued.
c *Client
}
// Operator builds a new Operator handle bound to this client, giving access
// to the operator endpoints.
func (c *Client) Operator() *Operator {
	handle := &Operator{c: c}
	return handle
}
// RaftServer has information about a server in the Raft configuration.
// Values of this type are listed in RaftConfiguration.Servers and are also
// used for the From and To fields of LeadershipTransferResponse.
type RaftServer struct {
// ID is the unique ID for the server. These are currently the same
// as the address, but they will be changed to a real GUID in a future
// release of Nomad.
ID string
// Node is the node name of the server, as known by Nomad, or this
// will be set to "(unknown)" otherwise.
Node string
// Address is the IP:port of the server, used for Raft communications.
Address string
// Leader is true if this server is the current cluster leader.
Leader bool
// Voter is true if this server has a vote in the cluster. This might
// be false if the server is staging and still coming online, or if
// it's a non-voting server, which will be added in a future release of
// Nomad.
Voter bool
// RaftProtocol is the version of the Raft protocol spoken by this server.
// Note that it is carried as a string, not a number.
RaftProtocol string
}
// RaftConfiguration is returned when querying for the current Raft configuration.
// It is the type into which RaftGetConfiguration decodes the response body.
type RaftConfiguration struct {
// Servers has the list of servers in the Raft configuration.
Servers []*RaftServer
// Index has the Raft index of this configuration.
Index uint64
}
// RaftGetConfiguration fetches the current Raft peer set by issuing a GET to
// /v1/operator/raft/configuration and decoding the response body into a
// RaftConfiguration. Any error from building the request, performing it, or
// decoding the body is returned with a nil configuration.
func (op *Operator) RaftGetConfiguration(q *QueryOptions) (*RaftConfiguration, error) {
	req, err := op.c.newRequest("GET", "/v1/operator/raft/configuration")
	if err != nil {
		return nil, err
	}
	req.setQueryOptions(q)

	_, httpResp, err := requireOK(op.c.doRequest(req)) //nolint:bodyclose
	if err != nil {
		return nil, err
	}
	defer httpResp.Body.Close()

	cfg := new(RaftConfiguration)
	if decodeErr := decodeBody(httpResp, cfg); decodeErr != nil {
		return nil, decodeErr
	}
	return cfg, nil
}
// RaftRemovePeerByAddress is used to kick a stale peer (one that is in the
// Raft quorum but no longer known to Serf or the catalog) by address in the
// form of "IP:port". It sends a DELETE to /v1/operator/raft/peer with the
// address passed as the "address" query parameter.
func (op *Operator) RaftRemovePeerByAddress(address string, q *WriteOptions) error {
	req, err := op.c.newRequest("DELETE", "/v1/operator/raft/peer")
	if err != nil {
		return err
	}
	req.setWriteOptions(q)
	req.params.Set("address", address)

	_, httpResp, err := requireOK(op.c.doRequest(req)) //nolint:bodyclose
	if err != nil {
		return err
	}

	// The response carries nothing we need; release it and report success.
	defer httpResp.Body.Close()
	return nil
}
// RaftRemovePeerByID is used to kick a stale peer (one that is in the Raft
// quorum but no longer known to Serf or the catalog) by ID. It sends a DELETE
// to /v1/operator/raft/peer with the ID passed as the "id" query parameter.
func (op *Operator) RaftRemovePeerByID(id string, q *WriteOptions) error {
	req, err := op.c.newRequest("DELETE", "/v1/operator/raft/peer")
	if err != nil {
		return err
	}
	req.setWriteOptions(q)
	req.params.Set("id", id)

	_, httpResp, err := requireOK(op.c.doRequest(req)) //nolint:bodyclose
	if err != nil {
		return err
	}

	// The response carries nothing we need; release it and report success.
	defer httpResp.Body.Close()
	return nil
}
// RaftTransferLeadershipByAddress asks the cluster to hand leadership to a
// different peer, identified by its address in the form of "IP:port". It sends
// a PUT to /v1/operator/raft/transfer-leadership with the address passed as
// the "address" query parameter.
func (op *Operator) RaftTransferLeadershipByAddress(address string, q *WriteOptions) error {
	req, err := op.c.newRequest("PUT", "/v1/operator/raft/transfer-leadership")
	if err != nil {
		return err
	}
	req.setWriteOptions(q)
	req.params.Set("address", address)

	_, httpResp, err := requireOK(op.c.doRequest(req)) //nolint:bodyclose
	if err != nil {
		return err
	}

	// The response body is not inspected; release it and report success.
	defer httpResp.Body.Close()
	return nil
}
// RaftTransferLeadershipByID asks the cluster to hand leadership to a
// different peer, identified by its Raft ID. It sends a PUT to
// /v1/operator/raft/transfer-leadership with the ID passed as the "id" query
// parameter.
func (op *Operator) RaftTransferLeadershipByID(id string, q *WriteOptions) error {
	req, err := op.c.newRequest("PUT", "/v1/operator/raft/transfer-leadership")
	if err != nil {
		return err
	}
	req.setWriteOptions(q)
	req.params.Set("id", id)

	_, httpResp, err := requireOK(op.c.doRequest(req)) //nolint:bodyclose
	if err != nil {
		return err
	}

	// The response body is not inspected; release it and report success.
	defer httpResp.Body.Close()
	return nil
}
// SchedulerConfiguration is the config for controlling scheduler behavior.
// It is the request payload of SchedulerSetConfiguration and
// SchedulerCASConfiguration, and is wrapped by SchedulerConfigurationResponse
// when read back.
type SchedulerConfiguration struct {
// SchedulerAlgorithm lets you select between available scheduling algorithms.
SchedulerAlgorithm SchedulerAlgorithm
// PreemptionConfig specifies whether to enable eviction of lower
// priority jobs to place higher priority jobs.
PreemptionConfig PreemptionConfig
// MemoryOversubscriptionEnabled specifies whether memory oversubscription is enabled
MemoryOversubscriptionEnabled bool
// RejectJobRegistration disables new job registrations except with a
// management ACL token
RejectJobRegistration bool
// PauseEvalBroker stops the leader evaluation broker process from running
// until the configuration is updated and written to the Nomad servers.
PauseEvalBroker bool
// CreateIndex/ModifyIndex store the create/modify indexes of this configuration.
// ModifyIndex is the value SchedulerCASConfiguration sends as the "cas"
// query parameter.
CreateIndex uint64
ModifyIndex uint64
}
// SchedulerConfigurationResponse is the response object that wraps
// SchedulerConfiguration. It is what SchedulerGetConfiguration decodes the
// query result into.
type SchedulerConfigurationResponse struct {
// SchedulerConfig contains scheduler config options
SchedulerConfig *SchedulerConfiguration
// QueryMeta is embedded, so its fields are promoted onto the response.
QueryMeta
}
// SchedulerSetConfigurationResponse is the response object used
// when updating scheduler configuration. Both SchedulerSetConfiguration and
// SchedulerCASConfiguration return it.
type SchedulerSetConfigurationResponse struct {
// Updated returns whether the config was actually updated
// Only set when the request uses CAS
Updated bool
// WriteMeta is embedded, so its fields are promoted onto the response.
WriteMeta
}
// SchedulerAlgorithm is an enum string that encapsulates the valid options for a
// SchedulerConfiguration block's SchedulerAlgorithm. These modes will allow the
// scheduler to be user-selectable.
type SchedulerAlgorithm string

// Known SchedulerAlgorithm values. The string on the right is the literal
// value carried in SchedulerConfiguration.SchedulerAlgorithm.
const (
// SchedulerAlgorithmBinpack selects the "binpack" algorithm.
SchedulerAlgorithmBinpack SchedulerAlgorithm = "binpack"
// SchedulerAlgorithmSpread selects the "spread" algorithm.
SchedulerAlgorithmSpread SchedulerAlgorithm = "spread"
)
// PreemptionConfig specifies whether preemption is enabled based on scheduler type.
// Each field is an independent on/off switch for the scheduler type named in
// the field; it is embedded by value in SchedulerConfiguration.
type PreemptionConfig struct {
// SystemSchedulerEnabled toggles preemption for the system scheduler.
SystemSchedulerEnabled bool
// SysBatchSchedulerEnabled toggles preemption for the sysbatch scheduler.
SysBatchSchedulerEnabled bool
// BatchSchedulerEnabled toggles preemption for the batch scheduler.
BatchSchedulerEnabled bool
// ServiceSchedulerEnabled toggles preemption for the service scheduler.
ServiceSchedulerEnabled bool
}
// SchedulerGetConfiguration reads the current scheduler configuration by
// querying /v1/operator/scheduler/configuration. On success it returns the
// decoded response together with the query metadata; on failure both are nil
// and the error is returned.
func (op *Operator) SchedulerGetConfiguration(q *QueryOptions) (*SchedulerConfigurationResponse, *QueryMeta, error) {
	const endpoint = "/v1/operator/scheduler/configuration"

	result := new(SchedulerConfigurationResponse)
	meta, err := op.c.query(endpoint, result, q)
	if err != nil {
		return nil, nil, err
	}
	return result, meta, nil
}
// SchedulerSetConfiguration writes conf as the scheduler configuration with a
// PUT to /v1/operator/scheduler/configuration. On success it returns the
// decoded response together with the write metadata; on failure both are nil
// and the error is returned.
func (op *Operator) SchedulerSetConfiguration(conf *SchedulerConfiguration, q *WriteOptions) (*SchedulerSetConfigurationResponse, *WriteMeta, error) {
	const endpoint = "/v1/operator/scheduler/configuration"

	result := new(SchedulerSetConfigurationResponse)
	meta, err := op.c.put(endpoint, conf, result, q)
	if err != nil {
		return nil, nil, err
	}
	return result, meta, nil
}
// SchedulerCASConfiguration is used to perform a Check-And-Set update on the
// Scheduler configuration. conf.ModifyIndex is sent as the "cas" query
// parameter of a PUT to /v1/operator/scheduler/configuration, with conf as the
// request payload.
//
// On success it returns the decoded SchedulerSetConfigurationResponse and the
// write metadata. The response's Updated field, not the error, reports whether
// the configuration was actually changed. On failure both results are nil and
// the error is returned. A nil conf is rejected with an error rather than
// being dereferenced.
func (op *Operator) SchedulerCASConfiguration(conf *SchedulerConfiguration, q *WriteOptions) (*SchedulerSetConfigurationResponse, *WriteMeta, error) {
	// The CAS index is read from conf, so there is nothing sensible to send
	// without it; fail early instead of panicking on the field access below.
	if conf == nil {
		return nil, nil, errors.New("scheduler configuration must not be nil")
	}

	endpoint := "/v1/operator/scheduler/configuration?cas=" + strconv.FormatUint(conf.ModifyIndex, 10)

	var out SchedulerSetConfigurationResponse
	wm, err := op.c.put(endpoint, conf, &out, q)
	if err != nil {
		return nil, nil, err
	}
	return &out, wm, nil
}
// Snapshot is used to capture a snapshot state of a running cluster by
// issuing a GET to /v1/operator/snapshot. The response body is wrapped in a
// checksum-validating reader built from the response's "Digest" header.
//
// The returned reader must be consumed fully. On success the response body is
// not closed here; it is handed to the returned reader. If the validating
// reader cannot be created, the body is drained and closed before the error is
// returned.
func (op *Operator) Snapshot(q *QueryOptions) (io.ReadCloser, error) {
	req, err := op.c.newRequest("GET", "/v1/operator/snapshot")
	if err != nil {
		return nil, err
	}
	req.setQueryOptions(q)

	_, httpResp, err := requireOK(op.c.doRequest(req)) //nolint:bodyclose
	if err != nil {
		return nil, err
	}

	body := httpResp.Body
	validating, err := newChecksumValidatingReader(body, httpResp.Header.Get("Digest"))
	if err != nil {
		// Best-effort drain and close of the body we will not hand out.
		io.Copy(io.Discard, body)
		body.Close()
		return nil, err
	}
	return validating, nil
}
// SnapshotRestore is used to restore a running nomad cluster to an original
// state. The snapshot read from in is sent with a PUT to
// /v1/operator/snapshot; no response body is decoded. The write metadata is
// returned on success, and nil with the error on failure.
func (op *Operator) SnapshotRestore(in io.Reader, q *WriteOptions) (*WriteMeta, error) {
	const endpoint = "/v1/operator/snapshot"

	meta, err := op.c.put(endpoint, in, nil, q)
	if err != nil {
		return nil, err
	}
	return meta, nil
}
// License describes a license: its identifiers, validity window, target
// product, and the flags, modules and features it carries. It is returned to
// callers inside LicenseReply.
type License struct {
// The unique identifier of the license
LicenseID string
// The customer ID associated with the license
CustomerID string
// If set, an identifier that should be used to lock the license to a
// particular site, cluster, etc.
InstallationID string
// The time at which the license was issued
IssueTime time.Time
// The time at which the license starts being valid
StartTime time.Time
// The time after which the license expires
ExpirationTime time.Time
// The time at which the license ceases to function and can
// no longer be used in any capacity
TerminationTime time.Time
// The product the license is valid for
Product string
// License Specific Flags
Flags map[string]interface{}
// Modules is a list of the licensed enterprise modules
Modules []string
// List of features enabled by the license
Features []string
}
// LicenseReply is the response object that LicenseGet decodes the body of a
// successful license query into.
type LicenseReply struct {
// License is the license reported by the server.
License *License
// ConfigOutdated is reported by the server alongside the license.
// NOTE(review): exact meaning is not visible in this file; presumably it
// flags that the configured license differs from the one in use; confirm.
ConfigOutdated bool
// QueryMeta is embedded, so its fields are promoted onto the reply.
QueryMeta
}
// ApplyLicenseOptions holds the optional settings accepted by ApplyLicense.
type ApplyLicenseOptions struct {
// Force, when true, makes ApplyLicense add the query parameter
// "force=true" to the request.
Force bool
}
// LicensePut applies the given license without any extra options. It is
// shorthand for calling ApplyLicense with nil options.
func (op *Operator) LicensePut(license string, q *WriteOptions) (*WriteMeta, error) {
	var noOptions *ApplyLicenseOptions
	return op.ApplyLicense(license, noOptions, q)
}
// ApplyLicense sends license as the body of a PUT to /v1/operator/license.
// When opts is non-nil and opts.Force is set, the query parameter
// "force=true" is added to the request. On success it returns write metadata
// whose RequestTime is the measured request duration, further populated from
// the response by parseWriteMeta.
func (op *Operator) ApplyLicense(license string, opts *ApplyLicenseOptions, q *WriteOptions) (*WriteMeta, error) {
	req, err := op.c.newRequest("PUT", "/v1/operator/license")
	if err != nil {
		return nil, err
	}

	force := opts != nil && opts.Force
	if force {
		req.params.Add("force", "true")
	}
	req.setWriteOptions(q)
	req.body = strings.NewReader(license)

	elapsed, httpResp, err := requireOK(op.c.doRequest(req)) //nolint:bodyclose
	if err != nil {
		return nil, err
	}
	defer httpResp.Body.Close()

	meta := new(WriteMeta)
	meta.RequestTime = elapsed
	parseWriteMeta(httpResp, meta)
	return meta, nil
}
// LicenseGet retrieves the current license with a GET to
// /v1/operator/license and decodes the JSON body into a LicenseReply.
//
// The status code is checked here rather than through requireOK: a 204 No
// Content response is reported as the error "Nomad Enterprise only endpoint",
// and any other status besides 200 OK yields an unexpected-response error. On
// success the reply is returned together with query metadata parsed from the
// response, with RequestTime set to the measured request duration.
func (op *Operator) LicenseGet(q *QueryOptions) (*LicenseReply, *QueryMeta, error) {
	httpReq, err := op.c.newRequest("GET", "/v1/operator/license")
	if err != nil {
		return nil, nil, err
	}
	httpReq.setQueryOptions(q)

	elapsed, httpResp, err := op.c.doRequest(httpReq) //nolint:bodyclose
	if err != nil {
		return nil, nil, err
	}
	defer httpResp.Body.Close()

	switch httpResp.StatusCode {
	case http.StatusOK:
		// Fall out of the switch and decode the body below.
	case http.StatusNoContent:
		return nil, nil, errors.New("Nomad Enterprise only endpoint")
	default:
		return nil, nil, newUnexpectedResponseError(
			fromHTTPResponse(httpResp),
			withExpectedStatuses([]int{http.StatusOK, http.StatusNoContent}),
		)
	}

	var reply LicenseReply
	if decodeErr := json.NewDecoder(httpResp.Body).Decode(&reply); decodeErr != nil {
		return nil, nil, decodeErr
	}

	meta := &QueryMeta{}
	parseQueryMeta(httpResp, meta)
	meta.RequestTime = elapsed
	return &reply, meta, nil
}
// LeadershipTransferResponse carries a pair of Raft servers plus status
// fields and embedded write metadata.
// NOTE(review): no function in this file produces or consumes this type;
// presumably it describes the outcome of a leadership transfer. Confirm
// against the server-side handler.
type LeadershipTransferResponse struct {
// From is presumably the server that held leadership before the transfer.
From RaftServer
// To is presumably the server leadership was transferred to.
To RaftServer
// Noop presumably reports that no transfer was necessary; confirm.
Noop bool
// Err holds an error associated with the transfer, if any.
Err error
// WriteMeta is embedded, so its fields are promoted onto the response.
WriteMeta
}
// VaultWorkloadIdentityUpgradeCheck is the result of verifying if the cluster
// is ready to switch to workload identities for Vault. It is returned by
// UpgradeCheckVaultWorkloadIdentity; its Ready method reports true only when
// all three lists below are empty.
type VaultWorkloadIdentityUpgradeCheck struct {
// JobsWithoutVaultIdentity is the list of jobs that have a `vault` block
// but do not have an `identity` for Vault.
JobsWithoutVaultIdentity []*JobListStub
// OutdatedNodes is the list of nodes running a version of Nomad that does
// not support workload identities for Vault.
OutdatedNodes []*NodeListStub
// VaultTokens is the list of Vault ACL token accessors that Nomad created
// and will no longer manage after the cluster is migrated to workload
// identities.
VaultTokens []*VaultAccessor
}
// Ready reports whether the cluster is ready to migrate to workload
// identities with Vault: the receiver must be non-nil and none of its lists
// (Vault tokens, outdated nodes, jobs without a Vault identity) may contain
// entries. It is safe to call on a nil receiver, which is never ready.
func (v *VaultWorkloadIdentityUpgradeCheck) Ready() bool {
	switch {
	case v == nil:
		return false
	case len(v.VaultTokens) > 0:
		return false
	case len(v.OutdatedNodes) > 0:
		return false
	case len(v.JobsWithoutVaultIdentity) > 0:
		return false
	}
	return true
}
// VaultAccessor is a Vault ACL token created by Nomad for a task to access
// Vault using the legacy authentication flow. Values of this type are listed
// in VaultWorkloadIdentityUpgradeCheck.VaultTokens.
type VaultAccessor struct {
// AllocID is the ID of the allocation that requested this token.
AllocID string
// Task is the name of the task that requested this token.
Task string
// NodeID is the ID of the node running the allocation that requested this
// token.
NodeID string
// Accessor is the Vault ACL token accessor ID.
Accessor string
// CreationTTL is the TTL set when the token was created.
// NOTE(review): the unit is not stated in this file; presumably seconds;
// confirm.
CreationTTL int
// CreateIndex is the Raft index when the token was created.
CreateIndex uint64
}
// UpgradeCheckVaultWorkloadIdentity retrieves the cluster status for migrating
// to workload identities with Vault by querying
// /v1/operator/upgrade-check/vault-workload-identity. On success it returns
// the decoded check together with the query metadata; on failure both are nil
// and the error is returned.
func (op *Operator) UpgradeCheckVaultWorkloadIdentity(q *QueryOptions) (*VaultWorkloadIdentityUpgradeCheck, *QueryMeta, error) {
	const endpoint = "/v1/operator/upgrade-check/vault-workload-identity"

	check := new(VaultWorkloadIdentityUpgradeCheck)
	meta, err := op.c.query(endpoint, check, q)
	if err != nil {
		return nil, nil, err
	}
	return check, meta, nil
}