diff --git a/cmd/cdi-importer/importer.go b/cmd/cdi-importer/importer.go index d0c3a868e9..e1d768371a 100644 --- a/cmd/cdi-importer/importer.go +++ b/cmd/cdi-importer/importer.go @@ -44,6 +44,7 @@ func main() { sec, _ := util.ParseEnvVar(common.ImporterSecretKey, false) source, _ := util.ParseEnvVar(common.ImporterSource, false) contentType, _ := util.ParseEnvVar(common.ImporterContentType, false) + imageSize, _ := util.ParseEnvVar(common.ImporterImageSize, false) glog.V(1).Infoln("begin import process") dso := &importer.DataStreamOptions{ @@ -53,6 +54,7 @@ func main() { sec, source, contentType, + imageSize, } err = importer.CopyImage(dso) if err != nil { diff --git a/pkg/common/common.go b/pkg/common/common.go index 6e97ff56e1..7d4efdf9cf 100644 --- a/pkg/common/common.go +++ b/pkg/common/common.go @@ -51,6 +51,8 @@ const ( ImporterAccessKeyID = "IMPORTER_ACCESS_KEY_ID" // ImporterSecretKey provides a constant to capture our env variable "IMPORTER_SECRET_KEY" ImporterSecretKey = "IMPORTER_SECRET_KEY" + // ImporterImageSize provides a constant to capture our env variable "IMPORTER_IMAGE_SIZE" + ImporterImageSize = "IMPORTER_IMAGE_SIZE" // OwnerUID provides the UID of the owner entity (either PVC or DV) OwnerUID = "OWNER_UID" diff --git a/pkg/controller/import-controller.go b/pkg/controller/import-controller.go index a4a133163d..391d1f5881 100644 --- a/pkg/controller/import-controller.go +++ b/pkg/controller/import-controller.go @@ -37,7 +37,7 @@ type ImportController struct { } type importPodEnvVar struct { - ep, secretName, source, contentType string + ep, secretName, source, contentType, imageSize string } // NewImportController sets up an Import Controller, and returns a pointer to @@ -119,6 +119,10 @@ func (ic *ImportController) processPvcItem(pvc *v1.PersistentVolumeClaim) error if podEnvVar.secretName == "" { glog.V(2).Infof("no secret will be supplied to endpoint %q\n", podEnvVar.ep) } + podEnvVar.imageSize, err = getImageSize(pvc) + if err != nil { + return err + } } // all checks passed, let's create the importer pod! ic.expectPodCreate(pvcKey) diff --git a/pkg/controller/import_controller_ginkgo_test.go b/pkg/controller/import_controller_ginkgo_test.go index c3d5a525e5..32732fa37e 100644 --- a/pkg/controller/import_controller_ginkgo_test.go +++ b/pkg/controller/import_controller_ginkgo_test.go @@ -7,6 +7,7 @@ import ( . "github.com/onsi/gomega" "github.com/pkg/errors" "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" k8sinformers "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" @@ -233,6 +234,13 @@ func createInMemPVC(ns, name string, annotations map[string]string) *v1.Persiste Annotations: annotations, UID: "1234", }, + Spec: v1.PersistentVolumeClaimSpec{ + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceName(v1.ResourceStorage): resource.MustParse("1G"), + }, + }, + }, } } diff --git a/pkg/controller/util.go b/pkg/controller/util.go index 1e00839a73..10a425d82b 100644 --- a/pkg/controller/util.go +++ b/pkg/controller/util.go @@ -80,6 +80,16 @@ func getEndpoint(pvc *v1.PersistentVolumeClaim) (string, error) { return ep, nil } +func getImageSize(pvc *v1.PersistentVolumeClaim) (string, error) { + // Per the discussion on mailing list, we are going to assume that when we request a particular capacity from a PVC + // that we will have that capacity available. If the storage provider is broken, then we will simply fail when resizing. + pvcSize, found := pvc.Spec.Resources.Requests[v1.ResourceStorage] + if !found { + return "", errors.Errorf("storage request is missing in pvc \"%s/%s\"", pvc.Namespace, pvc.Name) + } + return pvcSize.String(), nil +} + // returns the source string which determines the type of source. If no source or invalid source found, default to http func getSource(pvc *v1.PersistentVolumeClaim) string { source, found := pvc.Annotations[AnnSource] @@ -329,6 +339,10 @@ func makeEnv(podEnvVar importPodEnvVar, uid types.UID) []v1.EnvVar { Name: common.ImporterContentType, Value: podEnvVar.contentType, }, + { + Name: common.ImporterImageSize, + Value: podEnvVar.imageSize, + }, { Name: common.OwnerUID, Value: string(uid), diff --git a/pkg/controller/util_test.go b/pkg/controller/util_test.go index caf4e42b30..4e8de15b08 100644 --- a/pkg/controller/util_test.go +++ b/pkg/controller/util_test.go @@ -448,6 +448,43 @@ func Test_getContentType(t *testing.T) { } } +func Test_getImageSize(t *testing.T) { + type args struct { + pvc *v1.PersistentVolumeClaim + } + + tests := []struct { + name string + args args + want string + wantErr bool + }{ + { + name: "expected to get size 1G", + args: args{createPvc("testPVC", "default", nil, nil)}, + want: "1G", + wantErr: false, + }, + { + name: "expected to get error, because of missing size", + args: args{createPvcNoSize("testPVC", "default", nil, nil)}, + want: "", + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := getImageSize(tt.args.pvc) + if err != nil && !tt.wantErr { + t.Errorf("Error retrieving adjusted image size, when not expecting error: %s", err.Error()) + } + if got != tt.want { + t.Errorf("getSource() = %v, want %v", got, tt.want) + } + }) + } +} + func Test_getSecretName(t *testing.T) { type args struct { client kubernetes.Interface @@ -717,8 +754,8 @@ func TestCreateImporterPod(t *testing.T) { }{ { name: "expect pod to be created", - args: args{k8sfake.NewSimpleClientset(pvc), "test/image", "-v=5", "Always", importPodEnvVar{"", "", "", ""}, pvc}, - want: MakeImporterPodSpec("test/image", "-v=5", "Always", importPodEnvVar{"", "", "", ""}, pvc), + args: args{k8sfake.NewSimpleClientset(pvc), "test/image", "-v=5", "Always", importPodEnvVar{"", "", "", "", "1G"}, pvc}, + want: MakeImporterPodSpec("test/image", "-v=5", "Always", importPodEnvVar{"", "", "", "", "1G"}, pvc), wantErr: false, }, } @@ -756,7 +793,7 @@ func TestMakeImporterPodSpec(t *testing.T) { }{ { name: "expect pod to be created", - args: args{"test/myimage", "5", "Always", importPodEnvVar{"", "", SourceHTTP, ContentTypeKubevirt}, pvc}, + args: args{"test/myimage", "5", "Always", importPodEnvVar{"", "", SourceHTTP, ContentTypeKubevirt, "1G"}, pvc}, wantPod: pod, }, } @@ -786,8 +823,8 @@ func Test_makeEnv(t *testing.T) { }{ { name: "env should match", - args: args{importPodEnvVar{"myendpoint", "mysecret", SourceHTTP, ContentTypeKubevirt}}, - want: createEnv(importPodEnvVar{"myendpoint", "mysecret", SourceHTTP, ContentTypeKubevirt}, mockUID), + args: args{importPodEnvVar{"myendpoint", "mysecret", SourceHTTP, ContentTypeKubevirt, "1G"}}, + want: createEnv(importPodEnvVar{"myendpoint", "mysecret", SourceHTTP, ContentTypeKubevirt, "1G"}, mockUID), }, } for _, tt := range tests { @@ -908,6 +945,7 @@ func createPod(pvc *v1.PersistentVolumeClaim, dvname string) *v1.Pod { ep, _ := getEndpoint(pvc) source := getSource(pvc) contentType := getContentType(pvc) + imageSize, _ := getImageSize(pvc) pod.Spec.Containers[0].Env = []v1.EnvVar{ { Name: ImporterSource, @@ -921,6 +959,10 @@ func createPod(pvc *v1.PersistentVolumeClaim, dvname string) *v1.Pod { Name: ImporterContentType, Value: contentType, }, + { + Name: ImporterImageSize, + Value: imageSize, + }, { Name: OwnerUID, Value: string(pvc.UID), @@ -948,6 +990,17 @@ func createPvc(name, ns string, annotations, labels map[string]string) *v1.Persi } } +func createPvcNoSize(name, ns string, annotations, labels map[string]string) *v1.PersistentVolumeClaim { + return &v1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: ns, + Annotations: annotations, + Labels: labels, + }, + } +} + func createClonePvc(name, ns string, annotations, labels map[string]string) *v1.PersistentVolumeClaim { return &v1.PersistentVolumeClaim{ ObjectMeta: metav1.ObjectMeta{ @@ -998,6 +1051,10 @@ func createEnv(podEnvVar importPodEnvVar, uid string) []v1.EnvVar { Name: ImporterContentType, Value: podEnvVar.contentType, }, + { + Name: ImporterImageSize, + Value: podEnvVar.imageSize, + }, { Name: OwnerUID, Value: string(uid), diff --git a/pkg/image/qemu.go b/pkg/image/qemu.go index 4397257366..87fa830044 100644 --- a/pkg/image/qemu.go +++ b/pkg/image/qemu.go @@ -23,6 +23,7 @@ import ( "os" "regexp" "strconv" + "strings" "github.com/golang/glog" "github.com/pkg/errors" @@ -41,10 +42,22 @@ const ( matcherString = "\\((\\d?\\d\\.\\d\\d)\\/100%\\)" ) +// ImgInfo contains the virtual image information. +type ImgInfo struct { + // Format contains the format of the image + Format string `json:"format"` + // BackingFile is the file name of the backing file + BackingFile string `json:"backing-filename"` + // VirtualSize is the virtual size of the image + VirtualSize int64 `json:"virtual-size"` +} + // QEMUOperations defines the interface for executing qemu subprocesses type QEMUOperations interface { ConvertQcow2ToRaw(string, string) error ConvertQcow2ToRawStream(*url.URL, string) error + Resize(string, string) error + Info(string) (*ImgInfo, error) Validate(string, string) error } @@ -98,6 +111,32 @@ func (o *qemuOperations) ConvertQcow2ToRawStream(url *url.URL, dest string) erro return nil } +func (o *qemuOperations) Resize(image, size string) error { + // size from k8s contains an upper case K, so we need to lower case it before we can pass it to qemu. + // Below is the message from qemu that explains what it expects. + // qemu-img: Parameter 'size' expects a non-negative number below 2^64Optional suffix k, M, G, T, P or E means kilo-, mega-, giga-, tera-, peta-and exabytes, respectively. + size = strings.Replace(size, "K", "k", -1) + _, err := qemuExecFunction(qemuLimits, nil, "qemu-img", "resize", "-f", "raw", image, size) + if err != nil { + return errors.Wrapf(err, "Error resizing image %s", image) + } + return nil +} + +func (o *qemuOperations) Info(image string) (*ImgInfo, error) { + output, err := qemuExecFunction(qemuLimits, nil, "qemu-img", "info", "--output=json", image) + if err != nil { + return nil, errors.Wrapf(err, "Error getting info on image %s", image) + } + var info ImgInfo + err = json.Unmarshal(output, &info) + if err != nil { + glog.Errorf("Invalid JSON:\n%s\n", string(output)) + return nil, errors.Wrapf(err, "Invalid json for image %s", image) + } + return &info, nil +} + func (o *qemuOperations) Validate(image, format string) error { type imageInfo struct { Format string `json:"format"` diff --git a/pkg/importer/dataStream.go b/pkg/importer/dataStream.go index 931316915b..0c2999754f 100644 --- a/pkg/importer/dataStream.go +++ b/pkg/importer/dataStream.go @@ -34,6 +34,8 @@ import ( "strings" "time" + "k8s.io/apimachinery/pkg/api/resource" + "github.com/golang/glog" "github.com/minio/minio-go" "github.com/pkg/errors" @@ -81,11 +83,7 @@ type DataStreamOptions struct { Dest string // Endpoint is the endpoint to get the data from for various Sources. Endpoint string -<<<<<<< HEAD // AccessKey is the access key for the endpoint, can be blank. This needs to be a base64 encoded string. -======= - // AccessKey is the access key for the endpoint, can be blank. ->>>>>>> Add source and contentType annotations AccessKey string // SecKey is the security key needed for the endpoint, can be blank. SecKey string @@ -93,6 +91,8 @@ type DataStreamOptions struct { Source string // ContentType is the content type of the data. ContentType string + // ImageSize is the size we want the resulting image to be. + ImageSize string } const ( @@ -132,6 +132,7 @@ func newDataStreamFromStream(stream io.ReadCloser) (*DataStream, error) { "", controller.SourceHTTP, controller.ContentTypeKubevirt, + "", // Blank means don't resize }, stream) } @@ -311,6 +312,22 @@ func SaveStream(stream io.ReadCloser, dest string) (int64, error) { return ds.Size, nil } +// ResizeImage resizes the images to match the requested size. +func ResizeImage(dest, imageSize string) error { + info, err := qemuOperations.Info(dest) + if err != nil { + return err + } + currentImageSizeQuantity := resource.NewQuantity(info.VirtualSize, resource.BinarySI) + newImageSizeQuantity := resource.MustParse(imageSize) + if currentImageSizeQuantity.Cmp(newImageSizeQuantity) == 0 { + glog.V(1).Infof("No need to resize image. Requested size: %s, Image size: %d.\n", imageSize, info.VirtualSize) + return nil + } + glog.V(1).Infof("Expanding image size to: %s\n", imageSize) + return qemuOperations.Resize(dest, imageSize) +} + // Read the endpoint and determine the file composition (eg. .iso.tar.gz) based on the magic number in // each known file format header. Set the Reader slice in the receiver and set the Size field to each // reader's original size. Note: if, when this method returns, the Size is still 0 then another method @@ -627,11 +644,11 @@ func (d *DataStream) copy(dest string) error { return nil } - return copy(d.topReader(), dest, d.qemu) + return copy(d.topReader(), dest, d.qemu, d.ImageSize) } // Copy the file using its Reader (r) to the passed-in destination (`out`). -func copy(r io.Reader, out string, qemu bool) error { +func copy(r io.Reader, out string, qemu bool, imageSize string) error { out = filepath.Clean(out) glog.V(2).Infof("copying image file to %q", out) dest := out @@ -659,6 +676,12 @@ func copy(r io.Reader, out string, qemu bool) error { if err != nil { return errors.Wrap(err, "Local qcow to raw conversion failed") } + if imageSize != "" { + err = qemuOperations.Resize(dest, imageSize) + } + if err != nil { + return errors.Wrap(err, "Resize of image failed") + } } return nil } diff --git a/pkg/importer/dataStream_test.go b/pkg/importer/dataStream_test.go index 032dd7c915..c0fe38fa80 100644 --- a/pkg/importer/dataStream_test.go +++ b/pkg/importer/dataStream_test.go @@ -47,6 +47,8 @@ type fakeQEMUOperations struct { e1 error e2 error e3 error + e4 error + e5 error } // EndlessReader doesn't return any value read, te r @@ -78,7 +80,8 @@ var _ = Describe("Data Stream", func() { accessKeyID, secretKey, controller.SourceHTTP, - controller.ContentTypeKubevirt}) + controller.ContentTypeKubevirt, + "1G"}) if ds != nil && len(ds.Readers) > 0 { defer ds.Close() } @@ -109,7 +112,8 @@ var _ = Describe("Data Stream", func() { "", "", controller.SourceHTTP, - controller.ContentTypeKubevirt}) + controller.ContentTypeKubevirt, + "1G"}) Expect(err).NotTo(HaveOccurred()) By("Closing data stream") err = ds.Close() @@ -127,7 +131,8 @@ var _ = Describe("Data Stream", func() { "", "", controller.SourceHTTP, - controller.ContentTypeKubevirt}) + controller.ContentTypeKubevirt, + "1G"}) if ds != nil && len(ds.Readers) > 0 { defer ds.Close() } @@ -154,7 +159,8 @@ var _ = Describe("Data Stream", func() { "", "", controller.SourceHTTP, - controller.ContentTypeKubevirt}) + controller.ContentTypeKubevirt, + "1G"}) defer func() { tempTestServer.Close() }() @@ -184,7 +190,7 @@ var _ = Describe("SaveStream", func() { It("Should successfully save the stream", func() { defer os.Remove("testqcow2file") - replaceQEMUOperations(NewFakeQEMUOperations(nil, errors.New("Shouldn't get this"), nil), func() { + replaceQEMUOperations(NewFakeQEMUOperations(nil, errors.New("Shouldn't get this"), nil, nil, nil), func() { rdr, err := os.Open(filepath.Join(imageDir, "cirros-qcow2.img")) Expect(err).NotTo(HaveOccurred()) defer rdr.Close() @@ -221,7 +227,8 @@ var _ = Describe("Copy", func() { "", "", controller.SourceHTTP, - controller.ContentTypeKubevirt}) + controller.ContentTypeKubevirt, + "1G"}) if !wantErr { Expect(err).NotTo(HaveOccurred()) } else { @@ -231,16 +238,16 @@ var _ = Describe("Copy", func() { }, table.Entry("successfully copy local image", "tinyCore.raw", "tinyCore.iso", NewQEMUAllErrors(), false), table.Entry("expect failure trying to copy non-existing local image", "cdi-testcopy", "tinyCoreBad.iso", NewQEMUAllErrors(), true), - table.Entry("successfully copy streaming image", "cirros-qcow2.raw", "cirros-qcow2.img", NewFakeQEMUOperations(errors.New("should not be called"), nil, nil), false), - table.Entry("streaming image qemu validation fails", "cirros-qcow2.raw", "cirros-qcow2.img", NewFakeQEMUOperations(nil, nil, errors.New("invalid image")), true), - table.Entry("streaming image qemu convert fails", "cirros-qcow2.raw", "cirros-qcow2.img", NewFakeQEMUOperations(nil, errors.New("exit 1"), nil), true), + table.Entry("successfully copy streaming image", "cirros-qcow2.raw", "cirros-qcow2.img", NewFakeQEMUOperations(errors.New("should not be called"), nil, nil, nil, nil), false), + table.Entry("streaming image qemu validation fails", "cirros-qcow2.raw", "cirros-qcow2.img", NewFakeQEMUOperations(nil, nil, nil, nil, errors.New("invalid image")), true), + table.Entry("streaming image qemu convert fails", "cirros-qcow2.raw", "cirros-qcow2.img", NewFakeQEMUOperations(nil, errors.New("exit 1"), nil, nil, nil), true), ) table.DescribeTable("internal copy", func(r io.Reader, out string, qemu bool, qemuOperations image.QEMUOperations, wantErr bool) { defer os.Remove(out) By("Replacing QEMU Operations") replaceQEMUOperations(qemuOperations, func() { - err := copy(r, out, qemu) + err := copy(r, out, qemu, "") if !wantErr { Expect(err).NotTo(HaveOccurred()) } else { @@ -248,10 +255,10 @@ var _ = Describe("Copy", func() { } }) }, - table.Entry("successfully copy reader", stringRdr, "testoutfile", false, NewFakeQEMUOperations(nil, errors.New("Shouldn't get this"), nil), false), - table.Entry("successfully copy qcow2 reader", testFileRdrs, "testqcow2file", true, NewFakeQEMUOperations(nil, errors.New("Shouldn't get this"), nil), false), - table.Entry("expect error trying to copy invalid format", testFileRdrs, "testinvalidfile", true, NewFakeQEMUOperations(nil, nil, errors.New("invalid format")), true), - table.Entry("expect error trying to copy qemu process fails", testFileRdrs, "testinvalidfile", true, NewFakeQEMUOperations(errors.New("exit 1"), nil, nil), true), + table.Entry("successfully copy reader", stringRdr, "testoutfile", false, NewFakeQEMUOperations(nil, errors.New("Shouldn't get this"), nil, nil, nil), false), + table.Entry("successfully copy qcow2 reader", testFileRdrs, "testqcow2file", true, NewFakeQEMUOperations(nil, errors.New("Shouldn't get this"), nil, nil, nil), false), + table.Entry("expect error trying to copy invalid format", testFileRdrs, "testinvalidfile", true, NewFakeQEMUOperations(nil, nil, nil, nil, errors.New("invalid format")), true), + table.Entry("expect error trying to copy qemu process fails", testFileRdrs, "testinvalidfile", true, NewFakeQEMUOperations(errors.New("exit 1"), nil, nil, nil, nil), true), ) }) @@ -363,7 +370,8 @@ var _ = Describe("Streaming Data Conversion", func() { "", "", controller.SourceHTTP, - controller.ContentTypeKubevirt}) + controller.ContentTypeKubevirt, + "1G"}) Expect(err).NotTo(HaveOccurred()) By(fmt.Sprintf("Checking size of the output file %q", testTarget)) @@ -396,7 +404,8 @@ var _ = Describe("Streaming Data Conversion", func() { "", "", controller.SourceHTTP, - controller.ContentTypeKubevirt}) + controller.ContentTypeKubevirt, + "1G"}) Expect(err).NotTo(HaveOccurred()) defer ds.Close() @@ -474,11 +483,11 @@ func replaceQEMUOperations(replacement image.QEMUOperations, f func()) { func NewQEMUAllErrors() image.QEMUOperations { err := errors.New("qemu should not be called from this test override with replaceQEMUOperations") - return NewFakeQEMUOperations(err, err, err) + return NewFakeQEMUOperations(err, err, err, err, err) } -func NewFakeQEMUOperations(e1, e2, e3 error) image.QEMUOperations { - return &fakeQEMUOperations{e1, e2, e3} +func NewFakeQEMUOperations(e1, e2, e3, e4, e5 error) image.QEMUOperations { + return &fakeQEMUOperations{e1, e2, e3, e4, e5} } func (o *fakeQEMUOperations) ConvertQcow2ToRaw(string, string) error { @@ -490,9 +499,17 @@ func (o *fakeQEMUOperations) ConvertQcow2ToRawStream(*url.URL, string) error { } func (o *fakeQEMUOperations) Validate(string, string) error { + return o.e5 +} + +func (o *fakeQEMUOperations) Resize(string, string) error { return o.e3 } +func (o *fakeQEMUOperations) Info(string) (*image.ImgInfo, error) { + return nil, o.e4 +} + func createTestData() map[string]string { xzfile, _ := utils.FormatTestData(filepath.Join(imageDir, "tinyCore.iso"), os.TempDir(), image.ExtXz) gzfile, _ := utils.FormatTestData(filepath.Join(imageDir, "tinyCore.iso"), os.TempDir(), image.ExtGz)