-
Notifications
You must be signed in to change notification settings - Fork 2k
/
driver.go
578 lines (485 loc) · 15.4 KB
/
driver.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
package drivers
import (
"context"
"crypto/md5"
"fmt"
"io"
"path/filepath"
"sort"
"strconv"
"time"
"github.com/hashicorp/nomad/client/allocdir"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/plugins/base"
"github.com/hashicorp/nomad/plugins/drivers/proto"
"github.com/hashicorp/nomad/plugins/shared/hclspec"
pstructs "github.com/hashicorp/nomad/plugins/shared/structs"
"github.com/zclconf/go-cty/cty"
"github.com/zclconf/go-cty/cty/msgpack"
)
// Constants shared by driver plugin implementations.
const (
// DriverHealthy is the default health description that should be used
// if the driver is nominal.
DriverHealthy = "Healthy"
// Pre09TaskHandleVersion is the version used to identify that the task
// handle is from a driver that existed before driver plugins (v0.9). The
// driver should take appropriate action to handle the old driver state.
Pre09TaskHandleVersion = 0
)
// DriverPlugin is the interface which drivers will implement. It is also
// implemented by a plugin client which proxies the calls to go-plugin. See
// the proto/driver.proto file for detailed information about each RPC and
// message structure.
type DriverPlugin interface {
// BasePlugin is embedded, so every driver also exposes the base plugin
// method set.
base.BasePlugin
// TaskConfigSchema returns an hclspec.Spec or an error.
TaskConfigSchema() (*hclspec.Spec, error)
// Capabilities returns the driver's Capabilities or an error.
Capabilities() (*Capabilities, error)
// Fingerprint returns a receive-only channel of Fingerprint values.
Fingerprint(context.Context) (<-chan *Fingerprint, error)
// RecoverTask takes a previously issued TaskHandle.
RecoverTask(*TaskHandle) error
// StartTask takes a TaskConfig and returns a TaskHandle together with a
// DriverNetwork.
StartTask(*TaskConfig) (*TaskHandle, *DriverNetwork, error)
// The remaining task methods address a single task by its taskID.
// WaitTask returns a receive-only channel of ExitResult values.
WaitTask(ctx context.Context, taskID string) (<-chan *ExitResult, error)
StopTask(taskID string, timeout time.Duration, signal string) error
DestroyTask(taskID string, force bool) error
// InspectTask returns the TaskStatus for the task.
InspectTask(taskID string) (*TaskStatus, error)
// TaskStats returns a receive-only channel of resource usage values; the
// interval argument presumably sets the sampling period (confirm against
// proto/driver.proto).
TaskStats(ctx context.Context, taskID string, interval time.Duration) (<-chan *cstructs.TaskResourceUsage, error)
// TaskEvents returns a receive-only channel of TaskEvent values.
TaskEvents(context.Context) (<-chan *TaskEvent, error)
// SignalTask may be satisfied by embedding DriverSignalTaskNotSupported.
SignalTask(taskID string, signal string) error
// ExecTask may be satisfied by embedding DriverExecTaskNotSupported.
ExecTask(taskID string, cmd []string, timeout time.Duration) (*ExecTaskResult, error)
}
// ExecTaskStreamingDriver marks that a driver supports streaming exec task. This represents a user friendly
// interface to implement, as an alternative to the ExecTaskStreamingRawDriver, the low level interface.
type ExecTaskStreamingDriver interface {
// ExecTaskStreaming takes the target taskID and the ExecOptions (command,
// tty flag, streams and resize channel) and returns an ExitResult.
ExecTaskStreaming(ctx context.Context, taskID string, execOptions *ExecOptions) (*ExitResult, error)
}
// ExecOptions holds the arguments passed to
// ExecTaskStreamingDriver.ExecTaskStreaming.
type ExecOptions struct {
// Command is the command to run.
Command []string
// Tty indicates whether a pseudo-terminal is to be allocated.
Tty bool
// Stdin, Stdout and Stderr are the streams of the exec session.
Stdin io.ReadCloser
Stdout io.WriteCloser
Stderr io.WriteCloser
// ResizeCh is a receive-only channel of terminal sizes.
ResizeCh <-chan TerminalSize
}
// DriverNetworkManager is the interface which exposes functions for creating a
// network namespace which tasks can join. This only needs to be implemented
// if the driver MUST create the network namespace.
type DriverNetworkManager interface {
// CreateNetwork takes an allocation ID and returns a NetworkIsolationSpec.
// NOTE(review): the meaning of the bool result is not visible in this
// file — presumably whether the network was newly created; confirm.
CreateNetwork(allocID string) (*NetworkIsolationSpec, bool, error)
// DestroyNetwork takes the allocation ID and a NetworkIsolationSpec.
DestroyNetwork(allocID string, spec *NetworkIsolationSpec) error
}
// DriverSignalTaskNotSupported is an empty struct that drivers without
// SignalTask support can embed. Its SignalTask method fulfils the SignalTask
// requirement of the DriverPlugin interface.
type DriverSignalTaskNotSupported struct{}

