From 780deff31676e4bc397a1aea6121745ac862f23c Mon Sep 17 00:00:00 2001 From: Uwe Krueger Date: Thu, 21 Nov 2024 15:40:49 +0100 Subject: [PATCH 01/13] chunked reader --- .../repositories/ocireg/repository.go | 1 + api/utils/accessio/chunkedreader.go | 105 ++++++++++++++ api/utils/accessio/chunkedreader_test.go | 133 ++++++++++++++++++ api/utils/blobaccess/chunked/access.go | 92 ++++++++++++ 4 files changed, 331 insertions(+) create mode 100644 api/utils/accessio/chunkedreader.go create mode 100644 api/utils/accessio/chunkedreader_test.go create mode 100644 api/utils/blobaccess/chunked/access.go diff --git a/api/oci/extensions/repositories/ocireg/repository.go b/api/oci/extensions/repositories/ocireg/repository.go index bff1e51078..7b10b46a2f 100644 --- a/api/oci/extensions/repositories/ocireg/repository.go +++ b/api/oci/extensions/repositories/ocireg/repository.go @@ -69,6 +69,7 @@ func NewRepository(ctx cpi.Context, spec *RepositorySpec, info *RepositoryInfo) spec: spec, info: info, } + i.logger.Debug("created repository") return cpi.NewRepository(i), nil } diff --git a/api/utils/accessio/chunkedreader.go b/api/utils/accessio/chunkedreader.go new file mode 100644 index 0000000000..40ef814e75 --- /dev/null +++ b/api/utils/accessio/chunkedreader.go @@ -0,0 +1,105 @@ +package accessio + +import ( + "bytes" + "io" + "sync" + + "github.com/mandelsoft/goutils/general" +) + +type ChunkedReader struct { + lock sync.Mutex + reader io.Reader + buffer *bytes.Buffer + size uint64 + chunk uint64 + read uint64 + err error + + preread uint +} + +var _ io.Reader = (*ChunkedReader)(nil) + +func NewChunkedReader(r io.Reader, chunk uint64, preread ...uint) *ChunkedReader { + return &ChunkedReader{ + reader: r, + size: chunk, + preread: general.OptionalDefaulted(8096, preread...), + } +} + +func (c *ChunkedReader) Read(p []byte) (n int, err error) { + c.lock.Lock() + defer c.lock.Unlock() + + if c.read == c.size { + return 0, io.EOF + } + if c.read+uint64(len(p)) > c.size { + p = 
p[:c.size-c.read] // read at most rest of chunk size + } + if c.buffer != nil && c.buffer.Len() > 0 { + // first, consume from buffer + n, _ := c.buffer.Read(p) + c.read += uint64(n) + if c.buffer.Len() == 0 { + c.buffer = nil + } + return c.report(n, nil) + } else { + c.buffer = nil + } + + if c.err != nil { + return 0, c.err + } + n, err = c.reader.Read(p) + c.read += uint64(n) + return c.report(n, err) +} + +func (c *ChunkedReader) report(n int, err error) (int, error) { + if err == nil && c.read >= c.size { + err = io.EOF + } + return n, err +} + +func (c *ChunkedReader) ChunkDone() bool { + c.lock.Lock() + defer c.lock.Unlock() + + return c.read >= c.size +} + +func (c *ChunkedReader) Next() bool { + c.lock.Lock() + defer c.lock.Unlock() + + if c.read < c.size || c.err != nil { + return false + } + + if c.buffer == nil { + // cannot assume that read with size 0 returns EOF as proposed + // by io.Reader.Read (see bytes.Buffer.Read). + // Therefore, we really have to read something. + + var buf = make([]byte, c.preread, c.preread) + n, err := c.reader.Read(buf) + c.err = err + if n > 0 { + c.buffer = bytes.NewBuffer(buf[:n]) + } else { + if err == io.EOF { + return false + } + } + } + + c.read = 0 + c.chunk++ + return true +} diff --git a/api/utils/accessio/chunkedreader_test.go b/api/utils/accessio/chunkedreader_test.go new file mode 100644 index 0000000000..68a3f46eae --- /dev/null +++ b/api/utils/accessio/chunkedreader_test.go @@ -0,0 +1,133 @@ +package accessio_test + +import ( + "bytes" + "io" + + . "github.com/onsi/ginkgo/v2" + . 
"github.com/onsi/gomega" + "ocm.software/ocm/api/utils/accessio" +) + +var _ = FDescribe("Test Environment", func() { + in := "12345678901234567890" + var buf *bytes.Buffer + var chunked *accessio.ChunkedReader + + BeforeEach(func() { + buf = bytes.NewBuffer([]byte(in)) + }) + + Context("complete", func() { + BeforeEach(func() { + chunked = accessio.NewChunkedReader(buf, 100, 2) + }) + + It("reports EOF", func() { + var buf [30]byte + n, err := chunked.Read(buf[:]) + Expect(n).To(Equal(20)) + Expect(err).To(BeNil()) + Expect(string(buf[:n])).To(Equal(in)) + Expect(chunked.ChunkDone()).To(Equal(false)) + Expect(chunked.Next()).To(Equal(false)) + + n, err = chunked.Read(buf[:]) + Expect(n).To(Equal(0)) + Expect(err).To(Equal(io.EOF)) + Expect(chunked.ChunkDone()).To(Equal(false)) + Expect(chunked.Next()).To(Equal(false)) + + n, err = chunked.Read(buf[:]) + Expect(n).To(Equal(0)) + Expect(err).To(Equal(io.EOF)) + }) + + It("reports EOF with matched size", func() { + var buf [20]byte + n, err := chunked.Read(buf[:]) + Expect(n).To(Equal(20)) + Expect(err).To(BeNil()) + Expect(string(buf[:n])).To(Equal(in)) + Expect(chunked.ChunkDone()).To(Equal(false)) + Expect(chunked.Next()).To(Equal(false)) + + n, err = chunked.Read(buf[:]) + Expect(n).To(Equal(0)) + Expect(err).To(Equal(io.EOF)) + Expect(chunked.ChunkDone()).To(Equal(false)) + Expect(chunked.Next()).To(Equal(false)) + + n, err = chunked.Read(buf[:]) + Expect(n).To(Equal(0)) + Expect(err).To(Equal(io.EOF)) + }) + }) + + Context("chunk size matches read size", func() { + BeforeEach(func() { + chunked = accessio.NewChunkedReader(buf, 20, 2) + }) + + It("reports EOF with matched size", func() { + var buf [20]byte + n, err := chunked.Read(buf[:]) + Expect(n).To(Equal(20)) + Expect(err).To(Equal(io.EOF)) + Expect(string(buf[:n])).To(Equal(in)) + Expect(chunked.ChunkDone()).To(Equal(true)) + Expect(chunked.Next()).To(Equal(false)) + + n, err = chunked.Read(buf[:]) + Expect(n).To(Equal(0)) + Expect(err).To(Equal(io.EOF)) + 
}) + }) + + Context("split", func() { + BeforeEach(func() { + chunked = accessio.NewChunkedReader(buf, 5, 2) + }) + + It("reports EOF and splits reader", func() { + var buf [30]byte + cnt := 0 + + n, err := chunked.Read(buf[:]) + Expect(n).To(Equal(5)) + Expect(err).To(Equal(io.EOF)) + Expect(string(buf[:n])).To(Equal(in[cnt : cnt+n])) + Expect(chunked.ChunkDone()).To(Equal(true)) + cnt += n + Expect(chunked.Next()).To(Equal(true)) + + for i := 0; i < 3; i++ { + n, err := chunked.Read(buf[:]) + Expect(n).To(Equal(2)) + Expect(err).To(BeNil()) + Expect(string(buf[:n])).To(Equal(in[cnt : cnt+n])) + Expect(chunked.ChunkDone()).To(Equal(false)) + cnt += n + + n, err = chunked.Read(buf[:]) + Expect(n).To(Equal(3)) + Expect(err).To(Equal(io.EOF)) + Expect(string(buf[:n])).To(Equal(in[cnt : cnt+n])) + Expect(chunked.ChunkDone()).To(Equal(true)) + cnt += n + + Expect(chunked.Next()).To(Equal(i != 2)) + } + Expect(chunked.Next()).To(Equal(false)) + }) + }) +}) + +func check(chunked *accessio.ChunkedReader, n int, buf []byte, exp int, data string, done, next bool) { + ExpectWithOffset(1, n).To(Equal(exp)) + ExpectWithOffset(1, string(buf[:n])).To(Equal(data)) + + ExpectWithOffset(1, chunked.ChunkDone()).To(Equal(done)) + ExpectWithOffset(1, chunked.Next()).To(Equal(next)) + +} diff --git a/api/utils/blobaccess/chunked/access.go b/api/utils/blobaccess/chunked/access.go new file mode 100644 index 0000000000..3b67cadffa --- /dev/null +++ b/api/utils/blobaccess/chunked/access.go @@ -0,0 +1,92 @@ +package chunked + +import ( + "io" + "sync" + + "github.com/opencontainers/go-digest" + "ocm.software/ocm/api/utils" + "ocm.software/ocm/api/utils/blobaccess/bpi" +) + +type Chunked interface { + bpi.BlobAccess + + Next() bool +} + +type chunked struct { + lock sync.Mutex + base bpi.BlobAccess + blobsize uint64 + chunksize uint64 + preread uint + + reader io.Reader +} + +var _ bpi.BlobAccessBase = (*chunked)(nil) + +func New(acc bpi.BlobAccess, chunk uint64, preread...uint) (Chunked, 
error) { + b, err := acc.Dup() + if err != nil { + return nil, err + } + + s := acc.Size() + + return bpi.NewBlobAccessForBase(&chunked{base: b, blobsize: size, chunksize: chunk, preread: utils.OptionalDefaulted(8096, preread...)}), nil +} + +type view struct { + bpi.BlobAccess +} + +func (v *view) Dup() bpi.BlobAccess { + +} +func (c *chunked) Close() error { + c.lock.Lock() + defer c.lock.Unlock() + + if c.base == nil { + return bpi.ErrClosed + } + err := c.base.Close() + c.base = nil + return err +} + +func (c *chunked) Get() ([]byte, error) { + c.lock.Lock() + defer c.lock.Unlock() + + if + // TODO implement me + panic("implement me") +} + +func (c *chunked) Reader() (io.ReadCloser, error) { + // TODO implement me + panic("implement me") +} + +func (c *chunked) Digest() digest.Digest { + // TODO implement me + panic("implement me") +} + +func (c *chunked) MimeType() string { + // TODO implement me + panic("implement me") +} + +func (c *chunked) DigestKnown() bool { + // TODO implement me + panic("implement me") +} + +func (c *chunked) Size() int64 { + // TODO implement me + panic("implement me") +} From 1eba038539b3419697dc7667dedd6af034be972c Mon Sep 17 00:00:00 2001 From: Uwe Krueger Date: Mon, 25 Nov 2024 17:17:15 +0100 Subject: [PATCH 02/13] OCI layer size limit --- api/ocm/cpi/repocpi/bridge_r.go | 10 ++ .../genericocireg/accessmethod_localblob.go | 119 ++++++++++++++++- .../genericocireg/componentversion.go | 121 +++++++++++++----- .../repositories/genericocireg/repo_test.go | 53 ++++++++ .../repositories/genericocireg/repository.go | 26 ++-- .../repositories/genericocireg/type.go | 23 +++- api/utils/accessio/chunkedreader.go | 33 +++-- api/utils/accessio/chunkedreader_test.go | 4 +- api/utils/blobaccess/chunked/access.go | 111 +++++++--------- api/utils/blobaccess/chunked/chunked_test.go | 112 ++++++++++++++++ api/utils/blobaccess/chunked/suite_test.go | 13 ++ 11 files changed, 493 insertions(+), 132 deletions(-) create mode 100644 
api/utils/blobaccess/chunked/chunked_test.go create mode 100644 api/utils/blobaccess/chunked/suite_test.go diff --git a/api/ocm/cpi/repocpi/bridge_r.go b/api/ocm/cpi/repocpi/bridge_r.go index f7eea176fe..bce9642245 100644 --- a/api/ocm/cpi/repocpi/bridge_r.go +++ b/api/ocm/cpi/repocpi/bridge_r.go @@ -34,6 +34,16 @@ type RepositoryImpl interface { io.Closer } +type Chunked interface { + SetBlobLimit(s int64) +} + +func SetBlobLimit(i RepositoryImpl, s int64) { + if c, ok := i.(Chunked); ok { + c.SetBlobLimit(s) + } +} + type _repositoryBridgeBase = resource.ResourceImplBase[cpi.Repository] type repositoryBridge struct { diff --git a/api/ocm/extensions/repositories/genericocireg/accessmethod_localblob.go b/api/ocm/extensions/repositories/genericocireg/accessmethod_localblob.go index 94a0a85966..2bb3feaad3 100644 --- a/api/ocm/extensions/repositories/genericocireg/accessmethod_localblob.go +++ b/api/ocm/extensions/repositories/genericocireg/accessmethod_localblob.go @@ -1,10 +1,13 @@ package genericocireg import ( + "bytes" "io" + "strings" "sync" "github.com/mandelsoft/goutils/errors" + "github.com/mandelsoft/goutils/finalizer" "github.com/opencontainers/go-digest" "ocm.software/ocm/api/oci" @@ -88,7 +91,17 @@ func (m *localBlobAccessMethod) getBlob() (blobaccess.DataAccess, error) { return nil, errors.ErrNotImplemented("artifact blob synthesis") } } - _, data, err := m.namespace.GetBlobData(digest.Digest(m.spec.LocalReference)) + refs := strings.Split(m.spec.LocalReference, ",") + + var ( + data blobaccess.DataAccess + err error + ) + if len(refs) < 2 { + _, data, err = m.namespace.GetBlobData(digest.Digest(m.spec.LocalReference)) + } else { + data = &composedBlock{m, refs} + } if err != nil { return nil, err } @@ -111,3 +124,107 @@ func (m *localBlobAccessMethod) Get() ([]byte, error) { func (m *localBlobAccessMethod) MimeType() string { return m.spec.MediaType } + +//////////////////////////////////////////////////////////////////////////////// + +type 
composedBlock struct { + m *localBlobAccessMethod + refs []string +} + +var _ blobaccess.DataAccess = (*composedBlock)(nil) + +func (c *composedBlock) Get() ([]byte, error) { + buf := bytes.NewBuffer(nil) + for _, ref := range c.refs { + var finalize finalizer.Finalizer + + _, data, err := c.m.namespace.GetBlobData(digest.Digest(ref)) + if err != nil { + return nil, err + } + finalize.Close(data) + r, err := data.Reader() + if err != nil { + return nil, err + } + finalize.Close(r) + _, err = io.Copy(buf, r) + if err != nil { + return nil, err + } + err = finalize.Finalize() + if err != nil { + return nil, err + } + } + return buf.Bytes(), nil +} + +func (c *composedBlock) Reader() (io.ReadCloser, error) { + return &composedReader{ + m: c.m, + refs: c.refs, + }, nil +} + +func (c composedBlock) Close() error { + return nil +} + +type composedReader struct { + lock sync.Mutex + m *localBlobAccessMethod + refs []string + reader io.ReadCloser + data blobaccess.DataAccess +} + +func (c *composedReader) Read(p []byte) (n int, err error) { + c.lock.Lock() + defer c.lock.Unlock() + + for { + if c.reader != nil { + n, err := c.reader.Read(p) + if n > 0 { + if err == io.EOF { + err = nil + } + return n, err + } + if err != nil { + return n, err + } + c.reader.Close() + c.data.Close() + c.reader = nil + } + if len(c.refs) == 0 { + return 0, io.EOF + } + + ref := strings.TrimSpace(c.refs[0]) + _, c.data, err = c.m.namespace.GetBlobData(digest.Digest(ref)) + if err != nil { + return 0, err + } + c.reader, err = c.data.Reader() + if err != nil { + return 0, err + } + } +} + +func (c *composedReader) Close() error { + c.lock.Lock() + defer c.lock.Unlock() + + if c.reader != nil { + c.reader.Close() + c.data.Close() + c.reader = nil + c.refs = nil + } + return nil +} diff --git a/api/ocm/extensions/repositories/genericocireg/componentversion.go b/api/ocm/extensions/repositories/genericocireg/componentversion.go index f603fbe509..c9b6338993 100644 --- 
a/api/ocm/extensions/repositories/genericocireg/componentversion.go +++ b/api/ocm/extensions/repositories/genericocireg/componentversion.go @@ -9,6 +9,7 @@ import ( "github.com/mandelsoft/goutils/set" "github.com/opencontainers/go-digest" + "ocm.software/ocm/api/datacontext/attrs/vfsattr" "ocm.software/ocm/api/oci" "ocm.software/ocm/api/oci/artdesc" "ocm.software/ocm/api/oci/extensions/repositories/artifactset" @@ -25,7 +26,10 @@ import ( ocihdlr "ocm.software/ocm/api/ocm/extensions/blobhandler/handlers/oci" "ocm.software/ocm/api/utils/accessio" "ocm.software/ocm/api/utils/accessobj" + "ocm.software/ocm/api/utils/blobaccess/blobaccess" + "ocm.software/ocm/api/utils/blobaccess/chunked" "ocm.software/ocm/api/utils/errkind" + "ocm.software/ocm/api/utils/mime" common "ocm.software/ocm/api/utils/misc" "ocm.software/ocm/api/utils/refmgmt" "ocm.software/ocm/api/utils/runtime" @@ -183,32 +187,36 @@ func (c *ComponentVersionContainer) Update() (bool, error) { layers.Add(i) } for i, r := range desc.Resources { - s, l, err := c.evalLayer(r.Access) + s, list, err := c.evalLayer(r.Access) if err != nil { return false, fmt.Errorf("failed resource layer evaluation: %w", err) } - if l > 0 { - layerAnnotations[l] = append(layerAnnotations[l], ArtifactInfo{ - Kind: ARTKIND_RESOURCE, - Identity: r.GetIdentity(desc.Resources), - }) - layers.Delete(l) + if len(list) > 0 { + for _, l := range list { + layerAnnotations[l] = append(layerAnnotations[l], ArtifactInfo{ + Kind: ARTKIND_RESOURCE, + Identity: r.GetIdentity(desc.Resources), + }) + layers.Delete(l) + } } if s != r.Access { desc.Resources[i].Access = s } } for i, r := range desc.Sources { - s, l, err := c.evalLayer(r.Access) + s, list, err := c.evalLayer(r.Access) if err != nil { return false, fmt.Errorf("failed source layer evaluation: %w", err) } - if l > 0 { - layerAnnotations[l] = append(layerAnnotations[l], ArtifactInfo{ - Kind: ARTKIND_SOURCE, - Identity: r.GetIdentity(desc.Sources), - }) - layers.Delete(l) + if len(list) > 
0 { + for _, l := range list { + layerAnnotations[l] = append(layerAnnotations[l], ArtifactInfo{ + Kind: ARTKIND_SOURCE, + Identity: r.GetIdentity(desc.Sources), + }) + layers.Delete(l) + } } if s != r.Access { desc.Sources[i].Access = s @@ -259,32 +267,45 @@ func (c *ComponentVersionContainer) Update() (bool, error) { return false, nil } -func (c *ComponentVersionContainer) evalLayer(s compdesc.AccessSpec) (compdesc.AccessSpec, int, error) { - var d *artdesc.Descriptor +func (c *ComponentVersionContainer) evalLayer(s compdesc.AccessSpec) (compdesc.AccessSpec, []int, error) { + var ( + d *artdesc.Descriptor + layernums []int + ) spec, err := c.GetContext().AccessSpecForSpec(s) if err != nil { - return s, 0, err + return s, nil, err } if a, ok := spec.(*localblob.AccessSpec); ok { if ok, _ := artdesc.IsDigest(a.LocalReference); !ok { - return s, 0, errors.ErrInvalid("digest", a.LocalReference) + return s, nil, errors.ErrInvalid("digest", a.LocalReference) } - d = &artdesc.Descriptor{Digest: digest.Digest(a.LocalReference), MediaType: a.GetMimeType()} - } - if d != nil { - // find layer - layers := c.manifest.GetDescriptor().Layers - maxLen := len(layers) - 1 - for i := range layers { - l := layers[len(layers)-1-i] - if i < maxLen && l.Digest == d.Digest && (d.Digest == "" || d.Digest == l.Digest) { - return s, len(layers) - 1 - i, nil + refs := strings.Split(a.LocalReference, ",") + media := a.GetMimeType() + if len(refs) > 1 { + media = mime.MIME_OCTET + } + for _, ref := range refs { + d = &artdesc.Descriptor{Digest: digest.Digest(strings.TrimSpace(ref)), MediaType: media} + // find layer + layers := c.manifest.GetDescriptor().Layers + maxLen := len(layers) - 1 + found := false + for i := range layers { + l := layers[len(layers)-1-i] + if i < maxLen && l.Digest == d.Digest && (d.Digest == "" || d.Digest == l.Digest) { + layernums = append(layernums, len(layers)-1-i) + found = true + break + } + } + if !found { + return s, nil, fmt.Errorf("resource access %s: no 
layer found for local blob %s[%s]", spec.Describe(c.GetContext()), d.Digest, d.MediaType) } } - return s, 0, fmt.Errorf("resource access %s: no layer found for local blob %s[%s]", spec.Describe(c.GetContext()), d.Digest, d.MediaType) } - return s, 0, nil + return s, layernums, nil } func (c *ComponentVersionContainer) GetDescriptor() *compdesc.ComponentDescriptor { @@ -304,15 +325,49 @@ func (c *ComponentVersionContainer) AddBlob(blob cpi.BlobAccess, refName string, return nil, errors.New("a resource has to be defined") } + size := blob.Size() + var refs []string + if c.comp.repo.blobLimit > 0 && size != blobaccess.BLOB_UNKNOWN_SIZE && size > c.comp.repo.blobLimit { + reader, err := blob.Reader() + if err != nil { + return nil, err + } + ch := chunked.New(reader, c.comp.repo.blobLimit, vfsattr.Get(c.GetContext())) + for { + b, err := ch.Next() + if err != nil { + return nil, errors.Wrapf(err, "chunked blob") + } + if b == nil { + break + } + err = c.addLayer(b, &refs) + b.Close() + + if err != nil { + return nil, errors.Wrapf(err, "chunked blob") + } + } + } else { + err := c.addLayer(blob, &refs) + if err != nil { + return nil, err + } + } + return localblob.New(strings.Join(refs, ","), refName, blob.MimeType(), global), nil +} + +func (c *ComponentVersionContainer) addLayer(blob cpi.BlobAccess, refs *[]string) error { err := c.manifest.AddBlob(blob) if err != nil { - return nil, err + return err } err = ocihdlr.AssureLayer(c.manifest.GetDescriptor(), blob) if err != nil { - return nil, err + return err } - return localblob.New(blob.Digest().String(), refName, blob.MimeType(), global), nil + *refs = append(*refs, blob.Digest().String()) + return nil } // AssureGlobalRef provides a global manifest for a local OCI Artifact. 
diff --git a/api/ocm/extensions/repositories/genericocireg/repo_test.go b/api/ocm/extensions/repositories/genericocireg/repo_test.go index 5ea992c69a..f550ef0a6f 100644 --- a/api/ocm/extensions/repositories/genericocireg/repo_test.go +++ b/api/ocm/extensions/repositories/genericocireg/repo_test.go @@ -123,6 +123,59 @@ var _ = Describe("component repository mapping", func() { MustBeSuccessful(finalize.Finalize()) }) + FIt("creates a dummy component with chunks", func() { + var finalize finalizer.Finalizer + defer Defer(finalize.Finalize) + + repo := finalizer.ClosingWith(&finalize, Must(DefaultContext.RepositoryForSpec(spec))) + impl := Must(repocpi.GetRepositoryImplementation(repo)) + Expect(reflect.TypeOf(impl).String()).To(Equal("*genericocireg.RepositoryImpl")) + repocpi.SetBlobLimit(impl, 5) + + comp := finalizer.ClosingWith(&finalize, Must(repo.LookupComponent(COMPONENT))) + vers := finalizer.ClosingWith(&finalize, Must(comp.NewVersion("v1"))) + + m1 := compdesc.NewResourceMeta("rsc1", resourcetypes.PLAIN_TEXT, metav1.LocalRelation) + blob := blobaccess.ForString(mime.MIME_TEXT, ocmtesthelper.S_TESTDATA) + MustBeSuccessful(vers.SetResourceBlob(m1, blob, "", nil)) + + MustBeSuccessful(comp.AddVersion(vers)) + + noref := vers.Repository() + Expect(noref).NotTo(BeNil()) + Expect(noref.IsClosed()).To(BeFalse()) + Expect(noref.Close()).To(Succeed()) + Expect(noref.IsClosed()).To(BeFalse()) + + MustBeSuccessful(finalize.Finalize()) + + Expect(noref.IsClosed()).To(BeTrue()) + Expect(noref.Close()).To(MatchError("closed")) + ExpectError(noref.LookupComponent("dummy")).To(MatchError("closed")) + + // access it again + repo = finalizer.ClosingWith(&finalize, Must(DefaultContext.RepositoryForSpec(spec))) + + ok := Must(repo.ExistsComponentVersion(COMPONENT, "v1")) + Expect(ok).To(BeTrue()) + + comp = finalizer.ClosingWith(&finalize, Must(repo.LookupComponent(COMPONENT))) + vers = finalizer.ClosingWith(&finalize, Must(comp.LookupVersion("v1"))) + + rsc := 
Must(vers.GetResourceByIndex(0)) + acc := Must(rsc.Access()) + local, ok := acc.(*localblob.AccessSpec) + Expect(ok).To(BeTrue()) + Expect(local.LocalReference).To(Equal("sha256:a4853613b2a38568ed4e49196238152469097412d06d5e5fc9be8ab92cfdf2bf,sha256:977817f6f61f4dd501df3036a3e16b31452b36f4aa3edcf9a3f3242a79d7170d")) + Expect(rsc.Meta().Digest).NotTo(BeNil()) + Expect(rsc.Meta().Digest.Value).To(Equal(ocmtesthelper.D_TESTDATA)) + + data := Must(ocmutils.GetResourceData(rsc)) + Expect(string(data)).To(Equal(ocmtesthelper.S_TESTDATA)) + + MustBeSuccessful(finalize.Finalize()) + }) + It("handles legacylocalociblob access method", func() { var finalize finalizer.Finalizer defer Defer(finalize.Finalize) diff --git a/api/ocm/extensions/repositories/genericocireg/repository.go b/api/ocm/extensions/repositories/genericocireg/repository.go index 70661f76f5..5bde9477b4 100644 --- a/api/ocm/extensions/repositories/genericocireg/repository.go +++ b/api/ocm/extensions/repositories/genericocireg/repository.go @@ -38,12 +38,13 @@ func GetOCIRepository(r cpi.Repository) ocicpi.Repository { } type RepositoryImpl struct { - bridge repocpi.RepositoryBridge - ctx cpi.Context - meta ComponentRepositoryMeta - nonref cpi.Repository - ocirepo oci.Repository - readonly bool + bridge repocpi.RepositoryBridge + ctx cpi.Context + meta ComponentRepositoryMeta + nonref cpi.Repository + ocirepo oci.Repository + readonly bool + blobLimit int64 } var ( @@ -51,16 +52,21 @@ var ( _ credentials.ConsumerIdentityProvider = (*RepositoryImpl)(nil) ) -func NewRepository(ctxp cpi.ContextProvider, meta *ComponentRepositoryMeta, ocirepo oci.Repository) cpi.Repository { +func NewRepository(ctxp cpi.ContextProvider, meta *ComponentRepositoryMeta, ocirepo oci.Repository, blobLimit ...int64) cpi.Repository { ctx := datacontext.InternalContextRef(ctxp.OCMContext()) impl := &RepositoryImpl{ - ctx: ctx, - meta: *DefaultComponentRepositoryMeta(meta), - ocirepo: ocirepo, + ctx: ctx, + meta: 
*DefaultComponentRepositoryMeta(meta), + ocirepo: ocirepo, + blobLimit: general.OptionalDefaulted(-1, blobLimit...), } return repocpi.NewRepository(impl, "OCM repo[OCI]") } +func (r *RepositoryImpl) SetBlobLimit(s int64) { + r.blobLimit = s +} + func (r *RepositoryImpl) Close() error { return r.ocirepo.Close() } diff --git a/api/ocm/extensions/repositories/genericocireg/type.go b/api/ocm/extensions/repositories/genericocireg/type.go index 809d598140..d9563748be 100644 --- a/api/ocm/extensions/repositories/genericocireg/type.go +++ b/api/ocm/extensions/repositories/genericocireg/type.go @@ -91,6 +91,7 @@ func NewComponentRepositoryMeta(subPath string, mapping ComponentNameMapping) *C type RepositorySpec struct { oci.RepositorySpec ComponentRepositoryMeta + BlobLimit int64 } var ( @@ -127,19 +128,26 @@ func (a *RepositorySpec) AsUniformSpec(cpi.Context) *cpi.UniformRepositorySpec { return &cpi.UniformRepositorySpec{Type: a.GetKind(), Scheme: spec.Scheme, Host: spec.Host, Info: spec.Info, TypeHint: spec.TypeHint, SubPath: a.SubPath} } +type meta struct { + ComponentRepositoryMeta `json:",inline"` + BlobLimit int64 `json:"blobLimit"` +} + func (u *RepositorySpec) UnmarshalJSON(data []byte) error { logrus.Debugf("unmarshal generic ocireg spec %s\n", string(data)) ocispec := &oci.GenericRepositorySpec{} if err := json.Unmarshal(data, ocispec); err != nil { return err } - compmeta := &ComponentRepositoryMeta{} - if err := json.Unmarshal(data, ocispec); err != nil { + + var m meta + if err := json.Unmarshal(data, &m); err != nil { return err } u.RepositorySpec = ocispec - u.ComponentRepositoryMeta = *compmeta + u.ComponentRepositoryMeta = m.ComponentRepositoryMeta + u.BlobLimit = m.BlobLimit normalizers.Normalize(u) return nil @@ -154,7 +162,12 @@ func (u RepositorySpec) MarshalJSON() ([]byte, error) { if err != nil { return nil, err } - compmeta, err := runtime.ToUnstructuredObject(u.ComponentRepositoryMeta) + + m := meta{ + ComponentRepositoryMeta: 
u.ComponentRepositoryMeta, + BlobLimit: u.BlobLimit, + } + compmeta, err := runtime.ToUnstructuredObject(&m) if err != nil { return nil, err } @@ -166,7 +179,7 @@ func (s *RepositorySpec) Repository(ctx cpi.Context, creds credentials.Credentia if err != nil { return nil, err } - return NewRepository(ctx, &s.ComponentRepositoryMeta, r), nil + return NewRepository(ctx, &s.ComponentRepositoryMeta, r, s.BlobLimit), nil } func (s *RepositorySpec) GetConsumerId(uctx ...credentials.UsageContext) credentials.ConsumerIdentity { diff --git a/api/utils/accessio/chunkedreader.go b/api/utils/accessio/chunkedreader.go index 40ef814e75..6dc728f4d0 100644 --- a/api/utils/accessio/chunkedreader.go +++ b/api/utils/accessio/chunkedreader.go @@ -12,9 +12,9 @@ type ChunkedReader struct { lock sync.Mutex reader io.Reader buffer *bytes.Buffer - size uint64 - chunk uint64 - read uint64 + size int64 + chunk int + read int64 err error preread uint @@ -22,7 +22,7 @@ type ChunkedReader struct { var _ io.Reader = (*ChunkedReader)(nil) -func NewChunkedReader(r io.Reader, chunk uint64, preread ...uint) *ChunkedReader { +func NewChunkedReader(r io.Reader, chunk int64, preread ...uint) *ChunkedReader { return &ChunkedReader{ reader: r, size: chunk, @@ -37,13 +37,13 @@ func (c *ChunkedReader) Read(p []byte) (n int, err error) { if c.read == c.size { return 0, io.EOF } - if c.read+uint64(len(p)) > c.size { + if c.read+int64(len(p)) > c.size { p = p[:c.size-c.read] // read at most rest of chunk size } if c.buffer != nil && c.buffer.Len() > 0 { // first, consume from buffer n, _ := c.buffer.Read(p) - c.read += uint64(n) + c.read += int64(n) if c.buffer.Len() == 0 { c.buffer = nil } @@ -56,7 +56,8 @@ func (c *ChunkedReader) Read(p []byte) (n int, err error) { return 0, c.err } n, err = c.reader.Read(p) - c.read += uint64(n) + c.read += int64(n) + c.err = err return c.report(n, err) } @@ -67,11 +68,18 @@ func (c *ChunkedReader) report(n int, err error) (int, error) { return n, err } +func (c 
*ChunkedReader) ChunkNo() int { + c.lock.Lock() + defer c.lock.Unlock() + + return c.chunk +} + func (c *ChunkedReader) ChunkDone() bool { c.lock.Lock() defer c.lock.Unlock() - return c.read >= c.size + return c.read >= c.size || c.err != nil } func (c *ChunkedReader) Next() bool { @@ -82,12 +90,11 @@ func (c *ChunkedReader) Next() bool { return false } + // cannot assume that read with size 0 returns EOF as proposed + // by io.Reader.Read (see bytes.Buffer.Read). + // Therefore, we really have to read something. if c.buffer == nil { - // cannot assume that read with size 0 returns EOF as proposed - // by io.Reader.Read (see bytes.Buffer.Read). - // Therefore, we really have to read something. - - var buf = make([]byte, c.preread, c.preread) + buf := make([]byte, c.preread) n, err := c.reader.Read(buf) c.err = err if n > 0 { diff --git a/api/utils/accessio/chunkedreader_test.go b/api/utils/accessio/chunkedreader_test.go index 68a3f46eae..6dad018658 100644 --- a/api/utils/accessio/chunkedreader_test.go +++ b/api/utils/accessio/chunkedreader_test.go @@ -35,7 +35,7 @@ var _ = FDescribe("Test Environment", func() { n, err = chunked.Read(buf[:]) Expect(n).To(Equal(0)) Expect(err).To(Equal(io.EOF)) - Expect(chunked.ChunkDone()).To(Equal(false)) + Expect(chunked.ChunkDone()).To(Equal(true)) Expect(chunked.Next()).To(Equal(false)) n, err = chunked.Read(buf[:]) @@ -55,7 +55,7 @@ var _ = FDescribe("Test Environment", func() { n, err = chunked.Read(buf[:]) Expect(n).To(Equal(0)) Expect(err).To(Equal(io.EOF)) - Expect(chunked.ChunkDone()).To(Equal(false)) + Expect(chunked.ChunkDone()).To(Equal(true)) Expect(chunked.Next()).To(Equal(false)) n, err = chunked.Read(buf[:]) diff --git a/api/utils/blobaccess/chunked/access.go b/api/utils/blobaccess/chunked/access.go index 3b67cadffa..2643681cd9 100644 --- a/api/utils/blobaccess/chunked/access.go +++ b/api/utils/blobaccess/chunked/access.go @@ -1,92 +1,67 @@ package chunked import ( + "fmt" "io" "sync" - 
"github.com/opencontainers/go-digest" + "github.com/mandelsoft/vfs/pkg/vfs" + "ocm.software/ocm/api/utils" + "ocm.software/ocm/api/utils/accessio" + "ocm.software/ocm/api/utils/blobaccess" "ocm.software/ocm/api/utils/blobaccess/bpi" + "ocm.software/ocm/api/utils/mime" ) -type Chunked interface { - bpi.BlobAccess - - Next() bool -} - -type chunked struct { - lock sync.Mutex - base bpi.BlobAccess - blobsize uint64 - chunksize uint64 - preread uint - - reader io.Reader -} - -var _ bpi.BlobAccessBase = (*chunked)(nil) - -func New(acc bpi.BlobAccess, chunk uint64, preread...uint) (Chunked, error) { - b, err := acc.Dup() +func newChunck(r io.Reader, fss ...vfs.FileSystem) (bpi.BlobAccess, error) { + t, err := blobaccess.NewTempFile("", "chunk-*", fss...) if err != nil { return nil, err } - s := acc.Size() - - return bpi.NewBlobAccessForBase(&chunked{base: b, blobsize: size, chunksize: chunk, preread: utils.OptionalDefaulted(8096, preread...)}), nil -} - -type view struct { - bpi.BlobAccess -} - -func (v *view) Dup() bpi.BlobAccess { - -} -func (c *chunked) Close() error { - c.lock.Lock() - defer c.lock.Unlock() - - if c.base == nil { - return bpi.ErrClosed + _, err = io.Copy(t.Writer(), r) + if err != nil { + t.Close() + return nil, err } - err := c.base.Close() - c.base = nil - return err -} - -func (c *chunked) Get() ([]byte, error) { - c.lock.Lock() - defer c.lock.Unlock() - - if - // TODO implement me - panic("implement me") -} - -func (c *chunked) Reader() (io.ReadCloser, error) { - // TODO implement me - panic("implement me") + return t.AsBlob(mime.MIME_OCTET), nil } -func (c *chunked) Digest() digest.Digest { - // TODO implement me - panic("implement me") +type ChunkedBlobSource interface { + Next() (bpi.BlobAccess, error) } -func (c *chunked) MimeType() string { - // TODO implement me - panic("implement me") +type chunkedAccess struct { + lock sync.Mutex + chunksize int64 + reader *accessio.ChunkedReader + fs vfs.FileSystem + cont bool } -func (c *chunked) 
DigestKnown() bool { - // TODO implement me - panic("implement me") +func New(r io.Reader, chunksize int64, fss ...vfs.FileSystem) ChunkedBlobSource { + reader := accessio.NewChunkedReader(r, chunksize) + return &chunkedAccess{ + chunksize: chunksize, + reader: reader, + fs: utils.FileSystem(fss...), + cont: false, + } } -func (c *chunked) Size() int64 { - // TODO implement me - panic("implement me") +func (r *chunkedAccess) Next() (bpi.BlobAccess, error) { + r.lock.Lock() + defer r.lock.Unlock() + + if r.cont { + if !r.reader.ChunkDone() { + return nil, fmt.Errorf("unexpected incomplete read") + } + if !r.reader.Next() { + return nil, nil + } + } + r.cont = true + return newChunck(r.reader, r.fs) } diff --git a/api/utils/blobaccess/chunked/chunked_test.go b/api/utils/blobaccess/chunked/chunked_test.go new file mode 100644 index 0000000000..958c94fd3c --- /dev/null +++ b/api/utils/blobaccess/chunked/chunked_test.go @@ -0,0 +1,112 @@ +package chunked_test + +import ( + "bytes" + "io" + + "github.com/mandelsoft/goutils/finalizer" + . "github.com/mandelsoft/goutils/testutils" + "github.com/mandelsoft/vfs/pkg/memoryfs" + "github.com/mandelsoft/vfs/pkg/vfs" + . "github.com/onsi/ginkgo/v2" + . 
"github.com/onsi/gomega" + "ocm.software/ocm/api/utils/blobaccess/chunked" +) + +var _ = Describe("Chunked Blobs", func() { + var blobData = []byte("a1a2a3a4a5a6a7a8a9a0b1b2b3b4b5b6b7b8b9b0") + + var src chunked.ChunkedBlobSource + var fs vfs.FileSystem + + Context("blobs", func() { + BeforeEach(func() { + fs = memoryfs.New() + }) + + AfterEach(func() { + vfs.Cleanup(fs) + }) + + It("small blobs", func() { + src = chunked.New(bytes.NewBuffer(blobData), 50, fs) + n := 0 + buf := bytes.NewBuffer(nil) + + var finalize finalizer.Finalizer + defer finalize.Finalize() + for { + b := Must(src.Next()) + if b == nil { + break + } + n++ + finalize.Close(b) + r := Must(b.Reader()) + _, err := io.Copy(buf, r) + r.Close() + MustBeSuccessful(err) + } + Expect(n).To(Equal(1)) + Expect(buf.String()).To(Equal(string(blobData))) + + MustBeSuccessful(finalize.Finalize()) + list := Must(vfs.ReadDir(fs, "/")) + Expect(len(list)).To(Equal(0)) + }) + + It("matching blobs", func() { + src = chunked.New(bytes.NewBuffer(blobData), 40, fs) + n := 0 + buf := bytes.NewBuffer(nil) + + var finalize finalizer.Finalizer + defer finalize.Finalize() + for { + b := Must(src.Next()) + if b == nil { + break + } + n++ + finalize.Close(b) + r := Must(b.Reader()) + _, err := io.Copy(buf, r) + r.Close() + MustBeSuccessful(err) + } + Expect(n).To(Equal(1)) + Expect(buf.String()).To(Equal(string(blobData))) + + MustBeSuccessful(finalize.Finalize()) + list := Must(vfs.ReadDir(fs, "/")) + Expect(len(list)).To(Equal(0)) + }) + + It("large blobs", func() { + src = chunked.New(bytes.NewBuffer(blobData), 18, fs) + n := 0 + buf := bytes.NewBuffer(nil) + + var finalize finalizer.Finalizer + defer finalize.Finalize() + for { + b := Must(src.Next()) + if b == nil { + break + } + n++ + finalize.Close(b) + r := Must(b.Reader()) + _, err := io.Copy(buf, r) + r.Close() + MustBeSuccessful(err) + } + Expect(n).To(Equal(3)) + Expect(buf.String()).To(Equal(string(blobData))) + + MustBeSuccessful(finalize.Finalize()) + list := 
Must(vfs.ReadDir(fs, "/")) + Expect(len(list)).To(Equal(0)) + }) + }) +}) diff --git a/api/utils/blobaccess/chunked/suite_test.go b/api/utils/blobaccess/chunked/suite_test.go new file mode 100644 index 0000000000..55e1b7ca35 --- /dev/null +++ b/api/utils/blobaccess/chunked/suite_test.go @@ -0,0 +1,13 @@ +package chunked_test + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +func TestConfig(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "chunked blobs Test Suite") +} From 39bff33ae467995ef22fcf54110927df90a85d1e Mon Sep 17 00:00:00 2001 From: Uwe Krueger Date: Tue, 26 Nov 2024 10:41:05 +0100 Subject: [PATCH 03/13] blob limit configuration --- api/credentials/identity/hostpath/identity.go | 11 ++ .../repositories/genericocireg/config/type.go | 128 ++++++++++++++++++ .../repositories/genericocireg/config_test.go | 62 +++++++++ .../repositories/genericocireg/repo_test.go | 2 +- .../repositories/genericocireg/repository.go | 33 +++++ .../repositories/genericocireg/type.go | 13 +- docs/reference/ocm_configfile.md | 41 ++++++ 7 files changed, 285 insertions(+), 5 deletions(-) create mode 100644 api/ocm/extensions/repositories/genericocireg/config/type.go create mode 100644 api/ocm/extensions/repositories/genericocireg/config_test.go diff --git a/api/credentials/identity/hostpath/identity.go b/api/credentials/identity/hostpath/identity.go index baa2aad8a9..968412dda6 100644 --- a/api/credentials/identity/hostpath/identity.go +++ b/api/credentials/identity/hostpath/identity.go @@ -157,3 +157,14 @@ func PathPrefix(id cpi.ConsumerIdentity) string { } return strings.TrimPrefix(id[ID_PATHPREFIX], "/") } + +func HostPort(id cpi.ConsumerIdentity) string { + if id == nil { + return "" + } + host := id[ID_HOSTNAME] + if port, ok := id[ID_PORT]; ok { + return host + ":" + port + } + return host +} diff --git a/api/ocm/extensions/repositories/genericocireg/config/type.go 
b/api/ocm/extensions/repositories/genericocireg/config/type.go new file mode 100644 index 0000000000..ceb4c8d1a6 --- /dev/null +++ b/api/ocm/extensions/repositories/genericocireg/config/type.go @@ -0,0 +1,128 @@ +package config + +import ( + "strings" + + "ocm.software/ocm/api/config" + cfgcpi "ocm.software/ocm/api/config/cpi" + "ocm.software/ocm/api/utils/runtime" +) + +const ( + ConfigType = "blobLimits.ocireg.ocm" + cfgcpi.OCM_CONFIG_TYPE_SUFFIX + ConfigTypeV1 = ConfigType + runtime.VersionSeparator + "v1" +) + +func init() { + cfgcpi.RegisterConfigType(cfgcpi.NewConfigType[*Config](ConfigType, usage)) + cfgcpi.RegisterConfigType(cfgcpi.NewConfigType[*Config](ConfigTypeV1)) +} + +// Config describes a memory based config interface +// for configuring blob limits for underlying OCI manifest layers. +type Config struct { + runtime.ObjectVersionedType `json:",inline"` + // BlobLimits describe the limit setting for host:port + // entries. As a spcial case (for testing) it is possible + // to configure linits for CTF, also, by using "@"+filepath. + BlobLimits BlobLimits `json:"blobLimits"` +} + +type BlobLimits map[string]int64 + +func (b BlobLimits) GetLimit(hostport string) int64 { + if b == nil { + return -1 + } + host := hostport + i := strings.Index(hostport, ":") + if i > 0 { + host = hostport[:i] + } + + l, ok := b[hostport] + if ok { + return l + } + l, ok = b[host] + if ok { + return l + } + return -1 +} + +type Configurable interface { + ConfigureBlobLimits(limits BlobLimits) +} + +// New creates a blob limit ConfigSpec. 
+func New() *Config { + return &Config{ + ObjectVersionedType: runtime.NewVersionedTypedObject(ConfigType), + } +} + +func (a *Config) GetType() string { + return ConfigType +} + +func (a *Config) AddLimit(hostport string, limit int64) { + if a.BlobLimits == nil { + a.BlobLimits = BlobLimits{} + } + a.BlobLimits[hostport] = limit +} + +func (a *Config) ApplyTo(ctx config.Context, target interface{}) error { + t, ok := target.(Configurable) + if !ok { + return config.ErrNoContext(ConfigType) + } + if a.BlobLimits != nil { + t.ConfigureBlobLimits(a.BlobLimits) + } + return nil +} + +const usage = ` +The config type ` + ConfigType + ` can be used to set some +configurations for an OCM context; + +
+    type: ` + ConfigType + `
+    aliases:
+       myrepo: 
+          type: <any repository type>
+          <specification attributes>
+          ...
+    resolvers:
+      - repository:
+          type: <any repository type>
+          <specification attributes>
+          ...
+        prefix: ghcr.io/open-component-model/ocm
+        priority: 10
+
+ +With aliases repository alias names can be mapped to a repository specification. +The alias name can be used in a string notation for an OCM repository. + +Resolvers define a list of OCM repository specifications to be used to resolve +dedicated component versions. These settings are used to compose a standard +component version resolver provided for an OCM context. Optionally, a component +name prefix can be given. It limits the usage of the repository to resolve only +components with the given name prefix (always complete name segments). +An optional priority can be used to influence the lookup order. Larger value +means higher priority (default 10). + +All matching entries are tried to lookup a component version in the following +order: +- highest priority first +- longest matching sequence of component name segments first. + +If resolvers are defined, it is possible to use component version names on the +command line without a repository. The names are resolved with the specified +resolution rule. +They are also used as default lookup repositories to lookup component references +for recursive operations on component versions (--lookup option). +` diff --git a/api/ocm/extensions/repositories/genericocireg/config_test.go b/api/ocm/extensions/repositories/genericocireg/config_test.go new file mode 100644 index 0000000000..2435497a81 --- /dev/null +++ b/api/ocm/extensions/repositories/genericocireg/config_test.go @@ -0,0 +1,62 @@ +package genericocireg_test + +import ( + "reflect" + + . "github.com/mandelsoft/goutils/testutils" + . "github.com/onsi/ginkgo/v2" + . 
"github.com/onsi/gomega" + "ocm.software/ocm/api/datacontext" + "ocm.software/ocm/api/ocm/extensions/repositories/genericocireg/config" + + "github.com/mandelsoft/goutils/finalizer" + "github.com/mandelsoft/vfs/pkg/osfs" + "github.com/mandelsoft/vfs/pkg/vfs" + "ocm.software/ocm/api/oci" + "ocm.software/ocm/api/oci/extensions/repositories/ctf" + "ocm.software/ocm/api/ocm" + "ocm.software/ocm/api/ocm/cpi/repocpi" + "ocm.software/ocm/api/ocm/extensions/repositories/genericocireg" + "ocm.software/ocm/api/utils/accessio" + "ocm.software/ocm/api/utils/accessobj" +) + +var _ = Describe("component repository mapping", func() { + var tempfs vfs.FileSystem + + var ocispec oci.RepositorySpec + var spec *genericocireg.RepositorySpec + + BeforeEach(func() { + t, err := osfs.NewTempFileSystem() + Expect(err).To(Succeed()) + tempfs = t + + // ocmlog.Context().AddRule(logging.NewConditionRule(logging.TraceLevel, accessio.ALLOC_REALM)) + + ocispec, err = ctf.NewRepositorySpec(accessobj.ACC_CREATE, "test", accessio.PathFileSystem(tempfs), accessobj.FormatDirectory) + Expect(err).To(Succeed()) + spec = genericocireg.NewRepositorySpec(ocispec, nil) + }) + + AfterEach(func() { + vfs.Cleanup(tempfs) + }) + + It("creates a dummy component with configured chunks", func() { + var finalize finalizer.Finalizer + defer Defer(finalize.Finalize) + + ctx := ocm.New(datacontext.MODE_EXTENDED) + + cfg := config.New() + cfg.AddLimit("@test", 5) + ctx.ConfigContext().ApplyConfig(cfg, "direct") + + repo := finalizer.ClosingWith(&finalize, Must(ctx.RepositoryForSpec(spec))) + impl := Must(repocpi.GetRepositoryImplementation(repo)) + Expect(reflect.TypeOf(impl).String()).To(Equal("*genericocireg.RepositoryImpl")) + + Expect(impl.(*genericocireg.RepositoryImpl).GetBlobLimit()).To(Equal(int64(5))) + }) +}) diff --git a/api/ocm/extensions/repositories/genericocireg/repo_test.go b/api/ocm/extensions/repositories/genericocireg/repo_test.go index f550ef0a6f..5ebea32af2 100644 --- 
a/api/ocm/extensions/repositories/genericocireg/repo_test.go +++ b/api/ocm/extensions/repositories/genericocireg/repo_test.go @@ -123,7 +123,7 @@ var _ = Describe("component repository mapping", func() { MustBeSuccessful(finalize.Finalize()) }) - FIt("creates a dummy component with chunks", func() { + It("creates a dummy component with chunks", func() { var finalize finalizer.Finalizer defer Defer(finalize.Finalize) diff --git a/api/ocm/extensions/repositories/genericocireg/repository.go b/api/ocm/extensions/repositories/genericocireg/repository.go index 5bde9477b4..7db37b463d 100644 --- a/api/ocm/extensions/repositories/genericocireg/repository.go +++ b/api/ocm/extensions/repositories/genericocireg/repository.go @@ -11,12 +11,16 @@ import ( "github.com/mandelsoft/goutils/general" "ocm.software/ocm/api/credentials" + "ocm.software/ocm/api/credentials/identity/hostpath" "ocm.software/ocm/api/datacontext" "ocm.software/ocm/api/oci" ocicpi "ocm.software/ocm/api/oci/cpi" + "ocm.software/ocm/api/oci/extensions/repositories/ctf" + "ocm.software/ocm/api/oci/extensions/repositories/ocireg" "ocm.software/ocm/api/ocm/cpi" "ocm.software/ocm/api/ocm/cpi/repocpi" "ocm.software/ocm/api/ocm/extensions/repositories/genericocireg/componentmapping" + "ocm.software/ocm/api/ocm/extensions/repositories/genericocireg/config" ) type OCIBasedRepository interface { @@ -50,23 +54,52 @@ type RepositoryImpl struct { var ( _ repocpi.RepositoryImpl = (*RepositoryImpl)(nil) _ credentials.ConsumerIdentityProvider = (*RepositoryImpl)(nil) + _ config.Configurable = (*RepositoryImpl)(nil) ) func NewRepository(ctxp cpi.ContextProvider, meta *ComponentRepositoryMeta, ocirepo oci.Repository, blobLimit ...int64) cpi.Repository { ctx := datacontext.InternalContextRef(ctxp.OCMContext()) + impl := &RepositoryImpl{ ctx: ctx, meta: *DefaultComponentRepositoryMeta(meta), ocirepo: ocirepo, blobLimit: general.OptionalDefaulted(-1, blobLimit...), } + if len(blobLimit) == 0 { + 
ctxp.OCMContext().ConfigContext().ApplyTo(0, impl) + } return repocpi.NewRepository(impl, "OCM repo[OCI]") } +func (r *RepositoryImpl) ConfigureBlobLimits(limits config.BlobLimits) { + if len(limits) == 0 { + return + } + if spec, ok := r.ocirepo.GetSpecification().(*ocireg.RepositorySpec); ok { + id := spec.GetConsumerId() + hp := hostpath.HostPort(id) + l := limits.GetLimit(hp) + if l >= 0 { + r.blobLimit = l + } + } + if spec, ok := r.ocirepo.GetSpecification().(*ctf.RepositorySpec); ok { + l := limits.GetLimit("@" + spec.FilePath) + if l >= 0 { + r.blobLimit = l + } + } +} + func (r *RepositoryImpl) SetBlobLimit(s int64) { r.blobLimit = s } +func (r *RepositoryImpl) GetBlobLimit() int64 { + return r.blobLimit +} + func (r *RepositoryImpl) Close() error { return r.ocirepo.Close() } diff --git a/api/ocm/extensions/repositories/genericocireg/type.go b/api/ocm/extensions/repositories/genericocireg/type.go index d9563748be..b3c4b460e5 100644 --- a/api/ocm/extensions/repositories/genericocireg/type.go +++ b/api/ocm/extensions/repositories/genericocireg/type.go @@ -91,7 +91,7 @@ func NewComponentRepositoryMeta(subPath string, mapping ComponentNameMapping) *C type RepositorySpec struct { oci.RepositorySpec ComponentRepositoryMeta - BlobLimit int64 + BlobLimit *int64 } var ( @@ -130,7 +130,7 @@ func (a *RepositorySpec) AsUniformSpec(cpi.Context) *cpi.UniformRepositorySpec { type meta struct { ComponentRepositoryMeta `json:",inline"` - BlobLimit int64 `json:"blobLimit"` + BlobLimit *int64 `json:"blobLimit,omitempty"` } func (u *RepositorySpec) UnmarshalJSON(data []byte) error { @@ -147,7 +147,9 @@ func (u *RepositorySpec) UnmarshalJSON(data []byte) error { u.RepositorySpec = ocispec u.ComponentRepositoryMeta = m.ComponentRepositoryMeta - u.BlobLimit = m.BlobLimit + if m.BlobLimit != nil { + u.BlobLimit = m.BlobLimit + } normalizers.Normalize(u) return nil @@ -179,7 +181,10 @@ func (s *RepositorySpec) Repository(ctx cpi.Context, creds credentials.Credentia if err != nil { 
return nil, err } - return NewRepository(ctx, &s.ComponentRepositoryMeta, r, s.BlobLimit), nil + if s.BlobLimit != nil { + return NewRepository(ctx, &s.ComponentRepositoryMeta, r, *s.BlobLimit), nil + } + return NewRepository(ctx, &s.ComponentRepositoryMeta, r), nil } func (s *RepositorySpec) GetConsumerId(uctx ...credentials.UsageContext) credentials.ConsumerIdentity { diff --git a/docs/reference/ocm_configfile.md b/docs/reference/ocm_configfile.md index ae9e51920d..5ac8180bf6 100644 --- a/docs/reference/ocm_configfile.md +++ b/docs/reference/ocm_configfile.md @@ -24,6 +24,47 @@ The following configuration types are supported: <name>: <yaml defining the attribute> ... +- blobLimits.ocireg.ocm.config.ocm.software + The config type blobLimits.ocireg.ocm.config.ocm.software can be used to set some + configurations for an OCM context; + +
+      type: blobLimits.ocireg.ocm.config.ocm.software
+      aliases:
+         myrepo:
+            type: <any repository type>
+            <specification attributes>
+            ...
+      resolvers:
+        - repository:
+            type: <any repository type>
+            <specification attributes>
+            ...
+          prefix: ghcr.io/open-component-model/ocm
+          priority: 10
+  
+ + With aliases repository alias names can be mapped to a repository specification. + The alias name can be used in a string notation for an OCM repository. + + Resolvers define a list of OCM repository specifications to be used to resolve + dedicated component versions. These settings are used to compose a standard + component version resolver provided for an OCM context. Optionally, a component + name prefix can be given. It limits the usage of the repository to resolve only + components with the given name prefix (always complete name segments). + An optional priority can be used to influence the lookup order. Larger value + means higher priority (default 10). + + All matching entries are tried to lookup a component version in the following + order: + - highest priority first + - longest matching sequence of component name segments first. + + If resolvers are defined, it is possible to use component version names on the + command line without a repository. The names are resolved with the specified + resolution rule. + They are also used as default lookup repositories to lookup component references + for recursive operations on component versions (--lookup option). - cli.ocm.config.ocm.software The config type cli.ocm.config.ocm.software is used to handle the main configuration flags of the OCM command line tool. 
From 780bcdf6a65505382e39f75dc8b67390c3555583 Mon Sep 17 00:00:00 2001 From: Uwe Krueger Date: Tue, 26 Nov 2024 11:08:35 +0100 Subject: [PATCH 04/13] docs --- api/utils/accessio/chunkedreader.go | 8 +++++++- api/utils/accessio/chunkedreader_test.go | 3 ++- api/utils/blobaccess/chunked/access.go | 12 ++++++++++-- 3 files changed, 19 insertions(+), 4 deletions(-) diff --git a/api/utils/accessio/chunkedreader.go b/api/utils/accessio/chunkedreader.go index 6dc728f4d0..042794c8ee 100644 --- a/api/utils/accessio/chunkedreader.go +++ b/api/utils/accessio/chunkedreader.go @@ -8,6 +8,12 @@ import ( "github.com/mandelsoft/goutils/general" ) +// ChunkedReader splits a reader into several +// logical readers with a limited content size. +// Once the reader reaches its limits it provides +// a io.EOF. +// It can be continued by Calling Next, which returns +// whether a follow-up is required or not. type ChunkedReader struct { lock sync.Mutex reader io.Reader @@ -34,7 +40,7 @@ func (c *ChunkedReader) Read(p []byte) (n int, err error) { c.lock.Lock() defer c.lock.Unlock() - if c.read == c.size { + if c.read >= c.size { return 0, io.EOF } if c.read+int64(len(p)) > c.size { diff --git a/api/utils/accessio/chunkedreader_test.go b/api/utils/accessio/chunkedreader_test.go index 6dad018658..2b085dc156 100644 --- a/api/utils/accessio/chunkedreader_test.go +++ b/api/utils/accessio/chunkedreader_test.go @@ -6,10 +6,11 @@ import ( . "github.com/onsi/ginkgo/v2" . 
"github.com/onsi/gomega" + "ocm.software/ocm/api/utils/accessio" ) -var _ = FDescribe("Test Environment", func() { +var _ = Describe("Test Environment", func() { in := "12345678901234567890" var buf *bytes.Buffer var chunked *accessio.ChunkedReader diff --git a/api/utils/blobaccess/chunked/access.go b/api/utils/blobaccess/chunked/access.go index 2643681cd9..73f498273e 100644 --- a/api/utils/blobaccess/chunked/access.go +++ b/api/utils/blobaccess/chunked/access.go @@ -14,7 +14,7 @@ import ( "ocm.software/ocm/api/utils/mime" ) -func newChunck(r io.Reader, fss ...vfs.FileSystem) (bpi.BlobAccess, error) { +func newChunk(r io.Reader, fss ...vfs.FileSystem) (bpi.BlobAccess, error) { t, err := blobaccess.NewTempFile("", "chunk-*", fss...) if err != nil { return nil, err @@ -28,6 +28,8 @@ func newChunck(r io.Reader, fss ...vfs.FileSystem) (bpi.BlobAccess, error) { return t.AsBlob(mime.MIME_OCTET), nil } +// ChunkedBlobSource provides a sequence of +// bpi.BlobAccess objects. type ChunkedBlobSource interface { Next() (bpi.BlobAccess, error) } @@ -40,6 +42,12 @@ type chunkedAccess struct { cont bool } +// New provides a sequence of +// bpi.BlobAccess objects for a given io.Reader +// each with a limited size. +// The provided blobs are temporarily stored +// on the filesystem and can therefore be kept +// and accessed any number of times until they are closed. 
func New(r io.Reader, chunksize int64, fss ...vfs.FileSystem) ChunkedBlobSource { reader := accessio.NewChunkedReader(r, chunksize) return &chunkedAccess{ @@ -63,5 +71,5 @@ func (r *chunkedAccess) Next() (bpi.BlobAccess, error) { } } r.cont = true - return newChunck(r.reader, r.fs) + return newChunk(r.reader, r.fs) } From e2fb84eafe5713c8a171baff3ffee768be0b08f6 Mon Sep 17 00:00:00 2001 From: Uwe Krueger Date: Thu, 28 Nov 2024 00:59:55 +0100 Subject: [PATCH 05/13] incorporate review --- api/ocm/cpi/repocpi/bridge_r.go | 14 +++- .../genericocireg/accessmethod_localblob.go | 39 ++++++---- .../repositories/genericocireg/bloblimits.go | 46 ++++++++++++ .../genericocireg/componentversion.go | 42 ++++++----- .../repositories/genericocireg/config/type.go | 71 +++++++------------ .../repositories/genericocireg/repo_test.go | 39 ++++++++-- .../repositories/genericocireg/repository.go | 30 +++++--- api/utils/accessio/chunkedreader.go | 26 +++---- 8 files changed, 198 insertions(+), 109 deletions(-) create mode 100644 api/ocm/extensions/repositories/genericocireg/bloblimits.go diff --git a/api/ocm/cpi/repocpi/bridge_r.go b/api/ocm/cpi/repocpi/bridge_r.go index bce9642245..c9ffb8ed50 100644 --- a/api/ocm/cpi/repocpi/bridge_r.go +++ b/api/ocm/cpi/repocpi/bridge_r.go @@ -34,14 +34,22 @@ type RepositoryImpl interface { io.Closer } +// Chunked is an optional interface, which +// may be implemented to accept a blob limit for mapping +// local blobs to an external storage system. type Chunked interface { - SetBlobLimit(s int64) + // SetBlobLimit sets the blob limit if possible. + // It returns true, if this was successful. + SetBlobLimit(s int64) bool } -func SetBlobLimit(i RepositoryImpl, s int64) { +// SetBlobLimit tries to set a blob limt for a repository +// implementation. It returns true, if this was possible. 
+func SetBlobLimit(i RepositoryImpl, s int64) bool { if c, ok := i.(Chunked); ok { - c.SetBlobLimit(s) + return c.SetBlobLimit(s) } + return false } type _repositoryBridgeBase = resource.ResourceImplBase[cpi.Repository] diff --git a/api/ocm/extensions/repositories/genericocireg/accessmethod_localblob.go b/api/ocm/extensions/repositories/genericocireg/accessmethod_localblob.go index 2bb3feaad3..c661c9f294 100644 --- a/api/ocm/extensions/repositories/genericocireg/accessmethod_localblob.go +++ b/api/ocm/extensions/repositories/genericocireg/accessmethod_localblob.go @@ -3,6 +3,7 @@ package genericocireg import ( "bytes" "io" + "os" "strings" "sync" @@ -99,12 +100,12 @@ func (m *localBlobAccessMethod) getBlob() (blobaccess.DataAccess, error) { ) if len(refs) < 2 { _, data, err = m.namespace.GetBlobData(digest.Digest(m.spec.LocalReference)) + if err != nil { + return nil, err + } } else { data = &composedBlock{m, refs} } - if err != nil { - return nil, err - } m.data = data return m.data, err } @@ -168,7 +169,7 @@ func (c *composedBlock) Reader() (io.ReadCloser, error) { }, nil } -func (c composedBlock) Close() error { +func (c *composedBlock) Close() error { return nil } @@ -187,19 +188,28 @@ func (c *composedReader) Read(p []byte) (n int, err error) { for { if c.reader != nil { n, err := c.reader.Read(p) - if n > 0 { - if err == io.EOF { - err = nil - } - return n, err + + if err == io.EOF { + c.reader.Close() + c.data.Close() + c.refs = c.refs[1:] + c.reader = nil + c.data = nil + // start new layer and return partial (>0) read before next layer is started + err = nil } - if err != nil { + // return partial read (even a zero read if layer is not yet finished) or error + if c.reader != nil || err != nil || n > 0 { return n, err } - c.reader.Close() - c.data.Close() - c.reader = nil + // otherwise, we can use the given buffer for the next layer + + // now, we have to check for a next succeeding layer. 
+ // This means to finish with the actual reader and continue + // with the next one. } + + // If no more layers are available, report EOF. if len(c.refs) == 0 { return 0, io.EOF } @@ -220,6 +230,9 @@ func (c *composedReader) Close() error { c.lock.Lock() defer c.lock.Unlock() + if c.reader == nil && c.refs == nil { + return os.ErrClosed + } if c.reader != nil { c.reader.Close() c.data.Close() diff --git a/api/ocm/extensions/repositories/genericocireg/bloblimits.go b/api/ocm/extensions/repositories/genericocireg/bloblimits.go new file mode 100644 index 0000000000..2ecb09d690 --- /dev/null +++ b/api/ocm/extensions/repositories/genericocireg/bloblimits.go @@ -0,0 +1,46 @@ +package genericocireg + +import ( + "sync" + + configctx "ocm.software/ocm/api/config" + "ocm.software/ocm/api/ocm/extensions/repositories/genericocireg/config" +) + +var ( + defaultBlobLimits config.BlobLimits + lock sync.Mutex +) + +func init() { + defaultBlobLimits = config.BlobLimits{} + + // Add limits for known OCI repositories, here, + // or provide init functions in specialized packages + // by calling AddDefaultBlobLimit. +} + +// AddDefaultBlobLimit can be used to set default blob limits +// for known repositories. 
+// Those limits will be overwritten, by blob limits +// given by a configuration ovject and the repository +// specification +func AddDefaultBlobLimit(name string, limit int64) { + lock.Lock() + defer lock.Unlock() + + defaultBlobLimits[name] = limit +} + +func ConfigureBlobLimits(ctx configctx.ContextProvider, target config.Configurable) { + if target != nil { + lock.Lock() + defer lock.Unlock() + + target.ConfigureBlobLimits(defaultBlobLimits) + + if ctx != nil { + ctx.ConfigContext().ApplyTo(0, target) + } + } +} diff --git a/api/ocm/extensions/repositories/genericocireg/componentversion.go b/api/ocm/extensions/repositories/genericocireg/componentversion.go index c9b6338993..f415fd9f1b 100644 --- a/api/ocm/extensions/repositories/genericocireg/componentversion.go +++ b/api/ocm/extensions/repositories/genericocireg/componentversion.go @@ -191,14 +191,12 @@ func (c *ComponentVersionContainer) Update() (bool, error) { if err != nil { return false, fmt.Errorf("failed resource layer evaluation: %w", err) } - if len(list) > 0 { - for _, l := range list { - layerAnnotations[l] = append(layerAnnotations[l], ArtifactInfo{ - Kind: ARTKIND_RESOURCE, - Identity: r.GetIdentity(desc.Resources), - }) - layers.Delete(l) - } + for _, l := range list { + layerAnnotations[l] = append(layerAnnotations[l], ArtifactInfo{ + Kind: ARTKIND_RESOURCE, + Identity: r.GetIdentity(desc.Resources), + }) + layers.Delete(l) } if s != r.Access { desc.Resources[i].Access = s @@ -209,14 +207,12 @@ func (c *ComponentVersionContainer) Update() (bool, error) { if err != nil { return false, fmt.Errorf("failed source layer evaluation: %w", err) } - if len(list) > 0 { - for _, l := range list { - layerAnnotations[l] = append(layerAnnotations[l], ArtifactInfo{ - Kind: ARTKIND_SOURCE, - Identity: r.GetIdentity(desc.Sources), - }) - layers.Delete(l) - } + for _, l := range list { + layerAnnotations[l] = append(layerAnnotations[l], ArtifactInfo{ + Kind: ARTKIND_SOURCE, + Identity: 
r.GetIdentity(desc.Sources), + }) + layers.Delete(l) } if s != r.Access { desc.Sources[i].Access = s @@ -292,10 +288,10 @@ func (c *ComponentVersionContainer) evalLayer(s compdesc.AccessSpec) (compdesc.A layers := c.manifest.GetDescriptor().Layers maxLen := len(layers) - 1 found := false - for i := range layers { - l := layers[len(layers)-1-i] - if i < maxLen && l.Digest == d.Digest && (d.Digest == "" || d.Digest == l.Digest) { - layernums = append(layernums, len(layers)-1-i) + for i := maxLen; i > 0; i-- { // layer 0 is the component descriptor + l := layers[i] + if l.Digest == d.Digest { + layernums = append(layernums, i) found = true break } @@ -326,13 +322,15 @@ func (c *ComponentVersionContainer) AddBlob(blob cpi.BlobAccess, refName string, } size := blob.Size() + limit := c.comp.repo.blobLimit var refs []string - if c.comp.repo.blobLimit > 0 && size != blobaccess.BLOB_UNKNOWN_SIZE && size > c.comp.repo.blobLimit { + if limit > 0 && size != blobaccess.BLOB_UNKNOWN_SIZE && size > limit { reader, err := blob.Reader() if err != nil { return nil, err } - ch := chunked.New(reader, c.comp.repo.blobLimit, vfsattr.Get(c.GetContext())) + defer reader.Close() + ch := chunked.New(reader, limit, vfsattr.Get(c.GetContext())) for { b, err := ch.Next() if err != nil { diff --git a/api/ocm/extensions/repositories/genericocireg/config/type.go b/api/ocm/extensions/repositories/genericocireg/config/type.go index ceb4c8d1a6..fae19804b9 100644 --- a/api/ocm/extensions/repositories/genericocireg/config/type.go +++ b/api/ocm/extensions/repositories/genericocireg/config/type.go @@ -1,6 +1,7 @@ package config import ( + "net" "strings" "ocm.software/ocm/api/config" @@ -23,8 +24,8 @@ func init() { type Config struct { runtime.ObjectVersionedType `json:",inline"` // BlobLimits describe the limit setting for host:port - // entries. As a spcial case (for testing) it is possible - // to configure linits for CTF, also, by using "@"+filepath. + // entries. 
As a special case (for testing) it is possible + // to configure limits for CTF, also, by using "@"+filepath. BlobLimits BlobLimits `json:"blobLimits"` } @@ -34,19 +35,19 @@ func (b BlobLimits) GetLimit(hostport string) int64 { if b == nil { return -1 } - host := hostport - i := strings.Index(hostport, ":") - if i > 0 { - host = hostport[:i] - } - l, ok := b[hostport] if ok { return l } - l, ok = b[host] - if ok { - return l + + if !strings.HasPrefix(hostport, "@") { + host, _, err := net.SplitHostPort(hostport) + if err == nil { + l, ok = b[host] + if ok { + return l + } + } } return -1 } @@ -86,43 +87,23 @@ func (a *Config) ApplyTo(ctx config.Context, target interface{}) error { const usage = ` The config type ` + ConfigType + ` can be used to set some -configurations for an OCM context; +blob layer limits for particular OCI registries used to host OCM repositories;
     type: ` + ConfigType + `
-    aliases:
-       myrepo: 
-          type: <any repository type>
-          <specification attributes>
-          ...
-    resolvers:
-      - repository:
-          type: <any repository type>
-          <specification attributes>
-          ...
-        prefix: ghcr.io/open-component-model/ocm
-        priority: 10
+    blobLimits:
+        dummy.io: 65564
+        dummy.io:8443: 32768
 
-With aliases repository alias names can be mapped to a repository specification. -The alias name can be used in a string notation for an OCM repository. - -Resolvers define a list of OCM repository specifications to be used to resolve -dedicated component versions. These settings are used to compose a standard -component version resolver provided for an OCM context. Optionally, a component -name prefix can be given. It limits the usage of the repository to resolve only -components with the given name prefix (always complete name segments). -An optional priority can be used to influence the lookup order. Larger value -means higher priority (default 10). - -All matching entries are tried to lookup a component version in the following -order: -- highest priority first -- longest matching sequence of component name segments first. - -If resolvers are defined, it is possible to use component version names on the -command line without a repository. The names are resolved with the specified -resolution rule. -They are also used as default lookup repositories to lookup component references -for recursive operations on component versions (--lookup option). +If blob limits apply to a registry, local blobs with a size larger than +the configured limit will be split into several layers with a maximum +size of the given value. + +These settings can be overwritten by explicit settings in an OCM +repository specification for those repositories. + +The most specific entry will be used. If a registry with a dedicated +port is requested, but no explicit such configuration is found, the +setting for the sole hostname is used (if configured). 
` diff --git a/api/ocm/extensions/repositories/genericocireg/repo_test.go b/api/ocm/extensions/repositories/genericocireg/repo_test.go index 5ebea32af2..295d31b69d 100644 --- a/api/ocm/extensions/repositories/genericocireg/repo_test.go +++ b/api/ocm/extensions/repositories/genericocireg/repo_test.go @@ -2,6 +2,7 @@ package genericocireg_test import ( "fmt" + "io" "path" "reflect" @@ -123,7 +124,7 @@ var _ = Describe("component repository mapping", func() { MustBeSuccessful(finalize.Finalize()) }) - It("creates a dummy component with chunks", func() { + DescribeTable("creates a dummy component with chunks", func(f func(ocm.ResourceAccess)) { var finalize finalizer.Finalizer defer Defer(finalize.Finalize) @@ -170,11 +171,41 @@ var _ = Describe("component repository mapping", func() { Expect(rsc.Meta().Digest).NotTo(BeNil()) Expect(rsc.Meta().Digest.Value).To(Equal(ocmtesthelper.D_TESTDATA)) - data := Must(ocmutils.GetResourceData(rsc)) - Expect(string(data)).To(Equal(ocmtesthelper.S_TESTDATA)) + f(rsc) MustBeSuccessful(finalize.Finalize()) - }) + }, + Entry("get blob", func(rsc ocm.ResourceAccess) { + data := Must(ocmutils.GetResourceData(rsc)) + Expect(string(data)).To(Equal(ocmtesthelper.S_TESTDATA)) + }), + Entry("stream blob", func(rsc ocm.ResourceAccess) { + r := Must(ocmutils.GetResourceReader(rsc)) + data := Must(io.ReadAll(r)) + Expect(string(data)).To(Equal(ocmtesthelper.S_TESTDATA)) + }), + Entry("stream blob with small buffer", func(rsc ocm.ResourceAccess) { + var buf [2]byte + var data []byte + + r := Must(ocmutils.GetResourceReader(rsc)) + + for { + n, err := r.Read(buf[:]) + if n > 0 { + data = append(data, buf[:n]...) 
+ } + if err != nil { + if err == io.EOF { + break + } else { + MustBeSuccessful(err) + } + } + } + Expect(string(data)).To(Equal(ocmtesthelper.S_TESTDATA)) + }), + ) It("handles legacylocalociblob access method", func() { var finalize finalizer.Finalizer diff --git a/api/ocm/extensions/repositories/genericocireg/repository.go b/api/ocm/extensions/repositories/genericocireg/repository.go index 7db37b463d..2a47a27b3a 100644 --- a/api/ocm/extensions/repositories/genericocireg/repository.go +++ b/api/ocm/extensions/repositories/genericocireg/repository.go @@ -42,12 +42,16 @@ func GetOCIRepository(r cpi.Repository) ocicpi.Repository { } type RepositoryImpl struct { - bridge repocpi.RepositoryBridge - ctx cpi.Context - meta ComponentRepositoryMeta - nonref cpi.Repository - ocirepo oci.Repository - readonly bool + bridge repocpi.RepositoryBridge + ctx cpi.Context + meta ComponentRepositoryMeta + nonref cpi.Repository + ocirepo oci.Repository + readonly bool + // blobLimit is the size limit for layers maintained for the storage of localBlobs. + // The value -1 means an unconfigured value, + // a value == 0 disables the limiting and + // a value > 0 enabled the usage of the specified size. blobLimit int64 } @@ -57,6 +61,13 @@ var ( _ config.Configurable = (*RepositoryImpl)(nil) ) +// NewRepository creates a new OCM repository based on any OCI abstraction from +// the OCI context type. +// The optional blobLimit is the size limit for layers maintained for the storage of localBlobs. +// The value -1 means an unconfigured value, +// a value == 0 disables the limiting and +// a value > 0 enabled the usage of the specified size. +// If this parameter is not given -1 is assumed. 
func NewRepository(ctxp cpi.ContextProvider, meta *ComponentRepositoryMeta, ocirepo oci.Repository, blobLimit ...int64) cpi.Repository { ctx := datacontext.InternalContextRef(ctxp.OCMContext()) @@ -66,8 +77,8 @@ func NewRepository(ctxp cpi.ContextProvider, meta *ComponentRepositoryMeta, ocir ocirepo: ocirepo, blobLimit: general.OptionalDefaulted(-1, blobLimit...), } - if len(blobLimit) == 0 { - ctxp.OCMContext().ConfigContext().ApplyTo(0, impl) + if impl.blobLimit < 0 { + ConfigureBlobLimits(ctxp.OCMContext(), impl) } return repocpi.NewRepository(impl, "OCM repo[OCI]") } @@ -92,8 +103,9 @@ func (r *RepositoryImpl) ConfigureBlobLimits(limits config.BlobLimits) { } } -func (r *RepositoryImpl) SetBlobLimit(s int64) { +func (r *RepositoryImpl) SetBlobLimit(s int64) bool { r.blobLimit = s + return true } func (r *RepositoryImpl) GetBlobLimit() int64 { diff --git a/api/utils/accessio/chunkedreader.go b/api/utils/accessio/chunkedreader.go index 042794c8ee..00b0771b02 100644 --- a/api/utils/accessio/chunkedreader.go +++ b/api/utils/accessio/chunkedreader.go @@ -15,23 +15,23 @@ import ( // It can be continued by Calling Next, which returns // whether a follow-up is required or not. 
type ChunkedReader struct { - lock sync.Mutex - reader io.Reader - buffer *bytes.Buffer - size int64 - chunk int - read int64 - err error + lock sync.Mutex + reader io.Reader + buffer *bytes.Buffer + size int64 + chunkNo int + read int64 + err error preread uint } var _ io.Reader = (*ChunkedReader)(nil) -func NewChunkedReader(r io.Reader, chunk int64, preread ...uint) *ChunkedReader { +func NewChunkedReader(r io.Reader, chunkSize int64, preread ...uint) *ChunkedReader { return &ChunkedReader{ reader: r, - size: chunk, + size: chunkSize, preread: general.OptionalDefaulted(8096, preread...), } } @@ -48,9 +48,9 @@ func (c *ChunkedReader) Read(p []byte) (n int, err error) { } if c.buffer != nil && c.buffer.Len() > 0 { // first, consume from buffer - n, _ := c.buffer.Read(p) + n, err := c.buffer.Read(p) c.read += int64(n) - if c.buffer.Len() == 0 { + if err != nil { // the only error returned is io.EOF c.buffer = nil } return c.report(n, nil) @@ -78,7 +78,7 @@ func (c *ChunkedReader) ChunkNo() int { c.lock.Lock() defer c.lock.Unlock() - return c.chunk + return c.chunkNo } func (c *ChunkedReader) ChunkDone() bool { @@ -113,6 +113,6 @@ func (c *ChunkedReader) Next() bool { } c.read = 0 - c.chunk++ + c.chunkNo++ return true } From d4a74b36f4da2e3432f2b3e9b9168780428af0a5 Mon Sep 17 00:00:00 2001 From: Uwe Krueger Date: Mon, 9 Dec 2024 09:49:34 +0100 Subject: [PATCH 06/13] generate --- docs/reference/ocm_configfile.md | 44 +++++++++----------------------- 1 file changed, 12 insertions(+), 32 deletions(-) diff --git a/docs/reference/ocm_configfile.md b/docs/reference/ocm_configfile.md index 5ac8180bf6..08714d79bd 100644 --- a/docs/reference/ocm_configfile.md +++ b/docs/reference/ocm_configfile.md @@ -26,45 +26,25 @@ The following configuration types are supported: - blobLimits.ocireg.ocm.config.ocm.software The config type blobLimits.ocireg.ocm.config.ocm.software can be used to set some - configurations for an OCM context; + blob layer limits for particular OCI registries 
used to host OCM repositories;
       type: blobLimits.ocireg.ocm.config.ocm.software
-      aliases:
-         myrepo:
-            type: <any repository type>
-            <specification attributes>
-            ...
-      resolvers:
-        - repository:
-            type: <any repository type>
-            <specification attributes>
-            ...
-          prefix: ghcr.io/open-component-model/ocm
-          priority: 10
+      blobLimits:
+          dummy.io: 65564
+          dummy.io:8443: 32768
   
- With aliases repository alias names can be mapped to a repository specification. - The alias name can be used in a string notation for an OCM repository. - - Resolvers define a list of OCM repository specifications to be used to resolve - dedicated component versions. These settings are used to compose a standard - component version resolver provided for an OCM context. Optionally, a component - name prefix can be given. It limits the usage of the repository to resolve only - components with the given name prefix (always complete name segments). - An optional priority can be used to influence the lookup order. Larger value - means higher priority (default 10). + If blob limits apply to a registry, local blobs with a size larger than + the configured limit will be split into several layers with a maximum + size of the given value. - All matching entries are tried to lookup a component version in the following - order: - - highest priority first - - longest matching sequence of component name segments first. + These settings can be overwritten by explicit settings in an OCM + repository specification for those repositories. - If resolvers are defined, it is possible to use component version names on the - command line without a repository. The names are resolved with the specified - resolution rule. - They are also used as default lookup repositories to lookup component references - for recursive operations on component versions (--lookup option). + The most specific entry will be used. If a registry with a dedicated + port is requested, but no explicit such configuration is found, the + setting for the sole hostname is used (if configured). - cli.ocm.config.ocm.software The config type cli.ocm.config.ocm.software is used to handle the main configuration flags of the OCM command line tool. 
From 25693a9bdd6b129b062fae94ca77c5274d627441 Mon Sep 17 00:00:00 2001 From: Uwe Krueger Date: Mon, 9 Dec 2024 10:35:23 +0100 Subject: [PATCH 07/13] use LimitReader --- api/utils/accessio/chunkedreader.go | 52 ++++++------- api/utils/accessio/chunkedreader_test.go | 96 +++++++++++++++++++++++- 2 files changed, 114 insertions(+), 34 deletions(-) diff --git a/api/utils/accessio/chunkedreader.go b/api/utils/accessio/chunkedreader.go index 00b0771b02..9ed1dcba2a 100644 --- a/api/utils/accessio/chunkedreader.go +++ b/api/utils/accessio/chunkedreader.go @@ -15,12 +15,13 @@ import ( // It can be continued by Calling Next, which returns // whether a follow-up is required or not. type ChunkedReader struct { - lock sync.Mutex - reader io.Reader - buffer *bytes.Buffer - size int64 - chunkNo int - read int64 + lock sync.Mutex + reader io.Reader + buffer *bytes.Buffer + chunkSize int64 + chunkNo int + + limited io.Reader err error preread uint @@ -30,9 +31,10 @@ var _ io.Reader = (*ChunkedReader)(nil) func NewChunkedReader(r io.Reader, chunkSize int64, preread ...uint) *ChunkedReader { return &ChunkedReader{ - reader: r, - size: chunkSize, - preread: general.OptionalDefaulted(8096, preread...), + reader: r, + chunkSize: chunkSize, + limited: io.LimitReader(r, chunkSize), + preread: min(uint(chunkSize-1), general.OptionalDefaulted(8096, preread...)), } } @@ -40,38 +42,26 @@ func (c *ChunkedReader) Read(p []byte) (n int, err error) { c.lock.Lock() defer c.lock.Unlock() - if c.read >= c.size { - return 0, io.EOF - } - if c.read+int64(len(p)) > c.size { - p = p[:c.size-c.read] // read at most rest of chunk size + if c.err != nil { + return 0, c.err } if c.buffer != nil && c.buffer.Len() > 0 { // first, consume from buffer n, err := c.buffer.Read(p) - c.read += int64(n) if err != nil { // the only error returned is io.EOF c.buffer = nil } - return c.report(n, nil) + if n > 0 { + return n, nil + } } else { c.buffer = nil } - if c.err != nil { - return 0, c.err - } - n, err = 
c.reader.Read(p) - c.read += int64(n) + n, err = c.limited.Read(p) c.err = err - return c.report(n, err) -} - -func (c *ChunkedReader) report(n int, err error) (int, error) { - if err == nil && c.read >= c.size { - err = io.EOF - } return n, err + } func (c *ChunkedReader) ChunkNo() int { @@ -85,14 +75,14 @@ func (c *ChunkedReader) ChunkDone() bool { c.lock.Lock() defer c.lock.Unlock() - return c.read >= c.size || c.err != nil + return c.err == io.EOF } func (c *ChunkedReader) Next() bool { c.lock.Lock() defer c.lock.Unlock() - if c.read < c.size || c.err != nil { + if c.err != io.EOF { return false } @@ -112,7 +102,7 @@ func (c *ChunkedReader) Next() bool { } } - c.read = 0 c.chunkNo++ + c.limited = io.LimitReader(c.reader, c.chunkSize-int64(c.preread)) return true } diff --git a/api/utils/accessio/chunkedreader_test.go b/api/utils/accessio/chunkedreader_test.go index 2b085dc156..08e8ee9652 100644 --- a/api/utils/accessio/chunkedreader_test.go +++ b/api/utils/accessio/chunkedreader_test.go @@ -10,6 +10,19 @@ import ( "ocm.software/ocm/api/utils/accessio" ) +func CheckEOF(r io.Reader, err error) { + var ( + buf [20]byte + n int + ) + + if err == nil { + n, err = r.Read(buf[:]) + ExpectWithOffset(1, n).To(Equal(0)) + } + ExpectWithOffset(1, err).To(Equal(io.EOF)) +} + var _ = Describe("Test Environment", func() { in := "12345678901234567890" var buf *bytes.Buffer @@ -19,6 +32,83 @@ var _ = Describe("Test Environment", func() { buf = bytes.NewBuffer([]byte(in)) }) + Context("max preread", func() { + BeforeEach(func() { + chunked = accessio.NewChunkedReader(buf, 5) + }) + + It("reports EOF and splits reader", func() { + var buf [30]byte + cnt := 0 + + n, err := chunked.Read(buf[:]) + Expect(n).To(Equal(5)) + Expect(string(buf[:n])).To(Equal(in[cnt : cnt+n])) + CheckEOF(chunked, err) + Expect(chunked.ChunkDone()).To(Equal(true)) + cnt += n + Expect(chunked.Next()).To(Equal(true)) + + for i := 0; i < 3; i++ { + n, err := chunked.Read(buf[:]) + Expect(n).To(Equal(4)) + 
Expect(string(buf[:n])).To(Equal(in[cnt : cnt+n])) + Expect(err).To(BeNil()) + Expect(chunked.ChunkDone()).To(Equal(false)) + cnt += n + + n, err = chunked.Read(buf[:]) + Expect(n).To(Equal(1)) + CheckEOF(chunked, err) + Expect(chunked.ChunkDone()).To(Equal(true)) + cnt += n + + Expect(chunked.Next()).To(Equal(i != 2)) + } + Expect(chunked.Next()).To(Equal(false)) + }) + + It("keeps reporting EOF", func() { + var buf [30]byte + cnt := 0 + + n, err := chunked.Read(buf[:]) + Expect(n).To(Equal(5)) + Expect(string(buf[:n])).To(Equal(in[cnt : cnt+n])) + CheckEOF(chunked, err) + cnt += n + + n, err = chunked.Read(buf[:]) + Expect(n).To(Equal(0)) + Expect(err).To(Equal(io.EOF)) + + Expect(chunked.ChunkDone()).To(Equal(true)) + Expect(chunked.Next()).To(Equal(true)) + + for i := 0; i < 3; i++ { + n, err := chunked.Read(buf[:]) + Expect(n).To(Equal(4)) + Expect(string(buf[:n])).To(Equal(in[cnt : cnt+n])) + Expect(err).To(BeNil()) + Expect(chunked.ChunkDone()).To(Equal(false)) + cnt += n + + n, err = chunked.Read(buf[:]) + Expect(n).To(Equal(1)) + CheckEOF(chunked, err) + cnt += n + + n, err = chunked.Read(buf[:]) + Expect(n).To(Equal(0)) + Expect(err).To(Equal(io.EOF)) + Expect(chunked.ChunkDone()).To(Equal(true)) + + Expect(chunked.Next()).To(Equal(i != 2)) + } + Expect(chunked.Next()).To(Equal(false)) + }) + }) + Context("complete", func() { BeforeEach(func() { chunked = accessio.NewChunkedReader(buf, 100, 2) @@ -74,8 +164,8 @@ var _ = Describe("Test Environment", func() { var buf [20]byte n, err := chunked.Read(buf[:]) Expect(n).To(Equal(20)) - Expect(err).To(Equal(io.EOF)) Expect(string(buf[:n])).To(Equal(in)) + CheckEOF(chunked, err) Expect(chunked.ChunkDone()).To(Equal(true)) Expect(chunked.Next()).To(Equal(false)) @@ -96,8 +186,8 @@ var _ = Describe("Test Environment", func() { n, err := chunked.Read(buf[:]) Expect(n).To(Equal(5)) - Expect(err).To(Equal(io.EOF)) Expect(string(buf[:n])).To(Equal(in[cnt : cnt+n])) + CheckEOF(chunked, err) 
Expect(chunked.ChunkDone()).To(Equal(true)) cnt += n Expect(chunked.Next()).To(Equal(true)) @@ -112,8 +202,8 @@ var _ = Describe("Test Environment", func() { n, err = chunked.Read(buf[:]) Expect(n).To(Equal(3)) - Expect(err).To(Equal(io.EOF)) Expect(string(buf[:n])).To(Equal(in[cnt : cnt+n])) + CheckEOF(chunked, err) Expect(chunked.ChunkDone()).To(Equal(true)) cnt += n From 0c39b66065e57b0631e11e1325e03a635da97571 Mon Sep 17 00:00:00 2001 From: Uwe Krueger Date: Mon, 9 Dec 2024 11:48:28 +0100 Subject: [PATCH 08/13] extract LookupReader --- api/utils/accessio/chunkedreader.go | 73 +++++++----------- api/utils/accessio/chunkedreader_test.go | 65 ++++++++-------- api/utils/accessio/lookaheadreader.go | 74 ++++++++++++++++++ api/utils/accessio/lookaheadreader_test.go | 89 ++++++++++++++++++++++ 4 files changed, 226 insertions(+), 75 deletions(-) create mode 100644 api/utils/accessio/lookaheadreader.go create mode 100644 api/utils/accessio/lookaheadreader_test.go diff --git a/api/utils/accessio/chunkedreader.go b/api/utils/accessio/chunkedreader.go index 9ed1dcba2a..3dd99e9bde 100644 --- a/api/utils/accessio/chunkedreader.go +++ b/api/utils/accessio/chunkedreader.go @@ -1,40 +1,35 @@ package accessio import ( - "bytes" "io" "sync" - "github.com/mandelsoft/goutils/general" + "github.com/mandelsoft/goutils/errors" ) // ChunkedReader splits a reader into several // logical readers with a limited content size. -// Once the reader reaches its limits it provides -// a io.EOF. +// Once the reader reaches its limit it provides +// an io.EOF. // It can be continued by Calling Next, which returns // whether a follow-up is required or not. 
type ChunkedReader struct { lock sync.Mutex - reader io.Reader - buffer *bytes.Buffer + reader *LookAheadReader chunkSize int64 chunkNo int - limited io.Reader + limited *io.LimitedReader err error - - preread uint } var _ io.Reader = (*ChunkedReader)(nil) -func NewChunkedReader(r io.Reader, chunkSize int64, preread ...uint) *ChunkedReader { +func NewChunkedReader(r io.Reader, chunkSize int64) *ChunkedReader { return &ChunkedReader{ - reader: r, + reader: NewLookAheadReader(r), chunkSize: chunkSize, - limited: io.LimitReader(r, chunkSize), - preread: min(uint(chunkSize-1), general.OptionalDefaulted(8096, preread...)), + limited: &io.LimitedReader{r, chunkSize}, } } @@ -42,28 +37,13 @@ func (c *ChunkedReader) Read(p []byte) (n int, err error) { c.lock.Lock() defer c.lock.Unlock() - if c.err != nil { - return 0, c.err - } - if c.buffer != nil && c.buffer.Len() > 0 { - // first, consume from buffer - n, err := c.buffer.Read(p) - if err != nil { // the only error returned is io.EOF - c.buffer = nil - } - if n > 0 { - return n, nil - } - } else { - c.buffer = nil - } - n, err = c.limited.Read(p) c.err = err return n, err - } +// ChunkNo returns the number previously +// provided chunks. func (c *ChunkedReader) ChunkNo() int { c.lock.Lock() defer c.lock.Unlock() @@ -71,38 +51,43 @@ func (c *ChunkedReader) ChunkNo() int { return c.chunkNo } +// ChunkDone returns true, if the actual +// chunk is completely read. func (c *ChunkedReader) ChunkDone() bool { c.lock.Lock() defer c.lock.Unlock() - return c.err == io.EOF + return errors.Is(c.err, io.EOF) } +// Next returns true, if a followup chunk +// has been prepared for the reader. +// If called while the current chunk is not yet completed +// it always returns false (check by calling ChunkDone). 
func (c *ChunkedReader) Next() bool { c.lock.Lock() defer c.lock.Unlock() - if c.err != io.EOF { + if !errors.Is(c.err, io.EOF) { return false } + if c.limited.N > 0 { + // don't need to check for more data if EOF is + // provided before chunk size is reached. + return false + } // cannot assume that read with size 0 returns EOF as proposed // by io.Reader.Read (see bytes.Buffer.Read). // Therefore, we really have to read something. - if c.buffer == nil { - buf := make([]byte, c.preread) - n, err := c.reader.Read(buf) - c.err = err - if n > 0 { - c.buffer = bytes.NewBuffer(buf[:n]) - } else { - if err == io.EOF { - return false - } - } + var buf [1]byte + n, err := c.reader.LookAhead(buf[:]) + if n == 0 && errors.Is(err, io.EOF) { + return false } c.chunkNo++ - c.limited = io.LimitReader(c.reader, c.chunkSize-int64(c.preread)) + c.err = nil + c.limited = &io.LimitedReader{c.reader, c.chunkSize} return true } diff --git a/api/utils/accessio/chunkedreader_test.go b/api/utils/accessio/chunkedreader_test.go index 08e8ee9652..4076bb0655 100644 --- a/api/utils/accessio/chunkedreader_test.go +++ b/api/utils/accessio/chunkedreader_test.go @@ -23,7 +23,7 @@ func CheckEOF(r io.Reader, err error) { ExpectWithOffset(1, err).To(Equal(io.EOF)) } -var _ = Describe("Test Environment", func() { +var _ = Describe("ChunkedReader", func() { in := "12345678901234567890" var buf *bytes.Buffer var chunked *accessio.ChunkedReader @@ -51,14 +51,8 @@ var _ = Describe("Test Environment", func() { for i := 0; i < 3; i++ { n, err := chunked.Read(buf[:]) - Expect(n).To(Equal(4)) + Expect(n).To(Equal(5)) Expect(string(buf[:n])).To(Equal(in[cnt : cnt+n])) - Expect(err).To(BeNil()) - Expect(chunked.ChunkDone()).To(Equal(false)) - cnt += n - - n, err = chunked.Read(buf[:]) - Expect(n).To(Equal(1)) CheckEOF(chunked, err) Expect(chunked.ChunkDone()).To(Equal(true)) cnt += n @@ -87,21 +81,11 @@ var _ = Describe("Test Environment", func() { for i := 0; i < 3; i++ { n, err := chunked.Read(buf[:]) - 
Expect(n).To(Equal(4)) + Expect(n).To(Equal(5)) Expect(string(buf[:n])).To(Equal(in[cnt : cnt+n])) - Expect(err).To(BeNil()) - Expect(chunked.ChunkDone()).To(Equal(false)) - cnt += n - - n, err = chunked.Read(buf[:]) - Expect(n).To(Equal(1)) CheckEOF(chunked, err) - cnt += n - - n, err = chunked.Read(buf[:]) - Expect(n).To(Equal(0)) - Expect(err).To(Equal(io.EOF)) Expect(chunked.ChunkDone()).To(Equal(true)) + cnt += n Expect(chunked.Next()).To(Equal(i != 2)) } @@ -111,7 +95,7 @@ var _ = Describe("Test Environment", func() { Context("complete", func() { BeforeEach(func() { - chunked = accessio.NewChunkedReader(buf, 100, 2) + chunked = accessio.NewChunkedReader(buf, 100) }) It("reports EOF", func() { @@ -155,9 +139,35 @@ var _ = Describe("Test Environment", func() { }) }) + Context("non-matching chunk size", func() { + BeforeEach(func() { + chunked = accessio.NewChunkedReader(buf, 15) + }) + It("reports EOF and Next with non-matching size", func() { + var buf [20]byte + n, err := chunked.Read(buf[:]) + Expect(n).To(Equal(15)) + CheckEOF(chunked, err) + Expect(string(buf[:n])).To(Equal(in[:15])) + Expect(chunked.ChunkDone()).To(Equal(true)) + Expect(chunked.Next()).To(Equal(true)) + + n, err = chunked.Read(buf[:]) + Expect(n).To(Equal(5)) + Expect(string(buf[:n])).To(Equal(in[15:20])) + CheckEOF(chunked, err) + Expect(chunked.ChunkDone()).To(Equal(true)) + Expect(chunked.Next()).To(Equal(false)) + + n, err = chunked.Read(buf[:]) + Expect(n).To(Equal(0)) + Expect(err).To(Equal(io.EOF)) + }) + }) + Context("chunk size matches read size", func() { BeforeEach(func() { - chunked = accessio.NewChunkedReader(buf, 20, 2) + chunked = accessio.NewChunkedReader(buf, 20) }) It("reports EOF with matched size", func() { @@ -177,7 +187,7 @@ var _ = Describe("Test Environment", func() { Context("split", func() { BeforeEach(func() { - chunked = accessio.NewChunkedReader(buf, 5, 2) + chunked = accessio.NewChunkedReader(buf, 5) }) It("reports EOF and splits reader", func() { @@ -194,15 
+204,8 @@ var _ = Describe("Test Environment", func() { for i := 0; i < 3; i++ { n, err := chunked.Read(buf[:]) - Expect(n).To(Equal(2)) - Expect(err).To(BeNil()) - Expect(string(buf[:n])).To(Equal(in[cnt : cnt+n])) - Expect(chunked.ChunkDone()).To(Equal(false)) - cnt += n - - n, err = chunked.Read(buf[:]) - Expect(n).To(Equal(3)) Expect(string(buf[:n])).To(Equal(in[cnt : cnt+n])) + Expect(n).To(Equal(5)) CheckEOF(chunked, err) Expect(chunked.ChunkDone()).To(Equal(true)) cnt += n diff --git a/api/utils/accessio/lookaheadreader.go b/api/utils/accessio/lookaheadreader.go new file mode 100644 index 0000000000..66cacdf694 --- /dev/null +++ b/api/utils/accessio/lookaheadreader.go @@ -0,0 +1,74 @@ +package accessio + +import ( + "bytes" + "io" + "sync" +) + +// LookAheadReader is an io.Reader which additionally +// provides a look ahead of upcoming data, which does not +// affect the regular reader. +type LookAheadReader struct { + lock sync.Mutex + reader io.Reader + buffer *bytes.Buffer +} + +func NewLookAheadReader(r io.Reader) *LookAheadReader { + return &LookAheadReader{ + reader: r, + } +} + +func (r *LookAheadReader) Read(p []byte) (int, error) { + r.lock.Lock() + defer r.lock.Unlock() + + return r.read(p) +} + +func (r *LookAheadReader) read(p []byte) (n int, err error) { + if r.buffer != nil && r.buffer.Len() > 0 { + // first, consume from buffer + n, err = r.buffer.Read(p) + if err != nil { // the only error returned is io.EOF + r.buffer = nil + } + } else { + r.buffer = nil + } + + if n >= len(p) { + return n, nil + } + + cnt, err := r.reader.Read(p[n:]) + return cnt + n, err +} + +// LookAhead provides a preview of upcoming data. +// It tries to fill the complete given buffer. +// The regular data stream provided by ead is not affected. 
+func (r *LookAheadReader) LookAhead(p []byte) (n int, err error) { + r.lock.Lock() + defer r.lock.Unlock() + + cnt := 0 + for cnt < len(p) { + n, err = r.read(p[cnt:]) + if err != nil { + break + } + cnt += n + } + + if cnt > 0 { + if r.buffer == nil { + r.buffer = bytes.NewBuffer(nil) + } + r.buffer.Write(p[:cnt]) + } + + return cnt, err +} diff --git a/api/utils/accessio/lookaheadreader_test.go b/api/utils/accessio/lookaheadreader_test.go new file mode 100644 index 0000000000..d819ae1d53 --- /dev/null +++ b/api/utils/accessio/lookaheadreader_test.go @@ -0,0 +1,89 @@ +package accessio_test + +import ( + "bytes" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + "ocm.software/ocm/api/utils/accessio" +) + +var _ = Describe("LookAheadReader", func() { + in := "12345678901234567890" + var buf *bytes.Buffer + var lookup *accessio.LookAheadReader + + BeforeEach(func() { + buf = bytes.NewBuffer([]byte(in)) + }) + + Context("read", func() { + BeforeEach(func() { + lookup = accessio.NewLookAheadReader(buf) + }) + + It("reads all", func() { + var buf [30]byte + + n, err := lookup.Read(buf[:]) + Expect(n).To(Equal(20)) + Expect(string(buf[:n])).To(Equal(in)) + CheckEOF(lookup, err) + }) + + It("looksup", func() { + var buf [30]byte + + n, err := lookup.Read(buf[:2]) + Expect(err).To(BeNil()) + Expect(n).To(Equal(2)) + Expect(string(buf[:n])).To(Equal(in[:2])) + + n, err = lookup.LookAhead(buf[:5]) + Expect(err).To(BeNil()) + Expect(n).To(Equal(5)) + Expect(string(buf[:n])).To(Equal(in[2:7])) + + n, err = lookup.Read(buf[:3]) + Expect(err).To(BeNil()) + Expect(n).To(Equal(3)) + Expect(string(buf[:n])).To(Equal(in[2:5])) + + n, err = lookup.Read(buf[:]) + Expect(err).To(BeNil()) + Expect(n).To(Equal(15)) + Expect(string(buf[:n])).To(Equal(in[5:20])) + }) + + It("interferring lookup", func() { + var buf [30]byte + + n, err := lookup.Read(buf[:2]) + Expect(err).To(BeNil()) + Expect(n).To(Equal(2)) + Expect(string(buf[:n])).To(Equal(in[:2])) + + n, err = 
lookup.LookAhead(buf[:5]) + Expect(err).To(BeNil()) + Expect(n).To(Equal(5)) + Expect(string(buf[:n])).To(Equal(in[2:7])) + + n, err = lookup.Read(buf[:3]) + Expect(err).To(BeNil()) + Expect(n).To(Equal(3)) + Expect(string(buf[:n])).To(Equal(in[2:5])) + + n, err = lookup.LookAhead(buf[:5]) + Expect(err).To(BeNil()) + Expect(n).To(Equal(5)) + Expect(string(buf[:n])).To(Equal(in[5:10])) + + n, err = lookup.Read(buf[:]) + Expect(err).To(BeNil()) + Expect(n).To(Equal(15)) + Expect(string(buf[:n])).To(Equal(in[5:20])) + }) + + }) +}) From b6878e36376bb18a42131424bef1495689eeb503 Mon Sep 17 00:00:00 2001 From: Uwe Krueger Date: Wed, 11 Dec 2024 09:59:32 +0100 Subject: [PATCH 09/13] fix typo --- api/utils/accessio/lookaheadreader.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api/utils/accessio/lookaheadreader.go b/api/utils/accessio/lookaheadreader.go index 66cacdf694..b2ee53b86f 100644 --- a/api/utils/accessio/lookaheadreader.go +++ b/api/utils/accessio/lookaheadreader.go @@ -49,7 +49,7 @@ func (r *LookAheadReader) read(p []byte) (n int, err error) { // LookAhead provides a preview of upcoming data. // It tries to fill the complete given buffer. -// The regular data stream provided by ead is not affected. +// The regular data stream provided by Read is not affected. 
func (r *LookAheadReader) LookAhead(p []byte) (n int, err error) { r.lock.Lock() defer r.lock.Unlock() From c4e1573019742c443fdedef76de143c5889a2ed6 Mon Sep 17 00:00:00 2001 From: Uwe Krueger Date: Wed, 11 Dec 2024 14:32:30 +0100 Subject: [PATCH 10/13] get rid on named parameters --- api/utils/accessio/lookaheadreader.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/api/utils/accessio/lookaheadreader.go b/api/utils/accessio/lookaheadreader.go index b2ee53b86f..7a08d33813 100644 --- a/api/utils/accessio/lookaheadreader.go +++ b/api/utils/accessio/lookaheadreader.go @@ -28,7 +28,12 @@ func (r *LookAheadReader) Read(p []byte) (int, error) { return r.read(p) } -func (r *LookAheadReader) read(p []byte) (n int, err error) { +func (r *LookAheadReader) read(p []byte) (int, error) { + var ( + n int + err error + ) + if r.buffer != nil && r.buffer.Len() > 0 { // first, consume from buffer n, err = r.buffer.Read(p) From b58d06da26d73adb9157c6075632b68373c02661 Mon Sep 17 00:00:00 2001 From: Uwe Krueger Date: Wed, 11 Dec 2024 18:05:25 +0100 Subject: [PATCH 11/13] simplified blob splitting --- api/ocm/cpi/repocpi/bridge_r.go | 2 +- .../repositories/genericocireg/bloblimits.go | 4 +- .../genericocireg/componentversion.go | 47 ++-- .../repositories/genericocireg/config/type.go | 7 +- .../repositories/genericocireg/repo_test.go | 85 ++++++- .../repositories/genericocireg/repository.go | 4 +- api/utils/accessio/chunkedreader.go | 93 ------- api/utils/accessio/chunkedreader_test.go | 227 ------------------ api/utils/accessio/lookaheadreader.go | 79 ------ api/utils/accessio/lookaheadreader_test.go | 89 ------- api/utils/blobaccess/chunked/access.go | 75 ------ api/utils/blobaccess/chunked/chunked_test.go | 112 --------- api/utils/blobaccess/chunked/suite_test.go | 13 - 13 files changed, 118 insertions(+), 719 deletions(-) delete mode 100644 api/utils/accessio/chunkedreader.go delete mode 100644 api/utils/accessio/chunkedreader_test.go delete mode 100644 
api/utils/accessio/lookaheadreader.go delete mode 100644 api/utils/accessio/lookaheadreader_test.go delete mode 100644 api/utils/blobaccess/chunked/access.go delete mode 100644 api/utils/blobaccess/chunked/chunked_test.go delete mode 100644 api/utils/blobaccess/chunked/suite_test.go diff --git a/api/ocm/cpi/repocpi/bridge_r.go b/api/ocm/cpi/repocpi/bridge_r.go index c9ffb8ed50..53ed151f5a 100644 --- a/api/ocm/cpi/repocpi/bridge_r.go +++ b/api/ocm/cpi/repocpi/bridge_r.go @@ -43,7 +43,7 @@ type Chunked interface { SetBlobLimit(s int64) bool } -// SetBlobLimit tries to set a blob limt for a repository +// SetBlobLimit tries to set a blob limit for a repository // implementation. It returns true, if this was possible. func SetBlobLimit(i RepositoryImpl, s int64) bool { if c, ok := i.(Chunked); ok { diff --git a/api/ocm/extensions/repositories/genericocireg/bloblimits.go b/api/ocm/extensions/repositories/genericocireg/bloblimits.go index 2ecb09d690..1fd82927ee 100644 --- a/api/ocm/extensions/repositories/genericocireg/bloblimits.go +++ b/api/ocm/extensions/repositories/genericocireg/bloblimits.go @@ -23,8 +23,8 @@ func init() { // AddDefaultBlobLimit can be used to set default blob limits // for known repositories. // Those limits will be overwritten, by blob limits -// given by a configuration ovject and the repository -// specification +// given by a configuration object and the repository +// specification. 
func AddDefaultBlobLimit(name string, limit int64) { lock.Lock() defer lock.Unlock() diff --git a/api/ocm/extensions/repositories/genericocireg/componentversion.go b/api/ocm/extensions/repositories/genericocireg/componentversion.go index f415fd9f1b..d332ca2527 100644 --- a/api/ocm/extensions/repositories/genericocireg/componentversion.go +++ b/api/ocm/extensions/repositories/genericocireg/componentversion.go @@ -2,11 +2,13 @@ package genericocireg import ( "fmt" + "io" "path" "strings" "github.com/mandelsoft/goutils/errors" "github.com/mandelsoft/goutils/set" + "github.com/mandelsoft/vfs/pkg/vfs" "github.com/opencontainers/go-digest" "ocm.software/ocm/api/datacontext/attrs/vfsattr" @@ -26,8 +28,7 @@ import ( ocihdlr "ocm.software/ocm/api/ocm/extensions/blobhandler/handlers/oci" "ocm.software/ocm/api/utils/accessio" "ocm.software/ocm/api/utils/accessobj" - "ocm.software/ocm/api/utils/blobaccess/blobaccess" - "ocm.software/ocm/api/utils/blobaccess/chunked" + "ocm.software/ocm/api/utils/blobaccess" "ocm.software/ocm/api/utils/errkind" "ocm.software/ocm/api/utils/mime" common "ocm.software/ocm/api/utils/misc" @@ -316,11 +317,29 @@ func (c *ComponentVersionContainer) GetStorageContext() cpi.StorageContext { return ocihdlr.New(c.comp.GetName(), c.Repository(), c.comp.repo.ocirepo.GetSpecification().GetKind(), c.comp.repo.ocirepo, c.comp.namespace, c.manifest) } +func blobAccessForChunk(blob blobaccess.BlobAccess, fs vfs.FileSystem, r io.Reader, limit int64) (cpi.BlobAccess, bool, error) { + f, err := blobaccess.NewTempFile("", "chunk-*", fs) + if err != nil { + return nil, true, err + } + written, err := io.CopyN(f.Writer(), r, limit) + if err != nil && !errors.Is(err, io.EOF) { + f.Close() + return nil, false, err + } + if written <= 0 { + f.Close() + return nil, false, nil + } + return f.AsBlob(blob.MimeType()), written == limit, nil +} + func (c *ComponentVersionContainer) AddBlob(blob cpi.BlobAccess, refName string, global cpi.AccessSpec) (cpi.AccessSpec, error) { if 
blob == nil { return nil, errors.New("a resource has to be defined") } + fs := vfsattr.Get(c.GetContext()) size := blob.Size() limit := c.comp.repo.blobLimit var refs []string @@ -330,20 +349,20 @@ func (c *ComponentVersionContainer) AddBlob(blob cpi.BlobAccess, refName string, return nil, err } defer reader.Close() - ch := chunked.New(reader, limit, vfsattr.Get(c.GetContext())) - for { - b, err := ch.Next() - if err != nil { - return nil, errors.Wrapf(err, "chunked blob") - } - if b == nil { - break - } - err = c.addLayer(b, &refs) - b.Close() + var b blobaccess.BlobAccess + cont := true + for cont { + b, cont, err = blobAccessForChunk(blob, fs, reader, limit) if err != nil { - return nil, errors.Wrapf(err, "chunked blob") + return nil, err + } + if b != nil { + err = c.addLayer(b, &refs) + b.Close() + if err != nil { + return nil, err + } } } } else { diff --git a/api/ocm/extensions/repositories/genericocireg/config/type.go b/api/ocm/extensions/repositories/genericocireg/config/type.go index fae19804b9..137c1d2688 100644 --- a/api/ocm/extensions/repositories/genericocireg/config/type.go +++ b/api/ocm/extensions/repositories/genericocireg/config/type.go @@ -87,13 +87,14 @@ func (a *Config) ApplyTo(ctx config.Context, target interface{}) error { const usage = ` The config type ` + ConfigType + ` can be used to set some -blob layer limits for particular OCI registries used to host OCM repositories; +blob layer limits for particular OCI registries used to host OCM repositories. +The blobLimits field maps a OCI registry address to the blob limit to use:
     type: ` + ConfigType + `
     blobLimits:
         dummy.io: 65564
-        dummy.io:8443: 32768
+        dummy.io:8443: 32768 # with :8443 specifying the port and 32768 specifying the byte limit
 
If blob limits apply to a registry, local blobs with a size larger than @@ -104,6 +105,6 @@ These settings can be overwritten by explicit settings in an OCM repository specification for those repositories. The most specific entry will be used. If a registry with a dedicated -port is requested, but no explicit such configuration is found, the +port is requested, but no explicit configuration is found, the setting for the sole hostname is used (if configured). ` diff --git a/api/ocm/extensions/repositories/genericocireg/repo_test.go b/api/ocm/extensions/repositories/genericocireg/repo_test.go index 295d31b69d..a892bf0f9c 100644 --- a/api/ocm/extensions/repositories/genericocireg/repo_test.go +++ b/api/ocm/extensions/repositories/genericocireg/repo_test.go @@ -124,14 +124,18 @@ var _ = Describe("component repository mapping", func() { MustBeSuccessful(finalize.Finalize()) }) - DescribeTable("creates a dummy component with chunks", func(f func(ocm.ResourceAccess)) { + const ref4 = "sha256:9f86d081884c7d659a2feaa0c55ad015a3bf4f1b2b0b822cd15d6c15b0f00a08,sha256:3a6eb0790f39ac87c94f3856b2dd2c5d110e6811602261a9a923d3bb23adc8b7" + const ref5 = "sha256:a4853613b2a38568ed4e49196238152469097412d06d5e5fc9be8ab92cfdf2bf,sha256:977817f6f61f4dd501df3036a3e16b31452b36f4aa3edcf9a3f3242a79d7170d" + const ref8 = "sha256:" + ocmtesthelper.D_TESTDATA + + DescribeTable("creates a dummy component with chunks", func(limit int, f func(ocm.ResourceAccess), ref string) { var finalize finalizer.Finalizer defer Defer(finalize.Finalize) repo := finalizer.ClosingWith(&finalize, Must(DefaultContext.RepositoryForSpec(spec))) impl := Must(repocpi.GetRepositoryImplementation(repo)) Expect(reflect.TypeOf(impl).String()).To(Equal("*genericocireg.RepositoryImpl")) - repocpi.SetBlobLimit(impl, 5) + repocpi.SetBlobLimit(impl, int64(limit)) comp := finalizer.ClosingWith(&finalize, Must(repo.LookupComponent(COMPONENT))) vers := finalizer.ClosingWith(&finalize, Must(comp.NewVersion("v1"))) @@ -167,7 +171,8 
@@ var _ = Describe("component repository mapping", func() { acc := Must(rsc.Access()) local, ok := acc.(*localblob.AccessSpec) Expect(ok).To(BeTrue()) - Expect(local.LocalReference).To(Equal("sha256:a4853613b2a38568ed4e49196238152469097412d06d5e5fc9be8ab92cfdf2bf,sha256:977817f6f61f4dd501df3036a3e16b31452b36f4aa3edcf9a3f3242a79d7170d")) + // fmt.Printf("localref: %s\n", local.LocalReference) + Expect(local.LocalReference).To(Equal(ref)) Expect(rsc.Meta().Digest).NotTo(BeNil()) Expect(rsc.Meta().Digest.Value).To(Equal(ocmtesthelper.D_TESTDATA)) @@ -175,16 +180,78 @@ var _ = Describe("component repository mapping", func() { MustBeSuccessful(finalize.Finalize()) }, - Entry("get blob", func(rsc ocm.ResourceAccess) { + Entry("get blob", 5, func(rsc ocm.ResourceAccess) { + data := Must(ocmutils.GetResourceData(rsc)) + Expect(string(data)).To(Equal(ocmtesthelper.S_TESTDATA)) + }, ref5), + Entry("stream blob", 5, func(rsc ocm.ResourceAccess) { + r := Must(ocmutils.GetResourceReader(rsc)) + data := Must(io.ReadAll(r)) + Expect(string(data)).To(Equal(ocmtesthelper.S_TESTDATA)) + }, ref5), + Entry("stream blob with small buffer", 5, func(rsc ocm.ResourceAccess) { + var buf [2]byte + var data []byte + + r := Must(ocmutils.GetResourceReader(rsc)) + + for { + n, err := r.Read(buf[:]) + if n > 0 { + data = append(data, buf[:n]...) 
+ } + if err != nil { + if err == io.EOF { + break + } else { + MustBeSuccessful(err) + } + } + } + Expect(string(data)).To(Equal(ocmtesthelper.S_TESTDATA)) + }, ref5), + + Entry("get blob (match limit)", len(ocmtesthelper.S_TESTDATA), func(rsc ocm.ResourceAccess) { + data := Must(ocmutils.GetResourceData(rsc)) + Expect(string(data)).To(Equal(ocmtesthelper.S_TESTDATA)) + }, ref8), + Entry("stream blob (match limit)", len(ocmtesthelper.S_TESTDATA), func(rsc ocm.ResourceAccess) { + r := Must(ocmutils.GetResourceReader(rsc)) + data := Must(io.ReadAll(r)) + Expect(string(data)).To(Equal(ocmtesthelper.S_TESTDATA)) + }, ref8), + Entry("stream blob with small buffer (match limit)", len(ocmtesthelper.S_TESTDATA), func(rsc ocm.ResourceAccess) { + var buf [2]byte + var data []byte + + r := Must(ocmutils.GetResourceReader(rsc)) + + for { + n, err := r.Read(buf[:]) + if n > 0 { + data = append(data, buf[:n]...) + } + if err != nil { + if err == io.EOF { + break + } else { + MustBeSuccessful(err) + } + } + } + Expect(string(data)).To(Equal(ocmtesthelper.S_TESTDATA)) + }, ref8), + + Entry("get blob (match limit/2)", len(ocmtesthelper.S_TESTDATA)/2, func(rsc ocm.ResourceAccess) { data := Must(ocmutils.GetResourceData(rsc)) Expect(string(data)).To(Equal(ocmtesthelper.S_TESTDATA)) - }), - Entry("stream blob", func(rsc ocm.ResourceAccess) { + }, ref4), + Entry("stream blob (match limit/2)", len(ocmtesthelper.S_TESTDATA)/2, func(rsc ocm.ResourceAccess) { r := Must(ocmutils.GetResourceReader(rsc)) data := Must(io.ReadAll(r)) Expect(string(data)).To(Equal(ocmtesthelper.S_TESTDATA)) - }), - Entry("stream blob with small buffer", func(rsc ocm.ResourceAccess) { + }, ref4), + Entry("stream blob with small buffer (match limit/2)", len(ocmtesthelper.S_TESTDATA)/2, func(rsc ocm.ResourceAccess) { var buf [2]byte var data []byte @@ -204,7 +271,7 @@ var _ = Describe("component repository mapping", func() { } } Expect(string(data)).To(Equal(ocmtesthelper.S_TESTDATA)) - }), + }, ref4), ) 
It("handles legacylocalociblob access method", func() { diff --git a/api/ocm/extensions/repositories/genericocireg/repository.go b/api/ocm/extensions/repositories/genericocireg/repository.go index 2a47a27b3a..a9ec886c6d 100644 --- a/api/ocm/extensions/repositories/genericocireg/repository.go +++ b/api/ocm/extensions/repositories/genericocireg/repository.go @@ -49,8 +49,8 @@ type RepositoryImpl struct { ocirepo oci.Repository readonly bool // blobLimit is the size limit for layers maintained for the storage of localBlobs. - // The value -1 means an unconfigured value, - // a value == 0 disables the limiting and + // The value -1 means an unconfigured value (a default from the blob limit configuration is used), + // a value == 0 disables the limiting and (a default from the blob limit configuration is ignored), // a value > 0 enabled the usage of the specified size. blobLimit int64 } diff --git a/api/utils/accessio/chunkedreader.go b/api/utils/accessio/chunkedreader.go deleted file mode 100644 index 3dd99e9bde..0000000000 --- a/api/utils/accessio/chunkedreader.go +++ /dev/null @@ -1,93 +0,0 @@ -package accessio - -import ( - "io" - "sync" - - "github.com/mandelsoft/goutils/errors" -) - -// ChunkedReader splits a reader into several -// logical readers with a limited content size. -// Once the reader reaches its limit it provides -// an io.EOF. -// It can be continued by Calling Next, which returns -// whether a follow-up is required or not. 
-type ChunkedReader struct { - lock sync.Mutex - reader *LookAheadReader - chunkSize int64 - chunkNo int - - limited *io.LimitedReader - err error -} - -var _ io.Reader = (*ChunkedReader)(nil) - -func NewChunkedReader(r io.Reader, chunkSize int64) *ChunkedReader { - return &ChunkedReader{ - reader: NewLookAheadReader(r), - chunkSize: chunkSize, - limited: &io.LimitedReader{r, chunkSize}, - } -} - -func (c *ChunkedReader) Read(p []byte) (n int, err error) { - c.lock.Lock() - defer c.lock.Unlock() - - n, err = c.limited.Read(p) - c.err = err - return n, err -} - -// ChunkNo returns the number previously -// provided chunks. -func (c *ChunkedReader) ChunkNo() int { - c.lock.Lock() - defer c.lock.Unlock() - - return c.chunkNo -} - -// ChunkDone returns true, if the actual -// chunk is completely read. -func (c *ChunkedReader) ChunkDone() bool { - c.lock.Lock() - defer c.lock.Unlock() - - return errors.Is(c.err, io.EOF) -} - -// Next returns true, if a followup chunk -// has been prepared for the reader. -// If called while the current chunk is not yet completed -// it always returns false (check by calling ChunkDone). -func (c *ChunkedReader) Next() bool { - c.lock.Lock() - defer c.lock.Unlock() - - if !errors.Is(c.err, io.EOF) { - return false - } - - if c.limited.N > 0 { - // don't need to check for more data if EOF is - // provided before chunk size is reached. - return false - } - // cannot assume that read with size 0 returns EOF as proposed - // by io.Reader.Read (see bytes.Buffer.Read). - // Therefore, we really have to read something. 
- var buf [1]byte - n, err := c.reader.LookAhead(buf[:]) - if n == 0 && errors.Is(err, io.EOF) { - return false - } - - c.chunkNo++ - c.err = nil - c.limited = &io.LimitedReader{c.reader, c.chunkSize} - return true -} diff --git a/api/utils/accessio/chunkedreader_test.go b/api/utils/accessio/chunkedreader_test.go deleted file mode 100644 index 4076bb0655..0000000000 --- a/api/utils/accessio/chunkedreader_test.go +++ /dev/null @@ -1,227 +0,0 @@ -package accessio_test - -import ( - "bytes" - "io" - - . "github.com/onsi/ginkgo/v2" - . "github.com/onsi/gomega" - - "ocm.software/ocm/api/utils/accessio" -) - -func CheckEOF(r io.Reader, err error) { - var ( - buf [20]byte - n int - ) - - if err == nil { - n, err = r.Read(buf[:]) - ExpectWithOffset(1, n).To(Equal(0)) - } - ExpectWithOffset(1, err).To(Equal(io.EOF)) -} - -var _ = Describe("ChunkedReader", func() { - in := "12345678901234567890" - var buf *bytes.Buffer - var chunked *accessio.ChunkedReader - - BeforeEach(func() { - buf = bytes.NewBuffer([]byte(in)) - }) - - Context("max preread", func() { - BeforeEach(func() { - chunked = accessio.NewChunkedReader(buf, 5) - }) - - It("reports EOF and splits reader", func() { - var buf [30]byte - cnt := 0 - - n, err := chunked.Read(buf[:]) - Expect(n).To(Equal(5)) - Expect(string(buf[:n])).To(Equal(in[cnt : cnt+n])) - CheckEOF(chunked, err) - Expect(chunked.ChunkDone()).To(Equal(true)) - cnt += n - Expect(chunked.Next()).To(Equal(true)) - - for i := 0; i < 3; i++ { - n, err := chunked.Read(buf[:]) - Expect(n).To(Equal(5)) - Expect(string(buf[:n])).To(Equal(in[cnt : cnt+n])) - CheckEOF(chunked, err) - Expect(chunked.ChunkDone()).To(Equal(true)) - cnt += n - - Expect(chunked.Next()).To(Equal(i != 2)) - } - Expect(chunked.Next()).To(Equal(false)) - }) - - It("keeps reporting EOF", func() { - var buf [30]byte - cnt := 0 - - n, err := chunked.Read(buf[:]) - Expect(n).To(Equal(5)) - Expect(string(buf[:n])).To(Equal(in[cnt : cnt+n])) - CheckEOF(chunked, err) - cnt += n - - n, err = 
chunked.Read(buf[:]) - Expect(n).To(Equal(0)) - Expect(err).To(Equal(io.EOF)) - - Expect(chunked.ChunkDone()).To(Equal(true)) - Expect(chunked.Next()).To(Equal(true)) - - for i := 0; i < 3; i++ { - n, err := chunked.Read(buf[:]) - Expect(n).To(Equal(5)) - Expect(string(buf[:n])).To(Equal(in[cnt : cnt+n])) - CheckEOF(chunked, err) - Expect(chunked.ChunkDone()).To(Equal(true)) - cnt += n - - Expect(chunked.Next()).To(Equal(i != 2)) - } - Expect(chunked.Next()).To(Equal(false)) - }) - }) - - Context("complete", func() { - BeforeEach(func() { - chunked = accessio.NewChunkedReader(buf, 100) - }) - - It("reports EOF", func() { - var buf [30]byte - n, err := chunked.Read(buf[:]) - Expect(n).To(Equal(20)) - Expect(err).To(BeNil()) - Expect(string(buf[:n])).To(Equal(in)) - Expect(chunked.ChunkDone()).To(Equal(false)) - Expect(chunked.Next()).To(Equal(false)) - - n, err = chunked.Read(buf[:]) - Expect(n).To(Equal(0)) - Expect(err).To(Equal(io.EOF)) - Expect(chunked.ChunkDone()).To(Equal(true)) - Expect(chunked.Next()).To(Equal(false)) - - n, err = chunked.Read(buf[:]) - Expect(n).To(Equal(0)) - Expect(err).To(Equal(io.EOF)) - }) - - It("reports EOF with matched size", func() { - var buf [20]byte - n, err := chunked.Read(buf[:]) - Expect(n).To(Equal(20)) - Expect(err).To(BeNil()) - Expect(string(buf[:n])).To(Equal(in)) - Expect(chunked.ChunkDone()).To(Equal(false)) - Expect(chunked.Next()).To(Equal(false)) - - n, err = chunked.Read(buf[:]) - Expect(n).To(Equal(0)) - Expect(err).To(Equal(io.EOF)) - Expect(chunked.ChunkDone()).To(Equal(true)) - Expect(chunked.Next()).To(Equal(false)) - - n, err = chunked.Read(buf[:]) - Expect(n).To(Equal(0)) - Expect(err).To(Equal(io.EOF)) - }) - }) - - Context("non-matching chunk size", func() { - BeforeEach(func() { - chunked = accessio.NewChunkedReader(buf, 15) - }) - It("reports EOF and Next with non-matching size", func() { - var buf [20]byte - n, err := chunked.Read(buf[:]) - Expect(n).To(Equal(15)) - CheckEOF(chunked, err) - 
Expect(string(buf[:n])).To(Equal(in[:15])) - Expect(chunked.ChunkDone()).To(Equal(true)) - Expect(chunked.Next()).To(Equal(true)) - - n, err = chunked.Read(buf[:]) - Expect(n).To(Equal(5)) - Expect(string(buf[:n])).To(Equal(in[15:20])) - CheckEOF(chunked, err) - Expect(chunked.ChunkDone()).To(Equal(true)) - Expect(chunked.Next()).To(Equal(false)) - - n, err = chunked.Read(buf[:]) - Expect(n).To(Equal(0)) - Expect(err).To(Equal(io.EOF)) - }) - }) - - Context("chunk size matches read size", func() { - BeforeEach(func() { - chunked = accessio.NewChunkedReader(buf, 20) - }) - - It("reports EOF with matched size", func() { - var buf [20]byte - n, err := chunked.Read(buf[:]) - Expect(n).To(Equal(20)) - Expect(string(buf[:n])).To(Equal(in)) - CheckEOF(chunked, err) - Expect(chunked.ChunkDone()).To(Equal(true)) - Expect(chunked.Next()).To(Equal(false)) - - n, err = chunked.Read(buf[:]) - Expect(n).To(Equal(0)) - Expect(err).To(Equal(io.EOF)) - }) - }) - - Context("split", func() { - BeforeEach(func() { - chunked = accessio.NewChunkedReader(buf, 5) - }) - - It("reports EOF and splits reader", func() { - var buf [30]byte - cnt := 0 - - n, err := chunked.Read(buf[:]) - Expect(n).To(Equal(5)) - Expect(string(buf[:n])).To(Equal(in[cnt : cnt+n])) - CheckEOF(chunked, err) - Expect(chunked.ChunkDone()).To(Equal(true)) - cnt += n - Expect(chunked.Next()).To(Equal(true)) - - for i := 0; i < 3; i++ { - n, err := chunked.Read(buf[:]) - Expect(string(buf[:n])).To(Equal(in[cnt : cnt+n])) - Expect(n).To(Equal(5)) - CheckEOF(chunked, err) - Expect(chunked.ChunkDone()).To(Equal(true)) - cnt += n - - Expect(chunked.Next()).To(Equal(i != 2)) - } - Expect(chunked.Next()).To(Equal(false)) - }) - }) -}) - -func check(chunked *accessio.ChunkedReader, n int, buf []byte, exp int, data string, done, next bool) { - ExpectWithOffset(1, n).To(Equal(exp)) - ExpectWithOffset(1, string(buf[:n])).To(Equal(data)) - - ExpectWithOffset(1, chunked.ChunkDone()).To(Equal(done)) - ExpectWithOffset(1, 
chunked.Next()).To(Equal(next)) - -} diff --git a/api/utils/accessio/lookaheadreader.go b/api/utils/accessio/lookaheadreader.go deleted file mode 100644 index 7a08d33813..0000000000 --- a/api/utils/accessio/lookaheadreader.go +++ /dev/null @@ -1,79 +0,0 @@ -package accessio - -import ( - "bytes" - "io" - "sync" -) - -// LookAheadReader is an io.Reader which additionally -// provides a look ahead of upcoming data, which does not -// affect the regular reader. -type LookAheadReader struct { - lock sync.Mutex - reader io.Reader - buffer *bytes.Buffer -} - -func NewLookAheadReader(r io.Reader) *LookAheadReader { - return &LookAheadReader{ - reader: r, - } -} - -func (r *LookAheadReader) Read(p []byte) (int, error) { - r.lock.Lock() - defer r.lock.Unlock() - - return r.read(p) -} - -func (r *LookAheadReader) read(p []byte) (int, error) { - var ( - n int - err error - ) - - if r.buffer != nil && r.buffer.Len() > 0 { - // first, consume from buffer - n, err = r.buffer.Read(p) - if err != nil { // the only error returned is io.EOF - r.buffer = nil - } - } else { - r.buffer = nil - } - - if n >= len(p) { - return n, nil - } - - cnt, err := r.reader.Read(p[n:]) - return cnt + n, err -} - -// LookAhead provides a preview of upcoming data. -// It tries to fill the complete given buffer. -// The regular data stream provided by Read is not affected. -func (r *LookAheadReader) LookAhead(p []byte) (n int, err error) { - r.lock.Lock() - defer r.lock.Unlock() - - cnt := 0 - for cnt < len(p) { - n, err = r.read(p[cnt:]) - if err != nil { - break - } - cnt += n - } - - if cnt > 0 { - if r.buffer == nil { - r.buffer = bytes.NewBuffer(nil) - } - r.buffer.Write(p[:cnt]) - } - - return cnt, err -} diff --git a/api/utils/accessio/lookaheadreader_test.go b/api/utils/accessio/lookaheadreader_test.go deleted file mode 100644 index d819ae1d53..0000000000 --- a/api/utils/accessio/lookaheadreader_test.go +++ /dev/null @@ -1,89 +0,0 @@ -package accessio_test - -import ( - "bytes" - - . 
"github.com/onsi/ginkgo/v2" - . "github.com/onsi/gomega" - - "ocm.software/ocm/api/utils/accessio" -) - -var _ = Describe("LookAheadReader", func() { - in := "12345678901234567890" - var buf *bytes.Buffer - var lookup *accessio.LookAheadReader - - BeforeEach(func() { - buf = bytes.NewBuffer([]byte(in)) - }) - - Context("read", func() { - BeforeEach(func() { - lookup = accessio.NewLookAheadReader(buf) - }) - - It("reads all", func() { - var buf [30]byte - - n, err := lookup.Read(buf[:]) - Expect(n).To(Equal(20)) - Expect(string(buf[:n])).To(Equal(in)) - CheckEOF(lookup, err) - }) - - It("looksup", func() { - var buf [30]byte - - n, err := lookup.Read(buf[:2]) - Expect(err).To(BeNil()) - Expect(n).To(Equal(2)) - Expect(string(buf[:n])).To(Equal(in[:2])) - - n, err = lookup.LookAhead(buf[:5]) - Expect(err).To(BeNil()) - Expect(n).To(Equal(5)) - Expect(string(buf[:n])).To(Equal(in[2:7])) - - n, err = lookup.Read(buf[:3]) - Expect(err).To(BeNil()) - Expect(n).To(Equal(3)) - Expect(string(buf[:n])).To(Equal(in[2:5])) - - n, err = lookup.Read(buf[:]) - Expect(err).To(BeNil()) - Expect(n).To(Equal(15)) - Expect(string(buf[:n])).To(Equal(in[5:20])) - }) - - It("interferring lookup", func() { - var buf [30]byte - - n, err := lookup.Read(buf[:2]) - Expect(err).To(BeNil()) - Expect(n).To(Equal(2)) - Expect(string(buf[:n])).To(Equal(in[:2])) - - n, err = lookup.LookAhead(buf[:5]) - Expect(err).To(BeNil()) - Expect(n).To(Equal(5)) - Expect(string(buf[:n])).To(Equal(in[2:7])) - - n, err = lookup.Read(buf[:3]) - Expect(err).To(BeNil()) - Expect(n).To(Equal(3)) - Expect(string(buf[:n])).To(Equal(in[2:5])) - - n, err = lookup.LookAhead(buf[:5]) - Expect(err).To(BeNil()) - Expect(n).To(Equal(5)) - Expect(string(buf[:n])).To(Equal(in[5:10])) - - n, err = lookup.Read(buf[:]) - Expect(err).To(BeNil()) - Expect(n).To(Equal(15)) - Expect(string(buf[:n])).To(Equal(in[5:20])) - }) - - }) -}) diff --git a/api/utils/blobaccess/chunked/access.go b/api/utils/blobaccess/chunked/access.go deleted 
file mode 100644 index 73f498273e..0000000000 --- a/api/utils/blobaccess/chunked/access.go +++ /dev/null @@ -1,75 +0,0 @@ -package chunked - -import ( - "fmt" - "io" - "sync" - - "github.com/mandelsoft/vfs/pkg/vfs" - - "ocm.software/ocm/api/utils" - "ocm.software/ocm/api/utils/accessio" - "ocm.software/ocm/api/utils/blobaccess" - "ocm.software/ocm/api/utils/blobaccess/bpi" - "ocm.software/ocm/api/utils/mime" -) - -func newChunk(r io.Reader, fss ...vfs.FileSystem) (bpi.BlobAccess, error) { - t, err := blobaccess.NewTempFile("", "chunk-*", fss...) - if err != nil { - return nil, err - } - - _, err = io.Copy(t.Writer(), r) - if err != nil { - t.Close() - return nil, err - } - return t.AsBlob(mime.MIME_OCTET), nil -} - -// ChunkedBlobSource provides a sequence of -// bpi.BlobAccess objects. -type ChunkedBlobSource interface { - Next() (bpi.BlobAccess, error) -} - -type chunkedAccess struct { - lock sync.Mutex - chunksize int64 - reader *accessio.ChunkedReader - fs vfs.FileSystem - cont bool -} - -// New provides a sequence of -// bpi.BlobAccess objects for a given io.Reader -// each with a limited size. -// The provided blobs are temporarily stored -// on the filesystem and can therefore be kept -// and accessed any number of times until they are closed. 
-func New(r io.Reader, chunksize int64, fss ...vfs.FileSystem) ChunkedBlobSource { - reader := accessio.NewChunkedReader(r, chunksize) - return &chunkedAccess{ - chunksize: chunksize, - reader: reader, - fs: utils.FileSystem(fss...), - cont: false, - } -} - -func (r *chunkedAccess) Next() (bpi.BlobAccess, error) { - r.lock.Lock() - defer r.lock.Unlock() - - if r.cont { - if !r.reader.ChunkDone() { - return nil, fmt.Errorf("unexpected incomplete read") - } - if !r.reader.Next() { - return nil, nil - } - } - r.cont = true - return newChunk(r.reader, r.fs) -} diff --git a/api/utils/blobaccess/chunked/chunked_test.go b/api/utils/blobaccess/chunked/chunked_test.go deleted file mode 100644 index 958c94fd3c..0000000000 --- a/api/utils/blobaccess/chunked/chunked_test.go +++ /dev/null @@ -1,112 +0,0 @@ -package chunked_test - -import ( - "bytes" - "io" - - "github.com/mandelsoft/goutils/finalizer" - . "github.com/mandelsoft/goutils/testutils" - "github.com/mandelsoft/vfs/pkg/memoryfs" - "github.com/mandelsoft/vfs/pkg/vfs" - . "github.com/onsi/ginkgo/v2" - . 
"github.com/onsi/gomega" - "ocm.software/ocm/api/utils/blobaccess/chunked" -) - -var _ = Describe("Chunked Blobs", func() { - var blobData = []byte("a1a2a3a4a5a6a7a8a9a0b1b2b3b4b5b6b7b8b9b0") - - var src chunked.ChunkedBlobSource - var fs vfs.FileSystem - - Context("blobs", func() { - BeforeEach(func() { - fs = memoryfs.New() - }) - - AfterEach(func() { - vfs.Cleanup(fs) - }) - - It("small blobs", func() { - src = chunked.New(bytes.NewBuffer(blobData), 50, fs) - n := 0 - buf := bytes.NewBuffer(nil) - - var finalize finalizer.Finalizer - defer finalize.Finalize() - for { - b := Must(src.Next()) - if b == nil { - break - } - n++ - finalize.Close(b) - r := Must(b.Reader()) - _, err := io.Copy(buf, r) - r.Close() - MustBeSuccessful(err) - } - Expect(n).To(Equal(1)) - Expect(buf.String()).To(Equal(string(blobData))) - - MustBeSuccessful(finalize.Finalize()) - list := Must(vfs.ReadDir(fs, "/")) - Expect(len(list)).To(Equal(0)) - }) - - It("matching blobs", func() { - src = chunked.New(bytes.NewBuffer(blobData), 40, fs) - n := 0 - buf := bytes.NewBuffer(nil) - - var finalize finalizer.Finalizer - defer finalize.Finalize() - for { - b := Must(src.Next()) - if b == nil { - break - } - n++ - finalize.Close(b) - r := Must(b.Reader()) - _, err := io.Copy(buf, r) - r.Close() - MustBeSuccessful(err) - } - Expect(n).To(Equal(1)) - Expect(buf.String()).To(Equal(string(blobData))) - - MustBeSuccessful(finalize.Finalize()) - list := Must(vfs.ReadDir(fs, "/")) - Expect(len(list)).To(Equal(0)) - }) - - It("large blobs", func() { - src = chunked.New(bytes.NewBuffer(blobData), 18, fs) - n := 0 - buf := bytes.NewBuffer(nil) - - var finalize finalizer.Finalizer - defer finalize.Finalize() - for { - b := Must(src.Next()) - if b == nil { - break - } - n++ - finalize.Close(b) - r := Must(b.Reader()) - _, err := io.Copy(buf, r) - r.Close() - MustBeSuccessful(err) - } - Expect(n).To(Equal(3)) - Expect(buf.String()).To(Equal(string(blobData))) - - MustBeSuccessful(finalize.Finalize()) - list := 
Must(vfs.ReadDir(fs, "/")) - Expect(len(list)).To(Equal(0)) - }) - }) -}) diff --git a/api/utils/blobaccess/chunked/suite_test.go b/api/utils/blobaccess/chunked/suite_test.go deleted file mode 100644 index 55e1b7ca35..0000000000 --- a/api/utils/blobaccess/chunked/suite_test.go +++ /dev/null @@ -1,13 +0,0 @@ -package chunked_test - -import ( - "testing" - - . "github.com/onsi/ginkgo/v2" - . "github.com/onsi/gomega" -) - -func TestConfig(t *testing.T) { - RegisterFailHandler(Fail) - RunSpecs(t, "chunked blobs Test Suite") -} From 9da1dfc6547278257444f61dc1060c10fb03c891 Mon Sep 17 00:00:00 2001 From: Uwe Krueger Date: Thu, 12 Dec 2024 13:32:33 +0100 Subject: [PATCH 12/13] generate --- .../extensions/repositories/genericocireg/repository.go | 5 ++--- docs/reference/ocm_configfile.md | 7 ++++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/api/ocm/extensions/repositories/genericocireg/repository.go b/api/ocm/extensions/repositories/genericocireg/repository.go index a9ec886c6d..8773350ea5 100644 --- a/api/ocm/extensions/repositories/genericocireg/repository.go +++ b/api/ocm/extensions/repositories/genericocireg/repository.go @@ -64,10 +64,9 @@ var ( // NewRepository creates a new OCM repository based on any OCI abstraction from // the OCI context type. // The optional blobLimit is the size limit for layers maintained for the storage of localBlobs. -// The value -1 means an unconfigured value, -// a value == 0 disables the limiting and +// The value -1 means an unconfigured value (a default from the blob limit configuration is used), +// a value == 0 disables the limiting and (a default from the blob limit configuration is ignored), // a value > 0 enabled the usage of the specified size. -// If this parameter is not given -1 is assumed. 
func NewRepository(ctxp cpi.ContextProvider, meta *ComponentRepositoryMeta, ocirepo oci.Repository, blobLimit ...int64) cpi.Repository { ctx := datacontext.InternalContextRef(ctxp.OCMContext()) diff --git a/docs/reference/ocm_configfile.md b/docs/reference/ocm_configfile.md index 08714d79bd..4f5282064d 100644 --- a/docs/reference/ocm_configfile.md +++ b/docs/reference/ocm_configfile.md @@ -26,13 +26,14 @@ The following configuration types are supported: - blobLimits.ocireg.ocm.config.ocm.software The config type blobLimits.ocireg.ocm.config.ocm.software can be used to set some - blob layer limits for particular OCI registries used to host OCM repositories; + blob layer limits for particular OCI registries used to host OCM repositories. + The blobLimits field maps a OCI registry address to the blob limit to use:
       type: blobLimits.ocireg.ocm.config.ocm.software
       blobLimits:
           dummy.io: 65564
-          dummy.io:8443: 32768
+          dummy.io:8443: 32768 # with :8443 specifying the port and 32768 specifying the byte limit
   
If blob limits apply to a registry, local blobs with a size larger than @@ -43,7 +44,7 @@ The following configuration types are supported: repository specification for those repositories. The most specific entry will be used. If a registry with a dedicated - port is requested, but no explicit such configuration is found, the + port is requested, but no explicit configuration is found, the setting for the sole hostname is used (if configured). - cli.ocm.config.ocm.software The config type cli.ocm.config.ocm.software is used to handle the From 56b5a4a7a4f71f4ec6692f4905a11deccf6e7610 Mon Sep 17 00:00:00 2001 From: Uwe Krueger Date: Thu, 12 Dec 2024 13:44:16 +0100 Subject: [PATCH 13/13] set ghcr blob limit --- .../extensions/repositories/genericocireg/bloblimits.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/api/ocm/extensions/repositories/genericocireg/bloblimits.go b/api/ocm/extensions/repositories/genericocireg/bloblimits.go index 1fd82927ee..c4ecfc799a 100644 --- a/api/ocm/extensions/repositories/genericocireg/bloblimits.go +++ b/api/ocm/extensions/repositories/genericocireg/bloblimits.go @@ -12,12 +12,19 @@ var ( lock sync.Mutex ) +const ( + KB = int64(1000) + MB = 1000 * KB + GB = 1000 * MB +) + func init() { defaultBlobLimits = config.BlobLimits{} // Add limits for known OCI repositories, here, // or provide init functions in specialized packages // by calling AddDefaultBlobLimit. + AddDefaultBlobLimit("ghcr.io", 10*GB) // https://github.com/orgs/community/discussions/77429 } // AddDefaultBlobLimit can be used to set default blob limits