-
Notifications
You must be signed in to change notification settings - Fork 24
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Alternate, Simplified Async Pull #137
Changes from 5 commits
941d2ee
cfc23bd
c98e7d6
b9b5962
d65aef7
b6b702d
7a688f2
c497347
75b2517
8374809
6ff665b
fbd31d5
c206d9d
fdef232
22b486f
d805aeb
7a7b0f0
1aece15
6974562
a937ec6
89adc0e
d31d47d
b9cd81a
49207fa
1531ae6
84109f4
613debd
901c5d2
74edb81
074db0e
5fad043
3921e6c
88f6a39
62fc0d0
1879256
48fcd61
17c1c87
8b54915
b5d6c03
b824954
4c6d716
a75a0be
6f13aba
f3cd52d
6c28744
2ec4f6f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -51,8 +51,8 @@ var ( | |
enableCache = flag.Bool("enable-daemon-image-credential-cache", true, | ||
"Whether to save contents of imagepullsecrets of the daemon ServiceAccount in memory. "+ | ||
"If set to false, secrets will be fetched from the API server on every image pull.") | ||
asyncImagePullMount = flag.Bool("async-pull-mount", false, | ||
"Whether to pull images asynchronously (helps prevent timeout for larger images)") | ||
asyncImagePullTimeout = flag.Duration("async-pull-timeout", 0, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 |
||
"If positive, specifies duration allotted for async image pulls as measured from pull start time. If zero, negative, less than 30s, or omitted, the caller's timeout (usually kubelet: 2m) is used instead of this value. (additional time helps prevent timeout for larger images or slower image pull conditions)") | ||
watcherResyncPeriod = flag.Duration("watcher-resync-period", 30*time.Minute, "The resync period of the pvc watcher.") | ||
mode = flag.String("mode", "", "The mode of the driver. Valid values are: node, controller") | ||
nodePluginSA = flag.String("node-plugin-sa", "csi-image-warm-metal", "The name of the ServiceAccount used by the node plugin.") | ||
|
@@ -129,7 +129,7 @@ func main() { | |
server.Start(*endpoint, | ||
NewIdentityServer(driverVersion), | ||
nil, | ||
NewNodeServer(driver, mounter, criClient, secretStore, *asyncImagePullMount)) | ||
NewNodeServer(driver, mounter, criClient, secretStore, *asyncImagePullTimeout)) | ||
case controllerMode: | ||
watcher, err := watcher.New(context.Background(), *watcherResyncPeriod) | ||
if err != nil { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,15 +4,14 @@ import ( | |
"context" | ||
"os" | ||
"strings" | ||
"time" | ||
|
||
"github.com/container-storage-interface/spec/lib/go/csi" | ||
"github.com/containerd/containerd/reference/docker" | ||
"github.com/google/uuid" | ||
"github.com/warm-metal/container-image-csi-driver/pkg/backend" | ||
"github.com/warm-metal/container-image-csi-driver/pkg/metrics" | ||
"github.com/warm-metal/container-image-csi-driver/pkg/mountexecutor" | ||
"github.com/warm-metal/container-image-csi-driver/pkg/mountstatus" | ||
"github.com/warm-metal/container-image-csi-driver/pkg/pullexecutor" | ||
"github.com/warm-metal/container-image-csi-driver/pkg/remoteimage" | ||
"github.com/warm-metal/container-image-csi-driver/pkg/remoteimageasync" | ||
"github.com/warm-metal/container-image-csi-driver/pkg/secret" | ||
csicommon "github.com/warm-metal/csi-drivers/pkg/csi-common" | ||
"google.golang.org/grpc/codes" | ||
|
@@ -31,36 +30,33 @@ const ( | |
|
||
type ImagePullStatus int | ||
|
||
func NewNodeServer(driver *csicommon.CSIDriver, mounter *backend.SnapshotMounter, imageSvc cri.ImageServiceClient, secretStore secret.Store, asyncImagePullMount bool) *NodeServer { | ||
return &NodeServer{ | ||
DefaultNodeServer: csicommon.NewDefaultNodeServer(driver), | ||
mounter: mounter, | ||
secretStore: secretStore, | ||
asyncImagePullMount: asyncImagePullMount, | ||
mountExecutor: mountexecutor.NewMountExecutor(&mountexecutor.MountExecutorOptions{ | ||
AsyncMount: asyncImagePullMount, | ||
Mounter: mounter, | ||
}), | ||
pullExecutor: pullexecutor.NewPullExecutor(&pullexecutor.PullExecutorOptions{ | ||
AsyncPull: asyncImagePullMount, | ||
ImageServiceClient: imageSvc, | ||
SecretStore: secretStore, | ||
Mounter: mounter, | ||
}), | ||
func NewNodeServer(driver *csicommon.CSIDriver, mounter *backend.SnapshotMounter, imageSvc cri.ImageServiceClient, secretStore secret.Store, asyncImagePullTimeout time.Duration) *NodeServer { | ||
ns := NodeServer{ | ||
DefaultNodeServer: csicommon.NewDefaultNodeServer(driver), | ||
mounter: mounter, | ||
secretStore: secretStore, | ||
asyncImagePullTimeout: asyncImagePullTimeout, | ||
asyncImagePuller: nil, | ||
} | ||
if asyncImagePullTimeout >= time.Duration(30*time.Second) { | ||
ns.asyncImagePuller = remoteimageasync.StartAsyncPuller(context.TODO(), 100, 20) | ||
} else { | ||
ns.asyncImagePullTimeout = 0 // set to default value | ||
mugdha-adhav marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
return &ns | ||
} | ||
|
||
type NodeServer struct { | ||
*csicommon.DefaultNodeServer | ||
mounter *backend.SnapshotMounter | ||
secretStore secret.Store | ||
asyncImagePullMount bool | ||
mountExecutor *mountexecutor.MountExecutor | ||
pullExecutor *pullexecutor.PullExecutor | ||
mounter *backend.SnapshotMounter | ||
imageSvc cri.ImageServiceClient | ||
secretStore secret.Store | ||
asyncImagePullTimeout time.Duration | ||
asyncImagePuller remoteimageasync.AsyncPuller | ||
} | ||
|
||
func (n NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (resp *csi.NodePublishVolumeResponse, err error) { | ||
valuesLogger := klog.LoggerWithValues(klog.NewKlogr(), "pod-name", req.VolumeContext["pod-name"], "namespace", req.VolumeContext["namespace"], "uid", req.VolumeContext["uid"], "request-id", uuid.NewString()) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why are we removing the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is the only place that it was used after this PR. it didn't relate to anything else and wasn't stored for association with future log messages, etc. didn't add any value in current form. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It was added so that we could track the log messages belonging to a particular The NodePublishVolume function call is supposed to be idempotent and the CO (container orchestrator) may call it several times. From CSI spec docs -
|
||
valuesLogger := klog.LoggerWithValues(klog.NewKlogr(), "pod-name", req.VolumeContext["pod-name"], "namespace", req.VolumeContext["namespace"], "uid", req.VolumeContext["uid"]) | ||
valuesLogger.Info("Incoming NodePublishVolume request", "request string", req.String()) | ||
if len(req.VolumeId) == 0 { | ||
err = status.Error(codes.InvalidArgument, "VolumeId is missing") | ||
|
@@ -122,54 +118,51 @@ func (n NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishV | |
image = req.VolumeContext[ctxKeyImage] | ||
} | ||
|
||
namedRef, err := docker.ParseDockerRef(image) | ||
if err != nil { | ||
klog.Errorf("unable to normalize image %q: %s", image, err) | ||
return | ||
} | ||
|
||
pullAlways := strings.ToLower(req.VolumeContext[ctxKeyPullAlways]) == "true" | ||
|
||
po := &pullexecutor.PullOptions{ | ||
Context: ctx, | ||
NamedRef: namedRef, | ||
PullAlways: pullAlways, | ||
Image: image, | ||
PullSecrets: req.Secrets, | ||
Logger: valuesLogger, | ||
} | ||
|
||
if e := n.pullExecutor.StartPulling(po); e != nil { | ||
err = status.Errorf(codes.Aborted, "unable to pull image %q: %s", image, e) | ||
keyring, err := n.secretStore.GetDockerKeyring(ctx, req.Secrets) | ||
if err != nil { | ||
err = status.Errorf(codes.Aborted, "unable to fetch keyring: %s", err) | ||
mugdha-adhav marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return | ||
} | ||
|
||
if e := n.pullExecutor.WaitForPull(po); e != nil { | ||
err = status.Errorf(codes.DeadlineExceeded, e.Error()) | ||
namedRef, err := docker.ParseDockerRef(image) | ||
if err != nil { | ||
klog.Errorf("unable to normalize image %q: %s", image, err) | ||
return | ||
} | ||
|
||
if mountstatus.Get(req.VolumeId) == mountstatus.Mounted { | ||
return &csi.NodePublishVolumeResponse{}, nil | ||
} | ||
|
||
o := &mountexecutor.MountOptions{ | ||
Context: ctx, | ||
NamedRef: namedRef, | ||
VolumeId: req.VolumeId, | ||
TargetPath: req.TargetPath, | ||
VolumeCapability: req.VolumeCapability, | ||
ReadOnly: req.Readonly, | ||
Logger: valuesLogger, | ||
} | ||
|
||
if e := n.mountExecutor.StartMounting(o); e != nil { | ||
err = status.Error(codes.Internal, e.Error()) | ||
return | ||
//NOTE: we are relying on n.mounter.ImageExists() to return false when | ||
// a first-time pull is in progress, else this logic may not be | ||
// correct. should test this. | ||
if pullAlways || !n.mounter.ImageExists(ctx, namedRef) { | ||
klog.Errorf("pull image %q", image) | ||
puller := remoteimage.NewPuller(n.imageSvc, namedRef, keyring) | ||
|
||
if n.asyncImagePuller != nil { | ||
var session remoteimageasync.PullSession | ||
session, err = n.asyncImagePuller.StartPull(image, puller, n.asyncImagePullTimeout) | ||
if err != nil { | ||
err = status.Errorf(codes.Aborted, "unable to pull image %q: %s", image, err) | ||
return | ||
} | ||
if err = n.asyncImagePuller.WaitForPull(session, ctx); err != nil { | ||
err = status.Errorf(codes.Aborted, "unable to pull image %q: %s", image, err) | ||
return | ||
} | ||
} else { | ||
if err = puller.Pull(ctx); err != nil { | ||
err = status.Errorf(codes.Aborted, "unable to pull image %q: %s", image, err) | ||
return | ||
} | ||
} | ||
} | ||
|
||
if e := n.mountExecutor.WaitForMount(o); e != nil { | ||
err = status.Errorf(codes.DeadlineExceeded, e.Error()) | ||
ro := req.Readonly || | ||
req.VolumeCapability.AccessMode.Mode == csi.VolumeCapability_AccessMode_SINGLE_NODE_READER_ONLY || | ||
req.VolumeCapability.AccessMode.Mode == csi.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY | ||
if err = n.mounter.Mount(ctx, req.VolumeId, backend.MountTarget(req.TargetPath), namedRef, ro); err != nil { | ||
err = status.Error(codes.Internal, err.Error()) | ||
return | ||
} | ||
|
||
|
@@ -194,17 +187,12 @@ func (n NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpubl | |
} | ||
|
||
if err = n.mounter.Unmount(ctx, req.VolumeId, backend.MountTarget(req.TargetPath)); err != nil { | ||
// TODO(vadasambar): move this to mountexecutor once mountexecutor has `StartUnmounting` function | ||
//TODO: evaluate this metric in the absence of previous async implementation. this update makes mounting synchronous, not async (simpler). | ||
metrics.OperationErrorsCount.WithLabelValues("StartUnmounting").Inc() | ||
err = status.Error(codes.Internal, err.Error()) | ||
return | ||
} | ||
|
||
// Clear the mountstatus since the volume has been unmounted | ||
// Not doing this will make mount not work properly if the same volume is | ||
// attempted to mount twice | ||
mountstatus.Delete(req.VolumeId) | ||
|
||
return &csi.NodeUnpublishVolumeResponse{}, nil | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we pass this timeout value from helm values?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After discussing with @imuni4fun, we decided that we want to keep the
enableAsyncPull
and adding default timeout value to be passed in from values.yaml. Something like--async-pull-timeout={{ .Values.asyncPullTimeout }}