// SignalTask ignores both arguments and always returns a "not supported"
// error.
func (DriverSignalTaskNotSupported) SignalTask(taskID, signal string) error {
	err := fmt.Errorf("SignalTask is not supported by this driver")
	return err
}
// DriverExecTaskNotSupported is an empty struct that drivers without ExecTask
// support can embed. Its ExecTask method fulfils the ExecTask requirement of
// the DriverPlugin interface.
type DriverExecTaskNotSupported struct{}

// ExecTask ignores its arguments and always returns a nil result together
// with a "not supported" error.
func (DriverExecTaskNotSupported) ExecTask(taskID string, cmd []string, timeout time.Duration) (*ExecTaskResult, error) {
	err := fmt.Errorf("ExecTask is not supported by this driver")
	return nil, err
}
// HealthState is the string-typed health status reported in
// Fingerprint.Health.
type HealthState string

// Known HealthState values.
var (
	HealthStateUndetected HealthState = "undetected"
	HealthStateUnhealthy  HealthState = "unhealthy"
	HealthStateHealthy    HealthState = "healthy"
)
// Fingerprint is the value streamed on the channel returned by
// DriverPlugin.Fingerprint.
type Fingerprint struct {
// Attributes maps attribute names to their values.
Attributes map[string]*pstructs.Attribute
// Health is one of the HealthState values.
Health HealthState
// HealthDescription is a free-form description; DriverHealthy is the
// default to use when the driver is nominal.
HealthDescription string
// Err is set by the plugin if an error occurred during fingerprinting
Err error
}
// FSIsolation enumerates the kinds of filesystem isolation a driver can
// support.
type FSIsolation string

// Known FSIsolation values.
var (
	// FSIsolationNone means no isolation. The host filesystem is used.
	FSIsolationNone FSIsolation = "none"

	// FSIsolationChroot means the driver will use a chroot on the host
	// filesystem.
	FSIsolationChroot FSIsolation = "chroot"

	// FSIsolationImage means the driver uses an image.
	FSIsolationImage FSIsolation = "image"
)
// Capabilities describes what a driver supports; it is the value returned by
// DriverPlugin.Capabilities.
type Capabilities struct {
// SendSignals marks the driver as being able to send signals
SendSignals bool
// Exec marks the driver as being able to execute arbitrary commands
// such as health checks. Used by the ScriptExecutor interface.
Exec bool
// FSIsolation indicates what kind of filesystem isolation the driver supports.
FSIsolation FSIsolation
// NetIsolationModes lists the set of isolation modes supported by the driver
NetIsolationModes []NetIsolationMode
// MustInitiateNetwork tells Nomad that the driver must create the network
// namespace and that the CreateNetwork and DestroyNetwork RPCs are implemented.
MustInitiateNetwork bool
// MountConfigs tells Nomad which mounting config options the driver supports.
// Its zero value is MountConfigSupportAll.
MountConfigs MountConfigSupport
}
// HasNetIsolationMode reports whether m appears in c.NetIsolationModes.
// It returns false when the list is empty.
func (c *Capabilities) HasNetIsolationMode(m NetIsolationMode) bool {
	for i := range c.NetIsolationModes {
		if c.NetIsolationModes[i] == m {
			return true
		}
	}
	return false
}
// NetIsolationMode is the string-typed network isolation mode used in
// Capabilities.NetIsolationModes and NetworkIsolationSpec.Mode.
type NetIsolationMode string

