-
Notifications
You must be signed in to change notification settings - Fork 24
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Alternate, Simplified Async Pull #137
Changes from 45 commits
941d2ee
cfc23bd
c98e7d6
b9b5962
d65aef7
b6b702d
7a688f2
c497347
75b2517
8374809
6ff665b
fbd31d5
c206d9d
fdef232
22b486f
d805aeb
7a7b0f0
1aece15
6974562
a937ec6
89adc0e
d31d47d
b9cd81a
49207fa
1531ae6
84109f4
613debd
901c5d2
74edb81
074db0e
5fad043
3921e6c
88f6a39
62fc0d0
1879256
48fcd61
17c1c87
8b54915
b5d6c03
b824954
4c6d716
a75a0be
6f13aba
f3cd52d
6c28744
2ec4f6f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
name: containerd-async-11mins | ||
on: | ||
push: | ||
branches: [main] | ||
pull_request: | ||
branches: [main] | ||
workflow_dispatch: | ||
jobs: | ||
integration: | ||
runs-on: ubuntu-latest | ||
steps: | ||
- uses: actions/checkout@v4 | ||
- name: Start a kind cluster with containerd | ||
uses: helm/[email protected] | ||
with: | ||
cluster_name: kind-${{ github.run_id }} | ||
kubectl_version: "v1.25.2" | ||
config: ./hack/ci/containerd-cluster-conf.yaml | ||
- name: Install private registry | ||
run: ./hack/ci/setup_private_registry.sh | ||
- name: Build image | ||
run: ./hack/ci/build.sh | ||
- name: Set image version | ||
run: | | ||
echo "VALUE_FILE=charts/warm-metal-csi-driver/values.yaml" >> "$GITHUB_ENV" | ||
echo "IMAGE_TAG=$(git rev-parse --short HEAD)" >> "$GITHUB_ENV" | ||
echo "HELM_NAME=wm-csi-integration-tests" >> "$GITHUB_ENV" | ||
- name: Install the CSI Driver | ||
run: | | ||
helm install ${HELM_NAME} charts/warm-metal-csi-driver -n kube-system \ | ||
-f ${VALUE_FILE} \ | ||
--set csiPlugin.image.tag=${IMAGE_TAG} \ | ||
--set enableAsyncPull=true \ | ||
--wait \ | ||
--debug | ||
- name: Run integration Tests | ||
run: ./hack/ci/test.sh | ||
- name: Uninstall the CSI Driver | ||
run: helm uninstall -n kube-system ${HELM_NAME} --wait |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,15 +4,14 @@ import ( | |
"context" | ||
"os" | ||
"strings" | ||
"time" | ||
|
||
"github.com/container-storage-interface/spec/lib/go/csi" | ||
"github.com/containerd/containerd/reference/docker" | ||
"github.com/google/uuid" | ||
"github.com/warm-metal/container-image-csi-driver/pkg/backend" | ||
"github.com/warm-metal/container-image-csi-driver/pkg/metrics" | ||
"github.com/warm-metal/container-image-csi-driver/pkg/mountexecutor" | ||
"github.com/warm-metal/container-image-csi-driver/pkg/mountstatus" | ||
"github.com/warm-metal/container-image-csi-driver/pkg/pullexecutor" | ||
"github.com/warm-metal/container-image-csi-driver/pkg/remoteimage" | ||
"github.com/warm-metal/container-image-csi-driver/pkg/remoteimageasync" | ||
"github.com/warm-metal/container-image-csi-driver/pkg/secret" | ||
csicommon "github.com/warm-metal/csi-drivers/pkg/csi-common" | ||
"google.golang.org/grpc/codes" | ||
|
@@ -31,36 +30,36 @@ const ( | |
|
||
type ImagePullStatus int | ||
|
||
func NewNodeServer(driver *csicommon.CSIDriver, mounter backend.Mounter, imageSvc cri.ImageServiceClient, secretStore secret.Store, asyncImagePullMount bool) *NodeServer { | ||
return &NodeServer{ | ||
DefaultNodeServer: csicommon.NewDefaultNodeServer(driver), | ||
mounter: mounter, | ||
secretStore: secretStore, | ||
asyncImagePullMount: asyncImagePullMount, | ||
mountExecutor: mountexecutor.NewMountExecutor(&mountexecutor.MountExecutorOptions{ | ||
AsyncMount: asyncImagePullMount, | ||
Mounter: mounter, | ||
}), | ||
pullExecutor: pullexecutor.NewPullExecutor(&pullexecutor.PullExecutorOptions{ | ||
AsyncPull: asyncImagePullMount, | ||
ImageServiceClient: imageSvc, | ||
SecretStore: secretStore, | ||
Mounter: mounter, | ||
}), | ||
} | ||
func NewNodeServer(driver *csicommon.CSIDriver, mounter backend.Mounter, imageSvc cri.ImageServiceClient, secretStore secret.Store, asyncImagePullTimeout time.Duration) *NodeServer { | ||
ns := NodeServer{ | ||
DefaultNodeServer: csicommon.NewDefaultNodeServer(driver), | ||
mounter: mounter, | ||
imageSvc: imageSvc, | ||
secretStore: secretStore, | ||
asyncImagePullTimeout: asyncImagePullTimeout, | ||
asyncImagePuller: nil, | ||
} | ||
if asyncImagePullTimeout >= time.Duration(30*time.Second) { | ||
klog.Infof("Starting node server in Async mode with %v timeout", asyncImagePullTimeout) | ||
ns.asyncImagePuller = remoteimageasync.StartAsyncPuller(context.TODO(), 100) | ||
} else { | ||
klog.Info("Starting node server in Sync mode") | ||
ns.asyncImagePullTimeout = 0 // set to default value | ||
mugdha-adhav marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
return &ns | ||
} | ||
|
||
type NodeServer struct { | ||
*csicommon.DefaultNodeServer | ||
mounter backend.Mounter | ||
secretStore secret.Store | ||
asyncImagePullMount bool | ||
mountExecutor *mountexecutor.MountExecutor | ||
pullExecutor *pullexecutor.PullExecutor | ||
mounter backend.Mounter | ||
imageSvc cri.ImageServiceClient | ||
secretStore secret.Store | ||
asyncImagePullTimeout time.Duration | ||
asyncImagePuller remoteimageasync.AsyncPuller | ||
} | ||
|
||
func (n NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (resp *csi.NodePublishVolumeResponse, err error) { | ||
valuesLogger := klog.LoggerWithValues(klog.NewKlogr(), "pod-name", req.VolumeContext["pod-name"], "namespace", req.VolumeContext["namespace"], "uid", req.VolumeContext["uid"], "request-id", uuid.NewString()) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why are we removing the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is the only place that it was used after this PR. it didn't relate to anything else and wasn't stored for association with future log messages, etc. didn't add any value in current form. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It was added so that we could track the log messages belonging to a particular The NodePublishVolume function call is supposed to be idempotent and the CO (container orchestrator) may call it several times. From CSI spec docs -
|
||
valuesLogger := klog.LoggerWithValues(klog.NewKlogr(), "pod-name", req.VolumeContext["pod-name"], "namespace", req.VolumeContext["namespace"], "uid", req.VolumeContext["uid"]) | ||
valuesLogger.Info("Incoming NodePublishVolume request", "request string", req.String()) | ||
if len(req.VolumeId) == 0 { | ||
err = status.Error(codes.InvalidArgument, "VolumeId is missing") | ||
|
@@ -122,56 +121,59 @@ func (n NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishV | |
image = req.VolumeContext[ctxKeyImage] | ||
} | ||
|
||
namedRef, err := docker.ParseDockerRef(image) | ||
if err != nil { | ||
klog.Errorf("unable to normalize image %q: %s", image, err) | ||
return | ||
} | ||
|
||
pullAlways := strings.ToLower(req.VolumeContext[ctxKeyPullAlways]) == "true" | ||
|
||
po := &pullexecutor.PullOptions{ | ||
Context: ctx, | ||
NamedRef: namedRef, | ||
PullAlways: pullAlways, | ||
Image: image, | ||
PullSecrets: req.Secrets, | ||
Logger: valuesLogger, | ||
} | ||
|
||
if e := n.pullExecutor.StartPulling(po); e != nil { | ||
err = status.Errorf(codes.Aborted, "unable to pull image %q: %s", image, e) | ||
keyring, err := n.secretStore.GetDockerKeyring(ctx, req.Secrets) | ||
if err != nil { | ||
err = status.Errorf(codes.Aborted, "unable to fetch keyring: %s", err) | ||
mugdha-adhav marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return | ||
} | ||
|
||
if e := n.pullExecutor.WaitForPull(po); e != nil { | ||
err = status.Errorf(codes.DeadlineExceeded, e.Error()) | ||
namedRef, err := docker.ParseDockerRef(image) | ||
if err != nil { | ||
klog.Errorf("unable to normalize image %q: %s", image, err) | ||
return | ||
} | ||
|
||
if mountstatus.Get(req.VolumeId) == mountstatus.Mounted { | ||
return &csi.NodePublishVolumeResponse{}, nil | ||
} | ||
|
||
o := &mountexecutor.MountOptions{ | ||
Context: ctx, | ||
NamedRef: namedRef, | ||
VolumeId: req.VolumeId, | ||
TargetPath: req.TargetPath, | ||
VolumeCapability: req.VolumeCapability, | ||
ReadOnly: req.Readonly, | ||
Logger: valuesLogger, | ||
//NOTE: we are relying on n.mounter.ImageExists() to return false when | ||
// a first-time pull is in progress, else this logic may not be | ||
// correct. should test this. | ||
if pullAlways || !n.mounter.ImageExists(ctx, namedRef) { | ||
klog.Errorf("pull image %q", image) | ||
puller := remoteimage.NewPuller(n.imageSvc, namedRef, keyring) | ||
|
||
if n.asyncImagePuller != nil { | ||
var session *remoteimageasync.PullSession | ||
session, err = n.asyncImagePuller.StartPull(image, puller, n.asyncImagePullTimeout) | ||
if err != nil { | ||
err = status.Errorf(codes.Aborted, "unable to pull image %q: %s", image, err) | ||
metrics.OperationErrorsCount.WithLabelValues("pull-async-start").Inc() | ||
return | ||
} | ||
if err = n.asyncImagePuller.WaitForPull(session, ctx); err != nil { | ||
err = status.Errorf(codes.Aborted, "unable to pull image %q: %s", image, err) | ||
metrics.OperationErrorsCount.WithLabelValues("pull-async-wait").Inc() | ||
return | ||
} | ||
} else { | ||
if err = puller.Pull(ctx); err != nil { | ||
err = status.Errorf(codes.Aborted, "unable to pull image %q: %s", image, err) | ||
metrics.OperationErrorsCount.WithLabelValues("pull-sync-call").Inc() | ||
return | ||
} | ||
} | ||
} | ||
|
||
if e := n.mountExecutor.StartMounting(o); e != nil { | ||
err = status.Error(codes.Internal, e.Error()) | ||
ro := req.Readonly || | ||
req.VolumeCapability.AccessMode.Mode == csi.VolumeCapability_AccessMode_SINGLE_NODE_READER_ONLY || | ||
req.VolumeCapability.AccessMode.Mode == csi.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY | ||
if err = n.mounter.Mount(ctx, req.VolumeId, backend.MountTarget(req.TargetPath), namedRef, ro); err != nil { | ||
err = status.Error(codes.Internal, err.Error()) | ||
metrics.OperationErrorsCount.WithLabelValues("mount").Inc() | ||
return | ||
} | ||
|
||
if e := n.mountExecutor.WaitForMount(o); e != nil { | ||
err = status.Errorf(codes.DeadlineExceeded, e.Error()) | ||
return | ||
} | ||
valuesLogger.Info("Successfully completed NodePublishVolume request", "request string", req.String()) | ||
|
||
return &csi.NodePublishVolumeResponse{}, nil | ||
} | ||
|
@@ -194,17 +196,11 @@ func (n NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpubl | |
} | ||
|
||
if err = n.mounter.Unmount(ctx, req.VolumeId, backend.MountTarget(req.TargetPath)); err != nil { | ||
// TODO(vadasambar): move this to mountexecutor once mountexecutor has `StartUnmounting` function | ||
metrics.OperationErrorsCount.WithLabelValues("StartUnmounting").Inc() | ||
metrics.OperationErrorsCount.WithLabelValues("unmount").Inc() | ||
err = status.Error(codes.Internal, err.Error()) | ||
return | ||
} | ||
|
||
// Clear the mountstatus since the volume has been unmounted | ||
// Not doing this will make mount not work properly if the same volume is | ||
// attempted to mount twice | ||
mountstatus.Delete(req.VolumeId) | ||
|
||
return &csi.NodeUnpublishVolumeResponse{}, nil | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