diff --git a/api/credentials/identity/hostpath/identity.go b/api/credentials/identity/hostpath/identity.go
index baa2aad8a9..968412dda6 100644
--- a/api/credentials/identity/hostpath/identity.go
+++ b/api/credentials/identity/hostpath/identity.go
@@ -157,3 +157,14 @@ func PathPrefix(id cpi.ConsumerIdentity) string {
}
return strings.TrimPrefix(id[ID_PATHPREFIX], "/")
}
+
+func HostPort(id cpi.ConsumerIdentity) string {
+ if id == nil {
+ return ""
+ }
+ host := id[ID_HOSTNAME]
+ if port, ok := id[ID_PORT]; ok {
+ return host + ":" + port
+ }
+ return host
+}
diff --git a/api/oci/extensions/repositories/ocireg/repository.go b/api/oci/extensions/repositories/ocireg/repository.go
index 12fe7276de..cc2c845ac0 100644
--- a/api/oci/extensions/repositories/ocireg/repository.go
+++ b/api/oci/extensions/repositories/ocireg/repository.go
@@ -70,6 +70,7 @@ func NewRepository(ctx cpi.Context, spec *RepositorySpec, info *RepositoryInfo)
spec: spec,
info: info,
}
+ i.logger.Debug("created repository")
return cpi.NewRepository(i), nil
}
diff --git a/api/ocm/cpi/repocpi/bridge_r.go b/api/ocm/cpi/repocpi/bridge_r.go
index f7eea176fe..53ed151f5a 100644
--- a/api/ocm/cpi/repocpi/bridge_r.go
+++ b/api/ocm/cpi/repocpi/bridge_r.go
@@ -34,6 +34,24 @@ type RepositoryImpl interface {
io.Closer
}
+// Chunked is an optional interface, which
+// may be implemented to accept a blob limit for mapping
+// local blobs to an external storage system.
+type Chunked interface {
+ // SetBlobLimit sets the blob limit if possible.
+ // It returns true, if this was successful.
+ SetBlobLimit(s int64) bool
+}
+
+// SetBlobLimit tries to set a blob limit for a repository
+// implementation. It returns true, if this was possible.
+func SetBlobLimit(i RepositoryImpl, s int64) bool {
+ if c, ok := i.(Chunked); ok {
+ return c.SetBlobLimit(s)
+ }
+ return false
+}
+
type _repositoryBridgeBase = resource.ResourceImplBase[cpi.Repository]
type repositoryBridge struct {
diff --git a/api/ocm/extensions/repositories/genericocireg/accessmethod_localblob.go b/api/ocm/extensions/repositories/genericocireg/accessmethod_localblob.go
index 94a0a85966..c661c9f294 100644
--- a/api/ocm/extensions/repositories/genericocireg/accessmethod_localblob.go
+++ b/api/ocm/extensions/repositories/genericocireg/accessmethod_localblob.go
@@ -1,10 +1,14 @@
package genericocireg
import (
+ "bytes"
"io"
+ "os"
+ "strings"
"sync"
"github.com/mandelsoft/goutils/errors"
+ "github.com/mandelsoft/goutils/finalizer"
"github.com/opencontainers/go-digest"
"ocm.software/ocm/api/oci"
@@ -88,9 +92,19 @@ func (m *localBlobAccessMethod) getBlob() (blobaccess.DataAccess, error) {
return nil, errors.ErrNotImplemented("artifact blob synthesis")
}
}
- _, data, err := m.namespace.GetBlobData(digest.Digest(m.spec.LocalReference))
- if err != nil {
- return nil, err
+ refs := strings.Split(m.spec.LocalReference, ",")
+
+ var (
+ data blobaccess.DataAccess
+ err error
+ )
+ if len(refs) < 2 {
+ _, data, err = m.namespace.GetBlobData(digest.Digest(m.spec.LocalReference))
+ if err != nil {
+ return nil, err
+ }
+ } else {
+ data = &composedBlock{m, refs}
}
m.data = data
return m.data, err
@@ -111,3 +125,119 @@ func (m *localBlobAccessMethod) Get() ([]byte, error) {
func (m *localBlobAccessMethod) MimeType() string {
return m.spec.MediaType
}
+
+////////////////////////////////////////////////////////////////////////////////
+
+type composedBlock struct {
+ m *localBlobAccessMethod
+ refs []string
+}
+
+var _ blobaccess.DataAccess = (*composedBlock)(nil)
+
+func (c *composedBlock) Get() ([]byte, error) {
+ buf := bytes.NewBuffer(nil)
+ for _, ref := range c.refs {
+ var finalize finalizer.Finalizer
+
+ _, data, err := c.m.namespace.GetBlobData(digest.Digest(ref))
+ if err != nil {
+ return nil, err
+ }
+ finalize.Close(data)
+ r, err := data.Reader()
+ if err != nil {
+ return nil, err
+ }
+ finalize.Close(r)
+ _, err = io.Copy(buf, r)
+ if err != nil {
+ return nil, err
+ }
+ err = finalize.Finalize()
+ if err != nil {
+ return nil, err
+ }
+ }
+ return buf.Bytes(), nil
+}
+
+func (c *composedBlock) Reader() (io.ReadCloser, error) {
+ return &composedReader{
+ m: c.m,
+ refs: c.refs,
+ }, nil
+}
+
+func (c *composedBlock) Close() error {
+ return nil
+}
+
+type composedReader struct {
+ lock sync.Mutex
+ m *localBlobAccessMethod
+ refs []string
+ reader io.ReadCloser
+ data blobaccess.DataAccess
+}
+
+func (c *composedReader) Read(p []byte) (n int, err error) {
+ c.lock.Lock()
+ defer c.lock.Unlock()
+
+ for {
+ if c.reader != nil {
+ n, err := c.reader.Read(p)
+
+ if err == io.EOF {
+ c.reader.Close()
+ c.data.Close()
+ c.refs = c.refs[1:]
+ c.reader = nil
+ c.data = nil
+ // start new layer and return partial (>0) read before next layer is started
+ err = nil
+ }
+ // return partial read (even a zero read if layer is not yet finished) or error
+ if c.reader != nil || err != nil || n > 0 {
+ return n, err
+ }
+ // otherwise, we can use the given buffer for the next layer
+
+ // now, we have to check for a next succeeding layer.
+ // This means to finish with the actual reader and continue
+ // with the next one.
+ }
+
+ // If no more layers are available, report EOF.
+ if len(c.refs) == 0 {
+ return 0, io.EOF
+ }
+
+ ref := strings.TrimSpace(c.refs[0])
+ _, c.data, err = c.m.namespace.GetBlobData(digest.Digest(ref))
+ if err != nil {
+ return 0, err
+ }
+ c.reader, err = c.data.Reader()
+ if err != nil {
+ return 0, err
+ }
+ }
+}
+
+func (c *composedReader) Close() error {
+ c.lock.Lock()
+ defer c.lock.Unlock()
+
+ if c.reader == nil && c.refs == nil {
+ return os.ErrClosed
+ }
+ if c.reader != nil {
+ c.reader.Close()
+ c.data.Close()
+ c.reader = nil
+ c.refs = nil
+ }
+ return nil
+}
diff --git a/api/ocm/extensions/repositories/genericocireg/bloblimits.go b/api/ocm/extensions/repositories/genericocireg/bloblimits.go
new file mode 100644
index 0000000000..c4ecfc799a
--- /dev/null
+++ b/api/ocm/extensions/repositories/genericocireg/bloblimits.go
@@ -0,0 +1,53 @@
+package genericocireg
+
+import (
+ "sync"
+
+ configctx "ocm.software/ocm/api/config"
+ "ocm.software/ocm/api/ocm/extensions/repositories/genericocireg/config"
+)
+
+var (
+ defaultBlobLimits config.BlobLimits
+ lock sync.Mutex
+)
+
+const (
+ KB = int64(1000)
+ MB = 1000 * KB
+ GB = 1000 * MB
+)
+
+func init() {
+ defaultBlobLimits = config.BlobLimits{}
+
+ // Add limits for known OCI repositories, here,
+ // or provide init functions in specialized packages
+ // by calling AddDefaultBlobLimit.
+ AddDefaultBlobLimit("ghcr.io", 10*GB) // https://github.com/orgs/community/discussions/77429
+}
+
+// AddDefaultBlobLimit can be used to set default blob limits
+// for known repositories.
+// Those limits will be overwritten, by blob limits
+// given by a configuration object and the repository
+// specification.
+func AddDefaultBlobLimit(name string, limit int64) {
+ lock.Lock()
+ defer lock.Unlock()
+
+ defaultBlobLimits[name] = limit
+}
+
+func ConfigureBlobLimits(ctx configctx.ContextProvider, target config.Configurable) {
+ if target != nil {
+ lock.Lock()
+ defer lock.Unlock()
+
+ target.ConfigureBlobLimits(defaultBlobLimits)
+
+ if ctx != nil {
+ ctx.ConfigContext().ApplyTo(0, target)
+ }
+ }
+}
diff --git a/api/ocm/extensions/repositories/genericocireg/componentversion.go b/api/ocm/extensions/repositories/genericocireg/componentversion.go
index f603fbe509..d332ca2527 100644
--- a/api/ocm/extensions/repositories/genericocireg/componentversion.go
+++ b/api/ocm/extensions/repositories/genericocireg/componentversion.go
@@ -2,13 +2,16 @@ package genericocireg
import (
"fmt"
+ "io"
"path"
"strings"
"github.com/mandelsoft/goutils/errors"
"github.com/mandelsoft/goutils/set"
+ "github.com/mandelsoft/vfs/pkg/vfs"
"github.com/opencontainers/go-digest"
+ "ocm.software/ocm/api/datacontext/attrs/vfsattr"
"ocm.software/ocm/api/oci"
"ocm.software/ocm/api/oci/artdesc"
"ocm.software/ocm/api/oci/extensions/repositories/artifactset"
@@ -25,7 +28,9 @@ import (
ocihdlr "ocm.software/ocm/api/ocm/extensions/blobhandler/handlers/oci"
"ocm.software/ocm/api/utils/accessio"
"ocm.software/ocm/api/utils/accessobj"
+ "ocm.software/ocm/api/utils/blobaccess"
"ocm.software/ocm/api/utils/errkind"
+ "ocm.software/ocm/api/utils/mime"
common "ocm.software/ocm/api/utils/misc"
"ocm.software/ocm/api/utils/refmgmt"
"ocm.software/ocm/api/utils/runtime"
@@ -183,11 +188,11 @@ func (c *ComponentVersionContainer) Update() (bool, error) {
layers.Add(i)
}
for i, r := range desc.Resources {
- s, l, err := c.evalLayer(r.Access)
+ s, list, err := c.evalLayer(r.Access)
if err != nil {
return false, fmt.Errorf("failed resource layer evaluation: %w", err)
}
- if l > 0 {
+ for _, l := range list {
layerAnnotations[l] = append(layerAnnotations[l], ArtifactInfo{
Kind: ARTKIND_RESOURCE,
Identity: r.GetIdentity(desc.Resources),
@@ -199,11 +204,11 @@ func (c *ComponentVersionContainer) Update() (bool, error) {
}
}
for i, r := range desc.Sources {
- s, l, err := c.evalLayer(r.Access)
+ s, list, err := c.evalLayer(r.Access)
if err != nil {
return false, fmt.Errorf("failed source layer evaluation: %w", err)
}
- if l > 0 {
+ for _, l := range list {
layerAnnotations[l] = append(layerAnnotations[l], ArtifactInfo{
Kind: ARTKIND_SOURCE,
Identity: r.GetIdentity(desc.Sources),
@@ -259,32 +264,45 @@ func (c *ComponentVersionContainer) Update() (bool, error) {
return false, nil
}
-func (c *ComponentVersionContainer) evalLayer(s compdesc.AccessSpec) (compdesc.AccessSpec, int, error) {
- var d *artdesc.Descriptor
+func (c *ComponentVersionContainer) evalLayer(s compdesc.AccessSpec) (compdesc.AccessSpec, []int, error) {
+ var (
+ d *artdesc.Descriptor
+ layernums []int
+ )
spec, err := c.GetContext().AccessSpecForSpec(s)
if err != nil {
- return s, 0, err
+ return s, nil, err
}
if a, ok := spec.(*localblob.AccessSpec); ok {
if ok, _ := artdesc.IsDigest(a.LocalReference); !ok {
- return s, 0, errors.ErrInvalid("digest", a.LocalReference)
+ return s, nil, errors.ErrInvalid("digest", a.LocalReference)
}
- d = &artdesc.Descriptor{Digest: digest.Digest(a.LocalReference), MediaType: a.GetMimeType()}
- }
- if d != nil {
- // find layer
- layers := c.manifest.GetDescriptor().Layers
- maxLen := len(layers) - 1
- for i := range layers {
- l := layers[len(layers)-1-i]
- if i < maxLen && l.Digest == d.Digest && (d.Digest == "" || d.Digest == l.Digest) {
- return s, len(layers) - 1 - i, nil
+ refs := strings.Split(a.LocalReference, ",")
+ media := a.GetMimeType()
+ if len(refs) > 1 {
+ media = mime.MIME_OCTET
+ }
+ for _, ref := range refs {
+ d = &artdesc.Descriptor{Digest: digest.Digest(strings.TrimSpace(ref)), MediaType: media}
+ // find layer
+ layers := c.manifest.GetDescriptor().Layers
+ maxLen := len(layers) - 1
+ found := false
+ for i := maxLen; i > 0; i-- { // layer 0 is the component descriptor
+ l := layers[i]
+ if l.Digest == d.Digest {
+ layernums = append(layernums, i)
+ found = true
+ break
+ }
+ }
+ if !found {
+ return s, nil, fmt.Errorf("resource access %s: no layer found for local blob %s[%s]", spec.Describe(c.GetContext()), d.Digest, d.MediaType)
}
}
- return s, 0, fmt.Errorf("resource access %s: no layer found for local blob %s[%s]", spec.Describe(c.GetContext()), d.Digest, d.MediaType)
}
- return s, 0, nil
+ return s, layernums, nil
}
func (c *ComponentVersionContainer) GetDescriptor() *compdesc.ComponentDescriptor {
@@ -299,20 +317,74 @@ func (c *ComponentVersionContainer) GetStorageContext() cpi.StorageContext {
return ocihdlr.New(c.comp.GetName(), c.Repository(), c.comp.repo.ocirepo.GetSpecification().GetKind(), c.comp.repo.ocirepo, c.comp.namespace, c.manifest)
}
+func blobAccessForChunk(blob blobaccess.BlobAccess, fs vfs.FileSystem, r io.Reader, limit int64) (cpi.BlobAccess, bool, error) {
+ f, err := blobaccess.NewTempFile("", "chunk-*", fs)
+ if err != nil {
+ return nil, true, err
+ }
+ written, err := io.CopyN(f.Writer(), r, limit)
+ if err != nil && !errors.Is(err, io.EOF) {
+ f.Close()
+ return nil, false, err
+ }
+ if written <= 0 {
+ f.Close()
+ return nil, false, nil
+ }
+ return f.AsBlob(blob.MimeType()), written == limit, nil
+}
+
func (c *ComponentVersionContainer) AddBlob(blob cpi.BlobAccess, refName string, global cpi.AccessSpec) (cpi.AccessSpec, error) {
if blob == nil {
return nil, errors.New("a resource has to be defined")
}
+ fs := vfsattr.Get(c.GetContext())
+ size := blob.Size()
+ limit := c.comp.repo.blobLimit
+ var refs []string
+ if limit > 0 && size != blobaccess.BLOB_UNKNOWN_SIZE && size > limit {
+ reader, err := blob.Reader()
+ if err != nil {
+ return nil, err
+ }
+ defer reader.Close()
+
+ var b blobaccess.BlobAccess
+ cont := true
+ for cont {
+ b, cont, err = blobAccessForChunk(blob, fs, reader, limit)
+ if err != nil {
+ return nil, err
+ }
+ if b != nil {
+ err = c.addLayer(b, &refs)
+ b.Close()
+ if err != nil {
+ return nil, err
+ }
+ }
+ }
+ } else {
+ err := c.addLayer(blob, &refs)
+ if err != nil {
+ return nil, err
+ }
+ }
+ return localblob.New(strings.Join(refs, ","), refName, blob.MimeType(), global), nil
+}
+
+func (c *ComponentVersionContainer) addLayer(blob cpi.BlobAccess, refs *[]string) error {
err := c.manifest.AddBlob(blob)
if err != nil {
- return nil, err
+ return err
}
err = ocihdlr.AssureLayer(c.manifest.GetDescriptor(), blob)
if err != nil {
- return nil, err
+ return err
}
- return localblob.New(blob.Digest().String(), refName, blob.MimeType(), global), nil
+ *refs = append(*refs, blob.Digest().String())
+ return nil
}
// AssureGlobalRef provides a global manifest for a local OCI Artifact.
diff --git a/api/ocm/extensions/repositories/genericocireg/config/type.go b/api/ocm/extensions/repositories/genericocireg/config/type.go
new file mode 100644
index 0000000000..137c1d2688
--- /dev/null
+++ b/api/ocm/extensions/repositories/genericocireg/config/type.go
@@ -0,0 +1,110 @@
+package config
+
+import (
+ "net"
+ "strings"
+
+ "ocm.software/ocm/api/config"
+ cfgcpi "ocm.software/ocm/api/config/cpi"
+ "ocm.software/ocm/api/utils/runtime"
+)
+
+const (
+ ConfigType = "blobLimits.ocireg.ocm" + cfgcpi.OCM_CONFIG_TYPE_SUFFIX
+ ConfigTypeV1 = ConfigType + runtime.VersionSeparator + "v1"
+)
+
+func init() {
+ cfgcpi.RegisterConfigType(cfgcpi.NewConfigType[*Config](ConfigType, usage))
+ cfgcpi.RegisterConfigType(cfgcpi.NewConfigType[*Config](ConfigTypeV1))
+}
+
+// Config describes a memory based config interface
+// for configuring blob limits for underlying OCI manifest layers.
+type Config struct {
+ runtime.ObjectVersionedType `json:",inline"`
+ // BlobLimits describe the limit setting for host:port
+ // entries. As a special case (for testing) it is possible
+ // to configure limits for CTF, also, by using "@"+filepath.
+ BlobLimits BlobLimits `json:"blobLimits"`
+}
+
+type BlobLimits map[string]int64
+
+func (b BlobLimits) GetLimit(hostport string) int64 {
+ if b == nil {
+ return -1
+ }
+ l, ok := b[hostport]
+ if ok {
+ return l
+ }
+
+ if !strings.HasPrefix(hostport, "@") {
+ host, _, err := net.SplitHostPort(hostport)
+ if err == nil {
+ l, ok = b[host]
+ if ok {
+ return l
+ }
+ }
+ }
+ return -1
+}
+
+type Configurable interface {
+ ConfigureBlobLimits(limits BlobLimits)
+}
+
+// New creates a blob limit ConfigSpec.
+func New() *Config {
+ return &Config{
+ ObjectVersionedType: runtime.NewVersionedTypedObject(ConfigType),
+ }
+}
+
+func (a *Config) GetType() string {
+ return ConfigType
+}
+
+func (a *Config) AddLimit(hostport string, limit int64) {
+ if a.BlobLimits == nil {
+ a.BlobLimits = BlobLimits{}
+ }
+ a.BlobLimits[hostport] = limit
+}
+
+func (a *Config) ApplyTo(ctx config.Context, target interface{}) error {
+ t, ok := target.(Configurable)
+ if !ok {
+ return config.ErrNoContext(ConfigType)
+ }
+ if a.BlobLimits != nil {
+ t.ConfigureBlobLimits(a.BlobLimits)
+ }
+ return nil
+}
+
+const usage = `
+The config type ` + ConfigType + `
can be used to set some
+blob layer limits for particular OCI registries used to host OCM repositories.
+The blobLimits
field maps a OCI registry address to the blob limit to use:
+
+
+ type: ` + ConfigType + ` + blobLimits: + dummy.io: 65564 + dummy.io:8443: 32768 // with :8443 specifying the port and 32768 specifying the byte limit ++ +If blob limits apply to a registry, local blobs with a size larger than +the configured limit will be split into several layers with a maximum +size of the given value. + +These settings can be overwritten by explicit settings in an OCM +repository specification for those repositories. + +The most specific entry will be used. If a registry with a dedicated +port is requested, but no explicit configuration is found, the +setting for the sole hostname is used (if configured). +` diff --git a/api/ocm/extensions/repositories/genericocireg/config_test.go b/api/ocm/extensions/repositories/genericocireg/config_test.go new file mode 100644 index 0000000000..2435497a81 --- /dev/null +++ b/api/ocm/extensions/repositories/genericocireg/config_test.go @@ -0,0 +1,62 @@ +package genericocireg_test + +import ( + "reflect" + + . "github.com/mandelsoft/goutils/testutils" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "ocm.software/ocm/api/datacontext" + "ocm.software/ocm/api/ocm/extensions/repositories/genericocireg/config" + + "github.com/mandelsoft/goutils/finalizer" + "github.com/mandelsoft/vfs/pkg/osfs" + "github.com/mandelsoft/vfs/pkg/vfs" + "ocm.software/ocm/api/oci" + "ocm.software/ocm/api/oci/extensions/repositories/ctf" + "ocm.software/ocm/api/ocm" + "ocm.software/ocm/api/ocm/cpi/repocpi" + "ocm.software/ocm/api/ocm/extensions/repositories/genericocireg" + "ocm.software/ocm/api/utils/accessio" + "ocm.software/ocm/api/utils/accessobj" +) + +var _ = Describe("component repository mapping", func() { + var tempfs vfs.FileSystem + + var ocispec oci.RepositorySpec + var spec *genericocireg.RepositorySpec + + BeforeEach(func() { + t, err := osfs.NewTempFileSystem() + Expect(err).To(Succeed()) + tempfs = t + + // ocmlog.Context().AddRule(logging.NewConditionRule(logging.TraceLevel, accessio.ALLOC_REALM)) + + ocispec, err = ctf.NewRepositorySpec(accessobj.ACC_CREATE, "test", accessio.PathFileSystem(tempfs), accessobj.FormatDirectory) + Expect(err).To(Succeed()) + spec = genericocireg.NewRepositorySpec(ocispec, nil) + }) + + AfterEach(func() { + vfs.Cleanup(tempfs) + }) + + It("creates a dummy component with configured chunks", func() { + var finalize finalizer.Finalizer + defer Defer(finalize.Finalize) + + ctx := ocm.New(datacontext.MODE_EXTENDED) + + cfg := config.New() + cfg.AddLimit("@test", 5) + ctx.ConfigContext().ApplyConfig(cfg, "direct") + + repo := finalizer.ClosingWith(&finalize, Must(ctx.RepositoryForSpec(spec))) + impl := Must(repocpi.GetRepositoryImplementation(repo)) + Expect(reflect.TypeOf(impl).String()).To(Equal("*genericocireg.RepositoryImpl")) + + Expect(impl.(*genericocireg.RepositoryImpl).GetBlobLimit()).To(Equal(int64(5))) + }) +}) diff --git a/api/ocm/extensions/repositories/genericocireg/repo_test.go b/api/ocm/extensions/repositories/genericocireg/repo_test.go index 5ea992c69a..a892bf0f9c 100644 --- a/api/ocm/extensions/repositories/genericocireg/repo_test.go +++ b/api/ocm/extensions/repositories/genericocireg/repo_test.go @@ -2,6 +2,7 @@ package genericocireg_test import ( "fmt" + "io" "path" "reflect" @@ -123,6 +124,156 @@ var _ = Describe("component repository mapping", func() { MustBeSuccessful(finalize.Finalize()) }) + const ref4 = "sha256:9f86d081884c7d659a2feaa0c55ad015a3bf4f1b2b0b822cd15d6c15b0f00a08,sha256:3a6eb0790f39ac87c94f3856b2dd2c5d110e6811602261a9a923d3bb23adc8b7" + const ref5 = "sha256:a4853613b2a38568ed4e49196238152469097412d06d5e5fc9be8ab92cfdf2bf,sha256:977817f6f61f4dd501df3036a3e16b31452b36f4aa3edcf9a3f3242a79d7170d" + const ref8 = "sha256:" + ocmtesthelper.D_TESTDATA + + DescribeTable("creates a dummy component with chunks", func(limit int, f func(ocm.ResourceAccess), ref string) { + var finalize finalizer.Finalizer + defer Defer(finalize.Finalize) + + repo := finalizer.ClosingWith(&finalize, Must(DefaultContext.RepositoryForSpec(spec))) + impl := Must(repocpi.GetRepositoryImplementation(repo)) + Expect(reflect.TypeOf(impl).String()).To(Equal("*genericocireg.RepositoryImpl")) + repocpi.SetBlobLimit(impl, int64(limit)) + + comp := finalizer.ClosingWith(&finalize, Must(repo.LookupComponent(COMPONENT))) + vers := finalizer.ClosingWith(&finalize, Must(comp.NewVersion("v1"))) + + m1 := compdesc.NewResourceMeta("rsc1", resourcetypes.PLAIN_TEXT, metav1.LocalRelation) + blob := blobaccess.ForString(mime.MIME_TEXT, ocmtesthelper.S_TESTDATA) + MustBeSuccessful(vers.SetResourceBlob(m1, blob, "", nil)) + + MustBeSuccessful(comp.AddVersion(vers)) + + noref := vers.Repository() + Expect(noref).NotTo(BeNil()) + Expect(noref.IsClosed()).To(BeFalse()) + Expect(noref.Close()).To(Succeed()) + Expect(noref.IsClosed()).To(BeFalse()) + + MustBeSuccessful(finalize.Finalize()) + + Expect(noref.IsClosed()).To(BeTrue()) + Expect(noref.Close()).To(MatchError("closed")) + ExpectError(noref.LookupComponent("dummy")).To(MatchError("closed")) + + // access it again + repo = finalizer.ClosingWith(&finalize, Must(DefaultContext.RepositoryForSpec(spec))) + + ok := Must(repo.ExistsComponentVersion(COMPONENT, "v1")) + Expect(ok).To(BeTrue()) + + comp = finalizer.ClosingWith(&finalize, Must(repo.LookupComponent(COMPONENT))) + vers = finalizer.ClosingWith(&finalize, Must(comp.LookupVersion("v1"))) + + rsc := Must(vers.GetResourceByIndex(0)) + acc := Must(rsc.Access()) + local, ok := acc.(*localblob.AccessSpec) + Expect(ok).To(BeTrue()) + // fmt.Printf("localref: %s\n", local.LocalReference) + Expect(local.LocalReference).To(Equal(ref)) + Expect(rsc.Meta().Digest).NotTo(BeNil()) + Expect(rsc.Meta().Digest.Value).To(Equal(ocmtesthelper.D_TESTDATA)) + + f(rsc) + + MustBeSuccessful(finalize.Finalize()) + }, + Entry("get blob", 5, func(rsc ocm.ResourceAccess) { + data := Must(ocmutils.GetResourceData(rsc)) + Expect(string(data)).To(Equal(ocmtesthelper.S_TESTDATA)) + }, ref5), + Entry("stream blob", 5, func(rsc ocm.ResourceAccess) { + r := Must(ocmutils.GetResourceReader(rsc)) + data := Must(io.ReadAll(r)) + Expect(string(data)).To(Equal(ocmtesthelper.S_TESTDATA)) + }, ref5), + Entry("stream blob with small buffer", 5, func(rsc ocm.ResourceAccess) { + var buf [2]byte + var data []byte + + r := Must(ocmutils.GetResourceReader(rsc)) + + for { + n, err := r.Read(buf[:]) + if n > 0 { + data = append(data, buf[:n]...) + } + if err != nil { + if err == io.EOF { + break + } else { + MustBeSuccessful(err) + } + } + } + Expect(string(data)).To(Equal(ocmtesthelper.S_TESTDATA)) + }, ref5), + + Entry("get blob (match limit)", len(ocmtesthelper.S_TESTDATA), func(rsc ocm.ResourceAccess) { + data := Must(ocmutils.GetResourceData(rsc)) + Expect(string(data)).To(Equal(ocmtesthelper.S_TESTDATA)) + }, ref8), + Entry("stream blob (match limit)", len(ocmtesthelper.S_TESTDATA), func(rsc ocm.ResourceAccess) { + r := Must(ocmutils.GetResourceReader(rsc)) + data := Must(io.ReadAll(r)) + Expect(string(data)).To(Equal(ocmtesthelper.S_TESTDATA)) + }, ref8), + Entry("stream blob with small buffer (match limit)", len(ocmtesthelper.S_TESTDATA), func(rsc ocm.ResourceAccess) { + var buf [2]byte + var data []byte + + r := Must(ocmutils.GetResourceReader(rsc)) + + for { + n, err := r.Read(buf[:]) + if n > 0 { + data = append(data, buf[:n]...) + } + if err != nil { + if err == io.EOF { + break + } else { + MustBeSuccessful(err) + } + } + } + Expect(string(data)).To(Equal(ocmtesthelper.S_TESTDATA)) + }, ref8), + + Entry("get blob (match limit/2)", len(ocmtesthelper.S_TESTDATA)/2, func(rsc ocm.ResourceAccess) { + data := Must(ocmutils.GetResourceData(rsc)) + Expect(string(data)).To(Equal(ocmtesthelper.S_TESTDATA)) + }, ref4), + Entry("stream blob (match limit/2)", len(ocmtesthelper.S_TESTDATA)/2, func(rsc ocm.ResourceAccess) { + r := Must(ocmutils.GetResourceReader(rsc)) + data := Must(io.ReadAll(r)) + Expect(string(data)).To(Equal(ocmtesthelper.S_TESTDATA)) + }, ref4), + Entry("stream blob with small buffer (match limit/2)", len(ocmtesthelper.S_TESTDATA)/2, func(rsc ocm.ResourceAccess) { + var buf [2]byte + var data []byte + + r := Must(ocmutils.GetResourceReader(rsc)) + + for { + n, err := r.Read(buf[:]) + if n > 0 { + data = append(data, buf[:n]...) + } + if err != nil { + if err == io.EOF { + break + } else { + MustBeSuccessful(err) + } + } + } + Expect(string(data)).To(Equal(ocmtesthelper.S_TESTDATA)) + }, ref4), + ) + It("handles legacylocalociblob access method", func() { var finalize finalizer.Finalizer defer Defer(finalize.Finalize) diff --git a/api/ocm/extensions/repositories/genericocireg/repository.go b/api/ocm/extensions/repositories/genericocireg/repository.go index 70661f76f5..8773350ea5 100644 --- a/api/ocm/extensions/repositories/genericocireg/repository.go +++ b/api/ocm/extensions/repositories/genericocireg/repository.go @@ -11,12 +11,16 @@ import ( "github.com/mandelsoft/goutils/general" "ocm.software/ocm/api/credentials" + "ocm.software/ocm/api/credentials/identity/hostpath" "ocm.software/ocm/api/datacontext" "ocm.software/ocm/api/oci" ocicpi "ocm.software/ocm/api/oci/cpi" + "ocm.software/ocm/api/oci/extensions/repositories/ctf" + "ocm.software/ocm/api/oci/extensions/repositories/ocireg" "ocm.software/ocm/api/ocm/cpi" "ocm.software/ocm/api/ocm/cpi/repocpi" "ocm.software/ocm/api/ocm/extensions/repositories/genericocireg/componentmapping" + "ocm.software/ocm/api/ocm/extensions/repositories/genericocireg/config" ) type OCIBasedRepository interface { @@ -44,23 +48,69 @@ type RepositoryImpl struct { nonref cpi.Repository ocirepo oci.Repository readonly bool + // blobLimit is the size limit for layers maintained for the storage of localBlobs. + // The value -1 means an unconfigured value (a default from the blob limit configuration is used), + // a value == 0 disables the limiting and (a default from the blob limit configuration is ignored), + // a value > 0 enabled the usage of the specified size. + blobLimit int64 } var ( _ repocpi.RepositoryImpl = (*RepositoryImpl)(nil) _ credentials.ConsumerIdentityProvider = (*RepositoryImpl)(nil) + _ config.Configurable = (*RepositoryImpl)(nil) ) -func NewRepository(ctxp cpi.ContextProvider, meta *ComponentRepositoryMeta, ocirepo oci.Repository) cpi.Repository { +// NewRepository creates a new OCM repository based on any OCI abstraction from +// the OCI context type. +// The optional blobLimit is the size limit for layers maintained for the storage of localBlobs. +// The value -1 means an unconfigured value (a default from the blob limit configuration is used), +// a value == 0 disables the limiting and (a default from the blob limit configuration is ignored), +// a value > 0 enabled the usage of the specified size. +func NewRepository(ctxp cpi.ContextProvider, meta *ComponentRepositoryMeta, ocirepo oci.Repository, blobLimit ...int64) cpi.Repository { ctx := datacontext.InternalContextRef(ctxp.OCMContext()) + impl := &RepositoryImpl{ - ctx: ctx, - meta: *DefaultComponentRepositoryMeta(meta), - ocirepo: ocirepo, + ctx: ctx, + meta: *DefaultComponentRepositoryMeta(meta), + ocirepo: ocirepo, + blobLimit: general.OptionalDefaulted(-1, blobLimit...), + } + if impl.blobLimit < 0 { + ConfigureBlobLimits(ctxp.OCMContext(), impl) } return repocpi.NewRepository(impl, "OCM repo[OCI]") } +func (r *RepositoryImpl) ConfigureBlobLimits(limits config.BlobLimits) { + if len(limits) == 0 { + return + } + if spec, ok := r.ocirepo.GetSpecification().(*ocireg.RepositorySpec); ok { + id := spec.GetConsumerId() + hp := hostpath.HostPort(id) + l := limits.GetLimit(hp) + if l >= 0 { + r.blobLimit = l + } + } + if spec, ok := r.ocirepo.GetSpecification().(*ctf.RepositorySpec); ok { + l := limits.GetLimit("@" + spec.FilePath) + if l >= 0 { + r.blobLimit = l + } + } +} + +func (r *RepositoryImpl) SetBlobLimit(s int64) bool { + r.blobLimit = s + return true +} + +func (r *RepositoryImpl) GetBlobLimit() int64 { + return r.blobLimit +} + func (r *RepositoryImpl) Close() error { return r.ocirepo.Close() } diff --git a/api/ocm/extensions/repositories/genericocireg/type.go b/api/ocm/extensions/repositories/genericocireg/type.go index 809d598140..b3c4b460e5 100644 --- a/api/ocm/extensions/repositories/genericocireg/type.go +++ b/api/ocm/extensions/repositories/genericocireg/type.go @@ -91,6 +91,7 @@ func NewComponentRepositoryMeta(subPath string, mapping ComponentNameMapping) *C type RepositorySpec struct { oci.RepositorySpec ComponentRepositoryMeta + BlobLimit *int64 } var ( @@ -127,19 +128,28 @@ func (a *RepositorySpec) AsUniformSpec(cpi.Context) *cpi.UniformRepositorySpec { return &cpi.UniformRepositorySpec{Type: a.GetKind(), Scheme: spec.Scheme, Host: spec.Host, Info: spec.Info, TypeHint: spec.TypeHint, SubPath: a.SubPath} } +type meta struct { + ComponentRepositoryMeta `json:",inline"` + BlobLimit *int64 `json:"blobLimit,omitempty"` +} + func (u *RepositorySpec) UnmarshalJSON(data []byte) error { logrus.Debugf("unmarshal generic ocireg spec %s\n", string(data)) ocispec := &oci.GenericRepositorySpec{} if err := json.Unmarshal(data, ocispec); err != nil { return err } - compmeta := &ComponentRepositoryMeta{} - if err := json.Unmarshal(data, ocispec); err != nil { + + var m meta + if err := json.Unmarshal(data, &m); err != nil { return err } u.RepositorySpec = ocispec - u.ComponentRepositoryMeta = *compmeta + u.ComponentRepositoryMeta = m.ComponentRepositoryMeta + if m.BlobLimit != nil { + u.BlobLimit = m.BlobLimit + } normalizers.Normalize(u) return nil @@ -154,7 +164,12 @@ func (u RepositorySpec) MarshalJSON() ([]byte, error) { if err != nil { return nil, err } - compmeta, err := runtime.ToUnstructuredObject(u.ComponentRepositoryMeta) + + m := meta{ + ComponentRepositoryMeta: u.ComponentRepositoryMeta, + BlobLimit: u.BlobLimit, + } + compmeta, err := runtime.ToUnstructuredObject(&m) if err != nil { return nil, err } @@ -166,6 +181,9 @@ func (s *RepositorySpec) Repository(ctx cpi.Context, creds credentials.Credentia if err != nil { return nil, err } + if s.BlobLimit != nil { + return NewRepository(ctx, &s.ComponentRepositoryMeta, r, *s.BlobLimit), nil + } return NewRepository(ctx, &s.ComponentRepositoryMeta, r), nil } diff --git a/docs/reference/ocm_configfile.md b/docs/reference/ocm_configfile.md index ae9e51920d..4f5282064d 100644 --- a/docs/reference/ocm_configfile.md +++ b/docs/reference/ocm_configfile.md @@ -24,6 +24,28 @@ The following configuration types are supported: <name>: <yaml defining the attribute> ... +-
blobLimits.ocireg.ocm.config.ocm.software
+ The config type blobLimits.ocireg.ocm.config.ocm.software
can be used to set some
+ blob layer limits for particular OCI registries used to host OCM repositories.
+ The blobLimits
field maps a OCI registry address to the blob limit to use:
+
+ + type: blobLimits.ocireg.ocm.config.ocm.software + blobLimits: + dummy.io: 65564 + dummy.io:8443: 32768 // with :8443 specifying the port and 32768 specifying the byte limit ++ + If blob limits apply to a registry, local blobs with a size larger than + the configured limit will be split into several layers with a maximum + size of the given value. + + These settings can be overwritten by explicit settings in an OCM + repository specification for those repositories. + + The most specific entry will be used. If a registry with a dedicated + port is requested, but no explicit configuration is found, the + setting for the sole hostname is used (if configured). -
cli.ocm.config.ocm.software
The config type cli.ocm.config.ocm.software
is used to handle the
main configuration flags of the OCM command line tool.