// Known NetIsolationMode values.
var (
	// NetIsolationModeHost disables network isolation and uses the host network
	NetIsolationModeHost NetIsolationMode = "host"

	// NetIsolationModeGroup uses the group network namespace for isolation
	NetIsolationModeGroup NetIsolationMode = "group"

	// NetIsolationModeTask isolates the network to just the task
	NetIsolationModeTask NetIsolationMode = "task"

	// NetIsolationModeNone indicates that there is no network to isolate and is
	// intended to be used for tasks that the client manages remotely
	NetIsolationModeNone NetIsolationMode = "none"
)
// NetworkIsolationSpec is the value exchanged with
// DriverNetworkManager.CreateNetwork / DestroyNetwork and carried on
// TaskConfig.NetworkIsolation.
type NetworkIsolationSpec struct {
// Mode is one of the NetIsolationMode values.
Mode NetIsolationMode
// Path is presumably the filesystem path of the network namespace —
// TODO confirm against the drivers that populate it.
Path string
// Labels is a free-form string map attached to the spec.
Labels map[string]string
}
// MountConfigSupport is an enum that defaults to "all" for backwards
// compatibility with community drivers.
type MountConfigSupport int32
const (
// MountConfigSupportAll is the zero value (0), so a Capabilities struct
// that never sets MountConfigs reports "all".
MountConfigSupportAll MountConfigSupport = iota
// MountConfigSupportNone has the value 1.
MountConfigSupportNone
)
// TerminalSize is the element type of ExecOptions.ResizeCh.
// NOTE(review): units are presumably character rows and columns — confirm.
type TerminalSize struct {
Height int
Width int
}
// DNSConfig holds the DNS settings carried on TaskConfig.DNS.
type DNSConfig struct {
	Servers  []string
	Searches []string
	Options  []string
}

