node_server.go
package driver

import (
	"context"
	"fmt"
	"os"

	"github.com/BurntSushi/toml"
	"github.com/container-storage-interface/spec/lib/go/csi"
	"github.com/rs/zerolog/log"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
	mount "k8s.io/mount-utils"
)

// MaxVolumesPerNode is the maximum number of volumes a single node may host
const MaxVolumesPerNode int64 = 1024

// NodeStageVolume is called after the volume is attached to the instance, so it can be partitioned, formatted and mounted to a staging path
func (d *Driver) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
	log.Info().Str("volume_id", req.VolumeId).Str("staging_target_path", req.StagingTargetPath).Msg("Request: NodeStageVolume")

	if req.VolumeId == "" {
		log.Error().Msg("must provide a VolumeId to NodeStageVolume")
		return nil, status.Error(codes.InvalidArgument, "must provide a VolumeId to NodeStageVolume")
	}
	if req.StagingTargetPath == "" {
		log.Error().Msg("must provide a StagingTargetPath to NodeStageVolume")
		return nil, status.Error(codes.InvalidArgument, "must provide a StagingTargetPath to NodeStageVolume")
	}
	if req.VolumeCapability == nil {
		log.Error().Msg("must provide a VolumeCapability to NodeStageVolume")
		return nil, status.Error(codes.InvalidArgument, "must provide a VolumeCapability to NodeStageVolume")
	}

	log.Debug().Str("volume_id", req.VolumeId).Msg("Formatting and mounting volume (staging)")

	// Find the disk attachment location
	attachedDiskPath := d.DiskHotPlugger.PathForVolume(req.VolumeId)
	if attachedDiskPath == "" {
		log.Error().Str("volume_id", req.VolumeId).Msg("path to volume (/dev/disk/by-id/VOLUME_ID) not found")
		return nil, status.Errorf(codes.NotFound, "path to volume (/dev/disk/by-id/%s) not found", req.VolumeId)
	}

	// Format the volume if not already formatted
	formatted, err := d.DiskHotPlugger.IsFormatted(attachedDiskPath)
	if err != nil {
		log.Error().Str("path", attachedDiskPath).Err(err).Msg("Formatted check errored")
		return nil, err
	}
	log.Debug().Str("volume_id", req.VolumeId).Bool("formatted", formatted).Msg("Is currently formatted?")

	if !formatted {
		if err := d.DiskHotPlugger.Format(attachedDiskPath, "ext4"); err != nil {
			log.Error().Str("path", attachedDiskPath).Err(err).Msg("Formatting errored")
			return nil, err
		}
	}

	// Mount the volume if not already mounted
	mounted, err := d.DiskHotPlugger.IsMounted(attachedDiskPath)
	if err != nil {
		log.Error().Str("path", attachedDiskPath).Err(err).Msg("Mounted check errored")
		return nil, err
	}
	log.Debug().Str("volume_id", req.VolumeId).Bool("mounted", mounted).Msg("Is currently mounted?")

	if !mounted {
		// mnt avoids shadowing the imported k8s.io/mount-utils package
		mnt := req.VolumeCapability.GetMount()
		options := []string{}
		if mnt != nil {
			options = mnt.MountFlags
		}
		if err := d.DiskHotPlugger.Mount(attachedDiskPath, req.StagingTargetPath, "ext4", options...); err != nil {
			log.Error().Str("path", attachedDiskPath).Str("staging_target_path", req.StagingTargetPath).Err(err).Msg("Mounting errored")
			return nil, err
		}
	}

	return &csi.NodeStageVolumeResponse{}, nil
}

// NodeUnstageVolume unmounts the volume when it's finished with, ready for deletion
func (d *Driver) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) {
	log.Info().Str("volume_id", req.VolumeId).Str("staging_target_path", req.StagingTargetPath).Msg("Request: NodeUnstageVolume")

	if req.VolumeId == "" {
		log.Error().Msg("must provide a VolumeId to NodeUnstageVolume")
		return nil, status.Error(codes.InvalidArgument, "must provide a VolumeId to NodeUnstageVolume")
	}
	if req.StagingTargetPath == "" {
		log.Error().Msg("must provide a StagingTargetPath to NodeUnstageVolume")
		return nil, status.Error(codes.InvalidArgument, "must provide a StagingTargetPath to NodeUnstageVolume")
	}

	log.Debug().Str("volume_id", req.VolumeId).Str("path", req.StagingTargetPath).Msg("Unmounting volume (unstaging)")

	path := d.DiskHotPlugger.PathForVolume(req.VolumeId)
	if path == "" && !d.TestMode {
		// The disk is already detached, so unstaging is a no-op
		log.Error().Str("volume_id", req.VolumeId).Msg("path to volume (/dev/disk/by-id/VOLUME_ID) not found")
		return &csi.NodeUnstageVolumeResponse{}, nil
	}

	mounted, err := d.DiskHotPlugger.IsMounted(path)
	if err != nil {
		log.Error().Str("path", path).Err(err).Msg("Mounted check errored")
		return nil, err
	}
	log.Debug().Str("volume_id", req.VolumeId).Bool("mounted", mounted).Msg("Mounted check completed")

	if mounted {
		log.Debug().Str("volume_id", req.VolumeId).Bool("mounted", mounted).Msg("Unmounting")
		if err := d.DiskHotPlugger.Unmount(path); err != nil {
			log.Error().Str("path", path).Err(err).Msg("Unmounting errored")
			return nil, err
		}
	}

	return &csi.NodeUnstageVolumeResponse{}, nil
}

// NodePublishVolume bind mounts the staging path into the container
func (d *Driver) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
	log.Info().Str("volume_id", req.VolumeId).Str("staging_target_path", req.StagingTargetPath).Str("target_path", req.TargetPath).Msg("Request: NodePublishVolume")

	if req.VolumeId == "" {
		log.Error().Msg("must provide a VolumeId to NodePublishVolume")
		return nil, status.Error(codes.InvalidArgument, "must provide a VolumeId to NodePublishVolume")
	}
	if req.StagingTargetPath == "" {
		log.Error().Msg("must provide a StagingTargetPath to NodePublishVolume")
		return nil, status.Error(codes.InvalidArgument, "must provide a StagingTargetPath to NodePublishVolume")
	}
	if req.TargetPath == "" {
		log.Error().Msg("must provide a TargetPath to NodePublishVolume")
		return nil, status.Error(codes.InvalidArgument, "must provide a TargetPath to NodePublishVolume")
	}
	if req.VolumeCapability == nil {
		log.Error().Msg("must provide a VolumeCapability to NodePublishVolume")
		return nil, status.Error(codes.InvalidArgument, "must provide a VolumeCapability to NodePublishVolume")
	}

	log.Debug().Str("volume_id", req.VolumeId).Str("from_path", req.StagingTargetPath).Str("to_path", req.TargetPath).Msg("Bind-mounting volume (publishing)")

	// Ensure the target path exists
	err := os.MkdirAll(req.TargetPath, 0o750)
	if err != nil {
		log.Error().Str("volume_id", req.VolumeId).Str("targetPath", req.TargetPath).Err(err).Msg("Failed to create target path")
		return nil, err
	}
	log.Debug().Str("volume_id", req.VolumeId).Str("targetPath", req.TargetPath).Msg("Ensured target path exists")

	// Mount the volume if not already mounted
	mounted, err := d.DiskHotPlugger.IsMounted(req.TargetPath)
	if err != nil {
		log.Error().Str("path", req.TargetPath).Err(err).Msg("Mounted check errored")
		return nil, err
	}
	log.Debug().Str("volume_id", req.VolumeId).Bool("mounted", mounted).Msg("Checking if currently mounted")

	if !mounted {
		options := []string{"bind"}
		if req.Readonly {
			options = append(options, "ro")
		}
		if err := d.DiskHotPlugger.Mount(req.StagingTargetPath, req.TargetPath, "ext4", options...); err != nil {
			log.Error().Str("from_path", req.StagingTargetPath).Str("to_path", req.TargetPath).Err(err).Msg("Bind-mounting errored")
			return nil, err
		}
	}

	return &csi.NodePublishVolumeResponse{}, nil
}

// NodeUnpublishVolume removes the bind mount
func (d *Driver) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
	log.Info().Str("volume_id", req.VolumeId).Str("target_path", req.TargetPath).Msg("Request: NodeUnpublishVolume")

	if req.VolumeId == "" {
		log.Error().Msg("must provide a VolumeId to NodeUnpublishVolume")
		return nil, status.Error(codes.InvalidArgument, "must provide a VolumeId to NodeUnpublishVolume")
	}
	if req.TargetPath == "" {
		log.Error().Msg("must provide a TargetPath to NodeUnpublishVolume")
		return nil, status.Error(codes.InvalidArgument, "must provide a TargetPath to NodeUnpublishVolume")
	}

	targetPath := req.GetTargetPath()
	log.Info().Str("volume_id", req.VolumeId).Str("path", targetPath).Msg("Removing bind-mount for volume (unpublishing)")

	mounted, err := d.DiskHotPlugger.IsMounted(targetPath)
	if err != nil {
		if os.IsNotExist(err) {
			log.Debug().Str("targetPath", targetPath).Msg("targetPath has already been deleted")
			return &csi.NodeUnpublishVolumeResponse{}, nil
		}
		if !mount.IsCorruptedMnt(err) {
			return nil, err
		}
		// A corrupted mount still needs unmounting below
		mounted = true
	}
	log.Debug().Str("volume_id", req.VolumeId).Bool("mounted", mounted).Msg("Checking if currently mounted")

	if !mounted {
		if err = os.RemoveAll(targetPath); err != nil {
			log.Error().Str("targetPath", targetPath).Err(err).Msg("Failed to remove target path")
			return nil, status.Errorf(codes.Internal, "failed to remove target path %q: %s", targetPath, err)
		}
		return &csi.NodeUnpublishVolumeResponse{}, nil
	}

	err = d.DiskHotPlugger.Unmount(targetPath)
	if err != nil {
		log.Error().Str("targetPath", targetPath).Err(err).Msg("Failed to unmount target path")
		return nil, err
	}

	log.Info().Str("volume_id", req.VolumeId).Str("target_path", targetPath).Msg("Removing target path")
	err = os.Remove(targetPath)
	if err != nil && !os.IsNotExist(err) {
		log.Error().Str("targetPath", targetPath).Err(err).Msg("Failed to remove target path")
		return nil, status.Errorf(codes.Internal, "failed to remove target path %q: %s", targetPath, err)
	}

	return &csi.NodeUnpublishVolumeResponse{}, nil
}

// NodeGetInfo returns some identifier (ID, name) for the current node
func (d *Driver) NodeGetInfo(ctx context.Context, req *csi.NodeGetInfoRequest) (*csi.NodeGetInfoResponse, error) {
	log.Info().Msg("Request: NodeGetInfo")

	nodeInstanceID, region, err := d.currentNodeDetails()
	if err != nil {
		log.Error().Err(err).Msg("Failed to get current node details")
		return nil, status.Errorf(codes.Internal, "failed to get current node details: %s", err)
	}
	log.Debug().Str("node_id", nodeInstanceID).Str("region", region).Msg("Requested information about node")

	return &csi.NodeGetInfoResponse{
		NodeId:            nodeInstanceID,
		MaxVolumesPerNode: MaxVolumesPerNode,

		// make sure that the driver works on this particular region only
		AccessibleTopology: &csi.Topology{
			Segments: map[string]string{
				"region": region,
			},
		},
	}, nil
}

// NodeGetVolumeStats returns the volume capacity statistics available for the given volume
func (d *Driver) NodeGetVolumeStats(ctx context.Context, req *csi.NodeGetVolumeStatsRequest) (*csi.NodeGetVolumeStatsResponse, error) {
	log.Info().Str("volume_id", req.VolumeId).Msg("Request: NodeGetVolumeStats")

	if req.VolumeId == "" {
		log.Error().Msg("must provide a VolumeId to NodeGetVolumeStats")
		return nil, status.Error(codes.InvalidArgument, "must provide a VolumeId to NodeGetVolumeStats")
	}
	volumePath := req.VolumePath
	if volumePath == "" {
		log.Error().Msg("must provide a VolumePath to NodeGetVolumeStats")
		return nil, status.Error(codes.InvalidArgument, "must provide a VolumePath to NodeGetVolumeStats")
	}

	mounted, err := d.DiskHotPlugger.IsMounted(volumePath)
	if err != nil {
		log.Error().Str("volume_id", req.VolumeId).Str("path", volumePath).Err(err).Msg("Failed to check if volume path is mounted")
		return nil, status.Errorf(codes.Internal, "failed to check if volume path %q is mounted: %s", volumePath, err)
	}
	if !mounted {
		log.Error().Str("volume_id", req.VolumeId).Str("path", volumePath).Msg("Volume path is not mounted")
		return nil, status.Errorf(codes.NotFound, "volume path %q is not mounted", volumePath)
	}

	stats, err := d.DiskHotPlugger.GetStatistics(volumePath)
	if err != nil {
		log.Error().Str("volume_id", req.VolumeId).Str("path", volumePath).Err(err).Msg("Failed to retrieve capacity statistics")
		return nil, status.Errorf(codes.Internal, "failed to retrieve capacity statistics for volume path %q: %s", volumePath, err)
	}

	log.Info().Int64("bytes_available", stats.AvailableBytes).Int64("bytes_total", stats.TotalBytes).
		Int64("bytes_used", stats.UsedBytes).Int64("inodes_available", stats.AvailableInodes).Int64("inodes_total", stats.TotalInodes).
		Int64("inodes_used", stats.UsedInodes).Msg("Node capacity statistics retrieved")

	return &csi.NodeGetVolumeStatsResponse{
		Usage: []*csi.VolumeUsage{
			{
				Available: stats.AvailableBytes,
				Total:     stats.TotalBytes,
				Used:      stats.UsedBytes,
				Unit:      csi.VolumeUsage_BYTES,
			},
			{
				Available: stats.AvailableInodes,
				Total:     stats.TotalInodes,
				Used:      stats.UsedInodes,
				Unit:      csi.VolumeUsage_INODES,
			},
		},
	}, nil
}

// NodeExpandVolume is used to expand the filesystem inside volumes
func (d *Driver) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandVolumeRequest) (*csi.NodeExpandVolumeResponse, error) {
	log.Info().Str("volume_id", req.VolumeId).Str("target_path", req.VolumePath).Msg("Request: NodeExpandVolume")

	if req.VolumeId == "" {
		log.Error().Msg("must provide a VolumeId to NodeExpandVolume")
		return nil, status.Error(codes.InvalidArgument, "must provide a VolumeId to NodeExpandVolume")
	}
	if req.VolumePath == "" {
		log.Error().Msg("must provide a VolumePath to NodeExpandVolume")
		return nil, status.Error(codes.InvalidArgument, "must provide a VolumePath to NodeExpandVolume")
	}

	_, err := d.CivoClient.GetVolume(req.VolumeId)
	if err != nil {
		log.Error().Str("volume_id", req.VolumeId).Err(err).Msg("Failed to find VolumeID to NodeExpandVolume")
		return nil, status.Errorf(codes.NotFound, "unable to find VolumeID %q to NodeExpandVolume: %s", req.VolumeId, err)
	}

	// Find the disk attachment location
	attachedDiskPath := d.DiskHotPlugger.PathForVolume(req.VolumeId)
	if attachedDiskPath == "" {
		log.Error().Str("volume_id", req.VolumeId).Msg("path to volume (/dev/disk/by-id/VOLUME_ID) not found")
		return nil, status.Errorf(codes.NotFound, "path to volume (/dev/disk/by-id/%s) not found", req.VolumeId)
	}

	log.Info().Str("volume_id", req.VolumeId).Str("path", attachedDiskPath).Msg("Expanding Volume")
	err = d.DiskHotPlugger.ExpandFilesystem(attachedDiskPath)
	if err != nil {
		log.Error().Str("volume_id", req.VolumeId).Err(err).Msg("Failed to expand filesystem")
		return nil, status.Errorf(codes.Internal, "failed to expand file system: %s", err)
	}

	// TODO: Get new size for response
	return &csi.NodeExpandVolumeResponse{}, nil
}

// NodeGetCapabilities returns the capabilities that this node and driver support
func (d *Driver) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error) {
	// Intentionally doesn't advertise VOLUME_CONDITION
	return &csi.NodeGetCapabilitiesResponse{
		Capabilities: []*csi.NodeServiceCapability{
			{
				Type: &csi.NodeServiceCapability_Rpc{
					Rpc: &csi.NodeServiceCapability_RPC{
						Type: csi.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME,
					},
				},
			},
			{
				Type: &csi.NodeServiceCapability_Rpc{
					Rpc: &csi.NodeServiceCapability_RPC{
						Type: csi.NodeServiceCapability_RPC_EXPAND_VOLUME,
					},
				},
			},
			{
				Type: &csi.NodeServiceCapability_Rpc{
					Rpc: &csi.NodeServiceCapability_RPC{
						Type: csi.NodeServiceCapability_RPC_GET_VOLUME_STATS,
					},
				},
			},
		},
	}, nil
}
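
// civostatsdConfig maps the fields of the /etc/civostatsd TOML config file;
// only InstanceID and Region are used by currentNodeDetails below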
type civostatsdConfig struct {
	Server     string
	Token      string
	Region     string
	InstanceID string `toml:"instance_id"`
}
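
// currentNodeDetails returns the instance ID and region for the node this
// driver is running on, read from /etc/civostatsd when present and valid,
// otherwise from environment variables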
func (d *Driver) currentNodeDetails() (string, string, error) {
	configFile := "/etc/civostatsd"
	_, err := os.Stat(configFile)
	if err != nil {
		log.Debug().Msg("Node details file /etc/civostatsd doesn't exist, falling back to environment variables")
		return d.currentNodeDetailsFromEnv()
	}

	var config civostatsdConfig
	if _, err := toml.DecodeFile(configFile, &config); err != nil {
		log.Debug().Msg("Node details file /etc/civostatsd isn't valid TOML, falling back to environment variables")
		return d.currentNodeDetailsFromEnv()
	}

	return config.InstanceID, config.Region, nil
}

// currentNodeDetailsFromEnv gets the node details from environment variables:
// NODE_ID is the ID of the node that can be used to access details from the
// Civo API, and REGION is the region the node is in. If NODE_ID is not set,
// KUBE_NODE_NAME is used to fetch the node by its name.
func (d *Driver) currentNodeDetailsFromEnv() (string, string, error) {
	if os.Getenv("NODE_ID") == "" {
		nodeName := os.Getenv("KUBE_NODE_NAME")
		if nodeName == "" {
			return "", "", fmt.Errorf("NODE_ID is not set and KUBE_NODE_NAME is not set")
		}

		instance, err := d.CivoClient.FindKubernetesClusterInstance(d.ClusterID, nodeName)
		if err != nil {
			return "", "", err
		}

		// Return the instance ID and the region
		return instance.ID, instance.Region, nil
	}

	return os.Getenv("NODE_ID"), os.Getenv("REGION"), nil
}