// Copy returns a deep copy of the DNSConfig. A nil receiver yields nil, and
// any slice that is nil or empty in the source is left nil in the copy.
func (c *DNSConfig) Copy() *DNSConfig {
	if c == nil {
		return nil
	}

	// clone duplicates a non-empty slice into fresh backing storage; empty
	// input maps to nil.
	clone := func(in []string) []string {
		if len(in) == 0 {
			return nil
		}
		out := make([]string, len(in))
		copy(out, in)
		return out
	}

	return &DNSConfig{
		Servers:  clone(c.Servers),
		Searches: clone(c.Searches),
		Options:  clone(c.Options),
	}
}
// TaskConfig is the task description passed to DriverPlugin.StartTask.
type TaskConfig struct {
ID string
JobName string
TaskGroupName string
Name string
// Env is rendered as sorted KEY=VALUE strings by EnvList.
Env map[string]string
DeviceEnv map[string]string
Resources *Resources
Devices []*DeviceConfig
Mounts []*MountConfig
User string
// AllocDir is the base directory from which TaskDir derives the task's
// directory layout.
AllocDir string
// rawDriverConfig holds the msgpack-encoded driver configuration. It is
// written by EncodeDriverConfig / EncodeConcreteDriverConfig and read by
// DecodeDriverConfig.
rawDriverConfig []byte
StdoutPath string
StderrPath string
AllocID string
NetworkIsolation *NetworkIsolationSpec
DNS *DNSConfig
}
// Copy returns a copy of the TaskConfig, or nil when the receiver is nil.
//
// Env, DeviceEnv, Resources, DNS, Devices and Mounts are duplicated. All other
// fields are copied by plain assignment, so NetworkIsolation and the raw
// driver config bytes are shared with the receiver.
func (tc *TaskConfig) Copy() *TaskConfig {
	if tc == nil {
		return nil
	}

	dup := *tc
	dup.Env = helper.CopyMapStringString(tc.Env)
	dup.DeviceEnv = helper.CopyMapStringString(tc.DeviceEnv)
	dup.Resources = tc.Resources.Copy()
	dup.DNS = tc.DNS.Copy()

	// A nil slice stays nil; a non-nil slice is rebuilt element by element.
	if tc.Devices != nil {
		devices := make([]*DeviceConfig, len(tc.Devices))
		for i, dev := range tc.Devices {
			devices[i] = dev.Copy()
		}
		dup.Devices = devices
	}

	if tc.Mounts != nil {
		mounts := make([]*MountConfig, len(tc.Mounts))
		for i, mnt := range tc.Mounts {
			mounts[i] = mnt.Copy()
		}
		dup.Mounts = mounts
	}

	return &dup
}
// EnvList renders tc.Env as a slice of "KEY=VALUE" strings sorted in
// ascending order. The result is empty (non-nil) when Env has no entries.
func (tc *TaskConfig) EnvList() []string {
	pairs := make([]string, len(tc.Env))
	idx := 0
	for name, value := range tc.Env {
		pairs[idx] = name + "=" + value
		idx++
	}
	// Map iteration order is random, so sort for a stable result.
	sort.Strings(pairs)
	return pairs
}
// TaskDir builds an allocdir.TaskDir whose paths are derived from tc.AllocDir
// and tc.Name. The task's own directory is AllocDir/Name; the shared and log
// directories live directly under AllocDir.
func (tc *TaskConfig) TaskDir() *allocdir.TaskDir {
	root := filepath.Join(tc.AllocDir, tc.Name)

	td := new(allocdir.TaskDir)
	td.Dir = root
	td.SharedAllocDir = filepath.Join(tc.AllocDir, allocdir.SharedAllocName)
	td.LogDir = filepath.Join(tc.AllocDir, allocdir.SharedAllocName, allocdir.LogDirName)
	td.SharedTaskDir = filepath.Join(root, allocdir.SharedAllocName)
	td.LocalDir = filepath.Join(root, allocdir.TaskLocal)
	td.SecretsDir = filepath.Join(root, allocdir.TaskSecrets)
	return td
}
// DecodeDriverConfig decodes the stored raw driver config bytes into t by
// delegating to base.MsgPackDecode and returns that call's error.
// NOTE(review): t is presumably expected to be a pointer to the target
// struct — confirm against base.MsgPackDecode.
func (tc *TaskConfig) DecodeDriverConfig(t interface{}) error {
return base.MsgPackDecode(tc.rawDriverConfig, t)
}
// EncodeDriverConfig marshals val with the cty msgpack encoder, using val's
// own type, and stores the bytes as the raw driver config. On a marshal error
// the stored config is left untouched and the error is returned.
func (tc *TaskConfig) EncodeDriverConfig(val cty.Value) error {
	encoded, err := msgpack.Marshal(val, val.Type())
	if err == nil {
		tc.rawDriverConfig = encoded
	}
	return err
}
// EncodeConcreteDriverConfig encodes t via base.MsgPackEncode and stores the
// resulting bytes as the raw driver config. On an encode error the stored
// config is left untouched and the error is returned.
func (tc *TaskConfig) EncodeConcreteDriverConfig(t interface{}) error {
	buf := []byte{}
	if err := base.MsgPackEncode(&buf, t); err != nil {
		return err
	}

	tc.rawDriverConfig = buf
	return nil
}
// Resources groups the resource descriptions carried on TaskConfig.Resources.
// Either field may be nil; Copy handles both cases.
type Resources struct {
NomadResources *structs.AllocatedTaskResources
LinuxResources *LinuxResources
}
// Copy returns a copy of the Resources with each non-nil member duplicated via
// its own Copy method. A nil receiver yields nil; nil members stay nil.
func (r *Resources) Copy() *Resources {
	if r == nil {
		return nil
	}

	dup := &Resources{}
	if nomadRes := r.NomadResources; nomadRes != nil {
		dup.NomadResources = nomadRes.Copy()
	}
	if linuxRes := r.LinuxResources; linuxRes != nil {
		dup.LinuxResources = linuxRes.Copy()
	}
	return dup
}
// LinuxResources holds the Linux-specific resource settings carried on
// Resources.LinuxResources.
type LinuxResources struct {
	CPUPeriod        int64
	CPUQuota         int64
	CPUShares        int64
	MemoryLimitBytes int64
	OOMScoreAdj      int64
	CpusetCPUs       string
	CpusetMems       string

	// PercentTicks is used to calculate the CPUQuota, currently the docker
	// driver exposes cpu period and quota through the driver configuration
	// and thus the calculation for CPUQuota cannot be done on the client.
	// This is a compatibility field and should only be used by docker until
	// the docker specific options are deprecated in favor of exposing
	// CPUPeriod and CPUQuota at the task resource stanza.
	PercentTicks float64
}

// Copy returns a copy of the LinuxResources. If the receiver is nil, nil is
// returned, matching the other Copy methods in this file. The struct holds
// only scalar and string fields, so a plain value copy is a full copy.
func (r *LinuxResources) Copy() *LinuxResources {
	if r == nil {
		return nil
	}
	res := new(LinuxResources)
	*res = *r
	return res
}
// DeviceConfig describes one entry of TaskConfig.Devices.
type DeviceConfig struct {
	TaskPath    string
	HostPath    string
	Permissions string
}

// Copy returns a copy of the DeviceConfig. If it is nil, nil is returned.
func (d *DeviceConfig) Copy() *DeviceConfig {
	if d == nil {
		return nil
	}
	// All fields are strings, so a value copy is a complete copy.
	dup := *d
	return &dup
}
// MountConfig describes one entry of TaskConfig.Mounts.
type MountConfig struct {
	TaskPath        string
	HostPath        string
	Readonly        bool
	PropagationMode string
}

// IsEqual reports whether m and o have identical field values.
//
// Nil is handled explicitly because Copy treats a nil *MountConfig as a valid
// value: two nil configs are equal, and a nil config never equals a non-nil
// one.
func (m *MountConfig) IsEqual(o *MountConfig) bool {
	if m == nil || o == nil {
		return m == o
	}
	return m.TaskPath == o.TaskPath &&
		m.HostPath == o.HostPath &&
		m.Readonly == o.Readonly &&
		m.PropagationMode == o.PropagationMode
}

// Copy returns a copy of the MountConfig. If it is nil, nil is returned.
func (m *MountConfig) Copy() *MountConfig {
	if m == nil {
		return nil
	}
	mc := new(MountConfig)
	*mc = *m
	return mc
}
// TaskState is the string-typed state reported in TaskStatus.State.
type TaskState string

// Known TaskState values.
const (
	TaskStateUnknown TaskState = "unknown"
	TaskStateRunning TaskState = "running"
	TaskStateExited  TaskState = "exited"
)
// ExitResult describes how a task finished. It is the element type of the
// channel returned by DriverPlugin.WaitTask.
type ExitResult struct {
	ExitCode  int
	Signal    int
	OOMKilled bool
	Err       error
}

// Successful reports whether the result has a zero exit code, a zero signal
// and no error. OOMKilled is not consulted.
func (r *ExitResult) Successful() bool {
	if r.ExitCode != 0 || r.Signal != 0 {
		return false
	}
	return r.Err == nil
}

// Copy returns a shallow copy of the ExitResult (the Err value is shared).
// If the receiver is nil, nil is returned.
func (r *ExitResult) Copy() *ExitResult {
	if r == nil {
		return nil
	}
	dup := *r
	return &dup
}
// TaskStatus is the value returned by DriverPlugin.InspectTask.
type TaskStatus struct {
ID string
Name string
// State is one of the TaskState values.
State TaskState
StartedAt time.Time
CompletedAt time.Time
ExitResult *ExitResult
DriverAttributes map[string]string
NetworkOverride *DriverNetwork
}
// TaskEvent is the element type of the channel returned by
// DriverPlugin.TaskEvents.
type TaskEvent struct {
TaskID string
TaskName string
AllocID string
Timestamp time.Time
Message string
Annotations map[string]string
// Err is only used if an error occurred while consuming the RPC stream
Err error
}
// ExecTaskResult is the value returned by DriverPlugin.ExecTask: the captured
// stdout and stderr bytes plus the command's ExitResult.
type ExecTaskResult struct {
Stdout []byte
Stderr []byte
ExitResult *ExitResult
}
// DriverNetwork is the network created by drivers (e.g. Docker's bridge
// network) during Prestart.
type DriverNetwork struct {
	// PortMap can be set by drivers to replace ports in environment
	// variables with driver-specific mappings.
	PortMap map[string]int

	// IP is the IP address for the task created by the driver.
	IP string

	// AutoAdvertise indicates whether the driver thinks services that
	// choose to auto-advertise-addresses should use this IP instead of the
	// host's. eg If a Docker network plugin is used
	AutoAdvertise bool
}

// Advertise returns true if the driver suggests using the IP set. May be
// called on a nil Network in which case it returns false.
func (d *DriverNetwork) Advertise() bool {
	return d != nil && d.AutoAdvertise
}

// Copy a DriverNetwork struct. If it is nil, nil is returned. The copy always
// has a non-nil PortMap, even when the source's PortMap is nil.
func (d *DriverNetwork) Copy() *DriverNetwork {
	if d == nil {
		return nil
	}
	pm := make(map[string]int, len(d.PortMap))
	for k, v := range d.PortMap {
		pm[k] = v
	}
	return &DriverNetwork{
		PortMap:       pm,
		IP:            d.IP,
		AutoAdvertise: d.AutoAdvertise,
	}
}

// Hash the contents of a DriverNetwork struct to detect changes. If it is nil,
// an empty slice is returned.
//
// PortMap entries are hashed in sorted key order. Go randomizes map iteration
// order, so hashing in range order would let two calls on the same, unchanged
// network return different digests and report a change that did not happen.
func (d *DriverNetwork) Hash() []byte {
	if d == nil {
		return []byte{}
	}

	// Writes to a hash.Hash never return an error, so the results of
	// io.WriteString are intentionally not checked.
	h := md5.New()
	io.WriteString(h, d.IP)
	io.WriteString(h, strconv.FormatBool(d.AutoAdvertise))

	labels := make([]string, 0, len(d.PortMap))
	for label := range d.PortMap {
		labels = append(labels, label)
	}
	sort.Strings(labels)

	for _, label := range labels {
		io.WriteString(h, label)
		io.WriteString(h, strconv.Itoa(d.PortMap[label]))
	}
	return h.Sum(nil)
}
//// helper types for operating on raw exec operation
// we alias proto instances as much as possible to avoid conversion overhead
// ExecTaskStreamingRawDriver represents a low-level interface for executing a streaming exec
// call, and is intended to be used when a driver instance is to delegate exec handling to another
// backend, e.g. to an executor or a driver behind a grpc/rpc protocol.
//
// Nomad client would prefer this interface method over `ExecTaskStreaming` if driver implements it.
type ExecTaskStreamingRawDriver interface {
// ExecTaskStreamingRaw takes the target taskID, the command, the tty flag
// and the ExecTaskStream over which exec messages are exchanged.
ExecTaskStreamingRaw(
ctx context.Context,
taskID string,
command []string,
tty bool,
stream ExecTaskStream) error
}
// ExecTaskStream represents a stream of exec streaming messages,
// and is a handle to get stdin and tty size and send back
// stdout/stderr and exit operations.
//
// The methods are not concurrent safe; callers must ensure that methods are called
// from at most one goroutine.
type ExecTaskStream interface {
// Send relays a response message back to the API.
//
// The call is synchronous and no reference to the message is held: once
// the method call completes, the message reference can be reused or freed.
Send(*ExecTaskStreamingResponseMsg) error
// Recv receives exec streaming messages from the API. Returns `io.EOF` on
// completion of the stream.
Recv() (*ExecTaskStreamingRequestMsg, error)
}
// ExecTaskStreamingRequestMsg is an alias of the proto request message, so
// values pass through without conversion.
type ExecTaskStreamingRequestMsg = proto.ExecTaskStreamingRequest
// ExecTaskStreamingResponseMsg is an alias of the proto response message, so
// values pass through without conversion.
type ExecTaskStreamingResponseMsg = proto.ExecTaskStreamingResponse
// InternalCapabilitiesDriver is an experimental interface enabling a driver
// to disable some nomad functionality (e.g. logs or metrics).
//
// Intended for internal drivers only while the interface is stabilized.
type InternalCapabilitiesDriver interface {
// InternalCapabilities returns the driver's InternalCapabilities by value.
InternalCapabilities() InternalCapabilities
}
// InternalCapabilities flags disabled functionality.
// Zero value means all is supported.
type InternalCapabilities struct {
// DisableLogCollection, when true, flags log collection as disabled.
DisableLogCollection bool
// DisableMetricsCollection, when true, flags metrics collection as
// disabled.
DisableMetricsCollection bool
}